Permalink
Browse files

Implement DISTRIBUTED DATASET(COUNT) in Thor

Adding inline table activity (TAKinlinetable) with
64-bit integers for rows and distributed logic.
Similar implementtions in Roxie and HThor, but
without distributed behaviour. This is now the
default for DATASET(COUNT) construct.

Tested on Thor clusters with up to 9 nodes. Changed
the test to print strings as headers, rather than
boolean separators.
  • Loading branch information...
rengolin committed Mar 23, 2012
1 parent 8c631d7 commit 97028340995b09e826f464edadfd2c86a9c6f6aa
@@ -60,6 +60,7 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
kindArray[TAKfunnel] = "funnel";
kindArray[TAKapply] = "apply";
kindArray[TAKtemptable] = "temptable";
+ kindArray[TAKinlinetable] = "inlinetable";
kindArray[TAKtemprow] = "temprow";
kindArray[TAKhashdistribute] = "hashdistribute";
kindArray[TAKhashdedup] = "hashdedup";
@@ -625,6 +625,7 @@ extern const char * getActivityText(ThorActivityKind kind)
case TAKfunnel: return "Funnel";
case TAKapply: return "Apply";
case TAKtemptable: return "Inline Dataset";
+ case TAKinlinetable: return "Inline Dataset";
case TAKtemprow: return "Inline Row";
case TAKhashdistribute: return "Hash Distribute";
case TAKhashdedup: return "Hash Dedup";
@@ -779,6 +780,7 @@ extern bool isActivitySource(ThorActivityKind kind)
{
case TAKpiperead:
case TAKtemptable:
+ case TAKinlinetable:
case TAKtemprow:
case TAKworkunitread:
case TAKnull:
@@ -106,6 +106,8 @@ static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityI
case TAKtemptable:
case TAKtemprow:
return createTempTableActivity(agent, activityId, subgraphId, (IHThorTempTableArg &)arg, kind);
+ case TAKinlinetable:
+ return createInlineTableActivity(agent, activityId, subgraphId, (IHThorInlineTableArg &)arg, kind);
case TAKnormalize:
return createNormalizeActivity(agent, activityId, subgraphId, (IHThorNormalizeArg &)arg, kind);
case TAKnormalizechild:
View
@@ -15988,21 +15988,21 @@ ABoundActivity * HqlCppTranslator::doBuildActivityCountTransform(BuildCtx & ctx,
IHqlExpression * counter = queryPropertyChild(expr, _countProject_Atom, 0);
// Overriding IHThorTempTableArg
- Owned<ActivityInstance> instance = new ActivityInstance(*this, ctx, TAKtemptable, expr,"TempTable");
+ Owned<ActivityInstance> instance = new ActivityInstance(*this, ctx, TAKinlinetable, expr,"InlineTable");
buildActivityFramework(instance);
buildInstancePrefix(instance);
// size32_t getRow()
BuildCtx funcctx(instance->startctx);
- funcctx.addQuotedCompound("virtual size32_t getRow(ARowBuilder & crSelf, unsigned row)");
+ funcctx.addQuotedCompound("virtual size32_t getRow(ARowBuilder & crSelf, __uint64 row)");
ensureRowAllocated(funcctx, "crSelf");
BoundRow * selfCursor = bindSelf(funcctx, instance->dataset, "crSelf");
IHqlExpression * self = selfCursor->querySelector();
associateCounter(funcctx, counter, "row");
buildTransformBody(funcctx, transform, NULL, NULL, instance->dataset, self);
// unsigned numRows() - count is guaranteed by lexer
- doBuildUnsignedFunction(instance->startctx, "numRows", count);
+ doBuildUnsigned64Function(instance->startctx, "numRows", count);
doBuildTempTableFlags(instance->startctx, expr, isConstantTransform(transform));
View
@@ -5863,6 +5863,39 @@ const void *CHThorTempTableActivity::nextInGroup()
//=====================================================================================================
+CHThorInlineTableActivity::CHThorInlineTableActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorInlineTableArg &_arg, ThorActivityKind _kind) :
+ CHThorSimpleActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg)
+{
+}
+
+void CHThorInlineTableActivity::ready()
+{
+ CHThorSimpleActivityBase::ready();
+ curRow = 0;
+ numRows = helper.numRows();
+ if (helper.getFlags() & TTFdistributed != 0)
+ WARNLOG("HThor does not support distributed inline tables, using master");
+}
+
+
+const void *CHThorInlineTableActivity::nextInGroup()
+{
+ // Filtering empty rows, returns the next valid row
+ while (curRow < numRows)
+ {
+ RtlDynamicRowBuilder rowBuilder(rowAllocator);
+ size32_t size = helper.getRow(rowBuilder, curRow++);
+ if (size)
+ {
+ processed++;
+ return rowBuilder.finalizeRowClear(size);
+ }
+ }
+ return NULL;
+}
+
+//=====================================================================================================
+
CHThorNullActivity::CHThorNullActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorArg &_arg, ThorActivityKind _kind) : CHThorSimpleActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg)
{
}
@@ -9628,6 +9661,7 @@ MAKEFACTORY(WorkUnitWrite);
MAKEFACTORY(FirstN);
MAKEFACTORY(Count);
MAKEFACTORY(TempTable);
+MAKEFACTORY(InlineTable);
MAKEFACTORY_ARG(Concat, Funnel);
MAKEFACTORY(Apply);
MAKEFACTORY(Sample);
View
@@ -108,6 +108,7 @@ extern HTHOR_API IHThorActivity *createWorkUnitWriteActivity(IAgentContext &, un
extern HTHOR_API IHThorActivity *createFirstNActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorFirstNArg &arg, ThorActivityKind kind);
extern HTHOR_API IHThorActivity *createCountActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorCountArg &arg, ThorActivityKind kind);
extern HTHOR_API IHThorActivity *createTempTableActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorTempTableArg &arg, ThorActivityKind kind);
+extern HTHOR_API IHThorActivity *createInlineTableActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorInlineTableArg &arg, ThorActivityKind kind);
extern HTHOR_API IHThorActivity *createConcatActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorFunnelArg &arg, ThorActivityKind kind);
extern HTHOR_API IHThorActivity *createApplyActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorApplyArg &arg, ThorActivityKind kind);
extern HTHOR_API IHThorActivity *createSampleActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorSampleArg &arg, ThorActivityKind kind);
View
@@ -1508,6 +1508,22 @@ public:
};
+class CHThorInlineTableActivity : public CHThorSimpleActivityBase
+{
+ IHThorInlineTableArg &helper;
+ __uint64 curRow;
+ __uint64 numRows;
+
+public:
+ CHThorInlineTableActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorInlineTableArg &_arg, ThorActivityKind _kind);
+
+ virtual void ready();
+ virtual bool needsAllocator() const { return true; }
+
+ //interface IHThorInput
+ virtual const void *nextInGroup();
+};
+
class CHThorNullActivity : public CHThorSimpleActivityBase
{
IHThorArg &helper;
View
@@ -444,6 +444,8 @@ class CQueryFactory : public CInterface, implements IQueryFactory, implements IR
case TAKtemptable:
case TAKtemprow:
return createRoxieServerTempTableActivityFactory(id, subgraphId, *this, helperFactory, kind);
+ case TAKinlinetable:
+ return createRoxieServerInlineTableActivityFactory(id, subgraphId, *this, helperFactory, kind);
case TAKthroughaggregate:
throwUnexpected(); // Concept of through aggregates has been proven not to work in Roxie - codegen should not be creating them any more.
case TAKtopn:
View
@@ -5287,6 +5287,78 @@ IRoxieServerActivityFactory *createRoxieServerTempTableActivityFactory(unsigned
//=================================================================================
+class CRoxieServerInlineTableActivity : public CRoxieServerActivity
+{
+ IHThorInlineTableArg &helper;
+ __uint64 curRow;
+ __uint64 numRows;
+
+public:
+ CRoxieServerInlineTableActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
+ : CRoxieServerActivity(_factory, _probeManager), helper((IHThorInlineTableArg &) basehelper)
+ {
+ }
+
+ virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
+ {
+ curRow = 0;
+ CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
+ numRows = helper.numRows();
+ if (helper.getFlags() & TTFdistributed != 0)
+ CTXLOG("Roxie does not support distributed inline tables, using master");
+ }
+
+ virtual bool needsAllocator() const { return true; }
+ virtual const void *nextInGroup()
+ {
+ ActivityTimer t(totalCycles, timeActivities, ctx->queryDebugContext());
+ // Filtering empty rows, returns the next valid row
+ while (curRow < numRows)
+ {
+ RtlDynamicRowBuilder rowBuilder(rowAllocator);
+ unsigned outSize = helper.getRow(rowBuilder, curRow++);
+ if (outSize)
+ {
+ processed++;
+ return rowBuilder.finalizeRowClear(outSize);
+ }
+ }
+ return NULL;
+ }
+
+ virtual void setInput(unsigned idx, IRoxieInput *_in)
+ {
+ throw MakeStringException(ROXIE_SET_INPUT, "Internal error: setInput() called for source activity");
+ }
+
+};
+
+class CRoxieServerInlineTableActivityFactory : public CRoxieServerActivityFactory
+{
+
+public:
+ CRoxieServerInlineTableActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
+ : CRoxieServerActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind)
+ {
+ }
+
+ virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
+ {
+ return new CRoxieServerInlineTableActivity(this, _probeManager);
+ }
+ virtual void setInput(unsigned idx, unsigned source, unsigned sourceidx)
+ {
+ throw MakeStringException(ROXIE_SET_INPUT, "Internal error: setInput() should not be called for InlineTable activity");
+ }
+};
+
+IRoxieServerActivityFactory *createRoxieServerInlineTableActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
+{
+ return new CRoxieServerInlineTableActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind);
+}
+
+//=================================================================================
+
class CRoxieServerWorkUnitReadActivity : public CRoxieServerActivity
{
IHThorWorkunitReadArg &helper;
View
@@ -424,6 +424,7 @@ extern IRoxieServerActivityFactory *createRoxieServerNewChildGroupAggregateActiv
extern IRoxieServerActivityFactory *createRoxieServerNewChildThroughNormalizeActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
extern IRoxieServerActivityFactory *createRoxieServerDatasetResultActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
extern IRoxieServerActivityFactory *createRoxieServerTempTableActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
+extern IRoxieServerActivityFactory *createRoxieServerInlineTableActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
extern IRoxieServerActivityFactory *createRoxieServerWorkUnitReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
extern IRoxieServerActivityFactory *createRoxieServerLocalResultReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned graphId);
extern IRoxieServerActivityFactory *createRoxieServerLocalResultStreamReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
@@ -809,7 +809,7 @@ enum ThorActivityKind
TAKhashdistributemerge,
TAKselfjoinlight,
TAKhttp_rowdataset, // a source activity
- TAKunused3,
+ TAKinlinetable,
TAKcountdisk,
TAKstreamediterator,
TAKexternalsource,
@@ -956,7 +956,7 @@ enum ActivityInterfaceEnum
TAIexternal_1,
TAIpipethrougharg_2,
TAIpipewritearg_2,
- TAItemptablearg_2,
+ TAIinlinetablearg_1,
TAIshuffleextra_1,
//Should remain as last of all meaningful tags, but before aliases
@@ -1421,8 +1421,10 @@ struct IHThorTempTableArg : public IHThorArg
* single method. Future-proof and should merge with the interface
* above in the next major release.
*/
-struct IHThorTempTableExtraArg : public IHThorTempTableArg
+struct IHThorInlineTableArg : public IHThorArg
{
+ virtual size32_t getRow(ARowBuilder & rowBuilder, __uint64 row) = 0;
+ virtual __uint64 numRows() = 0;
virtual unsigned getFlags() = 0;
};
@@ -1299,7 +1299,7 @@ class CThorHashAggregateArg : public CThorArg, implements IHThorHashAggregateArg
virtual size32_t mergeAggregate(ARowBuilder & rowBuilder, const void * src) { rtlFailUnexpected(); return 0; }
};
-class CThorTempTableArg : public CThorArg, implements IHThorTempTableExtraArg
+class CThorTempTableArg : public CThorArg, implements IHThorTempTableArg
{
public:
virtual void Link() const { RtlCInterface::Link(); }
@@ -1313,8 +1313,6 @@ class CThorTempTableArg : public CThorArg, implements IHThorTempTableExtraArg
case TAIarg:
case TAItemptablearg_1:
return static_cast<IHThorTempTableArg *>(this);
- case TAItemptablearg_2:
- return static_cast<IHThorTempTableExtraArg *>(this);
}
return NULL;
}
@@ -1324,6 +1322,28 @@ class CThorTempTableArg : public CThorArg, implements IHThorTempTableExtraArg
virtual size32_t getRowSingle(ARowBuilder & rowBuilder) { return 0; }
};
+class CThorInlineTableArg : public CThorArg, implements IHThorInlineTableArg
+{
+public:
+ virtual void Link() const { RtlCInterface::Link(); }
+ virtual bool Release() const { return RtlCInterface::Release(); }
+ virtual void onCreate(ICodeContext * _ctx, IHThorArg *, MemoryBuffer * in) { ctx = _ctx; }
+
+ virtual IInterface * selectInterface(ActivityInterfaceEnum which)
+ {
+ switch (which)
+ {
+ case TAIarg:
+ case TAIinlinetablearg_1:
+ return static_cast<IHThorInlineTableArg *>(this);
+ }
+ return NULL;
+ }
+
+ virtual unsigned getFlags() { return 0; }
+ virtual size32_t getRowSingle(ARowBuilder & rowBuilder) { return 0; }
+};
+
class CThorTempRowArg : public CThorTempTableArg
{
virtual size32_t getRow(ARowBuilder & rowBuilder, unsigned row) { if (row) return 0; return getRowSingle(rowBuilder); }
@@ -31,26 +31,26 @@ r t2() := transform
end;
// zero
-output(true);
+output('Zero Rows');
ds := DATASET(0, t1(COUNTER));
output(ds);
// plain
-output(true);
+output('Constant, 10');
ds10 := DATASET(10, t1(COUNTER));
output(ds10);
// expr
-output(true);
+output('Constant expression, 50');
ds50 := DATASET(5 * 10, t1(COUNTER));
output(ds50);
// variable
-output(true);
+output('Variable 5, row constant');
ds5 := DATASET(C, t2());
output(ds5);
// distributed
-output(true);
+output('Distributed 10');
dsd := DATASET(10, t1(COUNTER), DISTRIBUTED);
output(dsd);
@@ -1,10 +1,10 @@
<Dataset name='Result 1'>
- <Row><Result_1>true</Result_1></Row>
+ <Row><Result_1>Zero Rows</Result_1></Row>
</Dataset>
<Dataset name='Result 2'>
</Dataset>
<Dataset name='Result 3'>
- <Row><Result_3>true</Result_3></Row>
+ <Row><Result_3>Constant, 10</Result_3></Row>
</Dataset>
<Dataset name='Result 4'>
<Row><i>0</i></Row>
@@ -19,7 +19,7 @@
<Row><i>90</i></Row>
</Dataset>
<Dataset name='Result 5'>
- <Row><Result_5>true</Result_5></Row>
+ <Row><Result_5>Constant expression, 50</Result_5></Row>
</Dataset>
<Dataset name='Result 6'>
<Row><i>0</i></Row>
@@ -74,7 +74,7 @@
<Row><i>490</i></Row>
</Dataset>
<Dataset name='Result 7'>
- <Row><Result_7>true</Result_7></Row>
+ <Row><Result_7>Variable 5, row constant</Result_7></Row>
</Dataset>
<Dataset name='Result 8'>
<Row><i>10</i></Row>
@@ -84,7 +84,7 @@
<Row><i>10</i></Row>
</Dataset>
<Dataset name='Result 9'>
- <Row><Result_9>true</Result_9></Row>
+ <Row><Result_9>Distributed 10</Result_9></Row>
</Dataset>
<Dataset name='Result 10'>
<Row><i>0</i></Row>
Oops, something went wrong.

0 comments on commit 9702834

Please sign in to comment.