Commit 0d2d3ad

MPI 4 and a non-deadlocked version about to be migrated to MPI_Improbe + MPI_Mrecv

GastonMazzei committed Nov 18, 2021
1 parent 7815e05 commit 0d2d3ad
Showing 40 changed files with 614,501 additions and 27,193 deletions.
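
(The commit title names MPI_Improbe + MPI_Mrecv as the migration target: a matched probe removes the message from the queue atomically, and only the matched receive can consume it, so no cancellable MPI_Irecv has to be posted up front. A minimal sketch of that pattern, assuming the repo's VERTEXVAL_REQUEST_FLAG tag and the one-double request payload seen in the diffs below; this is an illustration, not the repo's final code.)

#include <mpi.h>

// Poll once for an incoming vertex-value request without posting an MPI_Irecv.
void poll_one_request() {
    int flag = 0;
    MPI_Message msg;
    MPI_Status status;
    MPI_Improbe(MPI_ANY_SOURCE, VERTEXVAL_REQUEST_FLAG, MPI_COMM_WORLD,
                &flag, &msg, &status);
    if (flag) {
        double buffer;
        // Matched receive: consumes exactly the probed message; nothing to cancel.
        MPI_Mrecv(&buffer, 1, MPI_DOUBLE, &msg, &status);
        // ... build the answer and send it back to status.MPI_SOURCE ...
    }
}
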
5 changes: 3 additions & 2 deletions CMakeLists.txt
@@ -2,7 +2,8 @@ cmake_minimum_required(VERSION 3.19)
project(cppprojct)

set(CMAKE_CXX_STANDARD 17)
-set(CMAKE_CXX_COMPILER mpic++)
+set(CMAKE_CXX_COMPILER mpicxx)
+set(CMAKE_C_COMPILER mpicc)

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -I/usr/lib/eigen")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS}")
@@ -39,7 +40,7 @@ find_package(Boost 1.77.0 REQUIRED COMPONENTS mpi graph_parallel system serializ
include_directories(${Boost_INCLUDE_DIRS})


-add_definitions(-DVERBOSE)
+#add_definitions(-DVERBOSE)
# (max)Nthreads Nprocs
# Run with "mpiexec -x NNODES=15 -x TEST=1 -x SEED=1234 -x OMP_THREAD_LIMIT=5 -x OMP_NESTED=true -n 3 cmake-build-debug/cppprojct"
add_executable(cppprojct main.cpp Utils/timers.cpp Utils/timers.h GraphClasses/ErdosRenyiGraph.cpp GraphClasses/ErdosRenyiGraph.h Utils/error.cpp Utils/error.h GraphClasses/GeneralGraph.cpp GraphClasses/GeneralGraph.h GraphClasses/CliqueGraph.h GraphClasses/CliqueGraph.cpp Simulation/Simulation.cpp Tests/graph-test-init.cpp Tests/graph-test-init.h Utils/reproductibility.cpp Utils/reproductibility.h Utils/adequate_synchronization.h Utils/adequate_synchronization.cpp GraphClasses/RingGraph.h GraphClasses/RingGraph.cpp Tests/graph-test-singlestep-evolution.h Tests/graph-test-singlestep-evolution.cpp Utils/global_standard_messages.h Solvers/GeneralSolver.h Solvers/GeneralSolver.cpp Utils/memory_management.h Utils/memory_management.cpp Tests/solvers-test-init.h Tests/solvers-test-init.cpp DifferentialEquations/GeneralDifferentialEquation.h DifferentialEquations/GeneralDifferentialEquation.cpp Solvers/EulerSolver.h Solvers/EulerSolver.cpp Utils/differential_equations_aux.h Utils/differential_equations_aux.cpp DifferentialEquations/NoiselessKuramoto.h DifferentialEquations/NoiselessKuramoto.cpp Solvers/RungeKuttaSolver.cpp Solvers/RungeKuttaSolver.h Utils/typed_combinations.h Utils/msleep.h Utils/msleep.cpp GraphClasses/GraphFunctions.h GraphClasses/GraphFunctions.cpp Communication/CommunicationFunctions.cpp Communication/CommunicationFunctions.h Utils/HelperClasses.h Utils/HelperClasses.cpp Utils/print_init.h Utils/print_warnings.h Utils/print_init.cpp Utils/print_warnings.cpp Utils/parallel_sanitizer.h Tests/long-singlestep-run.h Tests/long-singlestep-run.cpp macros/macros.h Utils/certify_compliant.h Utils/certify_compliant.cpp)
2 changes: 2 additions & 0 deletions Communication/CommunicationFunctions.cpp
@@ -20,6 +20,8 @@ void build_answer(double &answer, ReferenceContainer &REF, double ix, int owner,
exit(1);
}



void respond_value(ReferenceContainer &REF, double ix, int owner, int MyNProc){
auto vs = vertices(*REF.p_g);
for (auto v = vs.first; v != vs.second; ++v){
183 changes: 39 additions & 144 deletions Communication/CommunicationFunctions.h
@@ -16,6 +16,7 @@
#include <iterator>
#include <random>


void sendReqForTest(int MYPROC, int i);
int destroyRequestReturnInteger(MPI_Request &R);
void destroyRequest(MPI_Request &R, int &NERR);
@@ -190,20 +191,24 @@ void answer_messages(ReferenceContainer &REF, int MYTHR) {
int statusFree = 0;
int NTOT = 0;
int NERR = 0;

+MPI_Message M;
+MPI_Status S;
MPI_Request R;
int TRIES=0;
int t=0;
bool firstlap=true;
while (TRIES < 3){
MPI_Status status;
if ((flag == 1) || firstlap) {
buffer = -9995.0;
R = MPI_Request();
MPI_Irecv(&buffer, 1, MPI_DOUBLE,
MPI_ANY_SOURCE,
VERTEXVAL_REQUEST_FLAG,
MPI_COMM_WORLD,
&R);
+// MPI_Improbe(MPI_ANY_SOURCE, VERTEXVAL_REQUEST_FLAG, MPI_COMM_WORLD,
+//             &flag, &message, &status);
}
if (firstlap) firstlap = false;
MPI_Request_get_status(R, &flag, &status);
@@ -216,158 +221,47 @@ void answer_messages(ReferenceContainer &REF, int MYTHR) {
             t=0;
             if (flag == 1){
                 build_answer(answer, REF, buffer, status.MPI_SOURCE, MYPROC);
-                printf("About to send one!\n");std::cout<<std::flush;
+                PRINTF_DBG("About to send one!\n");std::cout<<std::flush;
                 MPI_Ssend(&answer, 1, MPI_DOUBLE, status.MPI_SOURCE, (int) buffer, MPI_COMM_WORLD);
-                printf("Correctly sent one!\n");std::cout<<std::flush;
+                PRINTF_DBG("Correctly sent one!\n");std::cout<<std::flush;
             }
         }
-    MPI_Cancel(&R);
-    MPI_Status status;
-    MPI_Request_get_status(R, &flag, &status);
-    MPI_Test_cancelled(&status, &flag);
-    if (flag!=1) {
-        //MPI_Wait(&R, MPI_STATUS_IGNORE)
-        printf("A request failed to be cancelled, we are assuming we received it!\n");std::cout<<std::flush;
-        build_answer(answer, REF, buffer, status.MPI_SOURCE, MYPROC);
-        MPI_Ssend(&answer, 1, MPI_INT, status.MPI_SOURCE, (int) buffer, MPI_COMM_WORLD);
-    };
+    int flag1=-437, flag2=-437;
+    MPI_Status status1, status2;
+    status1.MPI_ERROR = -999;
+    status2.MPI_ERROR = -999;
+    MPI_Test(&R, &flag1, &status1);
+    if (flag1 != 1){
+        MPI_Cancel(&R);
+        MPI_Test_cancelled(&status2, &flag2);
+    }
+    if ((flag1 == 1) || ((flag1!=1) && (flag2!=1))) {
+        if (flag1 == 1) {
+            build_answer(answer, REF, buffer, status1.MPI_SOURCE, MYPROC);
+            printf("A request failed to be cancelled, we are assuming we received it! we computed val = %f, received buffer = %f ; flags12 = %d %d ; source = %d ; tag = %d; error = %d\n",
+                   answer, buffer, flag1, flag2, status1.MPI_SOURCE, status1.MPI_TAG, status1.MPI_ERROR);
+            std::cout << std::flush;
+            MPI_Ssend(&answer, 1, MPI_DOUBLE, status1.MPI_SOURCE, (int) buffer, MPI_COMM_WORLD);
+            printf("Completed!\n");
+            std::cout << std::flush;
+        } else {
+            printf("A request failed to be cancelled: will ignore it. Received buffer = %f ; flags12 = %d %d ; source = %d ; tag = %d ; status error = %d. Look at status 1: source %d tag %d error %d\n",
+                   buffer, flag1, flag2, status2.MPI_SOURCE, status2.MPI_TAG, status2.MPI_ERROR, status1.MPI_SOURCE, status1.MPI_TAG, status1.MPI_ERROR);
+            std::cout << std::flush;
+        }
+    }
+    // Strategy 2B: responding asynchronously just in case?
+    // printf("A request failed to be cancelled, we are assuming we received it! (async answer)\n");std::cout<<std::flush;
+    // build_answer(answer, REF, buffer, status.MPI_SOURCE, MYPROC);
+    // MPI_Ssend(&answer, 1, MPI_DOUBLE, status.MPI_SOURCE, (int) buffer, MPI_COMM_WORLD);
-
-template<int DT, int TIMETOL, int BATCH>
-void answer_messages_edges(ReferenceContainer &REF, int MYTHR) {
-    // (1) receive two numbers: my and the other's node index :K
-    // (2) send through channel OFFSET+OWNER_INDEX [vertexval, edgeval]
-    int MYPROC = REF.p_ComHelper->WORLD_RANK[MYTHR];
-    int NPROCS = REF.p_ComHelper->WORLD_SIZE[MYTHR];
-    std::vector<MPI_Request> R_tot;
-    std::vector<MPI_Request> R_send;
-    int NDISPATCHED = 0;
-
-    // lay the probes for all the procs
-    int flagprobes[NPROCS];
-    int probe_status = 1;
-    int statusreq_status = 1;
-    for (int i = 0; i < NPROCS; i++) {
-        flagprobes[i] = 0;
-        if (i != MYPROC) {
-            probe_status = MPI_Iprobe(i, EDGEVAL_REQUEST_FLAG, MPI_COMM_WORLD, &flagprobes[i], MPI_STATUS_IGNORE);
-            while (probe_status!=0){
-                probe_status = MPI_Iprobe(i, EDGEVAL_REQUEST_FLAG, MPI_COMM_WORLD, &flagprobes[i], MPI_STATUS_IGNORE);
-            }
-        }
-    }
-
-    // initialize auxiliary variables
-    std::uniform_int_distribution<int> gen(0, NPROCS - 1); // These three lines could be
-    unsigned int SEED = std::stoi(std::getenv("SEED"));    // encapsulated inside REF
-    std::mt19937 rng(SEED);                                // :-) and we make it 'more efficient' lol
-    int ticks = 0;
-    std::set<int> answered;
-    answered.insert(MYPROC);
-    double ix[2];
-    int i = gen(rng); // initial processor to check whether we can respond to it
-    if (i == MYPROC){
-        ++i;
-        if (i>=NPROCS){
-            i=0;
-        }
-    }
-    int statusFree = 0;
-    int NTOT = 0;
-    int status = 0;
-    int status_localreq = 0;
-    int NERR = 0;
-
-    for (int tk = 0; tk < TIMETOL; ++tk) {
-        for (int j = 0; j < BATCH; ++j) {
-            if (flagprobes[i] == 1) { // a message appears to be available! receive it!
-
-                // Print the first three elements in the asynchronous probe :-)
-                PRINTF_DBG("|%d %d %d|\n", flagprobes[0], flagprobes[1], flagprobes[2]);std::cout<<std::flush;
-
-                // Try to receive it
-                R_tot.push_back(MPI_Request());
-                recv_nonblocking2(i, R_tot[R_tot.size() - 1], ix[0], EDGEVAL_REQUEST_FLAG);
-
-                status_localreq = 0;
-                int localtimecounter = 0;
-                while ((status_localreq != 1) && (localtimecounter < TIMETOL)){
-                    statusreq_status = MPI_Test(&R_tot[R_tot.size() - 1], &status_localreq, MPI_STATUS_IGNORE);
-                    while (statusreq_status != 0) {
-                        statusreq_status = MPI_Test(&R_tot[R_tot.size() - 1], &status_localreq, MPI_STATUS_IGNORE);
-                    }
-                    localtimecounter++;
-                    //mssleep(DT);
-                }
-
-                if (status_localreq == 1) { // If we were first to capture the message, proceed.
-                    R_send.push_back(MPI_Request());
-                    PRINTF_DBG("We effectively captured a vertex info request :-)\n");
-                    irespond_value_edges(REF, &ix[0], i, R_send[R_send.size() - 1], MYPROC);
-                    ++NTOT;
-                    PRINTF_DBG("We effectively answered asynchronously a vertex info request :-)\n");
-                    NDISPATCHED++;
-                } else {
-                    // MPI_Test didn't destroy the request, so explicitly free it
-                    statusFree = MPI_Request_free(&R_tot[R_tot.size() - 1]);
-                    PRINTF_DBG("We were faced with a probe that indicated an incoming message but we couldn't capture it :o\n");
-                }
-
-                // Reset vars.
-                status_localreq = 0;
-                flagprobes[i] = 0;
-            }
-
-            // Re-probe it :-)
-            probe_status = MPI_Iprobe(i, EDGEVAL_REQUEST_FLAG, MPI_COMM_WORLD, &flagprobes[i], MPI_STATUS_IGNORE);
-            while (probe_status!=0){
-                probe_status = MPI_Iprobe(i, EDGEVAL_REQUEST_FLAG, MPI_COMM_WORLD, &flagprobes[i], MPI_STATUS_IGNORE);
-            }
-
-            // For the next iteration
-            ++i;
-            if (i == MYPROC) ++i;
-            if (i >= NPROCS) {
-                if (MYPROC!=0) {
-                    i = 0;
-                } else {
-                    i = 1;
-                }
-            }
-        }
-
-        // END OF THE BATCH... now we wait for all the sent requests. (In total there can be up to BATCH of them.)
-        if (R_send.size() > 0) {
-            PRINTF_DBG("ENDing answer_messages_edges. Caught (R_tot)=%d requests, Answered (R_send)=%d.\n",
-                       R_tot.size(), R_send.size());
-
-            int status_of_getstatus = 1;
-            for (int i = 0; i < R_send.size(); ++i) {
-                status_of_getstatus = MPI_Test(&R_send[i], &status_localreq, MPI_STATUS_IGNORE);
-                while (status_of_getstatus != 0) {
-                    PRINTF_DBG("Failing to test a request, it is returning %d\n", status_of_getstatus);
-                    status_of_getstatus = MPI_Test(&R_send[i], &status_localreq, MPI_STATUS_IGNORE);
-                }
-            };
-
-            if (status_localreq == 0) {
-                PRINTF_DBG("[AM] We are currently waiting for a send to be completed\n");
-                MPI_Wait(&R_send[i], MPI_STATUS_IGNORE);
-                PRINTF_DBG("[AM] Successfully waited for the request to be completed\n");
-            }
-        }
-
-        std::vector<MPI_Request> R_tot;
-        std::vector<MPI_Request> R_send;
-        //mssleep(DT);
-        ++ticks;
-    }
-    //printf("---answer_request heartbeat---\n");std::cout<<std::flush;
 };
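
(Why the dance above is needed: MPI_Cancel on a posted receive is only a request. If the message already matched the MPI_Irecv, the cancel fails, and the requester blocked in MPI_Ssend still expects a reply. A distilled sketch of the test-then-cancel cleanup, assuming the same R and buffer variables as the loop above; note it adds the MPI_Wait that the standard requires to complete a cancelled request, which the diff leaves commented out.)

int completed = 0, was_cancelled = 0;
MPI_Status st;
MPI_Test(&R, &completed, &st);     // did the receive already match a message?
if (!completed) {
    MPI_Cancel(&R);                // ask MPI to withdraw the receive
    MPI_Wait(&R, &st);             // complete the request either way
    MPI_Test_cancelled(&st, &was_cancelled);
}
if (completed || !was_cancelled) {
    // A message was consumed, so a reply is still owed on tag (int) buffer.
}
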



@@ -458,23 +352,23 @@ void perform_requests(int NNodes,
results[QAvailable.front()] = std::make_tuple(0, // placeholder until we get the correct val
std::get<0>(*it), // we are inaugurating this indexing model [:<)
owner[QAvailable.front()] * N + their_vix[QAvailable.front()]);
printf("[PR] About to ask for one node!\n");std::cout<<std::flush;
PRINTF_DBG("[PR] About to ask for one node!\n");std::cout<<std::flush;
MPI_Ssend(&their_vix[QAvailable.front()],
1,
MPI_DOUBLE,
owner[QAvailable.front()],
VERTEXVAL_REQUEST_FLAG, MPI_COMM_WORLD);
printf("[PR] Asked!\n");std::cout<<std::flush;
PRINTF_DBG("[PR] Asked!\n");std::cout<<std::flush;

printf("[PR] About to recv!\n");std::cout<<std::flush;
PRINTF_DBG("[PR] About to recv!\n");std::cout<<std::flush;
MPI_Recv(&vval[QAvailable.front()],
1,
MPI_DOUBLE,
owner[QAvailable.front()],
(int) their_vix[QAvailable.front()],
MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
printf("[PR] Correctly recieved!\n");std::cout<<std::flush;
PRINTF_DBG("[PR] Correctly recieved!\n");std::cout<<std::flush;


// Prepare stuff for the next iteration
@@ -787,6 +681,7 @@ void perform_requests(int NNodes,
}
PRINTF_DBG("TOT=%d, NNodes=%d, ix_update=%d, total_processed=%d, ix=%d\n",
atomic_helper, NNodes, ix_update, total_processed, ix);std::cout<<std::flush;
+if (globalstatus && (!ix_update)) mssleep(5);
}
PRINTF_DBG("Final termination of perform_requests :-)\n");std::cout<<std::flush;

