-
Notifications
You must be signed in to change notification settings - Fork 146
/
pagerank_optimized.rg
289 lines (256 loc) · 9 KB
/
pagerank_optimized.rg
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
-- Copyright 2024 Stanford University
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-- runs-with:
-- []
import "regent"
import "bishop"
-- Bishop mapper rules: pick a target processor for each listed task and a
-- target memory for each listed region argument.
mapper
-- Processor sets selected by instruction set.
$GPUs = processors[isa=cuda]
$CPUs = processors[isa=x86]
-- NOTE(review): $HAS_ZCMEM and $HAS_GPUS are defined here but no rule in this
-- block references them.
$HAS_ZCMEM = memories[kind=zcmem].size > 0
$HAS_GPUS = $GPUs.size > 0
-- init_pr_score and pagerank point tasks go to a CUDA processor chosen by
-- launch index modulo the number of CUDA processors.
task#init_pr_score[index=$i],
task#pagerank[index=$i] {
target : $GPUs[$i % $GPUs.size];
}
-- init_graph and init_partition go to an x86 processor chosen the same way.
task#init_graph[index=$i],
task#init_partition[index=$i] {
target : $CPUs[$i % $CPUs.size];
}
-- pagerank's nodes, edges and pr_ws arguments are placed in the fbmem
-- memories of the processor the task was mapped to.
task#pagerank[target=$proc] region#nodes,
task#pagerank[target=$proc] region#edges,
task#pagerank[target=$proc] region#pr_ws{
target : $proc.memories[kind=fbmem];
}
-- The score regions (init_pr_score's pr, pagerank's pr_old and pr_new) are
-- placed in the zcmem memories of the target processor.
task#init_pr_score[target=$proc] region#pr,
task#pagerank[target=$proc] region#pr_old,
task#pagerank[target=$proc] region#pr_new {
target : $proc.memories[kind=zcmem];
}
-- All region arguments of the loading/partitioning tasks are also placed in
-- zcmem memories of their target processor.
task#init_graph[target=$proc] region#nodes,
task#init_graph[target=$proc] region#edges,
task#init_partition[target=$proc] region#node_range,
task#init_partition[target=$proc] region#edge_range,
task#init_partition[target=$proc] region#nodes {
target : $proc.memories[kind=zcmem];
}
end
-- C bindings used throughout this file for Legion runtime calls and for
-- stdio/malloc-style C functions (fopen, fread, malloc, printf, ...).
local c = regentlib.c
-- stdlib.h provides atoll (argument parsing) and rand.
local std = terralib.includec("stdlib.h")
-- string.h provides strcmp (argument parsing).
local cstring = terralib.includec("string.h")
-- Vertex ids are 32-bit; edge ids/offsets are 64-bit.
local V_ID = int32
local E_ID = int64
-- Publishes C's rand as the Lua global `rand`.
-- NOTE(review): nothing in the visible source references `rand` -- confirm
-- whether this global is still needed.
rawset(_G, "rand", std.rand)
-- Run configuration: defaults are set in main and then overridden by
-- parse_input_args from the command line and the input file.
struct Config {
-- Number of graph nodes (read from the input file).
num_nodes : V_ID,
-- Number of graph edges (read from the input file).
num_edges : E_ID,
-- Number of timed PageRank iterations ("-ni" flag).
num_iterations : int64,
-- Number of pieces the graph is split into ("-nw" flag).
num_workers : int64,
-- NUL-terminated path of the binary graph file (read from the input file
-- with a 255-character limit).
graph : int8[256],
}
-- Per-node record loaded by init_graph.
struct NodeStruct {
-- Edge offset one past this node's last edge: init_graph assigns edge e to
-- the first node whose index is greater than e, and init_partition uses
-- index - 1 as the inclusive upper edge bound of a piece.
index : E_ID,
-- Degree value as stored in the graph file.
-- NOTE(review): only written in the visible source; presumably consumed by
-- the external pagerank task -- confirm.
degree : V_ID,
}
-- Per-edge record filled in by init_graph.
struct EdgeStruct {
-- Source node id, read directly from the graph file.
src : V_ID,
-- Destination node id, derived from the per-node edge offsets.
dst : V_ID,
}
-- Builds the "legion_interop" library from legion_interop.cu (located next to
-- this script) via std/launcher and keeps the resulting handle in
-- clegion_interop. The compiler comes from $NVCC (default "nvcc"); the CUB
-- headers must be supplied through $CUB_INCLUDE_DIR.
local clegion_interop
do
  local script_dir = arg[0]:match(".*/") or "./"
  local cuda_source = script_dir .. "legion_interop.cu"
  local compiler = os.getenv('NVCC') or 'nvcc'

  local cub_include_dir = os.getenv("CUB_INCLUDE_DIR")
  assert(cub_include_dir ~= nil, "Please set CUB_INCLUDE_DIR to the path of CUB headers")

  -- The CUB include flag is passed both as part of the compiler flags and
  -- separately as the include-flag list.
  local cub_flags = {"-I" .. cub_include_dir}
  local compile_flags = terralib.newlist({"-Xcompiler", "-fPIC", cub_flags[1]})

  local launcher = require("std/launcher")
  clegion_interop = launcher.build_library(
    "legion_interop", {cuda_source}, compiler, compile_flags, cub_flags)
