-
Notifications
You must be signed in to change notification settings - Fork 0
/
mr_worker.cc
300 lines (258 loc) · 7.91 KB
/
mr_worker.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
#include <iostream>
#include <fstream>
#include <sstream>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <dirent.h>
#include <mutex>
#include <string>
#include <vector>
#include <map>
#include "rpc.h"
#include "mr_protocol.h"
using namespace std;
// Debug logging hook for the worker. Logging is currently compiled out: the
// active definition expands to an empty do/while and ignores its arguments.
// NOTE: this expansion already ends in ';' and every WORKER_LOG call site in
// this file omits the trailing semicolon, so removing the ';' here would
// break those call sites.
// To turn logging on, replace it with the commented-out variant below, which
// prints file/line/function plus a printf-style message (it uses the GNU
// named-variadic `args...` / `##args` extension).
#define WORKER_LOG(fmt, args...) \
do { \
} while (0);
//#define WORKER_LOG(fmt, args...) \
// do { \
// printf("[MR_WORKER_LOG][%s:%d:%s] " fmt "\n", __FILE__, __LINE__, __FUNCTION__ , ##args); \
// } while (0);
// A single key/value pair produced by a map function and consumed when
// building intermediate files (e.g. {"word", "1"} for word count).
struct KeyVal {
    std::string key;  // grouping key
    std::string val;  // associated value, kept as text
};
// Returns true exactly for the ASCII letters 'a'..'z' and 'A'..'Z'.
// Unlike std::isalpha, the result does not depend on the current C locale.
bool IsLetter(char c) {
    if ('a' <= c && c <= 'z') {
        return true;
    }
    return 'A' <= c && c <= 'Z';
}
//
// The map function is called once for each file of input. The first
// argument is the name of the input file, and the second is the
// file's complete contents. The file name is ignored; only the
// contents are examined. Returns one {word, "1"} pair per occurrence
// of a word, in order of appearance, where a word is a maximal run of
// ASCII letters (as decided by IsLetter).
//
vector<KeyVal> Map(const string &filename, const string &content) {
    (void)filename;  // unused: the result depends only on the contents
    vector<KeyVal> kv_pairs;
    string word;
    // Emits the pending word, if any, and starts a new one.
    auto flush_word = [&]() {
        if (word.empty()) {
            return;
        }
        kv_pairs.push_back(KeyVal{word, "1"});
        word.clear();
    };
    for (char c : content) {
        if (IsLetter(c)) {
            word.push_back(c);
        } else {
            flush_word();
        }
    }
    // A word that runs to the very end of the input has no trailing
    // separator; without this final flush it would be silently dropped.
    flush_word();
    return kv_pairs;
}
//
// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task. It returns how many values were collected for the key,
// as a decimal string; the key and the value strings themselves are not
// inspected (Map in this file emits "1" per occurrence, so this is the
// word's occurrence count).
//
string Reduce(const string &key, const vector<string> &values) {
    (void)key;
    const auto occurrences = values.size();
    return std::to_string(occurrences);
}
// Pointer to a user map function: (input name, input contents) -> key/value pairs.
using MAPF = vector<KeyVal> (*)(const string &key, const string &value);
// Pointer to a user reduce function: (key, all values for that key) -> result.
using REDUCEF = string (*)(const string &key, const vector<string> &values);
// A MapReduce worker: repeatedly asks the coordinator over RPC for a task,
// runs it with the supplied map/reduce functions, and reports it as done.
class Worker {
public:
    // dst: coordinator address handed to make_sockaddr.
    // dir: output directory name; intermediate files use "/tmp/" + dir as a
    //      path prefix.
    Worker(const string &dst, const string &dir, MAPF mf, REDUCEF rf);
    // Task loop; does not return normally.
    void doWork();

private:
    void doMap(int index, const vector<string> &filenames);
    void doReduce(int index);
    void doSubmit(mr_tasktype taskType, int index);
    void generateIntermediateFile(int map_task_num, int reduce_task_num, const string &buf);

