diff --git a/vgg16_aws_dist/__init__.py b/vgg16_aws_dist/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/vgg16_aws_dist/ce_runner.py b/vgg16_aws_dist/ce_runner.py new file mode 100644 index 00000000..cecedcc2 --- /dev/null +++ b/vgg16_aws_dist/ce_runner.py @@ -0,0 +1,414 @@ +import argparse +import logging +import sys, os +import numpy as np +import threading +import copy +import csv +from aws_runner.client.train_command import TrainCommand + +# for ce env ONLY + +sys.path.append(os.environ['ceroot']) +from continuous_evaluation import cluster_specs, kpis_map, generate_kpi_id, generate_cluster_id + +from aws_runner.client.abclient import Abclient + + +def str2bool(v): + if v.lower() in ('yes', 'true', 't', 'y', '1'): + return True + elif v.lower() in ('no', 'false', 'f', 'n', '0'): + return False + else: + raise argparse.ArgumentTypeError('Boolean value expected.') + + +def print_arguments(): + print('----------- Configuration Arguments -----------') + for arg, value in sorted(vars(args).iteritems()): + print('%s: %s' % (arg, value)) + + +parser = argparse.ArgumentParser(description=__doc__) + +parser.add_argument( + '--key_name', type=str, default="", help="required, key pair name") +parser.add_argument( + '--security_group_id', + type=str, + default="", + help="required, the security group id associated with your VPC") + +parser.add_argument( + '--vpc_id', + type=str, + default="", + help="The VPC in which you wish to run test") +parser.add_argument( + '--subnet_id', + type=str, + default="", + help="The Subnet_id in which you wish to run test") + +parser.add_argument( + '--pserver_instance_type', + type=str, + default="c5.2xlarge", + help="your pserver instance type, c5.2xlarge by default") +parser.add_argument( + '--trainer_instance_type', + type=str, + default="p2.8xlarge", + help="your trainer instance type, p2.8xlarge by default") + +parser.add_argument( + '--task_name', + type=str, + default="", + help="the name you want to identify your job") + +parser.add_argument( + '--pserver_image_id', + type=str, + default="ami-da2c1cbf", + help="ami id for system image, default one has nvidia-docker ready, \ + use ami-1ae93962 for us-east-2") + +parser.add_argument( + '--pserver_command', + type=str, + default="", + help="pserver start command, format example: python,vgg.py,batch_size:128,is_local:yes" +) + +parser.add_argument( + '--trainer_image_id', + type=str, + default="ami-da2c1cbf", + help="ami id for system image, default one has nvidia-docker ready, \ + use ami-1ae93962 for us-west-2") + +parser.add_argument( + '--trainer_command', + type=str, + default="", + help="trainer start command, format example: python,vgg.py,batch_size:128,is_local:yes" +) + +parser.add_argument( + '--availability_zone', + type=str, + default="us-east-2a", + help="aws zone id to place ec2 instances") + +parser.add_argument( + '--action', type=str, default="create", help="create|cleanup|status") + +parser.add_argument('--pem_path', type=str, help="private key file") + +parser.add_argument( + '--pserver_port', type=str, default="5436", help="pserver port") + +parser.add_argument( + '--docker_image', + type=str, + default="busybox", + help="training docker image") + +parser.add_argument( + '--master_server_port', type=int, default=5436, help="master server port") + +parser.add_argument( + '--master_server_public_ip', type=str, help="master server public ip") + +parser.add_argument( + '--master_docker_image', + type=str, + default="putcn/paddle_aws_master:latest", + help="master docker 
image id") + +parser.add_argument( + '--no_clean_up', + type=str2bool, + default=False, + help="whether to clean up after training") + +parser.add_argument( + '--online_mode', + type=str2bool, + default=False, + help="is client activly stays online") + +args = parser.parse_args() +logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s') + + +class DataCollector(object): + ''' collect metrics data from training output + + This class is used to collect training metrics data and add/persist records + to KPI instances. + + ''' + _instance_store = {} + + @classmethod + def get_instance_by_spec(cls, cluster_spec): + ''' singleton method to get/create instnace + Args: + cluster_spec: a array based cluster spec. see contibuous_evaluation + for more detail + ''' + cluster_id = generate_cluster_id(cluster_spec) + if cluster_id not in cls._instance_store: + cls._instance_store[cluster_id] = cls(cluster_spec) + return cls._instance_store[cluster_id] + + @classmethod + def persist_all(cls): + ''' to persist all data to KPI ''' + for _, collector in cls._instance_store.iteritems(): + collector.persist() + + @classmethod + def generate_csv(cls): + ''' to generate csv from data managed by this class ''' + with open("report.csv", "w") as csvfile: + fieldnames = [] + rows = [] + for cluster_id, collector in cls._instance_store.iteritems(): + row = {"cluster_spec": cluster_id} + for metric_name, _ in collector.store.iteritems(): + if metric_name not in fieldnames: + fieldnames.append(metric_name) + row[metric_name] = collector.avg(metric_name) + rows.append(row) + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + for row in rows: + writer.writerow(row) + + def __init__(self, cluster_spec): + self.store = {} + self.metric_data_identifier = "**metrics_data: " + self.cluster_spec = cluster_spec + self.cluster_id = generate_cluster_id(cluster_spec) + + def log_processor(self, source, log_type): + ''' to catch metric data from training output + + Args: + source: is a python file like log out put. 
+ log_type: indicats it's a stdout or stderr + ''' + for msg in iter(source.readline, ""): + logging.info(self.cluster_id) + logging.info(msg) + if (msg.startswith(self.metric_data_identifier)): + logging.info("metric data found, parse and save it") + str_msg = msg.replace(self.metric_data_identifier, "") + metrics_raw = str_msg.split(",") + for metric in metrics_raw: + metric_data = metric.split("=") + self.save(metric_data[0], metric_data[1]) + + def save(self, key, val): + ''' save metric data to its data store''' + key = key.strip() + if isinstance(val, str): + val = val.strip() + if (key not in self.store): + self.store[key] = [] + logging.info("going to save " + key + "=" + str(val) + "from " + + self.cluster_id) + self.store[key].append(float(val)) + + def get(self, key): + ''' return store by key if there is such store ''' + if (key in self.store): + return self.store[key] + return None + + def avg(self, key): + vals = self.get(key) + if vals is None: + return None + return sum(vals) / float(len(vals)) + + def persist(self): + ''' find kpi instance by id, add record to it, then persist the data ''' + for metric_name, _ in self.store.iteritems(): + kpi_id = generate_kpi_id(metric_name, self.cluster_spec) + logging.info("going to persist kpi " + kpi_id) + if kpi_id in kpis_map: + kpi_instance = kpis_map[kpi_id] + kpi_instance.add_record( + np.array( + self.avg(metric_name), dtype='float32')) + kpi_instance.persist() + logging.info("done persisting kpi " + kpi_id) + else: + logging.info("no such kpi id found in map!!!") + logging.info(kpi_id) + + +def train_with_spec(spec, args, lock): + ''' a thread targe to run a thread of test + + Args: + args: test config + lock: thread lock used when creating subnets, since subnets creating may + conflict with each other. 
+ + ''' + logging.info("updating cluster config and starting client") + test_name = spec[0] + batch_size = spec[1] + args.trainer_count = spec[2] + gpus_per_trainer_count = spec[3] + args.pserver_count = spec[4] + trainer_command = TrainCommand(args.trainer_command) + + command_to_update = { + "model": test_name, + "batch_size": str(batch_size), + "gpus": str(gpus_per_trainer_count), + } + + if args.pserver_count == 0 and args.trainer_count == 1: + command_to_update["update_method"] = "local" + ''' not yet supported because aws runner can't provide PADDLE_TRAINER_IPS + if args.pserver_count == 0 and args.trainer_count > 1: + command_to_update["update_method"] = "nccl2" + ''' + + trainer_command.update(command_to_update) + args.trainer_command = trainer_command.unparse() + args.pserver_command = args.trainer_command + + data_collector = DataCollector.get_instance_by_spec(spec) + + logging.info(args) + abclient = Abclient(args, data_collector.log_processor, lock) + abclient.create() + + +class ClusterIterator: + ''' + ClusterIterator relies on spec structure as follows + batch_size, trainer_count, gpus_per_trainer_count, pserver_count + cluster_specs = [ + [64, 1, 1, 0], + [64, 8, 1, 8], + [64, 16, 1, 8], + [64, 33, 1, 8], + ] + it will sequentially distribute specs into chunks and make sure each chunk + does not exceeds trainer and pserver count limit + above specs will be distributed into 2 chunks + [[64, 1, 1, 0], [64, 8, 1, 8]] + and + [[64, 16, 1, 8]] + + [64, 33, 1, 8] itself does not fit in a single chunk, thus gets discard + + ''' + + def __init__(self, + specs, + trainer_count_threshold=32, + pserver_count_threshold=10): + self.specs = specs + self.trainer_count_threshold = trainer_count_threshold + self.pserver_count_threshold = pserver_count_threshold + self.bad_specs = [] + + def __iter__(self): + return self + + def spec_can_not_fit(self, trainer_count, pserver_count): + return (trainer_count > self.trainer_count_threshold or + pserver_count > self.pserver_count_threshold) + + def next(self): + specs_to_ret = [] + trainer_count = 0 + pserver_count = 0 + if len(self.specs) == 0: + raise StopIteration() + else: + while len(self.specs) != 0: + next_spec = self.specs[0] + # when single spec can't even fit, move it to bad spec list + if self.spec_can_not_fit(next_spec[2], next_spec[4]): + self.bad_specs.append(self.specs.pop(0)) + continue + trainer_count += next_spec[2] + pserver_count += next_spec[4] + if self.spec_can_not_fit(trainer_count, pserver_count): + break + specs_to_ret.append(self.specs.pop(0)) + if len(specs_to_ret) == 0: + if len(self.bad_specs) != 0: + logging.info("%d specs not be able to fit in any test chunk" % + len(self.bad_specs)) + raise StopIteration() + return specs_to_ret + + +if __name__ == "__main__": + print_arguments() + if args.action == "create": + lock = threading.Lock() + cluster_specs_origin = copy.copy(cluster_specs) + ''' start testing in chunks ''' + for specs in ClusterIterator(cluster_specs): + logging.info("starting a new chunk of test") + testing_threads = [] + for cluster_spec in specs: + logging.info("creating cluster thread with spec") + logging.info(cluster_spec) + thread = threading.Thread( + target=train_with_spec, + args=( + cluster_spec, + copy.copy(args), + lock, )) + testing_threads.append(thread) + + for testing_thread in testing_threads: + testing_thread.start() + + for testing_thread in testing_threads: + testing_thread.join() + logging.info("testing chunk ended") + + logging.info("all testing ended") + ''' all testing done, 
start to generate speedup rate ''' + + # spec[0] is the baseline + def get_speed_and_collector_by_spec(spec): + data_collector = DataCollector.get_instance_by_spec(spec) + return data_collector.avg("train_speed"), data_collector + + logging.info("generating speedup") + + # base_speed supposed to be one trainer, one gpu, local mode + base_speed, _ = get_speed_and_collector_by_spec(cluster_specs_origin[ + 0]) + if base_speed is not None: + logging.info("base speed is %f" % base_speed) + if base_speed is not None: + for cluster_spec in cluster_specs_origin: + speed, data_collector = get_speed_and_collector_by_spec( + cluster_spec) + if speed is not None: + # speed * trainer_count / base_speed + data_collector.save("speedup", speed * + cluster_spec[2] / base_speed) + else: + logging.info("base speed is not available") + + DataCollector.persist_all() + # DataCollector.generate_csv() diff --git a/vgg16_aws_dist/continuous_evaluation.py b/vgg16_aws_dist/continuous_evaluation.py new file mode 100644 index 00000000..bfec1f35 --- /dev/null +++ b/vgg16_aws_dist/continuous_evaluation.py @@ -0,0 +1,41 @@ +import os +import sys +sys.path.append(os.environ['ceroot']) +from kpi import LessWorseKpi, GreaterWorseKpi + +kpis_specs = { + "speedup": [LessWorseKpi, 0.01], + "train_speed": [LessWorseKpi, 0.01], + # "converge_speed":[GreaterWorseKpi, 0.01], + # "gpu_memory":[GreaterWorseKpi, 0.01], + # "acc_4passes":[GreaterWorseKpi, 0.01], +} + +# each row represets a cluster setting with the following columns +# test_name, batch_size, trainer_count, gpus_per_trainer_count, pserver_count + +cluster_specs = [ + ["vgg", 16, 1, 1, 0], + ["vgg", 16, 4, 4, 4], + ["vgg", 16, 7, 8, 7], +] + +kpis_map = {} + +tracking_kpis = [] + + +def generate_cluster_id(cluster_spec): + return "_".join(map(str, cluster_spec)) + + +def generate_kpi_id(kpi_name, cluster_spec): + return kpi_name + "_" + generate_cluster_id(cluster_spec) + + +for kpi_type_name, (Kpi_class, diff_thre) in kpis_specs.items(): + for cluster_spec in cluster_specs: + kpi_id = generate_kpi_id(kpi_type_name, cluster_spec) + the_kpi = Kpi_class(kpi_id, diff_thre) + tracking_kpis.append(the_kpi) + kpis_map[kpi_id] = the_kpi diff --git a/vgg16_aws_dist/fluid_benchmark_for_aws/Dockerfile b/vgg16_aws_dist/fluid_benchmark_for_aws/Dockerfile new file mode 100644 index 00000000..bef80bb6 --- /dev/null +++ b/vgg16_aws_dist/fluid_benchmark_for_aws/Dockerfile @@ -0,0 +1,7 @@ +FROM paddlepaddlece/paddle:latest + +ENV HOME /root +COPY ./ /root/ +WORKDIR /root +RUN apt install -y python-opencv +ENTRYPOINT ["python", "fluid_benchmark.py"] \ No newline at end of file diff --git a/vgg16_aws_dist/fluid_benchmark_for_aws/README.md b/vgg16_aws_dist/fluid_benchmark_for_aws/README.md new file mode 100644 index 00000000..357ce932 --- /dev/null +++ b/vgg16_aws_dist/fluid_benchmark_for_aws/README.md @@ -0,0 +1,73 @@ +# Fluid Benchmark + +Originally from https://github.com/PaddlePaddle/Paddle/tree/develop/benchmark/fluid + +This directory contains several models configurations and tools that used to run +Fluid benchmarks for local and distributed training. + + +## Run the Benchmark + +To start, run the following command to get the full help message: + +```bash +python fluid_benchmark.py --help +``` + +Currently supported `--model` argument include: + +* mnist +* resnet + * you can chose to use different dataset using `--data_set cifar10` or + `--data_set flowers`. 
+* vgg +* stacked_dynamic_lstm +* machine_translation + +* Run the following command to start a benchmark job locally: + ```bash + python fluid_benchmark.py --model mnist --device GPU + ``` + You can choose to use GPU/CPU training. With GPU training, you can specify + `--gpus ` to run multi GPU training. +* Run distributed training with parameter servers: + * start parameter servers: + ```bash + PADDLE_TRAINING_ROLE=PSERVER PADDLE_PSERVER_PORT=7164 PADDLE_PSERVER_IPS=127.0.0.1 PADDLE_TRAINERS=1 PADDLE_CURRENT_IP=127.0.0.1 PADDLE_TRAINER_ID=0 python fluid_benchmark.py --model mnist --device GPU --update_method pserver + ``` + * start trainers: + ```bash + PADDLE_TRAINING_ROLE=TRAINER PADDLE_PSERVER_PORT=7164 PADDLE_PSERVER_IPS=127.0.0.1 PADDLE_TRAINERS=1 PADDLE_CURRENT_IP=127.0.0.1 PADDLE_TRAINER_ID=0 python fluid_benchmark.py --model mnist --device GPU --update_method pserver + ``` +* Run distributed training using NCCL2 + ```bash + PADDLE_PSERVER_PORT=7164 PADDLE_TRAINER_IPS=192.168.0.2,192.168.0.3 PADDLE_CURRENT_IP=127.0.0.1 PADDLE_TRAINER_ID=0 python fluid_benchmark.py --model mnist --device GPU --update_method nccl2 + ``` + +## Run Distributed Benchmark on Kubernetes Cluster + +We provide a script `kube_gen_job.py` to generate Kubernetes yaml files to submit +distributed benchmark jobs to your cluster. To generate a job yaml, just run: + +```bash +python kube_gen_job.py --jobname myjob --pscpu 4 --cpu 8 --gpu 8 --psmemory 20 --memory 40 --pservers 4 --trainers 4 --entry "python fluid_benchmark.py --model mnist --parallel 1 --device GPU --update_method pserver " --disttype pserver +``` + +Then the yaml files are generated under directory `myjob`, you can run: + +```bash +kubectl create -f myjob/ +``` + +The job shall start. + + +## Notes for Run Fluid Distributed with NCCL2 and RDMA + +Before running NCCL2 distributed jobs, please check that whether your node has multiple network +interfaces, try to add the environment variable `export NCCL_SOCKET_IFNAME=eth0` to use your actual +network device. + +To run high-performance distributed training, you must prepare your hardware environment to be +able to run RDMA enabled network communication, please check out [this](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/fluid/howto/cluster/nccl2_rdma_training.md) +note for details. diff --git a/vgg16_aws_dist/fluid_benchmark_for_aws/fluid_benchmark.py b/vgg16_aws_dist/fluid_benchmark_for_aws/fluid_benchmark.py new file mode 100644 index 00000000..72172726 --- /dev/null +++ b/vgg16_aws_dist/fluid_benchmark_for_aws/fluid_benchmark.py @@ -0,0 +1,476 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
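+
+# NOTE: the training loops in this file print lines prefixed with
+# "**metrics_data: " (see output_metric_data below); ce_runner.py's
+# DataCollector scans the job's stdout/stderr for that prefix and feeds the
+# parsed values (train_speed, acc_4passes, converge_speed) into the CE KPIs.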
+ +import argparse +import cProfile +import time +import os + +import numpy as np + +import paddle.fluid as fluid +import paddle.fluid.core as core +import paddle.fluid.profiler as profiler +import paddle.fluid.transpiler.distribute_transpiler as distribute_transpiler + +BENCHMARK_MODELS = [ + "machine_translation", "resnet", "vgg", "mnist", "stacked_dynamic_lstm" +] + + +def parse_args(): + parser = argparse.ArgumentParser('Fluid model benchmarks.') + parser.add_argument( + '--model', + type=str, + choices=BENCHMARK_MODELS, + default='resnet', + help='The model to run benchmark with.') + parser.add_argument( + '--batch_size', type=int, default=32, help='The minibatch size.') + parser.add_argument( + '--learning_rate', + type=float, + default=0.001, + help='The minibatch size.') + # TODO(wuyi): add "--use_fake_data" option back. + parser.add_argument( + '--skip_batch_num', + type=int, + default=5, + help='The first num of minibatch num to skip, for better performance test' + ) + parser.add_argument( + '--iterations', + type=int, + default=80, + help='The number of minibatches.') + parser.add_argument( + '--pass_num', type=int, default=100, help='The number of passes.') + parser.add_argument( + '--data_format', + type=str, + default='NCHW', + choices=['NCHW', 'NHWC'], + help='The data data_format, now only support NCHW.') + parser.add_argument( + '--device', + type=str, + default='GPU', + choices=['CPU', 'GPU'], + help='The device type.') + parser.add_argument( + '--gpus', + type=int, + default=1, + help='If gpus > 1, will use ParallelExecutor to run, else use Executor.' + ) + parser.add_argument( + '--data_set', + type=str, + default='flowers', + choices=['cifar10', 'flowers'], + help='Optional dataset for benchmark.') + parser.add_argument( + '--infer_only', action='store_true', help='If set, run forward only.') + parser.add_argument( + '--use_cprof', action='store_true', help='If set, use cProfile.') + parser.add_argument( + '--use_nvprof', + action='store_true', + help='If set, use nvprof for CUDA.') + parser.add_argument( + '--no_test', + action='store_false', + help='If set, test the testset during training.') + parser.add_argument( + '--memory_optimize', + action='store_true', + help='If set, optimize runtime memory before start.') + parser.add_argument( + '--use_fake_data', + action='store_true', + help='If set ommit the actual read data operators.') + parser.add_argument( + '--profile', action='store_true', help='If set, profile a few steps.') + parser.add_argument( + '--update_method', + type=str, + default='local', + choices=['local', 'pserver', 'nccl2'], + help='Choose parameter update method, can be local, pserver, nccl2.') + + parser.add_argument( + "--acc_target", + default=0.6, + type=float, + help="trianing will be terminated when acc_target reaches") + + args = parser.parse_args() + return args + + +def append_nccl2_prepare(trainer_id): + if trainer_id >= 0: + # append gen_nccl_id at the end of startup program + trainer_id = int(os.getenv("PADDLE_TRAINER_ID")) + port = os.getenv("PADDLE_PSERVER_PORT") + worker_ips = os.getenv("PADDLE_TRAINER_IPS") + worker_endpoints = [] + for ip in worker_ips.split(","): + worker_endpoints.append(':'.join([ip, port])) + num_trainers = len(worker_endpoints) + current_endpoint = os.getenv("PADDLE_CURRENT_IP") + ":" + port + worker_endpoints.remove(current_endpoint) + + nccl_id_var = fluid.default_startup_program().global_block( + ).create_var( + name="NCCLID", + persistable=True, + type=fluid.core.VarDesc.VarType.RAW) + 
fluid.default_startup_program().global_block().append_op( + type="gen_nccl_id", + inputs={}, + outputs={"NCCLID": nccl_id_var}, + attrs={ + "endpoint": current_endpoint, + "endpoint_list": worker_endpoints, + "trainer_id": trainer_id + }) + return nccl_id_var, num_trainers, trainer_id + else: + raise Exception( + "must set positive PADDLE_TRAINER_ID env variables for " + "nccl-based dist train.") + + +def dist_transpile(trainer_id): + if trainer_id < 0: + return None, None + + # the port of all pservers, needed by both trainer and pserver + port = os.getenv("PADDLE_PSERVER_PORT", "6174") + # comma separated ips of all pservers, needed by trainer and + # pserver + pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "") + eplist = [] + for ip in pserver_ips.split(","): + eplist.append(':'.join([ip, port])) + pserver_endpoints = ",".join(eplist) + # total number of workers/trainers in the job, needed by + # trainer and pserver + trainers = int(os.getenv("PADDLE_TRAINERS")) + # the IP of the local machine, needed by pserver only + current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port + # the role, should be either PSERVER or TRAINER + training_role = os.getenv("PADDLE_TRAINING_ROLE") + + t = distribute_transpiler.DistributeTranspiler() + t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers) + if training_role == "PSERVER": + pserver_program = t.get_pserver_program(current_endpoint) + pserver_startup_program = t.get_startup_program(current_endpoint, + pserver_program) + return pserver_program, pserver_startup_program + elif training_role == "TRAINER": + train_program = t.get_trainer_program() + return train_program, fluid.default_startup_program() + else: + raise ValueError( + 'TRAINING_ROLE environment variable must be either TRAINER or PSERVER' + ) + + +def test(exe, inference_program, test_reader, feeder, batch_acc): + accuracy_evaluator = fluid.metrics.Accuracy() + for batch_id, data in enumerate(test_reader()): + acc = exe.run(inference_program, + feed=feeder.feed(data), + fetch_list=[batch_acc]) + accuracy_evaluator.update(value=np.array(acc), weight=len(data)) + + return accuracy_evaluator.eval() + + +# TODO(wuyi): replace train, train_parallel, test functions with new trainer +# API once it is ready. 
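+# Single-process training path: when PADDLE_TRAINING_ROLE is PSERVER it just
+# runs the transpiled pserver program; otherwise it trains on a single CPU/GPU
+# device, tracks per-pass accuracy, acc_4passes and converge_speed, and calls
+# output_metric_data after every pass.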
+def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, + batch_acc, batch_size_tensor, args, train_prog, startup_prog): + if os.getenv("PADDLE_TRAINING_ROLE") == "PSERVER": + place = core.CPUPlace() + exe = fluid.Executor(place) + exe.run(startup_prog) + exe.run(train_prog) + return + + if args.use_fake_data: + raise Exception( + "fake data is not supported in single GPU test for now.") + + place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0) + exe = fluid.Executor(place) + exe.run(startup_prog) + feed_var_list = [ + var for var in train_prog.global_block().vars.itervalues() + if var.is_data + ] + feeder = fluid.DataFeeder(feed_var_list, place) + + acc_4passes = None + converge_speed = None + train_pass_acc = fluid.average.WeightedAverage() + fetch_list = [avg_loss] + if batch_acc is not None: + fetch_list.append(batch_acc) + + iters, num_samples, start_time = 0, 0, time.time() + for pass_id in range(args.pass_num): + train_losses = [] + train_pass_acc.reset() + for batch_id, data in enumerate(train_reader()): + if iters == args.skip_batch_num: + start_time = time.time() + num_samples = 0 + if iters == args.iterations: + break + outs = exe.run(train_prog, + feed=feeder.feed(data), + fetch_list=fetch_list) + iters += 1 + num_samples += len(data) + loss = outs[0] + if batch_acc is not None: + acc = np.mean(outs[1]).item() + train_pass_acc.add(value=acc, weight=len(data)) + else: + acc = None + train_losses.append(loss) + print("Pass: %d, Iter: %d, Loss: %f, acc %s\n" % + (pass_id, iters, np.mean(train_losses), str(acc))) + if converge_speed is None and args.acc_target and acc >= args.acc_target: + converge_speed = time.time() - start_time + print("converge_speed set with %f" % converge_speed) + train_elapsed = time.time() - start_time + examples_per_sec = num_samples / train_elapsed + if batch_acc is not None: + pass_train_acc = train_pass_acc.eval() + else: + pass_train_acc = None + + if pass_id == 4 and batch_acc is not None: + print("acc_4passes set with %f" % pass_train_acc) + acc_4passes = float(pass_train_acc) + + output_metric_data(pass_id, examples_per_sec, pass_train_acc, + acc_4passes, converge_speed) + + # evaluation + if not args.no_test and batch_acc != None: + pass_test_acc = test(exe, infer_prog, test_reader, feeder, + batch_acc) + print(", Test Accuracy: %f" % pass_test_acc) + print("\n") + # TODO(wuyi): add warmup passes to get better perf data. + exit(0) + + +# TODO(wuyi): replace train, train_parallel, test functions with new trainer +# API once it is ready. 
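+# Multi-GPU (and multi-node) training path using fluid.ParallelExecutor; with
+# NCCL2, trainer 0 sleeps 30s so the other trainers have time to start
+# listening (see the FIXME below). Metrics are reported through
+# output_metric_data, same as in train().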
+def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, + batch_acc, batch_size_tensor, args, train_prog, + startup_prog, nccl_id_var, num_trainers, trainer_id): + feed_var_list = [ + var for var in train_prog.global_block().vars.itervalues() + if var.is_data + ] + # generate fake: + if args.use_fake_data: + for var in feed_var_list: + v = startup_prog.global_block().clone_variable(var) + var.persistable = True + v.persistable = True + + real_shape = list(var.shape) + real_shape[0] = args.batch_size / args.gpus + startup_prog.global_block().append_op( + outputs={"Out": v}, + type="fill_constant", + attrs={"shape": real_shape, + "value": 1.0, + "dtype": var.dtype}) + + place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0) + if nccl_id_var and trainer_id == 0: + #FIXME(wuyi): wait other trainer to start listening + time.sleep(30) + + startup_exe = fluid.Executor(place) + startup_exe.run(startup_prog) + strategy = fluid.ExecutionStrategy() + strategy.num_threads = 1 + strategy.allow_op_delay = False + exe = fluid.ParallelExecutor( + True, + avg_loss.name, + exec_strategy=strategy, + num_trainers=num_trainers, + trainer_id=trainer_id) + + feeder = fluid.DataFeeder(feed_var_list, place) + acc_4passes = None + converge_speed = None + accuracy_evaluator = fluid.metrics.Accuracy() + fetch_list = [avg_loss.name] + if batch_acc is not None: + fetch_list.append(batch_acc.name) + start_time = time.time() + + for pass_id in range(args.pass_num): + num_samples = 0 + iters = 0 + pass_start_time = time.time() + accuracy_evaluator.reset() + for batch_id, data in enumerate(train_reader()): + if args.profile and pass_id == 0 and batch_id == 5: + profiler.start_profiler("All") + elif args.profile and pass_id == 0 and batch_id == 10: + profiler.stop_profiler("total", "/tmp/profile_%d" % trainer_id) + + if iters == args.skip_batch_num: + start_time = time.time() + num_samples = 0 + if iters == args.iterations: + break + if args.use_fake_data: + outs = exe.run(fetch_list) + else: + outs = exe.run(fetch_list, feed=feeder.feed(data)) + + if args.update_method == "pserver": + exe.bcast_params() + num_samples += len(data) + iters += 1 + + if batch_acc is not None: + acc = np.mean(outs[1]).item() + accuracy_evaluator.update(value=acc, weight=len(data)) + else: + acc = None + + if batch_id % 1 == 0: + print("Pass %d, batch %d, loss %s, acc %s" % + (pass_id, batch_id, np.mean(outs[0]), str(acc))) + if converge_speed is None and args.acc_target and acc >= args.acc_target: + converge_speed = time.time() - start_time + print("converge_speed set with %f" % converge_speed) + + pass_elapsed = time.time() - pass_start_time + examples_per_sec = num_samples / pass_elapsed + if batch_acc is not None: + pass_train_acc = accuracy_evaluator.eval() + else: + pass_train_acc = None + + if pass_id == 4 and batch_acc is not None: + print("acc_4passes set with %f" % pass_train_acc) + acc_4passes = float(pass_train_acc) + + output_metric_data(pass_id, examples_per_sec, pass_train_acc, + acc_4passes, converge_speed) + + if not args.no_test and batch_acc != None: + test_acc = test(startup_exe, infer_prog, test_reader, feeder, + batch_acc) + print("Pass: %d, Test Accuracy: %f\n" % (pass_id, test_acc)) + exit(0) + + +def output_metric_data(pass_id, examples_per_sec, pass_train_acc, acc_4passes, + converge_speed): + msgs = [] + msgs.append("pass = %d" % pass_id) + msgs.append("train_speed = %f" % float(examples_per_sec)) + if isinstance(pass_train_acc, float): + msgs.append("train_accuracy = %f" % 
pass_train_acc) + if isinstance(acc_4passes, float): + msgs.append("acc_4passes = %f" % acc_4passes) + if isinstance(converge_speed, float): + msgs.append("converge_speed = %f" % converge_speed) + print("**metrics_data: " + ", ".join(msgs)) + + +def print_arguments(args): + vars(args)['use_nvprof'] = (vars(args)['use_nvprof'] and + vars(args)['device'] == 'GPU') + print('----------- resnet Configuration Arguments -----------') + for arg, value in sorted(vars(args).iteritems()): + print('%s: %s' % (arg, value)) + print('------------------------------------------------') + + +def main(): + args = parse_args() + print_arguments(args) + + # the unique trainer id, starting from 0, needed by trainer + # only + nccl_id_var, num_trainers, trainer_id = ( + None, 1, int(os.getenv("PADDLE_TRAINER_ID", "-1"))) + + if args.use_cprof: + pr = cProfile.Profile() + pr.enable() + model_def = __import__("models.%s" % args.model, fromlist=["models"]) + train_args = list(model_def.get_model(args)) + train_args.append(args) + # Run optimizer.minimize(avg_loss) + train_args[2].minimize(train_args[0]) + if args.memory_optimize: + fluid.memory_optimize(fluid.default_main_program()) + + if args.update_method == "pserver": + train_prog, startup_prog = dist_transpile(trainer_id) + if not train_prog: + raise Exception( + "Must configure correct environments to run dist train.") + train_args.extend([train_prog, startup_prog]) + if args.gpus > 1 and os.getenv("PADDLE_TRAINING_ROLE") == "TRAINER": + train_args.extend([nccl_id_var, num_trainers, trainer_id]) + train_parallel(*train_args) + train(*train_args) + exit(0) + + # for other update methods, use default programs + train_args.append(fluid.default_main_program()) + train_args.append(fluid.default_startup_program()) + + if args.update_method == "nccl2": + nccl_id_var, num_trainers, trainer_id = append_nccl2_prepare( + trainer_id) + if args.gpus == 1: + # NOTE: parallel executor use profiler interanlly + if args.use_nvprof and args.device == 'GPU': + with profiler.cuda_profiler("cuda_profiler.txt", 'csv') as nvprof: + train(*train_args) + else: + train(*train_args) + else: + if args.device == "CPU": + raise Exception("Only support GPU perf with parallel exe") + train_args.extend([nccl_id_var, num_trainers, trainer_id]) + train_parallel(*train_args) + + +if __name__ == "__main__": + main() diff --git a/vgg16_aws_dist/fluid_benchmark_for_aws/kube_gen_job.py b/vgg16_aws_dist/fluid_benchmark_for_aws/kube_gen_job.py new file mode 100644 index 00000000..92680a89 --- /dev/null +++ b/vgg16_aws_dist/fluid_benchmark_for_aws/kube_gen_job.py @@ -0,0 +1,194 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
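+
+# Fills the pserver/trainer templates from kube_templates with the command
+# line arguments below and writes trainer.yaml (and, for --disttype pserver,
+# pserver.yaml) into a new directory named after --jobname; see the README for
+# an example invocation.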
+ +import yaml +import copy +import argparse +import random +import os +from kube_templates import pserver, trainer, envs + + +def parse_args(): + parser = argparse.ArgumentParser(description='Generate dist job yamls.') + + parser.add_argument( + '--jobname', default="paddlejob", help='unique job name') + parser.add_argument( + '--cpu', default=1, type=int, help='CPU cores per trainer node') + parser.add_argument( + '--pscpu', default=1, type=int, help='CPU cores per pserver node') + parser.add_argument( + '--gpu', default=0, type=int, help='num of GPUs per node') + parser.add_argument( + '--image', + default="bootstrapper:5000/fluid_benchmark:gpu", + help='num of GPUs per node') + parser.add_argument( + '--pservers', default=1, type=int, help='num of pservers') + parser.add_argument( + '--trainers', default=1, type=int, help='num of trainers') + parser.add_argument('--memory', default=1, type=int, help='trainer memory') + parser.add_argument( + '--psmemory', default=1, type=int, help='pserver memory') + parser.add_argument( + '--port', default=30236, type=int, help='num of trainers') + parser.add_argument( + '--entry', default="python train.py", help='command to run') + parser.add_argument( + '--fluid', default=1, type=int, help='whether is fluid job') + parser.add_argument( + '--rdma', action='store_ture', help='whether mount rdma libs') + parser.add_argument( + '--disttype', + default="pserver", + type=str, + choices=['pserver', 'nccl2', 'local'], + help='pserver or nccl2 or local') + + args = parser.parse_args() + return args + + +def gen_job(): + ps = pserver + tn = trainer + args = parse_args() + + ps_container = ps["spec"]["template"]["spec"]["containers"][0] + tn_container = tn["spec"]["template"]["spec"]["containers"][0] + + if args.fluid == 1: + ps_container["command"] = \ + ["paddle_k8s", "start_fluid"] + tn_container["command"] = \ + ["paddle_k8s", "start_fluid"] + ps["metadata"]["name"] = args.jobname + "-pserver" + ps["spec"]["template"]["metadata"]["labels"][ + "paddle-job-pserver"] = args.jobname + tn["metadata"]["name"] = args.jobname + "-trainer" + tn["spec"]["template"]["metadata"]["labels"]["paddle-job"] = args.jobname + + ps_container["image"] = args.image + tn_container["image"] = args.image + + ps_container["resources"]["requests"]["cpu"] = str(args.pscpu) + ps_container["resources"]["requests"]["memory"] = str(args.psmemory) + "Gi" + ps_container["resources"]["limits"]["cpu"] = str(args.pscpu) + ps_container["resources"]["limits"]["memory"] = str(args.psmemory) + "Gi" + + tn_container["resources"]["requests"]["cpu"] = str(args.cpu) + tn_container["resources"]["requests"]["memory"] = str(args.memory) + "Gi" + tn_container["resources"]["limits"]["cpu"] = str(args.cpu) + tn_container["resources"]["limits"]["memory"] = str(args.memory) + "Gi" + if args.gpu > 0: + tn_container["resources"]["requests"][ + "alpha.kubernetes.io/nvidia-gpu"] = str(args.gpu) + tn_container["resources"]["limits"][ + "alpha.kubernetes.io/nvidia-gpu"] = str(args.gpu) + + ps["spec"]["replicas"] = int(args.pservers) + tn["spec"]["parallelism"] = int(args.trainers) + tn["spec"]["completions"] = int(args.trainers) + ps_container["ports"][0]["name"] = "jobport-" + str(args.port) + ps_container["ports"][0]["containerPort"] = args.port + spreadport = random.randint(40000, 60000) + tn_container["ports"][0]["name"] = "spr-" + str(spreadport) + tn_container["ports"][0]["containerPort"] = spreadport + + envs.append({"name": "PADDLE_JOB_NAME", "value": args.jobname}) + envs.append({"name": "TRAINERS", 
"value": str(args.trainers)}) + envs.append({"name": "PSERVERS", "value": str(args.pservers)}) + envs.append({"name": "ENTRY", "value": args.entry}) + envs.append({"name": "PADDLE_INIT_PORT", "value": str(args.port)}) + envs.append({"name": "PADDLE_PSERVER_PORT", "value": str(args.port)}) + # NOTE: these directories below are cluster specific, please modify + # this settings before you run on your own cluster. + envs.append({ + "name": "LD_LIBRARY_PATH", + "value": + "/usr/local/lib:/usr/local/nvidia/lib64:/usr/local/rdma/lib64:/usr/lib64/mlnx_ofed/valgrind" + }) + + volumes = [{ + "name": "nvidia-driver", + "hostPath": { + "path": "/usr/local/nvidia/lib64" + } + }] + volumeMounts = [{ + "mountPath": "/usr/local/nvidia/lib64", + "name": "nvidia-driver" + }] + + if args.rdma: + volumes.extend([{ + "name": "ibetc", + "hostPath": { + "path": "/etc/libibverbs.d" + } + }, { + "name": "iblibs", + "hostPath": { + "path": "/usr/local/rdma" + } + }, { + "name": "valgrind", + "hostPath": { + "path": "/usr/lib64/mlnx_ofed/valgrind" + } + }]) + volumeMounts.extend([{ + "mountPath": "/etc/libibverbs.d", + "name": "ibetc" + }, { + "mountPath": "/usr/local/rdma", + "name": "iblibs" + }, { + "mountPath": "/usr/lib64/mlnx_ofed/valgrind", + "name": "valgrind" + }]) + # append shm for NCCL2 + volumes.append({"name": "dshm", "emptyDir": {"medium": "Memory"}}) + volumeMounts.append({"mountPath": "/dev/shm", "name": "dshm"}) + + tn["spec"]["template"]["spec"]["volumes"] = volumes + tn_container["volumeMounts"] = volumeMounts + + ps_container["env"] = envs + ps_container["env"].append({"name": "TRAINING_ROLE", "value": "PSERVER"}) + tn_container["env"] = envs + if args.disttype == "pserver": + tn_container["env"].append({ + "name": "TRAINING_ROLE", + "value": "TRAINER" + }) + elif args.disttype == "nccl2" or args.disttype == "local": + # NCCL2 have no training role, set to plain WORKER + tn_container["env"].append({ + "name": "TRAINING_ROLE", + "value": "WORKER" + }) + + os.mkdir(args.jobname) + if args.disttype == "pserver": + with open("%s/pserver.yaml" % args.jobname, "w") as fn: + yaml.dump(ps, fn) + + with open("%s/trainer.yaml" % args.jobname, "w") as fn: + yaml.dump(tn, fn) + + +if __name__ == "__main__": + gen_job() diff --git a/vgg16_aws_dist/fluid_benchmark_for_aws/kube_templates/__init__.py b/vgg16_aws_dist/fluid_benchmark_for_aws/kube_templates/__init__.py new file mode 100644 index 00000000..2d09d940 --- /dev/null +++ b/vgg16_aws_dist/fluid_benchmark_for_aws/kube_templates/__init__.py @@ -0,0 +1,66 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from pserver import pserver +from trainer import trainer + +__all__ = ["pserver", "trainer", "envs"] + +envs = [ + # envs that don't need to change + { + "name": "GLOG_v", + "value": "0" + }, + { + "name": "GLOG_logtostderr", + "value": "1" + }, + { + "name": "TOPOLOGY", + "value": "" + }, + { + "name": "TRAINER_PACKAGE", + "value": "/workspace" + }, + { + "name": "PADDLE_INIT_NICS", + "value": "eth2" + }, + { + "name": "NAMESPACE", + "valueFrom": { + "fieldRef": { + "fieldPath": "metadata.namespace" + } + } + }, + { + "name": "POD_IP", + "valueFrom": { + "fieldRef": { + "fieldPath": "status.podIP" + } + } + }, + { + "name": "PADDLE_CURRENT_IP", + "valueFrom": { + "fieldRef": { + "fieldPath": "status.podIP" + } + } + } +] diff --git a/vgg16_aws_dist/fluid_benchmark_for_aws/kube_templates/pserver.py b/vgg16_aws_dist/fluid_benchmark_for_aws/kube_templates/pserver.py new file mode 100644 index 00000000..b54982c8 --- /dev/null +++ b/vgg16_aws_dist/fluid_benchmark_for_aws/kube_templates/pserver.py @@ -0,0 +1,58 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +pserver = { + "apiVersion": "extensions/v1beta1", + "kind": "ReplicaSet", + "metadata": { + "name": "jobname-pserver" + }, + "spec": { + "replicas": 1, + "template": { + "metadata": { + "labels": { + "paddle-job-pserver": "jobname" + } + }, + "spec": { + "hostNetwork": True, + "imagePullSecrets": [{ + "name": "job-registry-secret" + }], + "containers": [{ + "name": "pserver", + "image": "", + "imagePullPolicy": "Always", + "ports": [{ + "name": "jobport-1", + "containerPort": 1 + }], + "env": [], + "command": ["paddle_k8s", "start_pserver"], + "resources": { + "requests": { + "memory": "10Gi", + "cpu": "4" + }, + "limits": { + "memory": "10Gi", + "cpu": "4" + } + } + }] + } + } + } +} diff --git a/vgg16_aws_dist/fluid_benchmark_for_aws/kube_templates/trainer.py b/vgg16_aws_dist/fluid_benchmark_for_aws/kube_templates/trainer.py new file mode 100644 index 00000000..b915d31e --- /dev/null +++ b/vgg16_aws_dist/fluid_benchmark_for_aws/kube_templates/trainer.py @@ -0,0 +1,70 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
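+
+# Kubernetes Job spec used as the trainer template; kube_gen_job.py overrides
+# the name, image, resources, ports and env before writing trainer.yaml.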
+ +trainer = { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "name": "jobname-pserver" + }, + "spec": { + "parallelism": 4, + "completions": 4, + "template": { + "metadata": { + "labels": { + "paddle-job": "jobname" + } + }, + "spec": { + "hostNetwork": True, + "imagePullSecrets": [{ + "name": "job-registry-secret" + }], + "restartPolicy": "Never", + "containers": [{ + "name": "trainer", + "image": "", + "imagePullPolicy": "Always", + # to let container set rlimit + "securityContext": { + "privileged": True + # TODO(wuyi): use below specific cap instead of privileged, + # using privileged will cause all GPU device are visible + # in the container. + # "capabilities": { + # "add": ["SYS_RESOURCE"] + # } + }, + "ports": [{ + "name": "jobport-1", + "containerPort": 1 + }], + "env": [], + "command": ["paddle_k8s", "start_trainer", "v2"], + "resources": { + "requests": { + "memory": "10Gi", + "cpu": "4", + }, + "limits": { + "memory": "10Gi", + "cpu": "4", + } + } + }] + } + } + } +} diff --git a/vgg16_aws_dist/fluid_benchmark_for_aws/models/__init__.py b/vgg16_aws_dist/fluid_benchmark_for_aws/models/__init__.py new file mode 100644 index 00000000..1c3fcac8 --- /dev/null +++ b/vgg16_aws_dist/fluid_benchmark_for_aws/models/__init__.py @@ -0,0 +1,17 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__all__ = [ + "machine_translation", "resnet", "vgg", "mnist", "stacked_dynamic_lstm" +] diff --git a/vgg16_aws_dist/fluid_benchmark_for_aws/models/machine_translation.py b/vgg16_aws_dist/fluid_benchmark_for_aws/models/machine_translation.py new file mode 100644 index 00000000..745f4415 --- /dev/null +++ b/vgg16_aws_dist/fluid_benchmark_for_aws/models/machine_translation.py @@ -0,0 +1,233 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
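+
+# Bi-directional LSTM encoder plus an attention-based LSTM decoder (see
+# seq_to_seq_net below); get_model trains it on the WMT14 dataset.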
+"""seq2seq model for fluid.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np +import argparse +import time +import distutils.util + +import paddle +import paddle.fluid as fluid +import paddle.fluid.core as core +import paddle.fluid.framework as framework +from paddle.fluid.executor import Executor + + +def lstm_step(x_t, hidden_t_prev, cell_t_prev, size): + def linear(inputs): + return fluid.layers.fc(input=inputs, size=size, bias_attr=True) + + forget_gate = fluid.layers.sigmoid(x=linear([hidden_t_prev, x_t])) + input_gate = fluid.layers.sigmoid(x=linear([hidden_t_prev, x_t])) + output_gate = fluid.layers.sigmoid(x=linear([hidden_t_prev, x_t])) + cell_tilde = fluid.layers.tanh(x=linear([hidden_t_prev, x_t])) + + cell_t = fluid.layers.sums(input=[ + fluid.layers.elementwise_mul( + x=forget_gate, y=cell_t_prev), fluid.layers.elementwise_mul( + x=input_gate, y=cell_tilde) + ]) + + hidden_t = fluid.layers.elementwise_mul( + x=output_gate, y=fluid.layers.tanh(x=cell_t)) + + return hidden_t, cell_t + + +def seq_to_seq_net(embedding_dim, encoder_size, decoder_size, source_dict_dim, + target_dict_dim, is_generating, beam_size, max_length): + """Construct a seq2seq network.""" + + def bi_lstm_encoder(input_seq, gate_size): + # Linear transformation part for input gate, output gate, forget gate + # and cell activation vectors need be done outside of dynamic_lstm. + # So the output size is 4 times of gate_size. + input_forward_proj = fluid.layers.fc(input=input_seq, + size=gate_size * 4, + act=None, + bias_attr=False) + forward, _ = fluid.layers.dynamic_lstm( + input=input_forward_proj, size=gate_size * 4, use_peepholes=False) + input_reversed_proj = fluid.layers.fc(input=input_seq, + size=gate_size * 4, + act=None, + bias_attr=False) + reversed, _ = fluid.layers.dynamic_lstm( + input=input_reversed_proj, + size=gate_size * 4, + is_reverse=True, + use_peepholes=False) + return forward, reversed + + src_word_idx = fluid.layers.data( + name='source_sequence', shape=[1], dtype='int64', lod_level=1) + + src_embedding = fluid.layers.embedding( + input=src_word_idx, + size=[source_dict_dim, embedding_dim], + dtype='float32') + + src_forward, src_reversed = bi_lstm_encoder( + input_seq=src_embedding, gate_size=encoder_size) + + encoded_vector = fluid.layers.concat( + input=[src_forward, src_reversed], axis=1) + + encoded_proj = fluid.layers.fc(input=encoded_vector, + size=decoder_size, + bias_attr=False) + + backward_first = fluid.layers.sequence_pool( + input=src_reversed, pool_type='first') + + decoder_boot = fluid.layers.fc(input=backward_first, + size=decoder_size, + bias_attr=False, + act='tanh') + + def lstm_decoder_with_attention(target_embedding, encoder_vec, + encoder_proj, decoder_boot, decoder_size): + def simple_attention(encoder_vec, encoder_proj, decoder_state): + decoder_state_proj = fluid.layers.fc(input=decoder_state, + size=decoder_size, + bias_attr=False) + decoder_state_expand = fluid.layers.sequence_expand( + x=decoder_state_proj, y=encoder_proj) + concated = fluid.layers.concat( + input=[encoder_proj, decoder_state_expand], axis=1) + attention_weights = fluid.layers.fc(input=concated, + size=1, + act='tanh', + bias_attr=False) + attention_weights = fluid.layers.sequence_softmax( + input=attention_weights) + weigths_reshape = fluid.layers.reshape( + x=attention_weights, shape=[-1]) + scaled = fluid.layers.elementwise_mul( + x=encoder_vec, y=weigths_reshape, axis=0) + context = 
fluid.layers.sequence_pool(input=scaled, pool_type='sum') + return context + + rnn = fluid.layers.DynamicRNN() + + cell_init = fluid.layers.fill_constant_batch_size_like( + input=decoder_boot, + value=0.0, + shape=[-1, decoder_size], + dtype='float32') + cell_init.stop_gradient = False + + with rnn.block(): + current_word = rnn.step_input(target_embedding) + encoder_vec = rnn.static_input(encoder_vec) + encoder_proj = rnn.static_input(encoder_proj) + hidden_mem = rnn.memory(init=decoder_boot, need_reorder=True) + cell_mem = rnn.memory(init=cell_init) + context = simple_attention(encoder_vec, encoder_proj, hidden_mem) + decoder_inputs = fluid.layers.concat( + input=[context, current_word], axis=1) + h, c = lstm_step(decoder_inputs, hidden_mem, cell_mem, + decoder_size) + rnn.update_memory(hidden_mem, h) + rnn.update_memory(cell_mem, c) + out = fluid.layers.fc(input=h, + size=target_dict_dim, + bias_attr=True, + act='softmax') + rnn.output(out) + return rnn() + + if not is_generating: + trg_word_idx = fluid.layers.data( + name='target_sequence', shape=[1], dtype='int64', lod_level=1) + + trg_embedding = fluid.layers.embedding( + input=trg_word_idx, + size=[target_dict_dim, embedding_dim], + dtype='float32') + + prediction = lstm_decoder_with_attention(trg_embedding, encoded_vector, + encoded_proj, decoder_boot, + decoder_size) + label = fluid.layers.data( + name='label_sequence', shape=[1], dtype='int64', lod_level=1) + cost = fluid.layers.cross_entropy(input=prediction, label=label) + avg_cost = fluid.layers.mean(x=cost) + + feeding_list = ["source_sequence", "target_sequence", "label_sequence"] + + return avg_cost, feeding_list + + +def to_lodtensor(data, place): + seq_lens = [len(seq) for seq in data] + cur_len = 0 + lod = [cur_len] + for l in seq_lens: + cur_len += l + lod.append(cur_len) + flattened_data = np.concatenate(data, axis=0).astype("int64") + flattened_data = flattened_data.reshape([len(flattened_data), 1]) + lod_t = core.LoDTensor() + lod_t.set(flattened_data, place) + lod_t.set_lod([lod]) + return lod_t, lod[-1] + + +def lodtensor_to_ndarray(lod_tensor): + dims = lod_tensor.get_dims() + ndarray = np.zeros(shape=dims).astype('float32') + for i in xrange(np.product(dims)): + ndarray.ravel()[i] = lod_tensor.get_float_element(i) + return ndarray + + +def get_model(args): + embedding_dim = 512 + encoder_size = 512 + decoder_size = 512 + dict_size = 30000 + beam_size = 3 + max_length = 250 + avg_cost, feeding_list = seq_to_seq_net( + embedding_dim, + encoder_size, + decoder_size, + dict_size, + dict_size, + False, + beam_size=beam_size, + max_length=max_length) + + # clone from default main program + inference_program = fluid.default_main_program().clone() + + optimizer = fluid.optimizer.Adam(learning_rate=args.learning_rate) + + train_batch_generator = paddle.batch( + paddle.reader.shuffle( + paddle.dataset.wmt14.train(dict_size), buf_size=1000), + batch_size=args.batch_size) + + test_batch_generator = paddle.batch( + paddle.reader.shuffle( + paddle.dataset.wmt14.test(dict_size), buf_size=1000), + batch_size=args.batch_size) + + return avg_cost, inference_program, optimizer, train_batch_generator, \ + test_batch_generator, None, None diff --git a/vgg16_aws_dist/fluid_benchmark_for_aws/models/mnist.py b/vgg16_aws_dist/fluid_benchmark_for_aws/models/mnist.py new file mode 100644 index 00000000..9606304b --- /dev/null +++ b/vgg16_aws_dist/fluid_benchmark_for_aws/models/mnist.py @@ -0,0 +1,94 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np +import argparse +import time +import cProfile + +import paddle +import paddle.fluid as fluid +import paddle.fluid.profiler as profiler + +SEED = 1 +DTYPE = "float32" + +# random seed must set before configuring the network. +# fluid.default_startup_program().random_seed = SEED + + +def cnn_model(data): + conv_pool_1 = fluid.nets.simple_img_conv_pool( + input=data, + filter_size=5, + num_filters=20, + pool_size=2, + pool_stride=2, + act="relu") + conv_pool_2 = fluid.nets.simple_img_conv_pool( + input=conv_pool_1, + filter_size=5, + num_filters=50, + pool_size=2, + pool_stride=2, + act="relu") + + # TODO(dzhwinter) : refine the initializer and random seed settting + SIZE = 10 + input_shape = conv_pool_2.shape + param_shape = [reduce(lambda a, b: a * b, input_shape[1:], 1)] + [SIZE] + scale = (2.0 / (param_shape[0]**2 * SIZE))**0.5 + + predict = fluid.layers.fc( + input=conv_pool_2, + size=SIZE, + act="softmax", + param_attr=fluid.param_attr.ParamAttr( + initializer=fluid.initializer.NormalInitializer( + loc=0.0, scale=scale))) + return predict + + +def get_model(args): + # Input data + images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE) + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + + # Train program + predict = cnn_model(images) + cost = fluid.layers.cross_entropy(input=predict, label=label) + avg_cost = fluid.layers.mean(x=cost) + + # Evaluator + batch_size_tensor = fluid.layers.create_tensor(dtype='int64') + batch_acc = fluid.layers.accuracy( + input=predict, label=label, total=batch_size_tensor) + + # inference program + inference_program = fluid.default_main_program().clone() + + # Optimization + opt = fluid.optimizer.AdamOptimizer( + learning_rate=0.001, beta1=0.9, beta2=0.999) + + # Reader + train_reader = paddle.batch( + paddle.dataset.mnist.train(), batch_size=args.batch_size) + test_reader = paddle.batch( + paddle.dataset.mnist.test(), batch_size=args.batch_size) + return avg_cost, inference_program, opt, train_reader, test_reader, batch_acc, batch_size_tensor diff --git a/vgg16_aws_dist/fluid_benchmark_for_aws/models/resnet.py b/vgg16_aws_dist/fluid_benchmark_for_aws/models/resnet.py new file mode 100644 index 00000000..933d1105 --- /dev/null +++ b/vgg16_aws_dist/fluid_benchmark_for_aws/models/resnet.py @@ -0,0 +1,162 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import functools +import numpy as np +import time + +import cProfile, pstats, StringIO + +import paddle +import paddle.fluid as fluid +import paddle.fluid.core as core +import paddle.fluid.profiler as profiler + + +def conv_bn_layer(input, ch_out, filter_size, stride, padding, act='relu'): + conv1 = fluid.layers.conv2d( + input=input, + filter_size=filter_size, + num_filters=ch_out, + stride=stride, + padding=padding, + act=None, + bias_attr=False) + return fluid.layers.batch_norm(input=conv1, act=act) + + +def shortcut(input, ch_out, stride): + ch_in = input.shape[ + 1] # if args.data_format == 'NCHW' else input.shape[-1] + if ch_in != ch_out: + return conv_bn_layer(input, ch_out, 1, stride, 0, None) + else: + return input + + +def basicblock(input, ch_out, stride): + short = shortcut(input, ch_out, stride) + conv1 = conv_bn_layer(input, ch_out, 3, stride, 1) + conv2 = conv_bn_layer(conv1, ch_out, 3, 1, 1, act=None) + return fluid.layers.elementwise_add(x=short, y=conv2, act='relu') + + +def bottleneck(input, ch_out, stride): + short = shortcut(input, ch_out * 4, stride) + conv1 = conv_bn_layer(input, ch_out, 1, stride, 0) + conv2 = conv_bn_layer(conv1, ch_out, 3, 1, 1) + conv3 = conv_bn_layer(conv2, ch_out * 4, 1, 1, 0, act=None) + return fluid.layers.elementwise_add(x=short, y=conv3, act='relu') + + +def layer_warp(block_func, input, ch_out, count, stride): + res_out = block_func(input, ch_out, stride) + for i in range(1, count): + res_out = block_func(res_out, ch_out, 1) + return res_out + + +def resnet_imagenet(input, class_dim, depth=50, data_format='NCHW'): + + cfg = { + 18: ([2, 2, 2, 1], basicblock), + 34: ([3, 4, 6, 3], basicblock), + 50: ([3, 4, 6, 3], bottleneck), + 101: ([3, 4, 23, 3], bottleneck), + 152: ([3, 8, 36, 3], bottleneck) + } + stages, block_func = cfg[depth] + conv1 = conv_bn_layer(input, ch_out=64, filter_size=7, stride=2, padding=3) + pool1 = fluid.layers.pool2d( + input=conv1, pool_type='avg', pool_size=3, pool_stride=2) + res1 = layer_warp(block_func, pool1, 64, stages[0], 1) + res2 = layer_warp(block_func, res1, 128, stages[1], 2) + res3 = layer_warp(block_func, res2, 256, stages[2], 2) + res4 = layer_warp(block_func, res3, 512, stages[3], 2) + pool2 = fluid.layers.pool2d( + input=res4, + pool_size=7, + pool_type='avg', + pool_stride=1, + global_pooling=True) + out = fluid.layers.fc(input=pool2, size=class_dim, act='softmax') + return out + + +def resnet_cifar10(input, class_dim, depth=32, data_format='NCHW'): + assert (depth - 2) % 6 == 0 + + n = (depth - 2) // 6 + + conv1 = conv_bn_layer( + input=input, ch_out=16, filter_size=3, stride=1, padding=1) + res1 = layer_warp(basicblock, conv1, 16, n, 1) + res2 = layer_warp(basicblock, res1, 32, n, 2) + res3 = layer_warp(basicblock, res2, 64, n, 2) + pool = fluid.layers.pool2d( + input=res3, pool_size=8, pool_type='avg', pool_stride=1) + out = fluid.layers.fc(input=pool, size=class_dim, act='softmax') + return out + + +def get_model(args): + model = resnet_cifar10 + if args.data_set == "cifar10": + class_dim = 10 + if args.data_format == 'NCHW': + dshape = [3, 32, 32] + else: + dshape = [32, 32, 3] + model = resnet_cifar10 + else: + class_dim = 102 + if args.data_format == 'NCHW': + dshape = [3, 224, 224] + else: + dshape = [224, 224, 3] + model = resnet_imagenet + + input = 
fluid.layers.data(name='data', shape=dshape, dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + predict = model(input, class_dim) + cost = fluid.layers.cross_entropy(input=predict, label=label) + avg_cost = fluid.layers.mean(x=cost) + + batch_size_tensor = fluid.layers.create_tensor(dtype='int64') + batch_acc = fluid.layers.accuracy( + input=predict, label=label, total=batch_size_tensor) + + inference_program = fluid.default_main_program().clone() + with fluid.program_guard(inference_program): + inference_program = fluid.io.get_inference_program( + target_vars=[batch_acc, batch_size_tensor]) + + optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9) + + train_reader = paddle.batch( + paddle.reader.shuffle( + paddle.dataset.cifar.train10() + if args.data_set == 'cifar10' else paddle.dataset.flowers.train(), + buf_size=5120), + batch_size=args.batch_size) + test_reader = paddle.batch( + paddle.dataset.cifar.test10() + if args.data_set == 'cifar10' else paddle.dataset.flowers.test(), + batch_size=args.batch_size) + + return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc, batch_size_tensor diff --git a/vgg16_aws_dist/fluid_benchmark_for_aws/models/stacked_dynamic_lstm.py b/vgg16_aws_dist/fluid_benchmark_for_aws/models/stacked_dynamic_lstm.py new file mode 100644 index 00000000..bd44a607 --- /dev/null +++ b/vgg16_aws_dist/fluid_benchmark_for_aws/models/stacked_dynamic_lstm.py @@ -0,0 +1,139 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse +import cPickle +import os +import random +import time + +import numpy +import paddle +import paddle.dataset.imdb as imdb +import paddle.fluid as fluid +import paddle.batch as batch +import paddle.fluid.profiler as profiler + +word_dict = imdb.word_dict() + + +def crop_sentence(reader, crop_size): + unk_value = word_dict[''] + + def __impl__(): + for item in reader(): + if len([x for x in item[0] if x != unk_value]) < crop_size: + yield item + + return __impl__ + + +def get_model(args): + lstm_size = 512 + emb_dim = 512 + crop_size = 1500 + + data = fluid.layers.data( + name="words", shape=[1], lod_level=1, dtype='int64') + sentence = fluid.layers.embedding( + input=data, size=[len(word_dict), emb_dim]) + + sentence = fluid.layers.fc(input=sentence, size=lstm_size, act='tanh') + + rnn = fluid.layers.DynamicRNN() + with rnn.block(): + word = rnn.step_input(sentence) + prev_hidden = rnn.memory(value=0.0, shape=[lstm_size]) + prev_cell = rnn.memory(value=0.0, shape=[lstm_size]) + + def gate_common( + ipt, + hidden, + size, ): + gate0 = fluid.layers.fc(input=ipt, size=size, bias_attr=True) + gate1 = fluid.layers.fc(input=hidden, size=size, bias_attr=False) + gate = fluid.layers.sums(input=[gate0, gate1]) + return gate + + forget_gate = fluid.layers.sigmoid( + x=gate_common(word, prev_hidden, lstm_size)) + input_gate = fluid.layers.sigmoid( + x=gate_common(word, prev_hidden, lstm_size)) + output_gate = fluid.layers.sigmoid( + x=gate_common(word, prev_hidden, lstm_size)) + cell_gate = fluid.layers.tanh( + x=gate_common(word, prev_hidden, lstm_size)) + + cell = fluid.layers.sums(input=[ + fluid.layers.elementwise_mul( + x=forget_gate, y=prev_cell), fluid.layers.elementwise_mul( + x=input_gate, y=cell_gate) + ]) + + hidden = fluid.layers.elementwise_mul( + x=output_gate, y=fluid.layers.tanh(x=cell)) + + rnn.update_memory(prev_cell, cell) + rnn.update_memory(prev_hidden, hidden) + rnn.output(hidden) + + last = fluid.layers.sequence_pool(rnn(), 'last') + logit = fluid.layers.fc(input=last, size=2, act='softmax') + loss = fluid.layers.cross_entropy( + input=logit, + label=fluid.layers.data( + name='label', shape=[1], dtype='int64')) + loss = fluid.layers.mean(x=loss) + + # add acc + batch_size_tensor = fluid.layers.create_tensor(dtype='int64') + batch_acc = fluid.layers.accuracy(input=logit, label=fluid.layers.data(name='label', \ + shape=[1], dtype='int64'), total=batch_size_tensor) + + inference_program = fluid.default_main_program().clone() + with fluid.program_guard(inference_program): + inference_program = fluid.io.get_inference_program( + target_vars=[batch_acc, batch_size_tensor]) + + adam = fluid.optimizer.Adam() + + train_reader = batch( + paddle.reader.shuffle( + crop_sentence(imdb.train(word_dict), crop_size), buf_size=25000), + batch_size=args.batch_size) + test_reader = batch( + paddle.reader.shuffle( + crop_sentence(imdb.test(word_dict), crop_size), buf_size=25000), + batch_size=args.batch_size) + + return loss, inference_program, adam, train_reader, test_reader, batch_acc, batch_size_tensor + + +def to_lodtensor(data, place): + seq_lens = [len(seq) for seq in data] + cur_len = 0 + lod = [cur_len] + for l in seq_lens: + cur_len += l + lod.append(cur_len) + flattened_data = numpy.concatenate(data, axis=0).astype("int64") + flattened_data = flattened_data.reshape([len(flattened_data), 1]) + res = fluid.LoDTensor() + res.set(flattened_data, place) + 
res.set_lod([lod]) + return res diff --git a/vgg16_aws_dist/fluid_benchmark_for_aws/models/vgg.py b/vgg16_aws_dist/fluid_benchmark_for_aws/models/vgg.py new file mode 100644 index 00000000..6571bbf6 --- /dev/null +++ b/vgg16_aws_dist/fluid_benchmark_for_aws/models/vgg.py @@ -0,0 +1,104 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""VGG16 benchmark in Fluid""" +from __future__ import print_function + +import sys +import time +import numpy as np +import paddle +import paddle.fluid as fluid +import paddle.fluid.core as core +import argparse +import functools + + +def vgg16_bn_drop(input): + def conv_block(input, num_filter, groups, dropouts): + return fluid.nets.img_conv_group( + input=input, + pool_size=2, + pool_stride=2, + conv_num_filter=[num_filter] * groups, + conv_filter_size=3, + conv_act='relu', + conv_with_batchnorm=True, + conv_batchnorm_drop_rate=dropouts, + pool_type='max') + + conv1 = conv_block(input, 64, 2, [0.3, 0]) + conv2 = conv_block(conv1, 128, 2, [0.4, 0]) + conv3 = conv_block(conv2, 256, 3, [0.4, 0.4, 0]) + conv4 = conv_block(conv3, 512, 3, [0.4, 0.4, 0]) + conv5 = conv_block(conv4, 512, 3, [0.4, 0.4, 0]) + + drop = fluid.layers.dropout(x=conv5, dropout_prob=0.5) + fc1 = fluid.layers.fc(input=drop, size=512, act=None) + bn = fluid.layers.batch_norm(input=fc1, act='relu') + drop2 = fluid.layers.dropout(x=bn, dropout_prob=0.5) + fc2 = fluid.layers.fc(input=drop2, size=512, act=None) + return fc2 + + +def get_model(args): + if args.data_set == "cifar10": + classdim = 10 + if args.data_format == 'NCHW': + data_shape = [3, 32, 32] + else: + data_shape = [32, 32, 3] + else: + classdim = 102 + if args.data_format == 'NCHW': + data_shape = [3, 224, 224] + else: + data_shape = [224, 224, 3] + + # Input data + images = fluid.layers.data(name='pixel', shape=data_shape, dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + + # Train program + net = vgg16_bn_drop(images) + predict = fluid.layers.fc(input=net, size=classdim, act='softmax') + cost = fluid.layers.cross_entropy(input=predict, label=label) + avg_cost = fluid.layers.mean(x=cost) + + # Evaluator + batch_size_tensor = fluid.layers.create_tensor(dtype='int64') + batch_acc = fluid.layers.accuracy( + input=predict, label=label, total=batch_size_tensor) + + # inference program + inference_program = fluid.default_main_program().clone() + with fluid.program_guard(inference_program): + inference_program = fluid.io.get_inference_program( + target_vars=[batch_acc, batch_size_tensor]) + + # Optimization + optimizer = fluid.optimizer.Adam(learning_rate=args.learning_rate) + + # data reader + train_reader = paddle.batch( + paddle.reader.shuffle( + paddle.dataset.cifar.train10() + if args.data_set == 'cifar10' else paddle.dataset.flowers.train(), + buf_size=5120), + batch_size=args.batch_size) + test_reader = paddle.batch( + paddle.dataset.cifar.test10() + if args.data_set == 'cifar10' else paddle.dataset.flowers.test(), + 
batch_size=args.batch_size) + + return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc, batch_size_tensor diff --git a/vgg16_aws_dist/fluid_benchmark_for_aws/run.sh b/vgg16_aws_dist/fluid_benchmark_for_aws/run.sh new file mode 100644 index 00000000..f6dfd20b --- /dev/null +++ b/vgg16_aws_dist/fluid_benchmark_for_aws/run.sh @@ -0,0 +1,105 @@ +#!/bin/bash +# This script benchmarking the PaddlePaddle Fluid on +# single thread single GPU. + +#export FLAGS_fraction_of_gpu_memory_to_use=0.0 +export CUDNN_PATH=/paddle/cudnn_v5 + +# disable openmp and mkl parallel +#https://github.com/PaddlePaddle/Paddle/issues/7199 +export MKL_NUM_THREADS=1 +export OMP_NUM_THREADS=1 +ht=`lscpu |grep "per core"|awk -F':' '{print $2}'|xargs` +if [ $ht -eq 1 ]; then # HT is OFF + if [ -z "$KMP_AFFINITY" ]; then + export KMP_AFFINITY="granularity=fine,compact,0,0" + fi + if [ -z "$OMP_DYNAMIC" ]; then + export OMP_DYNAMIC="FALSE" + fi +else # HT is ON + if [ -z "$KMP_AFFINITY" ]; then + export KMP_AFFINITY="granularity=fine,compact,1,0" + fi +fi +# disable multi-gpu if have more than one +export CUDA_VISIBLE_DEVICES=0 +export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH +export LD_LIBRARY_PATH=$CUDNN_PATH:$LD_LIBRARY_PATH + +# only query the gpu used +nohup stdbuf -oL nvidia-smi \ + --id=${CUDA_VISIBLE_DEVICES} \ + --query-gpu=timestamp \ + --query-compute-apps=pid,process_name,used_memory \ + --format=csv \ + --filename=mem.log \ + -l 1 & +# mnist +# mnist gpu mnist 128 +FLAGS_benchmark=true stdbuf -oL python fluid/mnist.py \ + --device=GPU \ + --batch_size=128 \ + --skip_batch_num=5 \ + --iterations=500 \ + 2>&1 | tee -a mnist_gpu_128.log + +# vgg16 +# gpu cifar10 128 +FLAGS_benchmark=true stdbuf -oL python fluid/vgg16.py \ + --device=GPU \ + --batch_size=128 \ + --skip_batch_num=5 \ + --iterations=30 \ + 2>&1 | tee -a vgg16_gpu_128.log + +# flowers gpu 128 +FLAGS_benchmark=true stdbuf -oL python fluid/vgg16.py \ + --device=GPU \ + --batch_size=32 \ + --data_set=flowers \ + --skip_batch_num=5 \ + --iterations=30 \ + 2>&1 | tee -a vgg16_gpu_flowers_32.log + +# resnet50 +# resnet50 gpu cifar10 128 +FLAGS_benchmark=true stdbuf -oL python fluid/resnet50.py \ + --device=GPU \ + --batch_size=128 \ + --data_set=cifar10 \ + --model=resnet_cifar10 \ + --skip_batch_num=5 \ + --iterations=30 \ + 2>&1 | tee -a resnet50_gpu_128.log + +# resnet50 gpu flowers 64 +FLAGS_benchmark=true stdbuf -oL python fluid/resnet50.py \ + --device=GPU \ + --batch_size=64 \ + --data_set=flowers \ + --model=resnet_imagenet \ + --skip_batch_num=5 \ + --iterations=30 \ + 2>&1 | tee -a resnet50_gpu_flowers_64.log + +# lstm +# lstm gpu imdb 32 # tensorflow only support batch=32 +FLAGS_benchmark=true stdbuf -oL python fluid/stacked_dynamic_lstm.py \ + --device=GPU \ + --batch_size=32 \ + --skip_batch_num=5 \ + --iterations=30 \ + --hidden_dim=512 \ + --emb_dim=512 \ + --crop_size=1500 \ + 2>&1 | tee -a lstm_gpu_32.log + +# seq2seq +# seq2seq gpu wmb 128 +FLAGS_benchmark=true stdbuf -oL python fluid/machine_translation.py \ + --device=GPU \ + --batch_size=128 \ + --skip_batch_num=5 \ + --iterations=30 \ + 2>&1 | tee -a lstm_gpu_128.log diff --git a/vgg16_aws_dist/run.xsh b/vgg16_aws_dist/run.xsh new file mode 100755 index 00000000..e512fcbe --- /dev/null +++ b/vgg16_aws_dist/run.xsh @@ -0,0 +1,67 @@ +#!/bin/bash + +set -xe + +CURRENT_FILE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +PADDLE_PATH=$CURRENT_FILE_DIR/../../.. 
+paddle_build_path=$PADDLE_PATH/build
+paddle_docker_hub_tag="paddlepaddlece/paddle:latest"
+fluid_benchmark_dockerhub_tag="paddlepaddlece/fluid_benchmark:latest"
+training_command="update_method:pserver,acc_target:0.6,iterations:100,pass_num:1"
+
+# clean up docker
+docker system prune -f
+
+# log into docker hub
+# login is now performed in teamcity
+# docker login -u $DOCKER_HUB_USERNAME -p $DOCKER_HUB_PASSWORD
+
+# create paddle docker image
+echo "going to build and push paddle production image"
+docker build -t $paddle_docker_hub_tag $paddle_build_path
+docker push $paddle_docker_hub_tag
+
+# build test docker image
+cd $CURRENT_FILE_DIR
+
+cd fluid_benchmark_for_aws
+if [ -d ~/.cache/paddle/dataset/cifar ]; then
+ echo "host cifar dataset cache found, copying it to docker root"
+ mkdir -p .cache/paddle/dataset/
+ cp -r -f ~/.cache/paddle/dataset/cifar .cache/paddle/dataset/
+fi
+
+if [ -d ~/.cache/paddle/dataset/flowers ]; then
+ echo "host flower dataset cache found, copying it to docker root"
+ mkdir -p .cache/paddle/dataset/
+ cp -r -f ~/.cache/paddle/dataset/flowers .cache/paddle/dataset/
+fi
+
+cd ..
+
+echo "going to build fluid_benchmark_for_aws docker image and push it"
+docker build -t $fluid_benchmark_dockerhub_tag ./fluid_benchmark_for_aws
+docker push $fluid_benchmark_dockerhub_tag
+
+# fetch runner and install dependencies
+echo "going to work with aws_runner"
+if [ ! -d aws_runner ]; then
+ echo "no aws_runner found, cloning one"
+ git clone https://github.com/putcn/aws_runner.git
+fi
+cd aws_runner
+git pull
+cd ..
+echo "going to install aws_runner dependencies"
+pip install -r aws_runner/client/requirements.txt
+
+echo "going to start testing"
+# start aws testing
+python ce_runner.py \
+ --key_name aws_benchmark_us_east \
+ --security_group_id sg-95539dff \
+ --online_mode yes \
+ --pserver_command $training_command \
+ --trainer_command $training_command \
+ --docker_image $fluid_benchmark_dockerhub_tag
+
\ No newline at end of file
diff --git a/vgg16_aws_dist/run_local.xsh b/vgg16_aws_dist/run_local.xsh
new file mode 100755
index 00000000..7045882b
--- /dev/null
+++ b/vgg16_aws_dist/run_local.xsh
@@ -0,0 +1,65 @@
+#!/bin/bash
+
+set -xe
+
+CURRENT_FILE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+PADDLE_PATH=$CURRENT_FILE_DIR/../../..
+paddle_build_path=$PADDLE_PATH/build
+paddle_docker_hub_tag="paddlepaddlece/paddle:latest"
+fluid_benchmark_dockerhub_tag="paddlepaddlece/fluid_benchmark:latest"
+training_command="update_method:pserver,acc_target:0.6,iterations:100,pass_num:1"
+
+# clean up docker
+docker system prune -f
+
+# log into docker hub
+# login is now performed in teamcity
+# docker login -u $DOCKER_HUB_USERNAME -p $DOCKER_HUB_PASSWORD
+
+# create paddle docker image
+echo "going to build and push paddle production image"
+#docker build -t $paddle_docker_hub_tag $paddle_build_path
+#docker push $paddle_docker_hub_tag
+
+# build test docker image
+cd $CURRENT_FILE_DIR
+
+cd fluid_benchmark_for_aws
+if [ -d ~/.cache/paddle/dataset/cifar ]; then
+ echo "host cifar dataset cache found, copying it to docker root"
+ mkdir -p .cache/paddle/dataset/
+ cp -r -f ~/.cache/paddle/dataset/cifar .cache/paddle/dataset/
+fi
+
+if [ -d ~/.cache/paddle/dataset/flowers ]; then
+ echo "host flower dataset cache found, copying it to docker root"
+ mkdir -p .cache/paddle/dataset/
+ cp -r -f ~/.cache/paddle/dataset/flowers .cache/paddle/dataset/
+fi
+cd ..
+
+echo "going to build fluid_benchmark_for_aws docker image and push it"
+docker build -t $fluid_benchmark_dockerhub_tag ./fluid_benchmark_for_aws
+docker push $fluid_benchmark_dockerhub_tag
+
+# fetch runner and install dependencies
+echo "going to work with aws_runner"
+if [ ! -d aws_runner ]; then
+ echo "no aws_runner found, cloning one"
+ git clone https://github.com/putcn/aws_runner.git
+fi
+cd aws_runner
+git pull
+cd ..
+echo "going to install aws_runner dependencies"
+pip install -r aws_runner/client/requirements.txt
+
+echo "going to start testing"
+# start aws testing
+python ce_runner.py \
+ --key_name aws_benchmark_us_east \
+ --security_group_id sg-95539dff \
+ --online_mode yes \
+ --pserver_command $training_command \
+ --trainer_command $training_command \
+ --docker_image $fluid_benchmark_dockerhub_tag
\ No newline at end of file
diff --git a/vgg16_aws_dist/speedup_vgg_16_1_1_0_factor.txt b/vgg16_aws_dist/speedup_vgg_16_1_1_0_factor.txt
new file mode 100644
index 00000000..e7a19a6e
--- /dev/null
+++ b/vgg16_aws_dist/speedup_vgg_16_1_1_0_factor.txt
@@ -0,0 +1 @@
+[1.0]
\ No newline at end of file
diff --git a/vgg16_aws_dist/speedup_vgg_16_4_4_4_factor.txt b/vgg16_aws_dist/speedup_vgg_16_4_4_4_factor.txt
new file mode 100644
index 00000000..3ea09272
--- /dev/null
+++ b/vgg16_aws_dist/speedup_vgg_16_4_4_4_factor.txt
@@ -0,0 +1 @@
+[10.233551979064941]
\ No newline at end of file
diff --git a/vgg16_aws_dist/speedup_vgg_16_7_8_7_factor.txt b/vgg16_aws_dist/speedup_vgg_16_7_8_7_factor.txt
new file mode 100644
index 00000000..c3f822e5
--- /dev/null
+++ b/vgg16_aws_dist/speedup_vgg_16_7_8_7_factor.txt
@@ -0,0 +1 @@
+[11.316923141479492]
\ No newline at end of file
diff --git a/vgg16_aws_dist/train_speed_vgg_16_1_1_0_factor.txt b/vgg16_aws_dist/train_speed_vgg_16_1_1_0_factor.txt
new file mode 100644
index 00000000..55d41345
--- /dev/null
+++ b/vgg16_aws_dist/train_speed_vgg_16_1_1_0_factor.txt
@@ -0,0 +1 @@
+[11.437457084655762]
\ No newline at end of file
diff --git a/vgg16_aws_dist/train_speed_vgg_16_4_4_4_factor.txt b/vgg16_aws_dist/train_speed_vgg_16_4_4_4_factor.txt
new file mode 100644
index 00000000..c133cf2f
--- /dev/null
+++ b/vgg16_aws_dist/train_speed_vgg_16_4_4_4_factor.txt
@@ -0,0 +1 @@
+[29.26145362854004]
\ No newline at end of file
diff --git a/vgg16_aws_dist/train_speed_vgg_16_7_8_7_factor.txt b/vgg16_aws_dist/train_speed_vgg_16_7_8_7_factor.txt
new file mode 100644
index 00000000..b0991782
--- /dev/null
+++ b/vgg16_aws_dist/train_speed_vgg_16_7_8_7_factor.txt
@@ -0,0 +1 @@
+[18.49097442626953]
\ No newline at end of file
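
The six *_factor.txt files above each record a single KPI value: a per-trainer training speed and a speedup factor for the 1-, 4-, and 7-trainer cluster configurations. As a reading aid only, the recorded speedup factors are consistent with aggregate cluster throughput relative to the single-trainer baseline; the helper below is a minimal sketch of that arithmetic, not code from this change set, and the trainer counts (1, 4, 7) are inferred from the file names.

# Hypothetical sketch (not part of this patch): how the recorded speedup
# factors appear to be derived from the recorded training speeds.
# speedup = per-trainer speed * trainer count / single-trainer baseline speed
def speedup_factor(per_trainer_speed, trainer_count, baseline_speed):
    # aggregate cluster throughput divided by the single-trainer throughput
    return per_trainer_speed * trainer_count / baseline_speed

baseline = 11.437457084655762  # train_speed_vgg_16_1_1_0_factor.txt

# 4 trainers: 29.26145362854004 * 4 / baseline ~= 10.23,
# matching speedup_vgg_16_4_4_4_factor.txt
print(speedup_factor(29.26145362854004, 4, baseline))

# 7 trainers: 18.49097442626953 * 7 / baseline ~= 11.32,
# matching speedup_vgg_16_7_8_7_factor.txt
print(speedup_factor(18.49097442626953, 7, baseline))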