-
Notifications
You must be signed in to change notification settings - Fork 1k
XDL Trace
songyue1104 edited this page Aug 12, 2019
·
1 revision
1.PB格式设计
syntax = "proto3";
package xdl.proto;
message Header {
repeated string key = 1; // 每个key对应一个trace的variable
}
enum Type {
kInt8 = 0;
kInt16 = 1;
kInt32 = 2;
kInt64 = 3;
kFloat = 4;
kDouble = 5;
kBool = 6;
kByte = 7;
}
message Column {
Type dtype = 1;
repeated int32 shape = 2;
bytes data = 3;
}
message Record {
uint64 gstep = 1; // global step
uint64 lstep = 2; // local step
repeated Column column = 3; // 与header对应的每个tensor
}
2.文件格式设计
文件类型分为data和meta两种,data存储trace的tensor数据内容,meta存储文件的一些信息。
data数据格式:
size(4字节小端)|header(size长度)|size|record|size|record ...
meta数据格式:
meta(二进制pb)
- 每个worker一个文件,使用rank号标识
- 文件可以设置max_size,超出则分割成多个文件,每个文件都是同样格式
- 测试阶段gstep填当前checkpoint的global step
- xdl.trace_tensor(name, value, scope=None, backend='xdl', summary=None)
trace一个tensor,backend可以使用xdl,tf,mxnet,value分别对应XDL的tensor、tensorflow的Tensor后者Variable、mxnet的Symbol - xdl.trace_tf_tensor(name, value, scope=None, summary=None)
等价于xdl.trace_tensor(name, value, scope, backend='tf') - xdl.trace_mxnet_tensor(name, value, scope=None, summary=None)
等价于xdl.trace_tensor(name, value, scope, backend='mxnet') - xdl.trace_variable(name, var, scope=None, summary=None)
trace一个xdl的variable,等价于trace_tensor(name, var.value, scope, backend='xdl') - xdl.trace_gradient(name, key=None, scope=None, summary=None)
trace一个梯度变量,name是对应的variable,例如"fc1_weight",如果未指定key,则使用gradient/name作为输出的标识符 - xdl.trace_collection(collection, scope=None)
trace一个集合中的所有variable - xdl.trace_callback(name, callback, scope=None)
trace一个自定义函数,该函数必须返回np.array - xdl.trace_once(name, value, scope=None)
只trace一次某个value,只在第一次run时记录结果,后面所有值使用默认的np.array([],dtype=np.float32)
- 用户可以对tensor、variable进行进一步的处理,然后写出到文件中。通过用户自定的处理函数实现灵活的summary方式。summary函数原型如下:
### 参数:
### input_data: numpy.array类型,通过session.run()返回的结果
### return: numpy.array
def my_summary(input_data):
# codes deal with input_data
return output_data
- 用户使用summary的方式:
def avg_data(input_data):
return np.average(input_data, axis=0)
out = tf.matmul(w, x) + b
xdl.trace_tensor(out.name, out, backend='tf', summary=avg_data)
- 考虑到性能、系统架构等问题,目前不对数据进行时间纬度、空间纬度上的聚合。支持数据写出到HDFS、swift等,方便用户利用其它数据处理组件进一步处理。
使用上述接口trace一些结果
@xdl.mxnet_wrapper(is_training=True)
def model(images, labels):
y = fc(images)
label = mx.sym.one_hot(labels, 10)
prop = mx.sym.SoftmaxOutput(data=y, label=label, grad_scale=(1.0/100))
loss = -mx.sym.sum(mx.sym.log(prop) * label) / 100
xdl.trace_mxnet_tensor('prop', prop)
xdl.trace_mxnet_tensor('mx_loss', loss)
return prop, mx.sym.BlockGrad(loss)
在session增加TraceHook
config = {
'output_dir': 'hdfs://hdfs_path',
'max_file_m_size': 300,
'output_file_name': 'trace'
}
sess = xdl.TrainSession([xdl.TraceHook(config)])
sess.run(...)
- output_dir目前支持hdfs、local两种存储方式
- hdfs:
hdfs://
- 本地文件:
file://
或者不加前缀
- hdfs:
import numpy as np
import sys
from xdl.python.proto import trace_pb2
import struct
filepath = 'trace/train.trace.0.1'
def trace2nptype(tp):
if tp == trace_pb2.Int8:
return np.int8
elif tp == trace_pb2.Int16:
return np.int16
elif tp == trace_pb2.Int32:
return np.int32
elif tp == trace_pb2.Int64:
return np.int64
elif tp == trace_pb2.Float:
return np.float32
elif tp == trace_pb2.Double:
return np.float64
elif tp == trace_pb2.Bool:
return np.bool
elif tp == trace_pb2.Byte:
return np.byte
else:
raise RuntimeError('unknown trace data type: {}'.format(tp))
def print_column(key, col):
print key
print np.frombuffer(col.data, dtype=trace2nptype(col.dtype)).reshape(col.shape)
with open(filepath, 'r') as fin:
sz = struct.unpack('<I', fin.read(4))[0]
buf = fin.read(sz)
hdrs = trace_pb2.Header()
hdrs.ParseFromString(buf)
print '|'.join(hdrs.key)
while True:
buf = fin.read(4)
if not buf:
break
sz = struct.unpack('<I', buf)[0]
buf = fin.read(sz)
record = trace_pb2.Record()
record.ParseFromString(buf)
print 'gstep:', record.gstep
print 'lstep:', record.lstep
for key, col in dict(zip(hdrs.key, record.column)).iteritems():
print_column(key, col)
import numpy as np
import sys
from xdl.python.proto import trace_pb2
metapath = 'train.trace.0.5.meta'
with open(metapath, 'r') as fin:
buf = fin.read()
meta = trace_pb2.Meta()
meta.ParseFromString(buf)
print 'lstep_begin:', meta.lstep_begin
print 'lstep_end:', meta.lstep_end
print 'gstep_begin:', meta.gstep_begin
print 'gstep_end:', meta.gstep_end
print 'ts_begin:', meta.timestamp_begin
print 'ts_end:', meta.timestamp_end
trace过程主要包括tensor的序列化和文件IO两个操作,其中IO在单独线程中完成,序列化操作与session.run在同一python线程。基于7层全连接模型测试结果来看,trace对qps的影响较小。
类型 | 数据量 | QPS |
---|---|---|
无trace | 0 | 6.52 batch/s |
有trace | 只trace fc1_weight + fc1_bias | 6.38 batch/s |
有trace | 所有TRAINABLE_VARIABLES | 6.37 batch/s |