
[Python][Parquet] improve reading of partitioned parquet datasets whose schema changed #25089

Closed
asfimport opened this issue May 27, 2020 · 8 comments


asfimport commented May 27, 2020

Hi there, I'm encountering the following issue when reading from HDFS:

 

My situation:

I have a partitioned parquet dataset in HDFS, whose recent partitions contain parquet files with more columns than the older ones. When I try to read data using pyarrow.dataset.dataset and filter on recent data, I still only get the columns that are also contained in the old parquet files. I'd like to somehow merge the schemas, or use the schema of the parquet files from which the data actually ends up being loaded.

When using:

pyarrow.dataset.dataset(path_to_hdfs_directory, partitioning='hive', filters=my_filter_expression).to_table().to_pandas()

Is there a way to handle schema changes so that the data that is read contains all columns?

Everything works fine when I copy the needed parquet files into a separate folder, but that is a very inconvenient way of working.

 

Environment: Ubuntu 18.04, latest miniconda with python 3.7, pyarrow 0.17.1
Reporter: Ira Saktor

Note: This issue was originally created as ARROW-8964. Please see the migration documentation for further details.


Joris Van den Bossche / @jorisvandenbossche:
What you are describing should normally already be implemented (so something else must be going wrong for some reason).
When multiple files with different schemas are read in a dataset, the dataset discovery uses a very basic "schema evolution / normalization", which right now only involves adding missing columns as "null" values (so exactly the use case you are describing, I think). In the future we also want to allow some type evolution (e.g. the same column stored as int32 in some files and int64 in others, which right now raises an error).

Can you show an example of just reading one of the old and one of the new files in the directory (you can pass the exact file name instead of the directory to dataset(..))?
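
For example, something along these lines (the file paths are just placeholders):

import pyarrow.dataset as ds

# hypothetical paths to one old and one new file of the partitioned dataset
old_file = "path/to/dataset/date=2019-01-01/part-0.parquet"
new_file = "path/to/dataset/date=2020-05-01/part-0.parquet"

# reading each file as its own dataset shows the schema actually stored in that file
print(ds.dataset(old_file).schema)
print(ds.dataset(new_file).schema)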


Joris Van den Bossche / @jorisvandenbossche:
While pressing the "Add comment" button, I realized we changed the dataset discovery process: by default it only checks the schema of the first file it encounters, and then assumes the full dataset has this schema. So that is probably why you only see the old columns, even for the new files.

There are in principle two solutions for this:

  1. Manually specify the schema (where you yourself ensure it includes all columns of both the old and new files)
  2. Specify that the discovery should inspect all files to infer the dataset schema, and not just the first file. The problem is that this option is not yet exposed in the Python interface.

So for now, only the first is an option.
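
A rough sketch of option 1, with made-up column names, types, and paths for illustration (the exact interplay with the hive partition columns may need some tweaking):

import pyarrow as pa
import pyarrow.dataset as ds

# hypothetical unified schema that covers the old columns plus the newly added one
full_schema = pa.schema([
    ("id", pa.int64()),
    ("value", pa.float64()),
    ("new_column", pa.string()),  # only present in the recent files
])

dataset = ds.dataset("path/to/hdfs/directory", partitioning="hive", schema=full_schema)
table = dataset.to_table(filter=ds.field("id") > 100)  # hypothetical filter expression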


Joris Van den Bossche / @jorisvandenbossche:
Existing issue for exposing this in python: ARROW-8221


Ira Saktor:
Thank you very much for your fast answer. In the meantime, regarding the schema specification, could you please tell me whether there is a way in pyarrow.dataset to read the schema from a specific parquet file? I could then simply pass it one of the recent parquet files to infer the schema from.

I know how to load the schema with pyarrow.parquet, however the non-legacy parquet dataset doesn't yet support schema specification, so I was hoping to manage this with pyarrow.dataset, if that's possible.
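
For reference, with pyarrow.parquet it would be roughly something like this (the path is just a placeholder):

import pyarrow.parquet as pq

# read only the footer metadata of one recent file to get its schema
schema = pq.read_schema("path/to/recent/part-0.parquet")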


Joris Van den Bossche / @jorisvandenbossche:
Yes, if you have the path to a single parquet file, you can easily get the schema from that one.

I think something like this should work:

# infer the schema from a single recent file that already contains the new columns
schema = pyarrow.dataset.dataset(path_to_hdfs_single_parquet_file, partitioning='hive').schema
# then pass that schema explicitly when creating the full dataset
dataset = pyarrow.dataset.dataset(path_to_hdfs_directory, partitioning='hive', schema=schema)
dataset.to_table(filter=my_filter_expression).to_pandas()


Ira Saktor:
Awesome! Thank you, that's what I was looking for. Looking forward to ARROW-8221 being resolved :). Should I close this task, as it's pretty much a duplicate?


Ira Saktor:
On an unrelated question: would you by any chance also know how to write timestamps to parquet files so that I can then create an Impala table with the columns as timestamp type from the given parquet files?


Joris Van den Bossche / @jorisvandenbossche:
I am not familiar with Impala. Looking at https://impala.apache.org/docs/build/html/topics/impala_parquet.html#parquet_data_types, it seems that the INT64-based timestamp types (the default when writing with pyarrow) are only supported in Impala > 3.2. If you need the older INT96 representation for timestamps, there is an option use_deprecated_int96_timestamps=True that you can set in pq.write_table.
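
A minimal sketch of that option (the table contents and file name are made up):

from datetime import datetime
import pyarrow as pa
import pyarrow.parquet as pq

# hypothetical table with a timestamp column
table = pa.table({"event_time": pa.array([datetime(2020, 5, 27, 12, 0), datetime(2020, 5, 27, 13, 30)])})

# write timestamps in the older INT96 representation so Impala can read the column as TIMESTAMP
pq.write_table(table, "events.parquet", use_deprecated_int96_timestamps=True)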

Should i close this task as it's pretty much a duplicate?

Yes, will close it
