In [None]:
from cling import cling, bash

**Listing 15.1**

Caption: Definition of the plain action that evaluates one piece of the Taylor series and computes its partial sum.

In [None]:
%%writefile taylor_action.hpp
#ifndef TAYLOR_ACTION_HPP

#define TAYLOR_ACTION_HPP

// Body of the plain action: evaluates the terms of the Taylor series of
// ln(1 + x) whose exponents are stored in the component referenced by
// `client`, and returns their partial sum.
//
//   client  client handle of the component that holds the exponents
//   x       point at which the series is evaluated
//
// Returns the sum of (-1)^(e + 1) * x^e / e over all stored exponents e,
// or 0 when the component holds no exponents.
static double compute(data_client client, double x) {
  // Fetch the buffer; get() waits until the future holds the data.
  data d = client.get_data().get();
  size_t size = d.size();
  double sum = 0;

  // An empty partition contributes nothing. Returning here also avoids
  // reading d[size - 1] with an underflowed index below.
  if (size == 0) return sum;

  // Print the last exponent handled by this partition.
  std::cout << d[size - 1] << std::endl;

  for (size_t i = 0; i < size; i++) {
    double e = d[i];
    sum += std::pow(-1.0, e + 1) * std::pow(x, e) / (e);
  }
  return sum;
}

#endif


**Listing 15.2**

Caption: Distributed implementation of the Taylor series for the natural logarithm using HPX.

In [None]:
%%writefile Listing_15_2.cpp
#include <cstdlib>
#include <hpx/hpx.hpp>
#include <hpx/hpx_main.hpp>
#include <iostream>

#include "taylor_data.hpp"
#include "taylor_component.hpp"
#include "taylor_action.hpp"
//-----------------------------------------------------------------

// Wrap compute() into the plain action `compute_action` so that it can be
// launched with hpx::async on a chosen locality.
HPX_PLAIN_ACTION(compute, compute_action) 

// Approximates ln(1 + x) with the first n terms of its Taylor series. The
// exponents 1..n are split into one contiguous partition per locality, each
// partition is summed by compute_action on its locality, and the partial sums
// are added up here. Prints the difference to std::log1p(x).
int main(int args, char** argv) {
  int n = 100;
  double x = 0.25;

  std::vector<hpx::id_type> localities = hpx::find_all_localities();
  std::size_t num_partitions = localities.size();
  // Integer division: every partition gets floor(n / num_partitions)
  // exponents; the last partition additionally takes the remainder.
  size_t partition_size = n / num_partitions;

  std::vector<data_client> parts(num_partitions);

  for (size_t i = 0; i < num_partitions; i++) {
    // Exponents are 1-based. Partition i must start directly after the last
    // exponent of partition i - 1; starting at i * partition_size would
    // repeat exponent partition_size and leave out exponent n.
    size_t begin = i * partition_size + 1;

    if (i == num_partitions - 1)
      // Last partition: everything that is left, up to and including n.
      parts[i] = data_client(localities[i], n - (partition_size * i), begin);
    else
      parts[i] = data_client(localities[i], partition_size, begin);
  }

  std::vector<hpx::future<double>> futures;

  // Launch one partial-sum computation per locality.
  for (size_t i = 0; i < num_partitions; i++) {
    futures.push_back(hpx::async<compute_action>(localities[i], parts[i], x));
  }

  double result = 0;

  // Once every partial sum is available, accumulate them. The trailing get()
  // waits for the continuation, so capturing `result` by reference is safe.
  hpx::when_all(futures)
    .then([&](auto&& f) {
      auto partial_sums = f.get();
      for (size_t i = 0; i < partial_sums.size(); i++)
        result += partial_sums[i].get();
    })
    .get();

  std::cout << "Difference of Taylor and C++ result "
            << result - std::log1p(x) << " after " << n << " iterations."
            << std::endl;

  return EXIT_SUCCESS;
}


In [None]:
%%bash
# Build Listing 15.2; -I . adds the current directory to the include path so
# that the headers written by the %%writefile cells are found.
# NOTE(review): taylor_data.hpp and taylor_component.hpp are written by cells
# further down in this notebook; those cells must be run before this one.
hpxcxx -I . --exe=Listing_15_2.exe Listing_15_2.cpp
# Launch the executable through hpxrun.py.
# NOTE(review): presumably -l is the number of localities and -t the number of
# threads per locality; confirm with hpxrun.py --help.
hpxrun.py -l 3 -t 1 ./Listing_15_2.exe

