Skip to content

Commit

Permalink
put Dan's lmdb change into dcct
Browse files Browse the repository at this point in the history
  • Loading branch information
shadjis committed May 6, 2016
1 parent 7354e67 commit f4eebc2
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 113 deletions.
22 changes: 8 additions & 14 deletions run.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,12 @@
# Now, use_1_gpu and use_4_gpu will be IGNORED for FC, and applied only to conv. The params
# below take precedence for FC.
num_fcc_per_machine = 1
# If CPU machine only, just set this to 0 and it will be ignored. We could also just use
# the input command line argument (CPU, GPU, 4GPU) to set this, but maybe the FCC machine
# is different from the other machines, etc.
# If CPU machines only, just set this to 0 and it will be ignored. Recall this parameter
# only applies to multiple FCC. It might make sense to just set this to be whatever the
# conv machines will use (e.g. CPU, GPU, 4GPU) because the conv machines will probably
# be idle while the FCC is executing, i.e. we can map the FCC to those machines, unless
# there is pipelining. For now this makes it more generic because e.g. each FCC can use
# 1 GPU so we can map 4 to 1 4GPU machine, etc.
num_gpu_per_fcc_machine = 0


Expand Down Expand Up @@ -225,7 +228,6 @@
bind = "tcp://*:5555";
type = "ConvModelServer";
solver_file = "protos/solver.conv_model_server.prototxt";
data_binary = "protos/dummy.bin"; // Empty file (no file needed, but prevents automatic binary creation)
output_model_file = "conv_model.bin";
conv compute server (running on node019)
Expand All @@ -234,14 +236,12 @@
fc_bind = "tcp://master:5556";
type = "ConvComputeServer";
solver_file = "protos/solver.conv_compute_server.prototxt";
data_binary = "/home/software/dcct/8_train_NEW.bin";
fc server (running on master)
name = "Example FCComputeModelServer on 5556";
bind = "tcp://*:5556";
type = "FCComputeModelServer";
solver_file = "protos/solver.fc_model_server.prototxt";
data_binary = "protos/dummy.bin"; // Empty file (no file needed, but prevents automatic binary creation)
output_model_file = "fc_model.bin";
"""
Expand Down Expand Up @@ -274,7 +274,7 @@
# then it is clear how machines should be assigned. The scheduler will then eventually
# generate this file (e.g. a JSON format which specifies each server, its machine,
# its GPUs, etc.)
print 'Usage: >>> python run.py path/to/solver.prototxt path/to/machine_list.txt machines_per_batch CPU|1GPU|4GPU single_fc|many_fc (map_fcc_to_cc=)1|0'
print 'Usage: >>> python run.py path/to/solver.prototxt path/to/machine_list.txt machines_per_batch CPU|1GPU|4GPU single_fc|many_fc (map_fcc_to_cc=)1|0 base_dir'
sys.exit(0)

# Check that the distributed cct binary exists before running this script
Expand Down Expand Up @@ -914,8 +914,6 @@
# ------------------------------------------------------------------------------
input_file_dir = base_dir + 'server_input_files-' + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") # SHADJIS TODO: base_dir can include this later
os.system('mkdir -p ' + input_file_dir)
dummy_file = input_file_dir + '/dummy.bin'
os.system('touch ' + dummy_file)
print 'Writing input prototxt files to ' + input_file_dir + '/'

# First create the solver files for the model servers
Expand Down Expand Up @@ -1098,7 +1096,7 @@ def open_new_write_lmdb_helper(new_lmdb_name, num_imgs, map_size):

# This requires calling a utility which loads the network and prints the size
# of the output of the final conv layer.
util_output_str = subprocess.check_output(['./tools/size_util/size_util', conv_model_server_solver_file, dummy_file])
util_output_str = subprocess.check_output(['./tools/size_util/size_util', conv_model_server_solver_file])
num_fc_inputs = int(util_output_str.strip().split("\n")[-1].strip())

# Now create a new LMDB with 1 datum that contains the right size
Expand Down Expand Up @@ -1148,7 +1146,6 @@ def open_new_write_lmdb_helper(new_lmdb_name, num_imgs, map_size):
f.write('''name = "FCComputeModelServer on tcp://''' + fc_server_machine + '''";
type = "FCComputeModelServer";
solver = "''' + fc_server_solver_file + '''";
train_bin = "''' + dummy_file + '''";
group_size = ''' + str(machines_per_batch) + ''';
ports = (
Expand All @@ -1174,7 +1171,6 @@ def open_new_write_lmdb_helper(new_lmdb_name, num_imgs, map_size):
f.write('''name = "FCModelServer on tcp://''' + fc_model_server_machine + '''";
type = "FCModelServer";
solver = "''' + fcm_server_solver_file + '''";
train_bin = "''' + dummy_file + '''";
group_size = 1;
ports = (
Expand All @@ -1199,7 +1195,6 @@ def open_new_write_lmdb_helper(new_lmdb_name, num_imgs, map_size):
f.write('''name = "ConvModelServer on tcp://''' + conv_model_server_machine + '''";
type = "ConvModelServer";
solver = "''' + conv_model_server_solver_file + '''";
train_bin = "''' + dummy_file + '''";
group_size = ''' + str(machines_per_batch) + ''';
ports = (
Expand Down Expand Up @@ -1274,7 +1269,6 @@ def open_new_write_lmdb_helper(new_lmdb_name, num_imgs, map_size):
fc_send_bind = "tcp://''' + fc_model_server_machine + ''':''' + str(fcm_ports[2*i ]) + '''";
type = "FCComputeServer";
solver = "''' + fc_compute_server_solver_files[i] + '''";
train_bin = "''' + dummy_file + '''";
group_size = ''' + str(machines_per_batch) + ''';
rank_in_group = 0;
''')
Expand Down
20 changes: 5 additions & 15 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ Server * initConvModelServer(Config & cfg, char * filename){

string NAME = cfg.lookup("name");
string SOLVER = cfg.lookup("solver");
string TRAIN_BIN = cfg.lookup("train_bin");
int GROUPSIZE = cfg.lookup("group_size");

// Now parse each group of ports
Expand All @@ -42,7 +41,6 @@ Server * initConvModelServer(Config & cfg, char * filename){
LOG(INFO) << "NUM GROUPS = " << num_groups << std::endl;
LOG(INFO) << "NAME = " << NAME << std::endl;
LOG(INFO) << "SOLVER = " << SOLVER << std::endl;
LOG(INFO) << "TRAIN_BIN = " << TRAIN_BIN << std::endl;

// Read the ports
std::vector <string> broadcast_ports;
Expand All @@ -59,7 +57,7 @@ Server * initConvModelServer(Config & cfg, char * filename){
listen_ports.push_back(listen);
}

Server * s = new ConvModelServer(NAME, SOLVER, TRAIN_BIN, GROUPSIZE,
Server * s = new ConvModelServer(NAME, SOLVER, GROUPSIZE,
broadcast_ports, listen_ports);
return s;
}
Expand All @@ -71,7 +69,6 @@ Server * initFCComputeModelServer(Config & cfg, char * filename){

string NAME = cfg.lookup("name");
string SOLVER = cfg.lookup("solver");
string TRAIN_BIN = cfg.lookup("train_bin");
int GROUPSIZE = cfg.lookup("group_size");

// Now parse each group of ports
Expand All @@ -84,7 +81,6 @@ Server * initFCComputeModelServer(Config & cfg, char * filename){
LOG(INFO) << "NUM GROUPS = " << num_groups << std::endl;
LOG(INFO) << "NAME = " << NAME << std::endl;
LOG(INFO) << "SOLVER = " << SOLVER << std::endl;
LOG(INFO) << "TRAIN_BIN = " << TRAIN_BIN << std::endl;

// Read the ports
std::vector <string> broadcast_ports;
Expand All @@ -101,7 +97,7 @@ Server * initFCComputeModelServer(Config & cfg, char * filename){
listen_ports.push_back(listen);
}

Server * s = new FCComputeModelServer(NAME, SOLVER, TRAIN_BIN, GROUPSIZE,
Server * s = new FCComputeModelServer(NAME, SOLVER, GROUPSIZE,
broadcast_ports, listen_ports);
return s;
}
Expand All @@ -117,7 +113,6 @@ Server * initFCComputeServer(Config & cfg, char * filename){
string CONV_LISTEN_BIND = cfg.lookup("conv_listen_bind");
string CONV_SEND_BIND = cfg.lookup("conv_send_bind");
string SOLVER = cfg.lookup("solver");
string TRAIN_BIN = cfg.lookup("train_bin");
int GROUPSIZE = cfg.lookup("group_size");
int RANK_IN_GROUP= cfg.lookup("rank_in_group");

Expand All @@ -127,12 +122,11 @@ Server * initFCComputeServer(Config & cfg, char * filename){
LOG(INFO) << "CONV_LISTEN_BIND = " << CONV_LISTEN_BIND << std::endl;
LOG(INFO) << "CONV_SEND_BIND = " << CONV_SEND_BIND << std::endl;
LOG(INFO) << "SOLVER = " << SOLVER << std::endl;
LOG(INFO) << "TRAIN_BIN = " << TRAIN_BIN << std::endl;
LOG(INFO) << "GROUPSIZE = " << GROUPSIZE << std::endl;
LOG(INFO) << "RANK = " << RANK_IN_GROUP << std::endl;

Server * s = new FCComputeServer(NAME, FC_LISTEN_BIND, FC_SEND_BIND,
CONV_LISTEN_BIND, CONV_SEND_BIND, SOLVER, TRAIN_BIN, GROUPSIZE, RANK_IN_GROUP);
CONV_LISTEN_BIND, CONV_SEND_BIND, SOLVER, GROUPSIZE, RANK_IN_GROUP);
return s;
}

Expand All @@ -143,7 +137,6 @@ Server * initFCModelServer(Config & cfg, char * filename){

string NAME = cfg.lookup("name");
string SOLVER = cfg.lookup("solver");
string TRAIN_BIN = cfg.lookup("train_bin");
int GROUPSIZE = cfg.lookup("group_size");

// Now parse each group of ports
Expand All @@ -156,7 +149,6 @@ Server * initFCModelServer(Config & cfg, char * filename){
LOG(INFO) << "NUM GROUPS = " << num_groups << std::endl;
LOG(INFO) << "NAME = " << NAME << std::endl;
LOG(INFO) << "SOLVER = " << SOLVER << std::endl;
LOG(INFO) << "TRAIN_BIN = " << TRAIN_BIN << std::endl;

// Read the ports
std::vector <string> broadcast_ports;
Expand All @@ -173,7 +165,7 @@ Server * initFCModelServer(Config & cfg, char * filename){
listen_ports.push_back(listen);
}

Server * s = new FCModelServer(NAME, SOLVER, TRAIN_BIN, GROUPSIZE,
Server * s = new FCModelServer(NAME, SOLVER, GROUPSIZE,
broadcast_ports, listen_ports);
return s;
}
Expand All @@ -189,7 +181,6 @@ Server * initConvComputeServer(Config & cfg, char * filename){
string FC_LISTEN_BIND = cfg.lookup("fc_listen_bind");
string FC_SEND_BIND = cfg.lookup("fc_send_bind");
string SOLVER = cfg.lookup("solver");
string TRAIN_BIN = cfg.lookup("train_bin");

int GROUPSIZE = cfg.lookup("group_size");
int RANK_IN_GROUP= cfg.lookup("rank_in_group");
Expand All @@ -200,13 +191,12 @@ Server * initConvComputeServer(Config & cfg, char * filename){
LOG(INFO) << "FC_LISTEN_BIND = " << FC_LISTEN_BIND << std::endl;
LOG(INFO) << "FC_SEND_BIND = " << FC_SEND_BIND << std::endl;
LOG(INFO) << "SOLVER = " << SOLVER << std::endl;
LOG(INFO) << "TRAIN_BIN = " << TRAIN_BIN << std::endl;

LOG(INFO) << "GROUPSIZE = " << GROUPSIZE << std::endl;
LOG(INFO) << "RANK = " << RANK_IN_GROUP << std::endl;

Server * s = new ConvComputeServer(NAME, CONV_LISTEN_BIND, CONV_SEND_BIND, FC_LISTEN_BIND,
FC_SEND_BIND, SOLVER, TRAIN_BIN, GROUPSIZE, RANK_IN_GROUP);
FC_SEND_BIND, SOLVER, GROUPSIZE, RANK_IN_GROUP);
return s;
}

Expand Down
92 changes: 27 additions & 65 deletions src/server/ConvComputeServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ class ConvComputeServer : public Server{
// SHADJIS TODO: These 3 should be taken from the parent class with using
std::string name;
std::string solver_file;
std::string data_binary;

std::string conv_listen_bind;
std::string conv_send_bind;
Expand All @@ -39,9 +38,9 @@ class ConvComputeServer : public Server{
int nfloats_output_data;

ConvComputeServer(string _name, std::string _conv_listen_bind, std::string _conv_send_bind,
std::string _fc_listen_bind, std::string _fc_send_bind, std::string _solver_file, std::string _data_binary,
std::string _fc_listen_bind, std::string _fc_send_bind, std::string _solver_file,
int _groupsize, int _rank_in_group) :
name(_name), solver_file(_solver_file), data_binary(_data_binary),
name(_name), solver_file(_solver_file),
conv_listen_bind(_conv_listen_bind), conv_send_bind(_conv_send_bind),
fc_listen_bind(_fc_listen_bind), fc_send_bind(_fc_send_bind),
group_size(_groupsize), rank_in_group(_rank_in_group),
Expand Down Expand Up @@ -99,7 +98,7 @@ class ConvComputeServer : public Server{
// Read parameter files and construct network
// -------------------------------------------------------------------------
BridgeVector bridges; cnn::SolverParameter solver_param; cnn::NetParameter net_param;
Corpus * const corpus = DeepNet::load_network(solver_file.c_str(), data_binary.c_str(), solver_param, net_param, bridges, true);
Corpus * const corpus = DeepNet::load_network(solver_file.c_str(), solver_param, net_param, bridges, true);
// Modify all bridges to not update model gradients in backward pass (saves time)
for (auto bridge = bridges.begin(); bridge != bridges.end(); ++bridge) {
(*bridge)->set_update_model_gradients(false);
Expand All @@ -113,12 +112,10 @@ class ConvComputeServer : public Server{
// when a layer should not own its output cubes. E.g. we also would not want to if
// the output of the bridge never needs to get copied back to the host.

// Open the file for the first time during training
FILE * pFile = fopen (corpus->filename.c_str(), "rb");
if (!pFile)
throw std::runtime_error("Error opening the corpus file: " + corpus->filename);
// Keep track of the image number in the dataset we are on
size_t current_image_location_in_dataset = 0;
// It is necessary to open the reader before loading data to initialize
// cursor, transaction and environment data
corpus->OpenLmdbReader();

// size_t current_epoch = 0;

// -------------------------------------------------------------------------
Expand Down Expand Up @@ -220,64 +217,30 @@ class ConvComputeServer : public Server{
// -----------------------------------------------------------------------
VLOG(2) << "~~~~ ENTER STATE Read corpus" << std::endl;

// Read in the next mini-batch from file
size_t rs = fread(corpus->images->get_p_data(), sizeof(DataType_SFFloat), corpus->images->n_elements, pFile);

// If we read less than we expected, read the rest from the beginning
size_t num_floats_left_to_read = corpus->images->n_elements - rs;
if (num_floats_left_to_read > 0) {

// Close the file and re-open it
fclose(pFile);
pFile = fopen (corpus->filename.c_str(), "rb");
if (!pFile)
throw std::runtime_error("Error opening the corpus file: " + corpus->filename);

// Read the remaining data from the file, adjusting the pointer to where we
// read until previously as well as the amount to read
size_t rs2 = fread((float *) (corpus->images->get_p_data()) + rs, sizeof(DataType_SFFloat), num_floats_left_to_read, pFile);
assert(rs2 == num_floats_left_to_read);

// Also, we need to copy over the labels to the outgoing message buffer.
// The labels are all allocated in corpus->labels. Normally we just copy
// from the corpus labels cube data, but since the labels we want are partly
// at the end of that array and partly at the beginning, we have to do 2 copies

// Check if we actually read nothing (i.e. we were right at the end before)
// In this case, we only need one copy
if (rs == 0) {
assert(current_image_location_in_dataset == 0);
memcpy(outgoing_msg_send_data_and_ask_grad->content, corpus->labels->physical_get_RCDslice(0), sizeof(float) * corpus->mini_batch_size);
}
// Otherwise, we have to copy twice
else {
size_t num_images_from_end = corpus->n_images - current_image_location_in_dataset;
assert(num_images_from_end > 0);
assert(num_images_from_end < corpus->mini_batch_size);
size_t num_images_from_beginning = corpus->mini_batch_size - num_images_from_end;
memcpy(outgoing_msg_send_data_and_ask_grad->content,
corpus->labels->physical_get_RCDslice(current_image_location_in_dataset),
sizeof(float) * num_images_from_end);
memcpy(outgoing_msg_send_data_and_ask_grad->content + num_images_from_end,
corpus->labels->physical_get_RCDslice(0),
sizeof(float) * num_images_from_beginning);
}
// Read in the next mini-batch from db
size_t rs = corpus->LoadLmdbData();

// If we read less than we expected, read the rest from the beginning
size_t num_images_left_to_read = corpus->mini_batch_size - rs;
if (num_images_left_to_read > 0) {

// Simply reset the cursor so the next load will start from the start of the lmdb
corpus->ResetCursor();

// ++current_epoch;

// Passing in rs allows us to say that we already filled rs spots in images
// and now we want to start from that position and complete the set up to mini_batch_size
// E.g. Minibatch is 10. We read 2 images and hit the end of the lmdb. After resetting the
// cursor above we can just tell the load function to start from index 2 and continue
size_t rs2 = corpus->LoadLmdbData(rs);
assert(rs2 == num_images_left_to_read);
}
// Otherwise we will read all of the labels from the corpus
else {
// Get labels for this mini batch
memcpy(outgoing_msg_send_data_and_ask_grad->content,
corpus->labels->physical_get_RCDslice(current_image_location_in_dataset),
sizeof(float) * corpus->mini_batch_size);
}

// Copy the labels to the outgoing message buffer.
memcpy(outgoing_msg_send_data_and_ask_grad->content,
corpus->labels->get_p_data(), sizeof(float) * corpus->mini_batch_size);

// Move forwards in the dataset
current_image_location_in_dataset += corpus->mini_batch_size;
if (current_image_location_in_dataset >= corpus->n_images) {
current_image_location_in_dataset -= corpus->n_images;
}
// This assertion isn't needed, it just checks my understanding of how we pass data
assert(bridges[0]->p_input_layer->p_data_cube->get_p_data() == corpus->images->physical_get_RCDslice(0));

Expand Down Expand Up @@ -501,7 +464,6 @@ class ConvComputeServer : public Server{
// Destroy network
// -------------------------------------------------------------------------
DeepNet::clean_up(bridges, corpus);
fclose(pFile);

}
};
Expand Down
7 changes: 3 additions & 4 deletions src/server/ConvModelServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ class ConvModelServer : public Server{
// SHADJIS TODO: These 3 should be taken from the parent class with using
std::string name;
std::string solver_file;
std::string data_binary;

std::mutex hogwild_lock;
std::vector <std::string> broadcast_ports;
Expand All @@ -33,9 +32,9 @@ class ConvModelServer : public Server{
int nfloats; // For both model and gradient buffers
int group_size;

ConvModelServer(string _name, std::string _solver_file, std::string _data_binary,
ConvModelServer(string _name, std::string _solver_file,
int _groupsize, std::vector <std::string> _broadcast_ports, std::vector <std::string> _listen_ports) :
name(_name), solver_file(_solver_file), data_binary(_data_binary),
name(_name), solver_file(_solver_file),
broadcast_ports(_broadcast_ports), listen_ports(_listen_ports),
nfloats(0), group_size(_groupsize) {}

Expand All @@ -61,7 +60,7 @@ class ConvModelServer : public Server{
// for the size of the input layer which is stored in each datum.
std::string output_model_file = "conv_model.bin";
BridgeVector bridges; cnn::SolverParameter solver_param; cnn::NetParameter net_param;
Corpus * const corpus = DeepNet::load_network(solver_file.c_str(), data_binary.c_str(), solver_param, net_param, bridges, true);
Corpus * const corpus = DeepNet::load_network(solver_file.c_str(), solver_param, net_param, bridges, true);
// SHADJIS TODO: Corpus is unused but the param files are used. We can parse those files without having to read the corpus.

// Get the number of conv compute server groups
Expand Down
Loading

0 comments on commit f4eebc2

Please sign in to comment.