Skip to content

Commit

Permalink
Merge pull request #5061 from jakesmith/hpcc-10288b
Browse files Browse the repository at this point in the history
HPCC-10288 - Add RemoveOwnedSubFiles

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
  • Loading branch information
richardkchapman committed Oct 24, 2013
2 parents 92ec84a + d9ab2de commit 5ed1da5
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 44 deletions.
217 changes: 173 additions & 44 deletions dali/base/dadfs.cpp
Expand Up @@ -2585,6 +2585,24 @@ class CDistributedFileBase : public CInterface, implements INTERFACE
queryLogicalName(), superRepO);
}

virtual void getSuperOwners(StringArray &owners)
{
if (root)
{
StringBuffer owner;
Owned<IPropertyTreeIterator> iter = root->getElements("SuperOwner");
ForEach (*iter)
{
iter->query().getProp("@name", owner.clear());
if (owner.length())
{
if (NotFound == owners.find(owner))
owners.append(owner);
}
}
}
}

void linkSuperOwner(const char *superfile,bool link)
{
if (!superfile||!*superfile)
Expand Down Expand Up @@ -4224,7 +4242,74 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
}
}
// Now we clean the subs
sf->doRemoveSubFile(subfile.get(),transaction);
if (subfile.get())
sf->doRemoveSubFile(subfile.get(), transaction);
else
sf->doRemoveSubFiles(transaction);
}
}
};

/**
* Removes all subfiles exclusively owned by named superfile within a transaction.
*/
class cDeleteOwnedSubFilesAction: public CDFAction
{
StringAttr parentlname;
Owned<IDistributedSuperFile> parent;
public:
cDeleteOwnedSubFilesAction(IDistributedFileTransaction *_transaction, const char *_parentlname)
: CDFAction(_transaction), parentlname(_parentlname)
{
}
bool prepare()
{
parent.setown(transaction->lookupSuperFile(parentlname,true));
if (!parent)
throw MakeStringException(-1,"deleteOwnedSubFiles: SuperFile %s cannot be found", parentlname.get());
// Try to lock all files
addFileLock(parent);
bool dirty=false;
if (lock(&dirty))
{
if (dirty)
{
// in the process of previous attempt to lock for exclusive access, locks were released
// need to reload to ensure position and # of files is correct
CDistributedSuperFile *sf = dynamic_cast<CDistributedSuperFile *>(parent.get());
if (sf)
sf->loadSubFiles(transaction, SDS_TRANSACTION_RETRY);
}
return true;
}
unlock();
return false;
}
void run()
{
CDistributedSuperFile *sf = QUERYINTERFACE(parent.get(),CDistributedSuperFile);
if (sf)
{
// Only delete subfiles which I am sole owner
Owned<IDistributedFileIterator> iter = parent->getSubFileIterator(false);
ForEach (*iter)
{
IDistributedFile *file = &iter->query();
CDistributedFile *_file = QUERYINTERFACE(file, CDistributedFile);
StringArray owners;
_file->getSuperOwners(owners);

if (NotFound == owners.find(parentlname))
ThrowStringException(-1, "deleteOwnedSubFiles: SuperFile %s, subfile %s - subfile not owned by superfile", parentlname.get(), file->queryLogicalName());
if (1 == owners.ordinality()) // just me
{
CDfsLogicalFileName lname;
lname.set(file->queryLogicalName());
// Delay the deletion of the subs until commit
transaction->addDelayedDelete(lname, SDS_SUB_LOCK_TIMEOUT);
}
}
sf->doRemoveSubFiles(transaction); // remove all
}
}
};
Expand Down Expand Up @@ -4388,6 +4473,27 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
throw exceptions.getClear();
}

virtual void getSuperOwners(StringArray &owners)
{
ForEachItemIn(i, subfiles)
{
IDistributedFile *file = &subfiles.item(i);
IDistributedSuperFile *super = file->querySuperFile();
if (super)
{
CDistributedSuperFile *_super = QUERYINTERFACE(super, CDistributedSuperFile);
if (_super)
_super->getSuperOwners(owners);
}
else
{
CDistributedFile *_file = QUERYINTERFACE(file, CDistributedFile);
if (_file)
_file->getSuperOwners(owners);
}
}

}
static StringBuffer &getSubPath(StringBuffer &path,unsigned idx)
{
return path.append("SubFile[@num=\"").append(idx+1).append("\"]");
Expand Down Expand Up @@ -4491,14 +4597,13 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
root->setPropInt("@numsubfiles",subfiles.ordinality());
}