**Listing 15.3**

Caption: Definition of the data object and serialization.

In [None]:
%%writefile taylor_data.hpp
#ifndef TAYLOR_DATA_HPP

#define TAYLOR_DATA_HPP

// Buffer of doubles based on HPX's serialize_buffer.
using buffer = hpx::serialization::serialize_buffer<double>;

// Array of doubles that is filled with an ascending sequence on construction.
struct data : public buffer {
  // Allocates `size` doubles and stores value, value + 1, ..., value + size - 1.
  // The allocation is handed to the buffer with buffer::take.
  data(size_t size = 0, double value = 0.0)
      : buffer(std::allocator<double>().allocate(size), size, buffer::take) {
    size_t idx = 0;
    while (idx < size) {
      (*this)[idx] = value + idx;
      ++idx;
    }
  }
};

#endif


**Listing 15.4**

Caption: Definition of the component client and component server.

In [None]:
%%writefile taylor_component.hpp
#ifndef TAYLOR_COMPONENT_HPP

#define TAYLOR_COMPONENT_HPP

// Component server: owns one `data` buffer and makes it reachable through the
// get_data action.
struct data_server : hpx::components::component_base<data_server> {
  // Default construction leaves data_ default-constructed.
  data_server() = default;

  // Builds the owned buffer as data(size, value).
  data_server(size_t size, double const value) : data_(size, value) {}

  // Returns the stored buffer by value.
  data get_data() const {
    return data_;
  }

  // Defines get_data_action as the action type for get_data().
  HPX_DEFINE_COMPONENT_DIRECT_ACTION(data_server, get_data, get_data_action)

 private:
  data data_;  // buffer owned by this component instance
};

// Register the component: data_server wrapped in hpx::components::component,
// under the name data_server.
typedef hpx::components::component<data_server> data_server_type;
HPX_REGISTER_COMPONENT(data_server_type, data_server)

// Register the component's get_data action; the typedef gives the nested
// action type a file-scope name for the macro.
typedef data_server::get_data_action get_data_action;
HPX_REGISTER_ACTION(get_data_action)

// Component client

// Client-side handle for a data_server component.
struct data_client
    : hpx::components::client_base<data_client, data_server> {
  using base_type = hpx::components::client_base<data_client, data_server>;

  // Creates a client that is not yet bound to a component.
  data_client() = default;

  // Creates a data_server(size, init) on locality `where` and binds this
  // client to it.
  data_client(hpx::id_type where, size_t size, double init)
      : base_type(hpx::new_<data_server>(where, size, init)) {}

  // Invokes the get_data action on the bound component asynchronously and
  // returns a future for the buffer.
  hpx::future<data> get_data() const {
    data_server::get_data_action fetch;
    return hpx::async(fetch, this->get_id());
  }
};
#endif


**Listing 15.5**

Caption: Main function of the distributed fractal set.

In [None]:
%%writefile Listing_15_5.cpp
#include <hpx/hpx.hpp>
#include <hpx/hpx_main.hpp>

#include <distributed_fractal_worker.hpp>
//-----------------------------------------------------------------

int main() {
    auto locs = hpx::find_remote_localities(); 

    // The supervisor
    auto supervisor = hpx::find_here(); 

    // Compute the number of worker localities
    const int num_workers = locs.size(); 

    // Make sure we have some workers
    assert(num_workers > 0);  

    // The logic below assumes more work than workers
    assert(num_workers < size_x && num_workers < size_y); 

    // Allocate the PBM image only for the supervisor
    p = PBM(size_y, size_x);

    start = std::chrono::high_resolution_clock::now(); 

    // Place to hold futures for current worker tasks
    std::vector<hpx::future<void>> data;  

    // Load up workers...
    for(int j=0;j<num_workers;j++) {  
        auto loc = locs.at(j);
        hpx::future<void> f = hpx::async<do_work_action>(loc, supervisor, next_work_item++); 
        data.push_back(std::move(f)); 
    }
    for(int j=0;j<data.size();j++) {
        data.at(j).wait(); 
    }

}


In [None]:
%%bash
# Build Listing 15.5; -I . adds the current directory to the include path.
# NOTE(review): distributed_fractal_worker.hpp is written by a cell further
# down in this notebook and must exist before this cell runs. pbm.hpp,
# config.hpp and kernel.hpp are not written by any cell visible here.
hpxcxx -I . --exe=Listing_15_5.exe Listing_15_5.cpp
# Launch the executable through hpxrun.py.
# NOTE(review): presumably -l is the number of localities and -t the number of
# threads per locality; confirm with hpxrun.py --help.
hpxrun.py -l 3 -t 1 ./Listing_15_5.exe

