Skip to content

Commit

Permalink
HPCC-13036 Add support for embed activities
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
  • Loading branch information
ghalliday committed Apr 18, 2018
1 parent 70b4020 commit be08eb7
Show file tree
Hide file tree
Showing 40 changed files with 1,134 additions and 62 deletions.
2 changes: 2 additions & 0 deletions ecl/hql/hqlatoms.cpp
Expand Up @@ -77,6 +77,7 @@ IAtom * actionAtom;
IAtom * activeAtom;
IAtom * activeFailureAtom;
IAtom * activeNlpAtom;
IAtom * activityAtom;
IAtom * afterAtom;
IAtom * algorithmAtom;
IAtom * _aliased_Atom;
Expand Down Expand Up @@ -536,6 +537,7 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM)
MAKEATOM(active);
MAKEATOM(activeFailure);
MAKEATOM(activeNlp);
MAKEATOM(activity);
MAKEATOM(after);
MAKEATOM(algorithm);
MAKESYSATOM(aliased);
Expand Down
1 change: 1 addition & 0 deletions ecl/hql/hqlatoms.hpp
Expand Up @@ -81,6 +81,7 @@ extern HQL_API IAtom * actionAtom;
extern HQL_API IAtom * activeAtom;
extern HQL_API IAtom * activeFailureAtom;
extern HQL_API IAtom * activeNlpAtom;
extern HQL_API IAtom * activityAtom;
extern HQL_API IAtom * afterAtom;
extern HQL_API IAtom * algorithmAtom;
extern HQL_API IAtom * _aliased_Atom;
Expand Down
14 changes: 13 additions & 1 deletion ecl/hql/hqlattr.cpp
Expand Up @@ -1706,6 +1706,15 @@ bool isLocalActivity(IHqlExpression * expr)
return false;
case no_compound:
return isLocalActivity(expr->queryChild(1));
case no_call:
case no_externalcall:
if (callIsActivity(expr))
{
//Can only be deduced by substituting the parameters into the body and seeing if the local attribute has a constant value
//currently assume false.
return false;
}
return false;
case no_compound_diskread:
case no_compound_disknormalize:
case no_compound_diskaggregate:
Expand Down Expand Up @@ -1904,7 +1913,10 @@ bool localChangesActivityAction(IHqlExpression * expr)
case no_nwaymerge:
case no_selfjoin:
case no_joincount:
return !isKeyedJoin(expr); // Keyed joins always
return !isKeyedJoin(expr); // Keyed joins always
case no_call:
case no_externalcall:
return callIsActivity(expr);
}
return false;
}
Expand Down
98 changes: 93 additions & 5 deletions ecl/hql/hqlexpr.cpp
Expand Up @@ -2269,7 +2269,6 @@ childDatasetType getChildDatasetType(IHqlExpression * expr)
case no_httpcall:
case no_soapcall:
case no_newsoapcall:
case no_externalcall: // None in the sense it is generally used for.
case no_alias:
case no_id2blob:
case no_embedbody:
Expand All @@ -2279,7 +2278,6 @@ childDatasetType getChildDatasetType(IHqlExpression * expr)
case no_param:
case no_typetransfer:
case no_translated:
case no_call:
case no_rows:
case no_external:
case no_delayedselect:
Expand Down Expand Up @@ -2455,6 +2453,14 @@ childDatasetType getChildDatasetType(IHqlExpression * expr)
return childdataset_left;
case no_chooseds:
return childdataset_many_noscope;
case no_call:
if (functionCallIsActivity(expr))
return childdataset_many_noscope;
return childdataset_none;
case no_externalcall:
if (externalCallIsActivity(expr))
return childdataset_many_noscope;
return childdataset_none;
case no_merge:
case no_regroup:
case no_cogroup:
Expand Down Expand Up @@ -2703,15 +2709,13 @@ inline unsigned doGetNumChildTables(IHqlExpression * dataset)
case no_httpcall:
case no_soapcall:
case no_newsoapcall:
case no_externalcall: // None in the sense it is generally used for.
case no_alias:
case no_id2blob:
case no_embedbody:
case no_datasetfromrow:
case no_datasetfromdictionary:
case no_param:
case no_translated:
case no_call:
case no_rows:
case no_external:
case no_rowsetindex:
Expand Down Expand Up @@ -2772,6 +2776,14 @@ inline unsigned doGetNumChildTables(IHqlExpression * dataset)
case no_quoted:
case no_variable:
return 0;
case no_call:
if (functionCallIsActivity(dataset))
return numStreamInputs(dataset->queryBody()->queryFunctionDefinition());
return 0;
case no_externalcall:
if (externalCallIsActivity(dataset))
return numStreamInputs(dataset->queryExternalDefinition());
return 0;
case no_mapto:
case no_compound:
return 0; // a lie.
Expand Down Expand Up @@ -8403,6 +8415,81 @@ bool functionBodyUsesContext(IHqlExpression * body)
}
}

