Merged
46 commits
51caff1
feat: add allgather
anant37289 Apr 20, 2025
e60d9f7
feat: add header for allGather and clean up the cc and ci files
Sh0g0-1758 Apr 20, 2025
fbaf757
docs: update readme
anant37289 Apr 25, 2025
6031827
feat: add example
anant37289 Apr 25, 2025
9dd2338
fix: remove n from all gther interface
anant37289 Apr 25, 2025
5675f1c
chore: clean example Makefile
Sh0g0-1758 Apr 25, 2025
845b2ec
nokeep and new in constructor
Sh0g0-1758 Apr 28, 2025
18b2e2b
clean
Sh0g0-1758 Apr 28, 2025
f1ccbd3
move new to constructor
Sh0g0-1758 Apr 28, 2025
6f4db77
feat: change registration mode to prereg
anant37289 Apr 28, 2025
246532c
update ckreduction with nop
Sh0g0-1758 Apr 28, 2025
c4a512c
add random seed
anant37289 Apr 28, 2025
fb655c2
free user rdma buffers
Sh0g0-1758 Apr 28, 2025
f0ac0ac
fix unreg->prereg in ci
Sh0g0-1758 Apr 29, 2025
dc7f9d4
make allGather to be built by default
Sh0g0-1758 Apr 29, 2025
1f08aa9
nit
Sh0g0-1758 Apr 29, 2025
ef765bf
fix ci for all-test
Sh0g0-1758 Apr 29, 2025
7b3dc49
nit
Sh0g0-1758 Apr 29, 2025
5e2962d
add make rule for testp
Sh0g0-1758 Apr 30, 2025
71c7c85
generalize run
Sh0g0-1758 Apr 30, 2025
89c9f10
fix Makefile to be compatible with buildold
Sh0g0-1758 Apr 30, 2025
66aa427
Merge branch 'main' into allGather
Sh0g0-1758 Apr 30, 2025
730bbf5
fix default initialization
Sh0g0-1758 Apr 30, 2025
8966e3c
set randCounter only when we run flooding
Sh0g0-1758 Apr 30, 2025
0e40968
fix ci for charm4py tests
Sh0g0-1758 Apr 30, 2025
0b8390a
fix_feat: add void * for args and char* for internal arrays
anant37289 May 1, 2025
b433c2f
fix: add nokeep to prevent memory leak
anant37289 May 1, 2025
fa4111b
update Makefile
Sh0g0-1758 May 1, 2025
774a4c1
simplify reduction
Sh0g0-1758 May 1, 2025
79a5f46
simplify reduction - 2
Sh0g0-1758 May 1, 2025
af9e30a
docs: update readme
anant37289 May 1, 2025
b12c215
fix: unreg to reg
anant37289 May 1, 2025
68c5178
test all allGather alogs
Sh0g0-1758 May 1, 2025
d3eac41
docs: remove bug warning from readme
anant37289 May 1, 2025
187d899
fix: remove early free of data
Sh0g0-1758 May 1, 2025
9cbbb28
pass vectors as C-style arrays
Sh0g0-1758 May 1, 2025
74d1ba9
put zero_copy_callback for src
Sh0g0-1758 May 1, 2025
a2c8945
fix: use std::copy
anant37289 May 1, 2025
66a010d
fix: use std::copy
anant37289 May 1, 2025
c9421f0
zero out state variables
Sh0g0-1758 May 1, 2025
ce82cde
rename files to be consistent with other files in charm++
Sh0g0-1758 May 1, 2025
299e849
fix for new file extensions
Sh0g0-1758 May 1, 2025
0dd68e8
fix for user tests
Sh0g0-1758 May 1, 2025
6b8a212
fix for main CMakeLists.txt
Sh0g0-1758 May 1, 2025
637a559
feat: add re-entrance test and remove hypercube store and index copies
anant37289 May 2, 2025
b9dd0fb
fmt
anant37289 May 2, 2025
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -51,7 +51,7 @@ jobs:
./build charm4py netlrts-linux-x86_64 tcp -g -j4 --with-production
- name: build-charm4py
run: |
pip3 install setuptools cython cffi greenlet numpy torch torchvision
pip3 install setuptools cython cffi greenlet numpy torch torchvision matplotlib
git clone https://github.com/UIUC-PPL/charm4py
export PYTHONPATH="$PWD/charm4py"

5 changes: 5 additions & 0 deletions CMakeLists.txt
@@ -917,6 +917,11 @@ foreach(file ${ckloop-h-files})
configure_file(${file} ${CMAKE_BINARY_DIR}/include COPYONLY)
endforeach(file)

# allGather
configure_file(src/libs/ck-libs/allGather/allGather.h ${CMAKE_BINARY_DIR}/include COPYONLY)
add_library(moduleallGather src/libs/ck-libs/allGather/allGather.C src/libs/ck-libs/allGather/allGather.h)
add_dependencies(moduleallGather ck)

# NDMeshStreamer
file(GLOB moduleNDMeshStreamer-h-sources src/libs/ck-libs/NDMeshStreamer/*.h)
set(moduleNDMeshStreamer-cxx-sources src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.C)
1 change: 1 addition & 0 deletions examples/charm++/Makefile
@@ -1,6 +1,7 @@
-include ../../include/conv-mach-opt.mak

DIRS = \
allGather \
allToAll \
array_map \
ckcallback \
41 changes: 41 additions & 0 deletions examples/charm++/allGather/Makefile
@@ -0,0 +1,41 @@
CHARMC = ../../../bin/charmc $(OPTS)
TYPE ?= RING

all: build

build:
$(CHARMC) ./user.ci
$(CHARMC) -o allGather -D$(TYPE) ./user.C -lmoduleallGather

test:
@$(CHARMC) ./user.ci
@echo "\033[1;34mRunning allGather with RING algorithm\033[0m"
@$(CHARMC) -o allGather -DRING ./user.C -lmoduleallGather
$(call run, +p4 ./allGather 4 29 7 )
@echo "\033[1;34mRunning allGather with HYPERCUBE algorithm\033[0m"
@$(CHARMC) -o allGather -DHYPERCUBE ./user.C -lmoduleallGather
$(call run, +p4 ./allGather 4 29 7 )
@echo "\033[1;34mRunning allGather with RECURSIVE-DOUBLING algorithm\033[0m"
$(call run, +p5 ./allGather 5 29 7 )
@echo "\033[1;34mRunning allGather with FLOODING algorithm\033[0m"
@$(CHARMC) -o allGather -DFLOODING ./user.C -lmoduleallGather
$(call run, +p4 ./allGather 4 29 7 )
@echo "\033[1;32mAll Tests completed!\033[0m"

testp:
@$(CHARMC) ./user.ci
@echo "\033[1;34mRunning allGather with RING algorithm\033[0m"
@$(CHARMC) -o allGather -DRING ./user.C -lmoduleallGather
$(call run, +p$(P) ./allGather $(P) 29 7 )
@echo "\033[1;34mRunning allGather with HYPERCUBE algorithm\033[0m"
@$(CHARMC) -o allGather -DHYPERCUBE ./user.C -lmoduleallGather
$(call run, +p$(P) ./allGather $(P) 29 7 )
@echo "\033[1;34mRunning allGather with FLOODING algorithm\033[0m"
@$(CHARMC) -o allGather -DFLOODING ./user.C -lmoduleallGather
$(call run, +p$(P) ./allGather $(P) 29 7 )
@echo "\033[1;32mAll Tests completed!\033[0m"

clean:
rm -f *.decl.h *.def.h *.o charmrun allGather

.PHONY: all build test testp clean
120 changes: 120 additions & 0 deletions examples/charm++/allGather/user.C
@@ -0,0 +1,120 @@
#include "user.h"

start::start(CkArgMsg* msg)
{
if (msg->argc != 4)
{
ckout << "Usage: " << msg->argv[0]
<< " <chare_array_size> <num_data_points_per_chare_array_element> "
"<num_bits_for_data_points>"
<< endl;
CkExit();
}

int n = atoi(msg->argv[1]);
int k = atoi(msg->argv[2]);
int d = atoi(msg->argv[3]);
delete msg;

sim = CProxy_simBox::ckNew(thisProxy, k, n, d, n);

#ifdef FLOODING
AllGather = CProxy_AllGather::ckNew(k * sizeof(long int),
(int)allGatherType::ALL_GATHER_FLOODING, 0);
#endif

#ifdef HYPERCUBE
AllGather = CProxy_AllGather::ckNew(k * sizeof(long int),
(int)allGatherType::ALL_GATHER_HYPERCUBE, 0);
#endif

#ifdef RING
AllGather = CProxy_AllGather::ckNew(k * sizeof(long int),
(int)allGatherType::ALL_GATHER_RING, 0);
#endif

sim.begin(AllGather);
}

void start::fini()
{
if (numIter == 1)
{
ckout << "[STATUS] Completed the AllGather Simulation" << endl;
CkExit();
}
else
{
numIter++;
ckout << "[STATUS] Completed the AllGather Simulation, starting again" << endl;
sim.begin(AllGather);
}
}

simBox::simBox(CProxy_start startProxy, int k, int n, int d)
: startProxy(startProxy), k(k), n(n), d(d)
{
}

void simBox::begin(CProxy_AllGather AllGatherGroup)
{
result = (long int*)CkRdmaAlloc(k * n * sizeof(long int));
data = (long int*)CkRdmaAlloc(k * sizeof(long int));
long int max_serial = (1L << d) - 1;  // shift in long to avoid int overflow for large d
long int base = thisIndex;
while (max_serial > 0)
{
base = base * 10;
max_serial = max_serial / 10;
}
for (int i = 0; i < k; i++)
{
data[i] = base + i;
}
CkCallback cb(CkIndex_simBox::done(NULL), CkArrayIndex1D(thisIndex), thisProxy);
AllGather* libptr = AllGatherGroup.ckLocalBranch();
libptr->init((void*)result, (void*)data, thisIndex, cb);
}

void simBox::done(allGatherMsg* msg)
{
bool success = true;
for (int i = 0; i < n; i++)
{
long int max_serial = (1L << d) - 1;  // shift in long to avoid int overflow for large d
long int base = i;
while (max_serial > 0)
{
base = base * 10;
max_serial = max_serial / 10;
}
for (int j = 0; j < k; j++)
{
if (result[i * k + j] != base + j)
{
success = false;
break;
}
}
if (!success)
break;
}

if (success)
ckout << "[STATUS] Correct result for Chare " << thisIndex << endl;
else
{
ckout << "[STATUS] Incorrect result for Chare " << thisIndex << endl;
for (int i = 0; i < n * k; i++)
{
ckout << result[i] << " ";
}
ckout << endl;
}
CkRdmaFree(result);
CkRdmaFree(data);
CkCallback cbfini(CkReductionTarget(start, fini), startProxy);
contribute(cbfini);
}

#include "user.def.h"
17 changes: 17 additions & 0 deletions examples/charm++/allGather/user.ci
@@ -0,0 +1,17 @@
mainmodule user
{
extern module allGather;

mainchare start
{
entry start(CkArgMsg * m);
entry[reductiontarget] void fini();
};

array[1D] simBox
{
entry simBox(CProxy_start startProxy, int k, int n, int d);
entry void begin(CProxy_AllGather allGatherProxy);
entry[nokeep] void done(allGatherMsg * m);
}
};
33 changes: 33 additions & 0 deletions examples/charm++/allGather/user.h
@@ -0,0 +1,33 @@
#include "allGather.h"

#include "user.decl.h"
class start : public CBase_start
{
private:
CProxy_simBox sim;
CProxy_AllGather AllGather;
int numIter = 0;

public:
start(CkArgMsg* msg);

void fini();
};

class simBox : public CBase_simBox
{
private:
CProxy_start startProxy;
int k;
int n;
int d;
long int* data;
long int* result;

public:
simBox(CProxy_start startProxy, int k, int n, int d);

void begin(CProxy_AllGather AllGather);

void done(allGatherMsg* msg);
};
3 changes: 2 additions & 1 deletion src/libs/ck-libs/Makefile
@@ -5,7 +5,7 @@ CHARMINC=.

SIMPLE_DIRS = completion cache sparseContiguousReducer tcharm ampi idxl \
multiphaseSharedArrays io \
collide mblock barrier irecv liveViz \
collide mblock allGather barrier irecv liveViz \
taskGraph search MeshStreamer NDMeshStreamer pose \
state_space_searchengine
DIRS = $(SIMPLE_DIRS) pythonCCS
@@ -24,6 +24,7 @@ ParFUM: idxl ampi collide multiphaseSharedArrays
ParFUM-Tops: ParFUM
collide: tcharm
mblock: tcharm
allGather:
barrier:
irecv:
liveViz:
26 changes: 26 additions & 0 deletions src/libs/ck-libs/allGather/Makefile
@@ -0,0 +1,26 @@
CDIR=../../../..
CHARMC=$(CDIR)/bin/charmc $(OPTS)
DEST=$(CDIR)/lib/libmoduleallGather.a

all: $(DEST) $(CDIR)/include/allGather.h $(CDIR)/include/allGather.decl.h $(CDIR)/include/allGather.def.h

$(DEST): allGather.o
$(CHARMC) -o $(DEST) allGather.o

allGather.o: allGather.C allGather.h allGather.decl.h allGather.def.h
$(CHARMC) -c allGather.C

allGather.decl.h allGather.def.h: allGather.ci
$(CHARMC) allGather.ci

$(CDIR)/include/allGather.decl.h: allGather.decl.h
/bin/cp allGather.decl.h $(CDIR)/include

$(CDIR)/include/allGather.h: allGather.h
/bin/cp allGather.h $(CDIR)/include

$(CDIR)/include/allGather.def.h: allGather.def.h
/bin/cp allGather.def.h $(CDIR)/include

clean:
rm -f *.decl.h *.def.h *.o $(DEST)
62 changes: 62 additions & 0 deletions src/libs/ck-libs/allGather/README.md
@@ -0,0 +1,62 @@
# collectiveSim

A library for implementing collectives commonly used in machine learning tasks including allGather and allReduce.

## allGather

allGather lets you gather data distributed across different chare array/group elements. The library provides three algorithms for the allGather operation: ring, hypercube, and flooding.

### How to use

The library is built as part of a default Charm++ build. To use allGather, include the header `allGather.h` and link against the library with the flag `-lmoduleallGather`.

After that, declare `allGather` as an extern module in your `.ci` file and create an AllGather group object.

```C++
AllGather = CProxy_AllGather::ckNew(k, (int)allGatherType::ALL_GATHER_RING, seed);
```

Here, `k` refers to the number of data elements present in each chare array element (this assumes the gather happens among chare array elements; it can also be done among group elements). Note that the example in `examples/charm++/allGather/user.C` passes `k * sizeof(long int)`, i.e. a size in bytes, as this argument. The second parameter selects the algorithm to run. The available algorithms are:

```C++
enum allGatherType {
ALL_GATHER_RING,
ALL_GATHER_HYPERCUBE,
ALL_GATHER_FLOODING
};
```
The third argument is a random seed: if zero is passed, the seed is set to the pid; otherwise the passed value is used. The seed is used only when the `flooding` algorithm is run.

You must also declare a callback to a function that the library can return to once it is done; that function must take only a pointer to `allGatherMsg` as its argument. To start the operation, each chare array element must obtain a local pointer to the library's chare group element on its own PE.

```C++
AllGather *libptr = AllGatherGroup.ckLocalBranch();
libptr->init(result, data, thisIndex, cb);
```
The parameters for the `init` function are:

- `result`: A pointer (void *) to where the allGather operation results will be stored. This must be allocated with enough space to hold n * k elements, where n is the number of participants in the allGather.
- `data`: The per-chare array element data pointer (void *) that will be contributed to the allGather operation.
- `thisIndex`: The index value used to determine the order of the gathered data.
- `cb`: The callback that will be invoked when the allGather operation completes.

You can customize the gathering order by modifying the `thisIndex` parameter. For example:
```C++
libptr->init(result, data, n - 1 - thisIndex, cb);  // keeps the index in 0..n-1
```
This would gather the data in the reverse order of array indices.
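
The effect of the index argument on the result layout can be sketched in plain C++, outside Charm++. This is only an illustration under an assumption taken from the check in the example's `simBox::done`: the block contributed under index `idx` ends up at `result[idx * k]` through `result[idx * k + k - 1]`. The helper name `gatherByIndex` is hypothetical and not part of the library.

```C++
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper, not part of the allGather library: places the block
// that participant p contributes under indices[p] at offset indices[p] * k.
std::vector<long> gatherByIndex(const std::vector<std::vector<long>>& blocks,
                                const std::vector<int>& indices, std::size_t k)
{
  const std::size_t n = blocks.size();
  assert(indices.size() == n);
  std::vector<long> result(n * k);
  for (std::size_t p = 0; p < n; ++p)
  {
    assert(blocks[p].size() == k);
    // Every index must name a valid slot in the n * k result buffer.
    assert(indices[p] >= 0 && static_cast<std::size_t>(indices[p]) < n);
    const std::size_t slot = static_cast<std::size_t>(indices[p]);
    for (std::size_t j = 0; j < k; ++j)
      result[slot * k + j] = blocks[p][j];
  }
  return result;
}
```

With three participants, passing indices `{0, 1, 2}` keeps the blocks in array order, while `{2, 1, 0}` (that is, `n - 1 - p` for participant `p`) reverses them.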

Once the library is done, it sends an empty message (a kick, if you will) telling the user that the result is now available in the destination buffer that the user specified earlier.

#### Notes on Implementation
Each group element represents one of the participants of the allGather. We use the zero copy API, so the data is not transferred directly: we first send zero copy buffers, and the receivers can decide whether or not to fetch the data. This is significant in the `ALL_GATHER_FLOODING` algorithm.

In `ALL_GATHER_RING`, the data for all the group elements, starting from the originating group element, is forwarded around a ring (each element receives data from the group element on the lower `PE` and passes it to the group element on the higher `PE`).
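
As a rough illustration of the ring pattern, the following single-process simulation forwards blocks from each rank to the next higher rank for `n - 1` steps. It is a sketch of the communication pattern only, not the library's implementation: the function name `ringAllGather` is hypothetical, and plain copies stand in for the zero copy transfers.

```C++
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Simulates a ring allGather: result[p] is the gathered buffer of rank p.
std::vector<std::vector<long>> ringAllGather(
    const std::vector<std::vector<long>>& blocks)
{
  const std::size_t n = blocks.size();
  assert(n > 0);
  const std::size_t k = blocks[0].size();
  std::vector<std::vector<long>> result(n, std::vector<long>(n * k));
  // Each rank starts with only its own block in place.
  for (std::size_t p = 0; p < n; ++p)
  {
    assert(blocks[p].size() == k);
    std::copy(blocks[p].begin(), blocks[p].end(), result[p].begin() + p * k);
  }
  // In step s, rank p receives from its lower neighbour the block that
  // originated at rank (p - s) mod n, which the neighbour got in step s - 1.
  for (std::size_t s = 1; s < n; ++s)
  {
    for (std::size_t p = 0; p < n; ++p)
    {
      const std::size_t from = (p + n - 1) % n;
      const std::size_t origin = (p + n - s) % n;
      std::copy(result[from].begin() + origin * k,
                result[from].begin() + (origin + 1) * k,
                result[p].begin() + origin * k);
    }
  }
  return result;
}
```

Every rank sends and receives exactly one block per step, so the gather finishes after `n - 1` steps.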

In `ALL_GATHER_HYPERCUBE`, when `n` is a power of 2, the group elements assume a hypercube connectivity and use the standard [hypercube communication pattern](https://en.wikipedia.org/wiki/Hypercube_(communication_pattern)). `ALL_GATHER_HYPERCUBE` switches to recursive doubling when `n` is not a power of 2, as described in [this paper](https://ieeexplore.ieee.org/abstract/document/342126?casa_token=vuF8Rhhm2f4AAAAA:TBigoTv8ge_lz8Bqt7wF0jWnyVrEXfPBL7cQGsWgnsXVZqEx3pFgtputZ8lvNma9pHjKAnR_pck5).
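
For the power-of-2 case, the hypercube exchange can be sketched as a single-process simulation. This is an illustration of the standard pattern, not the library's code: the name `hypercubeAllGather` is hypothetical, plain copies replace the zero copy transfers, and the recursive doubling fallback for other values of `n` is not covered.

```C++
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Simulates a hypercube allGather for n = 2^m ranks.
std::vector<std::vector<long>> hypercubeAllGather(
    const std::vector<std::vector<long>>& blocks)
{
  const std::size_t n = blocks.size();
  assert(n > 0 && (n & (n - 1)) == 0);  // power of two only
  const std::size_t k = blocks[0].size();
  std::vector<std::vector<long>> result(n, std::vector<long>(n * k));
  for (std::size_t p = 0; p < n; ++p)
  {
    assert(blocks[p].size() == k);
    std::copy(blocks[p].begin(), blocks[p].end(), result[p].begin() + p * k);
  }
  // One step per hypercube dimension: rank p exchanges with p XOR bit.
  for (std::size_t bit = 1; bit < n; bit <<= 1)
  {
    for (std::size_t p = 0; p < n; ++p)
    {
      const std::size_t partner = p ^ bit;
      // Before this step the partner holds the aligned run of `bit` blocks
      // that contains its own rank; copy exactly that run.
      const std::size_t start = (partner / bit) * bit;
      std::copy(result[partner].begin() + start * k,
                result[partner].begin() + (start + bit) * k,
                result[p].begin() + start * k);
    }
  }
  return result;
}
```

The amount of data exchanged doubles in every step, so the gather completes in log2(n) steps.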

In `ALL_GATHER_FLOODING`, we build a sparse graph over the group elements to specify the connectivity. Each group element receives data from its neighbours; if it has not already seen the data, it keeps it and forwards (floods) it to all its neighbours, and otherwise discards it.
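
The keep-and-forward rule can be sketched as a single-process simulation over an adjacency list. This is an assumption-laden illustration: the function name `floodAllGather` is hypothetical, the graph is supplied by the caller (how the library builds its sparse graph from the seed is not shown here), and only the "seen" bookkeeping is modelled, not the data transfer.

```C++
#include <cassert>
#include <cstddef>
#include <queue>
#include <utility>
#include <vector>

// Returns seen[p][origin]: whether rank p ended up holding origin's block.
std::vector<std::vector<bool>> floodAllGather(
    const std::vector<std::vector<std::size_t>>& neighbours)
{
  const std::size_t n = neighbours.size();
  std::vector<std::vector<bool>> seen(n, std::vector<bool>(n, false));
  // Each in-flight message is a (receiver, origin) pair.
  std::queue<std::pair<std::size_t, std::size_t>> inFlight;
  for (std::size_t p = 0; p < n; ++p)
  {
    seen[p][p] = true;  // every rank already holds its own block
    for (std::size_t q : neighbours[p])
    {
      assert(q < n);
      inFlight.push({q, p});
    }
  }
  while (!inFlight.empty())
  {
    const std::size_t receiver = inFlight.front().first;
    const std::size_t origin = inFlight.front().second;
    inFlight.pop();
    if (seen[receiver][origin])
      continue;  // already seen: discard
    seen[receiver][origin] = true;  // new data: keep it ...
    for (std::size_t q : neighbours[receiver])
      inFlight.push({q, origin});  // ... and flood it to all neighbours
  }
  return seen;
}
```

Each rank forwards a given origin's block at most once, so the simulation terminates, and on a connected graph every rank ends up with every block.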

### Notes
- Currently, only gathering equal-sized data is supported.
- The number of PEs must be the same as `n` (the number of participants in the allGather).