diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index d7eb40183b..698c7d82a9 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -51,7 +51,7 @@ jobs: ./build charm4py netlrts-linux-x86_64 tcp -g -j4 --with-production - name: build-charm4py run: | - pip3 install setuptools cython cffi greenlet numpy torch torchvision + pip3 install setuptools cython cffi greenlet numpy torch torchvision matplotlib git clone https://github.com/UIUC-PPL/charm4py export PYTHONPATH="$PWD/charm4py" diff --git a/CMakeLists.txt b/CMakeLists.txt index 6eab72be46..2e625c11ff 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -917,6 +917,11 @@ foreach(file ${ckloop-h-files}) configure_file(${file} ${CMAKE_BINARY_DIR}/include COPYONLY) endforeach(file) +# allGather +configure_file(src/libs/ck-libs/allGather/allGather.h ${CMAKE_BINARY_DIR}/include COPYONLY) +add_library(moduleallGather src/libs/ck-libs/allGather/allGather.C src/libs/ck-libs/allGather/allGather.h) +add_dependencies(moduleallGather ck) + # NDMeshStreamer file(GLOB moduleNDMeshStreamer-h-sources src/libs/ck-libs/NDMeshStreamer/*.h) set(moduleNDMeshStreamer-cxx-sources src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.C) diff --git a/examples/charm++/Makefile b/examples/charm++/Makefile index dce54fa17a..b4f6c8ee4d 100644 --- a/examples/charm++/Makefile +++ b/examples/charm++/Makefile @@ -1,6 +1,7 @@ -include ../../include/conv-mach-opt.mak DIRS = \ + allGather \ allToAll \ array_map \ ckcallback \ diff --git a/examples/charm++/allGather/Makefile b/examples/charm++/allGather/Makefile new file mode 100644 index 0000000000..79d62e57e1 --- /dev/null +++ b/examples/charm++/allGather/Makefile @@ -0,0 +1,41 @@ +CHARMC = ../../../bin/charmc $(OPTS) +TYPE ?= RING + +all: build + +build: + $(CHARMC) ./user.ci + $(CHARMC) -o allGather -D$(TYPE) ./user.C -lmoduleallGather + +test: + @$(CHARMC) ./user.ci + @echo "\033[1;34mRunning allGather with RING algorithm\033[0m" + @$(CHARMC) -o allGather -DRING ./user.C -lmoduleallGather + $(call run, +p4 ./allGather 4 29 7 ) + @echo "\033[1;34mRunning allGather with HYPERCUBE algorithm\033[0m" + @$(CHARMC) -o allGather -DHYPERCUBE ./user.C -lmoduleallGather + $(call run, +p4 ./allGather 4 29 7 ) + @echo "\033[1;34mRunning allGather with RECURSIVE-DOUBLING algorithm\033[0m" + $(call run, +p5 ./allGather 5 29 7 ) + @echo "\033[1;34mRunning allGather with FLOODING algorithm\033[0m" + @$(CHARMC) -o allGather -DFLOODING ./user.C -lmoduleallGather + $(call run, +p4 ./allGather 4 29 7 ) + @echo "\033[1;32mAll Tests completed!\033[0m" + +testp: + @$(CHARMC) ./user.ci + @echo "\033[1;34mRunning allGather with RING algorithm\033[0m" + @$(CHARMC) -o allGather -DRING ./user.C -lmoduleallGather + $(call run, +p$(P) ./allGather $(P) 29 7 ) + @echo "\033[1;34mRunning allGather with HYPERCUBE algorithm\033[0m" + @$(CHARMC) -o allGather -DHYPERCUBE ./user.C -lmoduleallGather + $(call run, +p$(P) ./allGather $(P) 29 7 ) + @echo "\033[1;34mRunning allGather with FLOODING algorithm\033[0m" + @$(CHARMC) -o allGather -DFLOODING ./user.C -lmoduleallGather + $(call run, +p$(P) ./allGather $(P) 29 7 ) + @echo "\033[1;32mAll Tests completed!\033[0m" + +clean: + rm -f *.decl.h *.def.h *.o charmrun allGather + +.phony : all build test clean diff --git a/examples/charm++/allGather/user.C b/examples/charm++/allGather/user.C new file mode 100644 index 0000000000..3d6a021772 --- /dev/null +++ b/examples/charm++/allGather/user.C @@ -0,0 +1,120 @@ +#include "user.h" + +start::start(CkArgMsg* msg) +{ + if (msg->argc != 4) + { + ckout << "Usage: " << msg->argv[0] + << " " + "" + << endl; + CkExit(); + } + + int n = atoi(msg->argv[1]); + int k = atoi(msg->argv[2]); + int d = atoi(msg->argv[3]); + delete msg; + + sim = CProxy_simBox::ckNew(thisProxy, k, n, d, n); + +#ifdef FLOODING + AllGather = CProxy_AllGather::ckNew(k * sizeof(long int), + (int)allGatherType::ALL_GATHER_FLOODING, 0); +#endif + +#ifdef HYPERCUBE + AllGather = CProxy_AllGather::ckNew(k * sizeof(long int), + (int)allGatherType::ALL_GATHER_HYPERCUBE, 0); +#endif + +#ifdef RING + AllGather = CProxy_AllGather::ckNew(k * sizeof(long int), + (int)allGatherType::ALL_GATHER_RING, 0); +#endif + + sim.begin(AllGather); +} + +void start::fini() +{ + if (numIter == 1) + { + ckout << "[STATUS] Completed the AllGather Simulation" << endl; + CkExit(); + } + else + { + numIter++; + ckout << "[STATUS] Completed the AllGather Simulation, starting again" << endl; + sim.begin(AllGather); + } +} + +simBox::simBox(CProxy_start startProxy, int k, int n, int d) + : startProxy(startProxy), k(k), n(n), d(d) +{ +} + +void simBox::begin(CProxy_AllGather AllGatherGroup) +{ + result = (long int*)CkRdmaAlloc(k * n * sizeof(long int)); + data = (long int*)CkRdmaAlloc(k * sizeof(long int)); + long int max_serial = (1 << d) - 1; + long int base = thisIndex; + while (max_serial > 0) + { + base = base * 10; + max_serial = max_serial / 10; + } + for (int i = 0; i < k; i++) + { + data[i] = base + i; + } + CkCallback cb(CkIndex_simBox::done(NULL), CkArrayIndex1D(thisIndex), thisProxy); + AllGather* libptr = AllGatherGroup.ckLocalBranch(); + libptr->init((void*)result, (void*)data, thisIndex, cb); +} + +void simBox::done(allGatherMsg* msg) +{ + bool success = true; + for (int i = 0; i < n; i++) + { + long int max_serial = (1 << d) - 1; + long int base = i; + while (max_serial > 0) + { + base = base * 10; + max_serial = max_serial / 10; + } + for (int j = 0; j < k; j++) + { + if (result[i * k + j] != base + j) + { + success = false; + break; + } + } + if (!success) + break; + } + + if (success) + ckout << "[STATUS] Correct result for Chare " << thisIndex << endl; + else + { + ckout << "[STATUS] Incorrect result for Chare " << thisIndex << endl; + for (int i = 0; i < n * k; i++) + { + ckout << result[i] << " "; + } + ckout << endl; + } + CkRdmaFree(result); + CkRdmaFree(data); + CkCallback cbfini(CkReductionTarget(start, fini), startProxy); + contribute(cbfini); +} + +#include "user.def.h" diff --git a/examples/charm++/allGather/user.ci b/examples/charm++/allGather/user.ci new file mode 100644 index 0000000000..07f59a5d43 --- /dev/null +++ b/examples/charm++/allGather/user.ci @@ -0,0 +1,17 @@ +mainmodule user +{ + extern module allGather; + + mainchare start + { + entry start(CkArgMsg * m); + entry[reductiontarget] void fini(); + }; + + array[1D] simBox + { + entry simBox(CProxy_start startProxy, int k, int n, int d); + entry void begin(CProxy_AllGather allGatherProxy); + entry[nokeep] void done(allGatherMsg * m); + } +}; diff --git a/examples/charm++/allGather/user.h b/examples/charm++/allGather/user.h new file mode 100644 index 0000000000..003647c452 --- /dev/null +++ b/examples/charm++/allGather/user.h @@ -0,0 +1,33 @@ +#include "allGather.h" + +#include "user.decl.h" +class start : public CBase_start +{ +private: + CProxy_simBox sim; + CProxy_AllGather AllGather; + int numIter = 0; + +public: + start(CkArgMsg* msg); + + void fini(); +}; + +class simBox : public CBase_simBox +{ +private: + CProxy_start startProxy; + int k; + int n; + int d; + long int* data; + long int* result; + +public: + simBox(CProxy_start startProxy, int k, int n, int d); + + void begin(CProxy_AllGather AllGather); + + void done(allGatherMsg* msg); +}; diff --git a/src/libs/ck-libs/Makefile b/src/libs/ck-libs/Makefile index 73c725b248..9ba4b0d9ac 100644 --- a/src/libs/ck-libs/Makefile +++ b/src/libs/ck-libs/Makefile @@ -5,7 +5,7 @@ CHARMINC=. SIMPLE_DIRS = completion cache sparseContiguousReducer tcharm ampi idxl \ multiphaseSharedArrays io \ - collide mblock barrier irecv liveViz \ + collide mblock allGather barrier irecv liveViz \ taskGraph search MeshStreamer NDMeshStreamer pose \ state_space_searchengine DIRS = $(SIMPLE_DIRS) pythonCCS @@ -24,6 +24,7 @@ ParFUM: idxl ampi collide multiphaseSharedArrays ParFUM-Tops: ParFUM collide: tcharm mblock: tcharm +allGather: barrier: irecv: liveViz: diff --git a/src/libs/ck-libs/allGather/Makefile b/src/libs/ck-libs/allGather/Makefile new file mode 100644 index 0000000000..5bace2965a --- /dev/null +++ b/src/libs/ck-libs/allGather/Makefile @@ -0,0 +1,26 @@ +CDIR=../../../.. +CHARMC=$(CDIR)/bin/charmc $(OPTS) +DEST=$(CDIR)/lib/libmoduleallGather.a + +all: $(DEST) $(CDIR)/include/allGather.h $(CDIR)/include/allGather.decl.h $(CDIR)/include/allGather.def.h + +$(DEST): allGather.o + $(CHARMC) -o $(DEST) allGather.o + +allGather.o: allGather.C allGather.h allGather.decl.h allGather.def.h + $(CHARMC) -c allGather.C + +allGather.decl.h allGather.def.h: allGather.ci + $(CHARMC) allGather.ci + +$(CDIR)/include/allGather.decl.h: allGather.decl.h + /bin/cp allGather.decl.h $(CDIR)/include + +$(CDIR)/include/allGather.h: allGather.h + /bin/cp allGather.h $(CDIR)/include + +$(CDIR)/include/allGather.def.h: allGather.def.h + /bin/cp allGather.def.h $(CDIR)/include + +clean: + rm -f *.decl.h *.def.h *.o $(DEST) diff --git a/src/libs/ck-libs/allGather/README.md b/src/libs/ck-libs/allGather/README.md new file mode 100644 index 0000000000..eb55c72367 --- /dev/null +++ b/src/libs/ck-libs/allGather/README.md @@ -0,0 +1,62 @@ +# collectiveSim + +A library for implementing collectives commonly used in machine learning tasks including allGather and allReduce. + +## allGather + +allGather lets you gather data distributed accross different chare array/group elements. The library provides 3 algorithms for doing the allGather operations, namely ring, hypercube and flooding. + +### How to use + +The library is available in a default charm++ build and to use allGather, you simply have to include the header `allGather.h` and link against the library using the flag `-lmoduleallGather`. + +After that you need to declare allGather as an extern module in your `.ci` file and create an AllGather group object. + +```C++ +AllGather = CProxy_AllGather::ckNew(k, (int)allGatherType::ALL_GATHER_RING, seed); +``` + +Here, k refers to the number of data elements present in each chare array element(assuming the gather is happening among chare array elements, it can also be done among group elements) and the second parameter lets you choose the algorithm you want to run. The algorithms are: + +```C++ +enum allGatherType { + ALL_GATHER_RING, + ALL_GATHER_HYPERCUBE, + ALL_GATHER_FLOODING +}; +``` +The third argument is a random seed which is set to pid if zero is passed otherwise the passed value is used. Note that this is used only when `flooding` algorithm is run. + +You must also declare a callback to a function which the library can return to after its done and it must only take a pointer to `allGatherMsg` as its argument. To start operations, each chare array element must make a local pointer to the library chare group element on the same PE as it. + +```C++ +AllGather *libptr = AllGatherGroup.ckLocalBranch(); +libptr->init(result, data, thisIndex, cb); +``` +The parameters for the `init` function are: + +- `result`: A pointer (void *) to where the allGather operation results will be stored. This must be allocated with enough space to hold n * k elements, where n is the (number of participants in allGather). +- `data`: The per-chare array element data pointer (void *) that will be contributed to the allGather operation. +- `thisIndex`: The index value used to determine the order of the gathered data. +- `cb`: The callback that will be invoked when the allGather operation completes. + +You can customize the gathering order by modifying the `thisIndex` parameter. For example: +```C++ +libptr->init(result, data, n - thisIndex, cb); +``` +This would gather the data in the reverse order of array indices. + +Once the library is done, it will send an empty message (a kick if you will) telling the user that the result is now available in the destination that the user specified earlier. + +#### Notes on Implementation +Each group element is a representative of one of the participants of the allGather. We use zero copy api so we do not tranfer the data as it is. We first send zero copy buffers and the one's getting them can decide whether to get some data or not. This is signifincant in the `ALL_GATHER_FLOODING` algorithm. + +In `ALL_GATHER_RING` the data for all the groups, starting from the originating group elements, gets forwarded in a ring(each element getting the data from lower `PE` group element and passing it to higher `PE` group element). + +In `ALL_GATHER_HYPERCUBE`, when `n` is a power of 2, the group elements assume a hypercube connectivity and use the standard [hypercube communication pattern](https://en.wikipedia.org/wiki/Hypercube_(communication_pattern)) .`ALL_GATHER_HYPERCUBE` switches to recursive doubling when n is not a power of 2 as described in [this paper](https://ieeexplore.ieee.org/abstract/document/342126?casa_token=vuF8Rhhm2f4AAAAA:TBigoTv8ge_lz8Bqt7wF0jWnyVrEXfPBL7cQGsWgnsXVZqEx3pFgtputZ8lvNma9pHjKAnR_pck5). + +In `ALL_GATHER_FLOODING`, we make a sparse graph over the group elements to specify the connectivity. The communication is done by each group element getting data from it's neighbours, keeping it and forwarding/flooding it to all it's neighbours () in case it has not already seen it and discard it otherwise. + +### Notes +- Currently only gathering equal sized data is supported. +- The number of PEs needs to be the same as `n`(the participants in all gather). diff --git a/src/libs/ck-libs/allGather/allGather.C b/src/libs/ck-libs/allGather/allGather.C new file mode 100644 index 0000000000..02b3fdd8bb --- /dev/null +++ b/src/libs/ck-libs/allGather/allGather.C @@ -0,0 +1,136 @@ +#include "allGather.h" + +int AllGather::gen_rand() +{ + std::mt19937_64 gen(randCounter++); + std::uniform_int_distribution dis(0, n - 1); + return dis(gen); +} + +AllGather::AllGather(int k, int type, int seed) : k(k) +{ + n = CkNumPes(); + this->type = (allGatherType)type; + switch (type) + { + case allGatherType::ALL_GATHER_HYPERCUBE: + if ((n & (n - 1)) != 0) + HypercubeRecursiveDoubling = true; + numHypercubeIter = std::ceil(std::log2(n)); + break; + case allGatherType::ALL_GATHER_FLOODING: + randCounter = seed ? seed : getpid(); + graph.resize(n); + for (int i = 0; i < n; i++) graph[i].resize(n); + /** + * TODO: Experiment with different graph structures + */ + + // Ring with constant number of random connections + for (int i = 0; i < n; i++) + { + graph[i][(n + i + 1) % n] = 1; + graph[i][(n + i - 1) % n] = 1; + } + for (int i = 0; i < 6; i++) + { + int x = gen_rand(); + int y = gen_rand(); + if (x != y) + { + graph[x][y] = 1; + graph[y][x] = 1; + } + } + break; + case allGatherType::ALL_GATHER_RING: + break; + } +} + +void AllGather::init(void* result, void* data, int idx, CkCallback cb) +{ + this->msg = new allGatherMsg; + this->lib_done_callback = cb; + this->idx = idx; + zero_copy_callback = + CkCallback(CkIndex_AllGather::local_buff_done(NULL), thisProxy[CkMyPe()]); + this->store = (char*)result; + this->data = (char*)data; + this->numRecvMsg = 0; + CkCallback cbInitDone(CkReductionTarget(AllGather, startGather), thisProxy); + contribute(cbInitDone); +} + +void AllGather::local_buff_done(CkDataMsg* m) +{ + numRecvMsg++; + if (numRecvMsg == 2 * (n - 1)) + { + switch (type) + { + case allGatherType::ALL_GATHER_HYPERCUBE: + hyperCubeIndx.clear(); + hyperCubeStore.clear(); + break; + case allGatherType::ALL_GATHER_FLOODING: + recvFloodMsg.clear(); + break; + case allGatherType::ALL_GATHER_RING: + break; + } + lib_done_callback.send(msg); + } +} + +void AllGather::startGather() +{ + for (int i = 0; i < k; i++) store[k * idx + i] = data[i]; + CkNcpyBuffer src(data, k * sizeof(char), zero_copy_callback, CK_BUFFER_REG); + + switch (type) + { + case allGatherType::ALL_GATHER_RING: + thisProxy[(idx + 1) % n].recvRing(idx, src); + break; + case allGatherType::ALL_GATHER_HYPERCUBE: + hyperCubeIndx.push_back(idx); + hyperCubeStore.push_back(src); + thisProxy[idx].Hypercube(); + break; + case allGatherType::ALL_GATHER_FLOODING: + recvFloodMsg[idx] = true; + for (int i = 0; i < n; i++) + if (graph[idx][i] == 1) + thisProxy[i].Flood(idx, src); + break; + } +} + +void AllGather::recvRing(int sender, CkNcpyBuffer src) +{ + if (numRecvMsg == 2 * (n - 1)) + return; + CkNcpyBuffer dst(store + sender * k, k * sizeof(char), zero_copy_callback, + CK_BUFFER_REG); + dst.get(src); + if (((CkMyPe() + 1) % n) != sender) + thisProxy[(CkMyPe() + 1) % n].recvRing(sender, src); +} + +void AllGather::Flood(int sender, CkNcpyBuffer src) +{ + if (numRecvMsg == 2 * (n - 1)) + return; + if (recvFloodMsg[sender]) + return; + recvFloodMsg[sender] = true; + CkNcpyBuffer dst(store + sender * k, k * sizeof(char), zero_copy_callback, + CK_BUFFER_REG); + dst.get(src); + for (int i = 0; i < n; i++) + if (graph[CkMyPe()][i] == 1 and i != sender) + thisProxy[i].Flood(sender, src); +} + +#include "allGather.def.h" diff --git a/src/libs/ck-libs/allGather/allGather.ci b/src/libs/ck-libs/allGather/allGather.ci new file mode 100644 index 0000000000..6cc3499ac8 --- /dev/null +++ b/src/libs/ck-libs/allGather/allGather.ci @@ -0,0 +1,57 @@ +module allGather +{ + include "vector"; + + message allGatherMsg; + + group AllGather + { + entry AllGather(int size, int type, int seed); + entry[reductiontarget] void startGather(); + entry void recvRing(int sender, CkNcpyBuffer data); + entry[nokeep] void local_buff_done(CkDataMsg * m); + entry void Hypercube() + { + for (iter = 0; iter < numHypercubeIter; iter++) + { + serial + { + if (HypercubeRecursiveDoubling) + HypercubeToSend = (n + CkMyPe() - (int)pow(2, iter)) % n; + else + HypercubeToSend = CkMyPe() ^ ((int)pow(2, iter)); + } + serial + { + if (HypercubeRecursiveDoubling && iter == numHypercubeIter - 1) + { + int size = n - (int)pow(2, iter); + thisProxy[HypercubeToSend].recvHypercube(iter, hyperCubeStore.data(), + hyperCubeIndx.data(), size); + } + else + thisProxy[HypercubeToSend].recvHypercube( + iter, hyperCubeStore.data(), hyperCubeIndx.data(), hyperCubeStore.size()); + } + when recvHypercube[iter](int ref, CkNcpyBuffer data[size], int dataIndx[size], + int size) + { + serial + { + for (int m = 0; m < size; m++) + { + hyperCubeStore.emplace_back(data[m]); + hyperCubeIndx.emplace_back(dataIndx[m]); + CkNcpyBuffer dst(store + dataIndx[m] * k, k * sizeof(char), + zero_copy_callback, CK_BUFFER_REG); + dst.get(data[m]); + } + } + } + } + }; + entry void recvHypercube(int ref, CkNcpyBuffer data[size], int dataIndx[size], + int size); + entry void Flood(int sender, CkNcpyBuffer data); + }; +}; diff --git a/src/libs/ck-libs/allGather/allGather.h b/src/libs/ck-libs/allGather/allGather.h new file mode 100644 index 0000000000..b722465970 --- /dev/null +++ b/src/libs/ck-libs/allGather/allGather.h @@ -0,0 +1,62 @@ +#pragma once + +#include "allGather.decl.h" +#include +#include +#include +#include +#include +#include +#include + +class allGatherMsg : public CMessage_allGatherMsg +{ +}; + +enum allGatherType +{ + ALL_GATHER_RING, + ALL_GATHER_HYPERCUBE, + ALL_GATHER_FLOODING +}; + +class AllGather : public CBase_AllGather +{ +private: + int k; + int n; + int idx; + char* store; + int numRecvMsg; + CkCallback lib_done_callback; + allGatherType type; + int numHypercubeIter; + bool HypercubeRecursiveDoubling{}; + int iter; + int HypercubeToSend; + std::vector> graph{}; + std::map recvFloodMsg{}; + int randCounter; + std::vector hyperCubeIndx{}; + std::vector hyperCubeStore{}; + allGatherMsg* msg; + char* data; + CkCallback zero_copy_callback; + +public: + AllGather_SDAG_CODE + + AllGather(int k, int type, int seed); + + void startGather(); + + void recvRing(int sender, CkNcpyBuffer data); + + void local_buff_done(CkDataMsg* m); + + int gen_rand(); + + void Flood(int sender, CkNcpyBuffer data); + + void init(void* result, void* data, int idx, CkCallback cb); +};