Remove a number of potential crashers, some macro/token logic errors
and fix the issue where the main thread would try to exit before all
workers had finished. Also, force the actual compressed file rewrite
to be exclusive, so that a crash in a parallel non-crucial operation
is less likely to lead to data loss.
RJVB committed Sep 28, 2016
1 parent 5ca24b2 commit caee050
Showing 5 changed files with 68 additions and 25 deletions.
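
The ParallelProcess.cpp changes below are the heart of the "main thread would try to exit before all workers had finished" fix: size() > 0 is dropped from run()'s wait condition and a new nProcessing counter tracks files that have been picked up but not yet finished, because the queue shrinks when a worker takes a file, not when it has finished writing it. Below is a minimal, self-contained sketch of that idea in portable C++; the Queue and worker names are hypothetical, and the real code uses CritSectEx's WaitForSingleObject/SetEvent plus _InterlockedIncrement/_InterlockedDecrement rather than <condition_variable> and <atomic>.

// Minimal illustration (hypothetical names) of the completion-tracking idea:
// count in-flight work separately from the queue, and let the last worker
// signal completion, instead of treating an empty queue as "all done".
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

struct Queue {
    std::deque<int> items;             // stand-in for the FileEntry list
    std::mutex m;
    std::condition_variable done;      // stand-in for allDoneEvent
    std::atomic<long> nProcessing{0};  // picked up but not yet finished
    long nJobs = 0;                    // live worker threads (guarded by m)
};

static void worker(Queue &q)
{
    for (;;) {
        int item;
        {
            std::lock_guard<std::mutex> lk(q.m);
            if (q.items.empty()) break;   // nothing left to *start*
            item = q.items.front();
            q.items.pop_front();          // size() drops here, before any work is done...
        }
        ++q.nProcessing;                  // ...so in-flight work is counted separately
        std::this_thread::sleep_for(std::chrono::milliseconds(item));  // "compress"
        --q.nProcessing;
    }
    std::lock_guard<std::mutex> lk(q.m);
    if (--q.nJobs == 0) {                 // the last worker out signals "all done"
        q.done.notify_all();
    }
}

int main()
{
    Queue q;
    for (int i = 0; i < 32; ++i) {
        q.items.push_back(10 * (i % 5));  // fake workloads, in milliseconds
    }
    const int nThreads = 4;
    q.nJobs = nThreads;
    std::vector<std::thread> pool;
    for (int i = 0; i < nThreads; ++i) {
        pool.emplace_back(worker, std::ref(q));
    }
    std::unique_lock<std::mutex> lk(q.m);
    // Wait for the workers, NOT for items.empty(): an empty queue only means
    // every file has been handed out, not that every file has been written.
    q.done.wait(lk, [&q]{ return q.nJobs == 0; });
    lk.unlock();
    for (auto &t : pool) {
        t.join();
    }
    std::printf("all %d workers finished, %ld files in flight\n",
                nThreads, q.nProcessing.load());
    return 0;
}

With this split, an empty queue only tells a worker there is nothing more to pick up; completion is signalled by the last worker itself, which is what the allDoneEvent/nProcessing handling added to run() relies on.
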
CMakeLists.txt (3 changes: 2 additions & 1 deletion)
@@ -8,6 +8,7 @@ find_package(ZLIB 1.2.8 REQUIRED)
pkg_check_modules(SPARSEHASH libsparsehash)
include_directories(${ZLIB_INCLUDE_DIRS} ${SPARSEHASH_INCLUDEDIR})
link_directories(${SPARSEHASH_LIBRARY_DIRS})
add_definitions(-DSUPPORT_PARALLEL)

add_executable(afsctool
afsctool.c
@@ -19,7 +20,7 @@ add_executable(afsctool
CritSectEx/timing.c
)
target_include_directories(afsctool PRIVATE ${CMAKE_SOURCE_DIR})
set_target_properties(afsctool PROPERTIES COMPILE_FLAGS -DSUPPORT_PARALLEL)
# set_target_properties(afsctool PROPERTIES COMPILE_FLAGS -DSUPPORT_PARALLEL)
target_link_libraries(afsctool ${ZLIB_LIBRARIES} ${SPARSEHASH_LIBRARIES} "-framework CoreServices")

install(TARGETS afsctool DESTINATION ${CMAKE_INSTALL_PREFIX}/bin)
ParallelProcess.cpp (61 changes: 43 additions & 18 deletions)
@@ -92,6 +92,7 @@ ParallelFileProcessor::ParallelFileProcessor(const int n, const int verbose)
{
threadPool.clear();
nJobs = n;
nProcessing = 0;
nProcessed = 0;
allDoneEvent = NULL;
ioLock = new CRITSECTLOCK(4000);
@@ -157,7 +158,11 @@ int ParallelFileProcessor::run()
}
if( allDoneEvent ){
DWORD waitResult = ~WAIT_OBJECT_0;
while( nJobs >= 1 && !quitRequested() && size() > 0 && waitResult != WAIT_OBJECT_0 ){
// contrary to what one might expect, we should NOT use size()==0 as a stopping criterion.
// The queue size is decreased when a worker picks a new file to process, not when it's
// finished. Using size()==0 as a stopping criterion caused the processing interrupts
// that were observed with large files.
while( nJobs >= 1 && !quitRequested() && waitResult != WAIT_OBJECT_0 ){
waitResult = WaitForSingleObject( allDoneEvent, 2000 );
if( nJobs ){
double perc = 100.0 * nProcessed / N;
@@ -176,10 +181,15 @@ int ParallelFileProcessor::run()
prevPerc = perc;
}
}
if( quitRequested() && !threadPool.empty() ){
// the WaitForSingleObject() call above was interrupted by the signal that
// led to quitRequested() being set and as a result the workers haven't yet
// had the chance to exit cleanly. Give them that chance now.
}
if( (quitRequested() && !threadPool.empty()) || nProcessing > 0 ){
// the WaitForSingleObject() call above was interrupted by the signal that
// led to quitRequested() being set and as a result the workers haven't yet
// had the chance to exit cleanly. Give them that chance now.
fprintf( stderr, " quitting [%ld]...", nProcessing ); fflush(stderr);
waitResult = WaitForSingleObject( allDoneEvent, 2000 );
for( i = 0 ; i < 4 && waitResult == WAIT_TIMEOUT ; ++i ){
fprintf( stderr, " [%ld]...", nProcessing) ; fflush(stderr);
waitResult = WaitForSingleObject( allDoneEvent, 2000 );
}
}
@@ -191,7 +201,11 @@ int ParallelFileProcessor::run()
while( !threadPool.empty() ){
FileProcessor *thread = threadPool.front();
if( thread->GetExitCode() == (THREAD_RETURN)STILL_ACTIVE ){
fprintf( stderr, "Stopping worker thread #%d that is still active!\n", i );
fprintf( stderr, "Stopping worker thread #%d that is still %s!\n", i, (thread->scope)? "processing" : "active" );
std::string currentFileName = thread->currentFileName();
if( currentFileName.c_str()[0] ){
fprintf( stderr, "\tcurrent file: %s\n", currentFileName.c_str() );
}
thread->Stop(true);
}
if( thread->nProcessed ){
@@ -229,10 +243,10 @@ int ParallelFileProcessor::run()

int ParallelFileProcessor::workerDone(FileProcessor *worker)
{ CRITSECTLOCK::Scope scope(threadLock);
char name[17];
// char name[17];
// pthread_getname_np( (pthread_t) GetThreadId(worker->GetThread()), name, sizeof(name) );
// fprintf( stderr, "workerDone(): worker \"%s\" is done; %ld workers left\n", name, nJobs - 1 );
nJobs -= 1;
pthread_getname_np( (pthread_t) GetThreadId(worker->GetThread()), name, sizeof(name) );
// fprintf( stderr, "workerDone(): worker \"%s\" is done\n", name );
if( nJobs <= 0 ){
if( allDoneEvent ){
SetEvent(allDoneEvent);
@@ -252,8 +266,15 @@ DWORD FileProcessor::Run(LPVOID arg)
// create a scoped lock without closing it immediately
CRITSECTLOCK::Scope scp(PP->ioLock, 0);
scope = &scp;
currentEntry = &entry;
_InterlockedIncrement(&PP->nProcessing);
entry.compress( this, PP );
_InterlockedDecrement(&PP->nProcessing);
_InterlockedIncrement(&PP->nProcessed);
currentEntry = NULL;
nProcessed += 1;
scope = NULL;

runningTotalRaw += entry.fileInfo.st_size;
runningTotalCompressed += (entry.compressedSize > 0)? entry.compressedSize : entry.fileInfo.st_size;
if( PP->verbose() > 1 ){
@@ -264,8 +285,6 @@ DWORD FileProcessor::Run(LPVOID arg)
cpuUsage += info.cpu_usage/10.0;
}
}
nProcessed += 1;
scope = NULL;
}
}
return DWORD(nProcessed);
@@ -281,19 +300,25 @@ void FileProcessor::InitThread()

inline bool FileProcessor::lockScope()
{
if( scope ){
PP->ioLockedFlag = scope->Lock();
if( PP ){
if( scope ){
PP->ioLockedFlag = scope->Lock();
}
return PP->ioLockedFlag;
}
return PP->ioLockedFlag;
return false;
}

inline bool FileProcessor::unLockScope()
{
if( scope ){
scope->Unlock();
PP->ioLockedFlag = *scope;
if( PP ){
if( scope ){
scope->Unlock();
PP->ioLockedFlag = *scope;
}
return PP->ioLockedFlag;
}
return PP->ioLockedFlag;
return false;
}

// ================================= C interface functions =================================
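
run() also gains a small diagnostic for the forced-stop path: a worker that is still active when the pool shuts down is now reported together with the file it was working on, via the new currentEntry pointer and the currentFileName() accessor added to FileProcessor. A minimal sketch of that bookkeeping follows; Worker and Entry are hypothetical stand-ins for FileProcessor and FileEntry, and the real code performs the report in ParallelFileProcessor::run() just before calling thread->Stop(true).

// Sketch of the "remember what you are working on" pattern (hypothetical types).
#include <cstdio>
#include <string>

struct Entry { std::string fileName; };

class Worker {
public:
    void process(Entry &e)
    {
        currentEntry = &e;       // published so a supervisor can see what we are doing
        compress(e);             // may take a long time for a big file
        currentEntry = nullptr;  // cleared as soon as the entry is finished
    }
    // Safe to call while the worker is idle: returns an empty string in that case.
    std::string currentFileName() const
    {
        return currentEntry ? currentEntry->fileName : std::string();
    }
private:
    void compress(Entry &) { /* stand-in for FileEntry::compress() */ }
    Entry *currentEntry = nullptr;
};

// Roughly what run() now does for a thread that is still active at shutdown:
static void reportAndStop(int id, Worker &w)
{
    const std::string name = w.currentFileName();
    if (!name.empty()) {
        std::fprintf(stderr, "Stopping worker #%d, current file: %s\n", id, name.c_str());
    } else {
        std::fprintf(stderr, "Stopping worker #%d (no file in progress)\n", id);
    }
    // w.Stop(true) would follow here in the real code.
}

Returning an empty string for an idle worker keeps the caller's test trivial, which is what the if( currentFileName.c_str()[0] ) check in run() amounts to.
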
ParallelProcess_p.h (18 changes: 17 additions & 1 deletion)
@@ -1,4 +1,4 @@
// kate: auto-insert-doxygen true; backspace-indents true; indent-width 4; keep-extra-spaces true; replace-tabs true; tab-indents true; tab-width 4;
// kate: auto-insert-doxygen true; backspace-indents true; indent-width 4; keep-extra-spaces true; replace-tabs false; tab-indents true; tab-width 4;

/*
* @file ParallelProcess_p.h
@@ -157,6 +157,8 @@ class ParallelFileProcessor : public ParallelProcessor<FileEntry>
int workerDone(FileProcessor *worker);
// the number of configured or active worker threads
volatile long nJobs;
// the number of processing threads
volatile long nProcessing;
// the number of processed items
volatile long nProcessed;
// a pool containing pointers to the worker threads
@@ -183,7 +185,15 @@ class FileProcessor : public Thread
, runningTotalCompressed(0)
, runningTotalRaw(0)
, cpuUsage(0.0)
, currentEntry(NULL)
{}
~FileProcessor()
{
// better be safe than sorry
PP = NULL;
scope = NULL;
currentEntry = NULL;
}
bool lockScope();
bool unLockScope();

@@ -192,6 +202,10 @@
return procID;
}

inline std::string currentFileName() const
{
return (currentEntry)? currentEntry->fileName : "";
}
protected:
DWORD Run(LPVOID arg);
void InitThread();
@@ -210,6 +224,8 @@
const int procID;
CRITSECTLOCK::Scope *scope;
friend class ParallelFileProcessor;
private:
FileEntry *currentEntry;
};


afsctool.c (9 changes: 5 additions & 4 deletions)
@@ -117,7 +117,7 @@ static const char *lbasename(const char *url)
return c;
}

#if SUPPORT_PARALLEL
#ifdef SUPPORT_PARALLEL
void compressFile(const char *inFile, struct stat *inFileInfo, struct folder_info *folderinfo, void *worker )
#else
void compressFile(const char *inFile, struct stat *inFileInfo, struct folder_info *folderinfo)
@@ -404,7 +404,8 @@ void compressFile(const char *inFile, struct stat *inFileInfo, struct folder_inf
}

#ifdef SUPPORT_PARALLEL
if( exclusive_io && worker ){
// 20160928: the actual rewrite of the file is never done in parallel
if( worker ){
locked = lockParallelProcessorIO(worker);
}
#else
@@ -512,7 +513,7 @@ void compressFile(const char *inFile, struct stat *inFileInfo, struct folder_inf
chmod(inFile, orig_mode);
}
#ifdef SUPPORT_PARALLEL
if( exclusive_io && worker ){
if( worker ){
locked = unLockParallelProcessorIO(worker);
}
#endif
@@ -1669,7 +1670,7 @@ void printUsage()
"-b make a backup of files before compressing them\n"
#ifdef SUPPORT_PARALLEL
"-jN compress (only compressable) files using <N> threads (compression is concurrent, disk IO is exclusive)\n"
"-JN read, compress and write files (only compressable ones) using <N> threads (everything including disk IO is concurrent)\n"
"-JN read, compress and write files (only compressable ones) using <N> threads (everything is concurrent except writing the compressed file)\n"
#endif
"-<level> Compression level to use when compressing (ranging from 1 to 9, with 1 being the fastest and 9 being the best - default is 5)\n");
}
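
The compressFile() hunks above change the guard around the rewrite of the compressed file from if( exclusive_io && worker ) to if( worker ): the rewrite is now funnelled through the worker's IO lock even in -J mode, which is the "force the actual compressed file rewrite to be exclusive" part of the commit message. A minimal sketch of that "compress concurrently, rewrite exclusively" split follows; compressBuffer and rewriteFile are hypothetical helpers, and afsctool itself brackets the rewrite with lockParallelProcessorIO()/unLockParallelProcessorIO() as shown in the diff.

// Sketch: the CPU-bound compression may run on many threads at once, but the
// window in which the file on disk is inconsistent is entered by one thread
// at a time (hypothetical helpers, not afsctool's actual functions).
#include <cstdio>
#include <mutex>
#include <string>
#include <vector>

static std::mutex ioLock;   // plays the role of ParallelFileProcessor::ioLock

static std::vector<char> compressBuffer(const std::vector<char> &raw)
{
    return raw;             // stand-in for the real compression work
}

static void rewriteFile(const std::string &path, const std::vector<char> &compressed)
{
    // Between truncating the original and completing the write the file is in an
    // inconsistent state, so this section is exclusive regardless of -j or -J.
    std::lock_guard<std::mutex> exclusive(ioLock);
    std::FILE *fp = std::fopen(path.c_str(), "wb");
    if (!fp) {
        return;
    }
    std::fwrite(compressed.data(), 1, compressed.size(), fp);
    std::fclose(fp);
}

Limiting the exclusive window to the rewrite keeps most of the concurrency that -J promises, while shrinking the interval in which a crash can leave a file half-written, in line with the updated -JN usage text.
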
afsctool.h (2 changes: 1 addition & 1 deletion)
@@ -99,7 +99,7 @@ struct filetype_info
long long int num_hard_link_files;
};

#if SUPPORT_PARALLEL
#ifdef SUPPORT_PARALLEL
# ifdef __cplusplus
extern void compressFile(const char *inFile, struct stat *inFileInfo, struct folder_info *folderinfo, void *worker=NULL);
# else
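
Both afsctool.c and afsctool.h switch from #if SUPPORT_PARALLEL to #ifdef SUPPORT_PARALLEL, and the CMake side now injects the define through add_definitions() for the whole directory; these appear to be the "macro/token logic errors" from the commit message. The difference matters because #ifdef only asks whether the macro exists, whereas #if evaluates its expansion: a plain -DSUPPORT_PARALLEL happens to define it as 1, but a valueless definition makes #if ill-formed and an undefined macro silently evaluates to 0. A small illustration with hypothetical FEATURE_* macros, not afsctool code:

// Preprocessor feature tests: #ifdef checks existence, #if checks a value.
#define FEATURE_A            // defined, but expands to nothing
#define FEATURE_B 0          // defined, with the value 0

#ifdef FEATURE_A             // true: the macro exists, its (empty) value is irrelevant
int feature_a_enabled = 1;
#endif

// #if FEATURE_A             // ill-formed: the expansion is empty, so there is
//                           // no expression left for #if to evaluate

#if FEATURE_B                // false, even though FEATURE_B *is* defined
int feature_b_enabled = 1;
#endif

#if FEATURE_C                // an undefined macro quietly evaluates to 0 here
int feature_c_enabled = 1;
#endif

With #ifdef, the parallel code paths are compiled in whenever the build defines SUPPORT_PARALLEL at all, which matches what the new add_definitions(-DSUPPORT_PARALLEL) line intends.
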
