HdfsManager

Yuzhen Huang edited this page Nov 9, 2017 · 5 revisions

HDFSManager

HDFSManager is a standalone module for loading data in FlexPS. To use it, users provide information about the nodes, a configuration, and a lambda function that parses each line of the HDFS files.

API

1. Include

Include the header io/hdfs_manager.h.

2. Create a configuration

HDFSManager::Config is defined as follows:

struct Config {
  std::string master_host;
  int master_port;
  std::string worker_host;
  int worker_port;
  std::string hdfs_namenode;
  int hdfs_namenode_port;
  std::string input;
  int num_local_load_thread;
};

Note that every variable in Config must have the same value in all processes, including num_local_load_thread. This means that using a different number of loading threads on different workers is currently not supported.
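
One way to keep the values identical is to build the Config in a single function that every process calls. The following is a minimal sketch under assumptions: the struct is a local stand-in that mirrors the fields listed above (a real program would use the definition from io/hdfs_manager.h), and MakeSharedConfig and SameConfig are hypothetical helpers, not part of FlexPS. The values are the ones used in the example at the end of this page.

```cpp
#include <cassert>
#include <string>
#include <tuple>

// Stand-in that mirrors the fields of HDFSManager::Config shown above.
// Assumption: in real code, use the struct from io/hdfs_manager.h instead.
struct Config {
  std::string master_host;
  int master_port;
  std::string worker_host;
  int worker_port;
  std::string hdfs_namenode;
  int hdfs_namenode_port;
  std::string input;
  int num_local_load_thread;
};

// Hypothetical helper: the single place where the configuration is defined,
// so that all processes end up with the same values.
inline Config MakeSharedConfig() {
  Config config;
  config.master_host = "proj10";
  config.master_port = 19717;
  config.worker_host = "proj10";
  config.worker_port = 45123;
  config.hdfs_namenode = "proj10";
  config.hdfs_namenode_port = 9000;
  config.input = "hdfs://classfication/a9";
  config.num_local_load_thread = 2;
  assert(config.num_local_load_thread > 0);
  return config;
}

// Hypothetical helper: field-by-field comparison, usable as a sanity check.
inline bool SameConfig(const Config& a, const Config& b) {
  return std::tie(a.master_host, a.master_port, a.worker_host, a.worker_port,
                  a.hdfs_namenode, a.hdfs_namenode_port, a.input,
                  a.num_local_load_thread) ==
         std::tie(b.master_host, b.master_port, b.worker_host, b.worker_port,
                  b.hdfs_namenode, b.hdfs_namenode_port, b.input,
                  b.num_local_load_thread);
}
```

Defining the values in one place avoids the case where one worker is started with, say, a different num_local_load_thread than the others.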

3. Construct an HDFSManager object

The constructor is as follows:

HDFSManager(Node node, const std::vector<Node>& nodes, const Config& config, zmq::context_t* zmq_context)
  • node: Info for the current node.
  • nodes: Info for all nodes in this distributed program.
  • config: The config info.
  • zmq_context: The ZeroMQ context shared across the program.

4. Pass in the lambda

The flow is simple: first start the hdfs_manager with Start(), then pass a lambda function to Run(). The lambda has the following form:

[](HDFSManager::InputFormat* input_format, int local_tid) { ... }

Here input_format is the object that serves the loaded data through its HasRecord() and GetNextRecord() functions. A typical usage is:

while (input_format->HasRecord()) {
  auto record = input_format->GetNextRecord();
}

The return type of GetNextRecord() is a boost::string_ref.
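
Each record is one line of text, so the body of the loop is where the parsing happens. The sketch below shows one possible parser. It assumes LIBSVM-style lines (`label index:value index:value ...`), which this page does not state, and it uses std::string_view (C++17) as a stand-in for boost::string_ref so that it compiles without Boost. Sample and ParseLibsvmLine are hypothetical names, not part of FlexPS.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

// Hypothetical record type for one parsed line.
struct Sample {
  double label = 0.0;
  std::vector<std::pair<int, double>> features;  // (index, value) pairs
};

// Parses one line of the assumed form "label idx:val idx:val ...".
// Returns false if the line has no tokens or a feature token lacks ':'.
inline bool ParseLibsvmLine(std::string_view line, Sample* out) {
  out->features.clear();
  bool have_label = false;
  std::size_t pos = 0;
  while (pos < line.size()) {
    // Find the next whitespace-delimited token.
    std::size_t begin = line.find_first_not_of(" \t\r", pos);
    if (begin == std::string_view::npos) break;
    std::size_t end = line.find_first_of(" \t\r", begin);
    if (end == std::string_view::npos) end = line.size();
    std::string_view token = line.substr(begin, end - begin);
    assert(!token.empty());
    pos = end;

    if (!have_label) {
      // The first token is the label. The view is not null-terminated,
      // so copy it into a std::string before calling strtod.
      out->label = std::strtod(std::string(token).c_str(), nullptr);
      have_label = true;
      continue;
    }
    std::size_t colon = token.find(':');
    if (colon == std::string_view::npos) return false;
    int index = std::atoi(std::string(token.substr(0, colon)).c_str());
    double value =
        std::strtod(std::string(token.substr(colon + 1)).c_str(), nullptr);
    out->features.emplace_back(index, value);
  }
  return have_label;
}
```

Inside the lambda, a boost::string_ref record could be handed to such a function as `std::string_view(record.data(), record.size())`, since both types are non-owning views over the same characters.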

5. Stop

Finally, you need to stop the hdfs_manager by calling the Stop function.

Example

Here is a complete example.

  HDFSManager::Config config;
  config.worker_host = "proj10";
  config.worker_port = 45123;
  config.master_host = "proj10";
  config.master_port = 19717;
  config.hdfs_namenode = "proj10";
  config.hdfs_namenode_port = 9000;
  config.input = "hdfs://classfication/a9";
  config.num_local_load_thread = 2;

  zmq::context_t* zmq_context = new zmq::context_t(1);
  HDFSManager hdfs_manager(my_node, nodes, config, zmq_context);
  hdfs_manager.Start();
  hdfs_manager.Run([my_node](HDFSManager::InputFormat* input_format, int local_tid) {
    int count = 0;
    while (input_format->HasRecord()) {
      auto record = input_format->GetNextRecord();
      count++;
    }
    LOG(INFO) << count << " lines in (node, thread):(" 
      << my_node.id << "," << local_tid << ")";
  });
  hdfs_manager.Stop();
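
The counting logic in the lambda can be tried out without an HDFS cluster. The sketch below is hypothetical throughout: FakeInputFormat and FakeRun are in-memory stand-ins for HDFSManager::InputFormat and HDFSManager::Run, written only for illustration, and FakeRun calls the lambda sequentially, once per local_tid. It also assumes that local_tid ranges from 0 to num_local_load_thread - 1, which the example above suggests but this page does not state. Each call writes only to its own slot of the result vector, so the per-thread counts can be summed into a node total afterwards.

```cpp
#include <cassert>
#include <cstddef>
#include <numeric>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for HDFSManager::InputFormat, backed by a vector.
class FakeInputFormat {
 public:
  explicit FakeInputFormat(std::vector<std::string> lines)
      : lines_(std::move(lines)) {}
  bool HasRecord() const { return next_ < lines_.size(); }
  const std::string& GetNextRecord() { return lines_[next_++]; }

 private:
  std::vector<std::string> lines_;
  std::size_t next_ = 0;
};

// Hypothetical stand-in for HDFSManager::Run: calls fn once per input,
// passing the input's position as local_tid. The calls are sequential here.
template <typename Fn>
void FakeRun(std::vector<FakeInputFormat>* inputs, Fn fn) {
  for (int tid = 0; tid < static_cast<int>(inputs->size()); ++tid) {
    fn(&(*inputs)[tid], tid);
  }
}

// Counts the records seen by each local_tid and returns the total.
inline int CountAllRecords(std::vector<FakeInputFormat>* inputs) {
  std::vector<int> counts(inputs->size(), 0);  // one slot per local_tid
  FakeRun(inputs, [&counts](FakeInputFormat* input_format, int local_tid) {
    assert(local_tid >= 0 && local_tid < static_cast<int>(counts.size()));
    int count = 0;
    while (input_format->HasRecord()) {
      input_format->GetNextRecord();
      count++;
    }
    counts[local_tid] = count;  // each call touches only its own slot
  });
  return std::accumulate(counts.begin(), counts.end(), 0);
}
```

Writing to distinct vector elements stays free of data races even if the calls ran on separate threads, which is why the slots are indexed by local_tid rather than sharing one counter.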