diff --git a/sdc/datatypes/hpat_pandas_functions.py b/sdc/datatypes/hpat_pandas_functions.py index 7bbbf951d..a82813836 100644 --- a/sdc/datatypes/hpat_pandas_functions.py +++ b/sdc/datatypes/hpat_pandas_functions.py @@ -39,13 +39,12 @@ from sdc.io.csv_ext import ( _gen_csv_reader_py_pyarrow_py_func, - _gen_pandas_read_csv_func_text, + _gen_csv_reader_py_pyarrow_func_text_dataframe, ) from sdc.str_arr_ext import string_array_type from sdc.hiframes import join, aggregate, sort from sdc.types import CategoricalDtypeType, Categorical -from sdc.datatypes.categorical.pdimpl import _reconstruct_CategoricalDtype def get_numba_array_types_for_csv(df): @@ -256,69 +255,45 @@ def sdc_pandas_read_csv( usecols = [col.literal_value for col in usecols] if infer_from_params: - # dtype is a tuple of format ('A', A_dtype, 'B', B_dtype, ...) - # where column names should be constants and is important only for inference from params + # dtype should be constants and is important only for inference from params if isinstance(dtype, types.Tuple): - assert all(isinstance(key, types.StringLiteral) for key in dtype[::2]) + assert all(isinstance(key, types.Literal) for key in dtype[::2]) keys = (k.literal_value for k in dtype[::2]) - values = dtype[1::2] - - def _get_df_col_type(dtype): - if isinstance(dtype, types.Function): - if dtype.typing_key == int: - return types.Array(types.int_, 1, 'C') - elif dtype.typing_key == float: - return types.Array(types.float64, 1, 'C') - elif dtype.typing_key == str: - return string_array_type - else: - assert False, f"map_dtype_to_col_type: failing to infer column type for dtype={dtype}" - - if isinstance(dtype, types.StringLiteral): - if dtype.literal_value == 'str': - return string_array_type - else: - return types.Array(numba.from_dtype(np.dtype(dtype.literal_value)), 1, 'C') - if isinstance(dtype, types.NumberClass): - return types.Array(dtype.dtype, 1, 'C') - - if isinstance(dtype, CategoricalDtypeType): - return Categorical(dtype) + values = dtype[1::2] + values = [v.typing_key if isinstance(v, types.Function) else v for v in values] + values = [types.Array(numba.from_dtype(np.dtype(v.literal_value)), 1, 'C') + if isinstance(v, types.Literal) else v for v in values] + values = [types.Array(types.int_, 1, 'C') if v == int else v for v in values] + values = [types.Array(types.float64, 1, 'C') if v == float else v for v in values] + values = [string_array_type if v == str else v for v in values] + values = [Categorical(v) if isinstance(v, CategoricalDtypeType) else v for v in values] - col_types_map = dict(zip(keys, map(_get_df_col_type, values))) + dtype = dict(zip(keys, values)) # in case of both are available # inferencing from params has priority over inferencing from file if infer_from_params: + col_names = names # all names should be in dtype - col_names = usecols if usecols else names - col_types = [col_types_map[n] for n in col_names] + return_columns = usecols if usecols else names + col_typs = [dtype[n] for n in return_columns] elif infer_from_file: - col_names, col_types = infer_column_names_and_types_from_constant_filename( + col_names, col_typs = infer_column_names_and_types_from_constant_filename( filepath_or_buffer, delimiter, names, usecols, skiprows) else: return None - def _get_py_col_dtype(ctype): - """ Re-creates column dtype as python type to be used in read_csv call """ - dtype = ctype.dtype - if ctype == string_array_type: - return str - if isinstance(ctype, Categorical): - return _reconstruct_CategoricalDtype(ctype.pd_dtype) - return numpy_support.as_dtype(dtype) - - py_col_dtypes = {cname: _get_py_col_dtype(ctype) for cname, ctype in zip(col_names, col_types)} + dtype_present = not isinstance(dtype, (types.Omitted, type(None))) # generate function text with signature and returning DataFrame - func_text, func_name, global_vars = _gen_pandas_read_csv_func_text( - col_names, col_types, py_col_dtypes, usecols, signature) + func_text, func_name = _gen_csv_reader_py_pyarrow_func_text_dataframe( + col_names, col_typs, dtype_present, usecols, signature) # compile with Python - csv_reader_py = _gen_csv_reader_py_pyarrow_py_func(func_text, func_name, global_vars) + csv_reader_py = _gen_csv_reader_py_pyarrow_py_func(func_text, func_name) return csv_reader_py diff --git a/sdc/hiframes/pd_dataframe_ext.py b/sdc/hiframes/pd_dataframe_ext.py index 7c6d5e40a..f863423a3 100644 --- a/sdc/hiframes/pd_dataframe_ext.py +++ b/sdc/hiframes/pd_dataframe_ext.py @@ -26,6 +26,7 @@ import operator +from typing import NamedTuple import numba from numba import types @@ -38,7 +39,7 @@ from numba.core.imputils import impl_ret_new_ref, impl_ret_borrowed from sdc.hiframes.pd_series_ext import SeriesType -from sdc.hiframes.pd_dataframe_type import DataFrameType, ColumnLoc +from sdc.hiframes.pd_dataframe_type import DataFrameType from sdc.str_ext import string_type @@ -53,6 +54,10 @@ def generic_resolve(self, df, attr): return SeriesType(arr_typ.dtype, arr_typ, df.index, True) +class ColumnLoc(NamedTuple): + type_id: int + col_id: int + def get_structure_maps(col_types, col_names): # Define map column name to column location ex. {'A': (0,0), 'B': (1,0), 'C': (0,1)} diff --git a/sdc/hiframes/pd_dataframe_type.py b/sdc/hiframes/pd_dataframe_type.py index 85255b22c..a962329c3 100644 --- a/sdc/hiframes/pd_dataframe_type.py +++ b/sdc/hiframes/pd_dataframe_type.py @@ -24,8 +24,6 @@ # EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # ***************************************************************************** -import re -from typing import NamedTuple import numba from numba import types @@ -50,7 +48,7 @@ def __init__(self, data=None, index=None, columns=None, has_parent=False, column self.has_parent = has_parent self.column_loc = column_loc super(DataFrameType, self).__init__( - name="DataFrameType({}, {}, {}, {})".format(data, index, columns, has_parent)) + name="dataframe({}, {}, {}, {})".format(data, index, columns, has_parent)) def copy(self, index=None, has_parent=None): # XXX is copy necessary? @@ -85,16 +83,6 @@ def unify(self, typingctx, other): def is_precise(self): return all(a.is_precise() for a in self.data) and self.index.is_precise() - def __repr__(self): - - # To have correct repr of DataFrame we need some changes to what types.Type gives: - # (1) e.g. array(int64, 1d, C) should be Array(int64, 1, 'C') - # (2) ColumnLoc is not part of DataFrame name, so we need to add it - default_repr = super(DataFrameType, self).__repr__() - res = re.sub(r'array\((\w+), 1d, C\)', r'Array(\1, 1, \'C\')', default_repr) - res = re.sub(r'\)$', f', column_loc={self.column_loc})', res) - return res - @register_model(DataFrameType) class DataFrameModel(models.StructModel): @@ -116,15 +104,6 @@ def __init__(self, dmm, fe_type): super(DataFrameModel, self).__init__(dmm, fe_type, members) -class ColumnLoc(NamedTuple): - type_id: int - col_id: int - - -# FIXME_Numba#3372: add into numba.types to allow returning from objmode -types.DataFrameType = DataFrameType -types.ColumnLoc = ColumnLoc - make_attribute_wrapper(DataFrameType, 'data', '_data') make_attribute_wrapper(DataFrameType, 'index', '_index') make_attribute_wrapper(DataFrameType, 'columns', '_columns') diff --git a/sdc/io/csv_ext.py b/sdc/io/csv_ext.py index 78aaafeb6..2a569945d 100644 --- a/sdc/io/csv_ext.py +++ b/sdc/io/csv_ext.py @@ -24,6 +24,7 @@ # EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # ***************************************************************************** + import contextlib import functools @@ -44,8 +45,6 @@ from sdc.utilities.utils import (debug_prints, alloc_arr_tup, empty_like_type, _numba_to_c_type_map) from sdc.distributed_analysis import Distribution -from sdc.hiframes.pd_dataframe_type import DataFrameType, ColumnLoc -from sdc.hiframes.pd_dataframe_ext import get_structure_maps from sdc.str_ext import string_type from sdc.str_arr_ext import (string_array_type, to_string_list, cp_str_list_to_array, str_list_to_array, @@ -57,14 +56,13 @@ import pandas as pd import numpy as np -from sdc.types import Categorical +from sdc.types import CategoricalDtypeType, Categorical import pyarrow import pyarrow.csv class CsvReader(ir.Stmt): - def __init__(self, file_name, df_out, sep, df_colnames, out_vars, out_types, usecols, loc, skiprows=0): self.file_name = file_name self.df_out = df_out @@ -265,10 +263,8 @@ def csv_distributed_run(csv_node, array_dists, typemap, calltypes, typingctx, ta # get global array sizes by calling allreduce on chunk lens # TODO: get global size from C for arr in csv_node.out_vars: - def f(A): return sdc.distributed_api.dist_reduce(len(A), np.int32(_op)) - f_block = compile_to_numba_ir( f, {'sdc': sdc, 'np': np, '_op': sdc.distributed_api.Reduce_Type.Sum.value}, @@ -291,7 +287,6 @@ def f(A): class StreamReaderType(types.Opaque): - def __init__(self): super(StreamReaderType, self).__init__(name='StreamReaderType') @@ -305,6 +300,36 @@ def box_stream_reader(typ, val, c): return val +def _get_dtype_str(t): + dtype = t.dtype + + if isinstance(t, Categorical): + # return categorical representation + # for some reason pandas and pyarrow read_csv() return CategoricalDtype with + # ordered=False in case when dtype is with ordered=None + return str(t).replace('ordered=None', 'ordered=False') + + if dtype == types.NPDatetime('ns'): + dtype = 'NPDatetime("ns")' + if t == string_array_type: + # HACK: add string_array_type to numba.types + # FIXME: fix after Numba #3372 is resolved + types.string_array_type = string_array_type + return 'string_array_type' + return '{}[::1]'.format(dtype) + + +def _get_pd_dtype_str(t): + dtype = t.dtype + if isinstance(t, Categorical): + return 'pd.{}'.format(t.pd_dtype) + if dtype == types.NPDatetime('ns'): + dtype = 'str' + if t == string_array_type: + return 'str' + return 'np.{}'.format(dtype) + + # XXX: temporary fix pending Numba's #3378 # keep the compiled functions around to make sure GC doesn't delete them and # the reference to the dynamic function inside them @@ -318,7 +343,7 @@ def to_varname(string): Replaces unavailable symbols with _ and insert _ if string starts with digit. """ import re - return re.sub(r'\W|^(?=\d)', '_', string) + return re.sub(r'\W|^(?=\d)','_', string) @contextlib.contextmanager @@ -333,12 +358,10 @@ def pyarrow_cpu_count(cpu_count=pyarrow.cpu_count()): def pyarrow_cpu_count_equal_numba_num_treads(func): """Decorator. Set pyarrow cpu_count the same as NUMBA_NUM_THREADS.""" - @functools.wraps(func) def wrapper(*args, **kwargs): with pyarrow_cpu_count(numba.config.NUMBA_NUM_THREADS): return func(*args, **kwargs) - return wrapper @@ -518,81 +541,84 @@ def pandas_read_csv( return dataframe -def _gen_pandas_read_csv_func_text(col_names, col_typs, py_col_dtypes, usecols, signature=None): +def _gen_csv_reader_py_pyarrow(col_names, col_typs, usecols, sep, typingctx, targetctx, parallel, skiprows): + func_text, func_name = _gen_csv_reader_py_pyarrow_func_text(col_names, col_typs, usecols, sep, skiprows) + csv_reader_py = _gen_csv_reader_py_pyarrow_py_func(func_text, func_name) + return _gen_csv_reader_py_pyarrow_jit_func(csv_reader_py) - func_name = 'csv_reader_py' - return_columns = usecols if usecols and isinstance(usecols[0], str) else col_names - - column_loc, _, _ = get_structure_maps(col_typs, return_columns) - df_type = DataFrameType( - tuple(col_typs), - types.none, - tuple(col_names), - column_loc=column_loc - ) - - df_type_repr = repr(df_type) - # for some reason pandas and pyarrow read_csv() return CategoricalDtype with - # ordered=False in case when dtype is with ordered=None - df_type_repr = df_type_repr.replace('ordered=None', 'ordered=False') +def _gen_csv_reader_py_pyarrow_func_text_core(col_names, col_typs, dtype_present, usecols, signature=None): # TODO: support non-numpy types like strings date_inds = ", ".join(str(i) for i, t in enumerate(col_typs) if t.dtype == types.NPDatetime('ns')) return_columns = usecols if usecols and isinstance(usecols[0], str) else col_names + nb_objmode_vars = ", ".join([ + '{}="{}"'.format(to_varname(cname), _get_dtype_str(t)) + for cname, t in zip(return_columns, col_typs) + ]) + pd_dtype_strs = ", ".join([ + "'{}': {}".format(cname, _get_pd_dtype_str(t)) + for cname, t in zip(return_columns, col_typs) + ]) if signature is None: signature = "filepath_or_buffer" - - # map generated func params into values used in inner call of pandas_read_csv - # if no transformation is needed just use outer param name (since APIs match) - # otherwise use value in the dictionary - inner_call_params = {'parse_dates': f"[{date_inds}]"} - used_read_csv_params = ( - 'filepath_or_buffer', - 'names', - 'skiprows', - 'parse_dates', - 'dtype', - 'usecols', - 'sep', - 'delimiter' - ) + func_text = "def csv_reader_py({}):\n".format(signature) + func_text += " with objmode({}):\n".format(nb_objmode_vars) + func_text += " df = pandas_read_csv(filepath_or_buffer,\n" # pyarrow reads unnamed header as " ", pandas reads it as "Unnamed: N" # during inference from file names should be raplaced with "Unnamed: N" # passing names to pyarrow means that one row is header and should be skipped if col_names and any(map(lambda x: x.startswith('Unnamed: '), col_names)): - inner_call_params['names'] = str(col_names) - inner_call_params['skiprows'] = "(skiprows and skiprows + 1) or 1" + func_text += " names={},\n".format(col_names) + func_text += " skiprows=(skiprows and skiprows + 1) or 1,\n" + else: + func_text += " names=names,\n" + func_text += " skiprows=skiprows,\n" - # dtype parameter of compiled function is not used at all, instead a python dict - # of columns dtypes is captured at compile time, because some dtypes (like datetime) - # are converted and also to avoid penalty of creating dict in objmode - inner_call_params['dtype'] = 'read_as_dtypes' + func_text += " parse_dates=[{}],\n".format(date_inds) - params_str = '\n'.join([ - f" {param}={inner_call_params.get(param, param)}," for param in used_read_csv_params - ]) - func_text = '\n'.join([ - f"def {func_name}({signature}):", - f" with objmode(df=\"{df_type_repr}\"):", - f" df = pandas_read_csv(\n{params_str}", - f" )", - f" return df" - ]) + # Python objects (e.g. str, np.float) could not be jitted and passed to objmode + # so they are hardcoded to function + # func_text += " dtype={{{}}},\n".format(pd_dtype_strs) if dtype_present else \ + # " dtype=dtype,\n" + # dtype is hardcoded because datetime should be read as string + func_text += " dtype={{{}}},\n".format(pd_dtype_strs) + + func_text += " usecols=usecols,\n" + func_text += " sep=sep,\n" + func_text += " delimiter=delimiter,\n" + func_text += " )\n" + for cname in return_columns: + func_text += " {} = df['{}'].values\n".format(to_varname(cname), cname) + # func_text += " print({})\n".format(cname) + return func_text, 'csv_reader_py' + + +def _gen_csv_reader_py_pyarrow_func_text(col_names, col_typs, usecols): + func_text, func_name = _gen_csv_reader_py_pyarrow_func_text_core(col_names, col_typs, usecols) - global_vars = { - 'read_as_dtypes': py_col_dtypes, - 'objmode': objmode, - 'pandas_read_csv': pandas_read_csv, - } + func_text += " return ({},)\n".format(", ".join(to_varname(c) for c in col_names)) - return func_text, func_name, global_vars + return func_text, func_name -def _gen_csv_reader_py_pyarrow_py_func(func_text, func_name, global_vars): +def _gen_csv_reader_py_pyarrow_func_text_dataframe(col_names, col_typs, dtype_present, usecols, signature): + func_text, func_name = _gen_csv_reader_py_pyarrow_func_text_core( + col_names, col_typs, dtype_present, usecols, signature) + return_columns = usecols if usecols and isinstance(usecols[0], str) else col_names + + func_text += " return sdc.hiframes.pd_dataframe_ext.init_dataframe({}, None, {})\n".format( + ", ".join(to_varname(c) for c in return_columns), + ", ".join("'{}'".format(c) for c in return_columns) + ) + + return func_text, func_name + + +def _gen_csv_reader_py_pyarrow_py_func(func_text, func_name): locals = {} - exec(func_text, global_vars, locals) + exec(func_text, globals(), locals) func = locals[func_name] return func @@ -602,3 +628,6 @@ def _gen_csv_reader_py_pyarrow_jit_func(csv_reader_py): jit_func = numba.njit(csv_reader_py) compiled_funcs.append(jit_func) return jit_func + + +_gen_csv_reader_py = _gen_csv_reader_py_pyarrow diff --git a/sdc/str_arr_type.py b/sdc/str_arr_type.py index c9cf59ce1..b18967f63 100644 --- a/sdc/str_arr_type.py +++ b/sdc/str_arr_type.py @@ -66,8 +66,6 @@ def copy(self): string_array_type = StringArrayType() -# FIXME_Numba#3372: add into numba.types to allow returning from objmode -types.StringArrayType = StringArrayType class StringArrayPayloadType(types.Type): diff --git a/sdc/tests/test_io.py b/sdc/tests/test_io.py index 55fead5ba..768f2827e 100644 --- a/sdc/tests/test_io.py +++ b/sdc/tests/test_io.py @@ -663,8 +663,6 @@ def test_impl(): return test_impl - @skip_numba_jit("Lowering error while returning df from objmode:\n" - "Can only insert i8* at [4] in {i8*, i8*, i64, i64, i8*, [1 x i64], [1 x i64]}: got i64*") def test_csv_cat1(self): test_impl = self.pd_csv_cat1() hpat_func = self.jit(test_impl) @@ -684,8 +682,6 @@ def test_impl(): return test_impl - @skip_numba_jit("Lowering error while returning df from objmode:\n" - "Can only insert i8* at [4] in {i8*, i8*, i64, i64, i8*, [1 x i64], [1 x i64]}: got i64*") def test_csv_cat2(self): test_impl = self.pd_csv_cat2() hpat_func = self.jit(test_impl)