-
Notifications
You must be signed in to change notification settings - Fork 55
/
pagerank.cpp
100 lines (88 loc) · 3.24 KB
/
pagerank.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// Copyright 2016 Husky Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <string>
#include <vector>
#include "boost/tokenizer.hpp"
#include "core/engine.hpp"
#include "io/input/inputformat_store.hpp"
/// Graph vertex for the PageRank job: an integer id, the ids of its adjacent
/// vertices, and its current rank value.
class Vertex {
   public:
    using KeyT = int;

    /// Creates a vertex with id 0, no neighbors and the initial rank 0.15.
    /// The id is explicitly zero-initialized so that a default-constructed
    /// vertex never exposes an indeterminate value through id() or operator<<.
    Vertex() = default;

    /// Creates a vertex with the given id, no neighbors and the initial rank 0.15.
    explicit Vertex(const KeyT& id) : vertexId(id) {}

    /// Returns the key identifying this vertex.
    const KeyT& id() const { return vertexId; }

    // Serialization and deserialization.
    // Both operators must keep the same field order: vertexId, adj, pr.
    friend husky::BinStream& operator<<(husky::BinStream& stream, const Vertex& u) {
        stream << u.vertexId << u.adj << u.pr;
        return stream;
    }
    friend husky::BinStream& operator>>(husky::BinStream& stream, Vertex& u) {
        stream >> u.vertexId >> u.adj >> u.pr;
        return stream;
    }

    int vertexId = 0;      // key of this vertex
    std::vector<int> adj;  // ids of adjacent vertices
    float pr = 0.15f;      // current rank value
};
/// Job body: loads an adjacency-list graph from the path given by the "input"
/// parameter and runs "iters" rounds of PageRank over it.
///
/// In every round after the first, a vertex first recomputes its rank as
/// 0.85 * (combined value delivered by the channel) + 0.15. It then pushes
/// pr / out-degree to each of its neighbors. Vertices without neighbors push
/// nothing.
void pagerank() {
    auto& infmt = husky::io::InputFormatStore::create_line_inputformat();
    infmt.set_input(husky::Context::get_param("input"));

    // Create and globalize vertex objects
    auto& vertex_list = husky::ObjListStore::create_objlist<Vertex>();
    auto parse_wc = [&vertex_list](boost::string_ref& chunk) {
        if (chunk.empty())
            return;
        // Possible graph file formats like: source : num_neighbors neighbor_1 neighbor_2 ... neighbor_n
        boost::char_separator<char> sep(" \t,:;-");
        boost::tokenizer<boost::char_separator<char>> tok(chunk, sep);
        auto it = tok.begin();
        const auto end = tok.end();
        // A line consisting only of separator characters yields no tokens;
        // skip it rather than dereferencing the end iterator.
        if (it == end)
            return;
        Vertex v(std::stoi(*it));
        ++it;
        // Skip num_neighbors, otherwise comment out. Guarded so that a line
        // holding only the source id does not advance past the end.
        if (it != end)
            ++it;
        for (; it != end; ++it) {
            v.adj.push_back(std::stoi(*it));
        }
        vertex_list.add_object(std::move(v));
    };
    husky::load(infmt, parse_wc);
    husky::globalize(vertex_list);

    // Iterative PageRank computation
    auto& prch =
        husky::ChannelStore::create_push_combined_channel<float, husky::SumCombiner<float>>(vertex_list, vertex_list);
    const int numIters = std::stoi(husky::Context::get_param("iters"));
    for (int iter = 0; iter < numIters; ++iter) {
        husky::list_execute(vertex_list, [&prch, iter](Vertex& u) {
            // The first round has no incoming values yet, so the initial rank is kept.
            if (iter > 0)
                u.pr = 0.85 * prch.get(u) + 0.15;
            // No neighbors: nothing to distribute (also avoids dividing by zero).
            if (u.adj.empty())
                return;
            const float sendPR = u.pr / u.adj.size();
            for (const int nb : u.adj) {
                prch.push(sendPR, nb);
            }
        });
    }
}
/// Entry point: initializes Husky with the parameter names listed below and,
/// if initialization succeeds, runs the pagerank job.
/// Returns 0 when the job was run and 1 when initialization failed.
int main(int argc, char** argv) {
    // Parameter names handed to husky::init_with_args; "input" and "iters"
    // are the ones pagerank() reads back through husky::Context::get_param.
    std::vector<std::string> args{
        "hdfs_namenode",
        "hdfs_namenode_port",
        "input",
        "iters",
    };

    if (!husky::init_with_args(argc, argv, args)) {
        return 1;
    }

    husky::run_job(pagerank);
    return 0;
}