-
Notifications
You must be signed in to change notification settings - Fork 7
/
partition.py
361 lines (309 loc) · 15.9 KB
/
partition.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
import os
import sys
from functools import reduce
from pathlib import Path
import onnx
import numpy as np
from google.protobuf.json_format import MessageToDict
from fpgaconvnet.hls.generate.partition_template import *
from fpgaconvnet.hls.generate.layers.convolution import gen_convolution_layer
from fpgaconvnet.hls.generate.layers.pooling import gen_pooling_layer
from fpgaconvnet.hls.generate.layers.relu import gen_relu_layer
from fpgaconvnet.hls.generate.layers.inner_product import gen_inner_product_layer
from fpgaconvnet.hls.generate.layers.squeeze import gen_squeeze_layer
from fpgaconvnet.hls.generate.util import *
import fpgaconvnet.hls.tools.onnx_data as onnx_data
from fpgaconvnet.hls.tools.array_init import array_init
import fpgaconvnet.proto.fpgaconvnet_pb2 as fpgaconvnet_pb2
import fpgaconvnet.tools.onnx_helper as onnx_helper
import fpgaconvnet.tools.layer_enum as layer_enum
class GeneratePartition:
    """
    Generates the HLS sources, headers, parameter data files and testbench
    for a single fpgaConvNet partition, and drives the Vivado HLS project
    flow (create project, csynth, csim, cosim, implementation, export).
    """

    def __init__(self, name, partition, model, sess, output_path):
        """
        Parameters
        ----------
        name: str
            name of the partition, used for file and top-level function naming
        partition:
            protobuf partition description (fpgaconvnet_pb2), providing the
            layers, streams and weights-reloading information
        model:
            onnx model holding the network's initialisers (weights/biases)
        sess:
            inference session used to produce testbench stimulus
            (presumably an onnxruntime.InferenceSession — TODO confirm)
        output_path: str
            directory in which all generated files are placed
        """
        self.name = name
        self.partition = partition
        self.output_path = output_path
        self.model = model
        self.sess = sess
        # create the output directory layout
        self.mkdir(self.output_path)
        self.mkdir(os.path.join(self.output_path, "src"))
        self.mkdir(os.path.join(self.output_path, "tb"))
        self.mkdir(os.path.join(self.output_path, "include"))
        self.mkdir(os.path.join(self.output_path, "data"))
        # get fpgaconvnet-hls root directory (two levels above this file)
        self.fpgaconvnet_root = str(Path(__file__).resolve().parent.parent)
        # generation state: every stage must be completed before the HLS
        # project can be created (see create_vivado_hls_project)
        self.is_generated = {
            "layers"    : False,
            "weights"   : False,
            "streams"   : False,
            "source"    : False,
            "include"   : False,
            "testbench" : False,
        }
        self.project_generated = False

    def mkdir(self, path):
        """
        Helper function to create a directory, which does not throw an error
        if the path already exists.

        Parameters
        ----------
        path: str
            path to directory to create
        """
        try:
            os.mkdir(path)
        except FileExistsError:
            # deliberately non-fatal: re-running generation into the same
            # output directory is allowed
            print(f"WARNING: path {path} already exists!")

    def generate_layers(self):
        """
        Generates a header and source file for every layer of the partition
        through the per-layer generator functions, and builds the string of
        layer function calls inserted into the partition's top-level source.
        """
        self.layers = ""
        # generate hardware for each layer
        for layer in self.partition.layers:
            # get parameters of the layer as a plain dict
            parameters = MessageToDict(layer.parameters, preserving_proto_field_name=True)
            # arguments for this layer's function call in the top-level source
            fn_args = []
            # common arguments for the per-layer hardware generators
            args = [
                layer.name,
                parameters,
                os.path.join(self.output_path, "src", f"{layer.name}.cpp"),
                os.path.join(self.output_path, "include", f"{layer.name}.hpp")
            ]
            # create hardware for each layer type; types are mutually
            # exclusive, so an elif chain avoids redundant comparisons
            if layer.type == fpgaconvnet_pb2.layer.layer_type.CONVOLUTION:
                # weights (and optionally biases) are extra call arguments
                fn_args.append(f"{layer.name}_weights")
                if layer.parameters.has_bias == 1:
                    fn_args.append(f"{layer.name}_biases")
                gen_convolution_layer(*args)
            elif layer.type == fpgaconvnet_pb2.layer.layer_type.POOLING:
                gen_pooling_layer(*args)
            elif layer.type == fpgaconvnet_pb2.layer.layer_type.CONCAT:
                # NOTE(review): gen_concat_layer is not imported at the top of
                # this file — this branch raises NameError if ever taken
                gen_concat_layer(*args)
            elif layer.type == fpgaconvnet_pb2.layer.layer_type.RELU:
                gen_relu_layer(*args)
            elif layer.type == fpgaconvnet_pb2.layer.layer_type.SPLIT:
                # NOTE(review): gen_split_layer is not imported at the top of
                # this file — this branch raises NameError if ever taken
                gen_split_layer(*args)
            elif layer.type == fpgaconvnet_pb2.layer.layer_type.INNER_PRODUCT:
                fn_args.append(f"{layer.name}_weights")
                if layer.parameters.has_bias == 1:
                    fn_args.append(f"{layer.name}_biases")
                gen_inner_product_layer(*args)
            elif layer.type == fpgaconvnet_pb2.layer.layer_type.SQUEEZE:
                gen_squeeze_layer(*args)
            # append the input/output streams and the execution mode flag
            for stream_in in layer.streams_in:
                fn_args.append(stream_in.name)
            for stream_out in layer.streams_out:
                fn_args.append(stream_out.name)
            fn_args.append("mode")
            fn_args = ", ".join(fn_args)
            # accumulate the layer call for the top-level source template
            self.layers += f"    {layer.name}({fn_args});\n"
        # set generated flag
        self.is_generated["layers"] = True

    def generate_parameters(self):
        """
        Exports weights and biases for every CONVOLUTION and INNER_PRODUCT
        layer to .csv and .dat files, and builds the C++ definition and
        initialisation code strings used by the top-level source template.
        """
        weights = []
        biases = []
        # generate weights
        for layer in self.partition.layers:
            # only CONVOLUTION and INNER_PRODUCT layers carry parameters
            if layer.type in [fpgaconvnet_pb2.layer.layer_type.CONVOLUTION,
                              fpgaconvnet_pb2.layer.layer_type.INNER_PRODUCT]:
                # add weight generators (the weights-reloading layer streams
                # its weights in at runtime, hence the wr flag)
                weights.append(GenerateWeights(layer.name,
                    wr=(layer.name == self.partition.weights_reloading_layer)))
                # create weights from onnx model
                ## get weights reloading factor (1 for all other layers)
                wr_factor = self.partition.weights_reloading_factor \
                    if layer.name == self.partition.weights_reloading_layer else 1
                ## get the raw weights
                weights_raw = onnx_helper.get_model_initializer(self.model, layer.weights_path)
                ## transform weights into the hardware layout
                if layer_enum.from_proto_layer_type(layer.type) == layer_enum.LAYER_TYPE.Convolution:
                    transformed_weights = onnx_data.get_weights_convolution(weights_raw, layer, wr_factor=wr_factor)
                elif layer_enum.from_proto_layer_type(layer.type) == layer_enum.LAYER_TYPE.InnerProduct:
                    transformed_weights = onnx_data.get_weights_inner_product(weights_raw, layer, wr_factor=wr_factor)
                ## get the output path for the weights
                output_path = os.path.join(self.output_path, "data", f"{layer.name}_weights")
                ## save one csv per weights-reloading iteration
                for weights_reloading_index in range(wr_factor):
                    with open(f'{output_path}_{weights_reloading_index}.csv', 'w') as f:
                        f.write(array_init(transformed_weights[weights_reloading_index]))
                ## flatten weights into a fixed-point stream
                ## (int width is half the total width by convention here)
                weights_stream = onnx_data._convert_fixed_port_stream(
                        transformed_weights.reshape(-1),
                        total_width=layer.parameters.weight_width,
                        int_width=layer.parameters.weight_width//2)
                ## save to .dat format
                onnx_data._fixed_point_stream_to_dat(weights_stream, output_path=output_path,
                        streams=1, port_width=64, ports=1)
                # add bias generators (if bias present)
                if layer.parameters.has_bias == 1:
                    ## add a biases generator
                    biases.append(GenerateBiases(layer.name))
                    # create bias parameters from onnx model
                    ## get the raw biases from onnx
                    biases_raw = onnx_helper.get_model_initializer(self.model, layer.bias_path)
                    ## transform bias parameters
                    transformed_biases = onnx_data.get_biases(biases_raw, layer, wr_factor=wr_factor)
                    ## get the output path for biases
                    output_path = os.path.join(self.output_path, "data", f"{layer.name}_biases")
                    ## save biases to csv
                    with open(f'{output_path}.csv', 'w') as f:
                        f.write(array_init(transformed_biases[0]))
                    ## flatten biases into a fixed-point stream
                    biases_stream = onnx_data._convert_fixed_port_stream(
                            transformed_biases.reshape(-1),
                            total_width=layer.parameters.biases_width,
                            int_width=layer.parameters.biases_width//2)
                    ## save to .dat format
                    onnx_data._fixed_point_stream_to_dat(biases_stream, output_path=output_path,
                            streams=1, port_width=64, ports=1)
        # get weights/biases definitions and initialisation code strings
        self.weights_def = "\n\n".join([w.generate_def() for w in weights])
        self.weights_init = "\n\n".join([w.generate_init() for w in weights])
        self.biases_def = "\n\n".join([b.generate_def() for b in biases])
        self.biases_init = "\n\n".join([b.generate_init() for b in biases])
        # set generated flag
        self.is_generated["weights"] = True

    def generate_streams(self):
        """
        Collects every stream appearing in the partition (keyed by stream
        name, so shared streams are declared once) and builds the stream
        initialisation code string for the top-level source.
        """
        # get all streams (dict keyed by name de-duplicates shared streams)
        streams = {}
        for layer in self.partition.layers:
            for stream_in in layer.streams_in:
                streams[stream_in.name] = GenerateStreams(stream_in.name, f"{layer.name}_input_t",
                        [f"{layer.name.upper()}_COARSE_IN"])
            for stream_out in layer.streams_out:
                streams[stream_out.name] = GenerateStreams(stream_out.name, f"{layer.name}_output_t",
                        [f"{layer.name.upper()}_COARSE_OUT"])
        # create stream initialisations
        self.streams_init = "\n".join([s.generate_stream() for s in streams.values()])
        # set generated flag
        self.is_generated["streams"] = True

    def generate_include(self):
        """
        Writes the partition's top-level header, filled in with the
        partition's shape, port and weights-reloading information plus an
        #include line per layer.
        """
        # per-layer include generation
        include = ""
        for layer in self.partition.layers:
            include +=f"#include \"{layer.name}.hpp\"\n"
        # fill in the header template; input shape comes from the first
        # layer, output shape from the last
        network_header = network_header_template.format(
            name        =self.name,
            NAME        =self.name.upper(),
            batch_size  =self.partition.batch_size,
            rows_in     =self.partition.layers[0].parameters.rows_in,
            cols_in     =self.partition.layers[0].parameters.cols_in,
            channels_in =self.partition.layers[0].parameters.channels_in,
            rows_out    =self.partition.layers[-1].parameters.rows_out,
            cols_out    =self.partition.layers[-1].parameters.cols_out,
            channels_out=self.partition.layers[-1].parameters.channels_out,
            ports       =self.partition.ports,
            streams_in  =self.partition.layers[0].parameters.coarse_in,
            streams_out =self.partition.layers[-1].parameters.coarse_out,
            input_layer =self.partition.layers[0].name,
            output_layer=self.partition.layers[-1].name,
            wr_layer    =self.partition.weights_reloading_layer,
            WR_LAYER    =self.partition.weights_reloading_layer.upper(),
            wr_factor   =self.partition.weights_reloading_factor,
            wr_flag     =int(self.partition.weights_reloading_layer != "None"),
            include     =include
        )
        # save to output path
        with open(os.path.join(self.output_path, f'include/{self.name}_top.hpp'),'w') as f:
            f.write(network_header)
        # set generated flag
        self.is_generated["include"] = True

    def generate_source(self):
        """
        Writes the partition's top-level source file. Requires that layers,
        weights and streams have already been generated, since their code
        strings are substituted into the source template.
        """
        assert self.is_generated["layers"], "ERROR: layers not generated!"
        assert self.is_generated["weights"], "ERROR: weights not generated!"
        # fixed: message previously said "weights" for the streams check
        assert self.is_generated["streams"], "ERROR: streams not generated!"
        # format the source code template
        network_src = network_src_template.format(
            name        =self.name,
            NAME        =self.name.upper(),
            wr_layer    =self.partition.weights_reloading_layer,
            weights     =self.weights_def,
            biases      =self.biases_def,
            weights_init=self.weights_init,
            biases_init =self.biases_init,
            streams_init=self.streams_init,
            layers      =self.layers
        )
        # save to output path
        with open(os.path.join(self.output_path, f'src/{self.name}_top.cpp'),'w') as f:
            f.write(network_src)
        # set generated flag
        self.is_generated["source"] = True

    def generate_testbench(self):
        """
        Writes the partition's testbench source, pointing at the .dat files
        produced by create_testbench_data and generate_parameters.
        """
        # format testbench code template
        network_tb_src = network_tb_src_template.format(
            name = self.name,
            NAME = self.name.upper(),
            input_data_path = f"{self.partition.layers[0].name}_in_0.dat",
            weights_reloading_path = f"{self.partition.weights_reloading_layer}_weights_0.dat",
            output_data_path = f"{self.partition.layers[-1].name}_out_0.dat"
        )
        # save to output path
        with open(os.path.join(self.output_path,f'tb/{self.name}_tb.cpp'),'w') as f:
            f.write(network_tb_src)
        # set generated flag
        self.is_generated["testbench"] = True

    def create_testbench_data(self, input_data):
        """
        Runs the model up to the partition's input and output nodes and saves
        both feature maps as fixed-point .dat streams for the testbench.

        Parameters
        ----------
        input_data:
            input tensor for the network, fed to the inference session
            (assumed to match the session's input shape — TODO confirm)
        """
        # get the input name and shape from the inference session
        input_name = self.sess.get_inputs()[0].name
        input_shape = self.sess.get_inputs()[0].shape
        # TODO: check data is right shape
        # save input layer stimulus
        # TODO add bitwidth
        input_node = self.partition.input_node
        input_stream = np.array( self.sess.run([input_node], { input_name : input_data } )[0] )
        # move channels from axis 1 to the last axis (NCHW -> NHWC ordering)
        input_stream = np.moveaxis(input_stream, 1, -1)
        input_stream = onnx_data._convert_fixed_port_stream(input_stream.reshape(-1))
        onnx_data._fixed_point_stream_to_dat(input_stream,
                os.path.join(self.output_path, f"data/{self.partition.layers[0].name}_in"),
                streams=int(self.partition.layers[0].parameters.coarse_in))
        # save expected output
        output_node = self.partition.output_node
        output_stream = np.array( self.sess.run([output_node], { input_name : input_data } )[0] )
        output_stream = np.moveaxis(output_stream, 1, -1)
        output_stream = onnx_data._convert_fixed_port_stream(output_stream.reshape(-1))
        onnx_data._fixed_point_stream_to_dat(output_stream,
                os.path.join(self.output_path, f"data/{self.partition.layers[-1].name}_out"),
                streams=int(self.partition.layers[-1].parameters.coarse_out))

    # ------------------------------------------------------------------
    # Vivado HLS functions
    # ------------------------------------------------------------------

    def create_vivado_hls_project(self, fpga_part="xc7z045ffg900-2", clk=5):
        """
        Creates the Vivado HLS project for this partition. All generation
        stages must have completed first.

        Parameters
        ----------
        fpga_part: str
            FPGA part identifier passed to the project creation script
        clk: int
            target clock period (ns)
        """
        # check everything is generated (all() replaces the reduce/& fold)
        assert all(self.is_generated.values()), "ERROR: not all stages are generated!"
        # create hls project
        # NOTE(review): os.system with interpolated paths — paths are
        # project-internal, but subprocess.run with a list would be safer
        os.system(f"vivado_hls -f {self.fpgaconvnet_root}/scripts/hls/create_partition_project.tcl\
                \"_ -prj {self.output_path} -fpga {fpga_part} -clk {clk}\"")
        # set project generated flag
        self.project_generated = True

    def run_csynth(self):
        """Runs C synthesis on the generated HLS project."""
        assert self.project_generated, "ERROR: project not yet created!"
        os.system(f"vivado_hls -f {self.fpgaconvnet_root}/scripts/hls/run_csynth.tcl\
                \"_ -prj {self.output_path}\"")

    def run_csim(self):
        """Runs C simulation on the generated HLS project."""
        assert self.project_generated, "ERROR: project not yet created!"
        os.system(f"vivado_hls -f {self.fpgaconvnet_root}/scripts/hls/run_csim.tcl\
                \"_ -prj {self.output_path}\"")

    def run_cosim(self):
        """Runs C/RTL co-simulation on the generated HLS project."""
        assert self.project_generated, "ERROR: project not yet created!"
        os.system(f"vivado_hls -f {self.fpgaconvnet_root}/scripts/hls/run_cosim.tcl\
                \"_ -prj {self.output_path}\"")

    def run_implementation(self):
        """Runs implementation (place and route) on the HLS project."""
        assert self.project_generated, "ERROR: project not yet created!"
        os.system(f"vivado_hls -f {self.fpgaconvnet_root}/scripts/hls/run_implementation.tcl\
                \"_ -prj {self.output_path}\"")

    def export_design(self):
        """Exports the synthesised design (e.g. as an IP block)."""
        assert self.project_generated, "ERROR: project not yet created!"
        os.system(f"vivado_hls -f {self.fpgaconvnet_root}/scripts/hls/export_design.tcl\
                \"_ -prj {self.output_path}\"")