Skip to content

Commit

Permalink
HPCC-14396 REFRESH(boolean) option on persist
Browse files Browse the repository at this point in the history
New REFRESH option for Persist.

Usage:
  1) PERSIST('test',REFRESH(TRUE))
	- same as current persist behaviour

  2) PERSIST('test',REFRESH(FALSE))
	- only rebuild if file missing (ignore any file date or content changes)

Signed-off-by: Shamser Ahmed <shamser.ahmed@lexisnexis.co.uk>
  • Loading branch information
shamser committed Dec 16, 2015
1 parent 6e7d589 commit 2684a17
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 20 deletions.
9 changes: 7 additions & 2 deletions common/workunit/workflow.cpp
Expand Up @@ -161,18 +161,20 @@ class CWorkflowItem : public CInterface, implements IWorkflowItem
virtual IStringVal & getPersistName(IStringVal & val) const { val.set(tree->queryProp("@persistName")); return val; }
virtual unsigned queryPersistWfid() const { return tree->getPropInt("@persistWfid", 0); }
virtual int queryPersistCopies() const { return tree->getPropInt("@persistCopies", 0); }
virtual bool queryPersistRefresh() const { return tree->getPropBool("@persistRefresh", true); }
virtual IStringVal & queryCluster(IStringVal & val) const { val.set(tree->queryProp("@cluster")); return val; }
virtual void setScheduledNow() { tree->setPropTree("Schedule", createPTree()); setEnum(tree, "@state", WFStateReqd, wfstates); }
virtual void setScheduledOn(char const * name, char const * text) { IPropertyTree * stree = createPTree(); stree->setProp("@name", name); stree->setProp("@text", text); tree->setPropTree("Schedule", createPTree())->setPropTree("Event", stree); setEnum(tree, "@state", WFStateWait, wfstates); }
virtual void setSchedulePriority(unsigned priority) { assertex(tree->hasProp("Schedule")); tree->setPropInt("Schedule/@priority", priority); }
virtual void setScheduleCount(unsigned count) { assertex(tree->hasProp("Schedule")); tree->setPropInt("Schedule/@count", count); tree->setPropInt("Schedule/@countRemaining", count); }
virtual void addDependency(unsigned wfid) { tree->addPropTree("Dependency", createPTree())->setPropInt("@wfid", wfid); }
virtual void setPersistInfo(char const * name, unsigned wfid, int numPersistInstances)
virtual void setPersistInfo(char const * name, unsigned wfid, int numPersistInstances, bool refresh)
{
tree->setProp("@persistName", name);
tree->setPropInt("@persistWfid", wfid);
if (numPersistInstances != 0)
tree->setPropInt("@persistCopies", (int)numPersistInstances);
tree->setPropBool("@persistRefresh", refresh);
}
virtual void setCluster(const char * cluster) { tree->setProp("@cluster", cluster); }
//info set at run time
Expand Down Expand Up @@ -349,11 +351,12 @@ class CCloneWorkflowItem : public CInterface, implements IRuntimeWorkflowItem
SCMStringBuffer clusterName;
unsigned persistWfid;
int persistCopies;
bool persistRefresh;
StringAttr eventName;
StringAttr eventExtra;