end
-- Fills in a Config from the command line and the input file.
--
-- Recognized flags (each takes one value):
--   -ni <n>       number of iterations   -> conf.num_iterations
--   -nw <n>       number of workers      -> conf.num_workers
--   -graph <path> input file (required)
--
-- The input file is a text file holding, in order: the node count, the edge
-- count, and the path of the binary graph file (at most 255 characters).
--
-- conf is taken by value; the updated copy is returned. Aborts if a flag is
-- missing its value, if -graph is absent, or if the input file cannot be
-- opened or parsed.
terra parse_input_args(conf : Config)
  var args = c.legion_runtime_get_input_args()
  var input_file : rawstring = nil

  -- An explicit counter lets a flag consume its value so the value is never
  -- re-examined as a flag on the next step.
  var i = 0
  while i < args.argc do
    var flag = args.argv[i]
    var is_ni = cstring.strcmp(flag, "-ni") == 0
    var is_nw = cstring.strcmp(flag, "-nw") == 0
    var is_graph = cstring.strcmp(flag, "-graph") == 0
    if is_ni or is_nw or is_graph then
      -- Without this check a trailing flag would read argv[argc].
      regentlib.assert_error(i + 1 < args.argc,
        "error: a command-line flag is missing its value")
      i = i + 1
      if is_ni then
        conf.num_iterations = std.atoll(args.argv[i])
      elseif is_nw then
        conf.num_workers = std.atoll(args.argv[i])
      else
        input_file = rawstring(args.argv[i])
      end
    end
    i = i + 1
  end
  regentlib.assert_error(input_file ~= nil, "error: the flag '-graph' is required but was not provided")

  var file = c.fopen(input_file, "r")
  regentlib.assert(file ~= nil, "failed to open input file")
  -- num_nodes is 32-bit, num_edges is 64-bit: the conversion must match the
  -- destination width or only half of num_edges gets written.
  var num_parsed = c.fscanf(file, "%i", &conf.num_nodes)
  num_parsed = num_parsed + c.fscanf(file, "%lli", &conf.num_edges)
  num_parsed = num_parsed + c.fscanf(file, "%255s", conf.graph)
  c.fclose(file)
  -- Each call converts at most one item, so the sum is 3 only if all succeeded.
  regentlib.assert(num_parsed == 3, "failed to parse input file")
  return conf
end
-- Loads the binary graph file into the nodes and edges regions.
--
-- File layout, in order:
--   num_nodes x 8-byte edge offsets  -> nodes[n].index
--   num_nodes x 4-byte degrees       -> nodes[n].degree
--   num_edges x 4-byte source ids    -> edges[e].src
-- edges[e].dst is derived: it is the first node whose index is greater than e.
--
-- Parameters:
--   nodes, edges  regions to fill (indexed 0..num_nodes-1 / 0..num_edges-1)
--   num_nodes     number of node records in the file
--   num_edges     number of edge records in the file
--   graph         path of the binary graph file
-- Returns 1. Aborts if the file cannot be opened, is too short, a buffer
-- cannot be allocated, or the edge offsets do not cover every edge.
task init_graph(nodes : region(ispace(int1d), NodeStruct),
                edges : region(ispace(int1d), EdgeStruct),
                num_nodes : V_ID,
                num_edges : E_ID,
                graph : regentlib.string)
where
  reads(nodes, edges), writes(nodes, edges)
