/
Strata.cpp
142 lines (113 loc) · 5.03 KB
/
Strata.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#include "Strata.h"

#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

using std::lock_guard;
/// Allocates a DiskBuffer with enough blocks to hold `num_examples` examples
/// when they are grouped `num_examples_per_block` at a time.
///
/// @param filename               Forwarded unchanged to the DiskBuffer constructor.
/// @param feature_size           Forwarded unchanged to the DiskBuffer constructor.
/// @param num_examples           Total number of examples to be stored; must be >= 0.
/// @param num_examples_per_block Examples per disk block; must be > 0.
/// @return A DiskBuffer allocated with `new`. The caller owns it and is
///         responsible for deleting it (e.g. by wrapping it in a
///         std::unique_ptr<DiskBuffer>).
DiskBuffer* get_disk_buffer(
    const std::string& filename,
    int feature_size,
    int num_examples,
    int num_examples_per_block) {
    assert(num_examples_per_block > 0);
    assert(num_examples >= 0);
    // Ceiling division written so it cannot overflow: the usual
    // `(n + k - 1) / k` form overflows `int` when n is close to INT_MAX.
    const int num_disk_block =
        num_examples / num_examples_per_block +
        (num_examples % num_examples_per_block != 0 ? 1 : 0);
    return new DiskBuffer(filename, feature_size, num_examples_per_block, num_disk_block);
}
/// Worker loop that feeds one stratum's output queue until
/// ThreadManager::continue_run becomes false.
///
/// Examples come from blocks that the writer thread stored in `disk_buffer`;
/// their slot indices arrive on `slot_r`. While a loaded block still has
/// unsent examples, they are forwarded one per iteration. When no block is
/// loaded and no slot index is pending, a single example is forwarded straight
/// from `in_queue_r_`. A stratum that holds fewer examples than one block would
/// otherwise never be written to disk and would never be served.
///
/// @param in_queue_r_  Receiving end of the stratum's input queue. The writer
///                     thread reads from it as well.
/// @param out_queue_s_ Sending end of the stratum's output queue.
/// @param slot_r       Delivers slot indices of blocks written to `disk_buffer`.
/// @param disk_buffer  Buffer the blocks are read back from.
void stratum_block_read_thread(
    Receiver<ExampleWithScore>& in_queue_r_,
    Sender<ExampleWithScore>& out_queue_s_,
    Receiver<int>& slot_r,
    std::unique_ptr<DiskBuffer>& disk_buffer) {
    std::vector<ExampleInSampleSet> out_block;
    // std::size_t so the comparison with out_block.size() is not a
    // signed/unsigned mix.
    std::size_t index = 0;
    while (ThreadManager::continue_run) {
        if (index < out_block.size()) {
            // Keep draining the block that is currently loaded.
            out_queue_s_.send(out_block[index++]);
            continue;
        }
        std::pair<bool, int> block_index_try = slot_r.try_recv();
        if (block_index_try.first) {
            out_block = disk_buffer->read_block(block_index_try.second);
            index = 0;
            // An empty block would make out_block[0] an out-of-bounds access.
            if (!out_block.empty()) {
                out_queue_s_.send(out_block[index++]);
            }
        } else {
            // No spilled block is pending. Examples short of a full block stay
            // in `in_queue` and are never written to disk, so read them directly.
            ExampleWithScore example = in_queue_r_.recv();
            out_queue_s_.send(example);
        }
    }
}
void stratum_block_write_thread(int num_examples_per_block,
Receiver<ExampleWithScore>& in_queue_r_,
Sender<int>& slot_s,
std::unique_ptr<DiskBuffer>& disk_buffer) {
while (ThreadManager::continue_run) {
if (in_queue_r_.len() >= num_examples_per_block) {
std::vector<ExampleWithScore> in_block;
for (int i = 0; i < num_examples_per_block; ++i) {
in_block.push_back(in_queue_r_.recv());
}
int slot_index = disk_buffer->write_block(in_block);
slot_s.send(slot_index);
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
}
/// Creates the channels of one stratum and starts its two detached worker
/// threads:
/// - a block writer (stratum_block_write_thread) that moves examples from
///   `in_channel` into `disk_buffer` and announces slots on `slot_channel`;
/// - a block reader (stratum_block_read_thread) that fills `out_channel`.
///
/// @param index                  Stratum index; only used to name the channels.
/// @param num_examples_per_block Block size; the in/out channels are bounded to
///                               twice this many examples.
/// @param disk_buffer            Disk buffer shared by the worker threads. They
///                               keep a reference to it, so it must outlive them.
///
/// NOTE(review): the detached threads also hold references to this object's
/// channel members, so a Stratum must not be destroyed or moved while they run.
Stratum::Stratum(
    int index,
    int num_examples_per_block,
    std::unique_ptr<DiskBuffer>& disk_buffer) :
    in_channel(bounded_channel<ExampleWithScore>(num_examples_per_block * 2, "stratum-i" + std::to_string(index))),
    slot_channel(unbounded_channel<int>("stratum-slot" + std::to_string(index))),
    out_channel(bounded_channel<ExampleWithScore>(num_examples_per_block * 2, "stratum-o" + std::to_string(index))) {
    Receiver<ExampleWithScore>& in_queue_r = in_channel.second;
    Sender<int>& slot_s = slot_channel.first;
    Receiver<int>& slot_r = slot_channel.second;
    Sender<ExampleWithScore>& out_queue_s = out_channel.first;

    // The block size is passed BY VALUE. A std::ref to this constructor
    // parameter would be unwrapped only when the detached thread starts
    // running, possibly after the constructor has returned and the parameter
    // no longer exists (dangling reference).
    std::thread block_writer(stratum_block_write_thread, num_examples_per_block,
        std::ref(in_queue_r), std::ref(slot_s), std::ref(disk_buffer));
    block_writer.detach();
    std::thread block_reader(stratum_block_read_thread, std::ref(in_queue_r),
        std::ref(out_queue_s), std::ref(slot_r), std::ref(disk_buffer));
    block_reader.detach();
}
void Strata::send(int index, const Example& example, double score, int version) {
assert(index >= -128 && index <= 127);
lock_guard<std::mutex> lock(this->mutex);
auto& sender = in_queues[index + 128];
if (sender.get() == nullptr) {
this->create(index);
}
sender->send({ example,{ score, version } });
}
/// Returns the input-queue slot of stratum `index`. The slot is empty (null)
/// until the stratum has been created.
///
/// @param index Stratum index; must lie in [-128, 127] (same range as send()).
std::unique_ptr<InQueueSender>& Strata::get_in_queue(int index) {
    assert(index >= -128 && index <= 127);
    return in_queues[index + 128];
}
/// Returns the output-queue slot of stratum `index`. The slot is empty (null)
/// until the stratum has been created.
///
/// @param index Stratum index; must lie in [-128, 127] (same range as send()).
std::unique_ptr<OutQueueReceiver>& Strata::get_out_queue(int index) {
    assert(index >= -128 && index <= 127);
    return out_queues[index + 128];
}
std::pair<InQueueSender, OutQueueReceiver> Strata::create(int index) {
if (in_queues[index + 128]) {
// Other process have created the stratum before this process secures the writing lock
return std::make_pair(*in_queues[index + 128], *out_queues[index + 128]);
} else {
// Each stratum will create two threads for writing in and reading out examples
// TODO: create a systematic approach to manage stratum threads
std::shared_ptr<Stratum> stratum(new Stratum(index, num_examples_per_block, disk_buffer));
Sender<ExampleWithScore>& in_queue = stratum->in_channel.first;
Receiver<ExampleWithScore>& out_queue = stratum->out_channel.second;
//let(in_queue, out_queue) = (stratum.in_queue_s.clone(), stratum.out_queue_r.clone());
in_queues[index + 128] = std::make_unique<InQueueSender>(in_queue);
out_queues[index + 128] = std::make_unique<OutQueueReceiver>(out_queue);
stratas.push_back(stratum);
return std::make_pair(in_queue, out_queue);
}
}