-
Notifications
You must be signed in to change notification settings - Fork 39
Add vgg16 aws dist test #27
Changes from 10 commits
b9c56e1
b3f7661
e4a20c5
b510dd0
f5de753
9145e56
b6478aa
64d90a6
1b5a58d
b2c9714
64d0cf1
a478b55
c0b7261
3e10b30
659ecd8
161321d
4ed9b44
1a9ed9e
5c1ff88
b1305af
1bd0d96
23adafe
981f225
b86895e
fe0a80e
f35aefb
f207856
ee4abc2
19d8124
5938e7e
a94a042
6e4072f
c6941e3
fd5ba68
c1dc3c4
24792a1
e72f46c
50f18e0
6c52807
6e8eef4
81253b9
913eb61
4e6525c
6803d39
772013c
657b1f5
35277e1
1646670
78c58a5
f601567
fc313ee
4e700ae
a1acf8a
01507d2
4d0db6c
c31d604
6b8c122
0e2ba06
2d97d55
2f701dc
28563ce
d94b7c1
b2a7afe
ada36a8
b93e4c8
72785e5
34ca85e
6895384
814d93f
db4b971
b792339
6c4fc0a
15be627
7dd4b14
38b066c
a918d78
813409a
0b07930
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
import argparse | ||
import logging | ||
import sys, os | ||
import numpy as np | ||
import threading | ||
import copy | ||
from aws_runner.client.train_command import TrainCommand | ||
|
||
# for ce env ONLY | ||
|
||
sys.path.append(os.environ['ceroot']) | ||
from kpi import LessWorseKpi | ||
|
||
from aws_runner.client.abclient import Abclient | ||
|
||
def str2bool(v):
    """Parse a human-friendly boolean CLI flag into a real bool.

    Raises argparse.ArgumentTypeError for unrecognized spellings so
    argparse reports a clean usage error.
    """
    truthy = ('yes', 'true', 't', 'y', '1')
    falsy = ('no', 'false', 'f', 'n', '0')
    lowered = v.lower()
    if lowered in truthy:
        return True
    if lowered in falsy:
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
|
||
def print_arguments(arguments=None):
    """Pretty-print parsed CLI arguments, one ``name: value`` per line.

    Args:
        arguments: an ``argparse.Namespace`` to print; defaults to the
            module-level ``args`` parsed at import time, preserving the
            original zero-argument call sites.
    """
    if arguments is None:
        arguments = args
    print('----------- Configuration Arguments -----------')
    # ``items()`` instead of the Python-2-only ``iteritems()`` so this
    # works on both Python 2 and Python 3.
    for arg, value in sorted(vars(arguments).items()):
        print('%s: %s' % (arg, value))
|
||
# ---- command-line interface -------------------------------------------------
# All knobs for launching the AWS distributed-vs-local benchmark run.
parser = argparse.ArgumentParser(description=__doc__)

# AWS account / network identity (key pair and security group are required).
parser.add_argument(
    '--key_name', type=str, default="", help="required, key pair name")
parser.add_argument(
    '--security_group_id',
    type=str,
    default="",
    help="required, the security group id associated with your VPC")

parser.add_argument(
    '--vpc_id',
    type=str,
    default="",
    help="The VPC in which you wish to run test")
parser.add_argument(
    '--subnet_id',
    type=str,
    default="",
    help="The Subnet_id in which you wish to run test")

# EC2 instance types for parameter servers and (GPU) trainers.
parser.add_argument(
    '--pserver_instance_type',
    type=str,
    default="c5.2xlarge",
    help="your pserver instance type, c5.2xlarge by default")
parser.add_argument(
    '--trainer_instance_type',
    type=str,
    default="p2.8xlarge",
    help="your trainer instance type, p2.8xlarge by default")

parser.add_argument(
    '--task_name',
    type=str,
    default="",
    help="the name you want to identify your job")
# NOTE(review): the pserver help text says the alternate AMI is for
# us-east-2 while the trainer help below says us-west-2 for the same AMI id
# -- one of the two regions is presumably a typo; confirm before relying on it.
parser.add_argument(
    '--pserver_image_id',
    type=str,
    default="ami-da2c1cbf",
    help="ami id for system image, default one has nvidia-docker ready, \
    use ami-1ae93962 for us-east-2")

# Comma-separated command spec, parsed by aws_runner's TrainCommand.
parser.add_argument(
    '--pserver_command',
    type=str,
    default="",
    help="pserver start command, format example: python,vgg.py,batch_size:128,is_local:yes"
)

parser.add_argument(
    '--trainer_image_id',
    type=str,
    default="ami-da2c1cbf",
    help="ami id for system image, default one has nvidia-docker ready, \
    use ami-1ae93962 for us-west-2")

parser.add_argument(
    '--trainer_command',
    type=str,
    default="",
    help="trainer start command, format example: python,vgg.py,batch_size:128,is_local:yes"
)

