Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CkIO Example #3649

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions examples/charm++/ckio/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-include ./home/ec2-user/charm-project/charm/examples/common.mk
CHARMC=/home/ec2-user/charm-project/charm/bin/charmc $(OPTS)
Comment on lines +1 to +2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should be relative paths (see the Makefiles of other examples).


all: iotest

iotest: iotest.ci iotest.C
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The files and executable should probably be called something other than iotest since that's already the name of the test program in tests/.

$(CHARMC) iotest.ci
$(CHARMC) iotest.C -o $@ -module CkIO

test: iotest
$(call run, ./iotest +p4 4 )

testp: iotest
$(call run, ./iotest +p$(P) $(P) )

smptest: iotest
$(call run, ./iotest 4 +p2 ++ppn 2)
$(call run, ./iotest 4 +p4 ++ppn 2)

clean:
rm -f *.o *.decl.h *.def.h iotest test*

57 changes: 57 additions & 0 deletions examples/charm++/ckio/iotest.C
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#include "iotest.decl.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Run this and the .ci file through some kind of formatter (I recommend clang-format, we already have a format file in the root directory of the repo). Also, remove the extra newlines if the formatter doesn't take them out (e.g. line 35 below).

#include <vector>

class Main : public CBase_Main{
Main_SDAG_CODE;
std::vector<Ck::IO::File> _files; // holds all of the CkIO File objects
int _num_writers;
int _num_iterations;
CProxy_Writer writers;
public:
Main(CkArgMsg* msg){
// make sure the example program is being used correctly
if(msg -> argc != 3){
ckout << "Usage: ./<program_name> <number_writers> <number_of_files>" << endl;
CkExit();
}
_num_writers = atoi(msg -> argv[1]); // assign the number of writers
_num_iterations = atoi(msg -> argv[2]); // assign the number of files to write
int num_files = _num_iterations; // save the number of files
_files.resize(_num_iterations);
for(int i = 0; i < num_files; ++i){
Comment on lines +18 to +21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

num_files and _num_iterations look like they store the same value and are used for the same thing, I'd remove one of them.

thisProxy.startWritingCycle(i); // start writing to the file numbered i
}
delete msg;

}
// standard bookkeeping for how many iterations we need to go through
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// standard bookkeeping for how many iterations we need to go through
// Exit after every file has finished being written

void decrementRemaining(){
_num_iterations--;
if(!_num_iterations){
ckout << "Successfully completed parallel output!" << endl;
CkExit();
}
}

};


class Writer : public CBase_Writer {

public:
/**
* Takes in a Session object to the current writing session. The constructor
* will actually write data to the file in the incoming_session object.
*/
Writer(Ck::IO::Session incoming_session){
char out[11]; // 10 bytes for the message, 1 for the nullbyte
sprintf(out, "Writer[%d]\n", thisIndex);
Ck::IO::write(incoming_session, out, 10, 10*thisIndex); // writing 10 bytes starting at 10*thisIndex from the beginning of the file
}

Writer(CkMigrateMessage* m){

}
};

