Skip to content

Commit

Permalink
HPCC-24970 Sasha K8s services.
Browse files Browse the repository at this point in the history
Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
  • Loading branch information
jakesmith committed Feb 2, 2021
1 parent 283b756 commit 6fe1503
Show file tree
Hide file tree
Showing 24 changed files with 1,057 additions and 351 deletions.
214 changes: 168 additions & 46 deletions dali/sasha/saarch.cpp
Expand Up @@ -640,10 +640,16 @@ class CBranchArchiver
#ifdef _DEBUG
firsttime = true;
#endif

#ifdef _CONTAINERIZED
props.set(archprops);
branchxpath.set("*"); // in container mode, archivers are separated and each connect to their own root
#else
if (archprops->hasProp(branchname))
props.setown(archprops->getPropTree(branchname));
else
props.setown(createPTree(branchname));
#endif
schedule.init(props,definterval,definterval/4);
limit = props->getPropInt("@limit",deflimit);
cutoffdays = props->getPropInt("@cutoff",defcutoff);
Expand Down Expand Up @@ -1078,7 +1084,7 @@ class CWorkUnitArchiver: public CBranchArchiver
{
return new cWUBranchItem(this,e,cutoff,backupcutoff,props->getPropInt("@retryinterval",7));
}

CWorkUnitArchiver(IPropertyTree *archprops,unsigned definterval,bool &_stopped)
: CBranchArchiver(archprops,"WorkUnits","WorkUnits/*",DEFAULT_WORKUNIT_LIMIT,definterval,DEFAULT_WORKUNIT_CUTOFF,_stopped)
{
Expand Down Expand Up @@ -1328,13 +1334,10 @@ class CDFUrecoveryArchiver: public CBranchArchiver
}
return false;
}


};

class CCachedWorkUnitRemover: public CBranchArchiver
{

public:

IBranchItem *createBranchItem(IPropertyTree &e)
Expand All @@ -1343,7 +1346,7 @@ class CCachedWorkUnitRemover: public CBranchArchiver
}

CCachedWorkUnitRemover(IPropertyTree *archprops,unsigned definterval,bool &_stopped)
: CBranchArchiver(archprops,"CachedWorkUnits","",DEFAULT_CACHEDWORKUNIT_LIMIT,definterval,0,_stopped)
: CBranchArchiver(archprops,"CachedWorkUnits","CachedWorkUnits/*",DEFAULT_CACHEDWORKUNIT_LIMIT,definterval,0,_stopped)
{
PROGLOG("CLEANUP: Cached WorkUnits: limit=%d",limit);
}
Expand All @@ -1353,7 +1356,7 @@ class CCachedWorkUnitRemover: public CBranchArchiver
// check limit exceeded
PROGLOG("CLEANUP: Scanning CachedWorkUnits");
IPropertyTree *root = conn->queryRoot();
Owned<IPropertyTreeIterator> iter1 = root->getElements("CachedWorkUnits/*");
Owned<IPropertyTreeIterator> iter1 = root->getElements(branchxpath);
StringAttrArray branchlist;
StringBuffer path;
ForEach(*iter1) {
Expand Down Expand Up @@ -1389,83 +1392,82 @@ class CCachedWorkUnitRemover: public CBranchArchiver
}
PROGLOG("CLEANUP: CachedWorkUnits complete (%d removed of %d)",i,n);
}


};