bool functionBodyIsActivity(IHqlExpression * body)
{
switch (body->getOperator())
{
case no_external:
return body->hasAttribute(activityAtom);
case no_outofline:
case no_funcdef:
return functionBodyIsActivity(body->queryChild(0));
case no_embedbody:
return body->hasAttribute(activityAtom);
default:
return false;
}
}

bool functionCallIsActivity(IHqlExpression * call)
{
dbgassertex(call->getOperator() == no_call);
return functionBodyIsActivity(call->queryBody()->queryFunctionDefinition());
}

bool externalCallIsActivity(IHqlExpression * call)
{
dbgassertex(call->getOperator() == no_externalcall);
return functionBodyIsActivity(call->queryExternalDefinition());
}

IHqlExpression * queryFuncdef(IHqlExpression * call)
{
switch (call->getOperator())
{
case no_call:
return call->queryBody()->queryFunctionDefinition();
case no_externalcall:
return call->queryExternalDefinition();
default:
throwUnexpected();
}
}

bool callIsActivity(IHqlExpression * call)
{
return functionBodyIsActivity(queryFuncdef(call));
}

bool isStreamingDataset(IHqlExpression * param)
{
dbgassertex(param->getOperator() == no_param);
ITypeInfo * paramType = param->queryType();
switch (paramType->getTypeCode())
{
case type_table:
case type_groupedtable:
if (hasStreamedModifier(paramType))
return true;
break;
}
return false;
}

unsigned numStreamInputs(IHqlExpression * funcdef)
{
dbgassertex(funcdef->getOperator() == no_funcdef);
IHqlExpression * formals = funcdef->queryChild(1);
unsigned numStreams = 0;
ForEachChild(i, formals)
{
if (!isStreamingDataset(formals->queryChild(i)))
break;
numStreams++;
}
return numStreams;
}

