Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion Analysis/Tutorials/src/partitions.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,24 @@ struct ATask {
}
};

// Partition inside process
// Caveat: partitioned table cannot be passed as const& to process()
struct BTask {
void process(aod::Collisions const& collisions, aod::Tracks& tracks)
{
for (auto& c : collisions) {
Partition<aod::Tracks> groupedTracks = aod::track::collisionId == c.globalIndex();
groupedTracks.bindTable(tracks);
for (auto& t : groupedTracks) {
LOGF(INFO, "collision global index: %d grouped track collision id: %d", c.globalIndex(), t.collisionId());
}
}
}
};

WorkflowSpec defineDataProcessing(ConfigContext const&)
{
return WorkflowSpec{
adaptAnalysisTask<ATask>("consume-tracks")};
adaptAnalysisTask<ATask>("consume-tracks"),
adaptAnalysisTask<BTask>("partition-in-process")};
}
7 changes: 7 additions & 0 deletions Framework/Core/include/Framework/AnalysisHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,13 @@ struct Partition {
{
}

void bindTable(T& table)
{
mFiltered.reset(getTableFromFilter(table, filter));
bindExternalIndices(&table);
getBoundToExternalIndices(table);
}

void setTable(const T& table)
{
mFiltered.reset(getTableFromFilter(table, filter));
Expand Down
4 changes: 3 additions & 1 deletion Framework/Core/include/Framework/Expressions.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ struct LiteralStorage {
using stored_pack = framework::pack<T...>;
};

using LiteralValue = LiteralStorage<int, bool, float, double, uint8_t>;
using LiteralValue = LiteralStorage<int, bool, float, double, uint8_t, int64_t>;

template <typename T>
constexpr auto selectArrowType()
Expand All @@ -73,6 +73,8 @@ constexpr auto selectArrowType()
return atype::INT8;
} else if constexpr (std::is_same_v<T, uint16_t>) {
return atype::INT16;
} else if constexpr (std::is_same_v<T, int64_t>) {
return atype::INT64;
} else if constexpr (std::is_same_v<T, uint8_t>) {
return atype::UINT8;
} else {
Expand Down
11 changes: 8 additions & 3 deletions Framework/Core/src/Expressions.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ std::shared_ptr<arrow::DataType> concreteArrowType(atype::type type)
return arrow::int16();
case atype::INT32:
return arrow::int32();
case atype::INT64:
return arrow::int64();
case atype::FLOAT:
return arrow::float32();
case atype::DOUBLE:
Expand Down Expand Up @@ -268,8 +270,8 @@ Operations createOperations(Filter const& expression)
return t1;
}

if (t1 == atype::INT32 || t1 == atype::INT8 || t1 == atype::INT16 || t1 == atype::UINT8) {
if (t2 == atype::INT32 || t2 == atype::INT8 || t2 == atype::INT16 || t2 == atype::UINT8) {
if (t1 == atype::INT32 || t1 == atype::INT8 || t1 == atype::INT16 || t1 == atype::INT64 || t1 == atype::UINT8) {
if (t2 == atype::INT32 || t2 == atype::INT8 || t2 == atype::INT16 || t2 == atype::INT64 || t2 == atype::UINT8) {
return atype::FLOAT;
}
if (t2 == atype::FLOAT) {
Expand All @@ -280,7 +282,7 @@ Operations createOperations(Filter const& expression)
}
}
if (t1 == atype::FLOAT) {
if (t2 == atype::INT32 || t2 == atype::INT8 || t2 == atype::INT16 || t2 == atype::UINT8) {
if (t2 == atype::INT32 || t2 == atype::INT8 || t2 == atype::INT16 || t2 == atype::INT64 || t2 == atype::UINT8) {
return atype::FLOAT;
}
if (t2 == atype::DOUBLE) {
Expand Down Expand Up @@ -451,6 +453,9 @@ gandiva::NodePtr createExpressionTree(Operations const& opSpecs,
if (content.index() == 4) {
return gandiva::TreeExprBuilder::MakeLiteral(std::get<uint8_t>(content));
}
if (content.index() == 5) {
return gandiva::TreeExprBuilder::MakeLiteral(std::get<int64_t>(content));
}
throw runtime_error("Malformed LiteralNode");
}

Expand Down