
Saving and loading partitioned parquet with empty partitions is inconsistent #5252

Closed
bluecoconut opened this issue Aug 8, 2019 · 13 comments

@bluecoconut (Contributor) commented Aug 8, 2019

Problems

There seem to be a few issues with saving and loading partitioned dataframes where some partitions are empty or where columns contain null values. I'm also finding that the behavior is inconsistent and semi-unpredictable.

  1. With fastparquet: I can save and reload the parquet files, but repartitioning the resulting dataframe does not always succeed, depending on the number of partitions requested.
  2. With pyarrow: if there is an empty partition, saving and loading doesn't always work (schema validation error). However, even when all partitions are populated, repartitioning sometimes fails.
  3. Without serializing the dataframes to parquet, everything behaves fine (repartitioning always succeeds).

Environment

Ubuntu 18.04
Python 3.7.1

Python environment:

numpy==1.17.0
pandas==0.25.0
fastparquet==0.3.2
pyarrow==0.14.1
dask==2.2.0

Example Code

Fastparquet

from dask.datasets import timeseries
import dask.dataframe as dd

engine = 'fastparquet'
print(f"Engine: {engine}")
for i in range(10):
    df = timeseries().persist()
    df_filtered = df[df.x>0.99999]
    print(f"Length of dataframe going in {len(df_filtered)}, Npartitions going in {df_filtered.npartitions}")
    df_filtered.to_parquet(f"{engine}{i}.pq", engine=engine)
    try:
        fpq = dd.read_parquet(f"{engine}{i}.pq", engine=engine)
    except ValueError as e:
        print(f"Load failed: {e}")
        continue
    for N in range(1, 50):
        try:
            fpq.repartition(npartitions=N)
            print(f"[✔️{N}]", end =" ")
        except ValueError:
            print(f"[✖️{N}]", end =" ")
    print()
Fastparquet Output (be sure to scroll right, and note how the failures start at partition counts that seem unrelated to the dataframe length)

Engine: fastparquet
Length of dataframe going in 13, Npartitions going in 30
[✔️1] [✔️2] [✔️3] [✔️4] [✔️5] [✔️6] [✔️7] [✔️8] [✔️9] [✔️10] [✔️11] [✔️12] [✔️13] [✔️14] [✔️15] [✔️16] [✔️17] [✔️18] [✔️19] [✔️20] [✔️21] [✖️22] [✖️23] [✖️24] [✖️25] [✖️26] [✖️27] [✖️28] [✖️29] [✖️30] [✖️31] [✖️32] [✖️33] [✖️34] [✖️35] [✖️36] [✖️37] [✖️38] [✖️39] [✖️40] [✖️41] [✖️42] [✖️43] [✖️44] [✖️45] [✖️46] [✖️47] [✖️48] [✖️49] 
Length of dataframe going in 12, Npartitions going in 30
[✔️1] [✔️2] [✔️3] [✔️4] [✔️5] [✔️6] [✔️7] [✔️8] [✔️9] [✔️10] [✔️11] [✔️12] [✔️13] [✔️14] [✔️15] [✔️16] [✔️17] [✖️18] [✖️19] [✖️20] [✖️21] [✖️22] [✖️23] [✖️24] [✖️25] [✖️26] [✖️27] [✖️28] [✖️29] [✖️30] [✖️31] [✖️32] [✖️33] [✖️34] [✖️35] [✖️36] [✖️37] [✖️38] [✖️39] [✖️40] [✖️41] [✖️42] [✖️43] [✖️44] [✖️45] [✖️46] [✖️47] [✖️48] [✖️49] 
Length of dataframe going in 9, Npartitions going in 30
[✔️1] [✔️2] [✔️3] [✔️4] [✔️5] [✔️6] [✔️7] [✔️8] [✔️9] [✔️10] [✔️11] [✔️12] [✔️13] [✔️14] [✔️15] [✔️16] [✔️17] [✖️18] [✖️19] [✖️20] [✖️21] [✖️22] [✖️23] [✖️24] [✖️25] [✖️26] [✖️27] [✖️28] [✖️29] [✖️30] [✖️31] [✖️32] [✖️33] [✖️34] [✖️35] [✖️36] [✖️37] [✖️38] [✖️39] [✖️40] [✖️41] [✖️42] [✖️43] [✖️44] [✖️45] [✖️46] [✖️47] [✖️48] [✖️49] 
Length of dataframe going in 15, Npartitions going in 30
[✔️1] [✔️2] [✔️3] [✔️4] [✔️5] [✔️6] [✔️7] [✔️8] [✔️9] [✔️10] [✔️11] [✔️12] [✔️13] [✔️14] [✔️15] [✔️16] [✔️17] [✔️18] [✔️19] [✔️20] [✔️21] [✔️22] [✔️23] [✔️24] [✔️25] [✖️26] [✖️27] [✖️28] [✖️29] [✖️30] [✖️31] [✖️32] [✖️33] [✖️34] [✖️35] [✖️36] [✖️37] [✖️38] [✖️39] [✖️40] [✖️41] [✖️42] [✖️43] [✖️44] [✖️45] [✖️46] [✖️47] [✖️48] [✖️49] 
Length of dataframe going in 10, Npartitions going in 30
[✔️1] [✔️2] [✔️3] [✔️4] [✔️5] [✔️6] [✔️7] [✔️8] [✔️9] [✔️10] [✔️11] [✔️12] [✔️13] [✔️14] [✔️15] [✔️16] [✔️17] [✔️18] [✔️19] [✔️20] [✔️21] [✔️22] [✔️23] [✔️24] [✔️25] [✔️26] [✔️27] [✔️28] [✔️29] [✔️30] [✔️31] [✔️32] [✔️33] [✔️34] [✔️35] [✔️36] [✔️37] [✔️38] [✔️39] [✔️40] [✔️41] [✔️42] [✔️43] [✔️44] [✔️45] [✔️46] [✔️47] [✔️48] [✔️49] 
Length of dataframe going in 12, Npartitions going in 30
[✔️1] [✔️2] [✔️3] [✔️4] [✔️5] [✔️6] [✔️7] [✔️8] [✔️9] [✔️10] [✔️11] [✔️12] [✔️13] [✔️14] [✔️15] [✔️16] [✔️17] [✔️18] [✔️19] [✔️20] [✔️21] [✔️22] [✔️23] [✔️24] [✔️25] [✔️26] [✔️27] [✔️28] [✔️29] [✔️30] [✔️31] [✔️32] [✔️33] [✔️34] [✔️35] [✔️36] [✔️37] [✔️38] [✔️39] [✔️40] [✔️41] [✔️42] [✔️43] [✔️44] [✔️45] [✔️46] [✔️47] [✔️48] [✔️49] 
Length of dataframe going in 15, Npartitions going in 30
[✔️1] [✔️2] [✔️3] [✔️4] [✔️5] [✔️6] [✔️7] [✔️8] [✔️9] [✔️10] [✔️11] [✔️12] [✔️13] [✔️14] [✔️15] [✔️16] [✔️17] [✔️18] [✔️19] [✔️20] [✔️21] [✔️22] [✔️23] [✔️24] [✔️25] [✔️26] [✔️27] [✖️28] [✖️29] [✖️30] [✖️31] [✖️32] [✖️33] [✖️34] [✖️35] [✖️36] [✖️37] [✖️38] [✖️39] [✖️40] [✖️41] [✖️42] [✖️43] [✖️44] [✖️45] [✖️46] [✖️47] [✖️48] [✖️49] 
Length of dataframe going in 15, Npartitions going in 30
[✔️1] [✔️2] [✔️3] [✔️4] [✔️5] [✔️6] [✔️7] [✔️8] [✔️9] [✔️10] [✔️11] [✔️12] [✔️13] [✔️14] [✔️15] [✔️16] [✔️17] [✔️18] [✔️19] [✔️20] [✔️21] [✔️22] [✔️23] [✔️24] [✔️25] [✔️26] [✔️27] [✖️28] [✖️29] [✖️30] [✖️31] [✖️32] [✖️33] [✖️34] [✖️35] [✖️36] [✖️37] [✖️38] [✖️39] [✖️40] [✖️41] [✖️42] [✖️43] [✖️44] [✖️45] [✖️46] [✖️47] [✖️48] [✖️49] 
Length of dataframe going in 14, Npartitions going in 30
[✔️1] [✔️2] [✔️3] [✔️4] [✔️5] [✔️6] [✔️7] [✔️8] [✔️9] [✔️10] [✔️11] [✔️12] [✔️13] [✔️14] [✔️15] [✔️16] [✔️17] [✔️18] [✔️19] [✔️20] [✔️21] [✔️22] [✔️23] [✖️24] [✖️25] [✖️26] [✖️27] [✖️28] [✖️29] [✖️30] [✖️31] [✖️32] [✖️33] [✖️34] [✖️35] [✖️36] [✖️37] [✖️38] [✖️39] [✖️40] [✖️41] [✖️42] [✖️43] [✖️44] [✖️45] [✖️46] [✖️47] [✖️48] [✖️49] 
Length of dataframe going in 14, Npartitions going in 30
[✔️1] [✔️2] [✔️3] [✔️4] [✔️5] [✔️6] [✔️7] [✔️8] [✔️9] [✔️10] [✔️11] [✔️12] [✔️13] [✔️14] [✔️15] [✔️16] [✔️17] [✔️18] [✔️19] [✖️20] [✖️21] [✖️22] [✖️23] [✖️24] [✖️25] [✖️26] [✖️27] [✖️28] [✖️29] [✖️30] [✖️31] [✖️32] [✖️33] [✖️34] [✖️35] [✖️36] [✖️37] [✖️38] [✖️39] [✖️40] [✖️41] [✖️42] [✖️43] [✖️44] [✖️45] [✖️46] [✖️47] [✖️48] [✖️49] 

Pyarrow Input

(Almost exactly the same code; note the change to repartition(npartitions=5), to give it a fighting chance.)

from dask.datasets import timeseries
import dask.dataframe as dd

engine = 'pyarrow'
print(f"Engine: {engine}")
for i in range(10):
    df = timeseries().persist()
    df_filtered = df[df.x>0.99999].repartition(npartitions=5)
    print(f"Length of dataframe going in {len(df_filtered)}, Npartitions going in {df_filtered.npartitions}")
    df_filtered.to_parquet(f"{engine}{i}.pq", engine=engine)
    try:
        fpq = dd.read_parquet(f"{engine}{i}.pq", engine=engine)
    except ValueError as e:
        print(f"Load failed: {e}")
        continue
    for N in range(1, 50):
        try:
            fpq.repartition(npartitions=N)
            print(f"[✔️{N}]", end =" ")
        except ValueError:
            print(f"[✖️{N}]", end =" ")
    print()

Pyarrow Output

Engine: pyarrow
Length of dataframe going in 14, Npartitions going in 5
[✔️1] [✔️2] [✔️3] [✔️4] [✔️5] [✔️6] [✔️7] [✔️8] [✔️9] [✔️10] [✔️11] [✔️12] [✔️13] [✔️14] [✔️15] [✔️16] [✔️17] [✔️18] [✔️19] [✔️20] [✔️21] [✔️22] [✔️23] [✔️24] [✔️25] [✔️26] [✔️27] [✔️28] [✔️29] [✔️30] [✔️31] [✔️32] [✔️33] [✔️34] [✔️35] [✔️36] [✔️37] [✔️38] [✔️39] [✔️40] [✔️41] [✔️42] [✔️43] [✔️44] [✔️45] [✔️46] [✔️47] [✔️48] [✔️49] 
Length of dataframe going in 13, Npartitions going in 5
[✔️1] [✔️2] [✔️3] [✔️4] [✔️5] [✔️6] [✔️7] [✔️8] [✔️9] [✖️10] [✖️11] [✖️12] [✖️13] [✖️14] [✖️15] [✖️16] [✖️17] [✖️18] [✖️19] [✖️20] [✖️21] [✖️22] [✖️23] [✖️24] [✖️25] [✖️26] [✖️27] [✖️28] [✖️29] [✖️30] [✖️31] [✖️32] [✖️33] [✖️34] [✖️35] [✖️36] [✖️37] [✖️38] [✖️39] [✖️40] [✖️41] [✖️42] [✖️43] [✖️44] [✖️45] [✖️46] [✖️47] [✖️48] [✖️49] 
Length of dataframe going in 7, Npartitions going in 5
Load failed: Schema in /home/jawaugh/code/notebooks/justin/dask-partitions/pyarrow2.pq/part.4.parquet was different. 
id: int64
name: null
x: double
y: double
timestamp: timestamp[us]
metadata
--------
{b'pandas': b'{"index_columns": ["timestamp"], "column_indexes": [{"name": nul'
            b'l, "field_name": null, "pandas_type": "unicode", "numpy_type": "'
            b'object", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name'
            b'": "id", "field_name": "id", "pandas_type": "int64", "numpy_type'
            b'": "int64", "metadata": null}, {"name": "name", "field_name": "n'
            b'ame", "pandas_type": "empty", "numpy_type": "object", "metadata"'
            b': null}, {"name": "x", "field_name": "x", "pandas_type": "float6'
            b'4", "numpy_type": "float64", "metadata": null}, {"name": "y", "f'
            b'ield_name": "y", "pandas_type": "float64", "numpy_type": "float6'
            b'4", "metadata": null}, {"name": "timestamp", "field_name": "time'
            b'stamp", "pandas_type": "datetime", "numpy_type": "datetime64[ns]'
            b'", "metadata": null}], "creator": {"library": "pyarrow", "versio'
            b'n": "0.14.1"}, "pandas_version": "0.25.0"}'}

vs

id: int64
name: string
x: double
y: double
timestamp: timestamp[us]
metadata
--------
{b'pandas': b'{"index_columns": ["timestamp"], "column_indexes": [{"name": nul'
            b'l, "field_name": null, "pandas_type": "unicode", "numpy_type": "'
            b'object", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name'
            b'": "id", "field_name": "id", "pandas_type": "int64", "numpy_type'
            b'": "int64", "metadata": null}, {"name": "name", "field_name": "n'
            b'ame", "pandas_type": "unicode", "numpy_type": "object", "metadat'
            b'a": null}, {"name": "x", "field_name": "x", "pandas_type": "floa'
            b't64", "numpy_type": "float64", "metadata": null}, {"name": "y", '
            b'"field_name": "y", "pandas_type": "float64", "numpy_type": "floa'
            b't64", "metadata": null}, {"name": "timestamp", "field_name": "ti'
            b'mestamp", "pandas_type": "datetime", "numpy_type": "datetime64[n'
            b's]", "metadata": null}], "creator": {"library": "pyarrow", "vers'
            b'ion": "0.14.1"}, "pandas_version": "0.25.0"}'}
Length of dataframe going in 11, Npartitions going in 5
[✔️1] [✔️2] [✔️3] [✔️4] [✔️5] [✔️6] [✔️7] [✔️8] [✔️9] [✔️10] [✔️11] [✔️12] [✔️13] [✔️14] [✔️15] [✔️16] [✔️17] [✔️18] [✔️19] [✔️20] [✔️21] [✔️22] [✔️23] [✔️24] [✔️25] [✔️26] [✔️27] [✔️28] [✔️29] [✔️30] [✔️31] [✔️32] [✔️33] [✔️34] [✔️35] [✔️36] [✔️37] [✔️38] [✔️39] [✔️40] [✔️41] [✔️42] [✔️43] [✔️44] [✔️45] [✔️46] [✔️47] [✔️48] [✔️49] 
Length of dataframe going in 6, Npartitions going in 5
[✔️1] [✔️2] [✔️3] [✔️4] [✔️5] [✔️6] [✔️7] [✔️8] [✔️9] [✖️10] [✖️11] [✖️12] [✖️13] [✖️14] [✖️15] [✖️16] [✖️17] [✖️18] [✖️19] [✖️20] [✖️21] [✖️22] [✖️23] [✖️24] [✖️25] [✖️26] [✖️27] [✖️28] [✖️29] [✖️30] [✖️31] [✖️32] [✖️33] [✖️34] [✖️35] [✖️36] [✖️37] [✖️38] [✖️39] [✖️40] [✖️41] [✖️42] [✖️43] [✖️44] [✖️45] [✖️46] [✖️47] [✖️48] [✖️49] 
Length of dataframe going in 7, Npartitions going in 5
Load failed: Schema in /home/jawaugh/code/notebooks/justin/dask-partitions/pyarrow5.pq/part.1.parquet was different. 
id: int64
name: null
x: double
y: double
timestamp: timestamp[us]
metadata
--------
{b'pandas': b'{"index_columns": ["timestamp"], "column_indexes": [{"name": nul'
            b'l, "field_name": null, "pandas_type": "unicode", "numpy_type": "'
            b'object", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name'
            b'": "id", "field_name": "id", "pandas_type": "int64", "numpy_type'
            b'": "int64", "metadata": null}, {"name": "name", "field_name": "n'
            b'ame", "pandas_type": "empty", "numpy_type": "object", "metadata"'
            b': null}, {"name": "x", "field_name": "x", "pandas_type": "float6'
            b'4", "numpy_type": "float64", "metadata": null}, {"name": "y", "f'
            b'ield_name": "y", "pandas_type": "float64", "numpy_type": "float6'
            b'4", "metadata": null}, {"name": "timestamp", "field_name": "time'
            b'stamp", "pandas_type": "datetime", "numpy_type": "datetime64[ns]'
            b'", "metadata": null}], "creator": {"library": "pyarrow", "versio'
            b'n": "0.14.1"}, "pandas_version": "0.25.0"}'}

vs

id: int64
name: string
x: double
y: double
timestamp: timestamp[us]
metadata
--------
{b'pandas': b'{"index_columns": ["timestamp"], "column_indexes": [{"name": nul'
            b'l, "field_name": null, "pandas_type": "unicode", "numpy_type": "'
            b'object", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name'
            b'": "id", "field_name": "id", "pandas_type": "int64", "numpy_type'
            b'": "int64", "metadata": null}, {"name": "name", "field_name": "n'
            b'ame", "pandas_type": "unicode", "numpy_type": "object", "metadat'
            b'a": null}, {"name": "x", "field_name": "x", "pandas_type": "floa'
            b't64", "numpy_type": "float64", "metadata": null}, {"name": "y", '
            b'"field_name": "y", "pandas_type": "float64", "numpy_type": "floa'
            b't64", "metadata": null}, {"name": "timestamp", "field_name": "ti'
            b'mestamp", "pandas_type": "datetime", "numpy_type": "datetime64[n'
            b's]", "metadata": null}], "creator": {"library": "pyarrow", "vers'
            b'ion": "0.14.1"}, "pandas_version": "0.25.0"}'}
Length of dataframe going in 12, Npartitions going in 5
[✔️1] [✔️2] [✔️3] [✔️4] [✔️5] [✔️6] [✔️7] [✔️8] [✔️9] [✖️10] [✖️11] [✖️12] [✖️13] [✖️14] [✖️15] [✖️16] [✖️17] [✖️18] [✖️19] [✖️20] [✖️21] [✖️22] [✖️23] [✖️24] [✖️25] [✖️26] [✖️27] [✖️28] [✖️29] [✖️30] [✖️31] [✖️32] [✖️33] [✖️34] [✖️35] [✖️36] [✖️37] [✖️38] [✖️39] [✖️40] [✖️41] [✖️42] [✖️43] [✖️44] [✖️45] [✖️46] [✖️47] [✖️48] [✖️49] 
Length of dataframe going in 6, Npartitions going in 5
Load failed: Schema in /home/jawaugh/code/notebooks/justin/dask-partitions/pyarrow7.pq/part.2.parquet was different. 
id: int64
name: null
x: double
y: double
timestamp: timestamp[us]
metadata
--------
{b'pandas': b'{"index_columns": ["timestamp"], "column_indexes": [{"name": nul'
            b'l, "field_name": null, "pandas_type": "unicode", "numpy_type": "'
            b'object", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name'
            b'": "id", "field_name": "id", "pandas_type": "int64", "numpy_type'
            b'": "int64", "metadata": null}, {"name": "name", "field_name": "n'
            b'ame", "pandas_type": "empty", "numpy_type": "object", "metadata"'
            b': null}, {"name": "x", "field_name": "x", "pandas_type": "float6'
            b'4", "numpy_type": "float64", "metadata": null}, {"name": "y", "f'
            b'ield_name": "y", "pandas_type": "float64", "numpy_type": "float6'
            b'4", "metadata": null}, {"name": "timestamp", "field_name": "time'
            b'stamp", "pandas_type": "datetime", "numpy_type": "datetime64[ns]'
            b'", "metadata": null}], "creator": {"library": "pyarrow", "versio'
            b'n": "0.14.1"}, "pandas_version": "0.25.0"}'}

vs

id: int64
name: string
x: double
y: double
timestamp: timestamp[us]
metadata
--------
{b'pandas': b'{"index_columns": ["timestamp"], "column_indexes": [{"name": nul'
            b'l, "field_name": null, "pandas_type": "unicode", "numpy_type": "'
            b'object", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name'
            b'": "id", "field_name": "id", "pandas_type": "int64", "numpy_type'
            b'": "int64", "metadata": null}, {"name": "name", "field_name": "n'
            b'ame", "pandas_type": "unicode", "numpy_type": "object", "metadat'
            b'a": null}, {"name": "x", "field_name": "x", "pandas_type": "floa'
            b't64", "numpy_type": "float64", "metadata": null}, {"name": "y", '
            b'"field_name": "y", "pandas_type": "float64", "numpy_type": "floa'
            b't64", "metadata": null}, {"name": "timestamp", "field_name": "ti'
            b'mestamp", "pandas_type": "datetime", "numpy_type": "datetime64[n'
            b's]", "metadata": null}], "creator": {"library": "pyarrow", "vers'
            b'ion": "0.14.1"}, "pandas_version": "0.25.0"}'}
Length of dataframe going in 16, Npartitions going in 5
[✔️1] [✔️2] [✔️3] [✔️4] [✔️5] [✔️6] [✔️7] [✔️8] [✔️9] [✔️10] [✔️11] [✔️12] [✔️13] [✔️14] [✔️15] [✔️16] [✔️17] [✔️18] [✔️19] [✔️20] [✔️21] [✔️22] [✔️23] [✔️24] [✔️25] [✔️26] [✔️27] [✔️28] [✔️29] [✔️30] [✔️31] [✔️32] [✔️33] [✔️34] [✔️35] [✔️36] [✔️37] [✔️38] [✔️39] [✔️40] [✔️41] [✔️42] [✔️43] [✔️44] [✔️45] [✔️46] [✔️47] [✔️48] [✔️49] 
Length of dataframe going in 13, Npartitions going in 5
[✔️1] [✔️2] [✔️3] [✔️4] [✔️5] [✔️6] [✔️7] [✔️8] [✔️9] [✔️10] [✔️11] [✔️12] [✔️13] [✔️14] [✔️15] [✔️16] [✔️17] [✔️18] [✔️19] [✔️20] [✔️21] [✔️22] [✔️23] [✔️24] [✔️25] [✔️26] [✔️27] [✔️28] [✔️29] [✔️30] [✔️31] [✔️32] [✔️33] [✔️34] [✔️35] [✔️36] [✔️37] [✔️38] [✔️39] [✔️40] [✔️41] [✔️42] [✔️43] [✔️44] [✔️45] [✔️46] [✔️47] [✔️48] [✔️49] 

Actual new error

In previous versions of dask our tests were passing (one of them triggered something like this), but now they are failing. This is due to this check in dask.dataframe.core.check_divisions:

if len(divisions[:-1]) != len(list(unique(divisions[:-1]))):

ValueError: New division must be unique, except for the last element
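
For reference, a minimal illustration of that check (a sketch against dask 2.x internals, not code from our runs): duplicate interior divisions are what trip it.

from dask.dataframe.core import check_divisions
import pandas as pd

good = (pd.Timestamp("2000-01-01"), pd.Timestamp("2000-01-15"), pd.Timestamp("2000-01-31"))
bad = (pd.Timestamp("2000-01-26 16:18:39"),
       pd.Timestamp("2000-01-26 16:18:39"),
       pd.Timestamp("2000-01-26 16:18:39"))

check_divisions(good)  # passes: interior divisions are unique and sorted
check_divisions(bad)   # ValueError: New division must be unique, except for the last element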

Expected behavior

In all of the cases above I wouldn't expect any errors. I don't know ahead of time whether individual partitions will be empty, and after a reload I want to be able to assume I can "split" to a certain degree (even if that creates empty partitions).

Final notes:

Of particular worry are the (apparently new) schema errors across the dataframes, which complain about nulls; I hadn't seen them before on pyarrow 0.12, which we were pinned to until now. This makes me think that the metadata is being written "per partition". I was able to reproduce this behavior (also intermittently, so it's hard to be sure this will hold) purely by editing the data in a single partition.
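
A small standalone illustration of why I suspect this (my guess, not dask code): when a partition's object column is entirely null, pyarrow's schema inference produces a null-typed field, while populated partitions infer string, so the per-file schemas disagree.

import pandas as pd
import pyarrow as pa

all_null = pd.DataFrame({"name": pd.Series([None, None], dtype=object)})
populated = pd.DataFrame({"name": ["Alice", "Bob"]})

print(pa.Table.from_pandas(all_null, preserve_index=False).schema)   # name: null
print(pa.Table.from_pandas(populated, preserve_index=False).schema)  # name: string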

Separate (but related?) issue with Nones in the schema...

import numpy as np
from dask.datasets import timeseries
import dask.dataframe as dd

df = timeseries().reset_index().map_partitions(lambda x: x.loc[:5])
df = df.set_index('x').reset_index().persist()
# Null out "name" for a slice of rows so some partitions end up all-null
df.name = df.name.where(~np.logical_and(df.x > 0, df.x < 0.2), None)
df.to_parquet("wow.pq", engine="pyarrow")
dd.read_parquet("wow.pq", engine="pyarrow")

This raises an error on the read: ValueError: Schema in /home/jawaugh/code/notebooks/justin/dask-partitions/wow.pq/part.12.parquet was different.

@TomAugspurger (Member)

Is it fair to summarize the issue as "DataFrame.repartition sometimes(?) fails on a parquet dataset with some(?) empty partitions"?

Separate (but related?) issue with Nones in the schema...

Can you tell, is this an issue in pyarrow, or in how Dask is using pyarrow?

@TomAugspurger (Member)

One thing I note (for pyarrow): the divisions are different before and after reading.

ipdb> pp df_filtered.divisions
(Timestamp('2000-01-01 00:00:00', freq='D'),
 Timestamp('2000-01-07 00:00:00', freq='D'),
 Timestamp('2000-01-13 00:00:00', freq='D'),
 Timestamp('2000-01-19 00:00:00', freq='D'),
 Timestamp('2000-01-25 00:00:00', freq='D'),
 Timestamp('2000-01-31 00:00:00', freq='D'))
ipdb> pp fpq.divisions
(Timestamp('2000-01-01 12:57:17'),
 Timestamp('2000-01-09 03:59:58'),
 Timestamp('2000-01-13 12:28:09'),
 Timestamp('2000-01-19 12:21:54'),
 Timestamp('2000-01-26 16:18:39'),
 Timestamp('2000-01-26 16:18:39'))

@bluecoconut (Contributor, Author)

Is it fair to summarize the issue as "DataFrame.repartition sometimes(?) fails on a parquet dataset with some(?) empty partitions"?

Yeah, there's something odd happening with partitions/divisions when loading parquet that is new behavior relative to previous versions of dask. Specifically, I'm getting the error ValueError: New division must be unique, except for the last element, likely caused by the division rewriting before/after reading that you identified. (All saves/reloads should, imo, result in the same dataframe.)

I can put together tests for a PR, but the bigger question is what changed in how parquet divisions are handled in 2.2.0, and why; I haven't yet spent the time digging through the changes.

Can you tell, is this an issue in pyarrow, or in how Dask is using pyarrow?

I tried digging but I couldn't easily parse and follow how the schema is being generated and passed around.

@mrocklin (Member) commented Aug 15, 2019 via email

@rjzamora (Member)

Thanks for raising this @bluecoconut - sorry about it! The goal of the parquet refactor was to break things down into a clear core/engine structure (consistent between arrow and fastparquet). We tried not to change or break existing behavior, but much of the code was ultimately rewritten and the test coverage was certainly not perfect.

I will take a look at the code snippets you shared here. Any other tests/information you can provide would certainly help me resolve the problems.

@bluecoconut (Contributor, Author)

@rjzamora Any update on this? I don't have any better examples or tests than the ones provided.

Specifically, the two key issues are:

  1. (More importantly, and starting to become a huge issue) The schema changing on save/load in pyarrow. This is resulting in a lot of failed runs for us, where we have to repartition to single >100 GB partitions and use giant machines to prevent the schema mismatch. Example:
import numpy as np
from dask.datasets import timeseries
import dask.dataframe as dd

df = timeseries().reset_index().map_partitions(lambda x: x.loc[:5])
df = df.set_index('x').reset_index().persist()
df.name = df.name.where(~np.logical_and(df.x > 0, df.x < 0.2), None)
df.to_parquet("wow.pq", engine="pyarrow")
dd.read_parquet("wow.pq", engine="pyarrow")

(This throws an error due to schema mismatch)

  2. The number of partitions changing on save/load (this happens with both fastparquet and pyarrow). This matters less, as it does not functionally block actual runs, but it is causing our tests to fail due to the odd behavior and is likely to cause issues in the future.

@rjzamora (Member)

Thanks for the nudge @bluecoconut, and thanks for providing a specific example for (1) - I will take a look today.

Can you confirm that both cases behave as you want in 2.1.0 but not in 2.3.0? I think I ran some quick tests last week and found that your repartition test behaved exactly the same for both new and old versions of dask for me. However, I'll need to check this again (it might have been an environment mishap on my end).

@bluecoconut (Contributor, Author)

I think the second case (2), the partition number changing, specifically showed up in the newer dask.

For the schema change in pyarrow (1), I'm not entirely sure when the behavior started, but it seems to have appeared after we updated from arrow 0.12 (which we had pinned due to a serialization bug) to the newest version (0.14). This happened at the same time as updating dask for us, so without building out a compatibility matrix I can't be sure exactly when this particular behavior appeared.

I can try to find some time soon to pinpoint the different versions and which changes caused what.

@rjzamora (Member) commented Aug 21, 2019

Just started looking at this - the first problem is definitely related to changes in pyarrow (rather than dask). Both new and old versions of dask seem to return the same error here. I'll see what I can do to smooth this over :)

@rjzamora (Member) commented Aug 23, 2019

@bluecoconut - Just an update here... I looked into the pyarrow problem for a bit and raised #5307 as a possible fix. However, I am not totally convinced that everything in that PR is ideal/appropriate. That is, I'm not sure that we should set validate_schema=False by default in any calls to pq.ParquetDataset.

In the current master branch, you should not get the schema error if you explicitly define the schema when writing the parquet dataset. From your example above:

import pyarrow as pa

schema = pa.schema(
    [
        ("x", pa.float64()),
        ("timestamp", pa.timestamp("ns")),
        ("id", pa.int64()),
        ("name", pa.string()),
        ("y", pa.float64()),
    ]
)
df.to_parquet("wow.pq", schema=schema, engine="pyarrow")
dd.read_parquet("wow.pq", engine="pyarrow")

Note that the schema keyword was added to to_parquet in #5150 for this very reason (I apologize for missing/forgetting this earlier). It looks like we need to document this better, or at least recommend that the user try schema= in an error message.

If we decide not to set validate_schema=False by default in #5307, other changes (including a bug fix) in that PR should also allow you to avoid the schema-mismatch error, without setting the schema in to_parquet, by explicitly specifying validate_schema=False in read_parquet:

dd.read_parquet("wow.pq", dataset={"validate_schema": False}, engine="pyarrow")

@bluecoconut (Contributor, Author)

@rjzamora

I definitely understand the challenge for the pyarrow schema validation, thanks for chasing this!

For now I'm going to generate a schema from the _meta and use that (paying careful attention to mapping from the intended data type to the arrow datatype, instead of just relying on the from_pandas schema inference that generates nulls).

Is that a possible avenue for a more general solution in dask? (e.g., dask dataframes should be able to reliably use their _meta to inform a universal schema on save?)
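
For illustration, a rough sketch of the kind of helper I mean (hypothetical code; it assumes object columns are meant to hold strings):

import pyarrow as pa

def schema_from_meta(ddf):
    # Build an explicit Arrow schema from the dataframe's _meta (an empty
    # pandas frame carrying the intended dtypes), so all-null object columns
    # are written as string instead of being inferred as null per partition.
    meta = ddf._meta.reset_index()  # include the index column, which also gets written
    fields = []
    for name, dtype in meta.dtypes.items():
        if dtype == object:
            arrow_type = pa.string()  # assumption: object columns hold strings
        else:
            arrow_type = pa.from_numpy_dtype(dtype)
        fields.append(pa.field(name, arrow_type))
    return pa.schema(fields)

# usage with the example above:
# df.to_parquet("wow.pq", schema=schema_from_meta(df), engine="pyarrow")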


Also, separate from that bug but still part of this issue, what should the path be for fixing the problem with loading partitioned parquet? (example below)

from dask.datasets import timeseries
import dask.dataframe as dd

engine = 'fastparquet'
print(f"Engine: {engine}")
for i in range(10):
    df = timeseries().persist()
    df_filtered = df[df.x>0.99999]
    print(f"Length of dataframe going in {len(df_filtered)}, Npartitions going in {df_filtered.npartitions}")
    df_filtered.to_parquet(f"{engine}{i}.pq", engine=engine)
    try:
        fpq = dd.read_parquet(f"{engine}{i}.pq", engine=engine)
    except ValueError as e:
        print(f"Load failed: {e}")
        continue
    for N in range(1, 50):
        try:
            fpq.repartition(npartitions=N)
            print(f"[✔️{N}]", end =" ")
        except ValueError:
            print(f"[✖️{N}]", end =" ")
    print()

Is this something I should put in a separate issue, or try to come up with a simpler, more straightforward test for? The issue here is that the behavior changes depending on the initialization of the timeseries dataset itself (and therefore the distribution of the time column), which affects the loaded partition divisions.

Effectively, I think there should be something that validates that saving a partitioned dataframe and loading it back results in the same divisions across many different odd inputs (entirely empty partitions, etc.).
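
Something along these lines is what I have in mind (a sketch of the kind of check, with a hypothetical local path; the asserts are exactly what currently does not hold):

from dask.datasets import timeseries
import dask.dataframe as dd

df = timeseries().persist()
df_filtered = df[df.x > 0.99999]  # leaves most partitions empty

df_filtered.to_parquet("roundtrip.pq", engine="fastparquet")
loaded = dd.read_parquet("roundtrip.pq", engine="fastparquet")

# A round trip through parquet should preserve the partition structure.
assert loaded.npartitions == df_filtered.npartitions
assert loaded.divisions == df_filtered.divisions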

@rjzamora (Member)

Is that a possible avenue for a more general solution in dask? (e.g., dask dataframes should be able to reliably use their _meta to inform a universal schema on save?)

Yes - good suggestion. I think that is probably doable.

Effectively, I think there should be something that validates that saving a partitioned dataframe and loading it back results in the same divisions across many different odd inputs (entirely empty partitions, etc.).

I haven't been able to spend much time on this one yet. I understand your point about wanting the same partitions throughout the parquet round trip. Unfortunately, this may be tricky: when the dataframe is written to a parquet dataset, the metadata only contains row-group statistics for the data that is actually written, and during read_parquet that metadata is what we use to define the new divisions/partitions.
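
To see where those divisions come from, you can inspect the per-row-group statistics directly (a sketch using pyarrow's metadata API; the path and index-column name are taken from the examples above, and this assumes a _metadata file was written):

import pyarrow.parquet as pq

meta = pq.read_metadata("pyarrow0.pq/_metadata")
ts_idx = meta.schema.to_arrow_schema().get_field_index("timestamp")

# Each row group becomes one partition on read; its min/max statistics on the
# index column are what define the division boundaries.
for rg in range(meta.num_row_groups):
    stats = meta.row_group(rg).column(ts_idx).statistics
    if stats is not None and stats.has_min_max:
        print(rg, stats.min, stats.max)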

That said, I do feel that a subsequent repartition call should not fail, so this is the part I am likely to prioritize.

@TomAugspurger (Member)

From #5307 (comment)

The current state of the PR does not close all of #5252, but it should close the "schema" component (there is also a repartition-failure discussed there).