IHqlExpression * createFunctionDefinition(IIdAtom * id, HqlExprArray & args)
{
IHqlExpression * body = &args.item(0);
Expand Down Expand Up @@ -12443,7 +12530,6 @@ IHqlExpression * expandOutOfLineFunctionCall(IHqlExpression * expr)
CallExpansionContext ctx;
ctx.functionCache = &functionCache;
ctx.forceOutOfLineExpansion = true;
assertex(expr->getOperator() == no_call);
if (ctx.expandFunctionCall(expr))
return expandFunctionCallPreserveAnnotation(ctx, expr);
return LINK(expr);
Expand Down Expand Up @@ -12936,6 +13022,8 @@ IHqlExpression * createExternalFuncdefFromInternal(IHqlExpression * funcdef)
attrs.append(*createAttribute(contextSensitiveAtom));
if (functionBodyUsesContext(body))
attrs.append(*LINK(cachedContextAttribute));
if (functionBodyIsActivity(body))
attrs.append(*createAttribute(activityAtom));

IHqlExpression *child = body->queryChild(0);
if (child && child->getOperator()==no_embedbody)
Expand Down
7 changes: 7 additions & 0 deletions ecl/hql/hqlexpr.hpp
Expand Up @@ -1889,6 +1889,13 @@ extern HQL_API IHqlExpression * createFunctionDefinition(IIdAtom * name, IHqlExp
extern HQL_API IHqlExpression * createFunctionDefinition(IIdAtom * name, HqlExprArray & args);
extern HQL_API IHqlExpression * queryNonDelayedBaseAttribute(IHqlExpression * expr);
extern HQL_API bool functionBodyUsesContext(IHqlExpression * body);
extern HQL_API bool functionBodyIsActivity(IHqlExpression * body);
extern HQL_API bool functionCallIsActivity(IHqlExpression * call);
extern HQL_API bool externalCallIsActivity(IHqlExpression * call);
extern HQL_API bool callIsActivity(IHqlExpression * call); // can be no_call or no_externalcall
extern HQL_API IHqlExpression * queryFuncdef(IHqlExpression * call);
extern HQL_API bool isStreamingDataset(IHqlExpression * param);
extern HQL_API unsigned numStreamInputs(IHqlExpression * funcdef);

#define NO_AGGREGATE \
no_count: \
Expand Down
2 changes: 2 additions & 0 deletions ecl/hql/hqlutil.cpp
Expand Up @@ -8079,6 +8079,8 @@ class Vs6CppNameMangler
mangled.append("PVIGlobalCodeContext@@");
else if (body->hasAttribute(userMatchFunctionAtom))
mangled.append("PVIMatchWalker@@");
if (functionBodyIsActivity(body))
mangled.append("PVIThorActivityContext@@");

if (mangledReturnParameters.length())
mangled.append(mangledReturnParameters);
Expand Down
3 changes: 2 additions & 1 deletion ecl/hql/hqlutil.hpp
Expand Up @@ -252,7 +252,8 @@ extern HQL_API bool isTimed(IHqlExpression * expr);

inline bool isInternalEmbedAttr(IAtom *name)
{
return name == languageAtom || name == projectedAtom || name == streamedAtom || name == _linkCounted_Atom || name == importAtom || name==foldAtom || name==timeAtom || name==prebindAtom;
return name == languageAtom || name == projectedAtom || name == streamedAtom || name == _linkCounted_Atom ||
name == importAtom || name==foldAtom || name==timeAtom || name==prebindAtom || name == activityAtom;
}


Expand Down
2 changes: 2 additions & 0 deletions ecl/hqlcpp/hqlcerrors.hpp
Expand Up @@ -221,6 +221,7 @@
#define HQLERR_ExpectedFileLhsFetch 4209
#define HQLERR_IncompatibleKeyedSubString 4210
#define HQLERR_NonNullChildDSDefault 4211
#define HQLERR_AttributeXMustBeConstant 4212

//Warnings....
#define HQLWRN_PersistDataNotLikely 4500
Expand Down Expand Up @@ -523,6 +524,7 @@
#define HQLERR_ExpectedFileLhsFetch_Text "The first argument of FETCH must be a disk file (had %s)"
#define HQLERR_IncompatibleKeyedSubString_Text "Cannot use two different KEYED substring filters for field %s in key %s"
#define HQLERR_NonNullChildDSDefault_Text "Non-null child dataset may not be used as default value (target field '%s')"
#define HQLERR_AttributeXMustBeConstant_Text "Attribute %s must be set to a constant value"

//Warnings.
#define HQLWRN_CannotRecreateDistribution_Text "Cannot recreate the distribution for a persistent dataset"
Expand Down
1 change: 1 addition & 0 deletions ecl/hqlcpp/hqlcpp.ipp
Expand Up @@ -1487,6 +1487,7 @@ public:
ABoundActivity * doBuildActivityDistribute(BuildCtx & ctx, IHqlExpression * expr);
ABoundActivity * doBuildActivityDistribution(BuildCtx & ctx, IHqlExpression * expr, bool isRoot);
ABoundActivity * doBuildActivitySectionInput(BuildCtx & ctx, IHqlExpression * expr);
ABoundActivity * doBuildActivityEmbed(BuildCtx & ctx, IHqlExpression * expr, bool isRoot);
ABoundActivity * doBuildActivityEnth(BuildCtx & ctx, IHqlExpression * expr);
ABoundActivity * doBuildActivityExecuteWhen(BuildCtx & ctx, IHqlExpression * expr, bool isRoot);
ABoundActivity * doBuildActivityForceLocal(BuildCtx & ctx, IHqlExpression * expr);
Expand Down
3 changes: 3 additions & 0 deletions ecl/hqlcpp/hqlcppds.cpp
Expand Up @@ -2370,6 +2370,9 @@ void HqlCppTranslator::doBuildDataset(BuildCtx & ctx, IHqlExpression * expr, CHq
buildTempExpr(ctx, expr, tgt, format);
return;
}
break;
case no_quoted:
throwUnexpectedX("Translated expression passed to doBuildDataset()");
}

if (expr->isDictionary())
Expand Down
1 change: 1 addition & 0 deletions ecl/hqlcpp/hqlcppsys.ecl
Expand Up @@ -790,6 +790,7 @@ const char * cppSystemText[] = {
" boolean newMemorySpillSplitArg(unsigned4 usageCount, const varstring name, boolean meta) : include, pseudoentrypoint='new CLibraryMemorySpillSplitArg';",
" boolean newWorkUnitReadArg(const varstring _name, boolean _meta) : include, pseudoentrypoint='new CLibraryWorkUnitReadArg';",
" boolean newWorkUnitWriteArg(const varstring _name, unsigned4 _flags, boolean _meta) : include, pseudoentrypoint='new CLibraryWorkUnitWriteArg';",
" CThorExternalArg(unsigned4 _numInputs) : include;",

" destructMetaMember(row _x) : omethod,entrypoint='destruct';",
" walkIndirectMetaMember(row _x, boolean _visitor) : omethod,entrypoint='walkIndirectMembers';",
Expand Down

0 comments on commit be08eb7

Please sign in to comment.