class CSashaArchiverServer: public ISashaServer, public Thread
{

bool stopped;
class CSashaArchiverServerBase : public CSimpleInterfaceOf<ISashaServer>, implements IThreaded
{
CThreaded threaded;
protected:
bool stopped = false;
Semaphore stopsem;
Linked<IPropertyTree> archprops;
public:
IMPLEMENT_IINTERFACE;

CSashaArchiverServer()
: Thread("CSashaArchiverServer")
CSashaArchiverServerBase(IPropertyTree *_config) : archprops(_config), threaded("CSashaArchiverServerBase", this)
{
stopped = false;
}

~CSashaArchiverServer()
virtual void start() override
{
threaded.start();
}

void start()
virtual void ready() override
{
Thread::start();
}

void ready()
virtual void stop() override
{
}

void stop()
{
if (!stopped) {
if (!stopped)
{
stopped = true;
stopsem.signal();
}
join();
threaded.join();
}
};

#ifndef _CONTAINERIZED
class CSashaCombinedArchiverServer : public CSashaArchiverServerBase
{
public:
CSashaCombinedArchiverServer(IPropertyTree *config) : CSashaArchiverServerBase(config)
{
}
void runArchiver(CBranchArchiver &archiver,Owned<IRemoteConnection> &conn)
{
if (!stopped&&archiver.ready()) {
if (!conn.get()) {
if (!stopped&&archiver.ready())
{
if (!conn.get())
conn.setown(querySDS().connect("/", myProcessSession(), 0, 5*60*1000));
}
archiver.run(conn);
}
}


int run()
virtual void threadmain() override
{
Owned<IPropertyTree> archprops = serverConfig->getPropTree("Archiver");
if (!archprops)
archprops.setown(createPTree("Archiver"));
unsigned definterval = archprops->getPropInt("@interval",DEFAULT_INTERVAL); // no longer used
unsigned definterval = archprops->getPropInt("@interval", DEFAULT_INTERVAL); // no longer used
if (definterval==0)
definterval = DEFAULT_INTERVAL;
CWorkUnitArchiver wuarchiver(archprops,definterval,stopped);
CDFUWorkUnitArchiver dfuwuarchiver(archprops,definterval,stopped);
CDFUrecoveryArchiver drarchiver(archprops,definterval,stopped);
CCachedWorkUnitRemover cwuremove(archprops,definterval,stopped);
while (!stopped) {
try {
while (!stopped)
{
try
{
Owned<IRemoteConnection> conn;
runArchiver(wuarchiver,conn);
runArchiver(dfuwuarchiver,conn);
runArchiver(drarchiver,conn);
runArchiver(cwuremove,conn);
}
catch (IException *e) {
catch (IException *e)
{
EXCLOG(e,"SASHA ARCHIVE SERVER");
if (!stopped) {
if (!stopped)
{
requestStop(e);
stopped = true;
}
Expand All @@ -1474,18 +1476,138 @@ class CSashaArchiverServer: public ISashaServer, public Thread
}
stopsem.wait(60*1000); // poll every minute
}
return 0;
}
};


} *sashaArchiverServer = NULL;
CSashaCombinedArchiverServer *sashaArchiverServer = nullptr;

ISashaServer *createSashaArchiverServer()
{
assertex(!sashaArchiverServer); // initialization problem
sashaArchiverServer = new CSashaArchiverServer();
Owned<IPropertyTree> config = serverConfig->getPropTree("Archiver");
if (!config)
config.setown(createPTree("Archiver"));
sashaArchiverServer = new CSashaCombinedArchiverServer(config);
return sashaArchiverServer;
}
#else
class CSashaArchiverServer : public CSashaArchiverServerBase
{
protected:
OwnedPtr<CBranchArchiver> archiver;
StringAttr xpath;
public:
CSashaArchiverServer(IPropertyTree *config) : CSashaArchiverServerBase(config)
{
const char *path = config->queryProp("@storagePath");
if (!isEmptyString(path))
{
config->setPropTree("LDS")->setProp("@rootdir", path);
}
}
virtual void threadmain() override
{
assertex(archiver);
while (!stopped)
{
try
{
if (!stopped&&archiver->ready())
{
Owned<IRemoteConnection> conn = querySDS().connect(xpath, myProcessSession(), 0, 5*60*1000);
archiver->run(conn);
}
}
catch (IException *e)
{
EXCLOG(e,"SASHA ARCHIVE SERVER");
if (!stopped)
{
requestStop(e);
stopped = true;
}
e->Release();
break;
}
stopsem.wait(60*1000); // poll every minute
}
}
};

class CSashaWUArchiverServer : public CSashaArchiverServer
{
public:
CSashaWUArchiverServer(IPropertyTree *config) : CSashaArchiverServer(config)
{
archiver.setown(new CWorkUnitArchiver(config, DEFAULT_INTERVAL, stopped));
xpath.set("WorkUnits/");
}
};

class CSashaDFUWUArchiverServer : public CSashaArchiverServer
{
public:
CSashaDFUWUArchiverServer(IPropertyTree *config) : CSashaArchiverServer(config)
{
archiver.setown(new CDFUWorkUnitArchiver(config, DEFAULT_INTERVAL, stopped));
xpath.set("DFU/WorkUnits/");
}
};

class CSashaDFURecoveryArchiverServer : public CSashaArchiverServer
{
public:
CSashaDFURecoveryArchiverServer(IPropertyTree *config) : CSashaArchiverServer(config)
{
archiver.setown(new CDFUrecoveryArchiver(config, DEFAULT_INTERVAL, stopped));
xpath.set("DFU/RECOVERY/job");
}
};

class CSashaCachedWURemoverServer : public CSashaArchiverServer
{
public:
CSashaCachedWURemoverServer(IPropertyTree *config) : CSashaArchiverServer(config)
{
archiver.setown(new CCachedWorkUnitRemover(config, DEFAULT_INTERVAL, stopped));
xpath.set("CachedWorkUnits/");
}
};


static CSashaWUArchiverServer *sashaWUArchiverServer = nullptr;
ISashaServer *createSashaWUArchiverServer()
{
assertex(!sashaWUArchiverServer); // initialization problem
sashaWUArchiverServer = new CSashaWUArchiverServer(serverConfig);
return sashaWUArchiverServer;
}

static CSashaDFUWUArchiverServer *sashaDFUWUArchiverServer = nullptr;
ISashaServer *createSashaDFUWUArchiverServer()
{
assertex(!sashaDFUWUArchiverServer); // initialization problem
sashaDFUWUArchiverServer = new CSashaDFUWUArchiverServer(serverConfig);
return sashaDFUWUArchiverServer;
}

static CSashaDFURecoveryArchiverServer *sashaDFURecoveryArchiverServer = nullptr;
ISashaServer *createSashaDFURecoveryArchiverServer()
{
assertex(!sashaDFURecoveryArchiverServer); // initialization problem
sashaDFURecoveryArchiverServer = new CSashaDFURecoveryArchiverServer(serverConfig);
return sashaDFURecoveryArchiverServer;
}

static CSashaCachedWURemoverServer *sashaCacheWURemoverServer = nullptr;
ISashaServer *createSashaCachedWURemoverServer()
{
assertex(!sashaCacheWURemoverServer); // initialization problem
sashaCacheWURemoverServer = new CSashaCachedWURemoverServer(serverConfig);
return sashaCacheWURemoverServer;
}
#endif



bool processArchiverCommand(ISashaCommand *cmd)
Expand Down
10 changes: 9 additions & 1 deletion dali/sasha/saarch.hpp
Expand Up @@ -3,7 +3,15 @@

interface ISashaServer;
interface ISashaCommand;
extern ISashaServer *createSashaArchiverServer();

#ifndef _CONTAINERIZED
extern ISashaServer *createSashaArchiverServer();
#else
extern ISashaServer *createSashaWUArchiverServer();
extern ISashaServer *createSashaDFUWUArchiverServer();
extern ISashaServer *createSashaCachedWURemoverServer();
extern ISashaServer *createSashaDFURecoveryArchiverServer();
#endif

bool processArchiverCommand(ISashaCommand *cmd);

Expand Down

0 comments on commit 6fe1503

Please sign in to comment.