/
mapReduce_Like.js
86 lines (80 loc) · 1.75 KB
/
mapReduce_Like.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
'use strict'
let jf = require('json.filed');
let path = require('path');
var count = 0;
let preparer =
jf.filed( './msg01.json' )
.write( { msg: 'Hello World Bye World' } )
.pass(
() => {
jf.filed( './msg02.json' )
.write( { msg: 'Hello json.filed Goodbye json.filed' } )
.pass(
() =>{
sourcer.exec();
}
).exec();
}
).plan();
let sourcer =
jf.filed([ './msg01.json', './msg02.json' ])
.pass( ( msgObj ) => {
mapper.receive( msgObj );
})
.plan();
sourcer.runtime.on('empty',() => { mapper.stop(); });
var c1 = 0;
let mapper =
jf.event(
() => {},
() => {
c1 ++;
return './src' + c1 + '.json'
})
.pass( ( msgObj ) => {
for( let word of msgObj.msg.split(' ')){
shuffler.receive( { key : word, value: 1 });
count ++;
}
})
.plan();
mapper.runtime.on('empty',() => { shuffler.stop(); });
mapper.exec();
var c2 = 0;
let shuffler =
jf.event(
()=>{},
() => {
c2 ++;
return './' + c2 + '.json' } )
.collect( ( kvArray ) => {
var obj = {};
for( let kv of kvArray ){
if( ! obj[ kv.key ] ) obj[ kv.key ] = [];
obj[kv.key].push(kv.value);
}
return obj; },
'./collect.json' )
.pass( (collectedObj, filePath ) => {
for( let word in collectedObj ){
reducer.receive( { key: word, value: collectedObj[word] });
}
reducer.stop();
})
.exec();
let reducer =
jf.event(
() => {},
( obj ) => { return './' + obj.key + '.json'} )
.io( obj => {
//Reduce phase
var sum = 0;
for( let c of obj.value ){
sum = sum + c;
}
return { key: obj.key, value: sum };
}
)
.collect( obj => obj, './answer.json' )
.exec();
preparer.exec();