Skip to content

Commit

Permalink
HPCC-24970 Sasha K8s services.
Browse files Browse the repository at this point in the history
Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
  • Loading branch information
jakesmith committed Feb 10, 2021
1 parent 795cd20 commit c0faec3
Show file tree
Hide file tree
Showing 27 changed files with 1,132 additions and 381 deletions.
226 changes: 174 additions & 52 deletions dali/sasha/saarch.cpp
@@ -1,3 +1,5 @@
#include <atomic>

#include "platform.h"
#include "jlib.hpp"
#include "jlog.ipp"
Expand Down Expand Up @@ -620,7 +622,7 @@ class CBranchArchiver
StringAttr branchxpath;
CSashaSchedule schedule;
unsigned limit;
bool &stopped;
std::atomic<bool> &stopped;
IRemoteConnection* conn;
unsigned cutoffdays;
unsigned backupdays; // if 0 not used
Expand All @@ -634,16 +636,22 @@ class CBranchArchiver
unsigned numprotected;
unsigned numnulltimes;

CBranchArchiver(IPropertyTree *archprops,const char *_branchname,const char *_branchxpath,unsigned deflimit,unsigned definterval,unsigned defcutoff, bool &_stopped)
CBranchArchiver(IPropertyTree *archprops,const char *_branchname,const char *_branchxpath,unsigned deflimit,unsigned definterval,unsigned defcutoff, std::atomic<bool> &_stopped)
: branchname(_branchname),branchxpath(_branchxpath), stopped(_stopped)
{
#ifdef _DEBUG
firsttime = true;
#endif

#ifdef _CONTAINERIZED
props.set(archprops);
branchxpath.set("*"); // in container mode, archivers are separated and each connect to their own root
#else
if (archprops->hasProp(branchname))
props.setown(archprops->getPropTree(branchname));
else
props.setown(createPTree(branchname));
#endif
schedule.init(props,definterval,definterval/4);
limit = props->getPropInt("@limit",deflimit);
cutoffdays = props->getPropInt("@cutoff",defcutoff);
Expand Down Expand Up @@ -1078,8 +1086,8 @@ class CWorkUnitArchiver: public CBranchArchiver
{
return new cWUBranchItem(this,e,cutoff,backupcutoff,props->getPropInt("@retryinterval",7));
}
CWorkUnitArchiver(IPropertyTree *archprops,unsigned definterval,bool &_stopped)

CWorkUnitArchiver(IPropertyTree *archprops,unsigned definterval, std::atomic<bool> &_stopped)
: CBranchArchiver(archprops,"WorkUnits","WorkUnits/*",DEFAULT_WORKUNIT_LIMIT,definterval,DEFAULT_WORKUNIT_CUTOFF,_stopped)
{
wufactory.setown(getWorkUnitFactory());
Expand Down Expand Up @@ -1199,7 +1207,7 @@ class CDFUWorkUnitArchiver: public CBranchArchiver
return new cDFUWUBranchItem(this,e,cutoff);
}

CDFUWorkUnitArchiver(IPropertyTree *archprops,unsigned definterval,bool &_stopped)
CDFUWorkUnitArchiver(IPropertyTree *archprops,unsigned definterval, std::atomic<bool> &_stopped)
: CBranchArchiver(archprops,"DFUworkunits","DFU/WorkUnits/*",DEFAULT_DFUWORKUNIT_LIMIT,DEFAULT_DFUWORKUNIT_CUTOFF,definterval,_stopped)
{
PROGLOG("ARCHIVE DFU Workunits: limit=%d, cutoff=%d days",limit,cutoffdays);
Expand Down Expand Up @@ -1289,7 +1297,7 @@ class CDFUrecoveryArchiver: public CBranchArchiver
return new cDRBranchItem(this,e,cutoff);
}

CDFUrecoveryArchiver(IPropertyTree *archprops,unsigned definterval,bool &_stopped)
CDFUrecoveryArchiver(IPropertyTree *archprops,unsigned definterval, std::atomic<bool> &_stopped)
: CBranchArchiver(archprops,"DFUrecovery","DFU/RECOVERY/job",DEFAULT_DFURECOVERY_LIMIT,definterval,DEFAULT_DFURECOVERY_CUTOFF,_stopped)
{
PROGLOG("ARCHIVE DFU Recovery: limit=%d, cutoff=%d days",limit,cutoffdays);
Expand Down Expand Up @@ -1328,22 +1336,19 @@ class CDFUrecoveryArchiver: public CBranchArchiver
}
return false;
}


};

