Skip to content

Commit

Permalink
Merge 7108009 into 0f2f2ee
Browse files Browse the repository at this point in the history
  • Loading branch information
jya-kmu committed Feb 19, 2021
2 parents 0f2f2ee + 7108009 commit 01534c9
Show file tree
Hide file tree
Showing 8 changed files with 402 additions and 84 deletions.
9 changes: 9 additions & 0 deletions benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
include_directories(
${PROJECT_SOURCE_DIR}/src/api
)

add_executable(mdm_bench mdm_bench.cc)
target_link_libraries(mdm_bench hermes MPI::MPI_CXX
$<$<BOOL:${HERMES_RPC_THALLIUM}>:thallium>)
target_compile_definitions(mdm_bench
PRIVATE $<$<BOOL:${HERMES_RPC_THALLIUM}>:HERMES_RPC_THALLIUM>)

add_executable(dpe_bench dpe_bench.cc)
target_link_libraries(dpe_bench hermes MPI::MPI_CXX
$<$<BOOL:${HERMES_RPC_THALLIUM}>:thallium>)
target_compile_definitions(dpe_bench
PRIVATE $<$<BOOL:${HERMES_RPC_THALLIUM}>:HERMES_RPC_THALLIUM>)
179 changes: 179 additions & 0 deletions benchmarks/dpe_bench.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
#include <iostream>
#include <random>
#include <map>
#include <chrono>

#include "hermes.h"
#include "utils.h"
#include "data_placement_engine.h"

/* example usage: ./bin/dpe_bench -m -s 4096 */

using namespace hermes; // NOLINT(*)
using std::chrono::time_point;
const auto now = std::chrono::high_resolution_clock::now;

const u64 dpe_total_targets = 10;
const size_t dpe_total_num_blobs = 10;
const size_t dpe_total_blob_size = GIGABYTES(10);

void PrintUsage(char *program) {
fprintf(stderr, "Usage %s [-r]\n", program);
fprintf(stderr, " -m\n");
fprintf(stderr, " Use Random policy (default).\n");
fprintf(stderr, " -n\n");
fprintf(stderr, " Use RoundRobin policy.\n");
fprintf(stderr, " -o\n");
fprintf(stderr, " Use MinimizeIoTime policy.\n");
fprintf(stderr, " -r\n");
fprintf(stderr, " Chooese blob size range.\n");
fprintf(stderr, " s/S: Small blob range (0, 64KB].\n");
fprintf(stderr, " m/M: Medium blob range (64KB, 1MB].\n");
fprintf(stderr, " l/L: Large blob range (1MB, 4MB].\n");
fprintf(stderr, " x/X: Xlarge blob range (4MB, 64MB].\n");
fprintf(stderr, " h/H: Huge blob size is 1GB.\n");
fprintf(stderr, " -s\n");
fprintf(stderr, " Specify exact blob size.\n");
fprintf(stderr, " Blob size option: 4KB, 64KB, 1MB, 4MB, 64MB.\n");
}