do
  -- Widen the 32-bit node count so byte sizes are computed in 64 bits.
  var node_count : int64 = num_nodes

  -- Staging buffers: 8 bytes per E_ID offset, 4 bytes per V_ID value.
  var indices : &E_ID = [&E_ID](c.malloc(node_count * 8))
  var degrees : &V_ID = [&V_ID](c.malloc(node_count * 4))
  var srcs : &V_ID = [&V_ID](c.malloc(num_edges * 4))
  -- A zero-sized request may legitimately return null.
  regentlib.assert(node_count == 0 or not isnull(indices), "failed to allocate node index buffer")
  regentlib.assert(node_count == 0 or not isnull(degrees), "failed to allocate node degree buffer")
  regentlib.assert(num_edges == 0 or not isnull(srcs), "failed to allocate edge source buffer")

  var file = c.fopen([rawstring](graph), "rb")
  regentlib.assert(not isnull(file), "failed to open graph file")
  c.printf("graph = %s\n", [rawstring](graph))

  var indices_read = [int64](c.fread(indices, 8, num_nodes, file))
  regentlib.assert(indices_read == node_count, "graph file is truncated (node indices)")
  for n = 0, num_nodes do
    nodes[n].index = indices[n]
  end

  var degrees_read = [int64](c.fread(degrees, 4, num_nodes, file))
  regentlib.assert(degrees_read == node_count, "graph file is truncated (node degrees)")
  for n = 0, num_nodes do
    nodes[n].degree = degrees[n]
  end

  var srcs_read = [int64](c.fread(srcs, 4, num_edges, file))
  regentlib.assert(srcs_read == num_edges, "graph file is truncated (edge sources)")

  -- Edges are stored grouped by destination, so dst only ever moves forward.
  var dst : V_ID = 0
  for e = 0, num_edges do
    -- Advance to the first node whose offset is past e, never leaving nodes.
    while dst < num_nodes do
      if nodes[dst].index > e then
        break
      end
      dst = dst + 1
    end
    regentlib.assert(dst < num_nodes, "node indices do not cover all edges")
    edges[e].src = srcs[e]
    edges[e].dst = dst
  end

  c.fclose(file)
  c.free(indices)
  c.free(degrees)
  c.free(srcs)
  return 1
end
-- Splits the nodes into consecutive pieces of roughly avg_num_edges edges.
--
-- Walks the nodes in order and closes a piece at node n as soon as the piece
-- holds more than avg_num_edges edges, or when n is the last node. For piece
-- p it records the inclusive node interval in node_range[p] and the inclusive
-- edge interval in edge_range[p].
--
-- Parameters:
--   node_range, edge_range  one entry per piece, written here
--   nodes                   node records with cumulative edge offsets
--   avg_num_edges           edge count above which a piece is closed
--   num_parts               not used by this task
-- Returns the index of the last node, i.e. the total edge count seen.
-- Aborts if the number of pieces produced differs from the size of
-- node_range.
task init_partition(node_range : region(ispace(int1d), regentlib.rect1d),
                    edge_range : region(ispace(int1d), regentlib.rect1d),
                    nodes : region(ispace(int1d), NodeStruct),
                    avg_num_edges : E_ID,
                    num_parts : int)
where
  writes(node_range, edge_range), reads(nodes)
do
  var range_is = node_range.ispace
  var node_is = nodes.ispace
  var start_idx : E_ID = 0   -- first edge of the piece being built
  var start_node : V_ID = 0  -- first node of the piece being built
  var p : int = 0            -- number of pieces closed so far
  for n in node_is do
    if ((nodes[n].index - start_idx > avg_num_edges) or (n == node_is.bounds.hi)) then
      -- Validate before writing so an extra piece cannot land outside the
      -- range regions.
      regentlib.assert(p < range_is.volume, "More partitions produced than range entries")
      node_range[p] = {start_node, n}
      edge_range[p] = {start_idx, nodes[n].index - 1}
      start_idx = nodes[n].index
      start_node = n + 1
      p = p + 1
    end
  end
  regentlib.assert(p == range_is.volume, "Number of partitions don't match")
  return nodes[node_is.bounds.hi].index
