Skip to content
This repository was archived by the owner on Feb 2, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions sdc/io/csv_ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
# *****************************************************************************


import contextlib
import functools

import llvmlite.binding as ll
from llvmlite import ir as lir
from .. import hio
Expand Down Expand Up @@ -398,6 +401,26 @@ def to_varname(string):
return re.sub(r'\W|^(?=\d)','_', string)


@contextlib.contextmanager
def pyarrow_cpu_count(cpu_count=None):
    """Context manager that temporarily changes the PyArrow CPU count.

    On entry the value currently reported by ``pyarrow.cpu_count()`` is
    remembered and ``pyarrow.set_cpu_count(cpu_count)`` is applied; on exit
    (normal or via an exception) the remembered value is restored.

    Parameters
    ----------
    cpu_count : int, optional
        CPU count to use inside the ``with`` block.  ``None`` (the default)
        means "whatever PyArrow reports at the moment the block is entered".
    """
    old_cpu_count = pyarrow.cpu_count()

    # Resolve the default at call time.  Writing ``pyarrow.cpu_count()`` in
    # the signature would evaluate it only once, when the module is imported,
    # and freeze a possibly stale value as the default.
    if cpu_count is None:
        cpu_count = old_cpu_count

    pyarrow.set_cpu_count(cpu_count)
    try:
        yield
    finally:
        # Always put the previous setting back, even if the body raised.
        pyarrow.set_cpu_count(old_cpu_count)


def pyarrow_cpu_count_equal_numba_num_treads(func):
    """Decorator: run ``func`` with the PyArrow CPU count set to NUMBA_NUM_THREADS.

    Every call of the decorated function is executed inside
    ``pyarrow_cpu_count(numba.config.NUMBA_NUM_THREADS)``; the thread count
    is read from the numba configuration at call time, not at decoration time.
    """
    def _run_with_numba_thread_count(*args, **kwargs):
        thread_count = numba.config.NUMBA_NUM_THREADS
        with pyarrow_cpu_count(thread_count):
            outcome = func(*args, **kwargs)
        return outcome

    # Copy name, docstring, etc. of the wrapped function onto the wrapper.
    return functools.wraps(func)(_run_with_numba_thread_count)