int main(int argc, char **argv) {
api::PlacementPolicy policy {api::PlacementPolicy::kRandom};
bool fixed_total_num_blobs {true}, fixed_total_blob_size {false};
int option = -1;
char *rvalue = NULL;
size_t each_blob_size;
size_t total_placed_size;
double dpe_seconds;
hermes::api::Status result;

while ((option = getopt(argc, argv, "mnor:s:")) != -1) {
switch (option) {
case 'm':
policy = api::PlacementPolicy::kRandom;
break;
case 'n':
policy = api::PlacementPolicy::kRoundRobin;
break;
case 'o':
policy = api::PlacementPolicy::kMinimizeIoTime;
break;
case 'r':
fixed_total_blob_size = true;
if (fixed_total_num_blobs)
fixed_total_num_blobs = false;
rvalue = optarg;
break;
case 's':
fixed_total_num_blobs = true;
each_blob_size = atoi(optarg);
break;
default:
PrintUsage(argv[0]);
policy = api::PlacementPolicy::kRandom;
fixed_total_blob_size = true;
each_blob_size = 4096;
std::cout << "Using Random policy for data placement engine.\n"
<< "Using fixed number of blobs of size 4KB for test.\n\n";
}
}

if (fixed_total_num_blobs && fixed_total_blob_size) {
std::cout << "DPE benchmark uses fixed total blob size\n"
<< "or fixed total number of blbs.\n"
<< "Use default fixed total number of blbs now\n\n";
}

std::vector<size_t> blob_sizes;
if (fixed_total_blob_size) {
testing::BlobSizeRange blob_range;

if (rvalue[0] == 's' || rvalue[0] == 'S') {
blob_range = testing::BlobSizeRange::kSmall;
} else if (rvalue[0] == 'm' || rvalue[0] == 'M') {
blob_range = testing::BlobSizeRange::kMedium;
} else if (rvalue[0] == 'l' || rvalue[0] == 'L') {
blob_range = testing::BlobSizeRange::kLarge;
} else if (rvalue[0] == 'x' || rvalue[0] == 'X') {
blob_range = testing::BlobSizeRange::kXLarge;
} else if (rvalue[0] == 'h' || rvalue[0] == 'H') {
blob_range = testing::BlobSizeRange::kHuge;
} else {
blob_range = testing::BlobSizeRange::kSmall;
std::cout << "No blob range is setup.\n"
<< "Choose small blob size range (0, 64KB] to test.\n\n";
}

blob_sizes = testing::GenFixedTotalBlobSize(dpe_total_blob_size,
blob_range);
total_placed_size = dpe_total_blob_size;
} else {
blob_sizes.resize(dpe_total_blob_size);
fill(blob_sizes.begin(), blob_sizes.end(), each_blob_size);
total_placed_size = each_blob_size * dpe_total_num_blobs;
}

testing::TargetViewState tgt_state =
testing::InitDeviceState(dpe_total_targets);

assert(tgt_state.num_targets == dpe_total_targets);

std::vector<PlacementSchema> output_tmp, schemas;
std::vector<TargetID> targets =
testing::GetDefaultTargets(tgt_state.num_devices);

switch (policy) {
case api::PlacementPolicy::kRandom: {
std::multimap<u64, TargetID> ordered_cap;
for (auto i = 0; i < tgt_state.num_devices; ++i) {
ordered_cap.insert(std::pair<u64, TargetID>(
tgt_state.bytes_available[i], targets[i]));
}

std::cout << "DPE benchmark uses Random placement.\n\n";
time_point start_tm = now();
result = RandomPlacement(blob_sizes, ordered_cap, output_tmp);
time_point end_tm = now();
dpe_seconds = std::chrono::duration<double>(end_tm - start_tm).count();
break;
}
case api::PlacementPolicy::kRoundRobin: {
time_point start_tm = now();
result = RoundRobinPlacement(blob_sizes, tgt_state.bytes_available,
output_tmp, targets);
std::cout << "DPE benchmark uses RoundRobin placement.\n\n";
time_point end_tm = now();
dpe_seconds = std::chrono::duration<double>(end_tm - start_tm).count();
break;
}
case api::PlacementPolicy::kMinimizeIoTime: {
std::cout << "DPE benchmark uses MinimizeIoTime placement.\n\n";
time_point start_tm = now();
result = MinimizeIoTimePlacement(blob_sizes, tgt_state.bytes_available,
tgt_state.bandwidth, targets,
output_tmp);
time_point end_tm = now();
dpe_seconds = std::chrono::duration<double>(end_tm - start_tm).count();
break;
}
}

u64 placed_size {0};
for (auto schema : output_tmp) {
placed_size += testing::UpdateDeviceState(schema, tgt_state);
}
assert(placed_size == total_placed_size);

// Aggregate placement schemas from the same target
if (!result) {
for (auto it = output_tmp.begin(); it != output_tmp.end(); ++it) {
PlacementSchema schema = AggregateBlobSchema((*it));
assert(schema.size() > 0);
schemas.push_back(schema);
}
}

std::cout << "Total DPE time: " << dpe_seconds << '\n';

return 0;
}
139 changes: 139 additions & 0 deletions src/utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@

#include "utils.h"

#include <iostream>
#include <vector>
#include <random>
#include <utility>