parser.add_argument(
    '--availability_zone',
    type=str,
    default="us-east-2a",
    help="aws zone id to place ec2 instances")

# Cluster topology; train_without_pserver overrides these for the baseline run.
parser.add_argument(
    '--trainer_count', type=int, default=1, help="Trainer count")

parser.add_argument(
    '--pserver_count', type=int, default=1, help="Pserver count")

parser.add_argument(
    '--action', type=str, default="create", help="create|cleanup|status")

parser.add_argument('--pem_path', type=str, help="private key file")

parser.add_argument(
    '--pserver_port', type=str, default="5436", help="pserver port")

parser.add_argument(
    '--docker_image', type=str, default="busybox", help="training docker image")

# Master/coordinator container settings used by aws_runner.
parser.add_argument(
    '--master_server_port', type=int, default=5436, help="master server port")

parser.add_argument(
    '--master_server_public_ip', type=str, help="master server public ip")

parser.add_argument(
    '--master_docker_image',
    type=str,
    default="putcn/paddle_aws_master:latest",
    help="master docker image id")

parser.add_argument(
    '--no_clean_up',
    type=str2bool,
    default=False,
    help="whether to clean up after training")

parser.add_argument(
    '--online_mode',
    type=str2bool,
    default=False,
    help="is client activly stays online")

# Parse at import time; the whole script is CLI-driven.
args = parser.parse_args()
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')

# NOTE(review): train_speed_kpi does not appear to be used elsewhere in this
# file (save_to_kpi creates its own LessWorseKpi instances) -- possibly dead.
train_speed_kpi = LessWorseKpi('train_speed', 0.01)
# name -> LessWorseKpi registry used by save_to_kpi below.
kpis_to_track = {}
|
||
def save_to_kpi(name, val):
    """Record ``val`` under KPI ``name`` and persist it for CE evaluation.

    Args:
        name: KPI identifier, e.g. "speedup_rate".
        val: the measured value; anything ``float()`` accepts.
    """
    val = float(val)
    if name in kpis_to_track:
        kpi_to_track = kpis_to_track[name]
    else:
        kpi_to_track = LessWorseKpi(name, 0.01)
        # Register the new KPI so later calls with the same name append to
        # the same record set instead of creating a fresh, empty KPI
        # (the original never stored it, losing all prior records).
        kpis_to_track[name] = kpi_to_track
    kpi_to_track.add_record(np.array(val, dtype='float32'))
    # Persist records to file; without this the CE framework's evaluate
    # step never sees the value (per reviewer note, see kpi.py).
    kpi_to_track.persist()
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 如上,没有persist到文件, 将不会被评价, 参考kpi.py相应类的evaluate函数 |
||
|
||
class DataCollector(object):
    """Accumulates train_speed samples parsed out of training log lines.

    Trainer logs emit lines shaped like
    ``**metrics_data: train_speed=1.23,other=4``; this class recognizes
    such lines, extracts the train_speed values, and averages them.
    """

    def __init__(self):
        # Collected train_speed samples, as floats.
        self.store = []
        # Prefix marking a log line that carries metric key=value pairs.
        self.metric_data_identifier = "**metrics_data: "

    def log_processor(self, msg):
        """Inspect one log line; record its train_speed value if present."""
        if not msg.startswith(self.metric_data_identifier):
            return
        payload = msg.replace(self.metric_data_identifier, "")
        for entry in payload.split(","):
            pieces = entry.split("=")
            if pieces[0].strip() == "train_speed":
                self.save(pieces[1])

    def save(self, val):
        """Append one sample; accepts anything ``float()`` understands."""
        self.store.append(float(val))

    def avg(self):
        """Return the average of all recorded samples."""
        return np.average(self.store)
|
||
solo_data_collector = DataCollector()


def train_without_pserver(args, lock):
    """Launch the single-trainer local baseline run on AWS.

    Forces a 0-pserver / 1-trainer topology and rewrites the trainer
    command with ``local:yes`` before handing off to Abclient.
    """

    def log_handler(source, id):
        # Stream every log line, tag it, and feed it to the metrics
        # collector so train_speed samples get captured.
        for line in iter(source.readline, ""):
            logging.info("without pserver:")
            logging.info(line)
            solo_data_collector.log_processor(line)

    args.pserver_count = 0
    args.trainer_count = 1
    cmd = TrainCommand(args.trainer_command)
    cmd.update({"local": "yes"})
    args.trainer_command = cmd.unparse()
    logging.info(args)
    client = Abclient(args, log_handler, lock)
    client.create()
|
||
cluster_data_collector = DataCollector()


def train_with_pserver(args, lock):
    """Launch the distributed (pserver) training run on AWS.

    Uses the pserver/trainer counts from ``args`` as-is; the lock is
    shared with the local run (its exact use lives inside Abclient).
    """

    def log_handler(source, id):
        # Stream every log line, tag it, and feed it to the metrics
        # collector so train_speed samples get captured.
        for line in iter(source.readline, ""):
            logging.info("with pserver:")
            logging.info(line)
            cluster_data_collector.log_processor(line)

    logging.info(args)
    client = Abclient(args, log_handler, lock)
    client.create()
