parallel executor: WIP
helinwang committed Feb 27, 2018
1 parent 0dbaad5 commit 0770935
Showing 2 changed files with 248 additions and 0 deletions.
68 changes: 68 additions & 0 deletions python/paddle/fluid/parallel_executor.py
@@ -0,0 +1,68 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import executor
from . import core
from threading import Thread
from Queue import Queue


def run_exe(q, idx, exe, program, feed, fetch_list, feed_var_name,
            fetch_var_name, cur_scope, return_numpy):
    # Run a single executor and push (device index, result) onto the queue so
    # the caller can restore per-device ordering.
    q.put((idx, exe.run(program, feed, fetch_list, feed_var_name,
                        fetch_var_name, cur_scope, return_numpy)))


class ParallelExecutor(object):
    def __init__(self, places):
        # One executor.Executor per place; per-device scopes are created
        # lazily in run().
        self.scopes = {}
        self.executors = []
        for place in places:
            self.executors.append(executor.Executor(place))

def run(self,
program=None,
feed=None,
fetch_list=None,
feed_var_name='feed',
fetch_var_name='fetch',
scope=None,
return_numpy=True):
# TODO(helin): split input
q = Queue(maxsize=len(self.executors))
for idx, exe in enumerate(self.executors):
            if scope is not None:
                cur_scope = scope
            elif idx == 0:
                # The first executor runs in the global scope.
                cur_scope = executor.global_scope()
            elif idx in self.scopes:
                cur_scope = self.scopes[idx]
            else:
                # Lazily create and cache a private scope for this executor.
                cur_scope = core.Scope()
                self.scopes[idx] = cur_scope

            # Run each executor in its own thread; results come back through
            # the queue as (device index, fetch result) pairs.
            t = Thread(
                target=run_exe,
                args=(q, idx, exe, program, feed, fetch_list, feed_var_name,
                      fetch_var_name, cur_scope, return_numpy))
            t.daemon = True
            t.start()

        # Collect one result per executor, then restore device order.
        results = []
        for _ in self.executors:
            results.append(q.get())

        results.sort(key=lambda x: x[0])
        # TODO(helin): concat output
        # Until outputs are concatenated, only the first device's result is
        # returned.
        return results[0][1]
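
Note that run() currently hands the same feed to every executor and returns only the first executor's result; the two TODOs mark where per-device input splitting and output concatenation would go. Below is a minimal sketch of what those steps could look like, assuming each feed value is a numpy array batched along dimension 0 (split_feed and concat_fetched are illustrative helper names, not part of this commit):

import numpy as np


def split_feed(feed, num_parts):
    # Split every batched array in the feed dict along dimension 0,
    # producing one smaller feed dict per executor.
    parts = [dict() for _ in range(num_parts)]
    for name, value in feed.items():
        for part, chunk in zip(parts, np.array_split(value, num_parts)):
            part[name] = chunk
    return parts


def concat_fetched(results):
    # results is the sorted list of (idx, fetched_values) pairs taken from
    # the queue; concatenate each fetched variable across devices.
    per_device = [fetched for _, fetched in results]
    return [np.concatenate(vals, axis=0) for vals in zip(*per_device)]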
180 changes: 180 additions & 0 deletions python/paddle/fluid/tests/book/parallel_executor_example.py
@@ -0,0 +1,180 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import paddle.v2 as paddle
import paddle.v2.fluid as fluid
import math
import sys
import numpy


def resnet_cifar10(input, depth=32):
def conv_bn_layer(input, ch_out, filter_size, stride, padding, act='relu'):
tmp = fluid.layers.conv2d(
input=input,
filter_size=filter_size,
num_filters=ch_out,
stride=stride,
padding=padding,
act=None,
bias_attr=False)
return fluid.layers.batch_norm(input=tmp, act=act)

def shortcut(input, ch_in, ch_out, stride):
if ch_in != ch_out:
return conv_bn_layer(input, ch_out, 1, stride, 0, None)
else:
return input

def basicblock(input, ch_in, ch_out, stride):
tmp = conv_bn_layer(input, ch_out, 3, stride, 1)
tmp = conv_bn_layer(tmp, ch_out, 3, 1, 1, act=None)
short = shortcut(input, ch_in, ch_out, stride)
return fluid.layers.elementwise_add(x=tmp, y=short, act='relu')

def layer_warp(block_func, input, ch_in, ch_out, count, stride):
tmp = block_func(input, ch_in, ch_out, stride)
for i in range(1, count):
tmp = block_func(tmp, ch_out, ch_out, 1)
return tmp

assert (depth - 2) % 6 == 0
n = (depth - 2) / 6
conv1 = conv_bn_layer(
input=input, ch_out=16, filter_size=3, stride=1, padding=1)
res1 = layer_warp(basicblock, conv1, 16, 16, n, 1)
res2 = layer_warp(basicblock, res1, 16, 32, n, 2)
res3 = layer_warp(basicblock, res2, 32, 64, n, 2)
pool = fluid.layers.pool2d(
input=res3, pool_size=8, pool_type='avg', pool_stride=1)
return pool


def vgg16_bn_drop(input):
def conv_block(input, num_filter, groups, dropouts):
return fluid.nets.img_conv_group(
input=input,
pool_size=2,
pool_stride=2,
conv_num_filter=[num_filter] * groups,
conv_filter_size=3,
conv_act='relu',
conv_with_batchnorm=True,
conv_batchnorm_drop_rate=dropouts,
pool_type='max')

conv1 = conv_block(input, 64, 2, [0.3, 0])
conv2 = conv_block(conv1, 128, 2, [0.4, 0])
conv3 = conv_block(conv2, 256, 3, [0.4, 0.4, 0])
conv4 = conv_block(conv3, 512, 3, [0.4, 0.4, 0])
conv5 = conv_block(conv4, 512, 3, [0.4, 0.4, 0])

drop = fluid.layers.dropout(x=conv5, dropout_prob=0.5)
fc1 = fluid.layers.fc(input=drop, size=512, act=None)
bn = fluid.layers.batch_norm(input=fc1, act='relu')
drop2 = fluid.layers.dropout(x=bn, dropout_prob=0.5)
fc2 = fluid.layers.fc(input=drop2, size=512, act=None)
return fc2


def train(net_type, use_cuda, save_dirname):
classdim = 10
data_shape = [3, 32, 32]

    # Peek at one CIFAR-10 sample (leftover debugging code; img and l are not
    # used below).
    r = paddle.dataset.cifar.train10()
    for data in r():
        img, l = data[0], data[1]
        break

#images = fluid.layers.fill_constant(shape=data_shape, dtype='float32', value=3)
images = fluid.layers.data(name='pixel', shape=data_shape, dtype='float32')
#label = fluid.layers.fill_constant(shape=[1], dtype='int64', value=2)
label = fluid.layers.data(name='label', shape=[1], dtype='int64')

if net_type == "vgg":
print("train vgg net")
net = vgg16_bn_drop(images)
elif net_type == "resnet":
print("train resnet")
net = resnet_cifar10(images, 32)
else:
raise ValueError("%s network is not supported" % net_type)

predict = fluid.layers.fc(input=net, size=classdim, act='softmax')
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
acc = fluid.layers.accuracy(input=predict, label=label)

# Test program
test_program = fluid.default_main_program().clone()

optimizer = fluid.optimizer.Adam(learning_rate=0.001)
optimizer.minimize(avg_cost)

BATCH_SIZE = 128
PASS_NUM = 1

train_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.cifar.train10(), buf_size=128 * 10),
batch_size=BATCH_SIZE)

test_reader = paddle.batch(
paddle.dataset.cifar.test10(), batch_size=BATCH_SIZE)

#place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
    # ParallelExecutor (added in this commit) drives one executor per listed
    # place; two GPUs are hard-coded here.
    exe = fluid.ParallelExecutor([fluid.CUDAPlace(0), fluid.CUDAPlace(1)])
exe.run(fluid.default_startup_program())
feeder = fluid.DataFeeder(place=fluid.CPUPlace(), feed_list=[images, label])

loss = 0.0
for pass_id in range(PASS_NUM):
for batch_id, data in enumerate(train_reader()):
exe.run(feed=feeder.feed(data))

if (batch_id % 10) == 0:
acc_list = []
avg_loss_list = []
for tid, test_data in enumerate(test_reader()):
loss_t, acc_t = exe.run(program=test_program,
feed=feeder.feed(test_data),
fetch_list=[avg_cost, acc])
if math.isnan(float(loss_t)):
sys.exit("got NaN loss, training failed.")
acc_list.append(float(acc_t))
avg_loss_list.append(float(loss_t))

acc_value = numpy.array(acc_list).mean()
avg_loss_value = numpy.array(avg_loss_list).mean()

print(
'PassID {0:1}, BatchID {1:04}, Test Loss {2:2.2}, Acc {3:2.2}'.
format(pass_id, batch_id + 1,
float(avg_loss_value), float(acc_value)))


def main(net_type, use_cuda):
if use_cuda and not fluid.core.is_compiled_with_cuda():
return

    # Directory for saving the trained model (not yet used in this WIP)
save_dirname = "image_classification_" + net_type + ".inference.model"

train(net_type, use_cuda, save_dirname)


if __name__ == '__main__':
main('vgg', use_cuda=True)
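
For comparison with the commented-out place line in train(), a minimal single-device version of the same training step using the standard fluid.Executor might look like the sketch below; it reuses images, label, train_reader, avg_cost, and acc from train() and is not part of this commit:

place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
feeder = fluid.DataFeeder(place=place, feed_list=[images, label])

for data in train_reader():
    # Same feed path as the parallel example, but everything runs on one device.
    exe.run(fluid.default_main_program(),
            feed=feeder.feed(data),
            fetch_list=[avg_cost, acc])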