**Listing 15.6**

Caption: Functions for the supervisor and worker approach.

In [None]:
%%writefile distributed_fractal_worker.hpp
#include <pbm.hpp>
#include <config.hpp>
#include <kernel.hpp>
//-----------------------------------------------------------------
// Time stamps for measuring the run; send_result() computes the duration
// relative to `start`.
// NOTE(review): the file-scope `finish` is never assigned in this header;
// send_result() declares its own local variable of the same name.
std::chrono::high_resolution_clock::time_point start, finish;
// Index of the next work item to hand out; incremented by get_work().
std::atomic<int> next_work_item(0); 
// Number of results accepted so far; incremented by send_result().
std::atomic<int> completed_work_items(0);
// Value obtained from get_size_t("OUTPUT", 1); send_result() saves the image
// only when it equals 1.
size_t output = get_size_t("OUTPUT", 1);
// Image into which send_result() stores the received rows.
PBM p;

// Invoked by workers (as get_work_action) on the supervisor: atomically
// claims the next work item and returns its index.
int get_work() {
  return next_work_item.fetch_add(1);
}
HPX_PLAIN_ACTION(get_work, get_work_action);

// Invoked by workers (as send_result_action) on the supervisor to deliver the
// computed values for work item `item_index`. The values are stored in the
// image; the call that completes the last item prints the elapsed time and,
// when `output` is 1, saves the image.
void send_result(const std::vector<int>& result, int item_index) {
  // Items at or beyond size_x are not part of the image.
  if (!(item_index < size_x)) return;

  p.row(item_index) = result;

  // The atomic increment gives every call a distinct count, so only one call
  // sees the final value.
  const int received = ++completed_work_items;
  if (received != size_x) return;

  // All work is complete: report the run time measured from `start`.
  const auto end_time = std::chrono::high_resolution_clock::now();
  const auto elapsed =
      std::chrono::duration_cast<std::chrono::microseconds>(end_time - start);
  std::cout << "duration: " << (elapsed.count() * 1e-6) << std::endl;

  if (output == 1) {
    p.save("image_distributed.pbm");
  }
}
HPX_PLAIN_ACTION(send_result, send_result_action);

// The basic unit of work, run on a worker locality. Starting with
// `item_index`, computes one work item of the fractal at a time, sends the
// values to the supervisor `loc`, and continues with the next item obtained
// from the supervisor until the returned index reaches size_x.
void do_work(const hpx::id_type& loc, int item_index) {
  while (item_index < size_x) {
    // Ask for the next work item asynchronously. That way
    // we overlap communication and computation.
    hpx::future<int> fitem = hpx::async<get_work_action>(loc);

    std::vector<int> result;
    // Exactly size_y values are appended below; reserving them up front
    // avoids repeated reallocation while the vector grows.
    result.reserve(size_y);

    // Offset of this work item along the imaginary axis, mapped to [-2i, 2i).
    complex c = complex(0, 4) * complex(item_index, 0) / complex(size_x, 0) -
                complex(0, 2);

    for (int i = 0; i < size_y; i++) {
      // Get the number of iterations; the real part is mapped to [-2, 2).
      int value = compute_pixel(c + 4.0 * i / size_y - 2.0);
      // Convert the value to RGB color space
      std::tuple<size_t, size_t, size_t> color = get_rgb(value);
      result.push_back(make_color(std::get<0>(color),
                                  std::get<1>(color),
                                  std::get<2>(color)));
    }

    // apply() is like async(), except no future comes back.
    hpx::apply<send_result_action>(loc, result, item_index);
    // Wait for the index requested at the top of the loop.
    item_index = fitem.get();
  }
}
HPX_PLAIN_ACTION(do_work, do_work_action); 


**Listing 15.7**

Caption: Main function of the distributed fractal set with non-collective I/O.

In [None]:
%%writefile Listing_15_7.cpp
#include <hpx/hpx.hpp>
#include <hpx/hpx_main.hpp>

#include <chrono>
#include <fstream>

#include "distributed_fractal_worker_improved.hpp"

