Use non-ns units on timestamps declared the old way (#874)
* Use non-ns units on timestamps declared the old way

* Maybe fixed

* fix v2

* pandas dep
martindurant committed Aug 17, 2023
1 parent d83c555 commit 9995b9d
Showing 13 changed files with 72 additions and 35 deletions.
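
The gist of the change, as a minimal numpy sketch (illustrative only, not fastparquet's code): the legacy TIMESTAMP_MILLIS annotation stores int64 milliseconds since the epoch, so under pandas 2.x the raw values can simply be reinterpreted as datetime64[ms] with a zero-copy view, instead of being multiplied up to nanoseconds as before.

import numpy as np

raw = np.array([0, 1_600_000_000_000], dtype="int64")  # ms since the epoch

old_way = (raw * 1_000_000).view("M8[ns]")  # copies, and can overflow for far-future dates
new_way = raw.view("M8[ms]")                # zero-copy reinterpretation

print(old_way[1])  # 2020-09-13T12:26:40.000000000
print(new_way[1])  # 2020-09-13T12:26:40.000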
2 changes: 1 addition & 1 deletion .github/workflows/main.yaml
@@ -122,7 +122,7 @@ jobs:
run: |
pip install Cython
pip install hypothesis
-pip install pytest-httpserver pytest-localserver pytest-xdist pytest-asyncio
+pip install pytest-localserver pytest-xdist pytest-asyncio
pip install -e . --no-deps # Install fastparquet
git clone https://github.com/pandas-dev/pandas
cd pandas
6 changes: 6 additions & 0 deletions docs/source/api.rst
@@ -23,3 +23,9 @@ API
.. autofunction:: write

.. autofunction:: update_file_custom_metadata


+.. raw:: html
+
+    <script data-goatcounter="https://distdatacats.goatcounter.com/count"
+        async src="//gc.zgo.at/count.js"></script>
6 changes: 6 additions & 0 deletions docs/source/details.rst
@@ -256,3 +256,9 @@ Dask and Pandas fully support calling ``fastparquet`` directly, with the functio
Please see their relevant docstrings. Remote filesystems are supported by using
a URL with a "protocol://" specifier and any ``storage_options`` to be passed to
the file system implementation.


+.. raw:: html
+
+    <script data-goatcounter="https://distdatacats.goatcounter.com/count"
+        async src="//gc.zgo.at/count.js"></script>
6 changes: 6 additions & 0 deletions docs/source/filesystems.rst
@@ -32,3 +32,9 @@ Similarly, providing an open function and another to make any necessary director
row_group_offsets=[0, 500], open_with=myopen, mkdirs=noop)
(In the case of s3, no intermediate directories need to be created)


+.. raw:: html
+
+    <script data-goatcounter="https://distdatacats.goatcounter.com/count"
+        async src="//gc.zgo.at/count.js"></script>
6 changes: 6 additions & 0 deletions docs/source/index.rst
@@ -97,3 +97,9 @@ Index
1. :ref:`genindex`
1. :ref:`modindex`
1. :ref:`search`


+.. raw:: html
+
+    <script data-goatcounter="https://distdatacats.goatcounter.com/count"
+        async src="//gc.zgo.at/count.js"></script>
6 changes: 6 additions & 0 deletions docs/source/install.rst
@@ -71,3 +71,9 @@ This will produce a ``build/html/`` subdirectory, where the entry point is
``index.html``.




+.. raw:: html
+
+    <script data-goatcounter="https://distdatacats.goatcounter.com/count"
+        async src="//gc.zgo.at/count.js"></script>
6 changes: 6 additions & 0 deletions docs/source/quickstart.rst
@@ -87,3 +87,9 @@ Further options that may be of interest are:
write('outdir.parq', df, row_group_offsets=[0, 10000, 20000],
compression='GZIP', file_scheme='hive')
+.. raw:: html
+
+    <script data-goatcounter="https://distdatacats.goatcounter.com/count"
+        async src="//gc.zgo.at/count.js"></script>
6 changes: 6 additions & 0 deletions docs/source/releasenotes.rst
@@ -188,3 +188,9 @@ NB: minor versions up to 0.6.3 fix build issues
consolidate of many data files.

.. _cramjam: https://github.com/milesgranger/pyrus-cramjam


+.. raw:: html
+
+    <script data-goatcounter="https://distdatacats.goatcounter.com/count"
+        async src="//gc.zgo.at/count.js"></script>
33 changes: 16 additions & 17 deletions fastparquet/converted_types.py
@@ -56,11 +56,11 @@ def tobson(x):
parquet_thrift.ConvertedType.INT_16: np.dtype("int16"),
parquet_thrift.ConvertedType.INT_32: np.dtype('int32'),
parquet_thrift.ConvertedType.INT_64: np.dtype('int64'),
-parquet_thrift.ConvertedType.TIME_MILLIS: np.dtype('<m8[ns]'),
+parquet_thrift.ConvertedType.TIME_MILLIS: np.dtype('<m8[ms]'),
parquet_thrift.ConvertedType.DATE: np.dtype('<M8[ns]'),
-parquet_thrift.ConvertedType.TIMESTAMP_MILLIS: np.dtype('<M8[ns]'),
-parquet_thrift.ConvertedType.TIME_MICROS: np.dtype('<m8[ns]'),
-parquet_thrift.ConvertedType.TIMESTAMP_MICROS: np.dtype('<M8[ns]')
+parquet_thrift.ConvertedType.TIMESTAMP_MILLIS: np.dtype('<M8[ms]'),
+parquet_thrift.ConvertedType.TIME_MICROS: np.dtype('<m8[us]'),
+parquet_thrift.ConvertedType.TIMESTAMP_MICROS: np.dtype('<M8[us]')
}
nullable = {
np.dtype('int8'): pd.Int8Dtype(),
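
This table change relies on pandas >= 2.0, which keeps non-nanosecond resolutions instead of coercing everything to datetime64[ns]; that is presumably the "pandas dep" item in the commit message. A quick check of that assumption:

import numpy as np
import pandas as pd

s = pd.Series(np.array(["2023-08-17"], dtype="M8[ms]"))
print(s.dtype)  # datetime64[ms] on pandas 2.x; older pandas coerces to ns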
@@ -115,6 +115,8 @@ def typemap(se, md=None):
return simple[se.type]
else:
return np.dtype("S%i" % se.type_length)
+if md and "time" in md.get("numpy_type", ""):
+    return np.dtype(md["numpy_type"])
if se.converted_type in complex:
return complex[se.converted_type]
return np.dtype("O")
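
A hedged sketch of what the two added lines do (`md` here stands in for one column's entry from the file's pandas metadata; the real structure lives in the parquet footer): when the writing application recorded a numpy_type such as "datetime64[ms]", that unit now takes precedence over the generic converted-type mapping. The substring test works because both "datetime64" and "timedelta64" contain "time".

import numpy as np

md = {"numpy_type": "datetime64[ms]"}  # assumed shape of a pandas-metadata entry
if md and "time" in md.get("numpy_type", ""):
    print(np.dtype(md["numpy_type"]))  # datetime64[ms]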
@@ -145,7 +147,7 @@ def converts_inplace(se):
return False


-def convert(data, se, timestamp96=True):
+def convert(data, se, timestamp96=True, dtype=None):
"""Convert known types from primitive to rich.
Parameters
@@ -157,6 +159,7 @@ def convert(data, se, timestamp96=True):
ctype = se.converted_type
if se.type == parquet_thrift.Type.INT96 and timestamp96:
data2 = data.view([('ns', 'i8'), ('day', 'i4')])
+# TODO: this should be ms unit, now that we can?
return ((data2['day'] - 2440588) * 86400000000000 +
data2['ns']).view('M8[ns]')
if se.logicalType is not None and se.logicalType.TIMESTAMP is not None:
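
A worked numpy example of the INT96 branch above (kept at nanoseconds for now; the new TODO notes it could target ms): an Impala-style INT96 timestamp packs nanoseconds-within-day next to a 4-byte Julian day number, and Julian day 2440588 is 1970-01-01, which is where the offset comes from.

import numpy as np

raw = np.zeros(1, dtype=[("ns", "i8"), ("day", "i4")])
raw["day"] = 2440589           # one day after the Unix epoch
raw["ns"] = 3_600_000_000_000  # 01:00:00 into that day

ts = ((raw["day"] - 2440588).astype("i8") * 86_400_000_000_000 + raw["ns"]).view("M8[ns]")
print(ts)  # ['1970-01-02T01:00:00.000000000']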
@@ -179,6 +182,7 @@ def convert(data, se, timestamp96=True):
# NB: general but slow method
# could optimize when data.dtype.itemsize <= 8
# TODO: easy cythonize (but rare)
+# TODO: extension point for pandas-decimal (no conversion needed)
return np.array([
int.from_bytes(
data.data[i:i + 1], byteorder='big', signed=True
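
For reference, the big-endian two's-complement decode this loop performs on each fixed-length decimal value, in isolation:

print(int.from_bytes(b"\xff\x38", byteorder="big", signed=True))  # -200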
@@ -189,21 +193,16 @@ def convert(data, se, timestamp96=True):
data = data * DAYS_TO_MILLIS
return data.view('datetime64[ns]')
elif ctype == parquet_thrift.ConvertedType.TIME_MILLIS:
-    out = data.astype('int64', copy=False)
-    time_shift(out.view("int64"), 1000000)
-    return out.view('timedelta64[ns]')
+    # this was not covered by new pandas time units
+    data = data.astype('int64', copy=False)
+    time_shift(data, 1000000)
+    return data.view('timedelta64[ns]')
elif ctype == parquet_thrift.ConvertedType.TIMESTAMP_MILLIS:
-    out = data
-    time_shift(data.view("int64"), 1000000)
-    return out.view('datetime64[ns]')
+    return data.view('datetime64[ms]')
elif ctype == parquet_thrift.ConvertedType.TIME_MICROS:
-    out = data
-    time_shift(data.view("int64"))
-    return out.view('timedelta64[ns]')
+    return data.view('timedelta64[us]')
elif ctype == parquet_thrift.ConvertedType.TIMESTAMP_MICROS:
-    out = data
-    time_shift(data.view("int64"))
-    return out.view('datetime64[ns]')
+    return data.view('datetime64[us]')
elif ctype == parquet_thrift.ConvertedType.UINT_8:
# TODO: return strided views?
# data.view('uint8')[::data.itemsize].view(out_dtype)
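
A small sketch of why the branches above differ (assuming, per the parquet format, that TIME_MILLIS payloads arrive as int32 while the micros types arrive as int64): a 64-bit payload can be viewed in place at its native unit, whereas the 32-bit one must be widened first, which is why TIME_MILLIS keeps the old shift-to-nanoseconds path.

import numpy as np

micros = np.array([1_500_000], dtype="int64")  # TIME_MICROS payload, 1.5 s
print(micros.view("m8[us]"))                   # zero-copy view

millis = np.array([1_500], dtype="int32")      # TIME_MILLIS payload, 1.5 s
print(millis.astype("int64").view("m8[ms]"))   # widening copy needed first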
9 changes: 5 additions & 4 deletions fastparquet/core.py
@@ -264,7 +264,8 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
# can read-into
into0 = ((use_cat or converts_inplace(se) and see)
and data_header2.num_nulls == 0
-and max_rep == 0 and assign.dtype.kind != "O" and row_filter is None)
+and max_rep == 0 and assign.dtype.kind != "O" and row_filter is None
+and assign.dtype.kind not in "Mm")  # TODO: this can be done in place but is complex
if row_filter is None:
row_filter = Ellipsis
# can decompress-into
@@ -548,7 +549,7 @@ def read_col(column, schema_helper, infile, use_cat=False,
if d and not use_cat:
part[defi == max_defi] = dic[val]
elif not use_cat:
-part[defi == max_defi] = convert(val, se)
+part[defi == max_defi] = convert(val, se, dtype=assign.dtype)
else:
part[defi == max_defi] = val
else:
Expand All @@ -557,7 +558,7 @@ def read_col(column, schema_helper, infile, use_cat=False,
piece = piece._data
if use_cat and not d:
# only possible for multi-index
-val = convert(val, se)
+val = convert(val, se, dtype=assign.dtype)
try:
i = pd.Categorical(val)
except:
Expand All @@ -567,7 +568,7 @@ def read_col(column, schema_helper, infile, use_cat=False,
elif d and not use_cat:
piece[:] = dic[val]
elif not use_cat:
-piece[:] = convert(val, se)
+piece[:] = convert(val, se, dtype=assign.dtype)
else:
piece[:] = val
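
Why read_col now forwards assign.dtype: a hypothetical sketch (names assumed, not fastparquet's internals) of a converter that honors the output buffer's unit rather than always producing nanoseconds.

import numpy as np

def convert_sketch(raw, dtype=None):
    # raw holds int64 microseconds; honor the caller's buffer dtype if given
    return raw.view(dtype if dtype is not None else "M8[us]")

buf = np.empty(2, dtype="M8[us]")
buf[:] = convert_sketch(np.array([0, 1_000_000], dtype="int64"), dtype=buf.dtype)
print(buf)  # ['1970-01-01T00:00:00.000000' '1970-01-01T00:00:01.000000']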

3 changes: 2 additions & 1 deletion fastparquet/test/test_output.py
@@ -561,7 +561,7 @@ def test_auto_null_object(tempdir):
df['bb'] = df['b'].astype('object')
df['aaa'] = df['a'].astype('object')
object_cols = ['d', 'ff', 'bb', 'aaa', 'aa']
-test_cols = list(set(df) - set(object_cols)) + ['d']
+test_cols = list(set(df) - set(object_cols) - {"c"}) + ['d']
fn = os.path.join(tmp, "test.parq")

with pytest.raises(ValueError):
@@ -573,6 +573,7 @@ def test_auto_null_object(tempdir):
assert col.repetition_type == parquet_thrift.FieldRepetitionType.OPTIONAL
df2 = pf.to_pandas(categories=['e'])

+assert df2.c.equals(df.c)
tm.assert_frame_equal(df[test_cols], df2[test_cols], check_categorical=False,
check_dtype=False)
tm.assert_frame_equal(df[['bb']].astype('float64'), df2[['bb']])
8 changes: 6 additions & 2 deletions fastparquet/writer.py
@@ -303,8 +303,12 @@ def convert(data, se):

elif converted_type == parquet_thrift.ConvertedType.TIME_MICROS:
# TODO: shift inplace
-out = np.empty(len(data), 'int64')
-time_shift(data.values.view('int64'), out)
+if data.dtype == "m8[ns]":
+    out = np.empty(len(data), 'int64')
+    time_shift(data.values.view('int64'), out)
+else:
+    # assuming ms or us
+    out = data.values
elif type == parquet_thrift.Type.INT96 and dtype.kind == 'M':
ns_per_day = (24 * 3600 * 1000000000)
day = data.values.view('int64') // ns_per_day + 2440588
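
Plain-numpy sketch of the write-side branch above, with integer division standing in for fastparquet's cython time_shift: nanosecond timedeltas still have to be divided down to microseconds, while data already in a coarser unit is passed through (the source itself hedges this with "assuming ms or us").

import numpy as np

data = np.array([1_500_000_000], dtype="m8[ns]")  # 1.5 s
if data.dtype == np.dtype("m8[ns]"):
    out = data.view("int64") // 1_000  # ns -> us, with a copy
else:
    out = data.view("int64")           # assumed already microsecond-compatible
print(out)  # [1500000]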
10 changes: 0 additions & 10 deletions readthedocs.yml

This file was deleted.