#include "iotest.def.h"
89 changes: 89 additions & 0 deletions examples/charm++/ckio/iotest.ci
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
mainmodule iotest {
include "ckio.h"; // includes the necessary functions for CkIO
mainchare Main {
entry Main(CkArgMsg* m);
entry void ready(Ck::IO::FileReadyMsg* msg);
entry void startWrite(Ck::IO::SessionReadyMsg* msg);
entry void postWrite(CkReductionMsg* msg);
entry void close(CkReductionMsg* msg);
entry void decrementRemaining();

entry void startWritingCycle(int file_number){
serial {
/**
Set the Options struct so that CkIO opens the file and sets the correct configuration for writing.
The writeStripe parameter determines the amount of contiguous bytes each writer chare will write to file.
The peStripe parameters determines how much actual data each writer chare will aggregate at a given time. This is used so that
a bunch of tiny data gets distributed across many writing chares when it would be better to all go to a single chare.
It is required that peStripe >= writeStripe.
*/
Comment on lines +13 to +19
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try to be a bit more concise in these descriptions of what the code is doing so the key points don't get lost too much in text.

Suggested change
/**
Set the Options struct so that CkIO opens the file and sets the correct configuration for writing.
The writeStripe parameter determines the amount of contiguous bytes each writer chare will write to file.
The peStripe parameters determines how much actual data each writer chare will aggregate at a given time. This is used so that
a bunch of tiny data gets distributed across many writing chares when it would be better to all go to a single chare.
It is required that peStripe >= writeStripe.
*/
/**
Begin the write procedure by opening the file via providing CkIO the filename and I/O options.
*/

ckout << "starting the writing cycle for " << file_number << endl;
Ck::IO::Options opts; // struct containing the options for the writer
opts.writeStripe = 1024; // collect up to 1kB of data before writing; use the specific number you'd like or it defaults to 4MB
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this PR is old and probably just collecting dust on a shelf, but in terms of exemplifying CkIO usage, the values should probably be in the range of megabytes, to be comparable in scale to typical Lustre stripe sizes. The original design was that they should be equal to (a multiple of) the FS stripe size, to limit the contention on storage servers and the individual stripes.

opts.peStripe = 4 * opts.writeStripe; // the amount of data that is aggregated by each "write chare"
Comment on lines +21 to +23
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Ck::IO::Options opts; // struct containing the options for the writer
opts.writeStripe = 1024; // collect up to 1kB of data before writing; use the specific number you'd like or it defaults to 4MB
opts.peStripe = 4 * opts.writeStripe; // the amount of data that is aggregated by each "write chare"
Ck::IO::Options opts;
// Write to disk in contiguous chunks of 1024 bytes
opts.writeStripe = 1024;
// Each intermediate CkIO chare aggregates for contiguous 4096 bytes chunks of the file
opts.peStripe = 4 * opts.writeStripe;

CkCallback open_cb(CkIndex_Main::ready(NULL), thisProxy); // index the function for the callback to use
char name[20]; // buffer for the file name
sprintf(name ,"file_%d", file_number);
open_cb.setRefNum(file_number); // set the reference number of callback and function to the file_number
ckout << "about to enter the open function of CkIO in " << file_number << endl;

// open the file file_<file_number>, pass a FileReadyMsg* to the open_cb calback function ready, and also pass the options struct for the IO
Ck::IO::open(name, open_cb, opts);
}
/**
ready is the function that is called after opening the file by the open_cb callback in line 24.
Uses the FileReadyMsg* msg passed to it by Ck::IO::open in order to get the file represented by (msg -> file).
ready will create a callback start_session that will be invoked by the startSession function in the beginning.
After the start_session callback is invoked and the write is done, the end_session callback will be invoked.
Note that the commit_message variable will be written only after the start_session callback is completed. Having a
commit message is optional.
*/
when ready[file_number](Ck::IO::FileReadyMsg* msg) serial{
ckout << "ready function for file[" << file_number << "]." << endl;
_files[file_number] = msg -> file; // set the file opened by the Ck::IO to the index of file_number
CkCallback start_session(CkIndex_Main::startWrite(0), thisProxy); // create the callback to be used when you start session
start_session.setRefNum(file_number);

CkCallback end_session(CkIndex_Main::postWrite(0), thisProxy); // callback to be used when you close the session
end_session.setRefNum(file_number); // invoked at the end of the session, or on completion of the data being written

std::string commit_message = "Commit message\n"; // the message that gets committed at the end of the batched write; commits are an optional argument
Ck::IO::startSession(_files[file_number], 10*_num_writers, 0, start_session, commit_message.c_str(), commit_message.size(), 10 * _num_writers, end_session); // start the writing session
}
/**
This function actually does the writing to the files. It creates a chare array of Writer chares, whose constructor
will do the writing, as defined in iotest.C.
*/
when startWrite[file_number](Ck::IO::SessionReadyMsg* msg) serial{
writers = CProxy_Writer::ckNew(msg -> session, _num_writers); // create n writers, and pass all of them the the session
ckout << "Finished writing\n";
delete msg; // it's the user's responsibility to free the SessionMsg*
}

// this function is called after the session has written the amount of bytes specified
// by the end_session callback. Will also create a close callback, which will be invoked
// after the call to Ck::IO::close and CkIO closes the specified file
when postWrite[file_number](CkReductionMsg* msg) serial{
ckout << "This session has written the amount of bytes\n";
delete msg;
// Time to close the file
CkCallback close_cb(CkIndex_Main::close(0), thisProxy); // create the callback after the file closed
close_cb.setRefNum(file_number); // tag the callback
Ck::IO::close(_files[file_number], close_cb); // close the file
}
// executed after CkIO closes the file via the close_cb on line 70
when close[file_number](CkReductionMsg* msg) serial{ // only called after the file has been closed
ckout << "File " << file_number << " has succesfully been closed!" << endl;
delete msg;
thisProxy.decrementRemaining(); // called to tell the Mainchare another file has been opened and closed successfully
}
}

}

array [1D] Writer {
entry Writer(Ck::IO::Session incoming_session); // constructor for the writer; stores the Ck::IO::Session token


}
}