join paths os-independently (#271)
* join paths os-independently

* fix the `long` problem on py3

* remove any mention of os-specific `sep`
let join_path do all the work of cleaning paths

* remove sep_from_open()

* fix join with root
throw error if bad path

* more opportunity for `join_path`

* fix absolute paths

* print() to figure out what's going on on Linux

* fix for /tmp absolute directory

* revert the no-common-path test case
add absolute path test case

* add test that os-specific filenames are ok
revert tempdir() fixture to make tests os-specific (and more demanding)

* another fix
add tests

* revert the skips I added
add comments about path standard form

* add back test for windows paths
ensure test passes

* fix test to work on linux

* revert tests so windows tests still deliver windows paths

* add restrictive skipif to the two tests that do not pass on win32 python2.7
Kyle Lahnakoski authored and martindurant committed Jan 15, 2018
1 parent 4507685 commit 9c9e296
Showing 10 changed files with 187 additions and 98 deletions.
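
The central change in this commit is routing all path handling through `fastparquet.util.join_path`, which always returns cleaned, '/'-separated paths regardless of operating system. As a rough guide to what the diffs below rely on, here is a minimal sketch of that normalization, reverse-engineered from the new tests in fastparquet/test/test_util.py; it is an illustration of the expected behaviour, not the library's actual implementation:

def join_path(*path):
    """Illustrative sketch only -- not fastparquet's actual implementation."""
    def scrub(parts, abs_path):
        out = []
        for p in parts:
            if p in ('', '.'):
                continue          # drop empty and current-dir segments
            if p == '..':
                if out and out[-1] != '..':
                    out.pop()     # step back up one level
                elif abs_path:
                    raise Exception('cannot ascend above the root')
                else:
                    out.append('..')
            else:
                out.append(p)
        return out

    # join everything with '/', converting any backslashes on the way
    joined = '/'.join(str(p).replace('\\', '/') for p in path if p)
    abs_path = joined.startswith('/')
    cleaned = '/'.join(scrub(joined.split('/'), abs_path))
    return '/' + cleaned if abs_path else cleaned

Under these assumptions, join_path('test', '../../..') gives '../..', while join_path('/test', '../..') raises, matching the behaviour the new tests check for.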
3 changes: 2 additions & 1 deletion .gitignore
@@ -8,4 +8,5 @@ build
dist
parquet.egg-info
.idea
.cached
.cached
/.cache
34 changes: 16 additions & 18 deletions fastparquet/api.py
@@ -12,6 +12,7 @@
import struct

import numpy as np
from fastparquet.util import join_path

from .core import read_thrift
from .thrift_structures import parquet_thrift
@@ -38,20 +39,18 @@ class ParquetFile(object):
open_with: function
With the signature `func(path, mode)`, returns a context which
evaluated to a file open for reading. Defaults to the built-in `open`.
sep: string [`os.sep`]
Path separator to use, if data is in multiple files.
root: str
If passing a list of files, the top directory of the data-set may
be ambiguous for partitioning where the upmost field has only one
value. Use this to specify the dataset's root directory, if required.
Attributes
----------
cats: dict
Columns derived from hive/drill directory information, with known
values for each column.
categories: list
Columns marked as categorical in the extra metadata (meaning the
Columns marked as categorical in the extra metadata (meaning the
data must have come from pandas).
columns: list of str
The data columns available
@@ -78,26 +77,25 @@ class ParquetFile(object):
Max/min/count of each column chunk
"""
def __init__(self, fn, verify=False, open_with=default_open,
sep=os.sep, root=False):
self.sep = sep
root=False):
if isinstance(fn, (tuple, list)):
basepath, fmd = metadata_from_many(fn, verify_schema=verify,
open_with=open_with, root=root)
if basepath:
self.fn = sep.join([basepath, '_metadata']) # effective file
self.fn = join_path(basepath, '_metadata') # effective file
else:
self.fn = '_metadata'
self.fmd = fmd
self._set_attrs()
else:
try:
fn2 = sep.join([fn, '_metadata'])
fn2 = join_path(fn, '_metadata')
self.fn = fn2
with open_with(fn2, 'rb') as f:
self._parse_header(f, verify)
fn = fn2
except (IOError, OSError):
self.fn = fn
self.fn = join_path(fn)
with open_with(fn, 'rb') as f:
self._parse_header(f, verify)
self.open = open_with
@@ -139,7 +137,7 @@ def _set_attrs(self):
self.schema = schema.SchemaHelper(self._schema)
self.selfmade = self.created_by.split(' ', 1)[0] == "fastparquet-python"
self.file_scheme = get_file_scheme([rg.columns[0].file_path
for rg in self.row_groups], self.sep)
for rg in self.row_groups])
self._read_partitions()
self._dtypes()

@@ -166,24 +164,24 @@ def _read_partitions(self):
cats = OrderedDict()
for rg in self.row_groups:
for col in rg.columns:
s = ex_from_sep(self.sep)
s = ex_from_sep('/')
path = col.file_path or ""
if self.file_scheme == 'hive':
partitions = s.findall(path)
for key, val in partitions:
cats.setdefault(key, set()).add(val)
else:
for i, val in enumerate(col.file_path.split(self.sep)[:-1]):
for i, val in enumerate(col.file_path.split('/')[:-1]):
key = 'dir%i' % i
cats.setdefault(key, set()).add(val)
self.cats = OrderedDict([(key, list([val_to_num(x) for x in v]))
for key, v in cats.items()])

def row_group_filename(self, rg):
if rg.columns[0].file_path:
base = self.fn.replace('_metadata', '').rstrip(self.sep)
base = self.fn.replace('_metadata', '').rstrip('/')
if base:
return self.sep.join([base, rg.columns[0].file_path])
return join_path(base, rg.columns[0].file_path)
else:
return rg.columns[0].file_path
else:
@@ -221,7 +219,7 @@ def read_row_group(self, rg, columns, categories, infile=None,
ret = True
core.read_row_group(
infile, rg, columns, categories, self.schema, self.cats,
self.selfmade, index=index, assign=assign, sep=self.sep,
self.selfmade, index=index, assign=assign,
scheme=self.file_scheme)
if ret:
return df
@@ -274,7 +272,7 @@ def filter_row_groups(self, filters):
"""
return [rg for rg in self.row_groups if
not(filter_out_stats(rg, filters, self.schema)) and
not(filter_out_cats(rg, filters, self.sep))]
not(filter_out_cats(rg, filters))]

def iter_row_groups(self, columns=None, categories=None, filters=[],
index=None):
@@ -674,7 +672,7 @@ def sorted_partitioned_columns(pf):
return out


def filter_out_cats(rg, filters, sep='/'):
def filter_out_cats(rg, filters):
"""
According to the filters, should this row-group be excluded
@@ -695,7 +693,7 @@ def filter_out_cats(rg, filters, sep='/'):
# TODO: fix for Drill
if len(filters) == 0 or rg.columns[0].file_path is None:
return False
s = ex_from_sep(sep)
s = ex_from_sep('/')
partitions = s.findall(rg.columns[0].file_path)
pairs = [(p[0], p[1]) for p in partitions]
for cat, v in pairs:
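
For context on the ex_from_sep('/') calls above: the diff shows its result being used with findall() to pull hive-style key=value directory segments out of a row group's file_path, which is now always '/'-separated. A hypothetical regex of that shape (the real pattern lives in fastparquet.util and may differ):

import re

# Hypothetical stand-in for ex_from_sep('/'); fastparquet's actual pattern may differ.
hive_pair = re.compile(r'([^/=]+)=([^/]+)/')

# Example hive-partitioned file path (made up for illustration).
print(hive_pair.findall('symbol=SPY/year=2005/part.0.parquet'))
# -> [('symbol', 'SPY'), ('year', '2005')]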
7 changes: 5 additions & 2 deletions fastparquet/benchmarks/columns.py
@@ -3,6 +3,9 @@
import numpy as np
import pandas as pd
import time

from fastparquet.util import join_path

from fastparquet import write, ParquetFile
from dask.utils import tmpdir

@@ -18,7 +21,7 @@ def measure(name, result):
def time_column():
with tmpdir() as tempdir:
result = {}
fn = os.path.join(tempdir, 'temp.parq')
fn = join_path(tempdir, 'temp.parq')
n = 10000000
r = np.random.randint(-1e10, 1e10, n)
d = pd.DataFrame({'w': pd.Categorical(np.random.choice(
Expand Down Expand Up @@ -81,7 +84,7 @@ def time_column():
def time_text():
with tmpdir() as tempdir:
result = {}
fn = os.path.join(tempdir, 'temp.parq')
fn = join_path(tempdir, 'temp.parq')
n = 1000000
d = pd.DataFrame({
'a': np.random.choice(['hi', 'you', 'people'], size=n),
12 changes: 6 additions & 6 deletions fastparquet/core.py
@@ -20,7 +20,7 @@


def _read_page(file_obj, page_header, column_metadata):
"""Read the data page from the given file-object and convert it to raw,
"""Read the data page from the given file-object and convert it to raw,
uncompressed bytes (if necessary)."""
raw_bytes = file_obj.read(page_header.compressed_page_size)
raw_bytes = decompress_data(raw_bytes, column_metadata.codec)
@@ -277,11 +277,11 @@ def read_col(column, schema_helper, infile, use_cat=False,

def read_row_group_file(fn, rg, columns, categories, schema_helper, cats,
open=open, selfmade=False, index=None, assign=None,
sep=os.sep, scheme='hive'):
scheme='hive'):
with open(fn, mode='rb') as f:
return read_row_group(f, rg, columns, categories, schema_helper, cats,
selfmade=selfmade, index=index, assign=assign,
sep=sep, scheme=scheme)
scheme=scheme)


def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats,
@@ -324,7 +324,7 @@ def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats,

def read_row_group(file, rg, columns, categories, schema_helper, cats,
selfmade=False, index=None, assign=None,
sep=os.sep, scheme='hive'):
scheme='hive'):
"""
Access row-group in a file and read some columns into a data-frame.
"""
@@ -335,10 +335,10 @@ def read_row_group(file, rg, columns, categories, schema_helper, cats,

for cat in cats:
if scheme == 'hive':
s = ex_from_sep(sep)
s = ex_from_sep('/')
partitions = s.findall(rg.columns[0].file_path)
else:
partitions = [('dir%i' % i, v) for (i, v) in enumerate(
rg.columns[0].file_path.split(sep)[:-1])]
rg.columns[0].file_path.split('/')[:-1])]
val = val_to_num([p[1] for p in partitions if p[0] == cat][0])
assign[cat][:] = cats[cat].index(val)
7 changes: 4 additions & 3 deletions fastparquet/test/test_api.py
@@ -9,6 +9,7 @@
from fastparquet.test.util import tempdir
from fastparquet import write, ParquetFile
from fastparquet.api import statistics, sorted_partitioned_columns
from fastparquet.util import join_path

TEST_DATA = "test-data"

@@ -123,8 +124,8 @@ def test_attributes(tempdir):
assert pf.columns == ['x', 'y', 'z']
assert len(pf.row_groups) == 2
assert pf.count == 4
assert fn == pf.info['name']
assert fn in str(pf)
assert join_path(fn) == pf.info['name']
assert join_path(fn) in str(pf)
for col in df:
assert pf.dtypes[col] == df.dtypes[col]

@@ -215,7 +216,7 @@ def test_single_upper_directory(tempdir):
import glob
flist = list(sorted(glob.glob(os.path.join(tempdir, '*/*'))))
pf = ParquetFile(flist, root=tempdir)
assert pf.fn == os.path.join(tempdir, '_metadata')
assert pf.fn == join_path(os.path.join(tempdir, '_metadata'))
out = pf.to_pandas()
assert (out.y == 'aa').all()

4 changes: 3 additions & 1 deletion fastparquet/test/test_converted_types.py
@@ -6,7 +6,8 @@
from __future__ import unicode_literals

import datetime
from decimal import Decimal
import sys

import numpy as np
import pandas as pd
import pytest
@@ -41,6 +42,7 @@ def test_date():
pd.to_datetime([datetime.date(2004, 11, 3)]))


@pytest.mark.skipif(sys.platform == 'win32' and PY2, reason='does not work on windows 32 py2.7')
def test_time_millis():
"""Test int32 encoding a timedelta in millis."""
schema = pt.SchemaElement(
41 changes: 24 additions & 17 deletions fastparquet/test/test_partition_filters_specialstrings.py
@@ -1,14 +1,19 @@

import datetime as dt
import os
import shutil
import pytest
import string
import sys

import numpy as np
import pandas as pd
import pytest
from pandas.tslib import Timestamp
from fastparquet.test.util import tempdir
from six import PY2

from fastparquet import write, ParquetFile
import datetime as dt
import string
from fastparquet.test.util import tempdir


def frame_symbol_dtTrade_type_strike(days=1 * 252,
start_date=dt.datetime(2005, 1, 1, hour=0, minute=0, second=0),
@@ -22,27 +27,29 @@ def frame_symbol_dtTrade_type_strike(days=1 * 252,
tuple_list.append((x, y.year, y))
index = pd.MultiIndex.from_tuples(tuple_list, names=('symbol', 'year', 'dtTrade'))
np.random.seed(seed=0)
df = pd.DataFrame(np.random.randn(index.size, numbercolumns),
df = pd.DataFrame(np.random.randn(index.size, numbercolumns),
index=index, columns=[x for x in string.ascii_uppercase[0:numbercolumns]])
return df

@pytest.mark.parametrize('tempdir,input_symbols,input_days,file_scheme,input_columns,partitions,filters',
@pytest.mark.parametrize('tempdir,input_symbols,input_days,file_scheme,input_columns,partitions,filters',
[
(tempdir, ['NOW', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['now', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['TODAY', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['VIX*', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['QQQ*', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['QQQ!', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['Q%QQ', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['NOW', 'SPY', 'VIX'], 10, 'hive', 2, ['symbol', 'dtTrade'], [('symbol', '==', 'SPY')]),
(tempdir, ['NOW', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['now', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['TODAY', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['VIX*', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['QQQ*', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['QQQ!', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['Q%QQ', 'SPY', 'VIX'], 2*252, 'hive', 2, ['symbol', 'year'], [('symbol', '==', 'SPY')]),
(tempdir, ['NOW', 'SPY', 'VIX'], 10, 'hive', 2, ['symbol', 'dtTrade'], [('symbol', '==', 'SPY')]),
(tempdir, ['NOW', 'SPY', 'VIX'], 10, 'hive', 2, ['symbol', 'dtTrade'],
[('dtTrade','==','2005-01-02T00:00:00.000000000')]),
[('dtTrade','==','2005-01-02T00:00:00.000000000')]),
(tempdir, ['NOW', 'SPY', 'VIX'], 10, 'hive', 2, ['symbol', 'dtTrade'],
[('dtTrade','==', Timestamp('2005-01-01 00:00:00'))]),
[('dtTrade','==', Timestamp('2005-01-01 00:00:00'))]),
]
)
def test_frame_write_read_verify(tempdir, input_symbols, input_days, file_scheme,

@pytest.mark.skipif(sys.platform=='win32' and PY2, reason='does not work on windows 32 py2.7')
def test_frame_write_read_verify(tempdir, input_symbols, input_days, file_scheme,
input_columns, partitions, filters):
#Generate Temp Director for parquet Files
fdir = str(tempdir)
43 changes: 36 additions & 7 deletions fastparquet/test/test_util.py
@@ -1,28 +1,57 @@
import os
import pytest

from fastparquet.util import analyse_paths, get_file_scheme, val_to_num
from fastparquet.util import analyse_paths, get_file_scheme, val_to_num, join_path


def test_analyse_paths():
file_list = ['a', 'b']
base, out = analyse_paths(file_list, '/')
base, out = analyse_paths(file_list)
assert (base, out) == ('', ['a', 'b'])

file_list = ['c/a', 'c/b']
base, out = analyse_paths(file_list, '/')
base, out = analyse_paths(file_list)
assert (base, out) == ('c', ['a', 'b'])

file_list = ['c/d/a', 'c/d/b']
base, out = analyse_paths(file_list, '/')
base, out = analyse_paths(file_list)
assert (base, out) == ('c/d', ['a', 'b'])

file_list = ['/c/d/a', '/c/d/b']
base, out = analyse_paths(file_list)
assert (base, out) == ('/c/d', ['a', 'b'])

file_list = ['c/cat=1/a', 'c/cat=2/b', 'c/cat=1/c']
base, out = analyse_paths(file_list, '/')
base, out = analyse_paths(file_list)
assert (base, out) == ('c', ['cat=1/a', 'cat=2/b', 'cat=1/c'])

file_list = ['c\\cat=1\\a', 'c\\cat=2\\b', 'c\\cat=1\\c']
base, out = analyse_paths(file_list, '\\')
assert (base, out) == ('c', ['cat=1\\a', 'cat=2\\b', 'cat=1\\c'])
temp, os.sep = os.sep, '\\' # We must trick linux into thinking this is windows for this test to work
base, out = analyse_paths(file_list)
os.sep = temp
assert (base, out) == ('c', ['cat=1/a', 'cat=2/b', 'cat=1/c'])


def test_empty():
assert join_path("test", "") == "test"


def test_parents():
assert join_path("test", "../../..") == "../.."

with pytest.raises(Exception):
join_path("/test", "../../..")
with pytest.raises(Exception):
join_path("/test", "../..")


def test_abs_and_rel_paths():
assert join_path('/', 'this/is/a/test/') == '/this/is/a/test'
assert join_path('.', 'this/is/a/test/') == 'this/is/a/test'
assert join_path('', 'this/is/a/test/') == 'this/is/a/test'
assert join_path('/test', '.') == '/test'
assert join_path('/test', '..', 'this') == '/this'
assert join_path('/test', '../this') == '/this'


def test_file_scheme():
