diff --git a/CMakeLists.txt b/CMakeLists.txt index 39c8fdb..893756e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 2.6) +cmake_minimum_required(VERSION 3.5.1) project(msparsm) # Require MPI for this project: @@ -7,7 +7,6 @@ find_package(MPI REQUIRED) include_directories(${MPI_INCLUDE_PATH}) set(MPI_COMPILE_FLAGS "-O3 -std=gnu99 -I.") -set(MPI_LINK_FLAGS "-lm") set(SOURCE_FILES ms.c ms.h @@ -17,7 +16,7 @@ set(SOURCE_FILES streec.c) add_executable(msparsm ${SOURCE_FILES}) -target_link_libraries(msparsm ${MPI_LIBRARIES}) +target_link_libraries(msparsm ${MPI_LIBRARIES} -lm) if(MPI_COMPILE_FLAGS) set_target_properties(msparsm PROPERTIES COMPILE_FLAGS "${MPI_COMPILE_FLAGS}") diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..3f5cae5 --- /dev/null +++ b/Makefile @@ -0,0 +1,54 @@ +# +# +# 'make' make executable file 'mspar' +# 'make clean' removes all .o and executable files +# + +# Compiler +CC=mpicc + +# Compilation flags +CFLAGS?=-O3 -std=gnu99 -I. + +# define any libraries to link into executable: +LIBS?=-lm + +# Dependencies +DEPS=ms.h mspar.h + +# Folder to put the generated binaries +BIN?=./bin + +# Object files +OBJ=$(BIN)/mspar.o $(BIN)/ms.o $(BIN)/streec.o + +# Random functions using drand48() +RND_48=rand1.c + +# Random functions using rand() +RND=rand2.c + +.PHONY: clean + +$(BIN)/%.o: %.c $(DEPS) + $(CC) $(CFLAGS) -c -o $@ $< + +default: $(BIN)/mspar + +# download: packages +# wget http://www.open-mpi.org/software/ompi/v1.8/downloads/openmpi-1.8.2.tar.gz +# tar -xf openmpi-1.8.2.tar.gz -C $(CURDIR)/packages + +#packages: +# mkdir packages + +clean: + rm -f $(BIN)/* + @echo "" + @echo "*** All resources were cleaned-up ***" + @echo "" + +$(BIN)/mspar: $(OBJ) + $(CC) $(CFLAGS) -o $@ $^ $(RND_48) $(LIBS) + @echo "" + @echo "*** make complete: generated executable 'mspar' ***" diff --git a/ms.c b/ms.c index e644ec4..5625cf0 100644 --- a/ms.c +++ b/ms.c @@ -122,7 +122,6 @@ int main(int argc, char *argv[]){ char **tbsparamstrs ; double probss, tmrca, ttot ; struct params pars ; - void seedit( const char * ) ; struct params getpars( int argc, char *argv[], int *howmany, int ntbs, int count ) ; int samples; @@ -139,14 +138,7 @@ int main(int argc, char *argv[]){ pars = getpars(argc, argv, &howmany, ntbs, count); // Master-Worker - int myRank = masterWorkerSetup(argc, argv, howmany, pars, SITESINC); - - if(myRank <= howmany && myRank > 0) - { - while(workerProcess(pars, SITESINC)); - } - - masterWorkerTeardown(); + masterWorker(argc, argv, howmany, pars, SITESINC); } struct gensam_result @@ -206,7 +198,7 @@ gensam( char **list, double *pprobss, double *ptmrca, double *pttot, struct para free(seglst[seg].ptree) ; } result.tree = treeOutput; - printf(treeOutput); + printf("%s", treeOutput); } if( pars.mp.timeflag ) { diff --git a/mspar.c b/mspar.c index 4a59422..c49121f 100644 --- a/mspar.c +++ b/mspar.c @@ -1,275 +1,244 @@ -const int SEEDS_COUNT = 3; -const int SEED_TAG = 100; -const int SAMPLES_NUMBER_TAG = 200; -const int RESULTS_TAG = 300; -const int GO_TO_WORK_TAG = 400; - -#include #include +#include #include #include "ms.h" #include "mspar.h" -#include /* OpenMPI library */ + +const int RESULTS_TAG = 300; + +int diagnose = 0; // Used for diagnosing the application. + +// Following variables are with global scope in order to facilitate its sharing among routines. +// They are going to be updated in the masterWorkerSetup routine only, which is called only one, therefore there is no +// risk of race conditions or whatever other concurrency related problem. +MPI_Comm shmcomm; // shm intra-communicator +int world_rank, shm_rank; +int world_size, shm_size; // ************************************** // // MASTER // ************************************** // - -int -masterWorkerSetup(int argc, char *argv[], int howmany, struct params parameters, int maxsites) +void singleNodeProcessing(int howmany, struct params parameters, unsigned int maxsites, int *bytes) { - // myRank : rank of the current process in the MPI ecosystem. - // poolSize : number of processes in the MPI ecosystem. - // goToWork : used by workers to realize if there is more work to do. - // seedMatrix : matrix containing the RNG seeds to be distributed to working processes. - // localSeedMatrix : matrix used by workers to receive RNG seeds from master. - int myRank; - int poolSize; - unsigned short *seedMatrix; - unsigned short localSeedMatrix[3]; + // No master process is needed. Every MPI process can just output the generated samples + int samples = howmany / world_size; + int remainder = howmany % world_size; + if (world_rank == 0) // let the "global master" to generate the remainder samples as well + samples += remainder; - // MPI Initialization - MPI_Init(&argc, &argv ); - MPI_Comm_size(MPI_COMM_WORLD, &poolSize); - MPI_Comm_rank(MPI_COMM_WORLD, &myRank); + char *results = generateSamples(samples, parameters, maxsites, bytes); + printSamples(results, *bytes); +} - if(myRank == 0) - { - int i; - // Only the master process prints out the application's parameters - for(i=0; i howmany) - { - poolSize = howmany + 1; // the extra 1 is due to the master - } + if (diagnose) + fprintf(stderr, "[%d] -> Printed [%d] bytes.\n", world_rank, bytes); - int nseeds; - doInitializeRng(argc, argv, &nseeds, parameters); - int dimension = nseeds * poolSize; - seedMatrix = (unsigned short *) malloc(sizeof(unsigned short) * dimension); - for(i=0; i 0) + results = generateSamples(remaining, parameters, maxsites, &bytes); - return myRank; -} + // Receive samples from workers in same node. + int i; + char *shm_results; + for (i = 1; i < shm_size; i++){ + int source, length; + + shm_results = readResults(shmcomm, &source, &length); + results = realloc(results, bytes + length + 1); + memcpy(results + bytes, shm_results, length); + bytes += length + 1; + free(shm_results); + } -void -masterWorkerTeardown() { - MPI_Finalize(); + // Send gathered results to master in master-node + sendResultsToMaster(results, bytes, MPI_COMM_WORLD); } -/* - * Logic implemented by the master process. - * - * @param howmany total number of replicas to be generated - * @param lastAssignedProcess last processes that has been assigned som work - * @param poolSize number of processes in the MPI ecosystem - * @param parameters parameters used to generate replicas. Used when worker process is the master itself - * @param maxsites maximum number of sites. Used when worker process is the master itself - */ -void -masterProcessingLogic(int howmany, int lastAssignedProcess, int poolSize, struct params parameters, unsigned maxsites) +void sendResultsToMaster(char *results, int bytes, MPI_Comm comm) { - char *results; - int *processActivity = (int*) malloc(poolSize * sizeof(int)); - processActivity[0] = 1; // Master does not generate replicas - int i; - for(i=1; i 0) - { - int idleProcess = findIdleProcess(processActivity, poolSize, lastAssignedProcess); - if(idleProcess >= 0) - { - assignWork(processActivity, idleProcess, 1); - lastAssignedProcess = idleProcess; - howmany--; - } - else - { - readResultsFromWorkers(1, processActivity); - pendingJobs--; - } + fprintf(stderr, "[%d] -> Sent [%d] bytes to master in %s.\n", world_rank, bytes + 1, communicator); } - while(pendingJobs > 0) - { - readResultsFromWorkers(0, processActivity); - pendingJobs--; + + free(results); +} + +void principalMasterProcessing(int remaining, int nodes, struct params parameters, unsigned int maxsites) +{ + int bytes = 0; + if (remaining > 0) { + char *results = generateSamples(remaining, parameters, maxsites, &bytes); + printSamples(results, bytes); + } + + // Receive samples from other node masters, each one sending a consolidated message + int source, i; + char *shm_results; + for (i = 1; i < nodes; i++){ + shm_results = readResults(MPI_COMM_WORLD, &source, &bytes); + printSamples(shm_results, bytes); } } -/* - * - * Esta función realiza dos tareas: por un lado hace que el Master escuche los resultados enviados por los workers y por - * otro lado, se envía al worker que se ha recibido la información un mensaje sobre si debe seguir esperando por - * trabajos o si ha de finalizar su contribución al sistema. - * - * @param goToWork indica si el worker queda en espera de más trabajo (1) o si ya puede finalizar su ejecución (0) - * @param workersActivity el vector con el estado de actividad de los workers - * @return - * - */ -void readResultsFromWorkers(int goToWork, int* workersActivity) +int calculateNumberOfNodes() { - MPI_Status status; - int size; - int source; + // Gather all SHM rank (local rank) from all the processes + int *shm_ranks = (int *)malloc(sizeof(int) * world_size); - MPI_Probe(MPI_ANY_SOURCE, RESULTS_TAG, MPI_COMM_WORLD, &status); - MPI_Get_count(&status, MPI_CHAR, &size); - source = status.MPI_SOURCE; - char * results = (char *) malloc(size*sizeof(char)); + // Note: elements are ordered by process rank in MPI_COMM_WORLD communicator + MPI_Gather (&shm_rank, 1, MPI_INT, shm_ranks, 1, MPI_INT, 0, MPI_COMM_WORLD); - MPI_Recv(results, size, MPI_CHAR, source, RESULTS_TAG, MPI_COMM_WORLD, &status); - source = status.MPI_SOURCE; - MPI_Send(&goToWork, 1, MPI_INT, source, GO_TO_WORK_TAG, MPI_COMM_WORLD); + int i = 0; + int nodes = 0; + if (world_rank == 0) { + for (i = 0; i < world_size; i++) + if (shm_ranks[i] == 0) nodes++; + } - workersActivity[source]=0; - fprintf(stdout, "%s", results); - free(results); // be good citizen + return nodes; } -/* - * Finds an idle process from a list of potential worker processes. - * - * @param workersActivity status of all processes that can generate some work (0=idle; 1=busy) - * @param poolSize number of worker processes - * @lastAssignedProcess last process assigned with some work - * - * @return idle process index or -1 if all processes are busy. - */ -int findIdleProcess(int *processActivity, int poolSize, int lastAssignedProcess) { - /* - * Implementation note: lastAssignedProcess is used to implement a fairness policy in which every available process - * can be assigned with some work. - */ - int result = -1; - int i= lastAssignedProcess+1; // master process does not generate replicas - while(i < poolSize && processActivity[i] == 1){ - i++; - }; - - if(i >= poolSize){ - i=1; // master process does not generate replicas - while(i < lastAssignedProcess && processActivity[i] == 1){ - i++; - } +int setup(int argc, char *argv[], int howmany, struct params parameters) +{ + // seedMatrix : matrix containing the RNG seeds to be distributed to working processes. + // localSeedMatrix : matrix used by workers to receive RNG seeds from master. + unsigned short *seedMatrix; + unsigned short localSeedMatrix[3]; - if(i <= lastAssignedProcess && processActivity[i] == 0){ - result = i; - } - } else { - result = i; + if (getenv("MSPARSM_DIAGNOSE")) diagnose = 1; + + // MPI Initialization + MPI_Init(&argc, &argv ); + MPI_Comm_size(MPI_COMM_WORLD, &world_size); + MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); + + // MPI_COMM_TYPE_SHARED: This type splits the communicator into subcommunicators, each of which can create a shared memory region. + MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm); + MPI_Comm_size(shmcomm, &shm_size); + MPI_Comm_rank(shmcomm, &shm_rank); + + if (world_rank == 0) { // print out program parameters + int i; + for(i=0; i SHM Rank=%d, SHM Size=%d, WORLD Size=%d\n", world_rank, shm_rank, shm_size, world_size); + + if(diagnose && world_rank == 0) + fprintf(stderr, "[%d] -> # of nodes=%d\n", world_rank, nodes); + + return nodes; } -/* - * Assigns samples to the workers. This implies to send the number of samples to be generated.. - * - * @param workersActivity worker's state (0=idle; 1=busy) - * @param worker worker's index to whom a sample is going to be assigned - * @param samples samples the worker is going to generate - */ -void assignWork(int* workersActivity, int worker, int samples) { - MPI_Send(&samples, 1, MPI_INT, worker, SAMPLES_NUMBER_TAG, MPI_COMM_WORLD); - //TODO check usage of MPI_Sendv?? - workersActivity[worker]=1; +void teardown() { + MPI_Finalize(); } -// ************************************** // -// WORKERS -// ************************************** // +void masterWorker(int argc, char *argv[], int howmany, struct params parameters, unsigned int maxsites) +{ + int nodes = setup(argc, argv, howmany, parameters); + + // Filter out workers with rank higher than howmany, meaning there are more workers than samples to be generated. + if(world_rank < howmany) { + if (world_size == shm_size) { // There is only one node + int bytes; + singleNodeProcessing(howmany, parameters, maxsites, &bytes); + } else { + MPI_Bcast(&nodes, 1, MPI_INT, 0, MPI_COMM_WORLD); + + int nodeSamples = howmany / nodes; + int remainingGlobal = howmany % nodes; + int workerSamples = nodeSamples / (shm_size - 1); + int remainingLocal = nodeSamples % (shm_size - 1); + + if (world_rank != 0 && shm_rank != 0) { + int bytes = 0; + char *results = generateSamples(workerSamples, parameters, maxsites, &bytes); + + if (world_rank == shm_rank) + printSamples(results, bytes); + else // Send results to shm_rank = 0 + sendResultsToMaster(results, bytes, shmcomm); + } else { + if (world_rank != 0 && shm_rank == 0) { + secondaryNodeProcessing(remainingLocal, parameters, maxsites); + } else + principalMasterProcessing(remainingGlobal + remainingLocal, nodes, parameters, maxsites); + } + } + } -int -workerProcess(struct params parameters, unsigned maxsites) + teardown(); +} + +char *readResults(MPI_Comm comm, int *source, int *bytes) { - char *generateSamples(int, struct params, unsigned); + MPI_Status status; - int samples; - char *results; + MPI_Probe(MPI_ANY_SOURCE, RESULTS_TAG, comm, &status); + MPI_Get_count(&status, MPI_CHAR, bytes); + *source = status.MPI_SOURCE; - samples = receiveWorkRequest(); - results = generateSamples(samples, parameters, maxsites); + char *results = (char *) malloc(*bytes * sizeof(char)); - sendResultsToMasterProcess(results); + MPI_Recv(results, *bytes, MPI_CHAR, *source, RESULTS_TAG, comm, MPI_STATUS_IGNORE); - free(results); // be good citizen - return isThereMoreWork(); + if (diagnose) + fprintf(stderr, "[%d] -> Read [%d] bytes from worker %d.\n", world_rank, *bytes, *source); + + return results; } -char *generateSamples(int samples, struct params parameters, unsigned maxsites) +char *generateSamples(int samples, struct params parameters, unsigned maxsites, int *bytes) { char *results; char *sample; - size_t offset, length; + int length; - results = generateSample(parameters, maxsites); + results = generateSample(parameters, maxsites, &length); + *bytes = length; int i; for (i = 1; i < samples; ++i) { - sample = generateSample(parameters, maxsites); + sample = generateSample(parameters, maxsites, &length); - offset = strlen(results); - length = strlen(sample); + results = realloc(results, *bytes + length + 1); - results = realloc(results, offset + length + 1); + memcpy(results + *bytes, sample, length); - memcpy(results+offset, sample, length); + *bytes += length + 1; free(sample); } - return results; -} - -/* - * Receives the sample's quantity the Master process asked to be generated. - * - * @return samples to be generated - */ -int receiveWorkRequest(){ - int samples; - MPI_Status status; - - MPI_Recv(&samples, 1, MPI_INT, 0, SAMPLES_NUMBER_TAG, MPI_COMM_WORLD, &status); - return samples; -} - -int isThereMoreWork() { - int goToWork; - MPI_Status status; + if (diagnose) + fprintf(stderr, "[%d] -> Generated [%d] samples.\n", world_rank, samples); - MPI_Recv(&goToWork, 1, MPI_INT, 0, GO_TO_WORK_TAG, MPI_COMM_WORLD, &status); - - return goToWork; + return results; } /* @@ -279,15 +248,13 @@ int isThereMoreWork() { * * @return the sample generated by the worker */ -char* -generateSample(struct params parameters, unsigned maxsites) +char* generateSample(struct params parameters, unsigned maxsites, int *bytes) { - int segsites; - size_t positionStrLength, gametesStrLenght, offset; + int segsites, offset; + size_t positionStrLength, gametesStrLenght; double probss, tmrca, ttot; char *results; char **gametes; - double *positions; struct gensam_result gensamResults; if( parameters.mp.segsitesin == 0 ) @@ -296,34 +263,32 @@ generateSample(struct params parameters, unsigned maxsites) gametes = cmatrix(parameters.cp.nsam, parameters.mp.segsitesin+1 ); gensamResults = gensam(gametes, &probss, &tmrca, &ttot, parameters, &segsites); - positions = gensamResults.positions; - results = doPrintWorkerResultHeader(segsites, probss, parameters, gensamResults.tree); - offset = strlen(results); + results = doPrintWorkerResultHeader(segsites, probss, parameters, gensamResults.tree, &offset); if(segsites > 0) { - char *positionsStr = doPrintWorkerResultPositions(segsites, parameters.output_precision, positions); + char *positionsStr = doPrintWorkerResultPositions(segsites, parameters.output_precision, gensamResults.positions); positionStrLength = strlen(positionsStr); + char *gametesStr = doPrintWorkerResultGametes(segsites, parameters.cp.nsam, gametes); gametesStrLenght = strlen(gametesStr); results = realloc(results, offset + positionStrLength + gametesStrLenght + 1); - //sprintf(results, "%s%s", results, positionsStr); memcpy(results+offset, positionsStr, positionStrLength+1); offset += positionStrLength; memcpy(results+offset, gametesStr, gametesStrLenght+1); - free(positionsStr); free(gametesStr); - free(gensamResults.positions); if( parameters.mp.timeflag ) { free(gensamResults.tree); } } + *bytes = offset; + return results; } @@ -334,7 +299,7 @@ generateSample(struct params parameters, unsigned maxsites) * // xxx.x xx.xx x.xxxx x.xxxx * segsites: xxx */ -char *doPrintWorkerResultHeader(int segsites, double probss, struct params pars, char *treeOutput){ +char *doPrintWorkerResultHeader(int segsites, double probss, struct params pars, char *treeOutput, int *bytes){ char *results; int length = 3 + 1; // initially "\n//" and optionally a "\n" when there is no treeOutput; @@ -356,17 +321,19 @@ char *doPrintWorkerResultHeader(int segsites, double probss, struct params pars, sprintf(results, "\n//"); if( (segsites > 0 ) || ( pars.mp.theta > 0.0 ) ) { - if( pars.mp.treeflag ) { + if( pars.mp.treeflag ) sprintf(results, "%s%s", results, treeOutput); - } else { + else sprintf(results, "%s%s", results, "\n"); - } - if( (pars.mp.segsitesin > 0 ) && ( pars.mp.theta > 0.0 )) { + + if( (pars.mp.segsitesin > 0 ) && ( pars.mp.theta > 0.0 )) sprintf(results, "%sprob: %g\n", results, probss); - } + sprintf(results, "%ssegsites: %d\n", results, segsites); } + *bytes = length; + return results; } @@ -393,6 +360,8 @@ char *doPrintWorkerResultPositions(int segsites, int output_precision, double *p offset += positionStrLength; } + free(positionStr); + return results; } @@ -412,7 +381,6 @@ char *doPrintWorkerResultGametes(int segsites, int nsam, char **gametes){ char *gameteStr = malloc(sizeof(char) * gameteStrLength * nsam); for(i=0;i howmany) + size = howmany + 1; // the extra 1 is due to the master + + int dimension = 3 * size; + unsigned short *seedMatrix = (unsigned short *) malloc(sizeof(unsigned short) * dimension); + + if (world_rank == 0) { + doInitializeRng(argc, argv); + + for(i=0; i + +void masterWorker(int argc, char *argv[], int howmany, struct params parameters, int unsigned maxsites); +void teardown(); +int setup(int argc, char *argv[], int howmany, struct params parameters); +int doInitializeRng(int argc, char *argv[]); +char* generateSample(struct params parameters, unsigned int maxsites, int *bytes); +char *generateSamples(int, struct params, unsigned, int *bytes); struct gensam_result gensam(char **gametes, double *probss, double *ptmrca, double *pttot, struct params pars, int* segsites); -int isThereMoreWork(); -unsigned short* parallelSeed(unsigned short *seedv); char *append(char *lhs, const char *rhs); -char *doPrintWorkerResultHeader(int segsites, double probss, struct params paramters, char *treeOutput); +char *doPrintWorkerResultHeader(int segsites, double probss, struct params paramters, char *treeOutput, int *bytes); char *doPrintWorkerResultPositions(int segsites, int output_precision, double *posit); char *doPrintWorkerResultGametes(int segsites, int nsam, char **gametes); +char *readResults(MPI_Comm comm, int* source, int *bytes); +void initializeSeedMatrix(int argc, char *argv[], int howmany); +void singleNodeProcessing(int howmany, struct params parameters, unsigned int maxsites, int *bytes); +void printSamples(char *results, int bytes); +void secondaryNodeProcessing(int remaining, struct params parameters, unsigned int maxsites); +void sendResultsToMaster(char *results, int bytes, MPI_Comm comm); +void principalMasterProcessing(int remaining, int nodes, struct params parameters, unsigned int maxsites); +int calculateNumberOfNodes(); + /* From ms.c*/ char ** cmatrix(int nsam, int len); double ran1(); - -/* -void ordran(int n, double pbuf[]); -void ranvec(int n, double pbuf[]); -void order(int n, double pbuf[]); - -void biggerlist(int nsam, char **list ); -int poisso(double u); -void locate(int n,double beg, double len,double *ptr); -void mnmial(int n, int nclass, double p[], int rv[]); -void usage(); -int tdesn(struct node *ptree, int tip, int node ); -int pick2(int n, int *i, int *j); -int xover(int nsam,int ic, int is); -int links(int c); -*/ +int commandlineseed(char **);