Skip to content
This repository has been archived by the owner on Feb 2, 2024. It is now read-only.

Commit

Permalink
Merge 1ace757 into 5fba799
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlov-alexey committed Aug 28, 2019
2 parents 5fba799 + 1ace757 commit 6c95171
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 224 deletions.
66 changes: 44 additions & 22 deletions hpat/distributed_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,14 @@ def gather_scalar(data): # pragma: no cover

c_gather_scalar = types.ExternalFunction("c_gather_scalar", types.void(types.voidptr, types.voidptr, types.int32))


# TODO: test
@overload(gather_scalar)
def gather_scalar_overload(val):
assert isinstance(val, (types.Integer, types.Float))
# TODO: other types like boolean
typ_val = _numba_to_c_type_map[val]

func_text = (
"def gather_scalar_impl(val):\n"
" n_pes = hpat.distributed_api.get_size()\n"
Expand Down Expand Up @@ -159,7 +161,6 @@ def gatherv_impl(data):
displs = np.empty(1, np.int32)
if rank == MPI_ROOT:
displs = hpat.hiframes.join.calc_disp(recv_counts)
# print(rank, n_loc, n_total, recv_counts, displs)
c_gatherv(
data.ctypes,
np.int32(n_loc),
Expand Down Expand Up @@ -195,15 +196,14 @@ def gatherv_str_arr_impl(data):

# displacements
all_data = StringArray(['']) # dummy arrays on non-root PEs
displs = np.empty(1, np.int32)
displs_char = np.empty(1, np.int32)
displs = np.empty(0, np.int32)
displs_char = np.empty(0, np.int32)

if rank == MPI_ROOT:
all_data = pre_alloc_string_array(n_total, n_total_char)
displs = hpat.hiframes.join.calc_disp(recv_counts)
displs_char = hpat.hiframes.join.calc_disp(recv_counts_char)

# print(rank, n_loc, n_total, recv_counts, displs)
offset_ptr = get_offset_ptr(all_data)
data_ptr = get_data_ptr(all_data)
c_gatherv(
Expand Down Expand Up @@ -330,30 +330,46 @@ def const_slice_getitem(arr, slice_index, start, count):

@overload(const_slice_getitem)
def const_slice_getitem_overload(arr, slice_index, start, count):
'''Provides parallel implementation of getting a const slice from arrays of different types
Arguments:
arr -- part of the input array processed by this processor
slice_index -- start and stop of the slice in the input array (same on all ranks)
start -- position of first arr element in the input array
count -- length of the part of the array processed by this processor
Return value:
Function providing implementation based on arr type. The function should implement
logic of fetching const slice from the array distributed over multiple processes.
'''

# TODO: should this also handle slices not starting from zero?
if arr == string_array_type:
reduce_op = Reduce_Type.Sum.value

def getitem_str_impl(arr, slice_index, start, count):
rank = hpat.distributed_api.get_rank()
k = slice_index.stop

# get total characters for allocation
n_chars = np.uint64(0)
if k > count:
if k > start:
# if slice end is beyond the start of this subset we have to send our elements
my_end = min(count, max(k - start, 0))
my_arr = arr[:my_end]
my_arr = hpat.distributed_api.gatherv(my_arr)
n_chars = hpat.distributed_api.dist_reduce(
num_total_chars(my_arr), np.int32(reduce_op))
if rank == 0:
out_arr = my_arr
else:
if rank == 0:
my_arr = arr[:k]
n_chars = num_total_chars(my_arr)
out_arr = my_arr
n_chars = bcast_scalar(n_chars)
my_arr = arr[:0]

# get the total number of chars in our array, then gather all arrays into one
# and compute total number of chars in all arrays
n_chars = num_total_chars(my_arr)
my_arr = hpat.distributed_api.gatherv(my_arr)
n_chars = hpat.distributed_api.dist_reduce(n_chars, np.int32(reduce_op))

if rank != 0:
out_arr = pre_alloc_string_array(k, n_chars)
else:
out_arr = my_arr

# actual communication
hpat.distributed_api.bcast(out_arr)
Expand All @@ -364,17 +380,23 @@ def getitem_str_impl(arr, slice_index, start, count):
def getitem_impl(arr, slice_index, start, count):
rank = hpat.distributed_api.get_rank()
k = slice_index.stop

out_arr = np.empty(k, arr.dtype)
if k > count:
my_arr = arr[:0]
if k > start:
# if slice end is beyond the start of this subset we have to send our elements
my_end = min(count, max(k - start, 0))
my_arr = arr[:my_end]
my_arr = hpat.distributed_api.gatherv(my_arr)
if rank == 0:
print(my_arr)
out_arr = my_arr
else:
if rank == 0:
out_arr = arr[:k]
my_arr = arr[:0]

# gather all subsets from all processors
my_arr = hpat.distributed_api.gatherv(my_arr)

if rank == 0:
out_arr = my_arr

# actual communication
hpat.distributed_api.bcast(out_arr)
return out_arr

Expand Down
44 changes: 26 additions & 18 deletions hpat/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import hpat
import random
from hpat.tests.test_utils import (count_array_REPs, count_parfor_REPs,
count_parfor_OneDs, count_array_OneDs, count_array_OneD_Vars,
dist_IR_contains, get_rank, get_start_end)
count_parfor_OneDs, count_array_OneDs, count_array_OneD_Vars,
dist_IR_contains, get_rank, get_start_end)


def get_np_state_ptr():
return numba._helperlib.rnd_get_np_state_ptr()


def _copy_py_state(r, ptr):
"""
Copy state of Python random *r* to Numba state *ptr*.
Expand Down Expand Up @@ -67,7 +69,7 @@ def test_impl(N):

def test_setitem1(self):
def test_impl(N):
A = np.arange(10)+1.0
A = np.arange(10) + 1.0
A[0] = 30
return A.sum()

Expand All @@ -79,7 +81,7 @@ def test_impl(N):

def test_setitem2(self):
def test_impl(N):
A = np.arange(10)+1.0
A = np.arange(10) + 1.0
A[0:4] = 30
return A.sum()

Expand Down Expand Up @@ -147,7 +149,7 @@ def test_impl(N):
def test_whole_slice(self):
def test_impl(N):
X = np.ones((N, 4))
X[:,3] = (X[:,3]) / (np.max(X[:,3]) - np.min(X[:,3]))
X[:, 3] = (X[:, 3]) / (np.max(X[:, 3]) - np.min(X[:, 3]))
return X.sum()

hpat_func = hpat.jit(test_impl)
Expand All @@ -170,10 +172,12 @@ def test_impl(N):

def test_assert(self):
# make sure assert in an inlined function works

def g(a):
assert a==0
assert a == 0

hpat_g = hpat.jit(g)

def f():
hpat_g(0)

Expand All @@ -199,8 +203,9 @@ def test_reduce(self):
funcs = ['sum', 'prod', 'min', 'max', 'argmin', 'argmax']
for (dtype, func) in itertools.product(dtypes, funcs):
# loc allreduce doesn't support int64 on windows
if (sys.platform.startswith('win') and dtype=='int64'
and func in ['argmin', 'argmax']):
if (sys.platform.startswith('win')
and dtype == 'int64'
and func in ['argmin', 'argmax']):
continue
func_text = """def f(n):
A = np.arange(0, n, 1, np.{})
Expand All @@ -222,8 +227,9 @@ def test_reduce2(self):
funcs = ['sum', 'prod', 'min', 'max', 'argmin', 'argmax']
for (dtype, func) in itertools.product(dtypes, funcs):
# loc allreduce doesn't support int64 on windows
if (sys.platform.startswith('win') and dtype=='int64'
and func in ['argmin', 'argmax']):
if (sys.platform.startswith('win')
and dtype == 'int64'
and func in ['argmin', 'argmax']):
continue
func_text = """def f(A):
return A.{}()
Expand All @@ -232,7 +238,7 @@ def test_reduce2(self):
exec(func_text, {'np': np}, loc_vars)
test_impl = loc_vars['f']

hpat_func = hpat.jit(locals={'A:input':'distributed'})(test_impl)
hpat_func = hpat.jit(locals={'A:input': 'distributed'})(test_impl)
n = 21
start, end = get_start_end(n)
np.random.seed(0)
Expand All @@ -248,8 +254,9 @@ def test_reduce_filter1(self):
funcs = ['sum', 'prod', 'min', 'max', 'argmin', 'argmax']
for (dtype, func) in itertools.product(dtypes, funcs):
# loc allreduce doesn't support int64 on windows
if (sys.platform.startswith('win') and dtype=='int64'
and func in ['argmin', 'argmax']):
if (sys.platform.startswith('win')
and dtype == 'int64'
and func in ['argmin', 'argmax']):
continue
func_text = """def f(A):
A = A[A>5]
Expand All @@ -259,7 +266,7 @@ def test_reduce_filter1(self):
exec(func_text, {'np': np}, loc_vars)
test_impl = loc_vars['f']

hpat_func = hpat.jit(locals={'A:input':'distributed'})(test_impl)
hpat_func = hpat.jit(locals={'A:input': 'distributed'})(test_impl)
n = 21
start, end = get_start_end(n)
np.random.seed(0)
Expand All @@ -273,7 +280,7 @@ def test_reduce_filter1(self):
def test_array_reduce(self):
binops = ['+=', '*=', '+=', '*=', '|=', '|=']
dtypes = ['np.float32', 'np.float32', 'np.float64', 'np.float64', 'np.int32', 'np.int64']
for (op,typ) in zip(binops,dtypes):
for (op, typ) in zip(binops, dtypes):
func_text = """def f(n):
A = np.arange(0, 10, 1, {})
B = np.arange(0 + 3, 10 + 3, 1, {})
Expand Down Expand Up @@ -310,7 +317,7 @@ def test_impl(N):
def test_dist_return_tuple(self):
def test_impl(N):
A = np.arange(N)
B = np.arange(N)+1.5
B = np.arange(N) + 1.5
return A, B

hpat_func = hpat.jit(locals={'A:return': 'distributed',
Expand Down Expand Up @@ -339,7 +346,7 @@ def test_impl(A):
def test_rebalance(self):
def test_impl(N):
A = np.arange(n)
B = A[A>10]
B = A[A > 10]
C = hpat.distributed_api.rebalance_array(B)
return C.sum()

Expand All @@ -356,7 +363,7 @@ def test_impl(N):
def test_rebalance_loop(self):
def test_impl(N):
A = np.arange(n)
B = A[A>10]
B = A[A > 10]
s = 0
for i in range(3):
s += B.sum()
Expand Down Expand Up @@ -479,5 +486,6 @@ def test_rhs(arr_len):
A, B, _ = hpat_func3(arr_len)
np.testing.assert_allclose(A, B)


if __name__ == "__main__":
unittest.main()
4 changes: 3 additions & 1 deletion hpat/tests/test_d4p.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ def train_impl(n, d):
def prdct_impl(n, d, model):
w = np.ones((n, d), dtype=np.double) - 22.5
algo = d4p.logistic_regression_prediction(
2, resultsToCompute="computeClassesLabels|computeClassesProbabilities|computeClassesLogProbabilities")
2,
resultsToCompute="computeClassesLabels|computeClassesProbabilities|computeClassesLogProbabilities"
)
return algo.compute(w, model)

train_hpat = hpat.jit(train_impl)
Expand Down
Loading

0 comments on commit 6c95171

Please sign in to comment.