public:
CCloneWorkflowItem() {}
CCloneWorkflowItem() : persistRefresh(true) {}
IMPLEMENT_IINTERFACE;
void copy(IConstWorkflowItem const * other)
{
Expand Down Expand Up @@ -383,6 +386,7 @@ class CCloneWorkflowItem : public CInterface, implements IRuntimeWorkflowItem
persistWfid = other->queryPersistWfid();
scheduledWfid = other->queryScheduledWfid();
persistCopies = other->queryPersistCopies();
persistRefresh = other->queryPersistRefresh();
other->queryCluster(clusterName);
}
//info set at compile time
Expand All @@ -404,6 +408,7 @@ class CCloneWorkflowItem : public CInterface, implements IRuntimeWorkflowItem
virtual IStringVal & getPersistName(IStringVal & val) const { val.set(persistName.str()); return val; }
virtual unsigned queryPersistWfid() const { return persistWfid; }
virtual int queryPersistCopies() const { return persistCopies; }
virtual bool queryPersistRefresh() const { return persistRefresh; }
virtual IStringVal & queryCluster(IStringVal & val) const { val.set(clusterName.str()); return val; }
//info set at run time
virtual unsigned queryScheduleCountRemaining() const { return schedule ? schedule->queryCountRemaining() : 0; }
Expand Down
3 changes: 2 additions & 1 deletion common/workunit/workunit.hpp
Expand Up @@ -601,6 +601,7 @@ interface IConstWorkflowItem : extends IInterface
virtual IStringVal & getPersistName(IStringVal & val) const = 0;
virtual unsigned queryPersistWfid() const = 0;
virtual int queryPersistCopies() const = 0; // 0 - unmangled name, < 0 - use default, > 0 - max number
virtual bool queryPersistRefresh() const = 0;
virtual unsigned queryScheduleCountRemaining() const = 0;
virtual WFState queryState() const = 0;
virtual unsigned queryRetriesRemaining() const = 0;
Expand Down Expand Up @@ -632,7 +633,7 @@ interface IWorkflowItem : extends IRuntimeWorkflowItem
virtual void setSchedulePriority(unsigned priority) = 0;
virtual void setScheduleCount(unsigned count) = 0;
virtual void addDependency(unsigned wfid) = 0;
virtual void setPersistInfo(const char * name, unsigned wfid, int maxCopies) = 0;
virtual void setPersistInfo(const char * name, unsigned wfid, int maxCopies, bool refresh) = 0;
virtual void syncRuntimeData(const IConstWorkflowItem & other) = 0;
virtual void setScheduledWfid(unsigned wfid) = 0;
virtual void setCluster(const char * cluster) = 0;
Expand Down
34 changes: 26 additions & 8 deletions ecl/eclagent/eclagent.cpp
Expand Up @@ -2498,10 +2498,15 @@ bool EclAgent::checkPersistUptoDate(IRuntimeWorkflowItem & item, const char * lo

if (!isResult(lfn, ResultSequencePersist))
errText.appendf("Building PERSIST('%s'): It hasn't been calculated before", logicalName);
else if (!isResult(crcName, ResultSequencePersist))
errText.appendf("Rebuilding PERSIST('%s'): Saved CRC isn't present", logicalName);
else if (isFile && !fileExists(logicalName))
errText.appendf("Rebuilding PERSIST('%s'): Persistent file does not exist", logicalName);
else if (!item.queryPersistRefresh())
{
errText.appendf("Not rebuilding PERSIST('%s'): due to REFRESH(false)", logicalName);
return true;
}
else if (!isResult(crcName, ResultSequencePersist))
errText.appendf("Rebuilding PERSIST('%s'): Saved CRC isn't present", logicalName);
else
{
unsigned savedEclCRC = (unsigned)getResultInt(eclName, ResultSequencePersist);
Expand Down Expand Up @@ -2613,9 +2618,14 @@ bool EclAgent::isPersistUptoDate(Owned<IRemoteConnection> &persistLock, IRuntime
StringBuffer dummy;
if (checkPersistUptoDate(item, logicalName, eclCRC, allCRC, isFile, dummy) && !rebuildAllPersists)
{
StringBuffer msg;
msg.append("PERSIST('").append(logicalName).append("') is up to date");
logException(SeverityInformation, 0, msg.str(), false);
if (dummy.length())
logException(SeverityInformation, 0, dummy.str(), false);
else
{
StringBuffer msg;
msg.append("PERSIST('").append(logicalName).append("') is up to date");
logException(SeverityInformation, 0, msg.str(), false);
}
return true;
}

Expand All @@ -2636,9 +2646,17 @@ bool EclAgent::isPersistUptoDate(Owned<IRemoteConnection> &persistLock, IRuntime
StringBuffer errText;
if (checkPersistUptoDate(item, logicalName, eclCRC, allCRC, isFile, errText) && !rebuildAllPersists)
{
StringBuffer msg;
msg.append("PERSIST('").append(logicalName).append("') is up to date (after being calculated by another job)");
logException(SeverityInformation, 0, msg.str(), false);
if (errText.length())
{
errText.append(" (after being calculated by another job)");
logException(SeverityInformation, 0, errText.str(), false);
}
else
{
StringBuffer msg;
msg.append("PERSIST('").append(logicalName).append("') is up to date (after being calculated by another job)");
logException(SeverityInformation, 0, msg.str(), false);
}
changePersistLockMode(persistLock, RTM_LOCK_READ, logicalName, true);
return true;
}
Expand Down
6 changes: 6 additions & 0 deletions ecl/hql/hqlgram.y
Expand Up @@ -373,6 +373,7 @@ static void eclsyntaxerror(HqlGram * parser, const char * s, short yystate, int
RECORD
RECORDOF
RECOVERY
REFRESH
REGEXFIND
REGEXREPLACE
REGROUP
Expand Down Expand Up @@ -1742,6 +1743,11 @@ persistOpt
: fewMany
| expireAttr
| clusterAttr
| REFRESH '(' expression ')'
{
parser->normalizeExpression($3, type_boolean, true);
$$.setExpr(createExprAttribute(refreshAtom, $3.getExpr()), $1);
}
| SINGLE { $$.setExpr(createAttribute(singleAtom), $1); }
| MULTIPLE { $$.setExpr(createExprAttribute(multipleAtom), $1); }
| MULTIPLE '(' expression ')'
Expand Down
1 change: 1 addition & 0 deletions ecl/hql/hqllex.l
Expand Up @@ -870,6 +870,7 @@ REALFORMAT { RETURNSYM(REALFORMAT); }
RECORD { RETURNHARD(RECORD); }
RECORDOF { RETURNSYM(RECORDOF); }
RECOVERY { RETURNSYM(RECOVERY); }
REFRESH { RETURNSYM(REFRESH); }
REGEXFIND { RETURNSYM(REGEXFIND); }
REGEXREPLACE { RETURNSYM(REGEXREPLACE); }
REGROUP { RETURNSYM(REGROUP); }
Expand Down
11 changes: 7 additions & 4 deletions ecl/hqlcpp/hqlttcpp.cpp
Expand Up @@ -132,7 +132,7 @@ IHqlExpression * getDebugValueExpr(IConstWorkUnit * wu, IHqlExpression * expr)
struct GlobalAttributeInfo
{
public:
GlobalAttributeInfo(const char * _filePrefix, const char * _storedPrefix, IHqlExpression * _value) : value(_value)
GlobalAttributeInfo(const char * _filePrefix, const char * _storedPrefix, IHqlExpression * _value) : value(_value), persistRefresh(true)
{
setOp = no_none;
persistOp = no_none;
Expand All @@ -150,6 +150,7 @@ struct GlobalAttributeInfo
void preventDiskSpill() { few = true; }
IHqlExpression * queryCluster() const { return cluster; }
int queryMaxPersistCopies() const { return numPersistInstances; }
bool queryPersistRefresh() const { return persistRefresh; }

protected:
void doSplitGlobalDefinition(ITypeInfo * type, IHqlExpression * value, IConstWorkUnit * wu, SharedHqlExpr & setOutput, OwnedHqlExpr * getOutput, bool isRoxie);
Expand Down Expand Up @@ -179,6 +180,7 @@ struct GlobalAttributeInfo
const char * storedPrefix;
int numPersistInstances;
bool few;
bool persistRefresh;
};


Expand Down Expand Up @@ -5081,6 +5083,7 @@ void GlobalAttributeInfo::extractStoredInfo(IHqlExpression * expr, IHqlExpressio
getStringValue(s, codehash);
storedName.setown(createConstant(s.str()));
}
persistRefresh = getBoolValue(queryAttributeChild(expr, refreshAtom, 0), true);
break;
case no_global:
throwUnexpected();
Expand Down Expand Up @@ -5534,9 +5537,9 @@ void WorkflowTransformer::setWorkflowSchedule(IWorkflowItem * wf, const Schedule
wf->setSchedulePriority(priority);
}

void WorkflowTransformer::setWorkflowPersist(IWorkflowItem * wf, char const * persistName, unsigned persistWfid, int numPersistInstances)
void WorkflowTransformer::setWorkflowPersist(IWorkflowItem * wf, char const * persistName, unsigned persistWfid, int numPersistInstances, bool refresh)
{
wf->setPersistInfo(persistName, persistWfid, numPersistInstances);
wf->setPersistInfo(persistName, persistWfid, numPersistInstances, refresh);
}

WorkflowItem * WorkflowTransformer::createWorkflowItem(IHqlExpression * expr, unsigned wfid, node_operator workflowOp)
Expand Down Expand Up @@ -5925,7 +5928,7 @@ IHqlExpression * WorkflowTransformer::extractWorkflow(IHqlExpression * untransfo
info.storedName->queryValue()->getStringValue(persistName);
unsigned persistWfid = ++wfidCount;
Owned<IWorkflowItem> wf = addWorkflowToWorkunit(wfid, WFTypeNormal, WFModePersist, queryDirectDependencies(setValue), conts, info.queryCluster());
setWorkflowPersist(wf, persistName.str(), persistWfid, info.queryMaxPersistCopies());
setWorkflowPersist(wf, persistName.str(), persistWfid, info.queryMaxPersistCopies(), info.queryPersistRefresh());

DependenciesUsed dependencies(false);
UnsignedArray visited;
Expand Down
2 changes: 1 addition & 1 deletion ecl/hqlcpp/hqlttcpp.ipp
Expand Up @@ -457,7 +457,7 @@ protected:
IWorkflowItem * addWorkflowToWorkunit(unsigned wfid, WFType type, WFMode mode, UnsignedArray const & dependencies, ContingencyData const & conts, IHqlExpression * cluster);
IWorkflowItem * addWorkflowContingencyToWorkunit(unsigned wfid, WFType type, WFMode mode, UnsignedArray const & dependencies, IHqlExpression * cluster, unsigned wfidFor) { ContingencyData conts; conts.contingencyFor = wfidFor; return addWorkflowToWorkunit(wfid, type, mode, dependencies, conts, cluster); }

void setWorkflowPersist(IWorkflowItem * wf, char const * persistName, unsigned persistWfid, int numPersistInstances);
void setWorkflowPersist(IWorkflowItem * wf, char const * persistName, unsigned persistWfid, int numPersistInstances, bool refresh);
void setWorkflowSchedule(IWorkflowItem * wf, ScheduleData const & sched);

virtual IHqlExpression * createTransformed(IHqlExpression * expr);
Expand Down
22 changes: 18 additions & 4 deletions roxie/ccd/ccdcontext.cpp
Expand Up @@ -360,10 +360,15 @@ class CRoxieWorkflowMachine : public WorkflowMachine

if (!isResult(lfn, ResultSequencePersist))
errText.appendf("Building PERSIST('%s'): It hasn't been calculated before", logicalName);
else if (!isResult(crcName, ResultSequencePersist))
errText.appendf("Rebuilding PERSIST('%s'): Saved CRC isn't present", logicalName);
else if (isFile && !fileExists(lfn))
errText.appendf("Rebuilding PERSIST('%s'): Persistent file does not exist", logicalName);
else if (!item.queryPersistRefresh())
{
errText.appendf("Not rebuilding PERSIST('%s'): due to REFRESH(false)", logicalName);
return true;
}
else if (!isResult(crcName, ResultSequencePersist))
errText.appendf("Rebuilding PERSIST('%s'): Saved CRC isn't present", logicalName);
else
{
unsigned savedEclCRC = (unsigned) getResultInt(eclName, ResultSequencePersist);
Expand Down Expand Up @@ -474,7 +479,10 @@ class CRoxieWorkflowMachine : public WorkflowMachine
StringBuffer dummy;
if (checkPersistUptoDate(item, logicalName, eclCRC, allCRC, isFile, dummy) && !rebuildAllPersists)
{
logctx.CTXLOG("PERSIST('%s') is up to date", logicalName);
if (dummy.length())
logctx.CTXLOG("%s", dummy.str());
else
logctx.CTXLOG("PERSIST('%s') is up to date", logicalName);
return true;
}

Expand All @@ -495,7 +503,13 @@ class CRoxieWorkflowMachine : public WorkflowMachine
StringBuffer errText;
if (checkPersistUptoDate(item, logicalName, eclCRC, allCRC, isFile, errText) && !rebuildAllPersists)
{
logctx.CTXLOG("PERSIST('%s') is up to date (after being calculated by another job)", logicalName);
if (errText.length())
{
errText.append(" (after being calculated by another job)");
logctx.CTXLOG("%s", errText.str());
}
else
logctx.CTXLOG("PERSIST('%s') is up to date (after being calculated by another job)", logicalName);
changePersistLockMode(persistLock, RTM_LOCK_READ, logicalName, true);
return true;
}
Expand Down
6 changes: 6 additions & 0 deletions testing/regress/ecl/key/persist_refresh.xml
@@ -0,0 +1,6 @@
<Dataset name='Result 1'>
<Row><country>Spain</country><population>40397842</population></Row>
<Row><country>Sweden</country><population>9016596</population></Row>
<Row><country>Switzerland</country><population>7523934</population></Row>
<Row><country>United Kingdom</country><population>60609153</population></Row>
</Dataset>
44 changes: 44 additions & 0 deletions testing/regress/ecl/persist_refresh.ecl
@@ -0,0 +1,44 @@
/*##############################################################################
HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
Licensed under the Apache License, Version 2.0 (the 'License');
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an 'AS IS' BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
############################################################################## */

//version persistRefresh=true
//version persistRefresh=false

import ^ as root;

countryRecord := RECORD
string country;
integer4 population;
END;

ds1 := DATASET([{'Spain', 40397842},
{'Sweden', 9016596},
{'Switzerland', 7523934},
{'UK',60609153}], countryRecord);

ds2 := DATASET([{'Spain', 40397842},
{'Sweden', 9016596},
{'Switzerland', 7523934},
{'United Kingdom', 60609153}], countryRecord);

persistRefresh := #IFDEFINED(root.persistRefresh, true);

ds := if(persistRefresh=true, ds2, ds1);

CountriesDS := ds:PERSIST('~REGRESS::PersistRefresh', SINGLE, REFRESH(persistRefresh));

OUTPUT(CountriesDS);

0 comments on commit 2684a17

Please sign in to comment.