Skip to content

Commit

Permalink
Merge pull request #2103 from richardkchapman/roxie-filecache-release
Browse files Browse the repository at this point in the history
Roxie filecache release

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
  • Loading branch information
ghalliday committed Apr 16, 2012
2 parents 010f7dd + 63be6a8 commit fe52b04
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 5 deletions.
52 changes: 47 additions & 5 deletions roxie/ccd/ccdfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class CLazyFileIO : public CInterface, implements ILazyFileIO, implements IDelay
unsigned lastAccess;
bool copying;
bool isCompressed;
const IRoxieFileCache *cached;

#ifdef FAIL_20_READ
unsigned readCount;
Expand All @@ -121,12 +122,31 @@ class CLazyFileIO : public CInterface, implements ILazyFileIO, implements IDelay
copyInForeground = false;
lastAccess = msTick();
copying = false;
cached = NULL;
}

~CLazyFileIO()
{
}

virtual void beforeDispose()
{
if (cached)
cached->removeCache(this);
}

void setCache(const IRoxieFileCache *cache)
{
assertex(!cached);
cached = cache;
}

void removeCache(const IRoxieFileCache *cache)
{
assertex(cached==cache);
cached = NULL;
}

inline void setRemote(bool _remote) { remote = _remote; }

virtual void setCopying(bool _copying)
Expand Down Expand Up @@ -510,8 +530,8 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
ICopyArrayOf<ILazyFileIO> todo; // Might prefer a queue but probably doesn't really matter.
InterruptableSemaphore toCopy;
InterruptableSemaphore toClose;
MapStringToMyClass<ILazyFileIO> files;
CriticalSection crit;
mutable CopyMapStringToMyClass<ILazyFileIO> files;
mutable CriticalSection crit;
CriticalSection cpcrit;
bool started;
bool aborting;
Expand Down Expand Up @@ -642,7 +662,8 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
}
else
throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Could not open file %s", localLocation);

ret->setCache(this);
files.setValue(localLocation, (ILazyFileIO *)ret);
return ret.getClear();
}

Expand Down Expand Up @@ -925,6 +946,18 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
started = false;
}

~CRoxieFileCache()
{
// NOTE - I assume that by the time I am being destroyed, system is single threaded.
// Removing any possible race between destroying of the cache and destroying of the files in it would be complex otherwise
HashIterator h(files);
ForEach(h)
{
ILazyFileIO *f = files.mapToValue(&h.query());
f->removeCache(this);
}
}

virtual void start()
{
if (!started)
Expand Down Expand Up @@ -1097,6 +1130,17 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
return aborting ? CFPcancel : CFPcontinue;
}

virtual void removeCache(ILazyFileIO *file) const
{
CriticalBlock b(crit);
// NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
// So only remove from hash table if what we find there matches the item that is being deleted.
const char *filename = file->queryFilename();
ILazyFileIO *goer = files.getValue(filename);
if (goer == file)
files.remove(filename);
}

virtual ILazyFileIO *lookupFile(const char *id, unsigned partNo, RoxieFileType fileType, const char *localLocation, const char *baseIndexFileName, ILazyFileIO *patchFile, const StringArray &peerRoxieCopiedLocationInfo, const StringArray &deployedLocationInfo, offset_t size, const CDateTime &modified, bool memFile, bool isRemote, bool startFileCopy, bool doForegroundCopy, unsigned crc, bool isCompressed, const char *lookupDali)
{
Owned<ILazyFileIO> ret;
Expand Down Expand Up @@ -1135,7 +1179,6 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
}

ret.setown(openFile(id, partNo, fileType, localLocation, peerRoxieCopiedLocationInfo, deployedLocationInfo, size, modified, memFile, crc, isCompressed)); // for now don't check crcs
files.setValue(localLocation, ret);

if (baseIndexFileName)
ret->setBaseIndexFileName(baseIndexFileName);
Expand Down Expand Up @@ -1226,7 +1269,6 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
}
CDateTime nullFiledate; // null date is fine here
ret = openFile(dllname, 1, ROXIE_WU_DLL, localLocation, remoteNames, remoteNames, -1, nullFiledate, false, crc, false); // make partno = 1 (second param)
files.setValue(localLocation, ret);
if (ret->isRemote())
{
try
Expand Down
6 changes: 6 additions & 0 deletions roxie/ccd/ccdfile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
enum RoxieFileStatus { FileSizeMismatch, FileDateMismatch, FileCRCMismatch, FileIsValid, FileNotFound };
enum RoxieFileType { ROXIE_WU_DLL, ROXIE_PLUGIN_DLL, ROXIE_KEY, ROXIE_FILE, ROXIE_PATCH, ROXIE_BASEINDEX };
interface IFileIOArray;
interface IRoxieFileCache;

interface ILazyFileIO : extends IFileIO
{
virtual const char *queryFilename() = 0;
Expand Down Expand Up @@ -58,6 +60,9 @@ interface ILazyFileIO : extends IFileIO
virtual void setCopying(bool copying) = 0;
virtual bool isCopying() const = 0;
virtual IMemoryMappedFile *queryMappedFile() = 0;

virtual void setCache(const IRoxieFileCache *) = 0;
virtual void removeCache(const IRoxieFileCache *) = 0;
};

extern ILazyFileIO *createDynamicFile(const char *id, IPartDescriptor *pdesc, RoxieFileType fileType, int numParts);
Expand All @@ -74,6 +79,7 @@ interface IRoxieFileCache : extends IInterface
virtual StringAttrMapping *queryFileErrorList() = 0; // returns list of files that could not be open
virtual void flushUnusedDirectories(const char *origBaseDir, const char *directory, StringBuffer &info) = 0;
virtual void start() = 0;
virtual void removeCache(ILazyFileIO *file) const = 0;
};

interface IDiffFileInfoCache : extends IInterface
Expand Down
1 change: 1 addition & 0 deletions roxie/ccd/ccdstate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1259,6 +1259,7 @@ class CRoxieDaliQueryPackageManager : public CRoxieQueryPackageManager, implemen
newServerManager->load(newQuerySet, *packages);
newSlaveManagers->load(newQuerySet, *packages);
reloadQueryManagers(newSlaveManagers.getClear(), newServerManager.getClear());
clearKeyStoreCache(false); // Allows us to fully release files we no longer need because of unloaded queries
}

};
Expand Down

0 comments on commit fe52b04

Please sign in to comment.