|
||
if __name__ == "__main__":
    print_arguments()
    if args.action == "create":
        # One lock shared by both runs (consumed inside Abclient).
        lock = threading.Lock()
        # Run the local baseline and the pserver cluster in parallel; each
        # worker gets its own shallow copy of args because the workers
        # mutate fields such as trainer_count.
        runs = [
            threading.Thread(
                target=train_without_pserver, args=(copy.copy(args), lock)),
            threading.Thread(
                target=train_with_pserver, args=(copy.copy(args), lock)),
        ]
        for run in runs:
            run.start()
        for run in runs:
            run.join()

        # Speedup = distributed train speed / local baseline train speed.
        speedup_rate = cluster_data_collector.avg() / solo_data_collector.avg()
        logging.info("speed up rate is " + str(speedup_rate))

        # .item() converts the numpy scalar to a plain Python float.
        save_to_kpi("speedup_rate", speedup_rate.item())
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
import os | ||
import sys | ||
sys.path.append(os.environ['ceroot']) | ||
from kpi import LessWorseKpi | ||
|
||
# KPI tracked for this test: distributed-vs-local training speedup.
# LessWorseKpi means a lower value is worse; 0.01 is presumably the
# tolerated relative change before the KPI fails -- confirm against kpi.py.
speedup_rate_kpi = LessWorseKpi('speedup_rate', 0.01)

# NOTE(review): the CE framework presumably reads this list to decide which
# KPIs to evaluate for this model -- verify against the CE runner.
tracking_kpis = [
    speedup_rate_kpi,
]
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
[0.5] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
#!/bin/bash
# Builds the Paddle production image and the vgg16 distributed-test image,
# pushes both to Docker Hub, then launches the AWS benchmark via ce_runner.py.
#
# Required environment variables:
#   DOCKER_HUB_USERNAME / DOCKER_HUB_PASSWORD - Docker Hub credentials.
#
# Abort on the first failing command so a broken build is never pushed.
set -e

PADDLE_PATH=../../../
paddle_build_path=$PADDLE_PATH/build
paddle_docker_hub_tag="paddlepaddlece/paddle:latest"
vgg16_test_dockerhub_tag="paddlepaddlece/vgg16_dist:latest"
training_command="local:no,batch_size:128,num_passes:1"

# clean up docker leftovers from previous runs
docker system prune -f

# log into docker hub
docker login -u "$DOCKER_HUB_USERNAME" -p "$DOCKER_HUB_PASSWORD"

# create paddle docker image
echo "going to build and push paddle production image"
docker build -t "$paddle_docker_hub_tag" "$paddle_build_path"
docker push "$paddle_docker_hub_tag"

# build test docker image
echo "going to prepare and build vgg16_dist_test"
if [ ! -d vgg16_dist_test ]; then
    echo "No vgg16_dist_test repo found, going to clone one"
    git clone https://github.com/putcn/vgg16_dist_test.git
fi
cd vgg16_dist_test
# Reuse the host's cifar dataset cache (if any) so the image build does
# not re-download the dataset.
if [ -d ~/.cache/paddle/dataset/cifar ]; then
    echo "host cifar cache found, copying it to docker root"
    mkdir -p .cache/paddle/dataset/
    cp -r -f ~/.cache/paddle/dataset/cifar .cache/paddle/dataset/
fi
git pull
cd ..
echo "going to build vgg16_dist_test docker image and push it"
docker build -t "$vgg16_test_dockerhub_tag" ./vgg16_dist_test
docker push "$vgg16_test_dockerhub_tag"
docker logout

# fetch runner and install dependencies
echo "going to work with aws_runner"
if [ ! -d aws_runner ]; then
    echo "no aws_runner found, cloning one"
    git clone https://github.com/putcn/aws_runner.git
fi
cd aws_runner
git pull
cd ..
echo "going to install aws_runner dependencies"
pip install -r aws_runner/client/requirements.txt

echo "going to start testing"
# start aws testing
python ce_runner.py \
    --key_name aws_benchmark_us_east \
    --security_group_id sg-95539dff \
    --online_mode yes \
    --trainer_count 2 \
    --pserver_count 2 \
    --pserver_command "$training_command" \
    --trainer_command "$training_command" \
    --docker_image "$vgg16_test_dockerhub_tag"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Reviewer] What is this dict reserved for? Our CE framework keeps KPI-related definitions in continuous_evaluation.py under the model directory; the model code itself only needs two changes: adding a KPI record and persisting it to a file. See: https://github.com/PaddlePaddle/paddle-ce-latest-kpis/wiki/CE-%E6%A8%A1%E5%9E%8B
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Author] This dict was originally meant to add KPI entries dynamically based on the test output, but since tracked KPIs have to be hard-coded in continuous_evaluation.py anyway, I will trim this dict out.