Polish code
test=develop
chengduozh committed Feb 28, 2019
1 parent c2ccd23 commit db3afe9
Showing 3 changed files with 40 additions and 53 deletions.
42 changes: 26 additions & 16 deletions python/paddle/fluid/compiler.py
@@ -35,6 +35,30 @@ def _place_obj(place):
     return p
 
 
+def _is_pserver_mode(main_program):
+    main = main_program if main_program \
+        else default_main_program()
+    for op in main.global_block().ops:
+        if op.type in ["send", "recv"]:
+            return True
+    return False
+
+
+def get_available_places(use_cuda):
+    if use_cuda:
+        gpus_env = os.getenv("FLAGS_selected_gpus")
+        if gpus_env:
+            gpus = [int(s) for s in gpus_env.split(",")]
+        else:
+            gpus = [i for i in six.moves.range(core.get_cuda_device_count())]
+        places = [core.CUDAPlace(i) for i in gpus]
+    else:
+        cpu_num = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
+        places = [core.CPUPlace() for _ in six.moves.range(cpu_num)]
+    assert places, "no place for execution"
+    return places
+
+
 class CompiledProgram(object):
     """
     Compiles to Graph for execution.
@@ -126,8 +150,7 @@ def with_data_parallel(self,
             self._exec_strategy = ExecutionStrategy()
         if self._build_strategy is None:
             self._build_strategy = BuildStrategy()
-        self._build_strategy.is_distribution = framework.is_pserver_mode(
-            self._program)
+        self._build_strategy.is_distribution = _is_pserver_mode(self._program)
         return self
 
     def with_inference_optimize(self, config):
@@ -169,20 +192,7 @@ def _compile_data_parallel(self, use_cuda=False, scope=None):
             self._local_scopes = []
 
         self._exec_strategy.use_cuda = use_cuda
-        if self._exec_strategy.use_cuda:
-            gpus_env = os.getenv("FLAGS_selected_gpus")
-            if gpus_env:
-                gpus = [int(s) for s in gpus_env.split(",")]
-            else:
-                gpus = [
-                    i for i in six.moves.range(core.get_cuda_device_count())
-                ]
-            self._places = [core.CUDAPlace(i) for i in gpus]
-        else:
-            cpu_num = int(
-                os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
-            self._places = [core.CPUPlace() for _ in six.moves.range(cpu_num)]
-        assert self._places, "no place for execution"
+        self._places = get_available_places(self._exec_strategy.use_cuda)
 
         if self._exec_strategy.num_threads == 0:
             if self._exec_strategy.use_cuda:
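
For context on the hunk above, here is a minimal, self-contained sketch of the place-selection logic that the new get_available_places helper centralizes. It is illustrative only: plain tuples stand in for core.CUDAPlace / core.CPUPlace, and the CUDA device count is passed in explicitly so the snippet runs without Paddle installed.

# Illustrative sketch; tuples stand in for core.CUDAPlace / core.CPUPlace.
import multiprocessing
import os

def get_available_places_sketch(use_cuda, cuda_device_count=0):
    if use_cuda:
        gpus_env = os.getenv("FLAGS_selected_gpus")
        if gpus_env:
            # FLAGS_selected_gpus="0,2" restricts execution to those devices.
            gpus = [int(s) for s in gpus_env.split(",")]
        else:
            # Otherwise every visible GPU is used.
            gpus = list(range(cuda_device_count))
        places = [("CUDAPlace", i) for i in gpus]
    else:
        # CPU_NUM controls how many CPU places data parallelism runs over.
        cpu_num = int(os.environ.get("CPU_NUM", multiprocessing.cpu_count()))
        places = [("CPUPlace", i) for i in range(cpu_num)]
    assert places, "no place for execution"
    return places

os.environ["CPU_NUM"] = "4"
print(get_available_places_sketch(use_cuda=False))
# -> [('CPUPlace', 0), ('CPUPlace', 1), ('CPUPlace', 2), ('CPUPlace', 3)]
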
9 changes: 0 additions & 9 deletions python/paddle/fluid/framework.py
@@ -87,15 +87,6 @@ def _current_expected_place():
     return _imperative_current_expected_place_
 
 
-def is_pserver_mode(main_program):
-    main = main_program if main_program \
-        else default_main_program()
-    for op in main.global_block().ops:
-        if op.type in ["send", "recv"]:
-            return True
-    return False
-
-
 class NameScope(object):
     def __init__(self, name="", parent=None):
         self._children = dict()
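
The helper removed here now lives in compiler.py as _is_pserver_mode (see the first compiler.py hunk above). The rule it applies is simple: a program is treated as running in pserver (distributed) mode as soon as its global block contains a send or recv op, which distributed parameter-server training inserts into the main program. Below is a tiny self-contained sketch of that rule; the Fake* classes are hypothetical stand-ins so it runs without constructing a real fluid Program.

# Hypothetical stand-ins for a fluid Program / Block / Operator.
class FakeOp(object):
    def __init__(self, op_type):
        self.type = op_type

class FakeBlock(object):
    def __init__(self, op_types):
        self.ops = [FakeOp(t) for t in op_types]

class FakeProgram(object):
    def __init__(self, op_types):
        self._block = FakeBlock(op_types)

    def global_block(self):
        return self._block

def is_pserver_mode_sketch(program):
    # Same rule as compiler._is_pserver_mode: send/recv ops mark distribution.
    return any(op.type in ["send", "recv"] for op in program.global_block().ops)

print(is_pserver_mode_sketch(FakeProgram(["mul", "elementwise_add"])))  # False
print(is_pserver_mode_sketch(FakeProgram(["mul", "send", "recv"])))     # True
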
42 changes: 14 additions & 28 deletions python/paddle/fluid/parallel_executor.py
@@ -17,7 +17,6 @@
 from . import core
 from . import framework
 from . import executor
-from .. import compat as cpt
 import warnings
 import sys
 import six
@@ -99,39 +98,26 @@ def __init__(self,
             'Please use CompiledProgram and Executor. CompiledProgram '
             'is a central place for optimization and Executor is the '
             'unified executor. Example can be found in compiler.py.\n')
-        self._places = []
-        if use_cuda:
-            gpus_env = os.getenv("FLAGS_selected_gpus")
-            if gpus_env:
-                gpus = [int(s) for s in gpus_env.split(",")]
-            else:
-                gpus = [
-                    i for i in six.moves.range(core.get_cuda_device_count())
-                ]
-            self._places = [core.CUDAPlace(i) for i in gpus]
-        else:
-            cpu_num = int(
-                os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
-            self._places = [core.CPUPlace() for _ in six.moves.range(cpu_num)]
-        assert self._places, "no place for execution"
-
-        main_program = main_program if main_program is not None\
-            else framework.default_main_program()
-
-        self.scope = scope if scope is not None else executor.global_scope()
 
         if build_strategy is None:
             build_strategy = BuildStrategy()
         build_strategy.num_trainers = num_trainers
         build_strategy.trainer_id = trainer_id
 
-        self._executor = compiler.CompiledProgram(
-            main_program).with_data_parallel(
-                loss_name=loss_name,
-                build_strategy=build_strategy,
-                exec_strategy=exec_strategy,
-                share_vars_from=share_vars_from)._compile_data_parallel(
-                    use_cuda=use_cuda, scope=self.scope)
+        self._places = compiler.get_available_places(use_cuda)
+        self.scope = scope if scope is not None else executor.global_scope()
+
+        main_program = main_program if main_program is not None \
+            else framework.default_main_program()
+
+        self._compiled_program = compiler.CompiledProgram(main_program)
+        self._compiled_program.with_data_parallel(
+            loss_name=loss_name,
+            build_strategy=build_strategy,
+            exec_strategy=exec_strategy,
+            share_vars_from=share_vars_from)
+        self._executor = self._compiled_program._compile_data_parallel(
+            use_cuda=use_cuda, scope=self.scope)
 
     def run(self, fetch_list, feed=None, feed_dict=None, return_numpy=True):
         """
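
After this refactor, ParallelExecutor.__init__ is essentially a thin wrapper: it asks compiler.get_available_places for places, builds a CompiledProgram, and compiles it, which is also the migration path its deprecation warning recommends. The sketch below shows that recommended path used directly. It is not taken from this diff and assumes the paddle.fluid 1.x API surface of this era (layers.data/fc, CompiledProgram.with_data_parallel, Executor.run accepting a compiled program).

# Hedged sketch of the CompiledProgram + Executor path the warning points to.
import os

import numpy
import paddle.fluid as fluid
from paddle.fluid import compiler

os.environ["CPU_NUM"] = "2"  # two CPU places, mirroring get_available_places

x = fluid.layers.data(name="x", shape=[13], dtype="float32")
y = fluid.layers.data(name="y", shape=[1], dtype="float32")
pred = fluid.layers.fc(input=x, size=1)
loss = fluid.layers.mean(fluid.layers.square_error_cost(input=pred, label=y))
fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)

exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())

# CompiledProgram now owns place selection and graph compilation; the
# deprecated ParallelExecutor simply delegates to it after this change.
compiled = compiler.CompiledProgram(
    fluid.default_main_program()).with_data_parallel(loss_name=loss.name)

feed = {
    "x": numpy.random.random((8, 13)).astype("float32"),
    "y": numpy.random.random((8, 1)).astype("float32"),
}
loss_val, = exe.run(compiled, feed=feed, fetch_list=[loss.name])
print(loss_val)
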