class CCachedWorkUnitRemover: public CBranchArchiver
{

public:

IBranchItem *createBranchItem(IPropertyTree &e)
{
return NULL; // not used
}

CCachedWorkUnitRemover(IPropertyTree *archprops,unsigned definterval,bool &_stopped)
: CBranchArchiver(archprops,"CachedWorkUnits","",DEFAULT_CACHEDWORKUNIT_LIMIT,definterval,0,_stopped)
CCachedWorkUnitRemover(IPropertyTree *archprops,unsigned definterval, std::atomic<bool> &_stopped)
: CBranchArchiver(archprops,"CachedWorkUnits","CachedWorkUnits/*",DEFAULT_CACHEDWORKUNIT_LIMIT,definterval,0,_stopped)
{
PROGLOG("CLEANUP: Cached WorkUnits: limit=%d",limit);
}
Expand All @@ -1353,7 +1358,7 @@ class CCachedWorkUnitRemover: public CBranchArchiver
// check limit exceeded
PROGLOG("CLEANUP: Scanning CachedWorkUnits");
IPropertyTree *root = conn->queryRoot();
Owned<IPropertyTreeIterator> iter1 = root->getElements("CachedWorkUnits/*");
Owned<IPropertyTreeIterator> iter1 = root->getElements(branchxpath);
StringAttrArray branchlist;
StringBuffer path;
ForEach(*iter1) {
Expand Down Expand Up @@ -1389,83 +1394,81 @@ class CCachedWorkUnitRemover: public CBranchArchiver
}
PROGLOG("CLEANUP: CachedWorkUnits complete (%d removed of %d)",i,n);
}


};