@pyarrow_cpu_count_equal_numba_num_treads
def pandas_read_csv(
filepath_or_buffer,
sep=",",
Expand Down
43 changes: 43 additions & 0 deletions sdc/tests/tests_perf/gen_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,46 @@ def generate(rows, headers, providers, file_name):
writer.writeheader()
for i in range(rows):
writer.writerow({k: p() for k, p in zip(headers, providers)})


def md5(filename):
    """Compute the hexadecimal MD5 digest of the contents of ``filename``.

    The file is opened in binary mode and consumed in 4096-byte blocks, so
    it is never loaded into memory as a whole.
    """
    import hashlib

    block_size = 4096
    digest = hashlib.md5()
    with open(filename, "rb") as stream:
        while True:
            block = stream.read(block_size)
            if block == b"":
                # An empty read signals end of file.
                break
            digest.update(block)
    return digest.hexdigest()


def csv_file_name(rows=10**5, columns=10, seed=0):
    """Build the CSV file name that encodes the given generation parameters.

    The result has the form ``data_<rows>_<columns>_<seed>.csv``.
    """
    stem = f"data_{rows}_{columns}_{seed}"
    return stem + ".csv"


def generate_csv(rows=10**5, columns=10, seed=0):
    """Generate a CSV file of pseudo-random floats and return its file name.

    The file name is derived from the parameters via ``csv_file_name``.  The
    columns are named ``f0``, ``f1``, ... and every value is drawn with
    ``uniform(-1.0, 1.0)`` from a ``random.Random`` instance seeded with
    ``seed``.

    If a reference MD5 checksum is known for the parameter combination and an
    existing file matches it, generation is skipped and the existing file is
    reused.

    Parameters
    ----------
    rows : int
        Number of data rows to write.
    columns : int
        Number of columns to write.
    seed : int
        Seed for the pseudo-random generator.

    Returns
    -------
    str
        Name of the CSV file (existing or freshly generated).
    """
    import random

    # Reference checksums of known-good files, keyed by (rows, columns, seed).
    md5_sums = {
        (10**6, 10, 0): "6fa2a115dfeaee4f574106b513ad79e6"
    }

    file_name = csv_file_name(rows, columns, seed)

    # Hash the file only when there is a reference checksum to compare with.
    expected_md5 = md5_sums.get((rows, columns, seed))
    if expected_md5 is not None:
        try:
            if md5(file_name) == expected_md5:
                return file_name
        except OSError:
            # The file is missing or unreadable: fall through and regenerate.
            pass

    r = random.Random(seed)
    generate(rows,
             [f"f{c}" for c in range(columns)],
             [lambda: r.uniform(-1.0, 1.0) for _ in range(columns)],
             file_name
             )

    return file_name
60 changes: 37 additions & 23 deletions sdc/tests/tests_perf/test_perf_read_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,15 @@
# *****************************************************************************

import time
import random

import pandas
import pyarrow.csv
import sdc

from .test_perf_base import TestBase
from .test_perf_utils import calc_compilation, get_times
from sdc.tests.tests_perf.test_perf_base import TestBase
from sdc.tests.tests_perf.test_perf_utils import calc_compilation, get_times

from .gen_csv import generate


def generate_csv():
    """Write a three-column CSV file with 10**5 rows and return its name.

    Column ``A`` holds integers computed as ``int(random() * 10000)``;
    columns ``B`` and ``C`` hold floats drawn with ``uniform(-1.0, 1.0)``.
    All values come from one ``random.Random`` instance seeded with 0.
    """
    rows = 10**5
    rng = random.Random(0)  # fixed seed

    def _int_value():
        return int(rng.random() * 10000)

    def _float_value():
        return rng.uniform(-1.0, 1.0)

    file_name = f"data_{rows}.csv"
    headers = ['A', 'B', 'C']
    providers = [_int_value, _float_value, _float_value]
    generate(rows, headers, providers, file_name)
    return file_name
from sdc.tests.tests_perf.gen_csv import generate_csv


def make_func(file_name):
Expand All @@ -65,12 +48,23 @@ def _function():
return _function


def make_func_pyarrow(file_name):
    """Build a zero-argument callable that times ``sdc.io.csv_ext.pandas_read_csv``.

    The returned callable reads ``file_name`` and returns a tuple
    ``(elapsed_seconds, result)``, where ``elapsed_seconds`` is the
    ``time.time()`` difference measured around the read.
    """
    def _function():
        started_at = time.time()
        frame = sdc.io.csv_ext.pandas_read_csv(file_name)
        elapsed = time.time() - started_at
        return elapsed, frame

    return _function


class TestPandasReadCSV(TestBase):

@classmethod
def setUpClass(cls):
    """Set the data dimensions and generate the CSV file shared by all tests."""
    super().setUpClass()
    # Size of the generated data set: rows x columns.
    cls.rows, cls.columns = 10**6, 10
    cls.generated_file = generate_csv(rows=cls.rows, columns=cls.columns)

def _test_jitted(self, pyfunc, record, *args, **kwargs):
# compilation time
Expand All @@ -92,7 +86,7 @@ def _test_python(self, pyfunc, record, *args, **kwargs):
def _test_case(self, pyfunc, name):
base = {
"test_name": name,
"data_size": 10**5,
"data_size": f"[{self.rows},{self.columns}]",
}

record = base.copy()
Expand All @@ -107,3 +101,23 @@ def _test_case(self, pyfunc, name):

def test_read_csv(self):
    """Run the 'read_csv' test case on the generated CSV file."""
    pyfunc = make_func(self.generated_file)
    self._test_case(pyfunc, 'read_csv')

def test_read_csv_pyarrow(self):
    """Measure the ``make_func_pyarrow`` reader and record it as type 'PyArrow'.

    The record carries the test name 'read_csv' and the data size as
    ``[rows,columns]``; it is passed to ``_test_python`` and then added to
    ``test_results``.
    """
    record = {
        "test_name": 'read_csv',
        "data_size": f"[{self.rows},{self.columns}]",
        "test_type": 'PyArrow',
    }
    self._test_python(make_func_pyarrow(self.generated_file), record)
    self.test_results.add(**record)


if __name__ == "__main__":
    # Running this module as a script generates the data file up front,
    # using the same dimensions (10**6 rows, 10 columns) as setUpClass.
    print("Generate data files...")
    generate_csv(rows=10**6, columns=10)
    print("Data files generated!")