using std::chrono::high_resolution_clock;
//-----------------------------------------------------------------
int main() {
    auto locs = hpx::find_remote_localities();

    // The supervisor
    auto supervisor = hpx::find_here();

    // Compute the number of worker localities
    const int num_workers = locs.size();

    // Make sure we have some workers
    assert(num_workers > 0);

    // The logic below assumes more work than workers
    assert(num_workers < size_x && num_workers < size_y);

    // Measure the time only on the supervisor
    high_resolution_clock::time_point start_time = high_resolution_clock::now();

    std::vector<hpx::future<void>> data;
    // Load up workers...
    for(int j=0;j<num_workers;j++) {
        auto loc = locs.at(j);
        hpx::future<void> f = hpx::async<do_work_action>(loc, supervisor, next_work_item++, j); 
        data.push_back(std::move(f));
    }

    for(int j=0;j<data.size();j++)
        data.at(j).wait();

    std::vector<hpx::future<void>> futures;

    hpx::when_all(futures).get();

    auto stop_time = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>( stop_time - start_time );
    std::cout << "Time: " << duration.count() * 1e-9 << std::endl;
}


In [None]:
%%bash
# Build Listing 15.7; -I . adds the current directory to the include path.
# NOTE(review): distributed_fractal_worker_improved.hpp is written by a cell
# further down in this notebook and must exist before this cell runs.
# config.hpp and kernel.hpp are not written by any cell visible here.
hpxcxx -I . --exe=Listing_15_7.exe Listing_15_7.cpp
# Launch the executable through hpxrun.py.
# NOTE(review): presumably -l is the number of localities and -t the number of
# threads per locality; confirm with hpxrun.py --help.
hpxrun.py -l 3 -t 1 ./Listing_15_7.exe

**Listing 15.8**

Caption: Actions for the fractal set with non-collective I/O.

In [None]:
%%writefile distributed_fractal_worker_improved.hpp
#include <config.hpp>
#include <kernel.hpp>
// Time stamps for measuring the run.
// NOTE(review): neither `start` nor `finish` is referenced by the code visible
// in this header or in Listing_15_7.cpp, which times with a local variable.
std::chrono::high_resolution_clock::time_point start, finish;
// Index of the next work item to hand out; incremented by get_work().
std::atomic<int> next_work_item(0);
// NOTE(review): not referenced by the code visible in this header.
std::atomic<int> completed_work_items(0);
// Value obtained from get_size_t("OUTPUT", 1).
// NOTE(review): not referenced by the code visible in this header.
size_t output = get_size_t("OUTPUT", 1);
//-----------------------------------------------------------------
// Invoked by workers (as get_work_action) on the supervisor: atomically
// claims the next work item and returns its index.
int get_work() {
  const int claimed = next_work_item.fetch_add(1);
  return claimed;
}
HPX_PLAIN_ACTION(get_work, get_work_action);

// The basic unit of work, run on a worker locality. Starting with
// `item_index`, computes one work item of the fractal at a time and appends
// it as one text line ("<item_index> <value> <value> ... \n") to the file
// data_<id>.part, then continues with the next item obtained from the
// supervisor `loc` until the returned index reaches size_x.
void do_work(const hpx::id_type& loc, int item_index, int id) {
  // Every worker writes to its own file, named after its id.
  std::ofstream outfile("data_"+std::to_string(id)+".part");
  while (item_index < size_x) {
    // Ask for the next work item asynchronously. That way
    // we overlap communication and computation.
    hpx::future<int> fitem = hpx::async<get_work_action>(loc);

    std::vector<int> result;
    // Exactly size_y values are appended below; reserving them up front
    // avoids repeated reallocation while the vector grows.
    result.reserve(size_y);

    // Offset of this work item along the imaginary axis, mapped to [-2i, 2i).
    complex c = complex(0, 4) * complex(item_index, 0) /
                complex(size_x, 0) - complex(0, 2);

    for (int i = 0; i < size_y; i++) {
      // Get the number of iterations; the real part is mapped to [-2, 2).
      int value = compute_pixel(c + 4.0 * i / size_y - 2.0);
      // Convert the value to RGB color space
      std::tuple<size_t, size_t, size_t> color = get_rgb(value);
      result.push_back(make_color(std::get<0>(color),
        std::get<1>(color),std::get<2>(color)));
    }

    // One line per work item: its index followed by all of its values.
    outfile << item_index << " ";
    for(auto d : result)
      outfile << d << " ";
    outfile << "\n";
    // Wait for the index requested at the top of the loop.
    item_index = fitem.get();
  }
  outfile.close();
}
HPX_PLAIN_ACTION(do_work, do_work_action);
