diff --git a/sdc/io/csv_ext.py b/sdc/io/csv_ext.py
index 292d82204..df8f2b7e7 100644
--- a/sdc/io/csv_ext.py
+++ b/sdc/io/csv_ext.py
@@ -25,6 +25,9 @@
 # *****************************************************************************
 
 
+import contextlib
+import functools
+
 import llvmlite.binding as ll
 from llvmlite import ir as lir
 from .. import hio
@@ -398,6 +401,26 @@ def to_varname(string):
     return re.sub(r'\W|^(?=\d)','_', string)
 
 
+@contextlib.contextmanager
+def pyarrow_cpu_count(cpu_count=pyarrow.cpu_count()):
+    old_cpu_count = pyarrow.cpu_count()
+    pyarrow.set_cpu_count(cpu_count)
+    try:
+        yield
+    finally:
+        pyarrow.set_cpu_count(old_cpu_count)
+
+
+def pyarrow_cpu_count_equal_numba_num_treads(func):
+    """Decorator. Set pyarrow cpu_count the same as NUMBA_NUM_THREADS."""
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        with pyarrow_cpu_count(numba.config.NUMBA_NUM_THREADS):
+            return func(*args, **kwargs)
+    return wrapper
+
+
+@pyarrow_cpu_count_equal_numba_num_treads
 def pandas_read_csv(
     filepath_or_buffer,
     sep=",",
diff --git a/sdc/tests/tests_perf/gen_csv.py b/sdc/tests/tests_perf/gen_csv.py
index 84c94ce92..ab04c788a 100644
--- a/sdc/tests/tests_perf/gen_csv.py
+++ b/sdc/tests/tests_perf/gen_csv.py
@@ -43,3 +43,46 @@ def generate(rows, headers, providers, file_name):
         writer.writeheader()
         for i in range(rows):
             writer.writerow({k: p() for k, p in zip(headers, providers)})
+
+
+def md5(filename):
+    """Return MD5 sum of the file."""
+    import hashlib
+    hash_md5 = hashlib.md5()
+    with open(filename, "rb") as f:
+        for chunk in iter(lambda: f.read(4096), b""):
+            hash_md5.update(chunk)
+    return hash_md5.hexdigest()
+
+
+def csv_file_name(rows=10**5, columns=10, seed=0):
+    """Return file name for given parameters."""
+    return f"data_{rows}_{columns}_{seed}.csv"
+
+
+def generate_csv(rows=10**5, columns=10, seed=0):
+    """Generate CSV file and return file name."""
+    import random
+
+    md5_sums = {
+        (10**6, 10, 0): "6fa2a115dfeaee4f574106b513ad79e6"
+    }
+
+    file_name = csv_file_name(rows, columns, seed)
+
+    try:
+        if md5_sums.get((rows, columns, seed)) == md5(file_name):
+            return file_name
+    except OSError:  # file missing or unreadable: fall through and regenerate it
+        pass
+
+    r = random.Random(seed)
+    generate(rows,
+             [f"f{c}" for c in range(columns)],
+             [lambda: r.uniform(-1.0, 1.0) for _ in range(columns)],
+             file_name
+             )
+
+    md5_sums[(rows, columns, seed)] = md5(file_name)
+
+    return file_name
diff --git a/sdc/tests/tests_perf/test_perf_read_csv.py b/sdc/tests/tests_perf/test_perf_read_csv.py
index a190b0e05..3d1d02af1 100644
--- a/sdc/tests/tests_perf/test_perf_read_csv.py
+++ b/sdc/tests/tests_perf/test_perf_read_csv.py
@@ -26,32 +26,15 @@
 # *****************************************************************************
 
 import time
-import random
 
 import pandas
+import pyarrow.csv
 import sdc
 
-from .test_perf_base import TestBase
-from .test_perf_utils import calc_compilation, get_times
+from sdc.tests.tests_perf.test_perf_base import TestBase
+from sdc.tests.tests_perf.test_perf_utils import calc_compilation, get_times
 
-from .gen_csv import generate
-
-
-def generate_csv():
-    """Generate CSV file and return file name."""
-    rows = 10**5
-    file_name = f"data_{rows}.csv"
-    r = random.Random(0)  # seed=0
-    generate(rows,
-             ['A', 'B', 'C'],
-             [
-                 lambda: int(r.random() * 10000),
-                 lambda: r.uniform(-1.0, 1.0),
-                 lambda: r.uniform(-1.0, 1.0)
-             ],
-             file_name
-             )
-    return file_name
+from sdc.tests.tests_perf.gen_csv import generate_csv
 
 
 def make_func(file_name):
@@ -65,12 +48,23 @@ def _function():
     return _function
 
 
+def make_func_pyarrow(file_name):
+    """Create function implemented via PyArrow."""
+    def _function():
+        start = time.time()
+        df = sdc.io.csv_ext.pandas_read_csv(file_name)
+        return time.time() - start, df
+    return _function
+
+
 class TestPandasReadCSV(TestBase):
 
     @classmethod
     def setUpClass(cls):
         super().setUpClass()
-        cls.generated_file = generate_csv()
+        cls.rows = 10**6
+        cls.columns = 10
+        cls.generated_file = generate_csv(cls.rows, cls.columns)
 
     def _test_jitted(self, pyfunc, record, *args, **kwargs):
         # compilation time
@@ -92,7 +86,7 @@ def _test_python(self, pyfunc, record, *args, **kwargs):
     def _test_case(self, pyfunc, name):
         base = {
             "test_name": name,
-            "data_size": 10**5,
+            "data_size": f"[{self.rows},{self.columns}]",
         }
 
         record = base.copy()
@@ -107,3 +101,23 @@ def _test_case(self, pyfunc, name):
 
     def test_read_csv(self):
         self._test_case(make_func(self.generated_file), 'read_csv')
+
+    def test_read_csv_pyarrow(self):
+        pyfunc = make_func_pyarrow(self.generated_file)
+        name = 'read_csv'
+
+        base = {
+            "test_name": name,
+            "data_size": f"[{self.rows},{self.columns}]",
+        }
+
+        record = base.copy()
+        record["test_type"] = 'PyArrow'
+        self._test_python(pyfunc, record)
+        self.test_results.add(**record)
+
+
+if __name__ == "__main__":
+    print("Generate data files...")
+    generate_csv(rows=10**6, columns=10)
+    print("Data files generated!")