end
-- Declaration of the externally implemented PageRank step; no body is given
-- in this file. Per the declared privileges it reads nodes, edges and pr_old
-- and writes pr_new and pr_ws.
extern task pagerank(nodes : region(ispace(int1d), NodeStruct),
edges : region(ispace(int1d), EdgeStruct),
pr_old : region(ispace(int1d), float),
pr_new : region(ispace(int1d), float),
pr_ws : region(ispace(int1d), float))
where
reads(nodes, edges, pr_old), writes(pr_new, pr_ws)
end
-- Bind the declaration to the task id exported by the legion_interop library
-- and use the manual calling convention.
pagerank:set_task_id(clegion_interop.TID_F)
pagerank:set_calling_convention(regentlib.convention.manual())
-- Declaration of the externally implemented score initializer; no body is
-- given in this file. Per the declared privilege it writes pr.
extern task init_pr_score(pr : region(ispace(int1d), float))
where
writes(pr)
end
-- Bind the declaration to the task id exported by the legion_interop library
-- and use the manual calling convention.
init_pr_score:set_task_id(clegion_interop.TID_F2)
init_pr_score:set_calling_convention(regentlib.convention.manual())
-- Empty function: takes an int and does nothing.
-- NOTE(review): not called anywhere in the visible source -- confirm whether
-- it can be removed.
terra check(x : int)
end
-- Top-level task: loads the graph, partitions it into num_workers pieces and
-- runs the PageRank iterations, alternating between two score regions.
--
-- Two extra warm-up iterations run before the timer starts, so the loop
-- executes num_iterations + 2 steps and the reported time covers steps 2
-- onward.
task main()
  -- Defaults; parse_input_args overrides them from the command line and the
  -- input file.
  var conf : Config
  conf.num_nodes = 10000
  conf.num_edges = 1000000
  conf.num_iterations = 10
  conf.num_workers = 1
  conf = parse_input_args(conf)
  -- num_edges, num_iterations and num_workers are 64-bit, hence %lld.
  c.printf("pagerank settings: num_nodes=%d num_edges=%lld iterations=%lld workers=%lld\n",
           conf.num_nodes, conf.num_edges, conf.num_iterations, conf.num_workers)

  var is_nodes = ispace(int1d, conf.num_nodes)
  var is_edges = ispace(int1d, conf.num_edges)
  -- The workspace holds num_nodes entries per worker.
  var is_workspace = ispace(int1d, conf.num_nodes * conf.num_workers)

  var all_nodes = region(is_nodes, NodeStruct)
  var all_edges = region(is_edges, EdgeStruct)
  var pr_score0 = region(is_nodes, float)
  var pr_score1 = region(is_nodes, float)
  var pr_workspace = region(is_workspace, float)

  c.printf("Load input graph...\n")
  init_graph(all_nodes, all_edges, conf.num_nodes, conf.num_edges, [rawstring](conf.graph))
  init_pr_score(pr_score0)

  var pieces = ispace(int1d, conf.num_workers)
  var part_workspace = partition(equal, pr_workspace, pieces)

  -- Compute the node and edge intervals of every piece.
  var node_range = region(pieces, regentlib.rect1d)
  var edge_range = region(pieces, regentlib.rect1d)
  var part_node_range = partition(equal, node_range, pieces)
  var part_edge_range = partition(equal, edge_range, pieces)
  var total_num_edges = init_partition(node_range, edge_range, all_nodes,
                                       conf.num_edges/ conf.num_workers+1,
                                       conf.num_workers)
  regentlib.assert(total_num_edges == conf.num_edges, "Edge number not match")
  __fence(__execution, __block)

  -- Turn the recorded intervals into partitions of the data regions.
  var part_nodes = image(all_nodes, part_node_range, node_range)
  var part_score0 = image(disjoint, pr_score0, part_node_range, node_range)
  var part_score1 = image(disjoint, pr_score1, part_node_range, node_range)
  var part_edges = image(all_edges, part_edge_range, edge_range)
  __fence(__execution, __block)

  c.printf("Start PageRank computation...\n")
  -- Initialized up front so the elapsed time is well defined even when the
  -- loop below never reaches the end of the warm-up phase.
  var ts_start : int64 = c.legion_get_current_time_in_micros()
  for iter = 0, conf.num_iterations+2 do
    -- Use the first two iterations to warm up the execution.
    if iter == 2 then
      __fence(__execution, __block)
      ts_start = c.legion_get_current_time_in_micros()
    end
    -- Even steps read pr_score0 and write pr_score1; odd steps do the reverse.
    if iter % 2 == 0 then
      __demand(__index_launch)
      for p in pieces do
        pagerank(part_nodes[p], part_edges[p],
                 pr_score0, part_score1[p], part_workspace[p])
      end
    else
      __demand(__index_launch)
      for p in pieces do
        pagerank(part_nodes[p], part_edges[p],
                 pr_score1, part_score0[p], part_workspace[p])
      end
    end
  end
  -- Force all previous tasks to complete before stopping the timer.
  __fence(__execution, __block)
  var ts_end = c.legion_get_current_time_in_micros()
  c.printf("Iterations = %lld, elapsed time = %lldus\n", conf.num_iterations, ts_end - ts_start)
end
-- Setup hook passed to regentlib.start: registers the tasks provided by the
-- legion_interop library, then invokes the entry function generated by
-- bishoplib.make_entry().
terra callback()
clegion_interop.register_tasks()
[bishoplib.make_entry()]()
end
-- Start execution with main, handing callback to regentlib.start alongside it.
-- NOTE(review): presumably callback runs during runtime setup, before main --
-- confirm against the Regent documentation.
regentlib.start(main, callback)