void removeItem(unsigned pos, StringBuffer &subname)
void removeItem(unsigned pos)
{
partscache.kill();
StringBuffer path;
IPropertyTree* sub = root->queryPropTree(getSubPath(path,pos).str());
if (!sub)
throw MakeStringException(-1,"CDistributedSuperFile(3): Corrupt subfile file part %d cannot be found",pos+1);
sub->getProp("@name",subname);
root->removeTree(sub);
// now renumber all above
for (unsigned i=pos+1; i<subfiles.ordinality(); i++) {
Expand Down Expand Up @@ -5305,56 +5410,56 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
linkSubFile(pos, transaction);
}

bool doRemoveSubFiles(IDistributedFileTransaction *transaction)
{
// have to be quite careful here
unsigned pos = subfiles.ordinality();
if (pos)
{
DistributedFilePropertyLock lock(this);
if (lock.needsReload())
loadSubFiles(transaction,1000*60*10);
pos = subfiles.ordinality();
if (pos)
{
do
{
pos--;
unlinkSubFile(pos,transaction);
removeItem(pos);
} while (pos);
setModified();
updateFileAttrs();
lock.unlock();
updateParentFileAttrs(transaction);
}
}
return true;
}

bool doRemoveSubFile(const char *subfile,
IDistributedFileTransaction *transaction)
{
// have to be quite careful here
StringAttrArray subnames;
unsigned pos;
StringBuffer subname;
if (subfile) {
unsigned pos=findSubFileOrd(subfile);
unsigned pos=findSubFileOrd(subfile);
if ((pos==NotFound)||(pos>=subfiles.ordinality()))
pos = findSubFile(subfile);
if (pos==NotFound)
return false;
{
DistributedFilePropertyLock lock(this);
// don't reload subfiles here
pos=findSubFileOrd(subfile);
if ((pos==NotFound)||(pos>=subfiles.ordinality()))
pos = findSubFile(subfile);
if (pos==NotFound)
return false;
{
DistributedFilePropertyLock lock(this);
// don't reload subfiles here
pos=findSubFileOrd(subfile);
if ((pos==NotFound)||(pos>=subfiles.ordinality()))
pos = findSubFile(subfile);
if (pos==NotFound)
return false;
unlinkSubFile(pos,transaction);
removeItem(pos,subname.clear());
subnames.append(* new StringAttrItem(subname.str()));
setModified();
updateFileAttrs();
}
updateParentFileAttrs(transaction);
}
else {
pos = subfiles.ordinality();
if (pos) {
DistributedFilePropertyLock lock(this);
if (lock.needsReload())
loadSubFiles(transaction,1000*60*10);
pos = subfiles.ordinality();
if (pos) {
do {
pos--;
unlinkSubFile(pos,transaction);
removeItem(pos,subname.clear());
subnames.append(* new StringAttrItem(subname.str()));
} while (pos);
setModified();
updateFileAttrs();
lock.unlock();
updateParentFileAttrs(transaction);
}
}
unlinkSubFile(pos,transaction);
removeItem(pos);
setModified();
updateFileAttrs();
}
updateParentFileAttrs(transaction);
return true;
}

Expand Down Expand Up @@ -5494,6 +5599,30 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
return true;
}

virtual bool deleteOwnedSubFiles(IDistributedFileTransaction *transaction)
{
CriticalBlock block (sect);
checkModify("deleteOwnedSubFiles");
partscache.kill();

// Create a local transaction that will be destroyed (MORE: make transaction compulsory)
Linked<IDistributedFileTransaction> localtrans;
if (transaction)
localtrans.set(transaction);
else
localtrans.setown(new CDistributedFileTransaction(udesc));

// Make sure this file is in cache (reuse below)
localtrans->addFile(this);

cDeleteOwnedSubFilesAction *action = new cDeleteOwnedSubFilesAction(localtrans, queryLogicalName());
localtrans->addAction(action); // takes ownership
localtrans->autoCommit();

// MORE - auto-commit will throw an exception, change this to void
return true;
}

virtual bool swapSuperFile( IDistributedSuperFile *_file,
IDistributedFileTransaction *transaction)
{
Expand Down
2 changes: 2 additions & 0 deletions dali/base/dadfs.hpp
Expand Up @@ -321,6 +321,8 @@ interface IDistributedSuperFile: extends IDistributedFile
bool remcontents=false, // if true removes contents of subfile (assuming it is a superfile)
IDistributedFileTransaction *transaction=NULL)=0;
// Note does not delete subfile
virtual bool deleteOwnedSubFiles(IDistributedFileTransaction *transaction=NULL)=0;
// Note does not delete subfile
virtual bool swapSuperFile( IDistributedSuperFile *_file, // swaps sub files
IDistributedFileTransaction *transaction)=0;

Expand Down
3 changes: 3 additions & 0 deletions ecllibrary/std/File.ecl
Expand Up @@ -473,6 +473,9 @@ EXPORT RemoveSuperFile(varstring lsuperfn, varstring lfn, boolean del=false, boo
EXPORT ClearSuperFile(varstring lsuperfn, boolean del=false) :=
lib_fileservices.FileServices.ClearSuperFile(lsuperfn, del);

EXPORT DeleteOwnedSubFiles(varstring lsuperfn) :=
lib_fileservices.FileServices.DeleteOwnedSubFiles(lsuperfn);

EXPORT SwapSuperFile(varstring lsuperfn1, varstring lsuperfn2) :=
lib_fileservices.FileServices.SwapSuperFile(lsuperfn1, lsuperfn2);

Expand Down
24 changes: 24 additions & 0 deletions plugins/fileservices/fileservices.cpp
Expand Up @@ -86,6 +86,7 @@ static const char * EclDefinition =
" AddSuperFile(const varstring lsuperfn,const varstring lfn,unsigned4 atpos=0,boolean addcontents=false, boolean strict=false) : c,action,globalcontext,entrypoint='fsAddSuperFile'; \n"
" RemoveSuperFile(const varstring lsuperfn,const varstring lfn,boolean del=false,boolean remcontents=false) : c,action,globalcontext,entrypoint='fsRemoveSuperFile'; \n"
" ClearSuperFile(const varstring lsuperfn,boolean del=false) : c,action,globalcontext,entrypoint='fsClearSuperFile'; \n"
" DeleteOwnedSubFiles(const varstring lsuperfn) : c,action,globalcontext,entrypoint='fsDeleteOwnedSubFiles'; \n"
" SwapSuperFile(const varstring lsuperfn1,const varstring lsuperfn2) : c,action,globalcontext,entrypoint='fsSwapSuperFile'; \n"
" ReplaceSuperFile(const varstring lsuperfn,const varstring lfn,const varstring bylfn) : c,action,globalcontext,entrypoint='fsReplaceSuperFile'; \n"
" FinishSuperFileTransaction(boolean rollback=false) : c,action,globalcontext,entrypoint='fsFinishSuperFileTransaction'; \n"
Expand Down Expand Up @@ -1012,6 +1013,7 @@ StartSuperFileTransaction();
AddSuperFile(const varstring lsuperfn,const varstring lfn,unsigned4 atpos=0);
RemoveSuperFile(const varstring lsuperfn,const varstring lfn,boolean del=false);
ClearSuperFile(const varstring lsuperfn,boolean del=false);
DeleteOwnedSubFiles(const varstring lsuperfn);
SwapSuperFile(const varstring lsuperfn1,const varstring lsuperfn2);
ReplaceSuperFile(const varstring lsuperfn,const varstring lfn,const varstring bylfn);
FinishSuperFileTransaction(boolean rollback=false);
Expand Down Expand Up @@ -1268,6 +1270,28 @@ FILESERVICES_API void FILESERVICES_CALL fsClearSuperFile(IGlobalCodeContext *gct
fsRemoveSuperFile(gctx,lsuperfn,NULL,del);
}

FILESERVICES_API void FILESERVICES_CALL fsDeleteOwnedSubFiles(IGlobalCodeContext *gctx, const char *lsuperfn)
{
fslDeleteOwnedSubFiles(gctx->queryCodeContext(), lsuperfn);
}

FILESERVICES_API void FILESERVICES_CALL fslDeleteOwnedSubFiles(ICodeContext *ctx, const char *lsuperfn)
{
Owned<IDistributedSuperFile> file;
StringBuffer lsfn;
lookupSuperFile(ctx, lsuperfn, file, true, lsfn, false, true);
IDistributedFileTransaction *transaction = ctx->querySuperFileTransaction();
assertex(transaction);
file->deleteOwnedSubFiles(transaction);
VStringBuffer s("DeleteOwnedSubFiles ('%s') ", lsfn.str());
if (transaction->active())
s.append("trans");
else
s.append("done");
WUmessage(ctx,ExceptionSeverityInformation,NULL,s.str());
AuditMessage(ctx,"DeleteOwnedSubFiles",lsfn.str());
}

FILESERVICES_API void FILESERVICES_CALL fslClearSuperFile(ICodeContext *ctx, const char *lsuperfn,bool del)
{
fslRemoveSuperFile(ctx,lsuperfn,NULL,del);
Expand Down
2 changes: 2 additions & 0 deletions plugins/fileservices/fileservices.hpp
Expand Up @@ -67,6 +67,7 @@ FILESERVICES_API void FILESERVICES_CALL fsStartSuperFileTransaction(IGlobalCodeC
FILESERVICES_API void FILESERVICES_CALL fsAddSuperFile(IGlobalCodeContext *ctx, const char *lsuperfn,const char *_lfn,unsigned atpos=0,bool addcontents=false, bool strict=false);
FILESERVICES_API void FILESERVICES_CALL fsRemoveSuperFile(IGlobalCodeContext *ctx, const char *lsuperfn,const char *_lfn,bool del=false,bool remcontents=false);
FILESERVICES_API void FILESERVICES_CALL fsClearSuperFile(IGlobalCodeContext *ctx, const char *lsuperfn,bool del=false);
FILESERVICES_API void FILESERVICES_CALL fsDeleteOwnedSubFiles(IGlobalCodeContext *ctx, const char *lsuperfn);
FILESERVICES_API void FILESERVICES_CALL fsSwapSuperFile(IGlobalCodeContext *ctx, const char *lsuperfn1,const char *lsuperfn2);
FILESERVICES_API void FILESERVICES_CALL fsReplaceSuperFile(IGlobalCodeContext *ctx, const char *lsuperfn,const char *lfn,const char *bylfn);
FILESERVICES_API void FILESERVICES_CALL fsFinishSuperFileTransaction(IGlobalCodeContext *ctx, bool rollback=false);
Expand Down Expand Up @@ -101,6 +102,7 @@ FILESERVICES_API void FILESERVICES_CALL fslStartSuperFileTransaction(ICodeContex
FILESERVICES_API void FILESERVICES_CALL fslAddSuperFile(ICodeContext *ctx, const char *lsuperfn,const char *_lfn,unsigned atpos=0,bool addcontents=false, bool strict=false);
FILESERVICES_API void FILESERVICES_CALL fslRemoveSuperFile(ICodeContext *ctx, const char *lsuperfn,const char *_lfn,bool del=false,bool remcontents=false);
FILESERVICES_API void FILESERVICES_CALL fslClearSuperFile(ICodeContext *ctx, const char *lsuperfn,bool del=false);
FILESERVICES_API void FILESERVICES_CALL fslDeleteOwnedSubFiles(ICodeContext *ctx, const char *lsuperfn);
FILESERVICES_API void FILESERVICES_CALL fslSwapSuperFile(ICodeContext *ctx, const char *lsuperfn1,const char *lsuperfn2);
FILESERVICES_API void FILESERVICES_CALL fslReplaceSuperFile(ICodeContext *ctx, const char *lsuperfn,const char *lfn,const char *bylfn);
FILESERVICES_API void FILESERVICES_CALL fslFinishSuperFileTransaction(ICodeContext *ctx, bool rollback=false);
Expand Down

0 comments on commit 5ed1da5

Please sign in to comment.