From 2fffb77cacfdb18459930f70ca983e43748e283b Mon Sep 17 00:00:00 2001 From: Carlos Montemuino Date: Tue, 12 Apr 2016 08:56:01 +0200 Subject: [PATCH 01/23] initialization --- Makefile | 6 +++--- ms.c | 2 +- mspar.c | 15 ++++++++------- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/Makefile b/Makefile index 6fc01dd..4eb307d 100755 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ # # -# 'make' make executable file 'mspar' +# 'make' make executable file 'msparsm' # 'make clean' removes all .o and executable files # @@ -33,7 +33,7 @@ RND=rand2.c $(BIN)/%.o: %.c $(DEPS) $(CC) $(CFLAGS) -c -o $@ $< -default: $(BIN)/mspar +default: $(BIN)/msparsm # download: packages # wget http://www.open-mpi.org/software/ompi/v1.8/downloads/openmpi-1.8.2.tar.gz @@ -48,7 +48,7 @@ clean: @echo "*** All resources were cleaned-up ***" @echo "" -$(BIN)/mspar: $(OBJ) +$(BIN)/msparsm: $(OBJ) $(CC) $(CFLAGS) -o $@ $^ $(RND_48) $(LIBS) @echo "" @echo "*** make complete: generated executable 'mspar' ***" diff --git a/ms.c b/ms.c index c235730..e865d34 100644 --- a/ms.c +++ b/ms.c @@ -207,7 +207,7 @@ gensam( char **list, double *pprobss, double *ptmrca, double *pttot, struct para free(seglst[seg].ptree) ; } result.tree = treeOutput; - printf(treeOutput); + printf("%s", treeOutput); } if( pars.mp.timeflag ) { diff --git a/mspar.c b/mspar.c index d949463..3393b14 100644 --- a/mspar.c +++ b/mspar.c @@ -22,23 +22,24 @@ const int GO_TO_WORK_TAG = 400; int masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int maxsites) { - // myRank : rank of the current process in the MPI ecosystem. + // global_rank : rank of the current process in the global MPI ecosystem. // poolSize : number of processes in the MPI ecosystem. // goToWork : used by workers to realize if there is more work to do. // seedMatrix : matrix containing the RNG seeds to be distributed to working processes. // localSeedMatrix : matrix used by workers to receive RNG seeds from master. - int myRank; + int global_rank; int poolSize; unsigned short *seedMatrix; unsigned short localSeedMatrix[3]; + MPI_Comm shmcomm; // MPI Initialization MPI_Init(&argc, &argv ); MPI_Comm_size(MPI_COMM_WORLD, &poolSize); - MPI_Comm_rank(MPI_COMM_WORLD, &myRank); + MPI_Comm_rank(MPI_COMM_WORLD, &global_rank); - if(myRank == 0) + if(global_rank == 0) { int i; // Only the master process prints out the application's parameters @@ -65,10 +66,10 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, } // Filter out workers with rank higher than howmany, meaning there are more workers than samples to be generated. - if(myRank <= howmany) + if(global_rank <= howmany) { MPI_Scatter(seedMatrix, 3, MPI_UNSIGNED_SHORT, localSeedMatrix, 3, MPI_UNSIGNED_SHORT, 0, MPI_COMM_WORLD); - if(myRank == 0) + if(global_rank == 0) { // Master Processing masterProcessingLogic(howmany, 0, poolSize, parameters, maxsites); @@ -79,7 +80,7 @@ } } - return myRank; + return 0; } void From 858985e7674bb8ab182777023210a1e6ba2e3cc4 Mon Sep 17 00:00:00 2001 From: Carlos Montemuino Date: Wed, 13 Apr 2016 08:59:49 +0200 Subject: [PATCH 02/23] routine signature simplification and result accumulation Common data needed by the routines now has global scope. Resulting samples are accumulated first and output afterwards. This is a precondition step we need before introducing the super-master topology.
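The accumulation this message describes is a grow-with-realloc append, inlined in masterProcessingLogic. A minimal sketch of the idea (the helper name append_sample is hypothetical), assuming samples are NUL-terminated strings and the buffer starts as an initialized empty string:

    #include <stdlib.h>
    #include <string.h>

    /* Append the NUL-terminated `sample` to the growing buffer `*results`. */
    static void append_sample(char **results, const char *sample)
    {
        size_t offset = strlen(*results);
        size_t length = strlen(sample);
        *results = realloc(*results, offset + length + 1); /* +1 for the terminating NUL */
        memcpy(*results + offset, sample, length + 1);     /* copies the NUL as well */
    }

Note that the hunks below allocate the buffer with malloc(sizeof(char) * 1000) and never write results[0] = '\0' before the first strlen(results), so the sketch's precondition (start from an empty string, e.g. via calloc(1, 1)) does not hold in the committed code.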
--- ms.c | 1 - mspar.c | 124 +++++++++++++++++++++++++++++++++++++------------------- mspar.h | 6 +-- 3 files changed, 86 insertions(+), 45 deletions(-) diff --git a/ms.c b/ms.c index 5592bec..cccd048 100644 --- a/ms.c +++ b/ms.c @@ -122,7 +122,6 @@ int main(int argc, char *argv[]){ char **tbsparamstrs ; double probss, tmrca, ttot ; struct params pars ; - void seedit( const char * ) ; struct params getpars( int argc, char *argv[], int *howmany, int ntbs, int count ) ; int samples; diff --git a/mspar.c b/mspar.c index 4a59422..339f3f3 100644 --- a/mspar.c +++ b/mspar.c @@ -1,9 +1,3 @@ -const int SEEDS_COUNT = 3; -const int SEED_TAG = 100; -const int SAMPLES_NUMBER_TAG = 200; -const int RESULTS_TAG = 300; -const int GO_TO_WORK_TAG = 400; - #include <stdio.h> #include <stdlib.h> #include <string.h> #include "mspar.h" #include <mpi.h> /* OpenMPI library */ +const int SAMPLES_NUMBER_TAG = 200; +const int RESULTS_TAG = 300; +const int GO_TO_WORK_TAG = 400; + +// Following variables are with global scope in order to facilitate their sharing among routines. +// They are going to be updated in the masterWorkerSetup routine only, which is called only once, therefore there is no +// risk of race conditions or any other concurrency-related problem. +int world_rank, shm_rank; +int world_size, shm_size; +int shm_mode = 0; // indicates whether MPI-3 SHM is going to be applied + // ************************************** // // MASTER // ************************************** // @@ -18,23 +23,30 @@ const int GO_TO_WORK_TAG = 400; int masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int maxsites) { - // myRank : rank of the current process in the MPI ecosystem. - // poolSize : number of processes in the MPI ecosystem. // goToWork : used by workers to realize if there is more work to do. // seedMatrix : matrix containing the RNG seeds to be distributed to working processes. // localSeedMatrix : matrix used by workers to receive RNG seeds from master. - int myRank; - int poolSize; unsigned short *seedMatrix; unsigned short localSeedMatrix[3]; + MPI_Comm shmcomm; // MPI Initialization MPI_Init(&argc, &argv ); - MPI_Comm_size(MPI_COMM_WORLD, &poolSize); - MPI_Comm_rank(MPI_COMM_WORLD, &myRank); + MPI_Comm_size(MPI_COMM_WORLD, &world_size); + MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); + + // MPI_COMM_TYPE_SHARED: This type splits the communicator into subcommunicators, each of which can create a shared memory region. + MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm); + MPI_Comm_size( shmcomm, &shm_size ); + MPI_Comm_rank( shmcomm, &shm_rank ); - if(myRank == 0) + if (shm_size != world_rank) // there are MPI process in ore than 1 computing node + { + shm_mode = 1; + } + + if(world_rank == 0) { int i; // Only the master process prints out the application's parameters @@ -45,14 +57,14 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, // If there are (not likely) more processes than samples, then the process pool // is cut down to the number of samples.
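 // (Illustrative: with 8 MPI ranks and howmany = 5, world_size is cut to 6, i.e. one master plus five workers.)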
*/ - if(poolSize > howmany) { - poolSize = howmany + 1; // the extra 1 is due to the master + if(world_size > howmany) + { + world_size = howmany + 1; // the extra 1 is due to the master } int nseeds; doInitializeRng(argc, argv, &nseeds, parameters); - int dimension = nseeds * poolSize; + int dimension = nseeds * world_size; seedMatrix = (unsigned short *) malloc(sizeof(unsigned short) * dimension); for(i=0; i 0) { - int idleProcess = findIdleProcess(processActivity, poolSize, lastAssignedProcess); + int idleProcess = findIdleProcess(processActivity, lastAssignedProcess); if(idleProcess >= 0) { - assignWork(processActivity, idleProcess, 1); + assignWork(processActivity, idleProcess, 1); // we should pass the communicator lastAssignedProcess = idleProcess; howmany--; } else { - readResultsFromWorkers(1, processActivity); + sample = readResultsFromWorkers(1, processActivity); // we should pass the communicator + offset = strlen(results); + length = strlen(sample); + results = realloc(results, offset + length + 1); + memcpy(results+offset, sample, length); + free(sample); + pendingJobs--; } } while(pendingJobs > 0) { - readResultsFromWorkers(0, processActivity); + char *sample = readResultsFromWorkers(0, processActivity); // we should pass the communicator + offset = strlen(results); + length = strlen(sample); + results = realloc(results, offset + length + 1); + memcpy(results+offset, sample, length); + free(sample); + pendingJobs--; } + + fprintf(stdout, "%s", results); + free(results); // be good citizen + + // 1. at this point all of the work was done + // 2. if there are more nodes (shm_mode?) + // 3. if I'm not the super master, then send results to WORLD rank 0 + // 4. otherwise print the accumulated results? (or not accumulate them if not needed?) + // don't forget to 'free' the results } /* @@ -133,10 +179,10 @@ masterProcessingLogic(int howmany, int lastAssignedProcess) * * @param goToWork indicates whether the worker stays waiting for more work (1) or can already finish its execution (0) * @param workersActivity the vector with the activity status of the workers - * @return * + * @return the generated sample */ -void readResultsFromWorkers(int goToWork, int* workersActivity) +char* readResultsFromWorkers(int goToWork, int* workersActivity) { MPI_Status status; int size; int source; MPI_Probe(MPI_ANY_SOURCE, RESULTS_TAG, MPI_COMM_WORLD, &status); MPI_Get_count(&status, MPI_CHAR, &size); source = status.MPI_SOURCE; char * results = (char *) malloc(size*sizeof(char)); MPI_Recv(results, size, MPI_CHAR, source, RESULTS_TAG, MPI_COMM_WORLD, &status); source = status.MPI_SOURCE; MPI_Send(&goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, MPI_COMM_WORLD); workersActivity[source]=0; - fprintf(stdout, "%s", results); - free(results); // be good citizen + return results; } /* * Finds an idle process from a list of potential worker processes. * * @param workersActivity status of all processes that can generate some work (0=idle; 1=busy) - * @param poolSize number of worker processes * @lastAssignedProcess last process assigned with some work * * @return idle process index or -1 if all processes are busy. */ -int findIdleProcess(int *processActivity, int poolSize, int lastAssignedProcess) { +int findIdleProcess(int *processActivity, int lastAssignedProcess) { /* * Implementation note: lastAssignedProcess is used to implement a fairness policy in which every available process * can be assigned with some work.
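 * (e.g., with worker ranks 1..4 and lastAssignedProcess == 3, the scan probes rank 4 first and then wraps around to ranks 1 and 2.)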
*/ int result = -1; int i= lastAssignedProcess+1; // master process does not generate replicas while(i < poolSize && processActivity[i] == 1){ i++; }; if(i >= poolSize){ i=1; // master process does not generate replicas while(i < lastAssignedProcess && processActivity[i] == 1){ i++; } @@ -212,8 +256,6 @@ void assignWork(int* workersActivity, int worker, int samples) { int workerProcess(struct params parameters, unsigned maxsites) { - char *generateSamples(int, struct params, unsigned); - int samples; char *results; diff --git a/mspar.h b/mspar.h index f2f70bb..0e0313e 100644 --- a/mspar.h +++ b/mspar.h @@ -1,13 +1,13 @@ int masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int maxsites); void masterWorkerTeardown(); -void masterProcessingLogic(int howmany, int lastIdleWorker, int poolSize, struct params parameters, unsigned maxsites); +void masterProcessingLogic(int howmany, int lastIdleWorker); int workerProcess(struct params parameters, unsigned maxsites); void doInitializeRng(int argc, char *argv[], int *seeds, struct params parameters); void sendResultsToMasterProcess(char* results); int receiveWorkRequest(); void assignWork(int* workersActivity, int assignee, int samples); -void readResultsFromWorkers(int goToWork, int* workersActivity); -int findIdleProcess(int *processActivity, int poolSize, int lastAssignedProcess); +char* readResultsFromWorkers(int goToWork, int* workersActivity); +int findIdleProcess(int *processActivity, int lastAssignedProcess); char* generateSample(struct params parameters, unsigned maxsites); char *generateSamples(int, struct params, unsigned); struct gensam_result gensam(char **gametes, double *probss, double *ptmrca, double *pttot, struct params pars, int* segsites); From 2b1b9b492ec6974cefe7fbbddb26664c673259ea Mon Sep 17 00:00:00 2001 From: Carlos Montemuino Date: Fri, 15 Apr 2016 09:24:33 +0200 Subject: [PATCH 03/23] shm preparation and some pseudo-code --- mspar.c | 83 ++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 70 insertions(+), 13 deletions(-) diff --git a/mspar.c b/mspar.c index 339f3f3..d5d5ed0 100644 --- a/mspar.c +++ b/mspar.c @@ -1,9 +1,9 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> +#include <mpi.h> #include "ms.h" #include "mspar.h" -#include <mpi.h> /* OpenMPI library */ const int SAMPLES_NUMBER_TAG = 200; const int RESULTS_TAG = 300; const int GO_TO_WORK_TAG = 400; @@ -12,6 +12,7 @@ const int GO_TO_WORK_TAG = 400; // Following variables are with global scope in order to facilitate their sharing among routines. // They are going to be updated in the masterWorkerSetup routine only, which is called only once, therefore there is no // risk of race conditions or any other concurrency-related problem.
+MPI_Comm shmcomm; // shm intra-communicator int world_rank, shm_rank; int world_size, shm_size; int shm_mode = 0; // indicates whether MPI-3 SHM is going to be applied @@ -29,7 +30,10 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, unsigned short *seedMatrix; unsigned short localSeedMatrix[3]; - MPI_Comm shmcomm; + // MPI-3 SHM related + MPI_Win win; // shm window object + char *shm; // the shared memory + char *shm_results; // memory place where all MPI process from one node will going to share // MPI Initialization MPI_Init(&argc, &argv ); @@ -41,11 +45,12 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, MPI_Comm_size( shmcomm, &shm_size ); MPI_Comm_rank( shmcomm, &shm_rank ); - if (shm_size != world_rank) // there are MPI process in ore than 1 computing node + if (shm_size != world_size) // there are MPI process in ore than 1 computing node { shm_mode = 1; } + if(world_rank == 0) { int i; @@ -76,19 +81,65 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, if(world_rank < howmany) { MPI_Scatter(seedMatrix, 3, MPI_UNSIGNED_SHORT, localSeedMatrix, 3, MPI_UNSIGNED_SHORT, 0, MPI_COMM_WORLD); - if(world_rank == 0) // or shm_rank = 0 + /* + MPI_Barrier(MPI_COMM_WORLD); + + if ( shm_mode == 0) { - // 1. if there are more nodes - // 2. then we should split the work among nodes - // 2.1 by doing world_size/shm_size we know how many masters are there around - // 2.2 each shm_rank = 0 does the "masterProcessingLogic", but with reduced howmany and world_size + parallelSeed(localSeedMatrix); + MPI_Aint sz; + int dispunit = 1; + + if (shm_rank != 0) + { + + char *sample = generateSample(parameters, maxsites); + int length = strlen(sample); + + MPI_Send(&length, 1, MPI_INT, 0, 10001, shmcomm); + + MPI_Win_allocate_shared(length, 1, MPI_INFO_NULL, shmcomm, &shm, &win); + MPI_Win_shared_query(win, MPI_PROC_NULL, &sz, &dispunit, &shm_results); + + memcpy(shm_results, sample, length); + free(sample); + MPI_Win_fence(0, win); + //MPI_Barrier(shmcomm); + + MPI_Win_free(&win); + + } + else + { + int length; + + MPI_Status status; + MPI_Recv(&length, 1, MPI_INT, 1, 10001, shmcomm, &status); + //printf("[%d] - shm from [%d] ready for reading\n", shm_rank, status.MPI_SOURCE); + + MPI_Win_allocate_shared(0, 1, MPI_INFO_NULL, shmcomm, &shm, &win); + MPI_Win_shared_query(win, MPI_PROC_NULL, &sz, &dispunit, &shm_results); + + MPI_Win_fence(0, win); + //MPI_Barrier(shmcomm); + printf("%s\n", shm_rank, shm_results); + + MPI_Win_free(&win); + + } + } + */ + + if(world_rank == 0) + { // Master Processing masterProcessingLogic(howmany, 0); // if we're doing shm // then listen for results from other nodes - } else + } + else { // Worker Processing parallelSeed(localSeedMatrix); @@ -96,6 +147,7 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, } return world_rank; +// return 0; } void @@ -110,12 +162,17 @@ masterWorkerTeardown() { * @param lastAssignedProcess last processes that has been assigned som work */ void -masterProcessingLogic(int howmany, int lastAssignedProcess) // we should get the communicator here +masterProcessingLogic(int howmany, int lastAssignedProcess) { int *processActivity = (int*) malloc(world_size * sizeof(int)); processActivity[0] = 1; // Master does not generate replicas int i; + // 1. if there are more nodes + // 2. 
then we should split the work among nodes + // 2.1 by doing world_size/shm_size we know how many masters are there around + // 2.2 each shm_rank = 0 does the "masterProcessingLogic", but with reduced howmany and world_size + for(i=1; i= 0) { - assignWork(processActivity, idleProcess, 1); // we should pass the communicator + assignWork(processActivity, idleProcess, 1); lastAssignedProcess = idleProcess; howmany--; } else { - sample = readResultsFromWorkers(1, processActivity); // we should pass the communicator + sample = readResultsFromWorkers(1, processActivity); offset = strlen(results); length = strlen(sample); results = realloc(results, offset + length + 1); @@ -151,7 +208,7 @@ masterProcessingLogic(int howmany, int lastAssignedProcess) // we should get the } while(pendingJobs > 0) { - char *sample = readResultsFromWorkers(0, processActivity); // we should pass the communicator + char *sample = readResultsFromWorkers(0, processActivity); offset = strlen(results); length = strlen(sample); results = realloc(results, offset + length + 1); From cf3550cbef3bea5a0c64207742694446e9d831d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Montemui=C3=B1o?= Date: Fri, 15 Apr 2016 17:03:13 +0200 Subject: [PATCH 04/23] right cmake conf for ubuntu --- CMakeLists.txt | 3 +-- mspar.c | 7 +++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 39c8fdb..bc4ba64 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,7 +7,6 @@ find_package(MPI REQUIRED) include_directories(${MPI_INCLUDE_PATH}) set(MPI_COMPILE_FLAGS "-O3 -std=gnu99 -I.") -set(MPI_LINK_FLAGS "-lm") set(SOURCE_FILES ms.c ms.h @@ -17,7 +16,7 @@ set(SOURCE_FILES streec.c) add_executable(msparsm ${SOURCE_FILES}) -target_link_libraries(msparsm ${MPI_LIBRARIES}) +target_link_libraries(msparsm ${MPI_LIBRARIES} -lm) if(MPI_COMPILE_FLAGS) set_target_properties(msparsm PROPERTIES COMPILE_FLAGS "${MPI_COMPILE_FLAGS}") diff --git a/mspar.c b/mspar.c index d5d5ed0..bb362f3 100644 --- a/mspar.c +++ b/mspar.c @@ -1,9 +1,9 @@ #include #include #include -#include #include "ms.h" #include "mspar.h" +#include /* OpenMPI library */ const int SAMPLES_NUMBER_TAG = 200; const int RESULTS_TAG = 300; @@ -50,7 +50,6 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, shm_mode = 1; } - if(world_rank == 0) { int i; @@ -81,6 +80,7 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, if(world_rank < howmany) { MPI_Scatter(seedMatrix, 3, MPI_UNSIGNED_SHORT, localSeedMatrix, 3, MPI_UNSIGNED_SHORT, 0, MPI_COMM_WORLD); + /* MPI_Barrier(MPI_COMM_WORLD); @@ -145,9 +145,8 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, parallelSeed(localSeedMatrix); } } - - return world_rank; // return 0; + return world_rank; } void From 5aa2764fe6dcf6e360bb902571f0cf973b9e4d06 Mon Sep 17 00:00:00 2001 From: Carlos Montemuino Date: Tue, 19 Apr 2016 01:23:23 +0200 Subject: [PATCH 05/23] global master / node master processing --- mspar.c | 236 ++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 168 insertions(+), 68 deletions(-) diff --git a/mspar.c b/mspar.c index bb362f3..4ace009 100644 --- a/mspar.c +++ b/mspar.c @@ -8,6 +8,12 @@ const int SAMPLES_NUMBER_TAG = 200; const int RESULTS_TAG = 300; const int GO_TO_WORK_TAG = 400; +const int NODE_MASTER_ASSIGNMENT = 500; // Used to assign a number of samples to a node master + +typedef struct { + int shm_rank; + int world_rank; +} Rank_Struct; // Following variables are with 
global scope in order to facilitate its sharing among routines. // They are going to be updated in the masterWorkerSetup routine only, which is called only one, therefore there is no @@ -24,6 +30,10 @@ int shm_mode = 0; // indicates whether MPI-3 SHM is going to be applied int masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int maxsites) { + int numberOfNodes(void*, MPI_Aint); + void buildRankDataType(MPI_Datatype*); + char *readResults(MPI_Comm, int*); + // goToWork : used by workers to realize if there is more work to do. // seedMatrix : matrix containing the RNG seeds to be distributed to working processes. // localSeedMatrix : matrix used by workers to receive RNG seeds from master. @@ -34,6 +44,7 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, MPI_Win win; // shm window object char *shm; // the shared memory char *shm_results; // memory place where all MPI process from one node will going to share + int nodeHowmany; // MPI Initialization MPI_Init(&argc, &argv ); @@ -45,7 +56,7 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, MPI_Comm_size( shmcomm, &shm_size ); MPI_Comm_rank( shmcomm, &shm_rank ); - if (shm_size != world_size) // there are MPI process in ore than 1 computing node + if (shm_size != world_size) // there are MPI process in more than 1 computing node { shm_mode = 1; } @@ -131,24 +142,110 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, } */ - if(world_rank == 0) + Rank_Struct rank; + MPI_Aint struct_rank_size, struct_rank_size_lb; + MPI_Datatype struct_rank_type; + + buildRankDataType(&struct_rank_type); + + MPI_Type_get_extent(struct_rank_type, &struct_rank_size_lb, &struct_rank_size); + + rank.shm_rank = shm_rank; + rank.world_rank = world_rank; + + void *ranks = malloc(struct_rank_size * (world_size)); + + MPI_Gather (&rank, 1, struct_rank_type, ranks, 1, struct_rank_type, 0, MPI_COMM_WORLD); + + if ( world_rank == 0 ) { - // Master Processing - masterProcessingLogic(howmany, 0); + int node_count = numberOfNodes(ranks, struct_rank_size); + + // calculate how many samples are going to be distributed among all nodes + nodeHowmany = howmany / node_count; + int remainder = howmany % node_count; + + int i, pendingNodeMasters = 0; + if ( node_count > 1 ) { + for (i = 1; i < world_size; ++i) // don't include global master now + { + Rank_Struct *r; + r = ranks + struct_rank_size * i; + if (r->shm_rank == 0) { + MPI_Send(&nodeHowmany, 1, MPI_INT, r->world_rank, NODE_MASTER_ASSIGNMENT, MPI_COMM_WORLD); + pendingNodeMasters += 1; + } + } + } - // if we're doing shm - // then listen for results from other nodes + // process my own samples + masterProcessingLogic(nodeHowmany + remainder, 0); + + if ( shm_mode ) + { + int source; + while ( pendingNodeMasters ) { + shm_results = readResults(MPI_COMM_WORLD, &source); + printf("%s", shm_results); + free(shm_results); + pendingNodeMasters -= 1; + } + } } else { - // Worker Processing - parallelSeed(localSeedMatrix); + if ( shm_rank == 0 ) + { + MPI_Recv(&nodeHowmany, 1, MPI_INT, 0, NODE_MASTER_ASSIGNMENT, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + masterProcessingLogic(howmany, 0); + } + else + { + // Worker Processing + parallelSeed(localSeedMatrix); + } } + + MPI_Type_free(&struct_rank_type); } -// return 0; + return world_rank; } +/** + * Calculates how many nodes are there taking the SHM ranks from all MPI processes. + * This function is useful to determine the number of node masters. 
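+ * It assumes that exactly one process per node ends up with shm_rank == 0 in the shared-memory communicator, so counting those ranks yields the number of nodes.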
+ */ +int +numberOfNodes(void *ranks, MPI_Aint rank_size) +{ + int i, result = 0; + Rank_Struct *rank; + for ( i = 0; i < world_size; i ++) { + rank = ranks + rank_size * i; + if (rank->shm_rank == 0) + result += 1; + } + + return result; +} + +void +buildRankDataType(MPI_Datatype* result) { + Rank_Struct rank; + { + int blocklen[] = {1, 1}; + MPI_Aint addr[3]; + MPI_Get_address(&rank, &addr[0]); + MPI_Get_address(&rank.shm_rank, &addr[1]); + MPI_Get_address(&rank.world_rank, &addr[2]); + MPI_Aint displacements[] = {addr[1] - addr[0], addr[2] - addr[0]}; + MPI_Datatype types[2] = {MPI_INT, MPI_INT}; + MPI_Type_create_struct(2, blocklen, displacements, types, result); + MPI_Type_commit(result); + } +} + void masterWorkerTeardown() { MPI_Finalize(); @@ -163,68 +260,61 @@ masterWorkerTeardown() { void masterProcessingLogic(int howmany, int lastAssignedProcess) { - int *processActivity = (int*) malloc(world_size * sizeof(int)); - processActivity[0] = 1; // Master does not generate replicas - int i; + int *processActivity = (int*) malloc(shm_size * sizeof(int)); + if ( howmany > 0 ) { + processActivity[0] = 1; // Master does not generate replicas - // 1. if there are more nodes - // 2. then we should split the work among nodes - // 2.1 by doing world_size/shm_size we know how many masters are there around - // 2.2 each shm_rank = 0 does the "masterProcessingLogic", but with reduced howmany and world_size + int i; + for (i = 1; i < shm_size; i++) { + processActivity[i] = 0; + } - for(i=1; i 0) { + int idleProcess = findIdleProcess(processActivity, lastAssignedProcess); + if (idleProcess >= 0) { + assignWork(processActivity, idleProcess, 1); + lastAssignedProcess = idleProcess; + howmany--; + } + else { + sample = readResultsFromWorkers(1, processActivity); + offset = strlen(results); + length = strlen(sample); + results = realloc(results, offset + length + 1); + memcpy(results + offset, sample, length); + free(sample); - while(howmany > 0) - { - int idleProcess = findIdleProcess(processActivity, lastAssignedProcess); - if(idleProcess >= 0) - { - assignWork(processActivity, idleProcess, 1); - lastAssignedProcess = idleProcess; - howmany--; + pendingJobs--; + } } - else - { - sample = readResultsFromWorkers(1, processActivity); + + while (pendingJobs > 0) { + sample = readResultsFromWorkers(0, processActivity); offset = strlen(results); length = strlen(sample); results = realloc(results, offset + length + 1); - memcpy(results+offset, sample, length); + memcpy(results + offset, sample, length); free(sample); pendingJobs--; } - } - while(pendingJobs > 0) - { - char *sample = readResultsFromWorkers(0, processActivity); - offset = strlen(results); - length = strlen(sample); - results = realloc(results, offset + length + 1); - memcpy(results+offset, sample, length); - free(sample); - - pendingJobs--; - } - fprintf(stdout, "%s", results); - free(results); // be good citizen + if (world_rank == 0) { + fprintf(stdout, "%s", results); + } + else { + MPI_Send(results, strlen(results) + 1, MPI_CHAR, 0, RESULTS_TAG, MPI_COMM_WORLD); + } - // 1. at this point all of the work was done - // 2. if there are more nodes (shm_mode?) - // 3. if I'm not the super master, then send results to WORLD rank 0 - // 4. otherwise print the accumulated results? (or not accumulate them if not needed?) 
- // don't forget to 'free' the results + free(results); // be good citizen + } } /* @@ -239,21 +329,31 @@ masterProcessingLogic(int howmany, int lastAssignedProcess) * @return the generated sample */ char* readResultsFromWorkers(int goToWork, int* workersActivity) +{ + char *readResults(MPI_Comm, int*); + + int source; + char *results = readResults(shmcomm, &source); + + MPI_Send(&goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm); + + workersActivity[source]=0; + return results; +} + +char +*readResults(MPI_Comm comm, int *source) { MPI_Status status; int size; - int source; - MPI_Probe(MPI_ANY_SOURCE, RESULTS_TAG, MPI_COMM_WORLD, &status); + MPI_Probe(MPI_ANY_SOURCE, RESULTS_TAG, comm, &status); MPI_Get_count(&status, MPI_CHAR, &size); - source = status.MPI_SOURCE; - char * results = (char *) malloc(size*sizeof(char)); + *source = status.MPI_SOURCE; + char *results = (char *) malloc(size*sizeof(char)); - MPI_Recv(results, size, MPI_CHAR, source, RESULTS_TAG, MPI_COMM_WORLD, &status); - source = status.MPI_SOURCE; - MPI_Send(&goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, MPI_COMM_WORLD); + MPI_Recv(results, size, MPI_CHAR, *source, RESULTS_TAG, comm, MPI_STATUS_IGNORE); - workersActivity[source]=0; return results; } @@ -300,7 +400,7 @@ int findIdleProcess(int *processActivity, int lastAssignedProcess) { * @param samples samples the worker is going to generate */ void assignWork(int* workersActivity, int worker, int samples) { - MPI_Send(&samples, 1, MPI_INT, worker, SAMPLES_NUMBER_TAG, MPI_COMM_WORLD); + MPI_Send(&samples, 1, MPI_INT, worker, SAMPLES_NUMBER_TAG, shmcomm); //TODO check usage of MPI_Sendv?? workersActivity[worker]=1; } @@ -357,7 +457,7 @@ int receiveWorkRequest(){ int samples; MPI_Status status; - MPI_Recv(&samples, 1, MPI_INT, 0, SAMPLES_NUMBER_TAG, MPI_COMM_WORLD, &status); + MPI_Recv(&samples, 1, MPI_INT, 0, SAMPLES_NUMBER_TAG, shmcomm, &status); return samples; } @@ -365,7 +465,7 @@ int isThereMoreWork() { int goToWork; MPI_Status status; - MPI_Recv(&goToWork, 1, MPI_INT, 0, GO_TO_WORK_TAG, MPI_COMM_WORLD, &status); + MPI_Recv(&goToWork, 1, MPI_INT, 0, GO_TO_WORK_TAG, shmcomm, &status); return goToWork; } @@ -527,7 +627,7 @@ char *doPrintWorkerResultGametes(int segsites, int nsam, char **gametes){ */ void sendResultsToMasterProcess(char* results) { - MPI_Send(results, strlen(results)+1, MPI_CHAR, 0, RESULTS_TAG, MPI_COMM_WORLD); + MPI_Send(results, strlen(results)+1, MPI_CHAR, 0, RESULTS_TAG, shmcomm); } // ************************************** // From cb00a48e5338ac8b8fa9e470baf9a993b787d834 Mon Sep 17 00:00:00 2001 From: Carlos Montemuino Date: Tue, 19 Apr 2016 23:36:39 +0200 Subject: [PATCH 06/23] computation / communication interleaving improvement When there is only one node, then each worker directly outputs the results (vs. using a point-to-point communication for sending the results to the master process). When there are more than one node, then a second master is created in the same node where the global master resides, and results are outputted the same exact way as before (in that node only). This way the global master is released from the coordination of local sample generation, hence it can coordinate the generation among other nodes. Now the global master should not be the bottleneck. 
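The topology this message describes boils down to a per-rank role. A compact sketch of the mapping (the helper name select_role is hypothetical; the actual logic is spread across masterWorkerSetup, masterProcessingLogic and workerProcess), assuming exactly one process per node gets shm_rank == 0:

    enum role { GLOBAL_MASTER, NODE_MASTER, WORKER };

    /* shm_mode != 0 means the job spans more than one computing node. */
    static enum role select_role(int world_rank, int shm_rank, int shm_mode)
    {
        if (world_rank == 0)
            return GLOBAL_MASTER;   /* coordinates the node masters */
        if (shm_mode && (world_rank == 1 || shm_rank == 0))
            return NODE_MASTER;     /* rank 1 is the second master on the global master's node */
        return WORKER;              /* generates samples */
    }

With a single node (shm_mode == 0) rank 0 coordinates everything and every other rank is a worker; with several nodes the second master takes over local coordination so that rank 0 only talks to node masters.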
--- ms.c | 6 +- mspar.c | 166 +++++++++++++++++++++++++++++++++++++++++--------------- mspar.h | 19 +++++-- 3 files changed, 141 insertions(+), 50 deletions(-) diff --git a/ms.c b/ms.c index cccd048..0085544 100644 --- a/ms.c +++ b/ms.c @@ -138,9 +138,11 @@ int main(int argc, char *argv[]){ pars = getpars(argc, argv, &howmany, ntbs, count); // Master-Worker - int myRank = masterWorkerSetup(argc, argv, howmany, pars, SITESINC); + int excludeFrom; + int myRank = masterWorkerSetup(argc, argv, howmany, pars, SITESINC, &excludeFrom); - if(myRank <= howmany && myRank > 0) + + if(myRank <= howmany && myRank > excludeFrom) { while(workerProcess(pars, SITESINC)); } diff --git a/mspar.c b/mspar.c index 4ace009..231b0a5 100644 --- a/mspar.c +++ b/mspar.c @@ -3,12 +3,13 @@ #include #include "ms.h" #include "mspar.h" -#include /* OpenMPI library */ +//#include /* OpenMPI library */ const int SAMPLES_NUMBER_TAG = 200; const int RESULTS_TAG = 300; const int GO_TO_WORK_TAG = 400; const int NODE_MASTER_ASSIGNMENT = 500; // Used to assign a number of samples to a node master +const int ACK_TAG = 600; // Used by workers in the master node typedef struct { int shm_rank; @@ -28,12 +29,8 @@ int shm_mode = 0; // indicates whether MPI-3 SHM is going to be applied // ************************************** // int -masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int maxsites) +masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int maxsites, int *excludeFrom) { - int numberOfNodes(void*, MPI_Aint); - void buildRankDataType(MPI_Datatype*); - char *readResults(MPI_Comm, int*); - // goToWork : used by workers to realize if there is more work to do. // seedMatrix : matrix containing the RNG seeds to be distributed to working processes. // localSeedMatrix : matrix used by workers to receive RNG seeds from master. 
@@ -61,6 +58,8 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, shm_mode = 1; } + *excludeFrom = shm_mode; + if(world_rank == 0) { int i; @@ -68,8 +67,8 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, for(i=0; i howmany) @@ -165,9 +164,14 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, nodeHowmany = howmany / node_count; int remainder = howmany % node_count; + + int samples = nodeHowmany + remainder; + MPI_Send(&samples, 1, MPI_INT, 1, NODE_MASTER_ASSIGNMENT, MPI_COMM_WORLD); // process 1 will output the results int i, pendingNodeMasters = 0; + if ( node_count > 1 ) { - for (i = 1; i < world_size; ++i) // don't include global master now + // Distribute samples among nodes + for (i = 1; i < world_size; ++i) // don't include node with global master now { Rank_Struct *r; r = ranks + struct_rank_size * i; @@ -179,14 +183,15 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, } // process my own samples - masterProcessingLogic(nodeHowmany + remainder, 0); + masterProcessingLogic(nodeHowmany + remainder, shm_mode); if ( shm_mode ) { int source; while ( pendingNodeMasters ) { shm_results = readResults(MPI_COMM_WORLD, &source); - printf("%s", shm_results); + fprintf(stdout, "%s", shm_results); + fflush(stdout); free(shm_results); pendingNodeMasters -= 1; } @@ -194,14 +199,15 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, } else { - if ( shm_rank == 0 ) + if ( shm_mode && ( shm_rank == 0 || world_rank == 1 ) ) { MPI_Recv(&nodeHowmany, 1, MPI_INT, 0, NODE_MASTER_ASSIGNMENT, MPI_COMM_WORLD, MPI_STATUS_IGNORE); masterProcessingLogic(howmany, 0); + } else { - // Worker Processing + // Workers initialize the RGN parallelSeed(localSeedMatrix); } } @@ -261,11 +267,18 @@ void masterProcessingLogic(int howmany, int lastAssignedProcess) { int *processActivity = (int*) malloc(shm_size * sizeof(int)); + int node_offset = 0; + if ( world_rank == 1 ) + { + node_offset = 1; + processActivity[1] = 1; + } + if ( howmany > 0 ) { processActivity[0] = 1; // Master does not generate replicas int i; - for (i = 1; i < shm_size; i++) { + for (i = node_offset+1; i < shm_size; i++) { processActivity[i] = 0; } @@ -277,46 +290,95 @@ masterProcessingLogic(int howmany, int lastAssignedProcess) results = malloc(sizeof(char) * 1000); while (howmany > 0) { - int idleProcess = findIdleProcess(processActivity, lastAssignedProcess); + int idleProcess = findIdleProcess(processActivity, lastAssignedProcess, node_offset); if (idleProcess >= 0) { assignWork(processActivity, idleProcess, 1); lastAssignedProcess = idleProcess; howmany--; } - else { - sample = readResultsFromWorkers(1, processActivity); - offset = strlen(results); - length = strlen(sample); - results = realloc(results, offset + length + 1); - memcpy(results + offset, sample, length); - free(sample); + else + { + // we don't really get results from workers when they're located at same node as the global master, but such workers + // rather directly output the results. 
This scenario may only happen when either current rank is 0 (1 node) or 1 (more than 1 node) + if (world_rank != 0 && shm_rank == 0) + { + sample = readResultsFromWorkers(1, processActivity); + offset = strlen(results); + length = strlen(sample); + results = realloc(results, offset + length + 1); + memcpy(results + offset, sample, length); + free(sample); + } + else + { + // we need to receive some ACK to verify a sample was outputted by a local worker process + readAckFromLocalWorker(1, processActivity); + } pendingJobs--; } } while (pendingJobs > 0) { - sample = readResultsFromWorkers(0, processActivity); - offset = strlen(results); - length = strlen(sample); - results = realloc(results, offset + length + 1); - memcpy(results + offset, sample, length); - free(sample); + if (world_rank != 0 && shm_rank == 0) + { + sample = readResultsFromWorkers(0, processActivity); + offset = strlen(results); + length = strlen(sample); + results = realloc(results, offset + length + 1); + memcpy(results + offset, sample, length); + free(sample); + } + else + { + // we need to receive some ACK to verify a sample was outputted by a local worker process + readAckFromLocalWorker(0, processActivity); + } pendingJobs--; } - if (world_rank == 0) { + if ( world_rank == 0 ) { + // directly output the results fprintf(stdout, "%s", results); + fflush(stdout); } - else { - MPI_Send(results, strlen(results) + 1, MPI_CHAR, 0, RESULTS_TAG, MPI_COMM_WORLD); + else + { + if ( shm_rank == 0 ) // exclude the case where there are more than 1 node, and local master is world_rank=1 + { + MPI_Send(results, strlen(results) + 1, MPI_CHAR, 0, RESULTS_TAG, MPI_COMM_WORLD); + } } free(results); // be good citizen } } +void +readAckFromLocalWorker(int goToWork, int *workersActivity) +{ + int source; + + readAck(shmcomm, &source); + + MPI_Send(&goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm); + + workersActivity[source]=0; +} + +void +readAck(MPI_Comm comm, int* source) +{ + MPI_Status status; + + MPI_Probe(MPI_ANY_SOURCE, ACK_TAG, comm, &status); + *source = status.MPI_SOURCE; + + int ack; + MPI_Recv(&ack, 1, MPI_INT, *source, ACK_TAG, comm, MPI_STATUS_IGNORE); +} + /* * * Esta función realiza dos tareas: por un lado hace que el Master escuche los resultados enviados por los workers y por @@ -365,7 +427,7 @@ char * * @return idle process index or -1 if all processes are busy. */ -int findIdleProcess(int *processActivity, int lastAssignedProcess) { +int findIdleProcess(int *processActivity, int lastAssignedProcess, int node_offset) { /* * Implementation note: lastAssignedProcess is used to implement a fairness policy in which every available process * can be assigned with some work. 
@@ -377,7 +439,7 @@ int findIdleProcess(int *processActivity, int lastAssignedProcess) { }; if(i >= world_size){ - i=1; // master process does not generate replicas + i = node_offset + 1; // master process does not generate replicas while(i < lastAssignedProcess && processActivity[i] == 1){ i++; } @@ -414,14 +476,15 @@ workerProcess(struct params parameters, unsigned maxsites) { int samples; char *results; + int master; - samples = receiveWorkRequest(); + samples = receiveWorkRequest(&master); results = generateSamples(samples, parameters, maxsites); - sendResultsToMasterProcess(results); + sendResultsToMasterProcess(results, master); free(results); // be good citizen - return isThereMoreWork(); + return isThereMoreWork(master); } char *generateSamples(int samples, struct params parameters, unsigned maxsites) @@ -453,19 +516,23 @@ char *generateSamples(int samples, struct params parameters, unsigned maxsites) * * @return samples to be generated */ -int receiveWorkRequest(){ +int +receiveWorkRequest(int *master){ int samples; - MPI_Status status; - MPI_Recv(&samples, 1, MPI_INT, 0, SAMPLES_NUMBER_TAG, shmcomm, &status); + *master = 0; + if ( shm_mode && world_rank == shm_rank ) { + *master = 1; + } + + MPI_Recv(&samples, 1, MPI_INT, *master, SAMPLES_NUMBER_TAG, shmcomm, MPI_STATUS_IGNORE); return samples; } -int isThereMoreWork() { +int isThereMoreWork(int master) { int goToWork; - MPI_Status status; - MPI_Recv(&goToWork, 1, MPI_INT, 0, GO_TO_WORK_TAG, shmcomm, &status); + MPI_Recv(&goToWork, 1, MPI_INT, master, GO_TO_WORK_TAG, shmcomm, MPI_STATUS_IGNORE); return goToWork; } @@ -625,9 +692,22 @@ char *doPrintWorkerResultGametes(int segsites, int nsam, char **gametes){ * @param results results to be sent * */ -void sendResultsToMasterProcess(char* results) +void sendResultsToMasterProcess(char* results, int master) { - MPI_Send(results, strlen(results)+1, MPI_CHAR, 0, RESULTS_TAG, shmcomm); + if ( shm_mode ) + { + MPI_Send(results, strlen(results)+1, MPI_CHAR, master, RESULTS_TAG, shmcomm); + } + else + { + + int ack = 1; + // MPI_Request req; + // MPI_Isend(&ack, 1, MPI_INT, master, ACK_TAG, MPI_COMM_WORLD, &req); + MPI_Send(&ack, 1, MPI_INT, master, ACK_TAG, shmcomm); + printf("%s", results); + // MPI_Wait(req, MPI_STATUS_IGNORE); + } } // ************************************** // diff --git a/mspar.h b/mspar.h index 0e0313e..eb703d2 100644 --- a/mspar.h +++ b/mspar.h @@ -1,22 +1,31 @@ -int masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int maxsites); +#include + +int masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int maxsites, int *excludeFrom); void masterWorkerTeardown(); void masterProcessingLogic(int howmany, int lastIdleWorker); int workerProcess(struct params parameters, unsigned maxsites); void doInitializeRng(int argc, char *argv[], int *seeds, struct params parameters); -void sendResultsToMasterProcess(char* results); -int receiveWorkRequest(); +void sendResultsToMasterProcess(char* results, int master); +int receiveWorkRequest(int *master); void assignWork(int* workersActivity, int assignee, int samples); char* readResultsFromWorkers(int goToWork, int* workersActivity); -int findIdleProcess(int *processActivity, int lastAssignedProcess); +int findIdleProcess(int *processActivity, int lastAssignedProcess, int node_offset); char* generateSample(struct params parameters, unsigned maxsites); char *generateSamples(int, struct params, unsigned); struct gensam_result gensam(char **gametes, double *probss, double 
*ptmrca, double *pttot, struct params pars, int* segsites); -int isThereMoreWork(); +int isThereMoreWork(int master); unsigned short* parallelSeed(unsigned short *seedv); char *append(char *lhs, const char *rhs); char *doPrintWorkerResultHeader(int segsites, double probss, struct params paramters, char *treeOutput); char *doPrintWorkerResultPositions(int segsites, int output_precision, double *posit); char *doPrintWorkerResultGametes(int segsites, int nsam, char **gametes); + +void readAckFromLocalWorker(int goToWork, int *workersActivity); +int numberOfNodes(void*, MPI_Aint); +void buildRankDataType(MPI_Datatype*); +char *readResults(MPI_Comm, int*); +void readAck(MPI_Comm, int*); + /* From ms.c*/ char ** cmatrix(int nsam, int len); double ran1(); From 03f5f3319ee99825b02d5b74a01aefa19d596723 Mon Sep 17 00:00:00 2001 From: Carlos Montemuino Date: Wed, 20 Apr 2016 02:13:35 +0200 Subject: [PATCH 07/23] fix processing with higher master-worker topology --- ms.c | 5 +++-- mspar.c | 33 +++++++++++++++------------------ mspar.h | 4 ++-- 3 files changed, 20 insertions(+), 22 deletions(-) diff --git a/ms.c b/ms.c index 0085544..33c9ae1 100644 --- a/ms.c +++ b/ms.c @@ -139,10 +139,11 @@ int main(int argc, char *argv[]){ // Master-Worker int excludeFrom; - int myRank = masterWorkerSetup(argc, argv, howmany, pars, SITESINC, &excludeFrom); + int shm_rank; + int world_rank = masterWorkerSetup(argc, argv, howmany, pars, SITESINC, &excludeFrom, &shm_rank); - if(myRank <= howmany && myRank > excludeFrom) + if(world_rank <= howmany && world_rank > excludeFrom && shm_rank != 0 ) { while(workerProcess(pars, SITESINC)); } diff --git a/mspar.c b/mspar.c index 231b0a5..d994832 100644 --- a/mspar.c +++ b/mspar.c @@ -29,7 +29,7 @@ int shm_mode = 0; // indicates whether MPI-3 SHM is going to be applied // ************************************** // int -masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int maxsites, int *excludeFrom) +masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int maxsites, int *excludeFrom, int *node_rank) { // goToWork : used by workers to realize if there is more work to do. // seedMatrix : matrix containing the RNG seeds to be distributed to working processes. @@ -49,7 +49,9 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); // MPI_COMM_TYPE_SHARED: This type splits the communicator into subcommunicators, each of which can create a shared memory region. 
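+    // Note: the manual split below (color = world_rank < 3) appears to emulate a multi-node layout on a single machine for testing; the next patch restores MPI_Comm_split_type.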
- MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm); + //MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm); + int color = world_rank < 3; + MPI_Comm_split(MPI_COMM_WORLD, color, world_rank, &shmcomm); MPI_Comm_size( shmcomm, &shm_size ); MPI_Comm_rank( shmcomm, &shm_rank ); @@ -59,6 +61,7 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, } *excludeFrom = shm_mode; + *node_rank = shm_rank; if(world_rank == 0) { @@ -182,9 +185,6 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, } } - // process my own samples - masterProcessingLogic(nodeHowmany + remainder, shm_mode); - if ( shm_mode ) { int source; @@ -202,7 +202,7 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, if ( shm_mode && ( shm_rank == 0 || world_rank == 1 ) ) { MPI_Recv(&nodeHowmany, 1, MPI_INT, 0, NODE_MASTER_ASSIGNMENT, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - masterProcessingLogic(howmany, 0); + masterProcessingLogic(nodeHowmany, 0); } else @@ -300,7 +300,7 @@ masterProcessingLogic(int howmany, int lastAssignedProcess) { // we don't really get results from workers when they're located at same node as the global master, but such workers // rather directly output the results. This scenario may only happen when either current rank is 0 (1 node) or 1 (more than 1 node) - if (world_rank != 0 && shm_rank == 0) + if (world_rank > 1) { sample = readResultsFromWorkers(1, processActivity); offset = strlen(results); @@ -338,6 +338,7 @@ masterProcessingLogic(int howmany, int lastAssignedProcess) pendingJobs--; } + if ( world_rank == 0 ) { // directly output the results fprintf(stdout, "%s", results); @@ -350,7 +351,6 @@ masterProcessingLogic(int howmany, int lastAssignedProcess) MPI_Send(results, strlen(results) + 1, MPI_CHAR, 0, RESULTS_TAG, MPI_COMM_WORLD); } } - free(results); // be good citizen } } @@ -360,7 +360,7 @@ readAckFromLocalWorker(int goToWork, int *workersActivity) { int source; - readAck(shmcomm, &source); + readAck(&source); MPI_Send(&goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm); @@ -368,15 +368,15 @@ readAckFromLocalWorker(int goToWork, int *workersActivity) } void -readAck(MPI_Comm comm, int* source) +readAck(int* source) { MPI_Status status; - MPI_Probe(MPI_ANY_SOURCE, ACK_TAG, comm, &status); + MPI_Probe(MPI_ANY_SOURCE, ACK_TAG, shmcomm, &status); *source = status.MPI_SOURCE; int ack; - MPI_Recv(&ack, 1, MPI_INT, *source, ACK_TAG, comm, MPI_STATUS_IGNORE); + MPI_Recv(&ack, 1, MPI_INT, *source, ACK_TAG, shmcomm, MPI_STATUS_IGNORE); } /* @@ -434,11 +434,11 @@ int findIdleProcess(int *processActivity, int lastAssignedProcess, int node_offs */ int result = -1; int i= lastAssignedProcess+1; // master process does not generate replicas - while(i < world_size && processActivity[i] == 1){ + while(i < shm_size && processActivity[i] == 1){ i++; }; - if(i >= world_size){ + if(i >= shm_size){ i = node_offset + 1; // master process does not generate replicas while(i < lastAssignedProcess && processActivity[i] == 1){ i++; @@ -694,7 +694,7 @@ char *doPrintWorkerResultGametes(int segsites, int nsam, char **gametes){ */ void sendResultsToMasterProcess(char* results, int master) { - if ( shm_mode ) + if ( shm_mode && world_rank != shm_rank ) { MPI_Send(results, strlen(results)+1, MPI_CHAR, master, RESULTS_TAG, shmcomm); } @@ -702,11 +702,8 @@ void sendResultsToMasterProcess(char* results, int master) { int ack = 1; - // MPI_Request req; - // MPI_Isend(&ack, 
1, MPI_INT, master, ACK_TAG, MPI_COMM_WORLD, &req); MPI_Send(&ack, 1, MPI_INT, master, ACK_TAG, shmcomm); printf("%s", results); - // MPI_Wait(req, MPI_STATUS_IGNORE); } } diff --git a/mspar.h b/mspar.h index eb703d2..615f54f 100644 --- a/mspar.h +++ b/mspar.h @@ -1,6 +1,6 @@ #include -int masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int maxsites, int *excludeFrom); +int masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int maxsites, int *excludeFrom, int *node_rank); void masterWorkerTeardown(); void masterProcessingLogic(int howmany, int lastIdleWorker); int workerProcess(struct params parameters, unsigned maxsites); @@ -24,7 +24,7 @@ void readAckFromLocalWorker(int goToWork, int *workersActivity); int numberOfNodes(void*, MPI_Aint); void buildRankDataType(MPI_Datatype*); char *readResults(MPI_Comm, int*); -void readAck(MPI_Comm, int*); +void readAck(int*); /* From ms.c*/ char ** cmatrix(int nsam, int len); From defd7b684f6bba5a9ab95df64a7793b60aace5c2 Mon Sep 17 00:00:00 2001 From: Carlos Montemuino Date: Wed, 20 Apr 2016 02:22:53 +0200 Subject: [PATCH 08/23] fix processing with higher master-worker topology --- mspar.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/mspar.c b/mspar.c index d994832..485dd2f 100644 --- a/mspar.c +++ b/mspar.c @@ -49,9 +49,7 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); // MPI_COMM_TYPE_SHARED: This type splits the communicator into subcommunicators, each of which can create a shared memory region. - //MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm); - int color = world_rank < 3; - MPI_Comm_split(MPI_COMM_WORLD, color, world_rank, &shmcomm); + MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm); MPI_Comm_size( shmcomm, &shm_size ); MPI_Comm_rank( shmcomm, &shm_rank ); From b88bc40060cb17cb5ac27bce1c2e3aaf876f59df Mon Sep 17 00:00:00 2001 From: Carlos Montemuino Date: Wed, 20 Apr 2016 09:04:49 +0200 Subject: [PATCH 09/23] improvement for the scenario with 1 node only When there is only one node, then a secondary master is not needed, meaning an extra MPI process is available for generating samples. --- mspar.c | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/mspar.c b/mspar.c index 485dd2f..a1093a1 100644 --- a/mspar.c +++ b/mspar.c @@ -157,21 +157,24 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, MPI_Gather (&rank, 1, struct_rank_type, ranks, 1, struct_rank_type, 0, MPI_COMM_WORLD); - if ( world_rank == 0 ) + if ( world_rank == 0 ) // Global master { - int node_count = numberOfNodes(ranks, struct_rank_size); + int i, pendingNodeMasters = 0; - // calculate how many samples are going to be distributed among all nodes - nodeHowmany = howmany / node_count; - int remainder = howmany % node_count; + // Distribute remaining samples. 
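+        // (e.g., howmany = 100 over 3 nodes: nodeHowmany = 33 and remainder = 1, so the secondary master's node generates 34 samples and each of the other two nodes 33.)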
+ if ( shm_mode ) + { + int node_count = numberOfNodes(ranks, struct_rank_size); + // calculate how many samples are going to be distributed among all nodes + nodeHowmany = howmany / node_count; + int remainder = howmany % node_count; - int samples = nodeHowmany + remainder; - MPI_Send(&samples, 1, MPI_INT, 1, NODE_MASTER_ASSIGNMENT, MPI_COMM_WORLD); // process 1 will output the results - int i, pendingNodeMasters = 0; + // Delegate samples on node where the global master resides to a secondary master node (workd_rank = 1). + int samples = nodeHowmany + remainder; + MPI_Send(&samples, 1, MPI_INT, 1, NODE_MASTER_ASSIGNMENT, MPI_COMM_WORLD); // process 1 will output the results - if ( node_count > 1 ) { - // Distribute samples among nodes + // Distribute samples among remaining nodes for (i = 1; i < world_size; ++i) // don't include node with global master now { Rank_Struct *r; @@ -181,10 +184,8 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, pendingNodeMasters += 1; } } - } - if ( shm_mode ) - { + // Receive results from master nodes int source; while ( pendingNodeMasters ) { shm_results = readResults(MPI_COMM_WORLD, &source); @@ -194,10 +195,14 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, pendingNodeMasters -= 1; } } + else // There is only one node, hence a secondary master is not needed + { + masterProcessingLogic(howmany, 0); + } } else { - if ( shm_mode && ( shm_rank == 0 || world_rank == 1 ) ) + if ( shm_rank == 0 || ( shm_mode && world_rank == 1 ) ) { MPI_Recv(&nodeHowmany, 1, MPI_INT, 0, NODE_MASTER_ASSIGNMENT, MPI_COMM_WORLD, MPI_STATUS_IGNORE); masterProcessingLogic(nodeHowmany, 0); From 4b8e969b39420b798c60bf59b80abfee90f8a079 Mon Sep 17 00:00:00 2001 From: Carlos Montemuino Date: Mon, 25 Apr 2016 07:31:18 +0200 Subject: [PATCH 10/23] Direct output and all-work mode in in master Workers located in master node now directly send the generated samples to the standard output. Master also generates samples when there is no other work to do. This happens in the master node only. --- mspar.c | 146 ++++++++++++++++++++++++++------------------------------ mspar.h | 11 ++--- 2 files changed, 74 insertions(+), 83 deletions(-) diff --git a/mspar.c b/mspar.c index a1093a1..ca264c4 100644 --- a/mspar.c +++ b/mspar.c @@ -29,7 +29,7 @@ int shm_mode = 0; // indicates whether MPI-3 SHM is going to be applied // ************************************** // int -masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int maxsites, int *excludeFrom, int *node_rank) +masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, unsigned int maxsites, int *excludeFrom, int *node_rank) { // goToWork : used by workers to realize if there is more work to do. // seedMatrix : matrix containing the RNG seeds to be distributed to working processes. 
@@ -65,31 +65,25 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, { int i; // Only the master process prints out the application's parameters - for(i=0; i howmany) - { world_size = howmany + 1; // the extra 1 is due to the master - } int nseeds; doInitializeRng(argc, argv, &nseeds, parameters); int dimension = nseeds * world_size; seedMatrix = (unsigned short *) malloc(sizeof(unsigned short) * dimension); for(i=0; ishm_rank == 0) { @@ -187,25 +178,20 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, // Receive results from master nodes int source; - while ( pendingNodeMasters ) { + while (pendingNodeMasters) { shm_results = readResults(MPI_COMM_WORLD, &source); fprintf(stdout, "%s", shm_results); fflush(stdout); free(shm_results); pendingNodeMasters -= 1; } + } else { // There is only one node, hence a secondary master is not needed + masterProcessingLogic(howmany, 0, parameters, maxsites); } - else // There is only one node, hence a secondary master is not needed - { - masterProcessingLogic(howmany, 0); - } - } - else - { - if ( shm_rank == 0 || ( shm_mode && world_rank == 1 ) ) - { + } else { + if (shm_rank == 0 || (shm_mode && world_rank == 1)) { MPI_Recv(&nodeHowmany, 1, MPI_INT, 0, NODE_MASTER_ASSIGNMENT, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - masterProcessingLogic(nodeHowmany, 0); + masterProcessingLogic(nodeHowmany, 0, parameters, maxsites); } else @@ -267,23 +253,21 @@ masterWorkerTeardown() { * @param lastAssignedProcess last processes that has been assigned som work */ void -masterProcessingLogic(int howmany, int lastAssignedProcess) +masterProcessingLogic(int howmany, int lastAssignedProcess, struct params parameters, unsigned int maxsites) { int *processActivity = (int*) malloc(shm_size * sizeof(int)); int node_offset = 0; - if ( world_rank == 1 ) - { + if (world_rank == 1) { // it is the secondary master located at the main node (where the global master resides) node_offset = 1; processActivity[1] = 1; } - if ( howmany > 0 ) { - processActivity[0] = 1; // Master does not generate replicas + if (howmany > 0) { + processActivity[0] = 1; // Master initially does not generate replicas int i; - for (i = node_offset+1; i < shm_size; i++) { + for (i = node_offset+1; i < shm_size; i++) // secondary master initially does not generate replicas processActivity[i] = 0; - } int pendingJobs = howmany; // number of jobs already assigned but pending to be finalized @@ -292,50 +276,45 @@ masterProcessingLogic(int howmany, int lastAssignedProcess) size_t offset, length; results = malloc(sizeof(char) * 1000); - while (howmany > 0) { + while (howmany > 0) { // Assign sample generation jobs to all available workers int idleProcess = findIdleProcess(processActivity, lastAssignedProcess, node_offset); if (idleProcess >= 0) { assignWork(processActivity, idleProcess, 1); lastAssignedProcess = idleProcess; howmany--; - } - else - { - // we don't really get results from workers when they're located at same node as the global master, but such workers - // rather directly output the results. 
This scenario may only happen when either current rank is 0 (1 node) or 1 (more than 1 node) - if (world_rank > 1) - { + } else { + // Collect previously assigned sample generation jobs + if (world_rank > 1) { // it is not the main node, hence samples are received using MPI point-to-point sample = readResultsFromWorkers(1, processActivity); offset = strlen(results); length = strlen(sample); results = realloc(results, offset + length + 1); memcpy(results + offset, sample, length); free(sample); - } - else - { - // we need to receive some ACK to verify a sample was outputted by a local worker process - readAckFromLocalWorker(1, processActivity); - } + } else { // In main node all workers directly outputs the samples, but... + // ...we need to receive some ACK to verify a sample was indeed outputted + int additional_samples = readAckFromLocalWorker(howmany, processActivity, parameters, maxsites); + // need to update counters if applicable + pendingJobs -= additional_samples; + howmany -= additional_samples; + } pendingJobs--; } } while (pendingJobs > 0) { - if (world_rank != 0 && shm_rank == 0) - { + if (world_rank != 0 && shm_rank == 0) { sample = readResultsFromWorkers(0, processActivity); offset = strlen(results); length = strlen(sample); results = realloc(results, offset + length + 1); memcpy(results + offset, sample, length); free(sample); - } - else - { + } else { + fflush(stdout); // we need to receive some ACK to verify a sample was outputted by a local worker process - readAckFromLocalWorker(0, processActivity); + readAckFromLocalWorker(0, processActivity, parameters, maxsites); } pendingJobs--; @@ -350,36 +329,50 @@ masterProcessingLogic(int howmany, int lastAssignedProcess) else { if ( shm_rank == 0 ) // exclude the case where there are more than 1 node, and local master is world_rank=1 - { MPI_Send(results, strlen(results) + 1, MPI_CHAR, 0, RESULTS_TAG, MPI_COMM_WORLD); - } } free(results); // be good citizen } } -void -readAckFromLocalWorker(int goToWork, int *workersActivity) + +int readAckFromLocalWorker(int remaining, int *workersActivity, struct params parameters, unsigned int maxsites) { int source; + int goToWork = 0; + int additional_samples = 0; + MPI_Status status; + int msg_avail = 0; + + ///MPI_Probe(MPI_ANY_SOURCE, ACK_TAG, shmcomm, &status); + while (!msg_avail) { + // This function is called by secondary master in main node only, hence it is safe to generate an extra sample + // and send it to the standard output. 
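+        // MPI_Iprobe returns immediately instead of blocking; each unsuccessful poll is used to generate and print one more sample locally, overlapping the master's own work with worker coordination.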
+ MPI_Iprobe(MPI_ANY_SOURCE, ACK_TAG, shmcomm, &msg_avail, &status); + if (remaining) { // then generate sample while others reply + char *results = generateSamples(1, parameters, maxsites); + fprintf(stdout, "%s", results); + fflush(stdout); + free(results); + remaining--; + additional_samples++; + } + } - readAck(&source); + source = status.MPI_SOURCE; - MPI_Send(&goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm); + int ack; + MPI_Recv(&ack, 1, MPI_INT, source, ACK_TAG, shmcomm, MPI_STATUS_IGNORE); - workersActivity[source]=0; -} + // Now let the worker know whether there are (still) any pending samples + if (remaining > 0) + goToWork = 1; -void -readAck(int* source) -{ - MPI_Status status; + MPI_Send(&goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm); - MPI_Probe(MPI_ANY_SOURCE, ACK_TAG, shmcomm, &status); - *source = status.MPI_SOURCE; + workersActivity[source]=0; - int ack; - MPI_Recv(&ack, 1, MPI_INT, *source, ACK_TAG, shmcomm, MPI_STATUS_IGNORE); + return additional_samples; } /* @@ -555,7 +548,6 @@ generateSample(struct params parameters, unsigned maxsites) double probss, tmrca, ttot; char *results; char **gametes; - double *positions; struct gensam_result gensamResults; if( parameters.mp.segsitesin == 0 ) @@ -564,29 +556,26 @@ generateSample(struct params parameters, unsigned maxsites) gametes = cmatrix(parameters.cp.nsam, parameters.mp.segsitesin+1 ); gensamResults = gensam(gametes, &probss, &tmrca, &ttot, parameters, &segsites); - positions = gensamResults.positions; results = doPrintWorkerResultHeader(segsites, probss, parameters, gensamResults.tree); offset = strlen(results); if(segsites > 0) { - char *positionsStr = doPrintWorkerResultPositions(segsites, parameters.output_precision, positions); + char *positionsStr = doPrintWorkerResultPositions(segsites, parameters.output_precision, gensamResults.positions); positionStrLength = strlen(positionsStr); + char *gametesStr = doPrintWorkerResultGametes(segsites, parameters.cp.nsam, gametes); gametesStrLenght = strlen(gametesStr); results = realloc(results, offset + positionStrLength + gametesStrLenght + 1); - //sprintf(results, "%s%s", results, positionsStr); memcpy(results+offset, positionsStr, positionStrLength+1); offset += positionStrLength; memcpy(results+offset, gametesStr, gametesStrLenght+1); - free(positionsStr); free(gametesStr); - free(gensamResults.positions); if( parameters.mp.timeflag ) { free(gensamResults.tree); } @@ -661,6 +650,8 @@ char *doPrintWorkerResultPositions(int segsites, int output_precision, double *p offset += positionStrLength; } + free(positionStr); + return results; } @@ -706,7 +697,8 @@ void sendResultsToMasterProcess(char* results, int master) int ack = 1; MPI_Send(&ack, 1, MPI_INT, master, ACK_TAG, shmcomm); - printf("%s", results); + fprintf(stdout, "%s", results); + fflush(stdout); } } diff --git a/mspar.h b/mspar.h index 615f54f..6379f64 100644 --- a/mspar.h +++ b/mspar.h @@ -1,16 +1,16 @@ #include -int masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int maxsites, int *excludeFrom, int *node_rank); +int masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int unsigned maxsites, int *excludeFrom, int *node_rank); void masterWorkerTeardown(); -void masterProcessingLogic(int howmany, int lastIdleWorker); -int workerProcess(struct params parameters, unsigned maxsites); +void masterProcessingLogic(int howmany, int lastIdleWorker, struct params parameters, unsigned int maxsites); +int workerProcess(struct params parameters, 
unsigned int maxsites); void doInitializeRng(int argc, char *argv[], int *seeds, struct params parameters); void sendResultsToMasterProcess(char* results, int master); int receiveWorkRequest(int *master); void assignWork(int* workersActivity, int assignee, int samples); char* readResultsFromWorkers(int goToWork, int* workersActivity); int findIdleProcess(int *processActivity, int lastAssignedProcess, int node_offset); -char* generateSample(struct params parameters, unsigned maxsites); +char* generateSample(struct params parameters, unsigned int maxsites); char *generateSamples(int, struct params, unsigned); struct gensam_result gensam(char **gametes, double *probss, double *ptmrca, double *pttot, struct params pars, int* segsites); int isThereMoreWork(int master); @@ -20,11 +20,10 @@ char *doPrintWorkerResultHeader(int segsites, double probss, struct params param char *doPrintWorkerResultPositions(int segsites, int output_precision, double *posit); char *doPrintWorkerResultGametes(int segsites, int nsam, char **gametes); -void readAckFromLocalWorker(int goToWork, int *workersActivity); +int readAckFromLocalWorker(int remaining, int *workersActivity, struct params parameters, unsigned int maxsites); int numberOfNodes(void*, MPI_Aint); void buildRankDataType(MPI_Datatype*); char *readResults(MPI_Comm, int*); -void readAck(int*); /* From ms.c*/ char ** cmatrix(int nsam, int len); From f67e74fbfaff5037b1d32933721597e85829de05 Mon Sep 17 00:00:00 2001 From: Carlos Montemuino Date: Fri, 29 Apr 2016 18:09:16 +0200 Subject: [PATCH 11/23] reduce number of messages used for coordination --- ms.c | 2 +- mspar.c | 161 ++++++++++++++++++++++++++++++++------------------------ mspar.h | 2 +- 3 files changed, 95 insertions(+), 70 deletions(-) diff --git a/ms.c b/ms.c index 33c9ae1..6233283 100644 --- a/ms.c +++ b/ms.c @@ -145,7 +145,7 @@ int main(int argc, char *argv[]){ if(world_rank <= howmany && world_rank > excludeFrom && shm_rank != 0 ) { - while(workerProcess(pars, SITESINC)); + workerProcess(pars, SITESINC); } masterWorkerTeardown(); diff --git a/mspar.c b/mspar.c index ca264c4..a2a2f5a 100644 --- a/mspar.c +++ b/mspar.c @@ -155,7 +155,7 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int i, pendingNodeMasters = 0; // Distribute remaining samples. 
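[Editor's sketch] A quick worked example of the division that follows: with howmany = 10 replicas across node_count = 3 nodes, each node master is assigned 10 / 3 = 3 samples, and the remainder 10 % 3 = 1 is folded into the main node's share. The arithmetic in isolation (example values only):

    #include <stdio.h>

    int main(void)
    {
        int howmany = 10;      /* total replicas requested (example value) */
        int node_count = 3;    /* number of compute nodes (example value)  */

        int per_node  = howmany / node_count;   /* 3: samples per node master      */
        int remainder = howmany % node_count;   /* 1: extra share of the main node */

        printf("per node: %d, main node: %d\n", per_node, per_node + remainder);
        /* prints: per node: 3, main node: 4 */
        return 0;
    }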
- if (shm_mode) { + if (shm_mode) { // there is more than one node int node_count = numberOfNodes(ranks, struct_rank_size); // calculate how many samples are going to be distributed among all nodes @@ -186,10 +186,12 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, pendingNodeMasters -= 1; } } else { // There is only one node, hence a secondary master is not needed + // Note: global master visits this branch if and only if there is one single node masterProcessingLogic(howmany, 0, parameters, maxsites); } } else { if (shm_rank == 0 || (shm_mode && world_rank == 1)) { + // Note: global master never visits this branch MPI_Recv(&nodeHowmany, 1, MPI_INT, 0, NODE_MASTER_ASSIGNMENT, MPI_COMM_WORLD, MPI_STATUS_IGNORE); masterProcessingLogic(nodeHowmany, 0, parameters, maxsites); @@ -276,35 +278,39 @@ masterProcessingLogic(int howmany, int lastAssignedProcess, struct params parame size_t offset, length; results = malloc(sizeof(char) * 1000); - while (howmany > 0) { // Assign sample generation jobs to all available workers - int idleProcess = findIdleProcess(processActivity, lastAssignedProcess, node_offset); - if (idleProcess >= 0) { - assignWork(processActivity, idleProcess, 1); + int idleProcess = 1; + int samples = 1; // number of samples to be generated by workers + while (howmany > 0 && idleProcess > 0) { // Send sample generation requests to all available workers + idleProcess = findIdleProcess(processActivity, lastAssignedProcess, node_offset); + if (idleProcess > 0) { + assignWork(processActivity, idleProcess, samples); lastAssignedProcess = idleProcess; howmany--; - } else { - // Collect previously assigned sample generation jobs - if (world_rank > 1) { // it is not the main node, hence samples are received using MPI point-to-point - sample = readResultsFromWorkers(1, processActivity); - offset = strlen(results); - length = strlen(sample); - results = realloc(results, offset + length + 1); - memcpy(results + offset, sample, length); - free(sample); - } else { // In main node all workers directly outputs the samples, but... - // ...we need to receive some ACK to verify a sample was indeed outputted - int additional_samples = readAckFromLocalWorker(howmany, processActivity, parameters, maxsites); - - // need to update counters if applicable - pendingJobs -= additional_samples; - howmany -= additional_samples; - } - pendingJobs--; } } + while (howmany > 0) { // Collect previously assigned sample generation jobs + if (world_rank > 1) { // it is not the main node, hence samples are received using MPI point-to-point + sample = readResultsFromWorkers(1, processActivity); + offset = strlen(results); + length = strlen(sample); + results = realloc(results, offset + length + 1); + memcpy(results + offset, sample, length); + free(sample); + howmany-= samples; // in the readResultsFromWorkers we sent the goToWork, which means another sample is going to be generated + } else { // In main node all workers directly outputs the samples, but... 
+ // ...we need to receive some ACK to verify a sample was indeed outputted + int goToWork = 0; // when 1, then it does mean another sample was requested to a worker + int additional_samples = readAckFromLocalWorker(howmany, processActivity, parameters, maxsites, &goToWork); + + // need to update counters if applicable + pendingJobs -= additional_samples; + howmany -= additional_samples + (goToWork * samples); + } + pendingJobs-= samples; // in either branch (at least) one sample was generated + } - while (pendingJobs > 0) { - if (world_rank != 0 && shm_rank == 0) { + while (pendingJobs > 0) { // collect any non yet received sample + if (world_rank != 0 && shm_rank == 0) { // whatever node master, but the secondary master located at main node (i.e.: world_rank= 1) sample = readResultsFromWorkers(0, processActivity); offset = strlen(results); length = strlen(sample); @@ -312,23 +318,21 @@ masterProcessingLogic(int howmany, int lastAssignedProcess, struct params parame memcpy(results + offset, sample, length); free(sample); } else { - fflush(stdout); // we need to receive some ACK to verify a sample was outputted by a local worker process - readAckFromLocalWorker(0, processActivity, parameters, maxsites); + int goToWork = 0; + readAckFromLocalWorker(0, processActivity, parameters, maxsites, &goToWork); } pendingJobs--; } - if ( world_rank == 0 ) { - // directly output the results + if (world_rank == 0) { // global master, but there is only one single node, otherwise global master does not visit this function at all fprintf(stdout, "%s", results); fflush(stdout); - } - else - { - if ( shm_rank == 0 ) // exclude the case where there are more than 1 node, and local master is world_rank=1 + } else { // node master in a multi-node scenario + if (shm_rank == 0) // exclude the secondary master located at the main node (i.e. world_rank = 1) + // Note: we do not want to inlude the secondary master here, as it is already sending the samples to the standard output (see readAckFromLocalWorker function) MPI_Send(results, strlen(results) + 1, MPI_CHAR, 0, RESULTS_TAG, MPI_COMM_WORLD); } free(results); // be good citizen @@ -336,42 +340,56 @@ masterProcessingLogic(int howmany, int lastAssignedProcess, struct params parame } -int readAckFromLocalWorker(int remaining, int *workersActivity, struct params parameters, unsigned int maxsites) +int readAckFromLocalWorker(int remaining, int *workersActivity, struct params parameters, unsigned int maxsites, int *goToWork) { int source; - int goToWork = 0; int additional_samples = 0; MPI_Status status; - int msg_avail = 0; - - ///MPI_Probe(MPI_ANY_SOURCE, ACK_TAG, shmcomm, &status); - while (!msg_avail) { - // This function is called by secondary master in main node only, hence it is safe to generate an extra sample - // and send it to the standard output. - MPI_Iprobe(MPI_ANY_SOURCE, ACK_TAG, shmcomm, &msg_avail, &status); - if (remaining) { // then generate sample while others reply - char *results = generateSamples(1, parameters, maxsites); - fprintf(stdout, "%s", results); - fflush(stdout); - free(results); - remaining--; - additional_samples++; - } - } + + *goToWork = 0; + + MPI_Probe(MPI_ANY_SOURCE, ACK_TAG, shmcomm, &status); +// while (!msg_avail) { +// // This function is called by secondary master in main node only, hence it is safe to generate an extra sample +// // and send it to the standard output. 
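[Editor's sketch] The replacement exchange below pairs a non-blocking receive of the worker's ACK with a non-blocking send of the go-to-work flag, completing both with MPI_Waitall after the master has produced one sample of its own. The handshake in isolation, reusing the tag values from mspar.c; note that MPI_Waitall takes an array of statuses, so MPI_STATUSES_IGNORE is the strictly correct constant for ignoring them:

    #include <mpi.h>

    #define ACK_TAG        600   /* same values as in mspar.c */
    #define GO_TO_WORK_TAG 400

    /* Non-blocking ACK/assignment handshake with the worker `source`.
     * `go_to_work` must remain valid until MPI_Waitall returns. */
    static void ack_handshake(MPI_Comm comm, int source, int *go_to_work)
    {
        int ack;
        MPI_Request requests[2];

        /* Post the receive first so the worker's ACK can complete at any time. */
        MPI_Irecv(&ack, 1, MPI_INT, source, ACK_TAG, comm, &requests[0]);

        /* ... one locally generated sample can be produced here ... */

        MPI_Isend(go_to_work, 1, MPI_INT, source, GO_TO_WORK_TAG, comm, &requests[1]);

        /* Neither buffer may be reused before both operations complete. */
        MPI_Waitall(2, requests, MPI_STATUSES_IGNORE);
    }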
+// MPI_Iprobe(MPI_ANY_SOURCE, ACK_TAG, shmcomm, &msg_avail, &status); +// if (remaining) { // then generate sample while others reply +// char *results = generateSamples(1, parameters, maxsites); +// fprintf(stdout, "################%s", results); +// fflush(stdout); +// free(results); +// remaining--; +// additional_samples++; +// } +// } source = status.MPI_SOURCE; int ack; - MPI_Recv(&ack, 1, MPI_INT, source, ACK_TAG, shmcomm, MPI_STATUS_IGNORE); + //MPI_Recv(&ack, 1, MPI_INT, source, ACK_TAG, shmcomm, MPI_STATUS_IGNORE); + MPI_Request requests[2]; + MPI_Irecv(&ack, 1, MPI_INT, source, ACK_TAG, shmcomm, &requests[0]); + + if (remaining) { // then generate 1 sample while others reply + char *results = generateSamples(1, parameters, maxsites); + fprintf(stdout, "%s", results); + fflush(stdout); + free(results); + remaining--; + additional_samples++; + } // Now let the worker know whether there are (still) any pending samples if (remaining > 0) - goToWork = 1; + *goToWork = 1; - MPI_Send(&goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm); + //MPI_Send(&goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm); + MPI_Isend(goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm, &requests[1]); workersActivity[source]=0; + MPI_Waitall(2, requests, MPI_STATUS_IGNORE); + return additional_samples; } @@ -459,7 +477,6 @@ int findIdleProcess(int *processActivity, int lastAssignedProcess, int node_offs */ void assignWork(int* workersActivity, int worker, int samples) { MPI_Send(&samples, 1, MPI_INT, worker, SAMPLES_NUMBER_TAG, shmcomm); - //TODO check usage of MPI_Sendv?? workersActivity[worker]=1; } @@ -473,14 +490,27 @@ workerProcess(struct params parameters, unsigned maxsites) int samples; char *results; int master; + int flag = 1; + + samples = receiveWorkRequest(&master); // TODO: could just skip this and start generating samples? 
(or it could be used to set the "samples" number once +// results = generateSamples(samples, parameters, maxsites); +// +// sendResultsToMasterProcess(results, master); +// +// free(results); // be good citizen + + while (flag) { + results = generateSamples(samples, parameters, maxsites); - samples = receiveWorkRequest(&master); - results = generateSamples(samples, parameters, maxsites); + sendResultsToMasterProcess(results, master); - sendResultsToMasterProcess(results, master); + free(results); // be good citizen + + flag = isThereMoreWork(master); + } - free(results); // be good citizen - return isThereMoreWork(master); + //return isThereMoreWork(master); + return flag; } char *generateSamples(int samples, struct params parameters, unsigned maxsites) @@ -688,13 +718,9 @@ char *doPrintWorkerResultGametes(int segsites, int nsam, char **gametes){ */ void sendResultsToMasterProcess(char* results, int master) { - if ( shm_mode && world_rank != shm_rank ) - { + if (shm_mode && world_rank != shm_rank) { MPI_Send(results, strlen(results)+1, MPI_CHAR, master, RESULTS_TAG, shmcomm); - } - else - { - + } else { // there is either one single node, or the worker is located at the master node int ack = 1; MPI_Send(&ack, 1, MPI_INT, master, ACK_TAG, shmcomm); fprintf(stdout, "%s", results); @@ -723,8 +749,7 @@ void sendResultsToMasterProcess(char* results, int master) * A pointer to the new string (rhs appended to lhs) * *------------------------------------------------------------*/ -char * -append(char *lhs, const char *rhs) +char *append(char *lhs, const char *rhs) { const size_t len1 = strlen(lhs); const size_t len2 = strlen(rhs); diff --git a/mspar.h b/mspar.h index 6379f64..ae94d9c 100644 --- a/mspar.h +++ b/mspar.h @@ -20,7 +20,7 @@ char *doPrintWorkerResultHeader(int segsites, double probss, struct params param char *doPrintWorkerResultPositions(int segsites, int output_precision, double *posit); char *doPrintWorkerResultGametes(int segsites, int nsam, char **gametes); -int readAckFromLocalWorker(int remaining, int *workersActivity, struct params parameters, unsigned int maxsites); +int readAckFromLocalWorker(int remaining, int *workersActivity, struct params parameters, unsigned int maxsites, int *goToWork); int numberOfNodes(void*, MPI_Aint); void buildRankDataType(MPI_Datatype*); char *readResults(MPI_Comm, int*); From 5e7b8804edb2ec01b879f66ba8f0d900ee9573a0 Mon Sep 17 00:00:00 2001 From: Carlos Montemuino Date: Mon, 2 May 2016 08:00:44 +0200 Subject: [PATCH 12/23] deactivate sample generation at master level --- mspar.c | 44 ++++++++++++++++---------------------------- 1 file changed, 16 insertions(+), 28 deletions(-) diff --git a/mspar.c b/mspar.c index a2a2f5a..0973dd5 100644 --- a/mspar.c +++ b/mspar.c @@ -335,6 +335,7 @@ masterProcessingLogic(int howmany, int lastAssignedProcess, struct params parame // Note: we do not want to inlude the secondary master here, as it is already sending the samples to the standard output (see readAckFromLocalWorker function) MPI_Send(results, strlen(results) + 1, MPI_CHAR, 0, RESULTS_TAG, MPI_COMM_WORLD); } + free(results); // be good citizen } } @@ -349,46 +350,33 @@ int readAckFromLocalWorker(int remaining, int *workersActivity, struct params pa *goToWork = 0; MPI_Probe(MPI_ANY_SOURCE, ACK_TAG, shmcomm, &status); -// while (!msg_avail) { -// // This function is called by secondary master in main node only, hence it is safe to generate an extra sample -// // and send it to the standard output. 
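[Editor's sketch] With the overlap deactivated in this patch, the exchange that follows degenerates to the plain blocking sequence: probe for any ACK, receive it, answer with the go-to-work flag. A minimal sketch of that blocking variant (same tag values as above):

    #include <mpi.h>

    #define ACK_TAG        600
    #define GO_TO_WORK_TAG 400

    /* Blocking ACK/assignment exchange: no local work is done while waiting.
     * Returns the rank of the worker that acknowledged. */
    static int ack_blocking(MPI_Comm comm, int go_to_work)
    {
        MPI_Status status;
        int ack;

        MPI_Probe(MPI_ANY_SOURCE, ACK_TAG, comm, &status);
        MPI_Recv(&ack, 1, MPI_INT, status.MPI_SOURCE, ACK_TAG, comm, MPI_STATUS_IGNORE);
        MPI_Send(&go_to_work, 1, MPI_INT, status.MPI_SOURCE, GO_TO_WORK_TAG, comm);

        return status.MPI_SOURCE;
    }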
-// MPI_Iprobe(MPI_ANY_SOURCE, ACK_TAG, shmcomm, &msg_avail, &status); -// if (remaining) { // then generate sample while others reply -// char *results = generateSamples(1, parameters, maxsites); -// fprintf(stdout, "################%s", results); -// fflush(stdout); -// free(results); -// remaining--; -// additional_samples++; -// } -// } source = status.MPI_SOURCE; int ack; - //MPI_Recv(&ack, 1, MPI_INT, source, ACK_TAG, shmcomm, MPI_STATUS_IGNORE); - MPI_Request requests[2]; - MPI_Irecv(&ack, 1, MPI_INT, source, ACK_TAG, shmcomm, &requests[0]); - - if (remaining) { // then generate 1 sample while others reply - char *results = generateSamples(1, parameters, maxsites); - fprintf(stdout, "%s", results); - fflush(stdout); - free(results); - remaining--; - additional_samples++; - } + MPI_Recv(&ack, 1, MPI_INT, source, ACK_TAG, shmcomm, MPI_STATUS_IGNORE); +// MPI_Request requests[2]; +// MPI_Irecv(&ack, 1, MPI_INT, source, ACK_TAG, shmcomm, &requests[0]); +// +// if (remaining) { // then generate 1 sample while others reply +// char *results = generateSamples(1, parameters, maxsites); +// fprintf(stdout, "%s", results); +// fflush(stdout); +// free(results); +// remaining--; +// additional_samples++; +// } // Now let the worker know whether there are (still) any pending samples if (remaining > 0) *goToWork = 1; - //MPI_Send(&goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm); - MPI_Isend(goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm, &requests[1]); + MPI_Send(goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm); +// MPI_Isend(goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm, &requests[1]); workersActivity[source]=0; - MPI_Waitall(2, requests, MPI_STATUS_IGNORE); +// MPI_Waitall(2, requests, MPI_STATUS_IGNORE); return additional_samples; } From b8f415b90b1a649b14a2f286391c8eb067989b77 Mon Sep 17 00:00:00 2001 From: Carlos Montemuino Date: Sun, 29 May 2016 22:35:21 +0200 Subject: [PATCH 13/23] revert back mspar code and bring makefile Makefile is preferred when installed CMake is lower than 3.5.1 --- CMakeLists.txt | 2 +- Makefile | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++ mspar.c | 44 +++++++++++++++++++++++++--------------- 3 files changed, 83 insertions(+), 17 deletions(-) create mode 100644 Makefile diff --git a/CMakeLists.txt b/CMakeLists.txt index bc4ba64..893756e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 2.6) +cmake_minimum_required(VERSION 3.5.1) project(msparsm) # Require MPI for this project: diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..b7e20a3 --- /dev/null +++ b/Makefile @@ -0,0 +1,54 @@ +# +# +# 'make' make executable file 'mspar' +# 'make clean' removes all .o and executable files +# + +# Compiler +CC=mpicc + +# Compilation flags +CFLAGS=-O3 -std=gnu99 -I. 
+ +# define any libraries to link into executable: +LIBS=-lm + +# Dependencies +DEPS=ms.h mspar.h + +# Folder to put the generated binaries +BIN=./bin + +# Object files +OBJ=$(BIN)/mspar.o $(BIN)/ms.o $(BIN)/streec.o + +# Random functions using drand48() +RND_48=rand1.c + +# Random functions using rand() +RND=rand2.c + +.PHONY: clean + +$(BIN)/%.o: %.c $(DEPS) + $(CC) $(CFLAGS) -c -o $@ $< + +default: $(BIN)/mspar + +# download: packages +# wget http://www.open-mpi.org/software/ompi/v1.8/downloads/openmpi-1.8.2.tar.gz +# tar -xf openmpi-1.8.2.tar.gz -C $(CURDIR)/packages + +#packages: +# mkdir packages + +clean: + rm -f $(BIN)/* + @echo "" + @echo "*** All resources were cleaned-up ***" + @echo "" + +$(BIN)/mspar: $(OBJ) + $(CC) $(CFLAGS) -o $@ $^ $(RND_48) $(LIBS) + @echo "" + @echo "*** make complete: generated executable 'mspar' ***" diff --git a/mspar.c b/mspar.c index 0973dd5..a2a2f5a 100644 --- a/mspar.c +++ b/mspar.c @@ -335,7 +335,6 @@ masterProcessingLogic(int howmany, int lastAssignedProcess, struct params parame // Note: we do not want to inlude the secondary master here, as it is already sending the samples to the standard output (see readAckFromLocalWorker function) MPI_Send(results, strlen(results) + 1, MPI_CHAR, 0, RESULTS_TAG, MPI_COMM_WORLD); } - free(results); // be good citizen } } @@ -350,33 +349,46 @@ int readAckFromLocalWorker(int remaining, int *workersActivity, struct params pa *goToWork = 0; MPI_Probe(MPI_ANY_SOURCE, ACK_TAG, shmcomm, &status); +// while (!msg_avail) { +// // This function is called by secondary master in main node only, hence it is safe to generate an extra sample +// // and send it to the standard output. +// MPI_Iprobe(MPI_ANY_SOURCE, ACK_TAG, shmcomm, &msg_avail, &status); +// if (remaining) { // then generate sample while others reply +// char *results = generateSamples(1, parameters, maxsites); +// fprintf(stdout, "################%s", results); +// fflush(stdout); +// free(results); +// remaining--; +// additional_samples++; +// } +// } source = status.MPI_SOURCE; int ack; - MPI_Recv(&ack, 1, MPI_INT, source, ACK_TAG, shmcomm, MPI_STATUS_IGNORE); -// MPI_Request requests[2]; -// MPI_Irecv(&ack, 1, MPI_INT, source, ACK_TAG, shmcomm, &requests[0]); -// -// if (remaining) { // then generate 1 sample while others reply -// char *results = generateSamples(1, parameters, maxsites); -// fprintf(stdout, "%s", results); -// fflush(stdout); -// free(results); -// remaining--; -// additional_samples++; -// } + //MPI_Recv(&ack, 1, MPI_INT, source, ACK_TAG, shmcomm, MPI_STATUS_IGNORE); + MPI_Request requests[2]; + MPI_Irecv(&ack, 1, MPI_INT, source, ACK_TAG, shmcomm, &requests[0]); + + if (remaining) { // then generate 1 sample while others reply + char *results = generateSamples(1, parameters, maxsites); + fprintf(stdout, "%s", results); + fflush(stdout); + free(results); + remaining--; + additional_samples++; + } // Now let the worker know whether there are (still) any pending samples if (remaining > 0) *goToWork = 1; - MPI_Send(goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm); -// MPI_Isend(goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm, &requests[1]); + //MPI_Send(&goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm); + MPI_Isend(goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm, &requests[1]); workersActivity[source]=0; -// MPI_Waitall(2, requests, MPI_STATUS_IGNORE); + MPI_Waitall(2, requests, MPI_STATUS_IGNORE); return additional_samples; } From 7fc4ea2d2801774e7adc8774f35fa33890c547c2 Mon Sep 17 00:00:00 2001 From: Carlos 
Montemuino Date: Mon, 6 Jun 2016 18:50:06 +0200 Subject: [PATCH 14/23] semi-autonomous scheduling --- Makefile | 6 +-- ms.c | 4 +- mspar.c | 131 +++++++++++++++++++++++++++++++++++++++++-------------- mspar.h | 3 +- 4 files changed, 104 insertions(+), 40 deletions(-) diff --git a/Makefile b/Makefile index b7e20a3..3f5cae5 100644 --- a/Makefile +++ b/Makefile @@ -8,16 +8,16 @@ CC=mpicc # Compilation flags -CFLAGS=-O3 -std=gnu99 -I. +CFLAGS?=-O3 -std=gnu99 -I. # define any libraries to link into executable: -LIBS=-lm +LIBS?=-lm # Dependencies DEPS=ms.h mspar.h # Folder to put the generated binaries -BIN=./bin +BIN?=./bin # Object files OBJ=$(BIN)/mspar.o $(BIN)/ms.o $(BIN)/streec.o diff --git a/ms.c b/ms.c index 6233283..65b96a1 100644 --- a/ms.c +++ b/ms.c @@ -142,12 +142,12 @@ int main(int argc, char *argv[]){ int shm_rank; int world_rank = masterWorkerSetup(argc, argv, howmany, pars, SITESINC, &excludeFrom, &shm_rank); - +/* if(world_rank <= howmany && world_rank > excludeFrom && shm_rank != 0 ) { workerProcess(pars, SITESINC); } - +*/ masterWorkerTeardown(); } diff --git a/mspar.c b/mspar.c index a2a2f5a..a7ac6d6 100644 --- a/mspar.c +++ b/mspar.c @@ -136,27 +136,110 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, } */ - Rank_Struct rank; - MPI_Aint struct_rank_size, struct_rank_size_lb; - MPI_Datatype struct_rank_type; + if (world_size == shm_size) { // There is only one node + // No master process is needed. Every MPI process can just output the generated samples + int samples = howmany / world_size; + int remainder = howmany % world_size; + + if (world_rank == 0) // let the "global master" to generate the remainder samples as well + samples += remainder; + + char *results = generateSamples(samples, parameters, maxsites); + + fprintf(stdout, "%s", results); + fflush(stdout); + + free(results); // be good citizen + + return world_rank; + } else { + // Gather all SHM rank (local rank) from all the processes + int *shm_ranks = (int *)malloc(sizeof(int) * world_size); + + // Note: elements are ordered by process rank in MPI_COMM_WORLD communicator + MPI_Gather (&world_rank, 1, MPI_INT, shm_ranks, 1, MPI_INT, 0, MPI_COMM_WORLD); + + // Calculate number of nodes + int i = 0; + int nodes = 0; + for (i = 0; i < world_size; i++) + if (shm_ranks[i] == 0) nodes++; + + int nodeSamples = howmany / nodes; + int remainingNodeSamples = howmany % nodes; + int workerSamples = nodeSamples / (shm_size - 1); + int remainingWorkerSamples = nodeSamples % (shm_size - 1); + + if (world_rank != 0 && shm_rank != 0) { + char *results = generateSamples(workerSamples, parameters, maxsites); + if (world_rank == shm_rank) { + fprintf(stdout, "%s", results); + fflush(stdout); + + free(results); // be good citizen + } else { + // Send results to shm_rank = 0 + MPI_Send(results, strlen(results)+1, MPI_CHAR, 0, RESULTS_TAG, shmcomm); + } + } else { + if (world_rank != 0 && shm_rank == 0) { + char *results = malloc(sizeof(char) * 1000); + if (remainingWorkerSamples > 0) + results = generateSamples(remainingWorkerSamples, parameters, maxsites); + + // Receive samples from workers in same node. 
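[Editor's sketch] The collection loop that follows uses the standard MPI idiom for receiving a string whose length is not known in advance: probe, query the byte count, allocate, then receive. The idiom in isolation, using the RESULTS_TAG value from mspar.c (the sender transmits strlen+1 bytes, so the count includes the terminator):

    #include <mpi.h>
    #include <stdlib.h>

    #define RESULTS_TAG 300   /* same value as in mspar.c */

    /* Receive a variable-length, NUL-terminated result string from any rank. */
    static char *recv_result(MPI_Comm comm, int *source)
    {
        MPI_Status status;
        int count;

        MPI_Probe(MPI_ANY_SOURCE, RESULTS_TAG, comm, &status);
        MPI_Get_count(&status, MPI_CHAR, &count);   /* includes the trailing '\0' */
        *source = status.MPI_SOURCE;

        char *buffer = malloc((size_t)count);
        MPI_Recv(buffer, count, MPI_CHAR, *source, RESULTS_TAG, comm, MPI_STATUS_IGNORE);
        return buffer;   /* caller frees */
    }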
+ int source; + size_t offset; + size_t length; + for (i = 1; i < nodes; i++){ + shm_results = readResults(shmcomm, &source); + offset = strlen(results); + length = strlen(shm_results); + results = realloc(results, offset + length + 1); + memcpy(results + offset, shm_results, length); + free(shm_results); + } + + // Send gathered results to master in master-node + MPI_Send(results, strlen(shm_results)+1, MPI_CHAR, 0, RESULTS_TAG, MPI_COMM_WORLD); + } else { + if (remainingNodeSamples + remainingWorkerSamples > 0) { + char *results; + results = generateSamples(remainingNodeSamples + remainingWorkerSamples, parameters, maxsites); - buildRankDataType(&struct_rank_type); + fprintf(stdout, "%s", results); + fflush(stdout); + free(results); + } + + // Receive samples from other node masters, each one sending a consolidated message + int source; + for (i = 1; i < nodes; i++){ + shm_results = readResults(MPI_COMM_WORLD, &source); + fprintf(stdout, "%s", shm_results); + fflush(stdout); + free(shm_results); + } + } + } - MPI_Type_get_extent(struct_rank_type, &struct_rank_size_lb, &struct_rank_size); + return world_rank; + } - rank.shm_rank = shm_rank; - rank.world_rank = world_rank; + MPI_Aint struct_rank_size; + MPI_Datatype struct_rank_type; - void *ranks = malloc(struct_rank_size * (world_size)); + void *ranks = malloc(sizeof(int) * world_size); - MPI_Gather (&rank, 1, struct_rank_type, ranks, 1, struct_rank_type, 0, MPI_COMM_WORLD); + int node_count = numberOfNodes(ranks); + + MPI_Barrier(MPI_COMM_WORLD); if (world_rank == 0) { // Global master int i, pendingNodeMasters = 0; // Distribute remaining samples. if (shm_mode) { // there is more than one node - int node_count = numberOfNodes(ranks, struct_rank_size); // calculate how many samples are going to be distributed among all nodes nodeHowmany = howmany / node_count; @@ -214,35 +297,17 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, * This function is useful to determine the number of node masters. 
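[Editor's sketch] The node count itself needs no custom datatype: every process contributes its local (shared-memory) rank, and each node contributes exactly one zero. A minimal sketch of that scheme, already incorporating the shm-rank gather and the broadcast that the follow-up patches in this series fix up:

    #include <mpi.h>
    #include <stdlib.h>

    /* Count the compute nodes: gather every process's shm rank at the root,
     * count the zeros (one per node), and broadcast the result to all ranks. */
    static int count_nodes(int shm_rank, int world_rank, int world_size)
    {
        int *shm_ranks = malloc(sizeof(int) * world_size);   /* significant at root only */
        int nodes = 0;
        int i;

        MPI_Gather(&shm_rank, 1, MPI_INT, shm_ranks, 1, MPI_INT, 0, MPI_COMM_WORLD);

        if (world_rank == 0)
            for (i = 0; i < world_size; i++)
                if (shm_ranks[i] == 0) nodes++;

        MPI_Bcast(&nodes, 1, MPI_INT, 0, MPI_COMM_WORLD);   /* everyone learns the count */

        free(shm_ranks);
        return nodes;
    }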
*/ int -numberOfNodes(void *ranks, MPI_Aint rank_size) +numberOfNodes(void *ranks) { - int i, result = 0; - Rank_Struct *rank; - for ( i = 0; i < world_size; i ++) { - rank = ranks + rank_size * i; - if (rank->shm_rank == 0) - result += 1; + int result = 1; + + if (shm_rank == 0) { + MPI_Gather (&world_rank, 1, MPI_INT, ranks, 1, MPI_INT, 0, MPI_COMM_WORLD); } return result; } -void -buildRankDataType(MPI_Datatype* result) { - Rank_Struct rank; - { - int blocklen[] = {1, 1}; - MPI_Aint addr[3]; - MPI_Get_address(&rank, &addr[0]); - MPI_Get_address(&rank.shm_rank, &addr[1]); - MPI_Get_address(&rank.world_rank, &addr[2]); - MPI_Aint displacements[] = {addr[1] - addr[0], addr[2] - addr[0]}; - MPI_Datatype types[2] = {MPI_INT, MPI_INT}; - MPI_Type_create_struct(2, blocklen, displacements, types, result); - MPI_Type_commit(result); - } -} - void masterWorkerTeardown() { MPI_Finalize(); diff --git a/mspar.h b/mspar.h index ae94d9c..410d1be 100644 --- a/mspar.h +++ b/mspar.h @@ -21,8 +21,7 @@ char *doPrintWorkerResultPositions(int segsites, int output_precision, double *p char *doPrintWorkerResultGametes(int segsites, int nsam, char **gametes); int readAckFromLocalWorker(int remaining, int *workersActivity, struct params parameters, unsigned int maxsites, int *goToWork); -int numberOfNodes(void*, MPI_Aint); -void buildRankDataType(MPI_Datatype*); +int numberOfNodes(void*); char *readResults(MPI_Comm, int*); /* From ms.c*/ From 3004ef384d9240a391905d5d4f7a9ac1d59ddf5c Mon Sep 17 00:00:00 2001 From: Carlos Montemuino Date: Mon, 6 Jun 2016 22:17:47 +0200 Subject: [PATCH 15/23] fix shm rank gathering process --- mspar.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mspar.c b/mspar.c index a7ac6d6..d99ed9a 100644 --- a/mspar.c +++ b/mspar.c @@ -157,7 +157,7 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int *shm_ranks = (int *)malloc(sizeof(int) * world_size); // Note: elements are ordered by process rank in MPI_COMM_WORLD communicator - MPI_Gather (&world_rank, 1, MPI_INT, shm_ranks, 1, MPI_INT, 0, MPI_COMM_WORLD); + MPI_Gather (&shm_rank, 1, MPI_INT, shm_ranks, 1, MPI_INT, 0, MPI_COMM_WORLD); // Calculate number of nodes int i = 0; From 4b6c0aef445404ff08d470001c848d5552a91361 Mon Sep 17 00:00:00 2001 From: Carlos Montemuino Date: Mon, 6 Jun 2016 22:52:38 +0200 Subject: [PATCH 16/23] missing bcast for number of nodes to all processes --- mspar.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/mspar.c b/mspar.c index d99ed9a..47055bd 100644 --- a/mspar.c +++ b/mspar.c @@ -158,12 +158,15 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, // Note: elements are ordered by process rank in MPI_COMM_WORLD communicator MPI_Gather (&shm_rank, 1, MPI_INT, shm_ranks, 1, MPI_INT, 0, MPI_COMM_WORLD); - // Calculate number of nodes int i = 0; int nodes = 0; - for (i = 0; i < world_size; i++) - if (shm_ranks[i] == 0) nodes++; + if (world_rank == 0) { + for (i = 0; i < world_size; i++) + if (shm_ranks[i] == 0) nodes++; + } + + MPI_Bcast(&nodes, 1, MPI_INT, 0, MPI_COMM_WORLD); int nodeSamples = howmany / nodes; int remainingNodeSamples = howmany % nodes; From d3dc3e1f1cd7424edfb2049b00b48a06abbd641e Mon Sep 17 00:00:00 2001 From: Carlos Montemuino Date: Mon, 6 Jun 2016 23:18:36 +0200 Subject: [PATCH 17/23] fix master-worker collection --- mspar.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mspar.c b/mspar.c index 47055bd..f74d4b0 100644 --- a/mspar.c +++ b/mspar.c @@ -194,7 
+194,7 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int source; size_t offset; size_t length; - for (i = 1; i < nodes; i++){ + for (i = 1; i < shm_size; i++){ shm_results = readResults(shmcomm, &source); offset = strlen(results); length = strlen(shm_results); From 3fe009e85bd30a4387dc10a0e4a9416b4b200698 Mon Sep 17 00:00:00 2001 From: Carlos Montemuino Date: Mon, 6 Jun 2016 23:25:36 +0200 Subject: [PATCH 18/23] fix sending results to node master --- mspar.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mspar.c b/mspar.c index f74d4b0..8e89e5d 100644 --- a/mspar.c +++ b/mspar.c @@ -204,7 +204,7 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, } // Send gathered results to master in master-node - MPI_Send(results, strlen(shm_results)+1, MPI_CHAR, 0, RESULTS_TAG, MPI_COMM_WORLD); + MPI_Send(results, strlen(results)+1, MPI_CHAR, 0, RESULTS_TAG, MPI_COMM_WORLD); } else { if (remainingNodeSamples + remainingWorkerSamples > 0) { char *results; From c48f5e984e42afbd25636669e1e39d723ad78860 Mon Sep 17 00:00:00 2001 From: Carlos Montemuino Date: Tue, 7 Jun 2016 09:16:27 +0200 Subject: [PATCH 19/23] add diagnose option --- mspar.c | 45 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/mspar.c b/mspar.c index 8e89e5d..1fc2a60 100644 --- a/mspar.c +++ b/mspar.c @@ -16,6 +16,8 @@ typedef struct { int world_rank; } Rank_Struct; +int diagnose = 0; // Used for diagnosing the application. + // Following variables are with global scope in order to facilitate its sharing among routines. // They are going to be updated in the masterWorkerSetup routine only, which is called only one, therefore there is no // risk of race conditions or whatever other concurrency related problem. @@ -31,6 +33,7 @@ int shm_mode = 0; // indicates whether MPI-3 SHM is going to be applied int masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, unsigned int maxsites, int *excludeFrom, int *node_rank) { + if (getenv("MSPARSM_DIAGNOSE")) diagnose = 1; // goToWork : used by workers to realize if there is more work to do. // seedMatrix : matrix containing the RNG seeds to be distributed to working processes. // localSeedMatrix : matrix used by workers to receive RNG seeds from master. 
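[Editor's sketch] The diagnose switch added above is read from the environment exactly once, so the same binary runs in either mode without recompiling. A sketch of the gating pattern, plus an illustrative invocation (the -x export flag is Open MPI syntax, other launchers differ, and the program arguments are placeholders):

    #include <stdio.h>
    #include <stdlib.h>

    static int diagnose = 0;   /* set once at startup, read everywhere */

    /* Rank-prefixed diagnostic, active only when MSPARSM_DIAGNOSE is set. */
    static void diag(int rank, const char *what, int n)
    {
        if (diagnose)
            fprintf(stderr, "[%d] -> %s [%d].\n", rank, what, n);
    }

    int main(void)
    {
        diagnose = (getenv("MSPARSM_DIAGNOSE") != NULL);
        diag(0, "Generated", 42);   /* prints only in diagnose mode */
        return 0;
    }

Example run:

    MSPARSM_DIAGNOSE=1 mpirun -np 8 -x MSPARSM_DIAGNOSE ./bin/mspar <ms arguments>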
@@ -144,12 +147,15 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, if (world_rank == 0) // let the "global master" to generate the remainder samples as well samples += remainder; - char *results = generateSamples(samples, parameters, maxsites); - - fprintf(stdout, "%s", results); - fflush(stdout); + if (diagnose) { + fprintf(stderr, "[%d] -> Generated [%d] samples.\n", world_rank, samples); + } else { + char *results = generateSamples(samples, parameters, maxsites); - free(results); // be good citizen + fprintf(stdout, "%s", results); + fflush(stdout); + free(results); // be good citizen + } return world_rank; } else { @@ -175,20 +181,31 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, if (world_rank != 0 && shm_rank != 0) { char *results = generateSamples(workerSamples, parameters, maxsites); + if (diagnose) + fprintf(stderr, "[%d] -> Generated [%d] worker samples.\n", world_rank, workerSamples); + if (world_rank == shm_rank) { fprintf(stdout, "%s", results); fflush(stdout); - free(results); // be good citizen + + if (diagnose) + fprintf(stderr, "[%d] -> Printed [%d] worker samples.\n", world_rank, workerSamples); } else { // Send results to shm_rank = 0 - MPI_Send(results, strlen(results)+1, MPI_CHAR, 0, RESULTS_TAG, shmcomm); + MPI_Send(results, strlen(results) + 1, MPI_CHAR, 0, RESULTS_TAG, shmcomm); + + if (diagnose) + fprintf(stderr, "[%d] -> Sent [%d] worker samples.\n", world_rank, workerSamples); } } else { if (world_rank != 0 && shm_rank == 0) { char *results = malloc(sizeof(char) * 1000); - if (remainingWorkerSamples > 0) + if (remainingWorkerSamples > 0) { results = generateSamples(remainingWorkerSamples, parameters, maxsites); + if (diagnose) + fprintf(stderr, "[%d] -> Generated [%d] remaining worker samples.\n", world_rank, remainingWorkerSamples); + } // Receive samples from workers in same node. 
int source; @@ -205,14 +222,20 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, // Send gathered results to master in master-node MPI_Send(results, strlen(results)+1, MPI_CHAR, 0, RESULTS_TAG, MPI_COMM_WORLD); + + if (diagnose) + fprintf(stderr, "[%d] -> Read from [%d] workers.\n", world_rank, i-1); } else { - if (remainingNodeSamples + remainingWorkerSamples > 0) { + int remaining = remainingNodeSamples + remainingWorkerSamples; + if (remaining > 0) { char *results; - results = generateSamples(remainingNodeSamples + remainingWorkerSamples, parameters, maxsites); + results = generateSamples(remaining, parameters, maxsites); fprintf(stdout, "%s", results); fflush(stdout); free(results); + if (diagnose) + fprintf(stderr, "[%d] -> Generated [%d] remaining samples.\n", world_rank, remaining); } // Receive samples from other node masters, each one sending a consolidated message @@ -223,6 +246,8 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, fflush(stdout); free(shm_results); } + if (diagnose) + fprintf(stderr, "[%d] -> Read from [%d] node masters.\n", world_rank, i-1); } } From 9c63b9e2ca74d1d07ffebc1f93656150f7377bf7 Mon Sep 17 00:00:00 2001 From: Carlos Montemuino Date: Wed, 8 Jun 2016 00:13:11 +0200 Subject: [PATCH 20/23] simplify code and remove dead code --- ms.c | 11 +- mspar.c | 713 +++++++++++--------------------------------------------- mspar.h | 27 +-- 3 files changed, 148 insertions(+), 603 deletions(-) diff --git a/ms.c b/ms.c index 65b96a1..dd7eee5 100644 --- a/ms.c +++ b/ms.c @@ -138,16 +138,7 @@ int main(int argc, char *argv[]){ pars = getpars(argc, argv, &howmany, ntbs, count); // Master-Worker - int excludeFrom; - int shm_rank; - int world_rank = masterWorkerSetup(argc, argv, howmany, pars, SITESINC, &excludeFrom, &shm_rank); - -/* - if(world_rank <= howmany && world_rank > excludeFrom && shm_rank != 0 ) - { - workerProcess(pars, SITESINC); - } -*/ + masterWorkerSetup(argc, argv, howmany, pars, SITESINC); masterWorkerTeardown(); } diff --git a/mspar.c b/mspar.c index 1fc2a60..1b2bfb3 100644 --- a/mspar.c +++ b/mspar.c @@ -1,20 +1,10 @@ -#include #include +#include #include #include "ms.h" #include "mspar.h" -//#include /* OpenMPI library */ -const int SAMPLES_NUMBER_TAG = 200; const int RESULTS_TAG = 300; -const int GO_TO_WORK_TAG = 400; -const int NODE_MASTER_ASSIGNMENT = 500; // Used to assign a number of samples to a node master -const int ACK_TAG = 600; // Used by workers in the master node - -typedef struct { - int shm_rank; - int world_rank; -} Rank_Struct; int diagnose = 0; // Used for diagnosing the application. @@ -29,22 +19,78 @@ int shm_mode = 0; // indicates whether MPI-3 SHM is going to be applied // ************************************** // // MASTER // ************************************** // +void singleNodeProcessing(int howmany, struct params parameters, unsigned int maxsites, int *bytes) +{ + // No master process is needed. 
Every MPI process can just output the generated samples + int samples = howmany / world_size; + int remainder = howmany % world_size; + + if (world_rank == 0) // let the "global master" to generate the remainder samples as well + samples += remainder; + + char *results = generateSamples(samples, parameters, maxsites, bytes); + printSamples(results, *bytes); +} + +void printSamples(char *results, int bytes) +{ + fprintf(stdout, "%s", results); + fflush(stdout); + + if (diagnose) + fprintf(stderr, "[%d] -> Printed [%d] bytes.\n", world_rank, bytes); + + free(results); // be good citizen +} + +void secondaryNodeProcessing(int remaining, struct params parameters, unsigned int maxsites) +{ + int bytes = 0; + char *results = malloc(sizeof(char) * 1000); + if (remaining > 0) + results = generateSamples(remaining, parameters, maxsites, &bytes); + + // Receive samples from workers in same node. + int i; + char *shm_results; + for (i = 1; i < shm_size; i++){ + int source, length; + + shm_results = readResults(shmcomm, &source, &length); + results = realloc(results, bytes + length + 1); + memcpy(results + bytes, shm_results, length); + bytes += length + 1; + free(shm_results); + } + + // Send gathered results to master in master-node + sendResultsToMaster(results, bytes, MPI_COMM_WORLD); +} + +void sendResultsToMaster(char *results, int bytes, MPI_Comm comm) +{ + MPI_Send(results, bytes + 1, MPI_CHAR, 0, RESULTS_TAG, comm); + + if (diagnose) { + char *communicator = "MPI_COMM_WORLD"; + if (comm != MPI_COMM_WORLD) + communicator = "SHM_COMM"; + + fprintf(stderr, "[%d] -> Sent [%d] bytes to master in %s.\n", world_rank, bytes + 1, communicator); + } -int -masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, unsigned int maxsites, int *excludeFrom, int *node_rank) + free(results); +} + +void masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, unsigned int maxsites) { if (getenv("MSPARSM_DIAGNOSE")) diagnose = 1; - // goToWork : used by workers to realize if there is more work to do. // seedMatrix : matrix containing the RNG seeds to be distributed to working processes. // localSeedMatrix : matrix used by workers to receive RNG seeds from master. unsigned short *seedMatrix; unsigned short localSeedMatrix[3]; - // MPI-3 SHM related - MPI_Win win; // shm window object - char *shm; // the shared memory char *shm_results; // memory place where all MPI process from one node will going to share - int nodeHowmany; // MPI Initialization MPI_Init(&argc, &argv ); @@ -53,111 +99,22 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, // MPI_COMM_TYPE_SHARED: This type splits the communicator into subcommunicators, each of which can create a shared memory region. 
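[Editor's sketch] MPI_Comm_split_type with MPI_COMM_TYPE_SHARED, as in the call that follows, also doubles as the multi-node test used throughout: the split produces one subcommunicator per shared-memory domain, so shm_size equals world_size exactly when every rank lives on the same node. A standalone sketch:

    #include <mpi.h>
    #include <stdio.h>

    int main(int argc, char *argv[])
    {
        int world_rank, world_size, shm_rank, shm_size;
        MPI_Comm shmcomm;

        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
        MPI_Comm_size(MPI_COMM_WORLD, &world_size);

        /* One subcommunicator per node; its ranks can share memory windows. */
        MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm);
        MPI_Comm_rank(shmcomm, &shm_rank);
        MPI_Comm_size(shmcomm, &shm_size);

        int multi_node = (shm_size != world_size);   /* 1 when ranks span several nodes */

        if (world_rank == 0)
            printf("world=%d local=%d multi-node=%s\n",
                   world_size, shm_size, multi_node ? "yes" : "no");

        MPI_Comm_free(&shmcomm);
        MPI_Finalize();
        return 0;
    }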
MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm); - MPI_Comm_size( shmcomm, &shm_size ); - MPI_Comm_rank( shmcomm, &shm_rank ); + MPI_Comm_size(shmcomm, &shm_size); + MPI_Comm_rank(shmcomm, &shm_rank); if (shm_size != world_size) // there are MPI process in more than 1 computing node - { shm_mode = 1; - } - - *excludeFrom = shm_mode; - *node_rank = shm_rank; if(world_rank == 0) - { - int i; - // Only the master process prints out the application's parameters - for(i=0; i howmany) - world_size = howmany + 1; // the extra 1 is due to the master - - int nseeds; - doInitializeRng(argc, argv, &nseeds, parameters); - int dimension = nseeds * world_size; - seedMatrix = (unsigned short *) malloc(sizeof(unsigned short) * dimension); - for(i=0; i Generated [%d] samples.\n", world_rank, samples); - } else { - char *results = generateSamples(samples, parameters, maxsites); - - fprintf(stdout, "%s", results); - fflush(stdout); - free(results); // be good citizen - } - - return world_rank; + int bytes; + singleNodeProcessing(howmany, parameters, maxsites, &bytes); } else { // Gather all SHM rank (local rank) from all the processes int *shm_ranks = (int *)malloc(sizeof(int) * world_size); @@ -179,481 +136,85 @@ masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int workerSamples = nodeSamples / (shm_size - 1); int remainingWorkerSamples = nodeSamples % (shm_size - 1); + int bytes = 0; if (world_rank != 0 && shm_rank != 0) { - char *results = generateSamples(workerSamples, parameters, maxsites); - if (diagnose) - fprintf(stderr, "[%d] -> Generated [%d] worker samples.\n", world_rank, workerSamples); + char *results = generateSamples(workerSamples, parameters, maxsites, &bytes); - if (world_rank == shm_rank) { - fprintf(stdout, "%s", results); - fflush(stdout); - free(results); // be good citizen - - if (diagnose) - fprintf(stderr, "[%d] -> Printed [%d] worker samples.\n", world_rank, workerSamples); - } else { + if (world_rank == shm_rank) + printSamples(results, bytes); + else // Send results to shm_rank = 0 - MPI_Send(results, strlen(results) + 1, MPI_CHAR, 0, RESULTS_TAG, shmcomm); - - if (diagnose) - fprintf(stderr, "[%d] -> Sent [%d] worker samples.\n", world_rank, workerSamples); - } + sendResultsToMaster(results, bytes, shmcomm); } else { if (world_rank != 0 && shm_rank == 0) { - char *results = malloc(sizeof(char) * 1000); - if (remainingWorkerSamples > 0) { - results = generateSamples(remainingWorkerSamples, parameters, maxsites); - if (diagnose) - fprintf(stderr, "[%d] -> Generated [%d] remaining worker samples.\n", world_rank, remainingWorkerSamples); - } - - // Receive samples from workers in same node. 
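[Editor's sketch] The two-level split computed above is easiest to follow with concrete numbers: for howmany = 100 samples on nodes = 3 nodes with shm_size = 5 processes each, a node is responsible for 100 / 3 = 33 samples and each of its 4 workers generates 33 / 4 = 8, with the remainders (one sample each here) folded into the main node's and the node master's share. The arithmetic in isolation (example values only):

    #include <stdio.h>

    int main(void)
    {
        int howmany = 100, nodes = 3, shm_size = 5;   /* example values */

        int nodeSamples            = howmany / nodes;              /* 33 per node              */
        int remainingNodeSamples   = howmany % nodes;              /* 1 extra on the main node */
        int workerSamples          = nodeSamples / (shm_size - 1); /* 8 per worker             */
        int remainingWorkerSamples = nodeSamples % (shm_size - 1); /* 1 for the node master    */

        printf("node: %d (+%d), worker: %d (+%d)\n",
               nodeSamples, remainingNodeSamples, workerSamples, remainingWorkerSamples);
        return 0;
    }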
- int source; - size_t offset; - size_t length; - for (i = 1; i < shm_size; i++){ - shm_results = readResults(shmcomm, &source); - offset = strlen(results); - length = strlen(shm_results); - results = realloc(results, offset + length + 1); - memcpy(results + offset, shm_results, length); - free(shm_results); - } - - // Send gathered results to master in master-node - MPI_Send(results, strlen(results)+1, MPI_CHAR, 0, RESULTS_TAG, MPI_COMM_WORLD); - - if (diagnose) - fprintf(stderr, "[%d] -> Read from [%d] workers.\n", world_rank, i-1); + secondaryNodeProcessing(remainingWorkerSamples, parameters, maxsites);; } else { + bytes = 0; int remaining = remainingNodeSamples + remainingWorkerSamples; if (remaining > 0) { - char *results; - results = generateSamples(remaining, parameters, maxsites); - - fprintf(stdout, "%s", results); - fflush(stdout); - free(results); - if (diagnose) - fprintf(stderr, "[%d] -> Generated [%d] remaining samples.\n", world_rank, remaining); + char *results = generateSamples(remaining, parameters, maxsites, &bytes); + printSamples(results, bytes); } // Receive samples from other node masters, each one sending a consolidated message int source; for (i = 1; i < nodes; i++){ - shm_results = readResults(MPI_COMM_WORLD, &source); - fprintf(stdout, "%s", shm_results); - fflush(stdout); - free(shm_results); + shm_results = readResults(MPI_COMM_WORLD, &source, &bytes); + printSamples(shm_results, bytes); } - if (diagnose) - fprintf(stderr, "[%d] -> Read from [%d] node masters.\n", world_rank, i-1); } } - - return world_rank; } - - MPI_Aint struct_rank_size; - MPI_Datatype struct_rank_type; - - void *ranks = malloc(sizeof(int) * world_size); - - int node_count = numberOfNodes(ranks); - - MPI_Barrier(MPI_COMM_WORLD); - - if (world_rank == 0) { // Global master - int i, pendingNodeMasters = 0; - - // Distribute remaining samples. - if (shm_mode) { // there is more than one node - - // calculate how many samples are going to be distributed among all nodes - nodeHowmany = howmany / node_count; - int remainder = howmany % node_count; - - // Delegate samples on node where the global master resides to a secondary master node (world_rank = 1). 
- int samples = nodeHowmany + remainder; - MPI_Send(&samples, 1, MPI_INT, 1, NODE_MASTER_ASSIGNMENT, MPI_COMM_WORLD); // process 1 will output the results - - // Distribute samples among remaining nodes - for (i = 1; i < world_size; ++i) {// don't include node with global master now - Rank_Struct *r; - r = ranks + struct_rank_size * i; - if (r->shm_rank == 0) { - MPI_Send(&nodeHowmany, 1, MPI_INT, r->world_rank, NODE_MASTER_ASSIGNMENT, MPI_COMM_WORLD); - pendingNodeMasters += 1; - } - } - - // Receive results from master nodes - int source; - while (pendingNodeMasters) { - shm_results = readResults(MPI_COMM_WORLD, &source); - fprintf(stdout, "%s", shm_results); - fflush(stdout); - free(shm_results); - pendingNodeMasters -= 1; - } - } else { // There is only one node, hence a secondary master is not needed - // Note: global master visits this branch if and only if there is one single node - masterProcessingLogic(howmany, 0, parameters, maxsites); - } - } else { - if (shm_rank == 0 || (shm_mode && world_rank == 1)) { - // Note: global master never visits this branch - MPI_Recv(&nodeHowmany, 1, MPI_INT, 0, NODE_MASTER_ASSIGNMENT, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - masterProcessingLogic(nodeHowmany, 0, parameters, maxsites); - - } - else - { - // Workers initialize the RGN - parallelSeed(localSeedMatrix); - } - } - - MPI_Type_free(&struct_rank_type); - } - - return world_rank; -} - -/** - * Calculates how many nodes are there taking the SHM ranks from all MPI processes. - * This function is useful to determine the number of node masters. - */ -int -numberOfNodes(void *ranks) -{ - int result = 1; - - if (shm_rank == 0) { - MPI_Gather (&world_rank, 1, MPI_INT, ranks, 1, MPI_INT, 0, MPI_COMM_WORLD); } - - return result; } -void -masterWorkerTeardown() { +void masterWorkerTeardown() { MPI_Finalize(); } -/* - * Logic implemented by the master process. 
- * - * @param howmany total number of replicas to be generated - * @param lastAssignedProcess last processes that has been assigned som work - */ -void -masterProcessingLogic(int howmany, int lastAssignedProcess, struct params parameters, unsigned int maxsites) +char *readResults(MPI_Comm comm, int *source, int *bytes) { - int *processActivity = (int*) malloc(shm_size * sizeof(int)); - int node_offset = 0; - if (world_rank == 1) { // it is the secondary master located at the main node (where the global master resides) - node_offset = 1; - processActivity[1] = 1; - } - - if (howmany > 0) { - processActivity[0] = 1; // Master initially does not generate replicas - - int i; - for (i = node_offset+1; i < shm_size; i++) // secondary master initially does not generate replicas - processActivity[i] = 0; - - int pendingJobs = howmany; // number of jobs already assigned but pending to be finalized - - char *results; - char *sample; - size_t offset, length; - results = malloc(sizeof(char) * 1000); - - int idleProcess = 1; - int samples = 1; // number of samples to be generated by workers - while (howmany > 0 && idleProcess > 0) { // Send sample generation requests to all available workers - idleProcess = findIdleProcess(processActivity, lastAssignedProcess, node_offset); - if (idleProcess > 0) { - assignWork(processActivity, idleProcess, samples); - lastAssignedProcess = idleProcess; - howmany--; - } - } - while (howmany > 0) { // Collect previously assigned sample generation jobs - if (world_rank > 1) { // it is not the main node, hence samples are received using MPI point-to-point - sample = readResultsFromWorkers(1, processActivity); - offset = strlen(results); - length = strlen(sample); - results = realloc(results, offset + length + 1); - memcpy(results + offset, sample, length); - free(sample); - howmany-= samples; // in the readResultsFromWorkers we sent the goToWork, which means another sample is going to be generated - } else { // In main node all workers directly outputs the samples, but... - // ...we need to receive some ACK to verify a sample was indeed outputted - int goToWork = 0; // when 1, then it does mean another sample was requested to a worker - int additional_samples = readAckFromLocalWorker(howmany, processActivity, parameters, maxsites, &goToWork); - - // need to update counters if applicable - pendingJobs -= additional_samples; - howmany -= additional_samples + (goToWork * samples); - } - pendingJobs-= samples; // in either branch (at least) one sample was generated - } - - while (pendingJobs > 0) { // collect any non yet received sample - if (world_rank != 0 && shm_rank == 0) { // whatever node master, but the secondary master located at main node (i.e.: world_rank= 1) - sample = readResultsFromWorkers(0, processActivity); - offset = strlen(results); - length = strlen(sample); - results = realloc(results, offset + length + 1); - memcpy(results + offset, sample, length); - free(sample); - } else { - // we need to receive some ACK to verify a sample was outputted by a local worker process - int goToWork = 0; - readAckFromLocalWorker(0, processActivity, parameters, maxsites, &goToWork); - } - - pendingJobs--; - } - - - if (world_rank == 0) { // global master, but there is only one single node, otherwise global master does not visit this function at all - fprintf(stdout, "%s", results); - fflush(stdout); - } else { // node master in a multi-node scenario - if (shm_rank == 0) // exclude the secondary master located at the main node (i.e. 
world_rank = 1) - // Note: we do not want to inlude the secondary master here, as it is already sending the samples to the standard output (see readAckFromLocalWorker function) - MPI_Send(results, strlen(results) + 1, MPI_CHAR, 0, RESULTS_TAG, MPI_COMM_WORLD); - } - free(results); // be good citizen - } -} - - -int readAckFromLocalWorker(int remaining, int *workersActivity, struct params parameters, unsigned int maxsites, int *goToWork) -{ - int source; - int additional_samples = 0; MPI_Status status; - *goToWork = 0; - - MPI_Probe(MPI_ANY_SOURCE, ACK_TAG, shmcomm, &status); -// while (!msg_avail) { -// // This function is called by secondary master in main node only, hence it is safe to generate an extra sample -// // and send it to the standard output. -// MPI_Iprobe(MPI_ANY_SOURCE, ACK_TAG, shmcomm, &msg_avail, &status); -// if (remaining) { // then generate sample while others reply -// char *results = generateSamples(1, parameters, maxsites); -// fprintf(stdout, "################%s", results); -// fflush(stdout); -// free(results); -// remaining--; -// additional_samples++; -// } -// } - - source = status.MPI_SOURCE; - - int ack; - //MPI_Recv(&ack, 1, MPI_INT, source, ACK_TAG, shmcomm, MPI_STATUS_IGNORE); - MPI_Request requests[2]; - MPI_Irecv(&ack, 1, MPI_INT, source, ACK_TAG, shmcomm, &requests[0]); - - if (remaining) { // then generate 1 sample while others reply - char *results = generateSamples(1, parameters, maxsites); - fprintf(stdout, "%s", results); - fflush(stdout); - free(results); - remaining--; - additional_samples++; - } - - // Now let the worker know whether there are (still) any pending samples - if (remaining > 0) - *goToWork = 1; - - //MPI_Send(&goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm); - MPI_Isend(goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm, &requests[1]); - - workersActivity[source]=0; - - MPI_Waitall(2, requests, MPI_STATUS_IGNORE); - - return additional_samples; -} - -/* - * - * Esta función realiza dos tareas: por un lado hace que el Master escuche los resultados enviados por los workers y por - * otro lado, se envía al worker que se ha recibido la información un mensaje sobre si debe seguir esperando por - * trabajos o si ha de finalizar su contribución al sistema. - * - * @param goToWork indica si el worker queda en espera de más trabajo (1) o si ya puede finalizar su ejecución (0) - * @param workersActivity el vector con el estado de actividad de los workers - * - * @return the generated sample - */ -char* readResultsFromWorkers(int goToWork, int* workersActivity) -{ - char *readResults(MPI_Comm, int*); - - int source; - char *results = readResults(shmcomm, &source); - - MPI_Send(&goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, shmcomm); - - workersActivity[source]=0; - return results; -} - -char -*readResults(MPI_Comm comm, int *source) -{ - MPI_Status status; - int size; - MPI_Probe(MPI_ANY_SOURCE, RESULTS_TAG, comm, &status); - MPI_Get_count(&status, MPI_CHAR, &size); + MPI_Get_count(&status, MPI_CHAR, bytes); *source = status.MPI_SOURCE; - char *results = (char *) malloc(size*sizeof(char)); - - MPI_Recv(results, size, MPI_CHAR, *source, RESULTS_TAG, comm, MPI_STATUS_IGNORE); - - return results; -} - -/* - * Finds an idle process from a list of potential worker processes. - * - * @param workersActivity status of all processes that can generate some work (0=idle; 1=busy) - * @lastAssignedProcess last process assigned with some work - * - * @return idle process index or -1 if all processes are busy. 
- */ -int findIdleProcess(int *processActivity, int lastAssignedProcess, int node_offset) { - /* - * Implementation note: lastAssignedProcess is used to implement a fairness policy in which every available process - * can be assigned with some work. - */ - int result = -1; - int i= lastAssignedProcess+1; // master process does not generate replicas - while(i < shm_size && processActivity[i] == 1){ - i++; - }; - - if(i >= shm_size){ - i = node_offset + 1; // master process does not generate replicas - while(i < lastAssignedProcess && processActivity[i] == 1){ - i++; - } - - if(i <= lastAssignedProcess && processActivity[i] == 0){ - result = i; - } - } else { - result = i; - } - - return result; -} - -/* - * Assigns samples to the workers. This implies to send the number of samples to be generated.. - * - * @param workersActivity worker's state (0=idle; 1=busy) - * @param worker worker's index to whom a sample is going to be assigned - * @param samples samples the worker is going to generate - */ -void assignWork(int* workersActivity, int worker, int samples) { - MPI_Send(&samples, 1, MPI_INT, worker, SAMPLES_NUMBER_TAG, shmcomm); - workersActivity[worker]=1; -} - -// ************************************** // -// WORKERS -// ************************************** // -int -workerProcess(struct params parameters, unsigned maxsites) -{ - int samples; - char *results; - int master; - int flag = 1; - - samples = receiveWorkRequest(&master); // TODO: could just skip this and start generating samples? (or it could be used to set the "samples" number once -// results = generateSamples(samples, parameters, maxsites); -// -// sendResultsToMasterProcess(results, master); -// -// free(results); // be good citizen - - while (flag) { - results = generateSamples(samples, parameters, maxsites); + char *results = (char *) malloc(*bytes * sizeof(char)); - sendResultsToMasterProcess(results, master); + MPI_Recv(results, *bytes, MPI_CHAR, *source, RESULTS_TAG, comm, MPI_STATUS_IGNORE); - free(results); // be good citizen + if (diagnose) + fprintf(stderr, "[%d] -> Read [%d] bytes from worker %d.\n", world_rank, *bytes, *source); - flag = isThereMoreWork(master); - } - - //return isThereMoreWork(master); - return flag; + return results; } -char *generateSamples(int samples, struct params parameters, unsigned maxsites) +char *generateSamples(int samples, struct params parameters, unsigned maxsites, int *bytes) { char *results; char *sample; - size_t offset, length; + int length; - results = generateSample(parameters, maxsites); + results = generateSample(parameters, maxsites, &length); + *bytes = length; int i; for (i = 1; i < samples; ++i) { - sample = generateSample(parameters, maxsites); + sample = generateSample(parameters, maxsites, &length); - offset = strlen(results); - length = strlen(sample); + results = realloc(results, *bytes + length + 1); - results = realloc(results, offset + length + 1); + memcpy(results + *bytes, sample, length); - memcpy(results+offset, sample, length); + *bytes += length + 1; free(sample); } - return results; -} - -/* - * Receives the sample's quantity the Master process asked to be generated. 
-/*
- * Receives the number of samples the Master process asked to be generated.
- *
- * @return samples to be generated
- */
-int
-receiveWorkRequest(int *master){
-    int samples;
-
-    *master = 0;
-    if ( shm_mode && world_rank == shm_rank ) {
-        *master = 1;
-    }
-
-    MPI_Recv(&samples, 1, MPI_INT, *master, SAMPLES_NUMBER_TAG, shmcomm, MPI_STATUS_IGNORE);
-    return samples;
-}
-
-int isThereMoreWork(int master) {
-    int goToWork;
+    if (diagnose)
+        fprintf(stderr, "[%d] -> Generated [%d] samples.\n", world_rank, samples);

-    MPI_Recv(&goToWork, 1, MPI_INT, master, GO_TO_WORK_TAG, shmcomm, MPI_STATUS_IGNORE);
-
-    return goToWork;
+    return results;
 }

 /*
@@ -663,11 +224,10 @@ int isThereMoreWork(int master) {
  *
  * @return the sample generated by the worker
  */
-char*
-generateSample(struct params parameters, unsigned maxsites)
+char* generateSample(struct params parameters, unsigned maxsites, int *bytes)
 {
-    int segsites;
-    size_t positionStrLength, gametesStrLenght, offset;
+    int segsites, offset;
+    size_t positionStrLength, gametesStrLenght;
     double probss, tmrca, ttot;
     char *results;
     char **gametes;
@@ -679,8 +239,7 @@ generateSample(struct params parameters, unsigned maxsites)
     gametes = cmatrix(parameters.cp.nsam, parameters.mp.segsitesin+1 );
     gensamResults = gensam(gametes, &probss, &tmrca, &ttot, parameters, &segsites);

-    results = doPrintWorkerResultHeader(segsites, probss, parameters, gensamResults.tree);
-    offset = strlen(results);
+    results = doPrintWorkerResultHeader(segsites, probss, parameters, gensamResults.tree, &offset);

     if(segsites > 0)
     {
@@ -704,6 +263,8 @@ generateSample(struct params parameters, unsigned maxsites)
         }
     }

+    *bytes = offset;
+
     return results;
 }

@@ -714,7 +275,7 @@ generateSample(struct params parameters, unsigned maxsites)
 *   // xxx.x xx.xx x.xxxx x.xxxx
 *   segsites: xxx
 */
-char *doPrintWorkerResultHeader(int segsites, double probss, struct params pars, char *treeOutput){
+char *doPrintWorkerResultHeader(int segsites, double probss, struct params pars, char *treeOutput, int *bytes){
     char *results;

     int length = 3 + 1; // initially "\n//" and optionally a "\n" when there is no treeOutput;
@@ -736,17 +297,19 @@ char *doPrintWorkerResultHeader(int segsites, double probss, struct params pars,
     sprintf(results, "\n//");

     if( (segsites > 0 ) || ( pars.mp.theta > 0.0 ) ) {
-        if( pars.mp.treeflag ) {
+        if( pars.mp.treeflag )
             sprintf(results, "%s%s", results, treeOutput);
-        } else {
+        else
             sprintf(results, "%s%s", results, "\n");
-        }
-        if( (pars.mp.segsitesin > 0 ) && ( pars.mp.theta > 0.0 )) {
+
+        if( (pars.mp.segsitesin > 0 ) && ( pars.mp.theta > 0.0 ))
             sprintf(results, "%sprob: %g\n", results, probss);
-        }
+
         sprintf(results, "%ssegsites: %d\n", results, segsites);
     }

+    *bytes = length;
+
     return results;
 }

@@ -803,24 +366,6 @@ char *doPrintWorkerResultGametes(int segsites, int nsam, char **gametes){
     return results;
 }

-/*
- * Send the Worker's results to the Master process.
- *
- * @param results results to be sent
- *
- */
-void sendResultsToMasterProcess(char* results, int master)
-{
-    if (shm_mode && world_rank != shm_rank) {
-        MPI_Send(results, strlen(results)+1, MPI_CHAR, master, RESULTS_TAG, shmcomm);
-    } else { // there is either one single node, or the worker is located at the master node
-        int ack = 1;
-        MPI_Send(&ack, 1, MPI_INT, master, ACK_TAG, shmcomm);
-        fprintf(stdout, "%s", results);
-        fflush(stdout);
-    }
-}
-
 // ************************************** //
 // UTILS
 // ************************************** //
@@ -856,13 +401,6 @@ char *append(char *lhs, const char *rhs)
     return buffer;
 } /* append */
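One caveat about the header builder kept above: sprintf(results, "%s%s", results, ...) passes results as both destination and source, which is undefined behaviour in C even though it often appears to work. A sketch of the same precompute-length-then-fill approach written with an explicit offset instead of self-referential sprintf (the helper names are illustrative, not code from the patch):

```c
/* Sketch: build a header into a preallocated buffer by appending at an
 * explicit offset, avoiding sprintf's overlapping-buffer pitfall. */
#include <stdio.h>
#include <string.h>

static size_t append_at(char *buf, size_t offset, const char *piece)
{
    size_t n = strlen(piece);
    memcpy(buf + offset, piece, n + 1);  /* include the terminating NUL */
    return offset + n;
}

/* buf must be sized beforehand, as doPrintWorkerResultHeader already does */
size_t build_header_example(char *buf, int segsites)
{
    char tmp[64];
    size_t off = 0;

    off = append_at(buf, off, "\n//");
    snprintf(tmp, sizeof tmp, "\nsegsites: %d\n", segsites);
    return append_at(buf, off, tmp);
}
```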
-/* Initialization of the random generator. */
-unsigned short * parallelSeed(unsigned short *seedv){
-    unsigned short *seed48();
-
-    return seed48(seedv);
-}
-
 /*
  * name: doInitializeRng
  * description: if seeds to initialize the RNG are specified,
@@ -889,4 +427,27 @@ doInitializeRng(int argc, char *argv[], int *seeds, struct params parameters)
             break;
         }
     }
+}
+
+// This function should be called by the master in the master node only.
+unsigned short *initializeSeedMatrix(int argc, char *argv[], int howmany, struct params parameters) {
+    int i;
+    for(i=0; i<argc; i++)
+        printf("%s ", argv[i]);
+
+    if(world_size > howmany)
+        world_size = howmany + 1; // the extra 1 is due to the master
+
+    int nseeds;
+    doInitializeRng(argc, argv, &nseeds, parameters);
+
+    int dimension = nseeds * world_size;
+    unsigned short *seedMatrix = (unsigned short *) malloc(sizeof(unsigned short) * dimension);
+    for(i=0; i<dimension; i++)
+        seedMatrix[i] = (unsigned short) (ran1() * 65535);
+
+    return seedMatrix;
+}
diff --git a/mspar.h b/mspar.h
--- a/mspar.h
+++ b/mspar.h
@@ ... @@
 #include <mpi.h>
-int masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int unsigned maxsites, int *excludeFrom, int *node_rank);
+void masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int unsigned maxsites);
 void masterWorkerTeardown();
-void masterProcessingLogic(int howmany, int lastIdleWorker, struct params parameters, unsigned int maxsites);
-int workerProcess(struct params parameters, unsigned int maxsites);
 void doInitializeRng(int argc, char *argv[], int *seeds, struct params parameters);
-void sendResultsToMasterProcess(char* results, int master);
-int receiveWorkRequest(int *master);
-void assignWork(int* workersActivity, int assignee, int samples);
-char* readResultsFromWorkers(int goToWork, int* workersActivity);
-int findIdleProcess(int *processActivity, int lastAssignedProcess, int node_offset);
-char* generateSample(struct params parameters, unsigned int maxsites);
-char *generateSamples(int, struct params, unsigned);
+char* generateSample(struct params parameters, unsigned int maxsites, int *bytes);
+char *generateSamples(int, struct params, unsigned, int *bytes);
 struct gensam_result gensam(char **gametes, double *probss, double *ptmrca, double *pttot, struct params pars, int* segsites);
-int isThereMoreWork(int master);
-unsigned short* parallelSeed(unsigned short *seedv);
 char *append(char *lhs, const char *rhs);
-char *doPrintWorkerResultHeader(int segsites, double probss, struct params paramters, char *treeOutput);
+char *doPrintWorkerResultHeader(int segsites, double probss, struct params paramters, char *treeOutput, int *bytes);
 char *doPrintWorkerResultPositions(int segsites, int output_precision, double *posit);
 char *doPrintWorkerResultGametes(int segsites, int nsam, char **gametes);
-
-int readAckFromLocalWorker(int remaining, int *workersActivity, struct params parameters, unsigned int maxsites, int *goToWork);
-int numberOfNodes(void*);
-char *readResults(MPI_Comm, int*);
+char *readResults(MPI_Comm comm, int* source, int *bytes);
+unsigned short *initializeSeedMatrix(int argc, char *argv[], int howmany, struct params parameters);
+void singleNodeProcessing(int howmany, struct params parameters, unsigned int maxsites, int *bytes);
+void printSamples(char *results, int bytes);
+void secondaryNodeProcessing(int remaining, struct params parameters, unsigned int maxsites);
+void sendResultsToMaster(char *results, int bytes, MPI_Comm comm);

 /* From ms.c*/
 char ** cmatrix(int nsam, int len);
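The seed matrix produced above is consumed by an MPI_Scatter in masterWorkerSetup: rank 0 holds one 3-value row per process, every rank receives exactly its own row, and that row becomes the rank's private state for the seed48 family of generators. A minimal sketch of that distribution under those assumptions (scatter_seeds and the placeholder seed values are illustrative, not code from this series):

```c
/* Sketch: hand each rank a private 3-value RNG seed via MPI_Scatter.
 * Rank 0 owns the full matrix; rank i receives row i into `local`. */
#include <mpi.h>
#include <stdlib.h>

static void scatter_seeds(int world_rank, int world_size)
{
    unsigned short *matrix = NULL;
    unsigned short local[3];
    int i;

    if (world_rank == 0) {
        matrix = malloc(sizeof(unsigned short) * 3 * world_size);
        for (i = 0; i < 3 * world_size; i++)
            matrix[i] = (unsigned short) (i + 1);  /* placeholder seed values */
    }

    MPI_Scatter(matrix, 3, MPI_UNSIGNED_SHORT,
                local,  3, MPI_UNSIGNED_SHORT, 0, MPI_COMM_WORLD);

    seed48(local);  /* each rank now drives an independent drand48 stream */
    free(matrix);   /* free(NULL) is a no-op on the non-root ranks */
}
```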
From aebebe2ea9433c1a5720c583e83b00a9e716ec01 Mon Sep 17 00:00:00 2001
From: Carlos Montemuino
Date: Wed, 8 Jun 2016 09:08:05 +0200
Subject: [PATCH 21/23] code simplification

---
 ms.c    |   3 +-
 mspar.c | 113 +++++++++++++++++++++++++++++++++++++-----------------------
 mspar.h |   7 +++-
 3 files changed, 73 insertions(+), 50 deletions(-)

diff --git a/ms.c b/ms.c
index dd7eee5..5625cf0 100644
--- a/ms.c
+++ b/ms.c
@@ -138,8 +138,7 @@ int main(int argc, char *argv[]){
     pars = getpars(argc, argv, &howmany, ntbs, count);

     // Master-Worker
-    masterWorkerSetup(argc, argv, howmany, pars, SITESINC);
-    masterWorkerTeardown();
+    masterWorker(argc, argv, howmany, pars, SITESINC);
 }

 struct gensam_result

diff --git a/mspar.c b/mspar.c
index 1b2bfb3..c6a6d1a 100644
--- a/mspar.c
+++ b/mspar.c
@@ -14,7 +14,6 @@ int diagnose = 0; // Used for diagnosing the application.
 MPI_Comm shmcomm; // shm intra-communicator
 int world_rank, shm_rank;
 int world_size, shm_size;
-int shm_mode = 0; // indicates whether MPI-3 SHM is going to be applied

 // ************************************** //
 // MASTER
 // ************************************** //
@@ -82,15 +81,49 @@ void sendResultsToMaster(char *results, int bytes, MPI_Comm comm)
     free(results);
 }

-void masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, unsigned int maxsites)
+void principalMasterProcessing(int remaining, int nodes, struct params parameters, unsigned int maxsites)
+{
+    int bytes = 0;
+    if (remaining > 0) {
+        char *results = generateSamples(remaining, parameters, maxsites, &bytes);
+        printSamples(results, bytes);
+    }
+
+    // Receive samples from the other node masters, each one sending a consolidated message
+    int source, i;
+    char *shm_results;
+    for (i = 1; i < nodes; i++){
+        shm_results = readResults(MPI_COMM_WORLD, &source, &bytes);
+        printSamples(shm_results, bytes);
+    }
+}
+
+int calculateNumberOfNodes()
+{
+    // Gather every process's SHM (local) rank
+    int *shm_ranks = (int *)malloc(sizeof(int) * world_size);
+
+    // Note: elements are ordered by process rank in the MPI_COMM_WORLD communicator
+    MPI_Gather (&shm_rank, 1, MPI_INT, shm_ranks, 1, MPI_INT, 0, MPI_COMM_WORLD);
+
+    int i = 0;
+    int nodes = 0;
+    if (world_rank == 0) {
+        for (i = 0; i < world_size; i++)
+            if (shm_ranks[i] == 0) nodes++;
+    }
+
+    return nodes;
+}
+
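calculateNumberOfNodes leans on two MPI building blocks: MPI_Comm_split_type with MPI_COMM_TYPE_SHARED, which groups the processes of each physical node into their own shared-memory communicator, and an MPI_Gather of the per-node ranks, where exactly one process per node reports local rank 0. A self-contained sketch of the same idea follows (count_nodes is an illustrative name). Note the MPI_Bcast at the end: the inline code this patch deletes further down broadcast the node count, while calculateNumberOfNodes as written yields a non-zero count on world rank 0 only, so its callers must account for that:

```c
/* Sketch: count the physical nodes behind MPI_COMM_WORLD. Every process
 * gets a rank inside its node's shared-memory communicator; counting the
 * processes whose local rank is 0 counts the nodes themselves. */
#include <mpi.h>
#include <stdlib.h>

int count_nodes(void)
{
    MPI_Comm node_comm;
    int world_rank, world_size, node_rank, nodes = 0, i;

    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0,
                        MPI_INFO_NULL, &node_comm);
    MPI_Comm_rank(node_comm, &node_rank);

    int *ranks = malloc(sizeof(int) * world_size);
    MPI_Gather(&node_rank, 1, MPI_INT, ranks, 1, MPI_INT, 0, MPI_COMM_WORLD);

    if (world_rank == 0)
        for (i = 0; i < world_size; i++)
            if (ranks[i] == 0) nodes++;

    MPI_Bcast(&nodes, 1, MPI_INT, 0, MPI_COMM_WORLD); /* share the count */

    free(ranks);
    MPI_Comm_free(&node_comm);
    return nodes;
}
```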
+int setup(int argc, char *argv[], int howmany, struct params parameters)
 {
-    if (getenv("MSPARSM_DIAGNOSE")) diagnose = 1;

     // seedMatrix      : matrix containing the RNG seeds to be distributed to working processes.
     // localSeedMatrix : matrix used by workers to receive RNG seeds from master.
     unsigned short *seedMatrix;
     unsigned short localSeedMatrix[3];
-    char *shm_results; // memory area that all MPI processes in one node are going to share

+    if (getenv("MSPARSM_DIAGNOSE")) diagnose = 1;

     // MPI Initialization
     MPI_Init(&argc, &argv );
@@ -102,74 +135,61 @@ void masterWorkerSetup(int argc, char *argv[], int howmany, struct params parame
     MPI_Comm_size(shmcomm, &shm_size);
     MPI_Comm_rank(shmcomm, &shm_rank);

-    if (shm_size != world_size) // there are MPI processes in more than 1 computing node
-        shm_mode = 1;
-
     if(world_rank == 0)
         seedMatrix = initializeSeedMatrix(argc, argv, howmany, parameters);

+    MPI_Scatter(seedMatrix, 3, MPI_UNSIGNED_SHORT, localSeedMatrix, 3, MPI_UNSIGNED_SHORT, 0, MPI_COMM_WORLD);
+
+    int nodes = calculateNumberOfNodes();
+
+    if (diagnose)
+        fprintf(stderr, "[%d] -> SHM Rank=%d, SHM Size=%d, WORLD Size=%d\n", world_rank, shm_rank, shm_size, world_size);
+
+    if(diagnose && world_rank == 0)
+        fprintf(stderr, "[%d] -> # of nodes=%d\n", world_rank, nodes);
+
+    return nodes;
+}
+
+void teardown() {
+    MPI_Finalize();
+}
+
+void masterWorker(int argc, char *argv[], int howmany, struct params parameters, unsigned int maxsites)
+{
+    int nodes = setup(argc, argv, howmany, parameters);
+
     // Filter out workers with rank higher than howmany, meaning there are more workers than samples to be generated.
     if(world_rank < howmany)
     {
-        MPI_Scatter(seedMatrix, 3, MPI_UNSIGNED_SHORT, localSeedMatrix, 3, MPI_UNSIGNED_SHORT, 0, MPI_COMM_WORLD);
-
         if (world_size == shm_size) { // There is only one node
             int bytes;
             singleNodeProcessing(howmany, parameters, maxsites, &bytes);
         } else {
-            // Gather every process's SHM (local) rank
-            int *shm_ranks = (int *)malloc(sizeof(int) * world_size);
-
-            // Note: elements are ordered by process rank in the MPI_COMM_WORLD communicator
-            MPI_Gather (&shm_rank, 1, MPI_INT, shm_ranks, 1, MPI_INT, 0, MPI_COMM_WORLD);
-
-            // Calculate number of nodes
-            int i = 0;
-            int nodes = 0;
-            if (world_rank == 0) {
-                for (i = 0; i < world_size; i++)
-                    if (shm_ranks[i] == 0) nodes++;
-            }
-            MPI_Bcast(&nodes, 1, MPI_INT, 0, MPI_COMM_WORLD);

             int nodeSamples = howmany / nodes;
-            int remainingNodeSamples = howmany % nodes;
+            int remainingGlobal = howmany % nodes;
             int workerSamples = nodeSamples / (shm_size - 1);
-            int remainingWorkerSamples = nodeSamples % (shm_size - 1);
+            int remainingLocal = nodeSamples % (shm_size - 1);

-            int bytes = 0;
             if (world_rank != 0 && shm_rank != 0) {
+                int bytes = 0;
                 char *results = generateSamples(workerSamples, parameters, maxsites, &bytes);
                 if (world_rank == shm_rank)
                     printSamples(results, bytes);
-                else
-                    // Send results to shm_rank = 0
+                else // Send results to shm_rank = 0
                     sendResultsToMaster(results, bytes, shmcomm);
             } else {
                 if (world_rank != 0 && shm_rank == 0) {
-                    secondaryNodeProcessing(remainingWorkerSamples, parameters, maxsites);;
-                } else {
-                    bytes = 0;
-                    int remaining = remainingNodeSamples + remainingWorkerSamples;
-                    if (remaining > 0) {
-                        char *results = generateSamples(remaining, parameters, maxsites, &bytes);
-                        printSamples(results, bytes);
-                    }
-
-                    // Receive samples from the other node masters, each one sending a consolidated message
-                    int source;
-                    for (i = 1; i < nodes; i++){
-                        shm_results = readResults(MPI_COMM_WORLD, &source, &bytes);
-                        printSamples(shm_results, bytes);
-                    }
-                }
+                    secondaryNodeProcessing(remainingLocal, parameters, maxsites);
+                } else
+                    principalMasterProcessing(remainingGlobal + remainingLocal, nodes, parameters, maxsites);
             }
         }
     }
-}

-void masterWorkerTeardown() {
-    MPI_Finalize();
+
+    teardown();
 }

 char *readResults(MPI_Comm comm, int *source, int *bytes)
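The sample-division arithmetic in masterWorker is easiest to verify with concrete numbers. The values below are purely illustrative, not taken from the patch:

```c
/* Illustrative run: 100 replicas on 2 nodes with 4 MPI ranks per node. */
int howmany = 100, nodes = 2, shm_size = 4;

int nodeSamples     = howmany / nodes;              /* 50 replicas per node         */
int remainingGlobal = howmany % nodes;              /*  0, kept by world rank 0     */
int workerSamples   = nodeSamples / (shm_size - 1); /* 16 per worker, 3 workers     */
int remainingLocal  = nodeSamples % (shm_size - 1); /*  2, kept by each node master */

/* 2 nodes x 3 workers x 16 = 96 replicas; each node master adds its 2
 * local leftovers (96 + 2 + 2 = 100); the principal master would also
 * absorb remainingGlobal if howmany did not divide evenly across nodes. */
```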
@@ -446,6 +466,7 @@ unsigned short *initializeSeedMatrix(int argc, char *argv[], int howmany, struct
     int dimension = nseeds * world_size;
     unsigned short *seedMatrix = (unsigned short *) malloc(sizeof(unsigned short) * dimension);
+
     for(i=0; i<dimension; i++)
         seedMatrix[i] = (unsigned short) (ran1() * 65535);

     return seedMatrix;
 }
diff --git a/mspar.h b/mspar.h
--- a/mspar.h
+++ b/mspar.h
@@ ... @@
 #include <mpi.h>
-void masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int unsigned maxsites);
-void masterWorkerTeardown();
+void masterWorker(int argc, char *argv[], int howmany, struct params parameters, int unsigned maxsites);
+void teardown();
+int setup(int argc, char *argv[], int howmany, struct params parameters);
 void doInitializeRng(int argc, char *argv[], int *seeds, struct params parameters);
 char* generateSample(struct params parameters, unsigned int maxsites, int *bytes);
 char *generateSamples(int, struct params, unsigned, int *bytes);
@@ -16,6 +17,8 @@ void singleNodeProcessing(int howmany, struct params parameters, unsigned int ma
 void printSamples(char *results, int bytes);
 void secondaryNodeProcessing(int remaining, struct params parameters, unsigned int maxsites);
 void sendResultsToMaster(char *results, int bytes, MPI_Comm comm);
+void principalMasterProcessing(int remaining, int nodes, struct params parameters, unsigned int maxsites);
+int calculateNumberOfNodes();

 /* From ms.c*/
 char ** cmatrix(int nsam, int len);

From 3cfc76babb926184305302e0789a49060860e0f5 Mon Sep 17 00:00:00 2001
From: Carlos Montemuino
Date: Wed, 8 Jun 2016 22:20:22 +0200
Subject: [PATCH 22/23] code documentation

---
 mspar.c | 81 ++++++++++++++++++++++++++++++++-------------------------
 mspar.h | 21 +++------------
 2 files changed, 48 insertions(+), 54 deletions(-)

diff --git a/mspar.c b/mspar.c
index c6a6d1a..c49121f 100644
--- a/mspar.c
+++ b/mspar.c
@@ -135,10 +135,14 @@ int setup(int argc, char *argv[], int howmany, struct params parameters)
     MPI_Comm_size(shmcomm, &shm_size);
     MPI_Comm_rank(shmcomm, &shm_rank);

-    if(world_rank == 0)
-        seedMatrix = initializeSeedMatrix(argc, argv, howmany, parameters);
+    if (world_rank == 0) { // print out program parameters
+        int i;
+        for(i=0; i<argc; i++)
+            printf("%s ", argv[i]);
+
+        seedMatrix = initializeSeedMatrix(argc, argv, howmany, parameters);
+    }
@@ ... @@ unsigned short *initializeSeedMatrix(int argc, char *argv[], int howmany, struct params parameters)
     int i;
+    int size = world_size;
+
     if(world_size > howmany)
-        world_size = howmany + 1; // the extra 1 is due to the master
+        size = howmany + 1; // the extra 1 is due to the master

-    int nseeds;
-    doInitializeRng(argc, argv, &nseeds, parameters);
-
-    int dimension = nseeds * world_size;
+    int dimension = 3 * size;
     unsigned short *seedMatrix = (unsigned short *) malloc(sizeof(unsigned short) * dimension);

-    for(i=0; i<dimension; i++)

From: Carlos Montemuino
Date: Mon, 13 Jun 2016 07:44:45 +0200
Subject: [PATCH 23/23] documentation and fix output

---
 Makefile  | 16 ++++------------
 README.md | 32 ++++++++++++++++++++++++++++----
 mspar.c   | 26 +++++++++++++++----------
 mspar.h   |  2 +-
 4 files changed, 48 insertions(+), 28 deletions(-)

diff --git a/Makefile b/Makefile
index 3f5cae5..cb839c2 100644
--- a/Makefile
+++ b/Makefile
@@ -1,6 +1,5 @@
 #
-#
-# 'make'        make executable file 'mspar'
+# 'make'        make executable file 'msparsm'
 # 'make clean'  removes all .o and executable files
 #
@@ -33,14 +32,7 @@ RND=rand2.c
 $(BIN)/%.o: %.c $(DEPS)
 	$(CC) $(CFLAGS) -c -o $@ $<

-default: $(BIN)/mspar
-
-# download: packages
-#	wget http://www.open-mpi.org/software/ompi/v1.8/downloads/openmpi-1.8.2.tar.gz
-#	tar -xf openmpi-1.8.2.tar.gz -C $(CURDIR)/packages
-
-#packages:
-#	mkdir packages
+default: $(BIN)/msparsm

 clean:
 	rm -f $(BIN)/*
@@ -48,7 +40,7 @@ clean:
 	@echo ""
 	@echo "*** All resources were cleaned-up ***"
 	@echo ""

-$(BIN)/mspar: $(OBJ)
+$(BIN)/msparsm: $(OBJ)
 	$(CC) $(CFLAGS) -o $@ $^ $(RND_48) $(LIBS)
 	@echo ""
-	@echo "*** make complete: generated executable 'mspar' ***"
+	@echo "*** make complete: generated executable 'bin/msparsm' ***"
"" - @echo "*** make complete: generated executable 'mspar' ***" + @echo "*** make complete: generated executable 'bin/msparsm' ***" diff --git a/README.md b/README.md index 3b743db..4929e53 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,39 @@ -msparsm +msParSm ======= -Parallel version of "ms" coalescent simulator using a master worker approach and a MPI implementation with on-demand scheduling. +The _msParSm_ application is an evolution of _msPar_ [(1)][1], the parallel version of the coalescent simulation program +_ms_ [(2)][2], +which removes the limitation for simulating long stretches of DNA sequences with large recombination rates, +without compromising the accuracy of the standard coalescence. + +## Pre-requisites +- Linux GNU Compiler 4.9.1 (or greater) +- OpenMPI 1.10.1 (other releases in the branch 1.10 should be fine) + - Version 1.8.x could potentially be fine, but please notice that _msParSm_ was not fully tested with such version. +- CMake 3.5.1 (or greather) **OR** GNU Make 3.81 (or greater) ## How to Build -Create the makefile using CMake: +There are two ways for building _msParSm_: CMake and Make. If you have installed CMAKE with version greater than 3.5.0, +then go with CMake, otherwise you should use Make. + +### CMake ```bash cmake -DCMAKE_INSTALL_PREFIX= ``` -Install using make: +### Make ```bash make install ``` +Binary files will be put into the `bin` folder (which is already _git ignored_). + +## How to Use +Usage is the mostly the same as with traditional _ms_, but you need to run it through _OpenMPI_. Next example +will run the application using 4 threads: + +```bash +mpirun -n 4 bin/msparsm 10 20 -seeds 40328 19150 54118 -t 100 -r 100 100000 -I 2 2 8 -eN 0.4 10.01 -eN 1 0.01 -en 0.25 2 0.2 -ej 3 2 1 -T > results.out +``` + +[1]: http://link.springer.com/chapter/10.1007/978-3-642-54420-0_32 +[2]: http://home.uchicago.edu/~rhudson1/popgen356/OxfordSurveysEvolBiol7_1-44.pdf \ No newline at end of file diff --git a/mspar.c b/mspar.c index c49121f..4ee77bc 100644 --- a/mspar.c +++ b/mspar.c @@ -250,8 +250,8 @@ char *generateSamples(int samples, struct params parameters, unsigned maxsites, */ char* generateSample(struct params parameters, unsigned maxsites, int *bytes) { - int segsites, offset; - size_t positionStrLength, gametesStrLenght; + int segsites; + size_t offset, positionStrLength, gametesStrLenght; double probss, tmrca, ttot; char *results; char **gametes; @@ -263,23 +263,31 @@ char* generateSample(struct params parameters, unsigned maxsites, int *bytes) gametes = cmatrix(parameters.cp.nsam, parameters.mp.segsitesin+1 ); gensamResults = gensam(gametes, &probss, &tmrca, &ttot, parameters, &segsites); - results = doPrintWorkerResultHeader(segsites, probss, parameters, gensamResults.tree, &offset); + + results = doPrintWorkerResultHeader(segsites, probss, parameters, gensamResults.tree); + + offset = strlen(results); + *bytes = offset; if(segsites > 0) { char *positionsStr = doPrintWorkerResultPositions(segsites, parameters.output_precision, gensamResults.positions); positionStrLength = strlen(positionsStr); - char *gametesStr = doPrintWorkerResultGametes(segsites, parameters.cp.nsam, gametes); gametesStrLenght = strlen(gametesStr); - results = realloc(results, offset + positionStrLength + gametesStrLenght + 1); - //sprintf(results, "%s%s", results, positionsStr); + results = realloc(results, offset + positionStrLength + gametesStrLenght + 2); + memcpy(results+offset, positionsStr, positionStrLength+1); offset += positionStrLength; + *bytes += 
diff --git a/mspar.c b/mspar.c
index c49121f..4ee77bc 100644
--- a/mspar.c
+++ b/mspar.c
@@ -250,8 +250,8 @@ char *generateSamples(int samples, struct params parameters, unsigned maxsites,
  */
 char* generateSample(struct params parameters, unsigned maxsites, int *bytes)
 {
-    int segsites, offset;
-    size_t positionStrLength, gametesStrLenght;
+    int segsites;
+    size_t offset, positionStrLength, gametesStrLenght;
     double probss, tmrca, ttot;
     char *results;
     char **gametes;
@@ -263,23 +263,31 @@ char* generateSample(struct params parameters, unsigned maxsites, int *bytes)
     gametes = cmatrix(parameters.cp.nsam, parameters.mp.segsitesin+1 );
     gensamResults = gensam(gametes, &probss, &tmrca, &ttot, parameters, &segsites);

-    results = doPrintWorkerResultHeader(segsites, probss, parameters, gensamResults.tree, &offset);
+
+    results = doPrintWorkerResultHeader(segsites, probss, parameters, gensamResults.tree);
+
+    offset = strlen(results);
+    *bytes = offset;

     if(segsites > 0)
     {
         char *positionsStr = doPrintWorkerResultPositions(segsites, parameters.output_precision, gensamResults.positions);
         positionStrLength = strlen(positionsStr);
-
         char *gametesStr = doPrintWorkerResultGametes(segsites, parameters.cp.nsam, gametes);
         gametesStrLenght = strlen(gametesStr);

-        results = realloc(results, offset + positionStrLength + gametesStrLenght + 1);
-        //sprintf(results, "%s%s", results, positionsStr);
+        results = realloc(results, offset + positionStrLength + gametesStrLenght + 2);
+
+        memcpy(results+offset, positionsStr, positionStrLength+1);
         offset += positionStrLength;
+        *bytes += positionStrLength;
+
         memcpy(results+offset, gametesStr, gametesStrLenght+1);
+
+        *bytes += gametesStrLenght;
+
         free(positionsStr);
         free(gametesStr);

         if( parameters.mp.timeflag ) {
@@ -287,8 +295,6 @@ char* generateSample(struct params parameters, unsigned maxsites, int *bytes)
         }
     }

-    *bytes = offset;
-
     return results;
 }

@@ -299,7 +305,7 @@ char* generateSample(struct params parameters, unsigned maxsites, int *bytes)
 *   // xxx.x xx.xx x.xxxx x.xxxx
 *   segsites: xxx
 */
-char *doPrintWorkerResultHeader(int segsites, double probss, struct params pars, char *treeOutput, int *bytes){
+char *doPrintWorkerResultHeader(int segsites, double probss, struct params pars, char *treeOutput){
     char *results;

     int length = 3 + 1; // initially "\n//" and optionally a "\n" when there is no treeOutput;
@@ -332,8 +338,6 @@ char *doPrintWorkerResultHeader(int segsites, double probss, struct params pars,
         sprintf(results, "%ssegsites: %d\n", results, segsites);
     }

-    *bytes = length;
-
     return results;
 }

diff --git a/mspar.h b/mspar.h
index 2acecac..46973da 100644
--- a/mspar.h
+++ b/mspar.h
@@ -8,7 +8,7 @@ char* generateSample(struct params parameters, unsigned int maxsites, int *bytes
 char *generateSamples(int, struct params, unsigned, int *bytes);
 struct gensam_result gensam(char **gametes, double *probss, double *ptmrca, double *pttot, struct params pars, int* segsites);
 char *append(char *lhs, const char *rhs);
-char *doPrintWorkerResultHeader(int segsites, double probss, struct params paramters, char *treeOutput, int *bytes);
+char *doPrintWorkerResultHeader(int segsites, double probss, struct params paramters, char *treeOutput);
 char *doPrintWorkerResultPositions(int segsites, int output_precision, double *posit);
 char *doPrintWorkerResultGametes(int segsites, int nsam, char **gametes);
 char *readResults(MPI_Comm comm, int* source, int *bytes);
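With this last patch, *bytes counts exactly the characters that belong to the sample text, so consolidated buffers can be written out by length rather than by NUL scanning. The body of printSamples is not shown anywhere in this series; a plausible minimal sketch consistent with its callers would be the following (an assumption, not the repository's actual code):

```c
/* Hypothetical sketch of printSamples: write exactly `bytes` characters
 * and flush, so sample blocks from different masters cannot interleave
 * mid-buffer on the shared standard output. */
#include <stdio.h>

void printSamples(char *results, int bytes)
{
    fwrite(results, sizeof(char), (size_t) bytes, stdout);
    fflush(stdout);
}
```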