    mutex mtx;               // NOTE(review): not used anywhere in this file
    int id = 0;              // NOTE(review): not used anywhere in this file
    rpcc *cl = nullptr;      // RPC client to the coordinator; created by the
                             // constructor with new and never deleted
    std::string basedir;     // raw path prefix for intermediate mr-X-Y files
    std::string resdir;      // directory receiving the final output files
    MAPF mapf = nullptr;     // user map function
    REDUCEF reducef = nullptr;  // user reduce function
};
// Stores the map/reduce functions and directory settings, then connects an
// RPC client to the coordinator at dst.
// basedir is "/tmp/" + dir and is used as a raw prefix for intermediate file
// names; resdir is dir itself.
Worker::Worker(const string &dst, const string &dir, MAPF mf, REDUCEF rf)
    : basedir("/tmp/" + dir), resdir(dir), mapf(mf), reducef(rf) {
    sockaddr_in coordinator_addr;
    make_sockaddr(dst.c_str(), &coordinator_addr);
    cl = new rpcc(coordinator_addr);
    // A bind failure is only reported; construction continues regardless.
    const int bind_result = cl->bind();
    if (bind_result < 0) {
        printf("mr worker: call bind error\n");
    }
}
void Worker::generateIntermediateFile(int map_task_num, int reduce_task_num, const string &buf) {
// A reasonable naming convention for intermediate files is mr-X-Y,
// where X is the Map task number, and Y is the reduce task number.
// The worker's map task code will need a way to store intermediate
// key/value pairs in files in a way that can be correctly read
// back during reduce tasks.
WORKER_LOG("generateIntermediateFile mr-%d-%d", map_task_num, reduce_task_num)
ofstream out(basedir + "mr-" + to_string(map_task_num) + "-" + to_string(reduce_task_num));
out << buf;
out.close();
}
// Runs map task `index`. It reads the task's input file, applies mapf, and
// writes one intermediate file per reducer (mr-<index>-<r> for
// r in [0, REDUCER_COUNT)).
//
// Each mapper processes exactly one file: the first entry of filenames.
// An empty list, or an unreadable file, is treated as empty input. In that
// case the intermediate files are still written, empty. doReduce stops
// scanning at the first missing mr-X-Y file, so skipping them here would
// hide the outputs of later map tasks.
void Worker::doMap(int index, const vector<string> &filenames) {
    const string filename = filenames.empty() ? string() : filenames.front();
    WORKER_LOG("doMap filename %s", filename.data())

    // Read the whole file. Streaming rdbuf() does not stop at an embedded
    // NUL byte, unlike getline(..., '\0').
    string content;
    {
        ifstream in(filename);
        if (in) {
            stringstream ss;
            ss << in.rdbuf();
            content = ss.str();
        } else {
            fprintf(stderr, "mr worker: cannot open input file %s\n", filename.c_str());
        }
    }
    const vector<KeyVal> kv_pairs = mapf(filename, content);

    // Partition by key hash so all occurrences of a key reach the same
    // reducer. There is one buffer per reducer, sized by REDUCER_COUNT
    // (presumably declared in mr_protocol.h), so partitioning always
    // matches that constant. Appending with += keeps this linear in output size.
    std::hash<std::string> doHash;
    vector<string> bufs(REDUCER_COUNT);
    for (const auto &kv_pair : kv_pairs) {
        string &buf = bufs[doHash(kv_pair.key) % REDUCER_COUNT];
        buf += kv_pair.key;
        buf += ' ';
        buf += kv_pair.val;
        buf += '\n';
    }
    for (int reduce_task_num = 0; reduce_task_num < REDUCER_COUNT; ++reduce_task_num) {
        generateIntermediateFile(index, reduce_task_num, bufs[reduce_task_num]);
    }
}
void Worker::doReduce(int index) {
// Lab4: Your code goes here.
map<string, uint64_t> word_num_map;
WORKER_LOG("doReduce index %d", index)
int map_task_num = 0;
string buf;
string key;
uint64_t num;
// check files
while (1) {
buf = "";
ifstream in(basedir + "mr-" + to_string(map_task_num) + "-" + to_string(index));
if (in.is_open()) {
while (getline(in, buf)) {
stringstream ss;
ss << buf;
ss >> key >> num;
if (word_num_map.count(key) == 0) {
word_num_map.emplace(key, num);
} else {
word_num_map[key] += num;
}
// auto res = word_num_map.try_emplace(key, num);
// if(!res.second){
// *(res.first)->second += num;
// }
}
in.close();
++map_task_num;
} else {
// no larger map_task_num file
in.close();
break;
}
}
string res_content = "";
for (auto word_cnt_pair : word_num_map) {
res_content = res_content + word_cnt_pair.first + ' ' + to_string(word_cnt_pair.second) + '\n';
}
ofstream out(resdir + "/mr-out" + to_string(index));
out << res_content;
WORKER_LOG("result file mr-out%d generated", index)
out.close();
}
// Reports task (taskType, index) as finished to the coordinator.
// The boolean reply is received but not inspected. Any RPC status other than
// OK prints an error and terminates the whole worker process.
void Worker::doSubmit(mr_tasktype taskType, int index) {
    bool acknowledged = false;
    const mr_protocol::status rpc_status =
        cl->call(mr_protocol::submittask, taskType, index, acknowledged);
    if (rpc_status == mr_protocol::OK) {
        return;
    }
    fprintf(stderr, "submit task failed\n");
    exit(-1);
}
void Worker::doWork() {
for (;;) {
//
// Lab4: Your code goes here.
// Hints: send asktask RPC call to coordinator
int no_use;
mr_protocol::AskTaskResponse reply;
mr_protocol::status ret = this->cl->call(mr_protocol::asktask, no_use, reply);
WORKER_LOG("asktask RPC sent")
if (ret == mr_protocol::OK) {
switch (reply.tasktype) {
case mr_tasktype::MAP: {
WORKER_LOG("mr_tasktype::MAP")
// if mr_tasktype::MAP, then doMap and doSubmit
doMap(reply.index, {reply.filename});
doSubmit(mr_tasktype::MAP, reply.index);
break;
}
case mr_tasktype::REDUCE: {
WORKER_LOG("mr_tasktype::REDUCE")
// if mr_tasktype::REDUCE, then doReduce and doSubmit
doReduce(reply.index);
doSubmit(mr_tasktype::REDUCE, reply.index);
break;
}
case mr_tasktype::NONE: {
WORKER_LOG("mr_tasktype::NONE")
// if mr_tasktype::NONE, meaning currently no work is needed, then sleep
sleep(1);
break;
}
}
} else {
WORKER_LOG("asktask RPC failed")
}
sleep(1);
}
}
int main(int argc, char **argv) {
if (argc != 3) {
fprintf(stderr, "Usage: %s <coordinator_listen_port> <intermediate_file_dir> \n", argv[0]);
exit(1);
}
MAPF mf = Map;
REDUCEF rf = Reduce;
Worker w(argv[1], argv[2], mf, rf);
w.doWork();
return 0;
}