
Drop parquet pyarrow 0.13.0 support
Due to incompatibilities and bugs in pyarrow 0.13.0, we drop support for
it in our parquet implementation and tests.
jcrist committed Apr 19, 2019
1 parent 42395b6 commit 3dabb4d
Showing 4 changed files with 53 additions and 28 deletions.
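
The same exact-version gate appears across all four files: library code raises, tests skip. As a minimal standalone sketch of the pattern (the version comparisons are taken from the diff below; the wrapper script itself is illustrative):

    from distutils.version import LooseVersion

    import pyarrow as pa

    pa_version = LooseVersion(pa.__version__)
    if pa_version < '0.8.0':
        # Too old: the pyarrow parquet engine requires at least 0.8.0.
        raise RuntimeError("PyArrow version >= 0.8.0 required")
    elif pa_version == '0.13.0':
        # Exactly 0.13.0: the known-bad release is rejected outright;
        # older (>= 0.8.0) and newer releases remain supported.
        raise RuntimeError("PyArrow version 0.13.0 isn't supported, please "
                           "upgrade or downgrade")

Note that `LooseVersion` compares transparently against plain strings, which is why the exact match `== '0.13.0'` works without wrapping the right-hand side.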
dask/bytes/tests/test_hdfs.py (2 changes: 2 additions & 0 deletions)
@@ -240,6 +240,8 @@ def test_pyarrow_compat():

 @require_pyarrow
 def test_parquet_pyarrow(hdfs):
+    if LooseVersion(pyarrow.__version__) == '0.13.0':
+        pytest.skip('pyarrow 0.13.0 not supported for parquet')
     dd = pytest.importorskip('dask.dataframe')
     import pandas as pd
     import numpy as np
dask/bytes/tests/test_s3.py (5 changes: 4 additions & 1 deletion)
@@ -4,6 +4,7 @@
 import sys
 import os
 from contextlib import contextmanager
+from distutils.version import LooseVersion

 import pytest
 import numpy as np
@@ -353,7 +354,9 @@ def test_read_text_passes_through_options():
 @pytest.mark.parametrize("engine", ['pyarrow', 'fastparquet'])
 def test_parquet(s3, engine):
     dd = pytest.importorskip('dask.dataframe')
-    pytest.importorskip(engine)
+    lib = pytest.importorskip(engine)
+    if engine == 'pyarrow' and LooseVersion(lib.__version__) == '0.13.0':
+        pytest.skip('pyarrow 0.13.0 not supported for parquet')
     import pandas as pd
     import numpy as np

dask/dataframe/io/parquet.py (6 changes: 5 additions & 1 deletion)
@@ -1059,8 +1059,12 @@ def get_engine(engine):
     elif engine == 'pyarrow':
         pa = import_required('pyarrow', "`pyarrow` not installed")

-        if LooseVersion(pa.__version__) < '0.8.0':
+        pa_version = LooseVersion(pa.__version__)
+        if pa_version < '0.8.0':
             raise RuntimeError("PyArrow version >= 0.8.0 required")
+        elif pa_version == '0.13.0':
+            raise RuntimeError("PyArrow version 0.13.0 isn't supported, please "
+                               "upgrade or downgrade")

         _ENGINES['pyarrow'] = eng = {'read': _read_pyarrow,
                                      'write': _write_pyarrow}
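The user-visible effect of this check, sketched under the assumption that pyarrow 0.13.0 is installed (the path is illustrative):

    import dask.dataframe as dd

    # get_engine() runs when the engine is resolved, so the bad version
    # now fails fast with the RuntimeError above rather than surfacing
    # pyarrow 0.13.0's bugs mid-computation.
    df = dd.read_parquet('data/*.parquet', engine='pyarrow')
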
dask/dataframe/io/tests/test_parquet.py (68 changes: 42 additions & 26 deletions)
@@ -21,17 +21,43 @@
 except ImportError:
     fastparquet = False


-try:
-    import pyarrow as pa
-    check_pa_divs = pa.__version__ >= LooseVersion('0.9.0')
-    import pyarrow.parquet as pq
-except ImportError:
-    check_pa_divs = False
-    pq = False
+try:
+    import pyarrow.parquet as pq
+    import pyarrow as pa
+    pa_version = LooseVersion(pa.__version__)
+    check_pa_divs = pa_version >= '0.9.0'
+except ImportError:
+    pq = False
+    pa_version = None
+    check_pa_divs = False
+
+
+SKIP_FASTPARQUET = not fastparquet
+SKIP_FASTPARQUET_REASON = 'fastparquet not found'
+FASTPARQUET_MARK = pytest.mark.skipif(SKIP_FASTPARQUET, reason=SKIP_FASTPARQUET_REASON)
+
+if pa_version == '0.13.0':
+    SKIP_PYARROW = True
+    SKIP_PYARROW_REASON = 'pyarrow 0.13.0 not supported'
+else:
+    SKIP_PYARROW = not pq
+    SKIP_PYARROW_REASON = 'pyarrow not found'
+PYARROW_MARK = pytest.mark.skipif(SKIP_PYARROW, reason=SKIP_PYARROW_REASON)
+
+
+def check_fastparquet():
+    if SKIP_FASTPARQUET:
+        pytest.skip(SKIP_FASTPARQUET_REASON)
+
+
+def check_pyarrow():
+    if SKIP_PYARROW:
+        pytest.skip(SKIP_PYARROW_REASON)


 def should_check_divs(engine):
@@ -53,22 +79,12 @@ def should_check_divs(engine):


 @pytest.fixture(params=[
-    pytest.param('fastparquet', marks=pytest.mark.skipif(not fastparquet, reason='fastparquet not found')),
-    pytest.param('pyarrow', marks=pytest.mark.skipif(not pq, reason='pyarrow not found'))])
+    pytest.param('fastparquet', marks=FASTPARQUET_MARK),
+    pytest.param('pyarrow', marks=PYARROW_MARK)])
 def engine(request):
     return request.param


-def check_fastparquet():
-    if not fastparquet:
-        pytest.skip('fastparquet not found')
-
-
-def check_pyarrow():
-    if not pq:
-        pytest.skip('pyarrow not found')
-
-
 def write_read_engines(**kwargs):
     """Product of both engines for write/read:
@@ -78,9 +94,10 @@ def write_read_engines(**kwargs):
     marks = {(w, r): [] for w in backends for r in backends}

     # Skip if uninstalled
-    for name, exists in [('fastparquet', fastparquet), ('pyarrow', pq)]:
-        val = pytest.mark.skip(reason='%s not found' % name)
-        if not exists:
+    for name, skip, reason in [('fastparquet', SKIP_FASTPARQUET, SKIP_FASTPARQUET_REASON),
+                               ('pyarrow', SKIP_PYARROW, SKIP_PYARROW_REASON)]:
+        if skip:
+            val = pytest.mark.skip(reason=reason)
             for k in marks:
                 if name in k:
                     marks[k].append(val)
@@ -1177,8 +1194,7 @@ def test_parse_pandas_metadata_null_index():
 def test_read_no_metadata(tmpdir, engine):
     # use pyarrow.parquet to create a parquet file without
     # pandas metadata
-    pa = pytest.importorskip("pyarrow")
-    import pyarrow.parquet as pq
+    check_pyarrow()
     tmp = str(tmpdir) + "table.parq"

     table = pa.Table.from_arrays([pa.array([1, 2, 3]),
@@ -1342,7 +1358,7 @@ def test_with_tz(tmpdir, engine):

 def test_arrow_partitioning(tmpdir):
     # Issue #3518
-    pytest.importorskip('pyarrow')
+    check_pyarrow()
     path = str(tmpdir)
     data = {
         'p': np.repeat(np.arange(3), 2).astype(np.int8),
@@ -1369,7 +1385,7 @@ def test_informative_error_messages():


 def test_append_cat_fp(tmpdir):
-    pytest.importorskip('fastparquet')
+    check_fastparquet()
     path = str(tmpdir)
     # https://github.com/dask/dask/issues/4120
     df = pd.DataFrame({"x": ["a", "a", "b", "a", "b"]})
Expand All @@ -1386,13 +1402,13 @@ def test_append_cat_fp(tmpdir):

def test_passing_parquetfile(tmpdir):
import shutil
fp = pytest.importorskip('fastparquet')
check_fastparquet()
path = str(tmpdir)
df = pd.DataFrame({"x": [1, 3, 2, 4]})
ddf = dd.from_pandas(df, npartitions=1)

dd.to_parquet(ddf, path)
pf = fp.ParquetFile(path)
pf = fastparquet.ParquetFile(path)
shutil.rmtree(path)

# should pass, because no need to re-read metadata
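For reference, a sketch of how the two skip styles introduced above are consumed inside test_parquet.py; `test_roundtrip` and `test_partitioned` are hypothetical names, not tests from this commit:

    # Collection-time skip: the mark is evaluated before the test body runs.
    @PYARROW_MARK
    def test_roundtrip(tmpdir):
        ...

    # Run-time skip: the helper calls pytest.skip() inside the test body,
    # which is useful when only part of a test depends on pyarrow.
    def test_partitioned(tmpdir):
        check_pyarrow()
        ...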