namespace hermes {

size_t RoundUpToMultiple(size_t val, size_t multiple) {
Expand Down Expand Up @@ -106,4 +111,138 @@ void InitDefaultConfig(Config *config) {
}
config->buffer_pool_shmem_name[shmem_name_size] = '\0';
}

namespace testing {

TargetViewState InitDeviceState(u64 total_target, bool homo_dist) {
TargetViewState result = {};
result.num_devices = total_target;
std::vector<double> tgt_fraction;
std::vector<double> tgt_homo_dist {0.25, 0.25, 0.25, 0.25};
std::vector<double> tgt_heto_dist {0.1, 0.2, 0.3, 0.4};
/** Use Megabytes */
std::vector<i64> device_size {128, 1024, 4096, 16384};
/** Use Megabytes/Sec */
std::vector<double> device_bandwidth {8192, 3072, 550, 120};

if (homo_dist)
tgt_fraction = tgt_homo_dist;
else
tgt_fraction = tgt_heto_dist;

std::vector<u64> tgt_num_per_type;
u64 used_tgt {0};
for (size_t i {0}; i < tgt_fraction.size()-1; ++i) {
u64 used_tgt_tmp {static_cast<u64>(tgt_fraction[i] * total_target)};
tgt_num_per_type.push_back(used_tgt_tmp);
used_tgt += used_tgt_tmp;
}
tgt_num_per_type.push_back(total_target - used_tgt);

using hermes::TargetID;
std::vector<TargetID> targets = GetDefaultTargets(total_target);

u64 target_position {0};
for (size_t i {0}; i < tgt_num_per_type.size(); ++i) {
for (size_t j {0}; j < tgt_num_per_type[i]; ++j) {
result.bandwidth.push_back(device_bandwidth[i]);

result.bytes_available.push_back(MEGABYTES(device_size[i]));
result.bytes_capacity.push_back(MEGABYTES(device_size[i]));
result.ordered_cap.insert(std::pair<hermes::u64, TargetID>(
MEGABYTES(device_size[i]),
targets[target_position]));
}
}

return result;
}

u64 UpdateDeviceState(PlacementSchema &schema,
TargetViewState &node_state) {
u64 result {0};
node_state.ordered_cap.clear();

for (auto [size, target] : schema) {
result += size;
node_state.bytes_available[target.bits.device_id] -= size;
node_state.ordered_cap.insert(
std::pair<u64, TargetID>(
node_state.bytes_available[target.bits.device_id], target));
}

return result;
}

void PrintNodeState(TargetViewState &node_state) {
for (int i {0}; i < node_state.num_devices; ++i) {
std::cout << " capacity of device[" << i << "]: "
<< node_state.bytes_available[i]
<< '\n' << std::flush;
std::cout << " available ratio of device["<< i << "]: "
<< static_cast<double>(node_state.bytes_available[i])/
node_state.bytes_capacity[i]
<< "\n\n" << std::flush;
}
}

std::vector<TargetID> GetDefaultTargets(size_t n) {
std::vector<TargetID> result(n);
for (size_t i = 0; i < n; ++i) {
TargetID id = {};
id.bits.node_id = 1;
id.bits.device_id = (DeviceID)i;
id.bits.index = i;
result[i] = id;
}

return result;
}

std::pair<size_t, size_t> GetBlobBound(BlobSizeRange blob_size_range) {
if (blob_size_range == BlobSizeRange::kSmall)
return {0, KILOBYTES(64)};
else if (blob_size_range == BlobSizeRange::kMedium)
return {KILOBYTES(64), MEGABYTES(1)};
else if (blob_size_range == BlobSizeRange::kLarge)
return {MEGABYTES(1), MEGABYTES(4)};
else if (blob_size_range == BlobSizeRange::kXLarge)
return {MEGABYTES(4), MEGABYTES(64)};
else if (blob_size_range == BlobSizeRange::kHuge)
return {GIGABYTES(1), GIGABYTES(1)};

std::cout << "No specified blob range is found.\n"
<< "Use small blob range (0, 64KB].";
return {0, KILOBYTES(64)};
}

std::vector<size_t> GenFixedTotalBlobSize(size_t total_size,
BlobSizeRange range) {
std::vector<size_t> result;
size_t used_size {};
size_t size {};
std::random_device dev;
std::mt19937 rng(dev());

std::pair bound = GetBlobBound(range);
size_t lo_bound = bound.first;
size_t hi_bound = bound.second;

while (used_size < total_size) {
std::vector<hermes::api::Blob> input_blobs;
if (total_size - used_size > hi_bound) {
std::uniform_int_distribution<std::mt19937::result_type>
distribution(lo_bound, hi_bound);
size = distribution(rng);
used_size += size;
} else {
size = total_size - used_size;
used_size = total_size;
}
result.push_back(size);
}
return result;
}

} // namespace testing
} // namespace hermes
Loading

0 comments on commit 01534c9

Please sign in to comment.