class CSashaArchiverServer: public ISashaServer, public Thread
{

bool stopped;
class CSashaArchiverServerBase : public CSimpleInterfaceOf<ISashaServer>, implements IThreaded
{
CThreaded threaded;
protected:
std::atomic<bool> stopped{false};
Semaphore stopsem;
Linked<IPropertyTree> archprops;
public:
IMPLEMENT_IINTERFACE;

CSashaArchiverServer()
: Thread("CSashaArchiverServer")
CSashaArchiverServerBase(IPropertyTree *_config) : archprops(_config), threaded("CSashaArchiverServerBase", this)
{
stopped = false;
}

~CSashaArchiverServer()
virtual void start() override
{
threaded.start();
}

void start()
virtual void ready() override
{
Thread::start();
}

void ready()
{
}

void stop()
virtual void stop() override
{
if (!stopped) {
if (!stopped)
{
stopped = true;
stopsem.signal();
}
join();
threaded.join();
}
};

class CSashaCombinedArchiverServer : public CSashaArchiverServerBase
{
public:
CSashaCombinedArchiverServer(IPropertyTree *config) : CSashaArchiverServerBase(config)
{
}
void runArchiver(CBranchArchiver &archiver,Owned<IRemoteConnection> &conn)
{
if (!stopped&&archiver.ready()) {
if (!conn.get()) {
if (!stopped&&archiver.ready())
{
if (!conn.get())
conn.setown(querySDS().connect("/", myProcessSession(), 0, 5*60*1000));
}
archiver.run(conn);
}
}


int run()
virtual void threadmain() override
{
Owned<IPropertyTree> archprops = serverConfig->getPropTree("Archiver");
if (!archprops)
archprops.setown(createPTree("Archiver"));
unsigned definterval = archprops->getPropInt("@interval",DEFAULT_INTERVAL); // no longer used
unsigned definterval = archprops->getPropInt("@interval", DEFAULT_INTERVAL); // no longer used
if (definterval==0)
definterval = DEFAULT_INTERVAL;
CWorkUnitArchiver wuarchiver(archprops,definterval,stopped);
CDFUWorkUnitArchiver dfuwuarchiver(archprops,definterval,stopped);
CDFUrecoveryArchiver drarchiver(archprops,definterval,stopped);
CCachedWorkUnitRemover cwuremove(archprops,definterval,stopped);
while (!stopped) {
try {
while (!stopped)
{
try
{
Owned<IRemoteConnection> conn;
runArchiver(wuarchiver,conn);
runArchiver(dfuwuarchiver,conn);
runArchiver(drarchiver,conn);
runArchiver(cwuremove,conn);
}
catch (IException *e) {
catch (IException *e)
{
EXCLOG(e,"SASHA ARCHIVE SERVER");
if (!stopped) {
if (!stopped)
{
requestStop(e);
stopped = true;
}
Expand All @@ -1474,19 +1477,138 @@ class CSashaArchiverServer: public ISashaServer, public Thread
}
stopsem.wait(60*1000); // poll every minute
}
return 0;
}
};


} *sashaArchiverServer = NULL;
static CSashaCombinedArchiverServer *sashaArchiverServer = nullptr;

ISashaServer *createSashaArchiverServer()
{
assertex(!sashaArchiverServer); // initialization problem
sashaArchiverServer = new CSashaArchiverServer();
Owned<IPropertyTree> config = serverConfig->getPropTree("Archiver");
if (!config)
config.setown(createPTree("Archiver"));
sashaArchiverServer = new CSashaCombinedArchiverServer(config);
return sashaArchiverServer;
}

class CSashaArchiverServer : public CSashaArchiverServerBase
{
protected:
OwnedPtr<CBranchArchiver> archiver;
StringAttr xpath;
public:
CSashaArchiverServer(IPropertyTree *config) : CSashaArchiverServerBase(config)
{
const char *path = config->queryProp("@storagePath");
if (!isEmptyString(path))
{
config->setPropTree("LDS")->setProp("@rootdir", path);
}
}
virtual void threadmain() override
{
assertex(archiver);
while (!stopped)
{
try
{
if (!stopped&&archiver->ready())
{
Owned<IRemoteConnection> conn = querySDS().connect(xpath, myProcessSession(), 0, 5*60*1000);
archiver->run(conn);
}
}
catch (IException *e)
{
EXCLOG(e,"SASHA ARCHIVE SERVER");
if (!stopped)
{
requestStop(e);
stopped = true;
}
e->Release();
break;
}
stopsem.wait(60*1000); // poll every minute
}
}
};

class CSashaWUArchiverServer : public CSashaArchiverServer
{
public:
CSashaWUArchiverServer(IPropertyTree *config) : CSashaArchiverServer(config)
{
archiver.setown(new CWorkUnitArchiver(config, DEFAULT_INTERVAL, stopped));
xpath.set("WorkUnits/");
}
};

class CSashaDFUWUArchiverServer : public CSashaArchiverServer
{
public:
CSashaDFUWUArchiverServer(IPropertyTree *config) : CSashaArchiverServer(config)
{
archiver.setown(new CDFUWorkUnitArchiver(config, DEFAULT_INTERVAL, stopped));
xpath.set("DFU/WorkUnits/");
}
};

class CSashaDFURecoveryArchiverServer : public CSashaArchiverServer
{
public:
CSashaDFURecoveryArchiverServer(IPropertyTree *config) : CSashaArchiverServer(config)
{
archiver.setown(new CDFUrecoveryArchiver(config, DEFAULT_INTERVAL, stopped));
xpath.set("DFU/RECOVERY/job");
}
};

class CSashaCachedWURemoverServer : public CSashaArchiverServer
{
public:
CSashaCachedWURemoverServer(IPropertyTree *config) : CSashaArchiverServer(config)
{
archiver.setown(new CCachedWorkUnitRemover(config, DEFAULT_INTERVAL, stopped));
xpath.set("CachedWorkUnits/");
}
};


static CSashaWUArchiverServer *sashaWUArchiverServer = nullptr;
ISashaServer *createSashaWUArchiverServer()
{
assertex(!sashaWUArchiverServer); // initialization problem
sashaWUArchiverServer = new CSashaWUArchiverServer(serverConfig);
return sashaWUArchiverServer;
}

static CSashaDFUWUArchiverServer *sashaDFUWUArchiverServer = nullptr;
ISashaServer *createSashaDFUWUArchiverServer()
{
assertex(!sashaDFUWUArchiverServer); // initialization problem
sashaDFUWUArchiverServer = new CSashaDFUWUArchiverServer(serverConfig);
return sashaDFUWUArchiverServer;
}

static CSashaDFURecoveryArchiverServer *sashaDFURecoveryArchiverServer = nullptr;
ISashaServer *createSashaDFURecoveryArchiverServer()
{
assertex(!sashaDFURecoveryArchiverServer); // initialization problem
sashaDFURecoveryArchiverServer = new CSashaDFURecoveryArchiverServer(serverConfig);
return sashaDFURecoveryArchiverServer;
}

static CSashaCachedWURemoverServer *sashaCacheWURemoverServer = nullptr;
ISashaServer *createSashaCachedWURemoverServer()
{
assertex(!sashaCacheWURemoverServer); // initialization problem
sashaCacheWURemoverServer = new CSashaCachedWURemoverServer(serverConfig);
return sashaCacheWURemoverServer;
}



bool processArchiverCommand(ISashaCommand *cmd)
{
Expand Down
10 changes: 9 additions & 1 deletion dali/sasha/saarch.hpp
Expand Up @@ -3,7 +3,15 @@

interface ISashaServer;
interface ISashaCommand;
extern ISashaServer *createSashaArchiverServer();

#ifndef _CONTAINERIZED
extern ISashaServer *createSashaArchiverServer();
#else
extern ISashaServer *createSashaWUArchiverServer();
extern ISashaServer *createSashaDFUWUArchiverServer();
extern ISashaServer *createSashaCachedWURemoverServer();
extern ISashaServer *createSashaDFURecoveryArchiverServer();
#endif

bool processArchiverCommand(ISashaCommand *cmd);

Expand Down

0 comments on commit c0faec3

Please sign in to comment.