diff --git a/ecl/hql/hqlatoms.cpp b/ecl/hql/hqlatoms.cpp index ba4424131b7..c3aeb332aec 100644 --- a/ecl/hql/hqlatoms.cpp +++ b/ecl/hql/hqlatoms.cpp @@ -77,6 +77,7 @@ IAtom * actionAtom; IAtom * activeAtom; IAtom * activeFailureAtom; IAtom * activeNlpAtom; +IAtom * activityAtom; IAtom * afterAtom; IAtom * algorithmAtom; IAtom * _aliased_Atom; @@ -538,6 +539,7 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM) MAKEATOM(active); MAKEATOM(activeFailure); MAKEATOM(activeNlp); + MAKEATOM(activity); MAKEATOM(after); MAKEATOM(algorithm); MAKESYSATOM(aliased); diff --git a/ecl/hql/hqlatoms.hpp b/ecl/hql/hqlatoms.hpp index 4da997ff998..4f32ed2b0c5 100644 --- a/ecl/hql/hqlatoms.hpp +++ b/ecl/hql/hqlatoms.hpp @@ -81,6 +81,7 @@ extern HQL_API IAtom * actionAtom; extern HQL_API IAtom * activeAtom; extern HQL_API IAtom * activeFailureAtom; extern HQL_API IAtom * activeNlpAtom; +extern HQL_API IAtom * activityAtom; extern HQL_API IAtom * afterAtom; extern HQL_API IAtom * algorithmAtom; extern HQL_API IAtom * _aliased_Atom; diff --git a/ecl/hql/hqlattr.cpp b/ecl/hql/hqlattr.cpp index 25f46b77117..eae185b8aa7 100644 --- a/ecl/hql/hqlattr.cpp +++ b/ecl/hql/hqlattr.cpp @@ -1706,6 +1706,15 @@ bool isLocalActivity(IHqlExpression * expr) return false; case no_compound: return isLocalActivity(expr->queryChild(1)); + case no_call: + case no_externalcall: + if (callIsActivity(expr)) + { + //Can only be deduced by substituting the parameters into the body and seeing if the local attribute has a constant value + //currently assume false. It may need improving when support for global embed activities is added. + return false; + } + return false; case no_compound_diskread: case no_compound_disknormalize: case no_compound_diskaggregate: @@ -1904,7 +1913,10 @@ bool localChangesActivityAction(IHqlExpression * expr) case no_nwaymerge: case no_selfjoin: case no_joincount: - return !isKeyedJoin(expr); // Keyed joins always + return !isKeyedJoin(expr); // Keyed joins always + case no_call: + case no_externalcall: + return callIsActivity(expr); } return false; } diff --git a/ecl/hql/hqlexpr.cpp b/ecl/hql/hqlexpr.cpp index 65b71b565f0..909a1912aab 100644 --- a/ecl/hql/hqlexpr.cpp +++ b/ecl/hql/hqlexpr.cpp @@ -2264,7 +2264,6 @@ childDatasetType getChildDatasetType(IHqlExpression * expr) case no_httpcall: case no_soapcall: case no_newsoapcall: - case no_externalcall: // None in the sense it is generally used for. case no_alias: case no_id2blob: case no_embedbody: @@ -2274,7 +2273,6 @@ childDatasetType getChildDatasetType(IHqlExpression * expr) case no_param: case no_typetransfer: case no_translated: - case no_call: case no_rows: case no_external: case no_delayedselect: @@ -2450,6 +2448,14 @@ childDatasetType getChildDatasetType(IHqlExpression * expr) return childdataset_left; case no_chooseds: return childdataset_many_noscope; + case no_call: + if (functionCallIsActivity(expr)) + return childdataset_many_noscope; + return childdataset_none; + case no_externalcall: + if (externalCallIsActivity(expr)) + return childdataset_many_noscope; + return childdataset_none; case no_merge: case no_regroup: case no_cogroup: @@ -2698,7 +2704,6 @@ inline unsigned doGetNumChildTables(IHqlExpression * dataset) case no_httpcall: case no_soapcall: case no_newsoapcall: - case no_externalcall: // None in the sense it is generally used for. case no_alias: case no_id2blob: case no_embedbody: @@ -2706,7 +2711,6 @@ inline unsigned doGetNumChildTables(IHqlExpression * dataset) case no_datasetfromdictionary: case no_param: case no_translated: - case no_call: case no_rows: case no_external: case no_rowsetindex: @@ -2767,6 +2771,14 @@ inline unsigned doGetNumChildTables(IHqlExpression * dataset) case no_quoted: case no_variable: return 0; + case no_call: + if (functionCallIsActivity(dataset)) + return numStreamInputs(dataset->queryBody()->queryFunctionDefinition()); + return 0; + case no_externalcall: + if (externalCallIsActivity(dataset)) + return numStreamInputs(dataset->queryExternalDefinition()); + return 0; case no_mapto: case no_compound: return 0; // a lie. @@ -8398,6 +8410,81 @@ bool functionBodyUsesContext(IHqlExpression * body) } } +bool functionBodyIsActivity(IHqlExpression * body) +{ + switch (body->getOperator()) + { + case no_external: + return body->hasAttribute(activityAtom); + case no_outofline: + case no_funcdef: + return functionBodyIsActivity(body->queryChild(0)); + case no_embedbody: + return body->hasAttribute(activityAtom); + default: + return false; + } +} + +bool functionCallIsActivity(IHqlExpression * call) +{ + dbgassertex(call->getOperator() == no_call); + return functionBodyIsActivity(call->queryBody()->queryFunctionDefinition()); +} + +bool externalCallIsActivity(IHqlExpression * call) +{ + dbgassertex(call->getOperator() == no_externalcall); + return functionBodyIsActivity(call->queryExternalDefinition()); +} + +IHqlExpression * queryFuncdef(IHqlExpression * call) +{ + switch (call->getOperator()) + { + case no_call: + return call->queryBody()->queryFunctionDefinition(); + case no_externalcall: + return call->queryExternalDefinition(); + default: + throwUnexpected(); + } +} + +bool callIsActivity(IHqlExpression * call) +{ + return functionBodyIsActivity(queryFuncdef(call)); +} + +bool isStreamingDataset(IHqlExpression * param) +{ + dbgassertex(param->getOperator() == no_param); + ITypeInfo * paramType = param->queryType(); + switch (paramType->getTypeCode()) + { + case type_table: + case type_groupedtable: + if (hasStreamedModifier(paramType)) + return true; + break; + } + return false; +} + +unsigned numStreamInputs(IHqlExpression * funcdef) +{ + dbgassertex(funcdef->getOperator() == no_funcdef); + IHqlExpression * formals = funcdef->queryChild(1); + unsigned numStreams = 0; + ForEachChild(i, formals) + { + if (!isStreamingDataset(formals->queryChild(i))) + break; + numStreams++; + } + return numStreams; +} + IHqlExpression * createFunctionDefinition(IIdAtom * id, HqlExprArray & args) { IHqlExpression * body = &args.item(0); @@ -12438,7 +12525,6 @@ IHqlExpression * expandOutOfLineFunctionCall(IHqlExpression * expr) CallExpansionContext ctx; ctx.functionCache = &functionCache; ctx.forceOutOfLineExpansion = true; - assertex(expr->getOperator() == no_call); if (ctx.expandFunctionCall(expr)) return expandFunctionCallPreserveAnnotation(ctx, expr); return LINK(expr); @@ -12931,6 +13017,8 @@ IHqlExpression * createExternalFuncdefFromInternal(IHqlExpression * funcdef) attrs.append(*createAttribute(contextSensitiveAtom)); if (functionBodyUsesContext(body)) attrs.append(*LINK(cachedContextAttribute)); + if (functionBodyIsActivity(body)) + attrs.append(*createAttribute(activityAtom)); IHqlExpression *child = body->queryChild(0); if (child && child->getOperator()==no_embedbody) diff --git a/ecl/hql/hqlexpr.hpp b/ecl/hql/hqlexpr.hpp index efe8c865452..735031a13f5 100644 --- a/ecl/hql/hqlexpr.hpp +++ b/ecl/hql/hqlexpr.hpp @@ -1889,6 +1889,13 @@ extern HQL_API IHqlExpression * createFunctionDefinition(IIdAtom * name, IHqlExp extern HQL_API IHqlExpression * createFunctionDefinition(IIdAtom * name, HqlExprArray & args); extern HQL_API IHqlExpression * queryNonDelayedBaseAttribute(IHqlExpression * expr); extern HQL_API bool functionBodyUsesContext(IHqlExpression * body); +extern HQL_API bool functionBodyIsActivity(IHqlExpression * body); +extern HQL_API bool functionCallIsActivity(IHqlExpression * call); +extern HQL_API bool externalCallIsActivity(IHqlExpression * call); +extern HQL_API bool callIsActivity(IHqlExpression * call); // can be no_call or no_externalcall +extern HQL_API IHqlExpression * queryFuncdef(IHqlExpression * call); +extern HQL_API bool isStreamingDataset(IHqlExpression * param); +extern HQL_API unsigned numStreamInputs(IHqlExpression * funcdef); #define NO_AGGREGATE \ no_count: \ diff --git a/ecl/hql/hqlutil.cpp b/ecl/hql/hqlutil.cpp index c445435a53d..3cc086234cc 100644 --- a/ecl/hql/hqlutil.cpp +++ b/ecl/hql/hqlutil.cpp @@ -7809,6 +7809,8 @@ class GccCppNameMangler mangled.append("P18IGlobalCodeContext"); else if (body->hasAttribute(userMatchFunctionAtom)) mangled.append("P12IMatchWalker"); + if (functionBodyIsActivity(body)) + mangled.append("P20IThorActivityContext"); mangled.append(mangledReturnParameters); @@ -8079,6 +8081,8 @@ class Vs6CppNameMangler mangled.append("PVIGlobalCodeContext@@"); else if (body->hasAttribute(userMatchFunctionAtom)) mangled.append("PVIMatchWalker@@"); + if (functionBodyIsActivity(body)) + mangled.append("PVIThorActivityContext@@"); if (mangledReturnParameters.length()) mangled.append(mangledReturnParameters); diff --git a/ecl/hql/hqlutil.hpp b/ecl/hql/hqlutil.hpp index b5a18e5c86e..b2d1b96aec4 100644 --- a/ecl/hql/hqlutil.hpp +++ b/ecl/hql/hqlutil.hpp @@ -252,7 +252,8 @@ extern HQL_API bool isTimed(IHqlExpression * expr); inline bool isInternalEmbedAttr(IAtom *name) { - return name == languageAtom || name == projectedAtom || name == streamedAtom || name == _linkCounted_Atom || name == importAtom || name==foldAtom || name==timeAtom || name==prebindAtom; + return name == languageAtom || name == projectedAtom || name == streamedAtom || name == _linkCounted_Atom || + name == importAtom || name==foldAtom || name==timeAtom || name==prebindAtom || name == activityAtom; } diff --git a/ecl/hqlcpp/hqlcerrors.hpp b/ecl/hqlcpp/hqlcerrors.hpp index f41d3610f17..03e910f2846 100644 --- a/ecl/hqlcpp/hqlcerrors.hpp +++ b/ecl/hqlcpp/hqlcerrors.hpp @@ -221,6 +221,7 @@ #define HQLERR_ExpectedFileLhsFetch 4209 #define HQLERR_IncompatibleKeyedSubString 4210 #define HQLERR_NonNullChildDSDefault 4211 +#define HQLERR_AttributeXMustBeConstant 4212 //Warnings.... #define HQLWRN_PersistDataNotLikely 4500 @@ -523,6 +524,7 @@ #define HQLERR_ExpectedFileLhsFetch_Text "The first argument of FETCH must be a disk file (had %s)" #define HQLERR_IncompatibleKeyedSubString_Text "Cannot use two different KEYED substring filters for field %s in key %s" #define HQLERR_NonNullChildDSDefault_Text "Non-null child dataset may not be used as default value (target field '%s')" +#define HQLERR_AttributeXMustBeConstant_Text "Attribute %s must be set to a constant value" //Warnings. #define HQLWRN_CannotRecreateDistribution_Text "Cannot recreate the distribution for a persistent dataset" diff --git a/ecl/hqlcpp/hqlcpp.ipp b/ecl/hqlcpp/hqlcpp.ipp index 2d378347408..9be2d073aa8 100644 --- a/ecl/hqlcpp/hqlcpp.ipp +++ b/ecl/hqlcpp/hqlcpp.ipp @@ -1488,6 +1488,7 @@ public: ABoundActivity * doBuildActivityDistribute(BuildCtx & ctx, IHqlExpression * expr); ABoundActivity * doBuildActivityDistribution(BuildCtx & ctx, IHqlExpression * expr, bool isRoot); ABoundActivity * doBuildActivitySectionInput(BuildCtx & ctx, IHqlExpression * expr); + ABoundActivity * doBuildActivityEmbed(BuildCtx & ctx, IHqlExpression * expr, bool isRoot); ABoundActivity * doBuildActivityEnth(BuildCtx & ctx, IHqlExpression * expr); ABoundActivity * doBuildActivityExecuteWhen(BuildCtx & ctx, IHqlExpression * expr, bool isRoot); ABoundActivity * doBuildActivityForceLocal(BuildCtx & ctx, IHqlExpression * expr); diff --git a/ecl/hqlcpp/hqlcppds.cpp b/ecl/hqlcpp/hqlcppds.cpp index 37f08e7f1f3..5359d910b4d 100644 --- a/ecl/hqlcpp/hqlcppds.cpp +++ b/ecl/hqlcpp/hqlcppds.cpp @@ -2370,6 +2370,9 @@ void HqlCppTranslator::doBuildDataset(BuildCtx & ctx, IHqlExpression * expr, CHq buildTempExpr(ctx, expr, tgt, format); return; } + break; + case no_quoted: + throwUnexpectedX("Translated expression passed to doBuildDataset()"); } if (expr->isDictionary()) diff --git a/ecl/hqlcpp/hqlcppsys.ecl b/ecl/hqlcpp/hqlcppsys.ecl index e9c1ff00f3d..77cbde07455 100644 --- a/ecl/hqlcpp/hqlcppsys.ecl +++ b/ecl/hqlcpp/hqlcppsys.ecl @@ -790,6 +790,7 @@ const char * cppSystemText[] = { " boolean newMemorySpillSplitArg(unsigned4 usageCount, const varstring name, boolean meta) : include, pseudoentrypoint='new CLibraryMemorySpillSplitArg';", " boolean newWorkUnitReadArg(const varstring _name, boolean _meta) : include, pseudoentrypoint='new CLibraryWorkUnitReadArg';", " boolean newWorkUnitWriteArg(const varstring _name, unsigned4 _flags, boolean _meta) : include, pseudoentrypoint='new CLibraryWorkUnitWriteArg';", + " CThorExternalArg(unsigned4 _numInputs) : include;", " destructMetaMember(row _x) : omethod,entrypoint='destruct';", " walkIndirectMetaMember(row _x, boolean _visitor) : omethod,entrypoint='walkIndirectMembers';", diff --git a/ecl/hqlcpp/hqlhtcpp.cpp b/ecl/hqlcpp/hqlhtcpp.cpp index d61960d812f..12474b64b10 100644 --- a/ecl/hqlcpp/hqlhtcpp.cpp +++ b/ecl/hqlcpp/hqlhtcpp.cpp @@ -2351,6 +2351,21 @@ void ActivityInstance::buildPrefix() void ActivityInstance::buildSuffix() { + if (!implementationClassName && constructorArgs) + { + StringBuffer baseClassName; + baseClassName.append("CThor").append(activityArgName).append("Arg"); + IIdAtom * baseClassId = createIdAtom(baseClassName); + OwnedHqlExpr call = translator.bindFunctionCall(baseClassId, constructorArgs); + + StringBuffer s; + s.append(className).append("() : "); + translator.generateExprCpp(s, call); + s.append(" {}"); + + classctx.addQuoted(s); + } + if (numChildQueries) addAttributeInt(WaNumChildQueries, numChildQueries); @@ -6490,7 +6505,9 @@ ABoundActivity * HqlCppTranslator::buildActivity(BuildCtx & ctx, IHqlExpression //Items in this list need to also be in the list inside doBuildActivityChildDataset case no_call: case no_externalcall: - if (expr->isAction()) + if (callIsActivity(expr)) + result = doBuildActivityEmbed(ctx, expr, isRoot); + else if (expr->isAction()) result = doBuildActivityAction(ctx, expr, isRoot); else if (expr->isDatarow()) result = doBuildActivityCreateRow(ctx, expr, false); @@ -8864,6 +8881,92 @@ ABoundActivity * HqlCppTranslator::doBuildActivityRemote(BuildCtx & ctx, IHqlExp } +//--------------------------------------------------------------------------------------------------------------------- + +ABoundActivity * HqlCppTranslator::doBuildActivityEmbed(BuildCtx & ctx, IHqlExpression * expr, bool isRoot) +{ + CIArray bound; + IHqlExpression * funcdef = queryFuncdef(expr); + IHqlExpression * formals = funcdef->queryChild(1); + ForEachChild(iInput, formals) + { + IHqlExpression * input = formals->queryChild(iInput); + if (!isStreamingDataset(input)) + break; + + IHqlExpression * actual = expr->queryChild(iInput); + assertex(actual); + bound.append(*buildCachedActivity(ctx, actual)); + } + + ThorActivityKind kind; + if (expr->isDataset()) + kind = bound.empty() ? TAKexternalsource : TAKexternalprocess; + else + kind = TAKexternalsink; + + Owned instance = new ActivityInstance(*this, ctx, kind, expr, "External"); + + //Substitute the parameters into the external call/embed definition, so that any attributes that depend on the arguments + //are expanded. + OwnedHqlExpr expandedCall = expandOutOfLineFunctionCall(expr); + + //The setting for the local attribute allows the localness to be configured for the activity + IHqlExpression * localAttr = expandedCall->queryAttribute(localAtom); + if (localAttr) + { + IHqlExpression * value = localAttr->queryChild(0); + if (value) + instance->isLocal = getBoolValue(value, false); + else + instance->isLocal = true; + } + + buildActivityFramework(instance, isRoot); + OwnedHqlExpr numInputsExpr = getSizetConstant(bound.ordinality()); + instance->addConstructorParameter(numInputsExpr); + + buildInstancePrefix(instance); + + HqlExprArray actuals; + ForEachChild(i, formals) + { + if (i < bound.ordinality()) + { + IHqlExpression * input = formals->queryChild(i); + StringBuffer argument; + argument.append("inputs[").append(i).append("]"); + OwnedHqlExpr ds = createQuoted(argument, input->getType()); + actuals.append(*createTranslated(ds)); + } + else + { + IHqlExpression * actual = expr->queryChild(i); + actuals.append(*LINK(actual)); + } + } + OwnedHqlExpr newCall = expr->clone(actuals); + + if (expr->isDataset()) + { + MemberFunction func(*this, instance->startctx, "virtual IRowStream * createOutput(IThorActivityContext * activityContext) override"); + buildReturn(func.ctx, newCall); + } + else + { + MemberFunction func(*this, instance->startctx, "virtual void execute(IThorActivityContext * activityContext) override"); + buildStmt(func.ctx, newCall); + } + + buildInstanceSuffix(instance); + + ForEachItemIn(idx2, bound) + buildConnectInputOutput(ctx, instance, (ABoundActivity *)&bound.item(idx2), 0, idx2); + + return instance->getBoundActivity(); +} + + //--------------------------------------------------------------------------- // no_update diff --git a/ecl/hqlcpp/hqlinline.cpp b/ecl/hqlcpp/hqlinline.cpp index 02d2eb9c42e..7c5b3ee2c73 100644 --- a/ecl/hqlcpp/hqlinline.cpp +++ b/ecl/hqlcpp/hqlinline.cpp @@ -112,6 +112,9 @@ static unsigned calcInlineFlags(BuildCtx * ctx, IHqlExpression * expr) return getInlineFlags(ctx, expr->queryChild(0)); case no_call: case no_externalcall: // no so sure about this - should possibly be assignable only. (also no_call above) + if (callIsActivity(expr)) + return 0; + //fallthrough case no_getresult: if (isStreamed(expr)) return RETiterate; diff --git a/ecl/hqlcpp/hqliproj.cpp b/ecl/hqlcpp/hqliproj.cpp index b0f4ec72780..0bc4b0159df 100644 --- a/ecl/hqlcpp/hqliproj.cpp +++ b/ecl/hqlcpp/hqliproj.cpp @@ -2173,6 +2173,8 @@ ProjectExprKind ImplicitProjectTransformer::getProjectExprKind(IHqlExpression * return SinkActivity; case no_call: case no_externalcall: + if (callIsActivity(expr) && (getNumChildTables(expr) != 0)) + return FixedInputActivity; if (hasActivityType(expr)) { if (isProjectableCall(expr)) diff --git a/ecl/hqlcpp/hqlttcpp.cpp b/ecl/hqlcpp/hqlttcpp.cpp index 4da4c56c582..2276fc5df37 100644 --- a/ecl/hqlcpp/hqlttcpp.cpp +++ b/ecl/hqlcpp/hqlttcpp.cpp @@ -987,6 +987,8 @@ YesNoOption HqlThorBoundaryTransformer::calcNormalizeThor(IHqlExpression * expr) case no_call: { + if (functionCallIsActivity(expr)) + return OptionYes; YesNoOption bodyOption = normalizeThor(expr->queryBody()->queryFunctionDefinition()); //do Something with it break; @@ -997,6 +999,8 @@ YesNoOption HqlThorBoundaryTransformer::calcNormalizeThor(IHqlExpression * expr) IHqlExpression * funcDef = func->queryChild(0); if (funcDef->hasAttribute(gctxmethodAtom) || funcDef->hasAttribute(globalContextAtom)) return OptionNo; + if (externalCallIsActivity(expr)) + return OptionYes; // if (funcDef->hasAttribute(graphAtom)) // return OptionYes; if (!resourceConditionalActions && expr->isAction()) @@ -2215,6 +2219,8 @@ IHqlExpression * ThorHqlTransformer::createTransformed(IHqlExpression * expr) if (expr->isDatarow()) args.append(*createAttribute(allocatorAtom)); } + inheritAttribute(args, expr, activityAtom); + OwnedHqlExpr body = createWrapper(no_outofline, expr->queryType(), args); HqlExprArray newFormals; if (expr->hasAttribute(languageAtom)) @@ -13966,6 +13972,10 @@ class SemanticErrorChecker : public QuickHqlTransformer case no_choosen: checkChoosen(expr); break; + case no_call: + if (callIsActivity(expr)) + checkEmbedActivity(expr); + break; } QuickHqlTransformer::doAnalyse(expr); } @@ -13974,6 +13984,7 @@ class SemanticErrorChecker : public QuickHqlTransformer void checkBloom(IHqlExpression * expr); void checkJoin(IHqlExpression * expr); void checkChoosen(IHqlExpression * expr); + void checkEmbedActivity(IHqlExpression * expr); void reportError(int errNo, const char * format, ...) __attribute__((format(printf, 3, 4))); void reportWarning(WarnErrorCategory category, int warnNo, const char * format, ...) __attribute__((format(printf, 4, 5))); protected: @@ -14056,6 +14067,22 @@ void SemanticErrorChecker::checkChoosen(IHqlExpression * expr) reportWarning(CategoryUnusual, WRN_CHOOSEN_ALL,"Use CHOOSEN(dataset, ALL) to remove implicit choosen. CHOOSEN(dataset, 0) now returns no records."); } +void SemanticErrorChecker::checkEmbedActivity(IHqlExpression * call) +{ + //Substitute the parameters into the external call/embed definition, so that any attributes that depend on the arguments + //are expanded. + OwnedHqlExpr expandedCall = expandOutOfLineFunctionCall(call); + + //The setting for the local attribute allows the localness to be configured for the activity + IHqlExpression * localAttr = expandedCall->queryAttribute(localAtom); + if (localAttr) + { + IHqlExpression * value = localAttr->queryChild(0); + if (value && !value->isConstant()) + reportError(ECODETEXT(HQLERR_AttributeXMustBeConstant), "LOCAL"); + } +} + void SemanticErrorChecker::reportError(int errNo, const char * format, ...) { ECLlocation location; diff --git a/ecl/hqlcpp/hqlwcpp.cpp b/ecl/hqlcpp/hqlwcpp.cpp index d6699c35a5a..8b0f3046895 100644 --- a/ecl/hqlcpp/hqlwcpp.cpp +++ b/ecl/hqlcpp/hqlwcpp.cpp @@ -702,6 +702,13 @@ bool HqlCppWriter::generateFunctionPrototype(IHqlExpression * funcdef, const cha out.append("IMatchWalker * results"); firstParam = false; } + if (functionBodyIsActivity(body)) + { + if (!firstParam) + out.append(","); + out.append("IThorActivityContext * activity"); + firstParam = false; + } if (returnParameters.length()) { @@ -1220,6 +1227,13 @@ StringBuffer & HqlCppWriter::generateExprCpp(IHqlExpression * expr) out.append("gctx"); needComma = true; } + if (functionBodyIsActivity(funcdef)) + { + if (needComma) + out.append(","); + out.append("activityContext"); + needComma = true; + } for (unsigned index = firstArg; index < numArgs; index++) { IHqlExpression * cur = expr->queryChild(index); diff --git a/ecl/hthor/hthor.cpp b/ecl/hthor/hthor.cpp index 5fc9294e330..19e45d00c37 100644 --- a/ecl/hthor/hthor.cpp +++ b/ecl/hthor/hthor.cpp @@ -10178,24 +10178,23 @@ void CHThorStreamedIteratorActivity::stop() //===================================================================================================== CHThorExternalActivity::CHThorExternalActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorExternalArg &_arg, ThorActivityKind _kind, IPropertyTree * _graphNode) -: CHThorMultiInputActivity(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg), graphNode(_graphNode) +: CHThorMultiInputActivity(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg), graphNode(_graphNode), activityContext(1, 0) { } +void CHThorExternalActivity::setInput(unsigned index, IHThorInput *_input) +{ + CHThorMultiInputActivity::setInput(index, _input); + CHThorInputAdaptor * adaptedInput = new CHThorInputAdaptor(_input); + inputAdaptors.append(*adaptedInput); + helper.setInput(index, adaptedInput); +} + void CHThorExternalActivity::ready() { CHThorMultiInputActivity::ready(); - //must be called after onStart() - processor.setown(helper.createProcessor()); - processor->onCreate(agent.queryCodeContext(), graphNode); - ForEachItemIn(idx, inputs) - { - Owned adaptedInput = new CHThorInputAdaptor(inputs.item(idx)); - processor->addInput(idx, adaptedInput); - } - processor->start(); - if (outputMeta.getMinRecordSize() > 0) - rows.setown(processor->createOutput(0)); + if (kind != TAKexternalsink) + rows.setown(helper.createOutput(&activityContext)); } const void *CHThorExternalActivity::nextRow() @@ -10210,14 +10209,7 @@ const void *CHThorExternalActivity::nextRow() void CHThorExternalActivity::execute() { assertex(!rows); - processor->execute(); -} - -void CHThorExternalActivity::reset() -{ - rows.clear(); - processor->reset(); - processor.clear(); + helper.execute(&activityContext); } void CHThorExternalActivity::stop() @@ -10227,7 +10219,7 @@ void CHThorExternalActivity::stop() rows->stop(); rows.clear(); } - processor->stop(); + CHThorMultiInputActivity::stop(); } diff --git a/ecl/hthor/hthor.ipp b/ecl/hthor/hthor.ipp index 5dbae407644..46b6e45ffe9 100644 --- a/ecl/hthor/hthor.ipp +++ b/ecl/hthor/hthor.ipp @@ -2878,24 +2878,41 @@ protected: IHThorInput * input; // not currently a linkable interface }; +class SingleNodeActivityContext : public IThorActivityContext +{ +public: + SingleNodeActivityContext(unsigned _numStrands, unsigned _curStrand) : strands(_numStrands), curStrand(_curStrand) { assertex(curStrand < strands); } + + virtual bool isLocal() const override { return false; } + virtual unsigned numSlaves() const override { return 1; } + virtual unsigned numStrands() const override { return strands; } + virtual unsigned querySlave() const override { return 0; } + virtual unsigned queryStrand() const override { return curStrand; } +protected: + unsigned strands; + unsigned curStrand; +}; + + class CHThorExternalActivity : public CHThorMultiInputActivity { IHThorExternalArg &helper; - Owned processor; Owned rows; Linked graphNode; + SingleNodeActivityContext activityContext; + IArrayOf inputAdaptors; public: CHThorExternalActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorExternalArg &_arg, ThorActivityKind _kind, IPropertyTree * _graphNode); - virtual void ready(); - virtual void stop(); - virtual void reset(); + virtual void ready() override; + virtual void stop() override; - virtual void execute(); + virtual void execute() override; - virtual const void *nextRow(); + virtual const void *nextRow() override; + virtual void setInput(unsigned index, IHThorInput *_input) override; - virtual bool isGrouped() { return outputMeta.isGrouped(); } + virtual bool isGrouped() override { return outputMeta.isGrouped(); } }; diff --git a/ecl/regress/embedactivity1err.ecl b/ecl/regress/embedactivity1err.ecl new file mode 100644 index 00000000000..6f080a3e31c --- /dev/null +++ b/ecl/regress/embedactivity1err.ecl @@ -0,0 +1,74 @@ +r := RECORD + UNSIGNED id; + STRING name; +END; + +streamed dataset(r) myDataset(unsigned numRows, boolean isLocal = false, unsigned numParallel = 0) := EMBED(C++ : activity, local(isLocal), parallel(numParallel)) +static const char * const names[] = {"Gavin","John","Bart"}; +static const unsigned numNames = (unsigned)(sizeof(names) / sizeof(names[0])); +#body + class MyStreamInlineDataset : public RtlCInterface, implements IRowStream + { + public: + MyStreamInlineDataset(IEngineRowAllocator * _resultAllocator, unsigned _first, unsigned _last) + : resultAllocator(_resultAllocator), first(_first), last(_last) + { + current = first; + } + RTLIMPLEMENT_IINTERFACE + + virtual const void *nextRow() override + { + if (current >= last) + return nullptr; + + unsigned id = current++; + unsigned curName = id % numNames; + const char * name = names[curName]; + size32_t lenName = strlen(name); + + RtlDynamicRowBuilder rowBuilder(resultAllocator); + unsigned len = sizeof(__int64) + sizeof(size32_t) + lenName; + byte * row = rowBuilder.ensureCapacity(len, NULL); + *(__uint64 *)(row) = id; + *(size32_t *)(row + sizeof(__uint64)) = lenName; + memcpy(row+sizeof(__uint64)+sizeof(size32_t), name, lenName); + return rowBuilder.finalizeRowClear(len); + } + virtual void stop() override + { + current = (unsigned)-1; + } + + + protected: + Linked resultAllocator; + unsigned current; + unsigned first; + unsigned last; + }; + + unsigned numRows = numrows; + unsigned numSlaves = activity->numSlaves(); + unsigned numParallel = numSlaves * activity->numStrands(); + unsigned rowsPerPart = (numRows + numParallel - 1) / numParallel; + unsigned thisSlave = activity->querySlave(); + unsigned thisIndex = thisSlave * activity->numStrands() + activity->queryStrand(); + unsigned first = thisIndex * rowsPerPart; + unsigned last = first + rowsPerPart; + if (first > numRows) + first = numRows; + if (last > numRows) + last = numRows; + + return new MyStreamInlineDataset(_resultAllocator, first, last); +ENDEMBED; + + +reallyLocal := true : stored('reallyLocal'); + +//Global activity - fixed number of rows +output(myDataset(10)); + +//Local version of the activity +output(count(myDataset(10, isLocal := reallyLocal)) = CLUSTERSIZE * 10); diff --git a/roxie/ccd/ccdquery.cpp b/roxie/ccd/ccdquery.cpp index 9d387d45fd3..995936b8712 100644 --- a/roxie/ccd/ccdquery.cpp +++ b/roxie/ccd/ccdquery.cpp @@ -860,6 +860,10 @@ class CQueryFactory : implements IQueryFactory, implements IResourceContext, pub return createRoxieServerWhenActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node)); case TAKdistribution: return createRoxieServerDistributionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node)); + case TAKexternalprocess: + case TAKexternalsink: + case TAKexternalsource: + return createRoxieServerExternalActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node)); // These are not required in Roxie for the time being - code generator should trap them case TAKchilddataset: diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index 96e5325ec0b..11822816747 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -2681,14 +2681,24 @@ class CRoxieServerMultiInputBaseActivity : public CRoxieServerActivity sourceIdxArray[idx] = _sourceIdx; } - virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf &streams, const StrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override + virtual void connectInputStreams(bool consumerOrdered) { //There could be situations (e.g., NONEMPTY), where you might want to strand the activity. - for (unsigned i = 0; i < numInputs; i++) - streamArray[i] = connectSingleStream(ctx, inputArray[i], sourceIdxArray[i], junctionArray[i], consumerOrdered); + connectSingleInputStreams(consumerOrdered); + CRoxieServerActivity::connectInputStreams(consumerOrdered); + } + + virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf &streams, const StrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override + { return CRoxieServerActivity::getOutputStreams(ctx, idx, streams, NULL, consumerOrdered, nullptr); // The input basesclass does not have an input } +protected: + void connectSingleInputStreams(bool consumerOrdered) + { + for (unsigned i = 0; i < numInputs; i++) + streamArray[i] = connectSingleStream(ctx, inputArray[i], sourceIdxArray[i], junctionArray[i], consumerOrdered); + } }; //================================================================================= @@ -13609,6 +13619,135 @@ IRoxieServerActivityFactory *createRoxieServerNonEmptyActivityFactory(unsigned _ //================================================================================= +class SingleNodeActivityContext : public IThorActivityContext +{ +public: + SingleNodeActivityContext(unsigned _numStrands, unsigned _curStrand) : strands(_numStrands), curStrand(_curStrand) { assertex(curStrand < strands); } + + virtual bool isLocal() const override { return false; } + virtual unsigned numSlaves() const override { return 1; } + virtual unsigned numStrands() const override { return strands; } + virtual unsigned querySlave() const override { return 0; } + virtual unsigned queryStrand() const override { return curStrand; } +protected: + unsigned strands; + unsigned curStrand; +}; + +class CRoxieServerExternalActivity : public CRoxieServerMultiInputActivity +{ + Owned rows; + SingleNodeActivityContext activityContext; + +public: + CRoxieServerExternalActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numInputs) + : CRoxieServerMultiInputActivity(_ctx, _factory, _probeManager, _numInputs), activityContext(1, 0) + { + } + + virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf &streams, const StrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override + { + Owned junction = CRoxieServerMultiInputActivity::getOutputStreams(ctx, idx, streams, consumerOptions, consumerOrdered, orderedCallbacks); + associateInputsWithHelper(); + return junction.getClear(); + } + + virtual void connectInputStreams(bool consumerOrdered) + { + CRoxieServerMultiInputActivity::connectInputStreams(consumerOrdered); + associateInputsWithHelper(); + } + + + virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused) + { + CRoxieServerMultiInputActivity::start(parentExtractSize, parentExtract, paused); + if (factory->getKind() != TAKexternalsink) + { + IHThorExternalArg & helper = static_cast(basehelper); + rows.setown(helper.createOutput(&activityContext)); + } + } + + virtual void stop() + { + if (rows) + { + rows->stop(); + rows.clear(); + } + CRoxieServerMultiInputBaseActivity::stop(); + } + + virtual void reset() + { + rows.clear(); + } + + virtual const void * nextRow() + { + ActivityTimer t(totalCycles, timeActivities); + assertex(rows); + const void * next = rows->nextRow(); + if (next) + processed++; + return next; + } + + virtual void execute(unsigned parentExtractSize, const byte * parentExtract) + { + try + { + start(parentExtractSize, parentExtract, false); + assertex(!rows); + IHThorExternalArg & helper = static_cast(basehelper); + helper.execute(&activityContext); + stop(); + } + catch (IException * E) + { + ctx->notifyAbort(E); + abort(); + throw; + } + } + +protected: + void associateInputsWithHelper() + { + IHThorExternalArg & helper = static_cast(basehelper); + for (unsigned i = 0; i < numInputs; i++) + helper.setInput(i, streamArray[i]); + } +}; + +class CRoxieServerExternalActivityFactory : public CRoxieServerMultiInputFactory +{ + bool isRoot; +public: + CRoxieServerExternalActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot) + : CRoxieServerMultiInputFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _graphNode), isRoot(_isRoot) + { + } + + virtual IRoxieServerActivity *createActivity(IRoxieSlaveContext *_ctx, IProbeManager *_probeManager) const + { + return new CRoxieServerExternalActivity(_ctx, this, _probeManager, numInputs()); + } + + virtual bool isSink() const + { + return (kind == TAKexternalsink) && isRoot; + } +}; + +IRoxieServerActivityFactory *createRoxieServerExternalActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot) +{ + return new CRoxieServerExternalActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _graphNode, _isRoot); +} + +//================================================================================= + class CRoxieServerMergeActivity : public CRoxieServerMultiInputActivity { IHThorMergeArg &helper; diff --git a/roxie/ccd/ccdserver.hpp b/roxie/ccd/ccdserver.hpp index b44a5032a5e..51a91e71ac5 100644 --- a/roxie/ccd/ccdserver.hpp +++ b/roxie/ccd/ccdserver.hpp @@ -437,6 +437,7 @@ extern IRoxieServerActivityFactory *createRoxieServerWhenActionActivityFactory(u extern IRoxieServerActivityFactory *createRoxieServerDistributionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot); extern IRoxieServerActivityFactory *createRoxieServerPullActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode); extern IRoxieServerActivityFactory *createRoxieServerTraceActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode); +extern IRoxieServerActivityFactory *createRoxieServerExternalActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot); extern void throwRemoteException(IMessageUnpackCursor *extra); diff --git a/rtl/eclrtl/eclhelper_base.cpp b/rtl/eclrtl/eclhelper_base.cpp index b2c810a620b..5cb4a0a94ef 100644 --- a/rtl/eclrtl/eclhelper_base.cpp +++ b/rtl/eclrtl/eclhelper_base.cpp @@ -801,3 +801,18 @@ unsigned CThorTraceArg::getSample() { return 0; } unsigned CThorTraceArg::getSkip() { return 0; } const char *CThorTraceArg::getName() { return NULL; } + +//CThorExternalArg +CThorExternalArg::CThorExternalArg(unsigned _numInputs) +{ + inputs = new IRowStream *[_numInputs]; + for (unsigned i=0; i < _numInputs; i++) + inputs[i] = nullptr; +} + +CThorExternalArg::~CThorExternalArg() { delete [] inputs; } + +IRowStream * CThorExternalArg::createOutput(IThorActivityContext * activityContext) { rtlFailUnexpected(); } +void CThorExternalArg::execute(IThorActivityContext * activityContext) { rtlFailUnexpected(); } +void CThorExternalArg::setInput(unsigned whichInput, IRowStream * input) { inputs[whichInput] = input; } + diff --git a/rtl/eclrtl/rtlnewkey.hpp b/rtl/eclrtl/rtlnewkey.hpp index a062fc8b53b..583c8854bd2 100644 --- a/rtl/eclrtl/rtlnewkey.hpp +++ b/rtl/eclrtl/rtlnewkey.hpp @@ -91,7 +91,7 @@ class ECLRTL_API RowCursor { if (nextSeekIsGT()) { - assertex(nextUnmatchedRange == -1U); + assertex(nextUnmatchedRange == (unsigned)-1); assertex(numMatched != filters.ordinality()); unsigned i; int c = 0; @@ -147,7 +147,7 @@ class ECLRTL_API RowCursor unsigned numFilterFields() const { return filters.ordinality(); } const RtlRow & queryRow() const { return currentRow; } bool setRowForward(const byte * row); - bool nextSeekIsGT() const { return (nextUnmatchedRange == -1U); } + bool nextSeekIsGT() const { return (nextUnmatchedRange == (unsigned)-1); } bool noMoreMatches() const { return eos; } protected: diff --git a/rtl/include/eclhelper.hpp b/rtl/include/eclhelper.hpp index 2d363a58f77..b4272cf97ec 100644 --- a/rtl/include/eclhelper.hpp +++ b/rtl/include/eclhelper.hpp @@ -2716,26 +2716,6 @@ struct IHThorStreamedIteratorArg : public IHThorArg }; - -interface IPropertyTree; -interface IThorExternalRowProcessor : public IInterface -{ - virtual void onCreate(ICodeContext * ctx, IPropertyTree * graph) = 0; - virtual void addInput(unsigned idx, ITypedRowStream * input) = 0; - virtual IRowStream * createOutput(unsigned idx) = 0; - virtual void start() = 0; - virtual void execute() = 0; - virtual void stop() = 0; - virtual void reset() = 0; - virtual void onDestroy() = 0; -}; - - -struct IHThorExternalArg : public IHThorArg -{ - virtual IThorExternalRowProcessor * createProcessor() = 0; -}; - //------------------------- Dictionary stuff ------------------------- interface IHThorHashLookupInfo @@ -2772,6 +2752,47 @@ struct IHThorTraceArg : public IHThorArg virtual const char *getName() = 0; }; +//This interface is passed as an implicit parameter to the embed activity factory. It allows the activity to determine +//if it is being executed in a child query, is stranded and other useful information. +interface IThorActivityContext +{ +public: + virtual bool isLocal() const = 0; + virtual unsigned numSlaves() const = 0; + virtual unsigned numStrands() const = 0; + virtual unsigned querySlave() const = 0; // 0 based 0..numSlaves-1 + virtual unsigned queryStrand() const = 0; // 0 based 0..numStrands-1 +}; + +//MORE: How does is this extended to support onStart/onCreate +//MORE: How is this extended to allow multiple outputs +interface IHThorExternalArg : public IHThorArg +{ + virtual IRowStream * createOutput(IThorActivityContext * activityContext) = 0; + virtual void execute(IThorActivityContext * activityContext) = 0; + virtual void setInput(unsigned whichInput, IRowStream * input) = 0; +}; + +/* +interface IPropertyTree; +interface IThorExternalRowProcessor : public IInterface +{ + virtual void onCreate(ICodeContext * ctx, IPropertyTree * graph) = 0; + virtual void addInput(unsigned idx, ITypedRowStream * input) = 0; + virtual IRowStream * createOutput(unsigned idx) = 0; + virtual void start() = 0; + virtual void execute() = 0; + virtual void stop() = 0; + virtual void reset() = 0; + virtual void onDestroy() = 0; +}; + + +struct IHThorExternalArg : public IHThorArg +{ + virtual IThorExternalRowProcessor * createProcessor() = 0; +}; +*/ //------------------------- Other stuff ------------------------- diff --git a/rtl/include/eclhelper_base.hpp b/rtl/include/eclhelper_base.hpp index 5fcbe2442d6..8cdfab9a57c 100644 --- a/rtl/include/eclhelper_base.hpp +++ b/rtl/include/eclhelper_base.hpp @@ -1115,6 +1115,20 @@ class ECLRTL_API CThorStreamedIteratorArg : public CThorArgOf +{ +public: + CThorExternalArg(unsigned _numInputs); + virtual ~CThorExternalArg(); + virtual IRowStream * createOutput(IThorActivityContext * activityContext) override; + virtual void execute(IThorActivityContext * activityContext) override; + virtual void setInput(unsigned whichInput, IRowStream * input) override; + +protected: + IRowStream * * inputs; +}; + + //-- Full implementations of selective activities that don't ever require any access to the context. class ECLRTL_API CLibraryNullArg : public CThorNullArg diff --git a/testing/regress/ecl/embedactivity1.ecl b/testing/regress/ecl/embedactivity1.ecl new file mode 100644 index 00000000000..0f59ed00541 --- /dev/null +++ b/testing/regress/ecl/embedactivity1.ecl @@ -0,0 +1,85 @@ +r := RECORD + UNSIGNED id; + STRING name; +END; + +streamed dataset(r) myDataset(unsigned numRows, boolean isLocal = false, unsigned numParallel = 0) := EMBED(C++ : activity, local(isLocal), parallel(numParallel)) +static const char * const names[] = {"Gavin","John","Bart"}; +static const unsigned numNames = (unsigned)(sizeof(names) / sizeof(names[0])); +#body + class MyStreamInlineDataset : public RtlCInterface, implements IRowStream + { + public: + MyStreamInlineDataset(IEngineRowAllocator * _resultAllocator, unsigned _first, unsigned _last) + : resultAllocator(_resultAllocator), first(_first), last(_last) + { + current = first; + } + RTLIMPLEMENT_IINTERFACE + + virtual const void *nextRow() override + { + if (current >= last) + return nullptr; + + unsigned id = current++; + unsigned curName = id % numNames; + const char * name = names[curName]; + size32_t lenName = strlen(name); + + RtlDynamicRowBuilder rowBuilder(resultAllocator); + unsigned len = sizeof(__int64) + sizeof(size32_t) + lenName; + byte * row = rowBuilder.ensureCapacity(len, NULL); + *(__uint64 *)(row) = id; + *(size32_t *)(row + sizeof(__uint64)) = lenName; + memcpy(row+sizeof(__uint64)+sizeof(size32_t), name, lenName); + return rowBuilder.finalizeRowClear(len); + } + virtual void stop() override + { + current = (unsigned)-1; + } + + + protected: + Linked resultAllocator; + unsigned current; + unsigned first; + unsigned last; + }; + + unsigned numRows = numrows; + unsigned numSlaves = activity->numSlaves(); + unsigned numParallel = numSlaves * activity->numStrands(); + unsigned rowsPerPart = (numRows + numParallel - 1) / numParallel; + unsigned thisSlave = activity->querySlave(); + unsigned thisIndex = thisSlave * activity->numStrands() + activity->queryStrand(); + unsigned first = thisIndex * rowsPerPart; + unsigned last = first + rowsPerPart; + if (first > numRows) + first = numRows; + if (last > numRows) + last = numRows; + + return new MyStreamInlineDataset(_resultAllocator, first, last); +ENDEMBED; + + +//Global activity - fixed number of rows +output(myDataset(10)); +//Local version of the activity +output(count(myDataset(10, isLocal := true)) = CLUSTERSIZE * 10); + +//Check that stranding (if implemented) still generates unique records +output(COUNT(DEDUP(myDataset(1000, numParallel := 5), id, ALL))); + +r2 := RECORD + UNSIGNED id; + DATASET(r) child; +END; + +//Check that the activity can also be executed in a child query +output(DATASET(10, TRANSFORM(r2, SELF.id := COUNTER; SELF.child := myDataset(COUNTER)))); + +//Test stranding inside a child query +output(DATASET(10, TRANSFORM(r2, SELF.id := COUNTER; SELF.child := myDataset(COUNTER, NumParallel := 3)))); diff --git a/testing/regress/ecl/embedactivity2.ecl b/testing/regress/ecl/embedactivity2.ecl new file mode 100644 index 00000000000..cab5e07b778 --- /dev/null +++ b/testing/regress/ecl/embedactivity2.ecl @@ -0,0 +1,33 @@ +r := RECORD + UNSIGNED id; + STRING name; +END; + +traceDataset(streamed dataset(r) ds, boolean isLocal = false) := EMBED(C++ : activity, local(isLocal)) +#include +#body + for(;;) + { + const byte * next = (const byte *)ds->nextRow(); + if (!next) + { + next = (const byte *)ds->nextRow(); + if (!next) + return; + } + + unsigned __int64 id = *(__uint64 *)(next); + size32_t lenName = *(size32_t *)(next + sizeof(__uint64)); + const char * name = (char *)(next + sizeof(__uint64) + sizeof(size32_t)); + + printf("id(%u) name(%.*s)\n", (unsigned)id, lenName, name); + rtlReleaseRow(next); + } + +ENDEMBED; + +ds := DATASET([ + {1,'GCH'},{2,'RKC'},{3,'Count Dracular'}, {4, 'Boris'} + ], r); + +traceDataset(ds); diff --git a/testing/regress/ecl/embedactivity3.ecl b/testing/regress/ecl/embedactivity3.ecl new file mode 100644 index 00000000000..73bc1a9258b --- /dev/null +++ b/testing/regress/ecl/embedactivity3.ecl @@ -0,0 +1,62 @@ +r := RECORD + UNSIGNED value; +END; + +//This function takes two streamed inputs, and outputs the result of two values from the left multiply together and added to a row from the right + +streamed dataset(r) myDataset(streamed dataset(r) ds1, streamed dataset(r) ds2) := EMBED(C++ : activity) +#include +#body + class MyStreamInlineDataset : public RtlCInterface, implements IRowStream + { + public: + MyStreamInlineDataset(IEngineRowAllocator * _resultAllocator, IRowStream * _ds1, IRowStream * _ds2) + : resultAllocator(_resultAllocator), ds1(_ds1), ds2(_ds2) + { + } + RTLIMPLEMENT_IINTERFACE + + virtual const void *nextRow() override + { + const byte * next1a = (const byte *)ds1->nextRow(); + if (!next1a) + return nullptr; + const byte * next1b = (const byte *)ds1->nextRow(); + const byte * next2 = (const byte *)ds2->nextRow(); + if (!next1b || !next2) + rtlFailUnexpected(); + + unsigned __int64 value1a = *(const unsigned __int64 *)next1a; + unsigned __int64 value1b = *(const unsigned __int64 *)next1b; + unsigned __int64 value2 = *(const unsigned __int64 *)next2; + rtlReleaseRow(next1a); + rtlReleaseRow(next1b); + rtlReleaseRow(next2); + + unsigned __int64 result = value1a * value1b + value2; + RtlDynamicRowBuilder rowBuilder(resultAllocator); + byte * row = rowBuilder.getSelf(); + *(__uint64 *)(row) = result; + return rowBuilder.finalizeRowClear(sizeof(unsigned __int64)); + } + virtual void stop() override + { + ds1->stop(); + ds2->stop(); + } + + + protected: + Linked resultAllocator; + IRowStream * ds1; + IRowStream * ds2; + }; + + return new MyStreamInlineDataset(_resultAllocator, ds1, ds2); +ENDEMBED; + + +ds1 := DATASET([1,3,4,5,9,10,1,1], r); +ds2 := DATASET([0,3,9,-1], r); + +output(myDataset(ds1, ds2)); diff --git a/testing/regress/ecl/embedactivity4.ecl b/testing/regress/ecl/embedactivity4.ecl new file mode 100644 index 00000000000..a638200146a --- /dev/null +++ b/testing/regress/ecl/embedactivity4.ecl @@ -0,0 +1,77 @@ +r := RECORD + UNSIGNED value; +END; + +//This function takes four streamed inputs, and outputs the result of ds1*ds2+ds3*ds4 + +streamed dataset(r) myDataset(streamed dataset(r) ds1, streamed dataset(r) ds2, streamed dataset(r) ds3, streamed dataset(r) ds4) := EMBED(C++ : activity) +#include +#body + class MyStreamInlineDataset : public RtlCInterface, implements IRowStream + { + public: + MyStreamInlineDataset(IEngineRowAllocator * _resultAllocator, IRowStream * _ds1, IRowStream * _ds2, IRowStream * _ds3, IRowStream * _ds4) + : resultAllocator(_resultAllocator), ds1(_ds1), ds2(_ds2), ds3(_ds3), ds4(_ds4) + { + } + RTLIMPLEMENT_IINTERFACE + + virtual const void *nextRow() override + { + const byte * next1 = (const byte *)ds1->nextRow(); + if (!next1) + return nullptr; + const byte * next2 = (const byte *)ds2->nextRow(); + const byte * next3 = (const byte *)ds3->nextRow(); + const byte * next4 = (const byte *)ds4->nextRow(); + if (!next2 || !next3 || !next4) + rtlFailUnexpected(); + + unsigned __int64 value1 = *(const unsigned __int64 *)next1; + unsigned __int64 value2 = *(const unsigned __int64 *)next2; + unsigned __int64 value3 = *(const unsigned __int64 *)next3; + unsigned __int64 value4 = *(const unsigned __int64 *)next4; + rtlReleaseRow(next1); + rtlReleaseRow(next2); + rtlReleaseRow(next3); + rtlReleaseRow(next4); + + unsigned __int64 result = value1 * value2 + value3 * value4; + RtlDynamicRowBuilder rowBuilder(resultAllocator); + byte * row = rowBuilder.getSelf(); + *(__uint64 *)(row) = result; + return rowBuilder.finalizeRowClear(sizeof(unsigned __int64)); + } + virtual void stop() override + { + ds1->stop(); + ds2->stop(); + ds3->stop(); + ds4->stop(); + } + + protected: + Linked resultAllocator; + IRowStream * ds1; + IRowStream * ds2; + IRowStream * ds3; + IRowStream * ds4; + }; + + return new MyStreamInlineDataset(_resultAllocator, ds1, ds2, ds3, ds4); +ENDEMBED; + + +ds1 := DATASET([1,3,4,5], r); +ds2 := DATASET([1,2,3,4], r); +ds3 := DATASET([0,1,1,2], r); +ds4 := DATASET([9,8,7,6], r); + +//Global operation +output(myDataset(ds1, ds2, ds3, ds4)); + +//Execute within a child query +mkDs(unsigned num, unsigned base) := DATASET(num, TRANSFORM(r, SELF.value := COUNTER+base)); +dsx := mkDs(10, 0); +dsy := PROJECT(dsx, TRANSFORM(r, SELF.value := SUM(myDataset(mkDs(LEFT.value, 0), mkDs(LEFT.value, 1), mkDs(LEFT.value, 2), mkDs(LEFT.value, 3)), value))); +output(dsy); diff --git a/testing/regress/ecl/key/embedactivity1.xml b/testing/regress/ecl/key/embedactivity1.xml new file mode 100644 index 00000000000..4e312ebb942 --- /dev/null +++ b/testing/regress/ecl/key/embedactivity1.xml @@ -0,0 +1,42 @@ + + 0Gavin + 1John + 2Bart + 3Gavin + 4John + 5Bart + 6Gavin + 7John + 8Bart + 9Gavin + + + true + + + 1000 + + + 10Gavin + 20Gavin1John + 30Gavin1John2Bart + 40Gavin1John2Bart3Gavin + 50Gavin1John2Bart3Gavin4John + 60Gavin1John2Bart3Gavin4John5Bart + 70Gavin1John2Bart3Gavin4John5Bart6Gavin + 80Gavin1John2Bart3Gavin4John5Bart6Gavin7John + 90Gavin1John2Bart3Gavin4John5Bart6Gavin7John8Bart + 100Gavin1John2Bart3Gavin4John5Bart6Gavin7John8Bart9Gavin + + + 10Gavin + 20Gavin1John + 30Gavin1John2Bart + 40Gavin1John2Bart3Gavin + 50Gavin1John2Bart3Gavin4John + 60Gavin1John2Bart3Gavin4John5Bart + 70Gavin1John2Bart3Gavin4John5Bart6Gavin + 80Gavin1John2Bart3Gavin4John5Bart6Gavin7John + 90Gavin1John2Bart3Gavin4John5Bart6Gavin7John8Bart + 100Gavin1John2Bart3Gavin4John5Bart6Gavin7John8Bart9Gavin + diff --git a/testing/regress/ecl/key/embedactivity2.xml b/testing/regress/ecl/key/embedactivity2.xml new file mode 100644 index 00000000000..8b137891791 --- /dev/null +++ b/testing/regress/ecl/key/embedactivity2.xml @@ -0,0 +1 @@ + diff --git a/testing/regress/ecl/key/embedactivity3.xml b/testing/regress/ecl/key/embedactivity3.xml new file mode 100644 index 00000000000..aba2dff8ef9 --- /dev/null +++ b/testing/regress/ecl/key/embedactivity3.xml @@ -0,0 +1,6 @@ + + 3 + 23 + 99 + 0 + diff --git a/testing/regress/ecl/key/embedactivity4.xml b/testing/regress/ecl/key/embedactivity4.xml new file mode 100644 index 00000000000..a16f08bfa95 --- /dev/null +++ b/testing/regress/ecl/key/embedactivity4.xml @@ -0,0 +1,18 @@ + + 1 + 14 + 19 + 32 + + + 14 + 40 + 82 + 144 + 230 + 344 + 490 + 672 + 894 + 1160 + diff --git a/thorlcr/activities/activitymasters_lcr.cmake b/thorlcr/activities/activitymasters_lcr.cmake index 1002892e7fa..4ab5041d14a 100644 --- a/thorlcr/activities/activitymasters_lcr.cmake +++ b/thorlcr/activities/activitymasters_lcr.cmake @@ -36,6 +36,7 @@ set ( SRCS diskwrite/thdiskwrite.cpp distribution/thdistribution.cpp enth/thenth.cpp + external/thexternal.cpp fetch/thfetch.cpp filter/thfilter.cpp firstn/thfirstn.cpp diff --git a/thorlcr/activities/activityslaves_lcr.cmake b/thorlcr/activities/activityslaves_lcr.cmake index a6f3ca61a1d..4ceffb98d00 100644 --- a/thorlcr/activities/activityslaves_lcr.cmake +++ b/thorlcr/activities/activityslaves_lcr.cmake @@ -37,6 +37,7 @@ set ( SRCS diskwrite/thdwslave.cpp distribution/thdistributionslave.cpp enth/thenthslave.cpp + external/thexternalslave.cpp fetch/thfetchslave.cpp filter/thfilterslave.cpp firstn/thfirstnslave.cpp diff --git a/thorlcr/activities/external/thexternal.cpp b/thorlcr/activities/external/thexternal.cpp new file mode 100644 index 00000000000..829c534589f --- /dev/null +++ b/thorlcr/activities/external/thexternal.cpp @@ -0,0 +1,27 @@ +/*############################################################################## + + HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +############################################################################## */ + +#include "dasess.hpp" +#include "dadfs.hpp" +#include "thbufdef.hpp" +#include "thexception.hpp" +#include "thexternal.ipp" + +CActivityBase *createExternalActivityMaster(CMasterGraphElement *container) +{ + return new CMasterActivity(container); +} diff --git a/thorlcr/activities/external/thexternal.ipp b/thorlcr/activities/external/thexternal.ipp new file mode 100644 index 00000000000..f337ce6d183 --- /dev/null +++ b/thorlcr/activities/external/thexternal.ipp @@ -0,0 +1,25 @@ +/*############################################################################## + + HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +############################################################################## */ + +#ifndef _THEXTERNAL_IPP +#define _THEXTERNAL_IPP + +#include "thactivitymaster.ipp" + +CActivityBase *createExternalActivityMaster(CMasterGraphElement *info); + +#endif diff --git a/thorlcr/activities/external/thexternalslave.cpp b/thorlcr/activities/external/thexternalslave.cpp new file mode 100644 index 00000000000..954228a4ba1 --- /dev/null +++ b/thorlcr/activities/external/thexternalslave.cpp @@ -0,0 +1,175 @@ +/*############################################################################## + + HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +############################################################################## */ + +#include "jlib.hpp" +#include "jset.hpp" +#include "jqueue.tpp" +#include "commonext.hpp" + +#include "thormisc.hpp" +#include "thexception.hpp" +#include "thbufdef.hpp" +#include "thalloc.hpp" +#include "eclrtl_imp.hpp" + +#include "slave.ipp" +#include "thactivityutil.ipp" + +///// +/////////////////// +// +// CExternalSlaveActivity +// + +class CNodeActivityContext : public IThorActivityContext +{ +public: + CNodeActivityContext(bool _local, unsigned _numSlaves, unsigned _curSlave) + : local(_local), slaves(_local ? 1 : _numSlaves), curSlave(_local ? 0 : _curSlave) + { + assertex(curSlave < slaves); + } + + virtual bool isLocal() const override { return local; } + virtual unsigned numSlaves() const override { return slaves; } + virtual unsigned numStrands() const override { return 1; } + virtual unsigned querySlave() const override { return curSlave; } + virtual unsigned queryStrand() const override { return 0; } +protected: + unsigned slaves; + unsigned curSlave; + bool local; +}; + +class CExternalSlaveActivity : public CSlaveActivity +{ + IHThorExternalArg *helper; + Owned rows; + CNodeActivityContext activityContext; + bool grouped; + +public: + CExternalSlaveActivity(CGraphElementBase *_container) + : CSlaveActivity(_container), + activityContext(container.queryLocalData() || container.queryOwner().isLocalChild(), container.queryCodeContext()->getNodes(), container.queryCodeContext()->getNodeNum()) + { + grouped = container.queryGrouped(); + helper = (IHThorExternalArg *) queryHelper(); + setRequireInitData(false); + appendOutputLinked(this); + } + virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override + { + CSlaveActivity::setInputStream(index, _input, consumerOrdered); + helper->setInput(index, _input.stream); + } + virtual void start() override + { + //Cannot call base base start because that will error if > 1 input + startAllInputs(); + dataLinkStart(); + rows.setown(helper->createOutput(&activityContext)); + } + virtual void stop() override + { + if (rows) + { + rows->stop(); + rows.clear(); + } + stopAllInputs(); + dataLinkStop(); + } + CATCH_NEXTROW() + { + assertex(rows); + ActivityTimer t(totalCycles, timeActivities); + OwnedConstThorRow row = rows->nextRow(); + if (row) + dataLinkIncrement(); + return row.getClear(); + } + virtual bool isGrouped() const override + { + return grouped; + } + virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override + { + initMetaInfo(info); + } +}; + + +CActivityBase *createExternalSlave(CGraphElementBase *container) +{ + return new CExternalSlaveActivity(container); +} + +//--------------------------------------------------------------------------------------------------------------------- + +class CExternalSinkSlaveActivity : public ProcessSlaveActivity +{ + IHThorExternalArg *helper; + CNodeActivityContext activityContext; + +public: + CExternalSinkSlaveActivity(CGraphElementBase *_container) + : ProcessSlaveActivity(_container), + activityContext(container.queryLocalData() || container.queryOwner().isLocalChild(), container.queryCodeContext()->getNodes(), container.queryCodeContext()->getNodeNum()) + { + helper = (IHThorExternalArg *) queryHelper(); + setRequireInitData(false); + appendOutputLinked(this); + } + virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override + { + CSlaveActivity::setInputStream(index, _input, consumerOrdered); + helper->setInput(index, _input.stream); + } + virtual void start() override + { + startAllInputs(); + } + virtual void process() override + { + start(); + processed = THORDATALINK_STARTED; + try + { + helper->execute(&activityContext); + } + catch(CATCHALL) + { + ActPrintLog("APPLY: exception"); + throw; + } + } + virtual void endProcess() override + { + if ((processed & THORDATALINK_STARTED) && !(processed & THORDATALINK_STOPPED)) + { + stopAllInputs(); + processed |= THORDATALINK_STOPPED; + } + } +}; + + +CActivityBase *createExternalSinkSlave(CGraphElementBase *container) +{ + return new CExternalSinkSlaveActivity(container); +} diff --git a/thorlcr/graph/thgraph.cpp b/thorlcr/graph/thgraph.cpp index c9d45a55b8e..7bb8db5e863 100644 --- a/thorlcr/graph/thgraph.cpp +++ b/thorlcr/graph/thgraph.cpp @@ -852,6 +852,9 @@ bool isGlobalActivity(CGraphElementBase &container) case TAKgraphloop: case TAKparallelgraphloop: case TAKloopdataset: + case TAKexternalsink: + case TAKexternalsource: + case TAKexternalprocess: return false; // dependent on local/grouped case TAKkeyeddistribute: diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp index eac5c04b7b8..b401408a5d3 100644 --- a/thorlcr/graph/thgraphslave.cpp +++ b/thorlcr/graph/thgraphslave.cpp @@ -274,9 +274,15 @@ void CSlaveActivity::start() void CSlaveActivity::startAllInputs() { + ActivityTimer s(totalCycles, timeActivities); ForEachItemIn(i, inputs) { - startInput(i); + try { startInput(i); } + catch (CATCHALL) + { + ActPrintLog("External(%" ACTPF "d): Error staring input %d", container.queryId(), i); + throw; + } } } diff --git a/thorlcr/master/thactivitymaster.cpp b/thorlcr/master/thactivitymaster.cpp index c01ac7636af..6461ad6988c 100644 --- a/thorlcr/master/thactivitymaster.cpp +++ b/thorlcr/master/thactivitymaster.cpp @@ -47,6 +47,7 @@ MODULE_INIT(INIT_PRIORITY_STANDARD) #include "diskwrite/thdiskwrite.ipp" #include "distribution/thdistribution.ipp" #include "enth/thenth.ipp" +#include "external/thexternal.ipp" #include "filter/thfilter.ipp" #include "firstn/thfirstn.ipp" #include "funnel/thfunnel.ipp" @@ -393,6 +394,11 @@ class CGenericMasterGraphElement : public CMasterGraphElement case TAKwhen_dataset: ret = createWhenActivityMaster(this); break; + case TAKexternalprocess: + case TAKexternalsink: + case TAKexternalsource: + ret = createExternalActivityMaster(this); + break; default: throw MakeActivityException(this, TE_UnsupportedActivityKind, "Unsupported activity kind: %s", activityKindStr(kind)); } diff --git a/thorlcr/slave/slave.cpp b/thorlcr/slave/slave.cpp index 2663117faab..65baea45e2d 100644 --- a/thorlcr/slave/slave.cpp +++ b/thorlcr/slave/slave.cpp @@ -269,6 +269,8 @@ CActivityBase *createDictionaryWorkunitWriteSlave(CGraphElementBase *container); CActivityBase *createDictionaryResultWriteSlave(CGraphElementBase *container); CActivityBase *createTraceSlave(CGraphElementBase *container); CActivityBase *createIfActionSlave(CGraphElementBase *container); +CActivityBase *createExternalSlave(CGraphElementBase *container); +CActivityBase *createExternalSinkSlave(CGraphElementBase *container); class CGenericSlaveGraphElement : public CSlaveGraphElement @@ -761,6 +763,11 @@ class CGenericSlaveGraphElement : public CSlaveGraphElement case TAKstreamediterator: ret = createStreamedIteratorSlave(this); break; + case TAKexternalprocess: + case TAKexternalsink: + case TAKexternalsource: + ret = createExternalSlave(this); + break; default: throw MakeStringException(TE_UnsupportedActivityKind, "Unsupported activity kind: %s", activityKindStr(kind)); }