Roxie dynamic files - Round 2 #2175

Closed
wants to merge 3 commits into
from
@@ -100,6 +100,7 @@ CActivityFactory::CActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFac
helperFactory(_helperFactory),
kind(_kind)
{
+ variableFileName = _queryFactory.dynamicFileResolution();
if (helperFactory)
{
Owned<IHThorArg> helper = helperFactory();
@@ -222,6 +223,11 @@ class CSlaveActivityFactory : public CActivityFactory, implements ISlaveActivity
return NULL;
}
+ virtual bool dynamicFileResolution() const
+ {
+ return variableFileName;
+ }
+
protected:
static IPropertyTree *queryStatsNode(IPropertyTree *parent, const char *xpath)
@@ -371,7 +377,7 @@ class CRoxieSlaveActivity : public CInterface, implements IRoxieSlaveActivity, i
lastPartNo.partNo = 0xffff;
lastPartNo.fileNo = 0xffff;
isOpt = false;
- variableFileName = false;
+ variableFileName = _factory->dynamicFileResolution();
meta.set(basehelper->queryOutputMeta());
}
@@ -772,7 +778,7 @@ class CRoxieDiskReadBaseActivity : public CRoxieSlaveActivity, implements IIndex
forceUnkeyed(_forceUnkeyed)
{
helper = (IHThorDiskReadBaseArg *) basehelper;
- variableFileName = (helper->getFlags() & (TDXvarfilename|TDXdynamicfilename)) != 0;
+ variableFileName = variableFileName || (helper->getFlags() & (TDXvarfilename|TDXdynamicfilename)) != 0;
isOpt = (helper->getFlags() & TDRoptional) != 0;
diskSize.set(helper->queryDiskRecordSize());
processed = 0;
@@ -916,7 +922,7 @@ class CRoxieDiskBaseActivityFactory : public CSlaveActivityFactory
: CSlaveActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
{
Owned<IHThorDiskReadBaseArg> helper = (IHThorDiskReadBaseArg *) helperFactory();
- bool variableFileName = (helper->getFlags() & (TDXvarfilename|TDXdynamicfilename)) != 0;
+ variableFileName = variableFileName || (helper->getFlags() & (TDXvarfilename|TDXdynamicfilename)) != 0;
if (!variableFileName)
{
bool isOpt = (helper->getFlags() & TDRoptional) != 0;
@@ -3010,7 +3016,7 @@ class CRoxieIndexActivityFactory : public CRoxieKeyedActivityFactory
m.setBuffer(indexLayoutSize, indexLayoutMeta.getdata());
activityMeta.setown(deserializeRecordMeta(m, true));
layoutTranslators.setown(new TranslatorArray);
- bool variableFileName = (helper->getFlags() & (TIRvarfilename|TIRdynamicfilename)) != 0;
+ variableFileName = variableFileName || (helper->getFlags() & (TIRvarfilename|TIRdynamicfilename)) != 0;
if (!variableFileName)
{
bool isOpt = (helper->getFlags() & TIRoptional) != 0;
@@ -3164,7 +3170,7 @@ class CRoxieIndexActivity : public CRoxieKeyedActivity
stepExtra(SSEFreadAhead, NULL)
{
indexHelper = (IHThorIndexReadBaseArg *) basehelper;
- variableFileName = (indexHelper->getFlags() & (TIRvarfilename|TIRdynamicfilename)) != 0;
+ variableFileName = variableFileName || (indexHelper->getFlags() & (TIRvarfilename|TIRdynamicfilename)) != 0;
isOpt = (indexHelper->getFlags() & TDRoptional) != 0;
inputData = NULL;
inputCount = 0;
@@ -4208,7 +4214,7 @@ class CRoxieFetchActivityFactory : public CSlaveActivityFactory
{
Owned<IHThorFetchBaseArg> helper = (IHThorFetchBaseArg *) helperFactory();
IHThorFetchContext * fetchContext = static_cast<IHThorFetchContext *>(helper->selectInterface(TAIfetchcontext_1));
- bool variableFileName = (fetchContext->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0;
+ variableFileName = variableFileName || (fetchContext->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0;
if (!variableFileName)
{
bool isOpt = (fetchContext->getFetchFlags() & FFdatafileoptional) != 0;
@@ -4256,7 +4262,7 @@ class CRoxieFetchActivityBase : public CRoxieSlaveActivity
helper = (IHThorFetchBaseArg *) basehelper;
fetchContext = static_cast<IHThorFetchContext *>(helper->selectInterface(TAIfetchcontext_1));
base = 0;
- variableFileName = (fetchContext->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0;
+ variableFileName = variableFileName || (fetchContext->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0;
isOpt = (fetchContext->getFetchFlags() & FFdatafileoptional) != 0;
onCreate();
inputData = (char *) serializedCreate.readDirect(0);
@@ -4595,7 +4601,7 @@ class CRoxieKeyedJoinIndexActivity : public CRoxieKeyedActivity
: factory(_aFactory), CRoxieKeyedActivity(_logctx, _packet, _hFactory, _aFactory)
{
helper = (IHThorKeyedJoinArg *) basehelper;
- variableFileName = (helper->getJoinFlags() & (JFvarindexfilename|JFdynamicindexfilename)) != 0;
+ variableFileName = variableFileName || (helper->getJoinFlags() & (JFvarindexfilename|JFdynamicindexfilename)) != 0;
inputDone = 0;
processed = 0;
candidateCount = 0;
@@ -4900,7 +4906,7 @@ class CRoxieKeyedJoinFetchActivityFactory : public CSlaveActivityFactory
{
Owned<IHThorKeyedJoinArg> helper = (IHThorKeyedJoinArg *) helperFactory();
assertex(helper->diskAccessRequired());
- bool variableFileName = (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0;
+ variableFileName = variableFileName || (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0;
if (!variableFileName)
{
bool isOpt = (helper->getFetchFlags() & FFdatafileoptional) != 0;
@@ -4954,7 +4960,7 @@ class CRoxieKeyedJoinFetchActivity : public CRoxieSlaveActivity
// MORE - no continuation row support?
base = 0;
helper = (IHThorKeyedJoinArg *) basehelper;
- variableFileName = (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0;
+ variableFileName = variableFileName || (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0;
onCreate();
inputData = (const char *) serializedCreate.readDirect(0);
inputLimit = inputData + (serializedCreate.length() - serializedCreate.getPos());
View
@@ -326,9 +326,20 @@ class CRoxieDaliHelper : public CInterface, implements IRoxieDaliHelper
{
assertex(isConnected);
Owned<IWorkUnitFactory> wuFactory = getWorkUnitFactory();
- Owned<IWorkUnit> w = wuFactory->updateWorkUnit(wuid);
- if (!w)
- return NULL;
+ Owned<IWorkUnit> w;
+ StringAttr newWuid;
+ if (wuid && *wuid)
+ {
+ w.setown(wuFactory->updateWorkUnit(wuid));
+ if (!w)
+ return NULL;
+ }
+ else
+ {
+ w.setown(wuFactory->createWorkUnit(NULL, NULL, NULL));
+ w->getWuid(StringAttrAdaptor(newWuid));
+ wuid = newWuid.get();
+ }
w->setAgentSession(myProcessSession());
if (source)
{
View
@@ -934,7 +934,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
delete [] primaries;
setDaliServixSocketCaching(true); // enable daliservix caching
loadPlugins();
- globalPackageSetManager = createRoxiePackageSetManager(standAloneDll.getClear());
+ globalPackageSetManager = createRoxiePackageSetManager(standAloneDll);
globalPackageSetManager->load();
unsigned snifferChannel = numChannels+2; // MORE - why +2 not +1 ??
ROQ = createOutputQueueManager(snifferChannel, isCCD ? numSlaveThreads : 1);
@@ -949,7 +949,13 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
setSEHtoExceptionHandler(&abortHandler);
if (runOnce)
{
- Owned <IRoxieListener> roxieServer = createRoxieSocketListener(0, 1, 0, false);
+ Owned <IRoxieListener> roxieServer;
+ if (fileNameServiceDali.length() == 0) // Local
+ roxieServer.setown(createRoxieSocketListener(0, 1, 0, false));
+ else // Dali
+ {
+ roxieServer.setown(createRoxieWorkUnitListener(1, false, standAloneDll->queryDll()));
+ }
try
{
const char *format = globals->queryProp("format");
@@ -966,7 +972,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
}
StringBuffer query;
query.appendf("<roxie format='%s'/>", format);
- roxieServer->runOnce(query.str()); // MORE - should use the wu listener instead I suspect
+ roxieServer->runOnce(query.str());
}
catch (IException *E)
{
@@ -989,7 +995,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
if (port)
roxieServer.setown(createRoxieSocketListener(port, numThreads, listenQueue, suspended));
else
- roxieServer.setown(createRoxieWorkUnitListener(numThreads, suspended));
+ roxieServer.setown(createRoxieWorkUnitListener(numThreads, suspended, NULL));
Owned<IPropertyTreeIterator> accesses = serverInfo.getElements("Access");
ForEach(*accesses)
{
@@ -124,10 +124,15 @@ class CQueryDll : public CInterface, implements IQueryDll
Owned<IConstWorkUnit> wu = daliHelper->attachWorkunit(wuid, NULL);
if (wu)
{
- SCMStringBuffer dllName;
+ SCMStringBuffer name;
Owned<IConstWUQuery> q = wu->getQuery();
- q->getQueryDllName(dllName);
- return getQueryDll(dllName.str(), false);
+ q->getQueryDllName(name);
+ if (name.length() != 0)
+ return getQueryDll(name.str(), false);
+ q->getQueryCppName(name);
+ if (name.length() != 0)
+ return getQueryDll(name.str(), true);
+ assertex(0);
@rengolin
rengolin May 2, 2012 Contributor

I'm not sure what to do here. The previous behaviour would have returned getQueryDll even if the name was empty, which doesn't make much sense. However, that's more robust, unless it's clearly an error, in which case, the assert makes more sense.

}
else
return NULL;
@@ -190,6 +195,7 @@ class CQueryFactory : public CInterface, implements IQueryFactory, implements IR
unsigned priority;
unsigned libraryInterfaceHash;
hash64_t hashValue;
+ bool dynamicFiles;
static SpinLock queriesCrit;
static CopyMapXToMyClass<hash64_t, hash64_t, CQueryFactory> queryMap;
@@ -722,7 +728,8 @@ class CQueryFactory : public CInterface, implements IQueryFactory, implements IR
unsigned channelNo;
CQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue, unsigned _channelNo)
- : id(_id), package(_package), dll(_dll), channelNo(_channelNo), hashValue(_hashValue)
+ : id(_id), package(_package), dll(_dll), channelNo(_channelNo), hashValue(_hashValue),
+ dynamicFiles(_dll!=NULL) // this is too broad
@rengolin
rengolin May 2, 2012 Contributor

Is this correct? Is DLL any binary (including pre-loaded queries) or just stand alone DLLs?

{
package.Link();
isSuspended = false;
@@ -1092,6 +1099,16 @@ class CQueryFactory : public CInterface, implements IQueryFactory, implements IR
}
}
+ virtual void setDynamicFileResolution(bool flag)
+ {
+ dynamicFiles = flag;
+ }
+
+ virtual bool dynamicFileResolution() const
+ {
+ return dynamicFiles;
+ }
+
protected:
void checkSuspended() const
{
@@ -1103,7 +1120,6 @@ class CQueryFactory : public CInterface, implements IQueryFactory, implements IR
throw MakeStringException(ROXIE_QUERY_SUSPENDED, "Query %s is suspended%s", id.get(), err.str());
}
}
-
};
CriticalSection CQueryFactory::queryCreateLock;
@@ -111,6 +111,9 @@ interface IQueryFactory : extends IInterface
virtual void noteQuery(time_t startTime, bool failed, unsigned elapsed, unsigned memused, unsigned slavesReplyLen, unsigned bytesOut) = 0;
virtual IPropertyTree *getQueryStats(time_t from, time_t to) = 0;
virtual void getGraphNames(StringArray &ret) const = 0;
+
+ virtual void setDynamicFileResolution(bool flag) = 0; // resolve files on-demand
+ virtual bool dynamicFileResolution() const = 0; // resolve files on-demand
};
class ActivityArray : public CInterface
@@ -161,6 +164,7 @@ class CActivityFactory : public CInterface
UnsignedArray childQueryIndexes;
CachedOutputMetaData meta;
mutable StatsCollector mystats;
+ bool variableFileName;
public:
CActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
Oops, something went wrong.