Skip to content
This repository has been archived by the owner on Feb 2, 2024. It is now read-only.

Commit

Permalink
Merge 0067bdd into be69dbf
Browse files Browse the repository at this point in the history
  • Loading branch information
shssf committed Aug 3, 2019
2 parents be69dbf + 0067bdd commit efa9411
Show file tree
Hide file tree
Showing 19 changed files with 1,266 additions and 973 deletions.
30 changes: 14 additions & 16 deletions hpat/distributed_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ class Distribution(Enum):
OneD_Var = 4
OneD = 5


_dist_analysis_result = namedtuple(
'dist_analysis_result', 'array_dists,parfor_dists')

distributed_analysis_extensions = {}
auto_rebalance = False


class DistributedAnalysis(object):
"""Analyze program for distributed transformation"""

Expand Down Expand Up @@ -120,8 +122,8 @@ def _analyze_assign(self, inst, array_dists, parfor_dists):
rhs = rhs.value

if isinstance(rhs, ir.Var) and (is_array(self.typemap, lhs)
or isinstance(self.typemap[lhs], (SeriesType, DataFrameType))
or is_array_container(self.typemap, lhs)):
or isinstance(self.typemap[lhs], (SeriesType, DataFrameType))
or is_array_container(self.typemap, lhs)):
self._meet_array_dists(lhs, rhs.name, array_dists)
return
elif (is_array(self.typemap, lhs)
Expand Down Expand Up @@ -207,7 +209,7 @@ def _analyze_parfor(self, parfor, array_dists, parfor_dists):
ind_def = self.func_ir._definitions[index]
if len(ind_def) == 1 and isinstance(ind_def[0], ir.Var):
index = ind_def[0].name
if index == par_index_var: #or index in stencil_accesses:
if index == par_index_var: # or index in stencil_accesses:
parfor_arrs.add(arr)
self._parallel_accesses.add((arr, index))

Expand Down Expand Up @@ -259,7 +261,7 @@ def _analyze_call(self, lhs, rhs, func_var, args, array_dists):
# blocks of data are passed in, TODO: document
func_def = guard(get_definition, self.func_ir, rhs.func)
if isinstance(func_def, ir.Const) and isinstance(func_def.value,
numba.dispatcher.ObjModeLiftedWith):
numba.dispatcher.ObjModeLiftedWith):
return
warnings.warn(
"function call couldn't be found for distributed analysis")
Expand Down Expand Up @@ -299,11 +301,11 @@ def _analyze_call(self, lhs, rhs, func_var, args, array_dists):
return

if hpat.config._has_h5py and (func_mod == 'hpat.io.pio_api'
and func_name in ('h5read', 'h5write', 'h5read_filter')):
and func_name in ('h5read', 'h5write', 'h5read_filter')):
return

if hpat.config._has_h5py and (func_mod == 'hpat.io.pio_api'
and func_name == 'get_filter_read_indices'):
and func_name == 'get_filter_read_indices'):
if lhs not in array_dists:
array_dists[lhs] = Distribution.OneD
return
Expand Down Expand Up @@ -366,10 +368,10 @@ def _analyze_call(self, lhs, rhs, func_var, args, array_dists):

# dummy hiframes functions
if func_mod == 'hpat.hiframes.api' and func_name in ('get_series_data',
'get_series_index',
'to_arr_from_series', 'ts_series_to_arr_typ',
'to_date_series_type', 'dummy_unbox_series',
'parallel_fix_df_array'):
'get_series_index',
'to_arr_from_series', 'ts_series_to_arr_typ',
'to_date_series_type', 'dummy_unbox_series',
'parallel_fix_df_array'):
# TODO: support Series type similar to Array
self._meet_array_dists(lhs, rhs.args[0].name, array_dists)
return
Expand Down Expand Up @@ -504,7 +506,6 @@ def _analyze_call(self, lhs, rhs, func_var, args, array_dists):
# set REP if not found
self._analyze_call_set_REP(lhs, args, array_dists, fdef)


def _analyze_call_np(self, lhs, func_name, args, array_dists):
"""analyze distributions of numpy functions (np.func_name)
"""
Expand Down Expand Up @@ -566,7 +567,6 @@ def _analyze_call_np(self, lhs, func_name, args, array_dists):
# set REP if not found
self._analyze_call_set_REP(lhs, args, array_dists, 'np.' + func_name)


def _analyze_call_array(self, lhs, arr, func_name, args, array_dists):
"""analyze distributions of array functions (arr.func_name)
"""
Expand Down Expand Up @@ -802,7 +802,7 @@ def _analyze_getitem(self, inst, lhs, rhs, array_dists):
# whole slice or strided slice access
# for example: A = X[:,5], A = X[::2,5]
if guard(is_whole_slice, self.typemap, self.func_ir, index_var,
accept_stride=True):
accept_stride=True):
self._meet_array_dists(lhs, rhs.value.name, array_dists)
return

Expand Down Expand Up @@ -872,7 +872,6 @@ def _set_REP(self, var_list, array_dists):
tuple_vars = var_def.items
self._set_REP(tuple_vars, array_dists)


def _rebalance_arrs(self, array_dists, parfor_dists):
# rebalance an array if it is accessed in a parfor that has output
# arrays or is in a loop
Expand All @@ -893,7 +892,7 @@ def _rebalance_arrs(self, array_dists, parfor_dists):
array_accesses = _get_array_accesses(
inst.loop_body, self.func_ir, self.typemap)
onedv_arrs = set(arr for (arr, ind) in array_accesses
if arr in array_dists and array_dists[arr] == Distribution.OneD_Var)
if arr in array_dists and array_dists[arr] == Distribution.OneD_Var)
if (label in loop_bodies
or _arrays_written(onedv_arrs, inst.loop_body)):
rebalance_arrs |= onedv_arrs
Expand Down Expand Up @@ -1009,7 +1008,6 @@ def _arrays_written(arrs, blocks):
# return False



# array access code is copied from ir_utils to be able to handle specialized
# array access calls such as get_split_view_index()
# TODO: implement extendable version in ir_utils
Expand Down
Loading

0 comments on commit efa9411

Please sign in to comment.