ARROW-8136: [Python] Restore creating a dataset from a relative path
Closes #6643 from jorisvandenbossche/ARROW-8136

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
jorisvandenbossche authored and bkietz committed Mar 17, 2020
1 parent 70db8ab commit 8829700
Showing 2 changed files with 52 additions and 11 deletions.
python/pyarrow/dataset.py (10 additions & 4 deletions)
@@ -153,11 +153,17 @@ def partitioning(schema=None, field_names=None, flavor=None):
 
 def _ensure_fs(filesystem, path):
     # Validate or infer the filesystem from the path
-    from pyarrow.fs import FileSystem
+    from pyarrow.fs import FileSystem, LocalFileSystem
 
-    if filesystem is not None:
-        return filesystem, path
-    return FileSystem.from_uri(path)
+    if filesystem is None:
+        try:
+            return FileSystem.from_uri(path)
+        except Exception:
+            # when path is not found (e.g. relative path), we fall back to a
+            # local file system
+            filesystem = LocalFileSystem()
+
+    return filesystem, path
 
 
 def _ensure_fs_and_paths(path, filesystem=None):
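For orientation, here is a minimal sketch (not part of the commit) of the three paths through the updated helper. _ensure_fs is private to pyarrow.dataset, so calling it directly is purely illustrative, and the file paths shown are hypothetical:

    from pyarrow.dataset import _ensure_fs
    from pyarrow.fs import LocalFileSystem

    # Explicit filesystem: returned as-is together with the path.
    fs, path = _ensure_fs(LocalFileSystem(), "data/test.parquet")

    # URI: inferred via FileSystem.from_uri, as before this change.
    fs, path = _ensure_fs(None, "file:///tmp/data")

    # Bare relative path: FileSystem.from_uri raises, so the new fallback
    # returns a LocalFileSystem instead of propagating the error.
    fs, path = _ensure_fs(None, "data/test.parquet")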
python/pyarrow/tests/test_dataset.py (42 additions & 7 deletions)
@@ -15,7 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import contextlib
 import operator
+import os
 
 import numpy as np
 import pytest
@@ -38,6 +40,16 @@
 pytestmark = pytest.mark.dataset
 
 
+@contextlib.contextmanager
+def change_cwd(path):
+    curdir = os.getcwd()
+    os.chdir(str(path))
+    try:
+        yield
+    finally:
+        os.chdir(curdir)
+
+
 def _generate_data(n):
     import datetime
     import itertools
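Side note, not from the commit: the try/finally in change_cwd guarantees the original working directory is restored even if the test body raises, so a failing relative-path assertion cannot leak a changed cwd into subsequent tests. Python 3.11 later added an equivalent helper as contextlib.chdir.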
@@ -601,6 +613,13 @@ def _check_dataset_from_path(path, table, **kwargs):
     result = dataset.to_table(use_threads=False)  # deterministic row order
     assert result.equals(table)
 
+    # relative string path
+    with change_cwd(path.parent):
+        dataset = ds.dataset(ds.factory(path.name, **kwargs))
+        assert dataset.schema.equals(table.schema)
+        result = dataset.to_table(use_threads=False)  # deterministic row order
+        assert result.equals(table)
+
     # passing directly to dataset
     dataset = ds.dataset(str(path), **kwargs)
     assert dataset.schema.equals(table.schema)
@@ -642,28 +661,39 @@ def test_open_dataset_list_of_files(tempdir):
 def test_open_dataset_partitioned_directory(tempdir):
     import pyarrow.parquet as pq
     table = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5})
+
+    path = tempdir / "dataset"
+    path.mkdir()
+
     for part in range(3):
-        path = tempdir / "part={}".format(part)
-        path.mkdir()
-        pq.write_table(table, path / "test.parquet")
+        part = path / "part={}".format(part)
+        part.mkdir()
+        pq.write_table(table, part / "test.parquet")
 
     # no partitioning specified, just read all individual files
     full_table = pa.concat_tables([table] * 3)
-    _check_dataset_from_path(tempdir, full_table)
+    _check_dataset_from_path(path, full_table)
 
     # specify partition scheme with discovery
     dataset = ds.dataset(
-        str(tempdir), partitioning=ds.partitioning(flavor="hive"))
+        str(path), partitioning=ds.partitioning(flavor="hive"))
     expected_schema = table.schema.append(pa.field("part", pa.int32()))
     assert dataset.schema.equals(expected_schema)
 
+    # specify partition scheme with discovery and relative path
+    with change_cwd(tempdir):
+        dataset = ds.dataset(
+            "dataset/", partitioning=ds.partitioning(flavor="hive"))
+        expected_schema = table.schema.append(pa.field("part", pa.int32()))
+        assert dataset.schema.equals(expected_schema)
+
     # specify partition scheme with string short-cut
-    dataset = ds.dataset(str(tempdir), partitioning="hive")
+    dataset = ds.dataset(str(path), partitioning="hive")
     assert dataset.schema.equals(expected_schema)
 
     # specify partition scheme with explicit scheme
     dataset = ds.dataset(
-        str(tempdir),
+        str(path),
         partitioning=ds.partitioning(
             pa.schema([("part", pa.int8())]), flavor="hive"))
     expected_schema = table.schema.append(pa.field("part", pa.int8()))
@@ -688,6 +718,11 @@ def test_open_dataset_filesystem(tempdir):
     dataset2 = ds.dataset(str(path), filesystem=fs.LocalFileSystem())
     assert dataset2.schema.equals(table.schema)
 
+    # local filesystem specified with relative path
+    with change_cwd(tempdir):
+        dataset3 = ds.dataset("test.parquet", filesystem=fs.LocalFileSystem())
+        assert dataset3.schema.equals(table.schema)
+
     # passing different filesystem
     with pytest.raises(FileNotFoundError):
         ds.dataset(str(path), filesystem=fs._MockFileSystem())
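Taken together, the fix makes an end-to-end flow like the following work again. A hedged sketch with hypothetical file names, assuming a pyarrow build with dataset and parquet support (parquet being the default dataset format):

    import os
    import tempfile

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq

    tmpdir = tempfile.mkdtemp()
    pq.write_table(pa.table({"a": range(3)}),
                   os.path.join(tmpdir, "test.parquet"))

    os.chdir(tmpdir)
    # Before this commit, FileSystem.from_uri() raised on the bare relative
    # path and the error propagated; now _ensure_fs falls back to the local
    # filesystem.
    dataset = ds.dataset("test.parquet")
    assert dataset.to_table().to_pydict() == {"a": [0, 1, 2]}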
