Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-10288 - Add RemoveOwnedSubFiles #5061

Merged
merged 1 commit into from Oct 24, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
217 changes: 173 additions & 44 deletions dali/base/dadfs.cpp
Expand Up @@ -2585,6 +2585,24 @@ class CDistributedFileBase : public CInterface, implements INTERFACE
queryLogicalName(), superRepO);
}

virtual void getSuperOwners(StringArray &owners)
{
if (root)
{
StringBuffer owner;
Owned<IPropertyTreeIterator> iter = root->getElements("SuperOwner");
ForEach (*iter)
{
iter->query().getProp("@name", owner.clear());
if (owner.length())
{
if (NotFound == owners.find(owner))
owners.append(owner);
}
}
}
}

void linkSuperOwner(const char *superfile,bool link)
{
if (!superfile||!*superfile)
Expand Down Expand Up @@ -4224,7 +4242,74 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
}
}
// Now we clean the subs
sf->doRemoveSubFile(subfile.get(),transaction);
if (subfile.get())
sf->doRemoveSubFile(subfile.get(), transaction);
else
sf->doRemoveSubFiles(transaction);
}
}
};

/**
* Removes all subfiles exclusively owned by named superfile within a transaction.
*/
class cDeleteOwnedSubFilesAction: public CDFAction
{
StringAttr parentlname;
Owned<IDistributedSuperFile> parent;
public:
cDeleteOwnedSubFilesAction(IDistributedFileTransaction *_transaction, const char *_parentlname)
: CDFAction(_transaction), parentlname(_parentlname)
{
}
bool prepare()
{
parent.setown(transaction->lookupSuperFile(parentlname,true));
if (!parent)
throw MakeStringException(-1,"deleteOwnedSubFiles: SuperFile %s cannot be found", parentlname.get());
// Try to lock all files
addFileLock(parent);
bool dirty=false;
if (lock(&dirty))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following block of code looks like it could be commoned up with some of the other clases. Not for this pull request though.

{
if (dirty)
{
// in the process of previous attempt to lock for exclusive access, locks were released
// need to reload to ensure position and # of files is correct
CDistributedSuperFile *sf = dynamic_cast<CDistributedSuperFile *>(parent.get());
if (sf)
sf->loadSubFiles(transaction, SDS_TRANSACTION_RETRY);
}
return true;
}
unlock();
return false;
}
void run()
{
CDistributedSuperFile *sf = QUERYINTERFACE(parent.get(),CDistributedSuperFile);
if (sf)
{
// Only delete subfiles which I am sole owner
Owned<IDistributedFileIterator> iter = parent->getSubFileIterator(false);
ForEach (*iter)
{
IDistributedFile *file = &iter->query();
CDistributedFile *_file = QUERYINTERFACE(file, CDistributedFile);
StringArray owners;
_file->getSuperOwners(owners);

if (NotFound == owners.find(parentlname))
ThrowStringException(-1, "deleteOwnedSubFiles: SuperFile %s, subfile %s - subfile not owned by superfile", parentlname.get(), file->queryLogicalName());
if (1 == owners.ordinality()) // just me
{
CDfsLogicalFileName lname;
lname.set(file->queryLogicalName());
// Delay the deletion of the subs until commit
transaction->addDelayedDelete(lname, SDS_SUB_LOCK_TIMEOUT);
}
}
sf->doRemoveSubFiles(transaction); // remove all
}
}
};
Expand Down Expand Up @@ -4388,6 +4473,27 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
throw exceptions.getClear();
}

virtual void getSuperOwners(StringArray &owners)
{
ForEachItemIn(i, subfiles)
{
IDistributedFile *file = &subfiles.item(i);
IDistributedSuperFile *super = file->querySuperFile();
if (super)
{
CDistributedSuperFile *_super = QUERYINTERFACE(super, CDistributedSuperFile);
if (_super)
_super->getSuperOwners(owners);
}
else
{
CDistributedFile *_file = QUERYINTERFACE(file, CDistributedFile);
if (_file)
_file->getSuperOwners(owners);
}
}

}
static StringBuffer &getSubPath(StringBuffer &path,unsigned idx)
{
return path.append("SubFile[@num=\"").append(idx+1).append("\"]");
Expand Down Expand Up @@ -4491,14 +4597,13 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
root->setPropInt("@numsubfiles",subfiles.ordinality());
}

