Skip to content
This repository was archived by the owner on Feb 2, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 267 additions & 7 deletions sdc/datatypes/common_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@
"""

import numpy
import pandas

import numba
from numba import types
from numba.errors import TypingError
from numba.extending import overload
from numba import numpy_support

import sdc
from sdc.str_arr_ext import (string_array_type, num_total_chars, append_string_array_to)
from sdc.str_arr_ext import (string_array_type, num_total_chars, append_string_array_to,
str_arr_is_na, pre_alloc_string_array, str_arr_set_na)


class TypeChecker:
Expand Down Expand Up @@ -91,7 +94,7 @@ def check(self, data, accepted_type, name=''):


def has_literal_value(var, value):
'''Used during typing to check that variable var is a Numba literal value equal to value'''
"""Used during typing to check that variable var is a Numba literal value equal to value"""

if not isinstance(var, types.Literal):
return False
Expand All @@ -103,7 +106,7 @@ def has_literal_value(var, value):


def has_python_value(var, value):
'''Used during typing to check that variable var was resolved as Python type and has specific value'''
"""Used during typing to check that variable var was resolved as Python type and has specific value"""

if not isinstance(var, type(value)):
return False
Expand All @@ -114,13 +117,18 @@ def has_python_value(var, value):
return var == value


def check_index_is_numeric(ty_series):
    """Used during typing to check that the index of a series type is a numeric array

    Returns True only when ``ty_series.index`` is a ``types.Array`` whose dtype
    is a ``types.Number``, and False otherwise.
    """
    index_type = ty_series.index
    if not isinstance(index_type, types.Array):
        return False
    return isinstance(index_type.dtype, types.Number)


def hpat_arrays_append(A, B):
    """Placeholder with an empty body: calling it from plain Python returns None.

    The implementation is registered separately via ``@overload(hpat_arrays_append)``.
    """


@overload(hpat_arrays_append)
def hpat_arrays_append_overload(A, B):
'''Function for appending underlying arrays (A and B) or list/tuple of arrays B to an array A'''
"""Function for appending underlying arrays (A and B) or list/tuple of arrays B to an array A"""

if isinstance(A, types.Array):
if isinstance(B, types.Array):
Expand All @@ -131,9 +139,7 @@ def _append_single_numeric_impl(A, B):
elif isinstance(B, (types.UniTuple, types.List)):
# TODO: this heavily relies on B being a homogeneous tuple/list - find a better way
# to resolve common dtype of heterogeneous sequence of arrays
np_dtypes = [numpy_support.as_dtype(A.dtype), numpy_support.as_dtype(B.dtype.dtype)]
np_common_dtype = numpy.find_common_type([], np_dtypes)
numba_common_dtype = numpy_support.from_dtype(np_common_dtype)
numba_common_dtype = find_common_dtype_from_numpy_dtypes([A.dtype, B.dtype.dtype], [])

# TODO: refactor to use numpy.concatenate when Numba supports building a tuple at runtime
def _append_list_numeric_impl(A, B):
Expand Down Expand Up @@ -181,3 +187,257 @@ def _append_list_string_array_impl(A, B):
return new_data

return _append_list_string_array_impl


@numba.njit
def _hpat_ensure_array_capacity(new_size, arr):
    """ Function ensuring that the size of numpy array is at least as specified

    Returns ``arr`` itself when its length is already >= ``new_size``. Otherwise returns
    a newly allocated array of the same dtype whose length is obtained by repeatedly
    doubling the current length until it reaches ``new_size``; the existing elements are
    copied to the front and the remaining tail is left uninitialized.
    """

    k = len(arr)
    if k >= new_size:
        return arr

    # start doubling from at least 1: doubling a zero length never grows,
    # so an empty input array would otherwise make the loop below spin forever
    n = max(k, 1)
    while n < new_size:
        n = 2 * n
    res = numpy.empty(n, arr.dtype)
    res[:k] = arr[:k]
    return res


def find_common_dtype_from_numpy_dtypes(array_types, scalar_types):
    """Used to find the common numba dtype for sequences of numba dtypes, each representing some numpy dtype

    Both sequences are converted to numpy dtypes and handed to numpy.find_common_type
    (as its array types and scalar types respectively); the resulting numpy dtype is
    converted back to a numba dtype and returned.
    """
    to_numpy = numpy_support.as_dtype
    array_dtypes = list(map(to_numpy, array_types))
    scalar_dtypes = list(map(to_numpy, scalar_types))

    common = numpy.find_common_type(array_dtypes, scalar_dtypes)
    return numpy_support.from_dtype(common)


def hpat_join_series_indexes(left, right):
    """Placeholder with an empty body: calling it from plain Python returns None.

    The implementation is registered separately via ``@overload(hpat_join_series_indexes)``.
    """


@overload(hpat_join_series_indexes)
def hpat_join_series_indexes_overload(left, right):
    """Function for joining arrays left and right in a way similar to pandas.join 'outer' algorithm

    Supported argument types are a pair of numeric arrays (types.Array whose common dtype
    is a types.Number) or a pair of StringArrays; for any other combination None is returned
    and no implementation is provided.

    The implementations return a tuple (joined, lidx, ridx) of equal-length arrays:
    joined - the merged index values produced by walking both inputs in sorted order,
    lidx - for each joined element its position in left, or -1 if the value comes from right only,
    ridx - for each joined element its position in right, or -1 if the value comes from left only.
    Each group of equal values present in both inputs yields one element per (left, right) pair.
    """

    # TODO: eliminate code duplication by merging implementations for numeric and StringArray
    # requires equivalents of numpy.argsort and _hpat_ensure_array_capacity for StringArrays
    if (isinstance(left, types.Array) and isinstance(right, types.Array)):

        numba_common_dtype = find_common_dtype_from_numpy_dtypes([left.dtype, right.dtype], [])
        if isinstance(numba_common_dtype, types.Number):

            def hpat_join_series_indexes_impl(left, right):

                # allocate result arrays (sizes are an estimate, arrays are grown on demand)
                lsize = len(left)
                rsize = len(right)
                est_total_size = int(1.1 * (lsize + rsize))

                lidx = numpy.empty(est_total_size, numpy.int64)
                ridx = numpy.empty(est_total_size, numpy.int64)
                joined = numpy.empty(est_total_size, numba_common_dtype)

                # sort arrays saving the old positions
                sorted_left = numpy.argsort(left, kind='mergesort')
                sorted_right = numpy.argsort(right, kind='mergesort')

                # i, j - cursors in sorted left and right, k - number of elements written so far
                i, j, k = 0, 0, 0
                while (i < lsize and j < rsize):
                    joined = _hpat_ensure_array_capacity(k + 1, joined)
                    lidx = _hpat_ensure_array_capacity(k + 1, lidx)
                    ridx = _hpat_ensure_array_capacity(k + 1, ridx)

                    left_index = left[sorted_left[i]]
                    right_index = right[sorted_right[j]]

                    # NOTE(review): NaN values compare neither <, > nor ==, so they fall into the
                    # last branch below, where the equal-run scans do not behave as intended
                    # (no element ever equals NaN) - confirm indexes are NaN-free or handle NaN
                    if (left_index < right_index):
                        joined[k] = left_index
                        lidx[k] = sorted_left[i]
                        ridx[k] = -1
                        i += 1
                        k += 1
                    elif (left_index > right_index):
                        joined[k] = right_index
                        lidx[k] = -1
                        ridx[k] = sorted_right[j]
                        j += 1
                        k += 1
                    else:
                        # find ends of sequences of equal index values in left and right
                        ni, nj = i, j
                        while (ni < lsize and left[sorted_left[ni]] == left_index):
                            ni += 1
                        while (nj < rsize and right[sorted_right[nj]] == right_index):
                            nj += 1

                        # values below are the same for every element of the left block,
                        # so they are built once before the loop
                        block_size = nj - j
                        to_joined = numpy.repeat(left_index, block_size)
                        to_ridx = numpy.array([sorted_right[pos] for pos in numpy.arange(j, nj, 1)], numpy.int64)

                        # join the blocks found into results: each left element of the block
                        # is paired with the whole right block
                        for s in numpy.arange(i, ni, 1):
                            to_lidx = numpy.repeat(sorted_left[s], block_size)

                            joined = _hpat_ensure_array_capacity(k + block_size, joined)
                            lidx = _hpat_ensure_array_capacity(k + block_size, lidx)
                            ridx = _hpat_ensure_array_capacity(k + block_size, ridx)

                            joined[k:k + block_size] = to_joined
                            lidx[k:k + block_size] = to_lidx
                            ridx[k:k + block_size] = to_ridx
                            k += block_size
                        i = ni
                        j = nj

                # fill the end of joined with remaining part of left or right
                if i < lsize:
                    block_size = lsize - i
                    joined = _hpat_ensure_array_capacity(k + block_size, joined)
                    lidx = _hpat_ensure_array_capacity(k + block_size, lidx)
                    ridx = _hpat_ensure_array_capacity(k + block_size, ridx)
                    ridx[k: k + block_size] = numpy.repeat(-1, block_size)
                    while i < lsize:
                        joined[k] = left[sorted_left[i]]
                        lidx[k] = sorted_left[i]
                        i += 1
                        k += 1

                elif j < rsize:
                    block_size = rsize - j
                    joined = _hpat_ensure_array_capacity(k + block_size, joined)
                    lidx = _hpat_ensure_array_capacity(k + block_size, lidx)
                    ridx = _hpat_ensure_array_capacity(k + block_size, ridx)
                    lidx[k: k + block_size] = numpy.repeat(-1, block_size)
                    while j < rsize:
                        joined[k] = right[sorted_right[j]]
                        ridx[k] = sorted_right[j]
                        j += 1
                        k += 1

                # cut off the unused (uninitialized) capacity
                return joined[:k], lidx[:k], ridx[:k]

            return hpat_join_series_indexes_impl

        else:
            # TODO: support joining indexes with common dtype=object - requires Numba
            # support of such numpy arrays in nopython mode, for now just return None
            return None

    elif (left == string_array_type and right == string_array_type):

        def hpat_join_series_indexes_impl(left, right):

            # allocate result arrays (sizes are an estimate, arrays are grown on demand)
            lsize = len(left)
            rsize = len(right)
            est_total_size = int(1.1 * (lsize + rsize))

            lidx = numpy.empty(est_total_size, numpy.int64)
            ridx = numpy.empty(est_total_size, numpy.int64)

            # use Series.sort_values since argsort for StringArrays not implemented
            original_left_series = pandas.Series(left)
            original_right_series = pandas.Series(right)

            # sort arrays saving the old positions
            left_series = original_left_series.sort_values(kind='mergesort')
            right_series = original_right_series.sort_values(kind='mergesort')
            sorted_left = left_series._index
            sorted_right = right_series._index

            # i, j - cursors in sorted left and right, k - number of elements written so far
            i, j, k = 0, 0, 0
            while (i < lsize and j < rsize):
                lidx = _hpat_ensure_array_capacity(k + 1, lidx)
                ridx = _hpat_ensure_array_capacity(k + 1, ridx)

                left_index = left[sorted_left[i]]
                right_index = right[sorted_right[j]]

                if (left_index < right_index):
                    lidx[k] = sorted_left[i]
                    ridx[k] = -1
                    i += 1
                    k += 1
                elif (left_index > right_index):
                    lidx[k] = -1
                    ridx[k] = sorted_right[j]
                    j += 1
                    k += 1
                else:
                    # find ends of sequences of equal index values in left and right
                    ni, nj = i, j
                    while (ni < lsize and left[sorted_left[ni]] == left_index):
                        ni += 1
                    while (nj < rsize and right[sorted_right[nj]] == right_index):
                        nj += 1

                    # values below are the same for every element of the left block,
                    # so they are built once before the loop
                    block_size = nj - j
                    to_ridx = numpy.array([sorted_right[pos] for pos in numpy.arange(j, nj, 1)], numpy.int64)

                    # join the blocks found into results: each left element of the block
                    # is paired with the whole right block
                    for s in numpy.arange(i, ni, 1):
                        to_lidx = numpy.repeat(sorted_left[s], block_size)

                        lidx = _hpat_ensure_array_capacity(k + block_size, lidx)
                        ridx = _hpat_ensure_array_capacity(k + block_size, ridx)

                        lidx[k:k + block_size] = to_lidx
                        ridx[k:k + block_size] = to_ridx
                        k += block_size
                    i = ni
                    j = nj

            # fill the end of the indexers with remaining part of left or right
            if i < lsize:
                block_size = lsize - i
                lidx = _hpat_ensure_array_capacity(k + block_size, lidx)
                ridx = _hpat_ensure_array_capacity(k + block_size, ridx)
                ridx[k: k + block_size] = numpy.repeat(-1, block_size)
                while i < lsize:
                    lidx[k] = sorted_left[i]
                    i += 1
                    k += 1

            elif j < rsize:
                block_size = rsize - j
                lidx = _hpat_ensure_array_capacity(k + block_size, lidx)
                ridx = _hpat_ensure_array_capacity(k + block_size, ridx)
                lidx[k: k + block_size] = numpy.repeat(-1, block_size)
                while j < rsize:
                    ridx[k] = sorted_right[j]
                    j += 1
                    k += 1

            # count total number of characters and allocate joined array
            total_joined_size = k
            num_chars_in_joined = 0
            for i in numpy.arange(total_joined_size):
                if lidx[i] != -1:
                    num_chars_in_joined += len(left[lidx[i]])
                elif ridx[i] != -1:
                    num_chars_in_joined += len(right[ridx[i]])

            joined = pre_alloc_string_array(total_joined_size, num_chars_in_joined)

            # iterate over joined and fill it with indexes using lidx and ridx indexers,
            # the value is taken from left when available, otherwise from right;
            # NA flags of the source elements are carried over to joined
            for i in numpy.arange(total_joined_size):
                if lidx[i] != -1:
                    joined[i] = left[lidx[i]]
                    if (str_arr_is_na(left, lidx[i])):
                        str_arr_set_na(joined, i)
                elif ridx[i] != -1:
                    joined[i] = right[ridx[i]]
                    if (str_arr_is_na(right, ridx[i])):
                        str_arr_set_na(joined, i)
                else:
                    str_arr_set_na(joined, i)

            # cut off the unused (uninitialized) capacity of the indexers so that all three
            # results have the same length, consistently with the numeric implementation
            return joined, lidx[:total_joined_size], ridx[:total_joined_size]

        return hpat_join_series_indexes_impl

    return None
Loading