From 3f2fe5c7b6d120356614c35dcb1bf79265ebd535 Mon Sep 17 00:00:00 2001 From: Anthony Fishbeck Date: Thu, 21 Sep 2017 19:22:05 -0400 Subject: [PATCH] HPCC-16870 Add method to rebuild a query from its ECL archive This will be really useful when upgrading the ECL compiler, allowing the user to rebuild their query from the exact same code. Signed-off-by: Anthony Fishbeck --- ecl/eclcmd/eclcmd_common.hpp | 1 + ecl/eclcmd/queries/ecl-queries.cpp | 270 ++++++++++++++++++ esp/scm/ws_workunits.ecm | 67 ++++- .../ws_workunits/ws_workunitsHelpers.cpp | 23 +- .../ws_workunits/ws_workunitsHelpers.hpp | 4 +- .../ws_workunits/ws_workunitsQuerySets.cpp | 194 +++++++++++++ .../ws_workunits/ws_workunitsService.cpp | 3 +- .../ws_workunits/ws_workunitsService.hpp | 3 + 8 files changed, 548 insertions(+), 17 deletions(-) diff --git a/ecl/eclcmd/eclcmd_common.hpp b/ecl/eclcmd/eclcmd_common.hpp index 75e585a168a..02179abe79e 100644 --- a/ecl/eclcmd/eclcmd_common.hpp +++ b/ecl/eclcmd/eclcmd_common.hpp @@ -67,6 +67,7 @@ typedef IEclCommand *(*EclCommandFactory)(const char *cmdname); #define ECLOPT_PASSWORD_ENV "ECL_PASSWORD" #define ECLOPT_NORELOAD "--no-reload" +#define ECLOPT_NOPUBLISH "--no-publish" #define ECLOPT_OVERWRITE "--overwrite" #define ECLOPT_REPLACE "--replace" #define ECLOPT_OVERWRITE_S "-O" diff --git a/ecl/eclcmd/queries/ecl-queries.cpp b/ecl/eclcmd/queries/ecl-queries.cpp index 9839e273562..753460825d0 100644 --- a/ecl/eclcmd/queries/ecl-queries.cpp +++ b/ecl/eclcmd/queries/ecl-queries.cpp @@ -910,6 +910,273 @@ class EclCmdQueriesConfig : public EclCmdCommon bool optNoReload; }; +class EclCmdQueriesRecreate : public EclCmdCommon +{ +public: + EclCmdQueriesRecreate() + { + } + virtual eclCmdOptionMatchIndicator parseCommandLineOptions(ArgvIterator &iter) + { + if (iter.done()) + return EclCmdOptionNoMatch; + + for (; !iter.done(); iter.next()) + { + const char *arg = iter.query(); + if (*arg!='-') + { + if (optTarget.isEmpty()) + optTarget.set(arg); + else if (optQueryId.isEmpty()) + optQueryId.set(arg); + else if (optDestTarget.isEmpty()) + optDestTarget.set(arg); + else + { + fprintf(stderr, "\nunrecognized argument %s\n", arg); + return EclCmdOptionNoMatch; + } + continue; + } + if (iter.matchOption(optDaliIP, ECLOPT_DALIIP)) + continue; + if (iter.matchOption(optSourceProcess, ECLOPT_SOURCE_PROCESS)) + continue; + if (iter.matchOption(optMsToWait, ECLOPT_WAIT)) + continue; + if (iter.matchOption(optTimeLimit, ECLOPT_TIME_LIMIT)) + continue; + if (iter.matchOption(optWarnTimeLimit, ECLOPT_WARN_TIME_LIMIT)) + continue; + if (iter.matchOption(optMemoryLimit, ECLOPT_MEMORY_LIMIT)) + continue; + if (iter.matchOption(optPriority, ECLOPT_PRIORITY)) + continue; + if (iter.matchOption(optComment, ECLOPT_COMMENT)) + continue; + if (iter.matchFlag(optDontCopyFiles, ECLOPT_DONT_COPY_FILES)) + continue; + if (iter.matchFlag(optAllowForeign, ECLOPT_ALLOW_FOREIGN)) + continue; + if (iter.matchFlag(optNoActivate, ECLOPT_NO_ACTIVATE)) + { + activateSet=true; + continue; + } + if (iter.matchFlag(optNoReload, ECLOPT_NORELOAD)) + continue; + if (iter.matchFlag(optNoPublish, ECLOPT_NOPUBLISH)) + continue; + bool activate; //also supports "-A-" + if (iter.matchFlag(activate, ECLOPT_ACTIVATE)||iter.matchFlag(activate, ECLOPT_ACTIVATE_S)) + { + activateSet=true; + optNoActivate=!activate; + continue; + } + if (iter.matchFlag(optSuspendPrevious, ECLOPT_SUSPEND_PREVIOUS)||iter.matchFlag(optSuspendPrevious, ECLOPT_SUSPEND_PREVIOUS_S)) + continue; + if (iter.matchFlag(optDeletePrevious, ECLOPT_DELETE_PREVIOUS)||iter.matchFlag(optDeletePrevious, ECLOPT_DELETE_PREVIOUS_S)) + continue; + if (iter.matchFlag(optUpdateDfs, ECLOPT_UPDATE_DFS)) + continue; + if (iter.matchFlag(optUpdateSuperfiles, ECLOPT_UPDATE_SUPER_FILES)) + continue; + if (iter.matchFlag(optUpdateCloneFrom, ECLOPT_UPDATE_CLONE_FROM)) + continue; + if (iter.matchFlag(optDontAppendCluster, ECLOPT_DONT_APPEND_CLUSTER)) + continue; + if (iter.matchOption(optResultLimit, ECLOPT_RESULT_LIMIT)) + continue; + if (matchVariableOption(iter, 'f', debugValues)) + continue; + eclCmdOptionMatchIndicator ind = EclCmdCommon::matchCommandLineOption(iter, true); + if (ind != EclCmdOptionMatch) + return ind; + } + return EclCmdOptionMatch; + } + virtual bool finalizeOptions(IProperties *globals) + { + if (!EclCmdCommon::finalizeOptions(globals)) + return false; + if (optTarget.isEmpty()) + { + fputs("Target not specified.\n", stderr); + return false; + } + if (optQueryId.isEmpty()) + { + fputs("Query not specified.\n", stderr); + return false; + } + if (!activateSet) + { + bool activate; + if (extractEclCmdOption(activate, globals, ECLOPT_ACTIVATE_ENV, ECLOPT_ACTIVATE_INI, true)) + optNoActivate=!activate; + } + if (optNoActivate && (optSuspendPrevious || optDeletePrevious)) + { + fputs("invalid --suspend-prev and --delete-prev require activation.\n", stderr); + return false; + } + if (!optSuspendPrevious && !optDeletePrevious) + { + extractEclCmdOption(optDeletePrevious, globals, ECLOPT_DELETE_PREVIOUS_ENV, ECLOPT_DELETE_PREVIOUS_INI, false); + if (!optDeletePrevious) + extractEclCmdOption(optSuspendPrevious, globals, ECLOPT_SUSPEND_PREVIOUS_ENV, ECLOPT_SUSPEND_PREVIOUS_INI, false); + } + if (optSuspendPrevious && optDeletePrevious) + { + fputs("invalid --suspend-prev and --delete-prev are mutually exclusive options.\n", stderr); + return false; + } + if (optMemoryLimit.length() && !isValidMemoryValue(optMemoryLimit)) + { + fprintf(stderr, "invalid --memoryLimit value of %s.\n", optMemoryLimit.get()); + return false; + } + if (optPriority.length() && !isValidPriorityValue(optPriority)) + { + fprintf(stderr, "invalid --priority value of %s.\n", optPriority.get()); + return false; + } + return true; + } + virtual int processCMD() + { + if (optVerbose) + fprintf(stdout, "\nRecreating %s/%s\n", optTarget.str(), optQueryId.str()); + + Owned client = createCmdClient(WsWorkunits, *this); //upload_ disables maxRequestEntityLength + Owned req = client->createWURecreateQueryRequest(); + + if (optDeletePrevious) + req->setActivate(CWUQueryActivationMode_ActivateDeletePrevious); + else if (optSuspendPrevious) + req->setActivate(CWUQueryActivationMode_ActivateSuspendPrevious); + else + req->setActivate(optNoActivate ? CWUQueryActivationMode_NoActivate : CWUQueryActivationMode_Activate); + + req->setTarget(optTarget); + req->setDestTarget(optDestTarget); + req->setQueryId(optQueryId); + req->setRemoteDali(optDaliIP); + req->setSourceProcess(optSourceProcess); + req->setWait(optMsToWait); + req->setNoReload(optNoReload); + req->setRepublish(!optNoPublish); + req->setDontCopyFiles(optDontCopyFiles); + req->setAllowForeignFiles(optAllowForeign); + req->setUpdateDfs(optUpdateDfs); + req->setUpdateSuperFiles(optUpdateSuperfiles); + req->setUpdateCloneFrom(optUpdateCloneFrom); + req->setAppendCluster(!optDontAppendCluster); + req->setIncludeFileErrors(true); + req->setDebugValues(debugValues); + + if (optTimeLimit != (unsigned) -1) + req->setTimeLimit(optTimeLimit); + if (optWarnTimeLimit != (unsigned) -1) + req->setWarnTimeLimit(optWarnTimeLimit); + if (!optMemoryLimit.isEmpty()) + req->setMemoryLimit(optMemoryLimit); + if (!optPriority.isEmpty()) + req->setPriority(optPriority); + if (optComment.get()) //allow empty + req->setComment(optComment); + + Owned resp = client->WURecreateQuery(req); + const char *wuid = resp->getWuid(); + if (wuid && *wuid) + fprintf(stdout, "\nWorkunit: %s\n", wuid); + const char *id = resp->getQueryId(); + if (id && *id) + { + const char *qs = resp->getQuerySet(); + fprintf(stdout, "\nPublished: %s/%s\n", qs ? qs : "", resp->getQueryId()); + } + if (resp->getReloadFailed()) + fputs("\nAdded to Target, but request to reload queries on cluster failed\n", stderr); + + int ret = outputMultiExceptionsEx(resp->getExceptions()); + if (outputQueryFileCopyErrors(resp->getFileErrors())) + ret = 1; + + return ret; + } + virtual void usage() + { + fputs("\nUsage:\n" + "\n" + "The 'queries recreate' command recompiles a query into a new workunit and republishes\n" + "the new workunit. This is usefull when upgrading to a new ECL compiler and you\n" + "want to recompile a query with the exact same source.\n" + "\n" + "The ECL archive must be available within the workunit of the query.\n" + "\n" + "ecl queries recreate [options]\n\n" + "ecl queries recreate [options]\n\n" + " the target the query you wish to recreate is in\n" + " the query ID of the query you wish to recreate\n" + " the target you want to move the new query to\n" + " (if different from the source target\n" + " Options:\n" + " -A, --activate Activate query when published (default)\n" + " --limit= Sets the result limit for the query, defaults to 100\n" + " -sp, --suspend-prev Suspend previously active query\n" + " -dp, --delete-prev Delete previously active query\n" + " -A-, --no-activate Do not activate query when published\n" + " --no-publish Create a recompiled workunit, but do not publish it\n" + " --no-reload Do not request a reload of the (roxie) cluster\n" + " --no-files Do not copy DFS file information for referenced files\n" + " --allow-foreign Do not fail if foreign files are used in query (roxie)\n" + " --daliip= The IP of the DALI to be used to locate remote files\n" + " --update-super-files Update local DFS super-files if remote DALI has changed\n" + " --update-clone-from Update local clone from location if remote DALI has changed\n" + " --dont-append-cluster Only use to avoid locking issues due to adding cluster to file\n" + " --source-process Process cluster to copy files from\n" + " --timeLimit= Value to set for query timeLimit configuration\n" + " --warnTimeLimit= Value to set for query warnTimeLimit configuration\n" + " --memoryLimit= Value to set for query memoryLimit configuration\n" + " format as 500000B, 550K, 100M, 10G, 1T etc.\n" + " --priority= set the priority for this query. Value can be LOW,\n" + " HIGH, SLA, NONE. NONE will clear current setting.\n" + " --comment= Set the comment associated with this query\n" + " --wait= Max time to wait in milliseconds\n", + stdout); + EclCmdCommon::usage(); + } +private: + StringAttr optTarget; + StringAttr optDestTarget; + StringAttr optQueryId; + StringAttr optDaliIP; + StringAttr optSourceProcess; + StringAttr optMemoryLimit; + StringAttr optPriority; + StringAttr optComment; + IArrayOf debugValues; + unsigned optMsToWait = (unsigned) -1; + unsigned optTimeLimit = (unsigned) -1; + unsigned optWarnTimeLimit = (unsigned) -1; + unsigned optResultLimit = (unsigned) -1; + bool optNoActivate = false; + bool activateSet = false; + bool optNoReload = false; + bool optNoPublish = false; + bool optDontCopyFiles = false; + bool optSuspendPrevious = false; + bool optDeletePrevious = false; + bool optAllowForeign = false; + bool optUpdateDfs = false; + bool optUpdateSuperfiles = false; + bool optUpdateCloneFrom = false; + bool optDontAppendCluster = false; //Undesirable but here temporarily because DALI may have locking issues +}; + IEclCommand *createEclQueriesCommand(const char *cmdname) { if (!cmdname || !*cmdname) @@ -924,6 +1191,8 @@ IEclCommand *createEclQueriesCommand(const char *cmdname) return new EclCmdQueriesCopy(); if (strieq(cmdname, "copy-set")) return new EclCmdQueriesCopyQueryset(); + if (strieq(cmdname, "recreate")) + return new EclCmdQueriesRecreate(); return NULL; } @@ -947,6 +1216,7 @@ class EclQueriesCMDShell : public EclCMDShell " config update query settings\n" " copy copy a query from one target cluster to another\n" " copy-set copy queries from one target cluster to another\n" + " recreate recompiles query into a new workunit\n" ); } }; diff --git a/esp/scm/ws_workunits.ecm b/esp/scm/ws_workunits.ecm index 65e81656684..fa8efb4e64c 100644 --- a/esp/scm/ws_workunits.ecm +++ b/esp/scm/ws_workunits.ecm @@ -597,6 +597,62 @@ ESPresponse [exceptions_inline] WUResubmitResponse [min_ver("1.40")] ESParray WUs; }; +ESPenum WUQueryActivationMode : int +{ + NoActivate(0, "Do not activate query"), + Activate(1, "Activate query"), + ActivateSuspendPrevious(2, "Activate query, suspend previous"), + ActivateDeletePrevious(3, "Activate query, delete previous") +}; + +ESPrequest [nil_remove] WURecreateQueryRequest +{ + string Target; + string QueryId; + ESParray DebugValues; + string DestTarget; + bool Republish(0); + ESPEnum WUQueryActivationMode Activate; + bool NoReload(0); + + string MemoryLimit; + nonNegativeInteger TimeLimit(0); + nonNegativeInteger WarnTimeLimit(0); + string Priority; + string Comment; + + string RemoteDali; + bool DontCopyFiles(false); + string SourceProcess; + bool AllowForeignFiles(false); + bool UpdateDfs(false); + bool UpdateSuperFiles(false); //update content of superfiles if changed + bool UpdateCloneFrom(false); //explicity wan't to change where roxie will grab from + bool AppendCluster(true); //file exists on other local cluster, add new one, make optional in case of locking issues, but should be made to work + bool IncludeFileErrors(false); + + int Wait(-1); +}; + +ESPresponse [exceptions_inline, nil_remove] WURecreateQueryResponse +{ + string Wuid; + string QuerySet; + string QueryName; + string QueryId; + + string MemoryLimit; + nonNegativeInteger TimeLimit; + nonNegativeInteger WarnTimeLimit; + string Priority; + string Comment; + + bool ReloadFailed; + bool Suspended; + string ErrorMessage; + ESParray FileErrors; +}; + ESPenum WUExceptionSeverity : string { INFO("info"), @@ -1221,14 +1277,6 @@ ESPresponse [exceptions_inline] WUCopyLogicalFilesResponse }; -ESPenum WUQueryActivationMode : int -{ - NoActivate(0, "Do not activate query"), - Activate(1, "Activate query"), - ActivateSuspendPrevious(2, "Activate query, suspend previous"), - ActivateDeletePrevious(3, "Activate query, delete previous") -}; - ESPrequest [nil_remove] WUPublishWorkunitRequest { string Wuid; @@ -1828,7 +1876,7 @@ ESPresponse [exceptions_inline, nil_remove] WUGetNumFileToCopyResponse ESPservice [ auth_feature("DEFERRED"), //This declares that the method logic handles feature level authorization - version("1.69"), default_client_version("1.69"), + version("1.70"), default_client_version("1.70"), noforms,exceptions_inline("./smc_xslt/exceptions.xslt"),use_method_name] WsWorkunits { ESPmethod [cache_seconds(60), resp_xsl_default("/esp/xslt/workunits.xslt")] WUQuery(WUQueryRequest, WUQueryResponse); @@ -1864,6 +1912,7 @@ ESPservice [ ESPmethod WUAbort(WUAbortRequest, WUAbortResponse); ESPmethod WUProtect(WUProtectRequest, WUProtectResponse); + ESPmethod [min_ver("1.70")] WURecreateQuery(WURecreateQueryRequest, WURecreateQueryResponse); ESPmethod WUResubmit(WUResubmitRequest, WUResubmitResponse); //???? ESPmethod WURun(WURunRequest, WURunResponse); diff --git a/esp/services/ws_workunits/ws_workunitsHelpers.cpp b/esp/services/ws_workunits/ws_workunitsHelpers.cpp index 36da26db893..dab7a6cbd3c 100644 --- a/esp/services/ws_workunits/ws_workunitsHelpers.cpp +++ b/esp/services/ws_workunits/ws_workunitsHelpers.cpp @@ -2066,15 +2066,14 @@ IConstWUQuery* WsWuInfo::getEmbeddedQuery() return NULL; } -void WsWuInfo::getWorkunitArchiveQuery(MemoryBuffer& buf) +void WsWuInfo::getWorkunitArchiveQuery(IStringVal& str) { Owned query = cw->getQuery(); if(!query) throw MakeStringException(ECLWATCH_QUERY_NOT_FOUND_FOR_WU,"No query for workunit %s.",wuid.str()); - SCMStringBuffer queryText; - query->getQueryText(queryText); - if ((queryText.length() < 1) || !isArchiveQuery(queryText.str())) + query->getQueryText(str); + if ((str.length() < 1) || !isArchiveQuery(str.str())) { if (!query->hasArchive()) throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Archive query not found for workunit %s.", wuid.str()); @@ -2083,10 +2082,22 @@ void WsWuInfo::getWorkunitArchiveQuery(MemoryBuffer& buf) if (!embeddedQuery) throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Embedded query not found for workunit %s.", wuid.str()); - embeddedQuery->getQueryText(queryText); - if ((queryText.length() < 1) || !isArchiveQuery(queryText.str())) + embeddedQuery->getQueryText(str); + if ((str.length() < 1) || !isArchiveQuery(str.str())) throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Archive query not found for workunit %s.", wuid.str()); } +} + +void WsWuInfo::getWorkunitArchiveQuery(StringBuffer& buf) +{ + StringBufferAdaptor queryText(buf); + getWorkunitArchiveQuery(queryText); +} + +void WsWuInfo::getWorkunitArchiveQuery(MemoryBuffer& buf) +{ + SCMStringBuffer queryText; + getWorkunitArchiveQuery(queryText); buf.append(queryText.length(), queryText.str()); } diff --git a/esp/services/ws_workunits/ws_workunitsHelpers.hpp b/esp/services/ws_workunits/ws_workunitsHelpers.hpp index bb997b6f09e..2e9f355582d 100644 --- a/esp/services/ws_workunits/ws_workunitsHelpers.hpp +++ b/esp/services/ws_workunits/ws_workunitsHelpers.hpp @@ -191,7 +191,9 @@ class WsWuInfo void getWorkunitThorLog(const char *processName, MemoryBuffer& buf); void getWorkunitThorSlaveLog(const char *groupName, const char *ipAddress, const char* logDate, const char* logDir, int slaveNum, MemoryBuffer& buf, bool forDownload); void getWorkunitResTxt(MemoryBuffer& buf); - void getWorkunitArchiveQuery(MemoryBuffer& buf); + void getWorkunitArchiveQuery(IStringVal& str); + void getWorkunitArchiveQuery(StringBuffer& str); + void getWorkunitArchiveQuery(MemoryBuffer& mb); void getWorkunitDll(StringBuffer &name, MemoryBuffer& buf); void getWorkunitXml(const char* plainText, MemoryBuffer& buf); void getWorkunitQueryShortText(MemoryBuffer& buf); diff --git a/esp/services/ws_workunits/ws_workunitsQuerySets.cpp b/esp/services/ws_workunits/ws_workunitsQuerySets.cpp index 0894a6ba19e..d2907bea05d 100644 --- a/esp/services/ws_workunits/ws_workunitsQuerySets.cpp +++ b/esp/services/ws_workunits/ws_workunitsQuerySets.cpp @@ -1645,6 +1645,200 @@ bool CWsWorkunitsEx::onWUQueryFiles(IEspContext &context, IEspWUQueryFilesReques return true; } +void copyWorkunitForRecompile(IEspContext &context, IWorkUnitFactory *factory, const char *srcWuid, StringAttr &wuid, StringAttr &jobname) +{ + Owned src(factory->openWorkUnit(srcWuid)); + if (!src) + throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.", srcWuid); + WsWuInfo info(context, src); + StringBuffer archiveText; + info.getWorkunitArchiveQuery(archiveText); //archive required, fail otherwise + if (!isArchiveQuery(archiveText)) + throw MakeStringException(ECLWATCH_RESOURCE_NOT_FOUND,"Cannot retrieve workunit ECL archive %s.", srcWuid); + + SCMStringBuffer mainDefinition; + Owned query = src->getQuery(); + if (query) + query->getQueryMainDefinition(mainDefinition); + + NewWsWorkunit wu(factory, context); + wuid.set(wu->queryWuid()); + + wu->setAction(WUActionCompile); + + SCMStringBuffer token; + wu->setSecurityToken(createToken(wuid.str(), context.queryUserId(), context.queryPassword(), token).str()); + + jobname.set(src->queryJobName()); + if (jobname.length()) + wu->setJobName(jobname); + wu.setQueryText(archiveText.str()); + if (mainDefinition.length()) + wu.setQueryMain(mainDefinition.str()); + wu->setResultLimit(src->getResultLimit()); + IStringIterator &names = src->getDebugValues(); + ForEach(names) + { + SCMStringBuffer name, value; + names.str(name); + if (0==strncmp(name.str(), "eclcc", 5)) + wu->setDebugValue(name.str(), src->getDebugValue(name.str(), value).str(), true); + } +} + + +bool CWsWorkunitsEx::onWURecreateQuery(IEspContext &context, IEspWURecreateQueryRequest &req, IEspWURecreateQueryResponse &resp) +{ + try + { + const char* srcTarget = req.getTarget(); + const char* queryIdOrAlias = req.getQueryId(); + if (!srcTarget || !*srcTarget) + throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target not specified"); + if (!queryIdOrAlias || !*queryIdOrAlias) + throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "QueryId not specified"); + + const char *target = req.getDestTarget(); + if (isEmptyString(target)) + target = srcTarget; + + Owned queryRegistry = getQueryRegistry(srcTarget, false); + Owned srcQueryTree = resolveQueryAlias(queryRegistry, queryIdOrAlias); + if (!srcQueryTree) + { + DBGLOG("WURecreateQuery - No matching Query"); + throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND,"No matching query for given id or alias %s.", queryIdOrAlias); + } + + resp.setPriority(isEmptyString(req.getPriority()) ? srcQueryTree->queryProp("@priority") : req.getPriority()); + resp.setComment(isEmptyString(req.getComment()) ? srcQueryTree->queryProp("@comment") : req.getComment()); + resp.setMemoryLimit(isEmptyString(req.getMemoryLimit()) ? srcQueryTree->queryProp("@memoryLimit") : req.getMemoryLimit()); + resp.setTimeLimit(req.getTimeLimit_isNull() ? srcQueryTree->getPropInt("@timeLimit") : req.getTimeLimit()); + resp.setWarnTimeLimit(req.getWarnTimeLimit_isNull() ? srcQueryTree->getPropInt("@warnTimeLimit") : req.getWarnTimeLimit()); + + StringAttr wuid; + StringAttr jobname; + + const char* srcQueryId = srcQueryTree->queryProp("@id"); + const char* srcQueryName = srcQueryTree->queryProp("@name"); + const char *srcWuid = srcQueryTree->queryProp("@wuid"); + + PROGLOG("WURecreateQuery: QuerySet %s, query %s, wuid %s", srcTarget, srcQueryId, srcWuid); + + ensureWsWorkunitAccess(context, srcWuid, SecAccess_Write); + + Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); + copyWorkunitForRecompile(context, factory, srcWuid, wuid, jobname); + resp.setWuid(wuid); + + WsWuHelpers::submitWsWorkunit(context, wuid.str(), target, NULL, 0, true, false, false, NULL, NULL, &req.getDebugValues()); + waitForWorkUnitToCompile(wuid.str(), req.getWait()); + + Owned cw(factory->openWorkUnit(wuid.str())); + if (!cw) + throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open recreated workunit %s.",wuid.str()); + + if (jobname.length()) + { + StringBuffer name; + origValueChanged(jobname.str(), cw->queryJobName(), name, false); + if (name.length()) //non generated user specified name, so override #Workunit('name') + { + WorkunitUpdate wx(&cw->lock()); + wx->setJobName(name.str()); + } + } + PROGLOG("WURecreateQuery generated: %s", wuid.str()); + AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str()); + + queryRegistry.clear(); + srcQueryTree.clear(); + + if (req.getRepublish()) + { + if (!req.getDontCopyFiles()) + { + StringBuffer daliIP; + StringBuffer srcCluster; + StringBuffer srcPrefix; + splitDerivedDfsLocation(req.getRemoteDali(), srcCluster, daliIP, srcPrefix, req.getSourceProcess(),req.getSourceProcess(), NULL, NULL); + + if (srcCluster.length()) + { + if (!isProcessCluster(daliIP, srcCluster)) + throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Process cluster %s not found on %s DALI", srcCluster.str(), daliIP.length() ? daliIP.str() : "local"); + } + unsigned updateFlags = 0; + if (req.getUpdateDfs()) + updateFlags |= (DALI_UPDATEF_SUPERFILES | DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM); + if (req.getUpdateCloneFrom()) + updateFlags |= DALI_UPDATEF_CLONE_FROM; + if (req.getUpdateSuperFiles()) + updateFlags |= DALI_UPDATEF_SUPERFILES; + if (req.getAppendCluster()) + updateFlags |= DALI_UPDATEF_APPEND_CLUSTER; + + QueryFileCopier cpr(target); + cpr.init(context, req.getAllowForeignFiles()); + cpr.remoteIP.set(daliIP); + cpr.remotePrefix.set(srcPrefix); + cpr.srcCluster.set(srcCluster); + cpr.queryname.set(srcQueryName); + cpr.copy(cw, updateFlags); + + if (req.getIncludeFileErrors()) + cpr.gatherFileErrors(resp.getFileErrors()); + } + + StringBuffer queryId; + WorkunitUpdate wu(&cw->lock()); + WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate(); + addQueryToQuerySet(wu, target, srcQueryName, activate, queryId, context.queryUserId()); + { + Owned queryTree = getQueryById(target, queryId, false); + if (queryTree) + { + queryTree->setProp("@priority", resp.getPriority()); + updateMemoryLimitSetting(queryTree, resp.getMemoryLimit()); + updateQuerySetting(resp.getTimeLimit_isNull(), queryTree, "@timeLimit", resp.getTimeLimit()); + updateQuerySetting(resp.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", resp.getWarnTimeLimit()); + updateQueryPriority(queryTree, resp.getPriority()); + queryTree->setProp("@comment", resp.getComment()); + } + } + + wu->commit(); + wu.clear(); + + PROGLOG("WURecreateQuery published: %s as %s/%s", wuid.str(), target, queryId.str()); + + resp.setQuerySet(target); + resp.setQueryName(srcQueryName); + resp.setQueryId(queryId.str()); + + Owned clusterInfo = getTargetClusterInfo(target); + bool reloadFailed = false; + if (0!=req.getWait() && !req.getNoReload()) + reloadFailed = !reloadCluster(clusterInfo, (unsigned)req.getWait()); + + resp.setReloadFailed(reloadFailed); + + StringBuffer errorMessage; + if (!reloadFailed && !req.getNoReload() && isQuerySuspended(queryId.str(), clusterInfo, (unsigned)req.getWait(), errorMessage)) + { + resp.setSuspended(true); + resp.setErrorMessage(errorMessage); + } + } + } + catch(IException* e) + { + FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); + } + return true; +} + + bool CWsWorkunitsEx::onWUQueryDetails(IEspContext &context, IEspWUQueryDetailsRequest & req, IEspWUQueryDetailsResponse & resp) { const char* querySet = req.getQuerySet(); diff --git a/esp/services/ws_workunits/ws_workunitsService.cpp b/esp/services/ws_workunits/ws_workunitsService.cpp index 8676d97f230..64b9bd5ad3c 100644 --- a/esp/services/ws_workunits/ws_workunitsService.cpp +++ b/esp/services/ws_workunits/ws_workunitsService.cpp @@ -434,7 +434,7 @@ bool CWsWorkunitsEx::onWUCreate(IEspContext &context, IEspWUCreateRequest &req, return true; } -static bool origValueChanged(const char *newValue, const char *origValue, StringBuffer &s, bool nillable=true) +bool origValueChanged(const char *newValue, const char *origValue, StringBuffer &s, bool nillable) { if (!nillable && isEmpty(newValue)) return false; @@ -827,6 +827,7 @@ bool CWsWorkunitsEx::onWUResubmit(IEspContext &context, IEspWUResubmitRequest &r return true; } + bool CWsWorkunitsEx::onWUPushEvent(IEspContext &context, IEspWUPushEventRequest &req, IEspWUPushEventResponse &resp) { try diff --git a/esp/services/ws_workunits/ws_workunitsService.hpp b/esp/services/ws_workunits/ws_workunitsService.hpp index 95b79cd4925..b90936b5ce0 100644 --- a/esp/services/ws_workunits/ws_workunitsService.hpp +++ b/esp/services/ws_workunits/ws_workunitsService.hpp @@ -230,6 +230,7 @@ class CWsWorkunitsEx : public CWsWorkunits bool onWURun(IEspContext &context, IEspWURunRequest &req, IEspWURunResponse &resp); bool onWUCreate(IEspContext &context, IEspWUCreateRequest &req, IEspWUCreateResponse &resp); bool onWUCreateAndUpdate(IEspContext &context, IEspWUUpdateRequest &req, IEspWUUpdateResponse &resp); + bool onWURecreateQuery(IEspContext &context, IEspWURecreateQueryRequest &req, IEspWURecreateQueryResponse &resp); bool onWUResubmit(IEspContext &context, IEspWUResubmitRequest &req, IEspWUResubmitResponse &resp); bool onWUPushEvent(IEspContext &context, IEspWUPushEventRequest &req, IEspWUPushEventResponse &resp); @@ -403,4 +404,6 @@ class CClusterQueryStateThreadFactory : public CInterface, public IThreadFactory } }; +bool origValueChanged(const char *newValue, const char *origValue, StringBuffer &s, bool nillable=true); + #endif