void removeItem(unsigned pos, StringBuffer &subname)
void removeItem(unsigned pos)
{
partscache.kill();
StringBuffer path;
IPropertyTree* sub = root->queryPropTree(getSubPath(path,pos).str());
if (!sub)
throw MakeStringException(-1,"CDistributedSuperFile(3): Corrupt subfile file part %d cannot be found",pos+1);
sub->getProp("@name",subname);
root->removeTree(sub);
// now renumber all above
for (unsigned i=pos+1; i<subfiles.ordinality(); i++) {
Expand Down Expand Up @@ -5305,56 +5410,56 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
linkSubFile(pos, transaction);
}

bool doRemoveSubFiles(IDistributedFileTransaction *transaction)
{
// have to be quite careful here
unsigned pos = subfiles.ordinality();
if (pos)
{
DistributedFilePropertyLock lock(this);
if (lock.needsReload())
loadSubFiles(transaction,1000*60*10);
pos = subfiles.ordinality();
if (pos)
{
do
{
pos--;
unlinkSubFile(pos,transaction);
removeItem(pos);
} while (pos);
setModified();
updateFileAttrs();
lock.unlock();
updateParentFileAttrs(transaction);
}
}
return true;
}

bool doRemoveSubFile(const char *subfile,
IDistributedFileTransaction *transaction)
{
// have to be quite careful here
StringAttrArray subnames;
unsigned pos;
StringBuffer subname;
if (subfile) {
unsigned pos=findSubFileOrd(subfile);
unsigned pos=findSubFileOrd(subfile);
if ((pos==NotFound)||(pos>=subfiles.ordinality()))
pos = findSubFile(subfile);
if (pos==NotFound)
return false;
{
DistributedFilePropertyLock lock(this);
// don't reload subfiles here
pos=findSubFileOrd(subfile);
if ((pos==NotFound)||(pos>=subfiles.ordinality()))
pos = findSubFile(subfile);
if (pos==NotFound)
return false;
{
DistributedFilePropertyLock lock(this);
// don't reload subfiles here
pos=findSubFileOrd(subfile);
if ((pos==NotFound)||(pos>=subfiles.ordinality()))
pos = findSubFile(subfile);
if (pos==NotFound)
return false;
unlinkSubFile(pos,transaction);
removeItem(pos,subname.clear());
subnames.append(* new StringAttrItem(subname.str()));
setModified();
updateFileAttrs();
}
updateParentFileAttrs(transaction);
}
else {
pos = subfiles.ordinality();
if (pos) {
DistributedFilePropertyLock lock(this);
if (lock.needsReload())
loadSubFiles(transaction,1000*60*10);
pos = subfiles.ordinality();
if (pos) {
do {
pos--;
unlinkSubFile(pos,transaction);
removeItem(pos,subname.clear());
subnames.append(* new StringAttrItem(subname.str()));
} while (pos);
setModified();
updateFileAttrs();
lock.unlock();
updateParentFileAttrs(transaction);
}
}
unlinkSubFile(pos,transaction);
removeItem(pos);
setModified();
updateFileAttrs();
}
updateParentFileAttrs(transaction);
return true;
}

Expand Down Expand Up @@ -5494,6 +5599,30 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
return true;
}

virtual bool deleteOwnedSubFiles(IDistributedFileTransaction *transaction)
{
CriticalBlock block (sect);
checkModify("deleteOwnedSubFiles");
partscache.kill();

// Create a local transaction that will be destroyed (MORE: make transaction compulsory)
Linked<IDistributedFileTransaction> localtrans;
if (transaction)
localtrans.set(transaction);
else
localtrans.setown(new CDistributedFileTransaction(udesc));

// Make sure this file is in cache (reuse below)
localtrans->addFile(this);

cDeleteOwnedSubFilesAction *action = new cDeleteOwnedSubFilesAction(localtrans, queryLogicalName());
localtrans->addAction(action); // takes ownership
localtrans->autoCommit();

// MORE - auto-commit will throw an exception, change this to void
return true;
}

virtual bool swapSuperFile( IDistributedSuperFile *_file,
IDistributedFileTransaction *transaction)
{
Expand Down
2 changes: 2 additions & 0 deletions dali/base/dadfs.hpp
Expand Up @@ -321,6 +321,8 @@ interface IDistributedSuperFile: extends IDistributedFile
bool remcontents=false, // if true removes contents of subfile (assuming it is a superfile)
IDistributedFileTransaction *transaction=NULL)=0;
// Note does not delete subfile
virtual bool deleteOwnedSubFiles(IDistributedFileTransaction *transaction=NULL)=0;
// Note does not delete subfile
virtual bool swapSuperFile( IDistributedSuperFile *_file, // swaps sub files
IDistributedFileTransaction *transaction)=0;

Expand Down
3 changes: 3 additions & 0 deletions ecllibrary/std/File.ecl
Expand Up @@ -473,6 +473,9 @@ EXPORT RemoveSuperFile(varstring lsuperfn, varstring lfn, boolean del=false, boo
EXPORT ClearSuperFile(varstring lsuperfn, boolean del=false) :=
lib_fileservices.FileServices.ClearSuperFile(lsuperfn, del);

EXPORT DeleteOwnedSubFiles(varstring lsuperfn) :=
lib_fileservices.FileServices.DeleteOwnedSubFiles(lsuperfn);

EXPORT SwapSuperFile(varstring lsuperfn1, varstring lsuperfn2) :=
lib_fileservices.FileServices.SwapSuperFile(lsuperfn1, lsuperfn2);

Expand Down
24 changes: 24 additions & 0 deletions plugins/fileservices/fileservices.cpp
Expand Up @@ -86,6 +86,7 @@ static const char * EclDefinition =
" AddSuperFile(const varstring lsuperfn,const varstring lfn,unsigned4 atpos=0,boolean addcontents=false, boolean strict=false) : c,action,globalcontext,entrypoint='fsAddSuperFile'; \n"
" RemoveSuperFile(const varstring lsuperfn,const varstring lfn,boolean del=false,boolean remcontents=false) : c,action,globalcontext,entrypoint='fsRemoveSuperFile'; \n"
" ClearSuperFile(const varstring lsuperfn,boolean del=false) : c,action,globalcontext,entrypoint='fsClearSuperFile'; \n"
" DeleteOwnedSubFiles(const varstring lsuperfn) : c,action,globalcontext,entrypoint='fsDeleteOwnedSubFiles'; \n"
" SwapSuperFile(const varstring lsuperfn1,const varstring lsuperfn2) : c,action,globalcontext,entrypoint='fsSwapSuperFile'; \n"
" ReplaceSuperFile(const varstring lsuperfn,const varstring lfn,const varstring bylfn) : c,action,globalcontext,entrypoint='fsReplaceSuperFile'; \n"
" FinishSuperFileTransaction(boolean rollback=false) : c,action,globalcontext,entrypoint='fsFinishSuperFileTransaction'; \n"
Expand Down Expand Up @@ -1012,6 +1013,7 @@ StartSuperFileTransaction();
AddSuperFile(const varstring lsuperfn,const varstring lfn,unsigned4 atpos=0);
RemoveSuperFile(const varstring lsuperfn,const varstring lfn,boolean del=false);
ClearSuperFile(const varstring lsuperfn,boolean del=false);
DeleteOwnedSubFiles(const varstring lsuperfn);
SwapSuperFile(const varstring lsuperfn1,const varstring lsuperfn2);
ReplaceSuperFile(const varstring lsuperfn,const varstring lfn,const varstring bylfn);
FinishSuperFileTransaction(boolean rollback=false);
Expand Down Expand Up @@ -1268,6 +1270,28 @@ FILESERVICES_API void FILESERVICES_CALL fsClearSuperFile(IGlobalCodeContext *gct
fsRemoveSuperFile(gctx,lsuperfn,NULL,del);
}

FILESERVICES_API void FILESERVICES_CALL fsDeleteOwnedSubFiles(IGlobalCodeContext *gctx, const char *lsuperfn)
{
fslDeleteOwnedSubFiles(gctx->queryCodeContext(), lsuperfn);
}

FILESERVICES_API void FILESERVICES_CALL fslDeleteOwnedSubFiles(ICodeContext *ctx, const char *lsuperfn)
{
Owned<IDistributedSuperFile> file;
StringBuffer lsfn;
lookupSuperFile(ctx, lsuperfn, file, true, lsfn, false, true);
IDistributedFileTransaction *transaction = ctx->querySuperFileTransaction();
assertex(transaction);
file->deleteOwnedSubFiles(transaction);
VStringBuffer s("DeleteOwnedSubFiles ('%s') ", lsfn.str());
if (transaction->active())
s.append("trans");
else
s.append("done");
WUmessage(ctx,ExceptionSeverityInformation,NULL,s.str());
AuditMessage(ctx,"DeleteOwnedSubFiles",lsfn.str());
}

FILESERVICES_API void FILESERVICES_CALL fslClearSuperFile(ICodeContext *ctx, const char *lsuperfn,bool del)
{
fslRemoveSuperFile(ctx,lsuperfn,NULL,del);
Expand Down
2 changes: 2 additions & 0 deletions plugins/fileservices/fileservices.hpp
Expand Up @@ -67,6 +67,7 @@ FILESERVICES_API void FILESERVICES_CALL fsStartSuperFileTransaction(IGlobalCodeC
FILESERVICES_API void FILESERVICES_CALL fsAddSuperFile(IGlobalCodeContext *ctx, const char *lsuperfn,const char *_lfn,unsigned atpos=0,bool addcontents=false, bool strict=false);
FILESERVICES_API void FILESERVICES_CALL fsRemoveSuperFile(IGlobalCodeContext *ctx, const char *lsuperfn,const char *_lfn,bool del=false,bool remcontents=false);
FILESERVICES_API void FILESERVICES_CALL fsClearSuperFile(IGlobalCodeContext *ctx, const char *lsuperfn,bool del=false);
FILESERVICES_API void FILESERVICES_CALL fsDeleteOwnedSubFiles(IGlobalCodeContext *ctx, const char *lsuperfn);
FILESERVICES_API void FILESERVICES_CALL fsSwapSuperFile(IGlobalCodeContext *ctx, const char *lsuperfn1,const char *lsuperfn2);
FILESERVICES_API void FILESERVICES_CALL fsReplaceSuperFile(IGlobalCodeContext *ctx, const char *lsuperfn,const char *lfn,const char *bylfn);
FILESERVICES_API void FILESERVICES_CALL fsFinishSuperFileTransaction(IGlobalCodeContext *ctx, bool rollback=false);
Expand Down Expand Up @@ -101,6 +102,7 @@ FILESERVICES_API void FILESERVICES_CALL fslStartSuperFileTransaction(ICodeContex
FILESERVICES_API void FILESERVICES_CALL fslAddSuperFile(ICodeContext *ctx, const char *lsuperfn,const char *_lfn,unsigned atpos=0,bool addcontents=false, bool strict=false);
FILESERVICES_API void FILESERVICES_CALL fslRemoveSuperFile(ICodeContext *ctx, const char *lsuperfn,const char *_lfn,bool del=false,bool remcontents=false);
FILESERVICES_API void FILESERVICES_CALL fslClearSuperFile(ICodeContext *ctx, const char *lsuperfn,bool del=false);
FILESERVICES_API void FILESERVICES_CALL fslDeleteOwnedSubFiles(ICodeContext *ctx, const char *lsuperfn);
FILESERVICES_API void FILESERVICES_CALL fslSwapSuperFile(ICodeContext *ctx, const char *lsuperfn1,const char *lsuperfn2);
FILESERVICES_API void FILESERVICES_CALL fslReplaceSuperFile(ICodeContext *ctx, const char *lsuperfn,const char *lfn,const char *bylfn);
FILESERVICES_API void FILESERVICES_CALL fslFinishSuperFileTransaction(ICodeContext *ctx, bool rollback=false);
Expand Down