
Add ability to output Hive/Impala compatible timestamps #82

Closed
andrewgross opened this issue Feb 1, 2017 · 21 comments

@andrewgross

andrewgross commented Feb 1, 2017

Hey,

I am doing some work with Amazon's Athena (Presto under the hood) and am using fastparquet to convert JSON files to Parquet format. However, when I output datetime64[ns] fields and read them in Athena, I get incorrect results:

Actual Dates:

"2016-08-08 23:08:22"
"2016-08-08 23:08:22"
"2016-08-08 23:08:22"
"2016-08-08 23:08:22"
"2016-08-08 23:08:22"
"2016-08-08 23:08:22"
"2016-08-08 23:08:23"
"2016-08-08 23:08:22"
"2016-08-08 23:08:22"
"2016-08-08 23:08:22"

Returned Dates:

"+48575-01-04 19:26:40.000"
"+48575-01-04 19:26:40.000"
"+48575-01-04 19:26:40.000"
"+48575-01-04 19:26:40.000"
"+48575-01-04 19:26:40.000"
"+48575-01-04 19:26:40.000"
"+48575-01-04 19:43:20.000"
"+48575-01-04 19:26:40.000"
"+48575-01-04 19:26:40.000"
"+48575-01-04 19:26:40.000"

This is because Hive (and Impala) use a different timestamp format (int96) than the Parquet default. Check out these posts for more details:

  1. http://stackoverflow.com/questions/28292876/hives-timestamp-is-same-as-parquets-timestamp
  2. https://community.mapr.com/thread/18883-getting-weird-output-for-date-timestamp-data-type-columns-while-selecting-data-from-parquet-file-in-drill
  3. https://github.com/Parquet/parquet-mr/issues/218

It would be helpful if fastparquet used the Hive-compatible TIMESTAMP format when writing with file_scheme='hive'.

@martindurant
Member

martindurant commented Feb 3, 2017

An option to output dates in int96 is certainly doable, but I am surprised, as the parquet standard (https://github.com/Parquet/parquet-format/blob/master/LogicalTypes.md) makes clear that these are intervals of various accuracy, not timestamps.

Have you any idea how those values above relate to the input? Perhaps a snippet of code would help.

-edit-
I would guess it's simply a factor of 1000; do you need to specify whether the date units are ms or ns?
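A quick back-of-the-envelope check (mine, not from the thread) of the factor-of-1000 guess: the "+48575" dates above are roughly what you get if a count of microseconds is read as milliseconds.

from datetime import datetime, timezone

# One of the source timestamps, expressed as microseconds since the epoch.
dt = datetime(2016, 8, 8, 23, 8, 22, tzinfo=timezone.utc)
micros = int(dt.timestamp() * 1_000_000)   # 1470697702000000

# Misreading that microsecond count as milliseconds inflates everything by ~1000x.
seconds_if_millis = micros / 1000          # about 1.47e12 seconds since 1970
print(1970 + seconds_if_millis / (365.2425 * 86400))   # ~48574.6, i.e. the "+48575" dates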

@martindurant
Member

Could you please produce for me some parquet data from impala containing such a column, and the same data in another format (csv, whatever), so that I can do testing?

@andrewgross
Author

andrewgross commented Feb 8, 2017

Hey, sorry for the delay in getting back to you. I will lay out the data and the steps I used to produce it.

Source Data

Here is the JSON data I used as a source for these tests. Each line is one record (newline-delimited JSON). There are two columns: id, a simple string-based identifier, and date_added, which is in the format YYYY-MM-DD HH:MM:SS.

{"id": "1", "date_added": "2016-08-01 23:08:01"}
{"id": "2", "date_added": "2016-08-02 23:08:02"}
{"id": "3", "date_added": "2016-08-03 23:08:03"}
{"id": "4", "date_added": "2016-08-04 23:08:04"}
{"id": "5", "date_added": "2016-08-05 23:08:04"}
{"id": "6", "date_added": "2016-08-06 23:08:05"}
{"id": "7", "date_added": "2016-08-07 23:08:06"}
{"id": "8", "date_added": "2016-08-08 23:08:07"}
{"id": "9", "date_added": "2016-08-09 23:08:08"}
{"id": "10", "date_added": "2016-08-10 23:08:09"}

For my tests I saved this in a file called parquet_demo.json.

Creating Parquet with Hive

I uploaded the JSON data to S3 and used Amazon's EMR service with Hive to do the Parquet conversion with the following HiveQL commands (the S3 buckets are not public, so to duplicate this you would need to create your own):

DROP TABLE IF EXISTS parquet_format_test_json;
CREATE EXTERNAL TABLE parquet_format_test_json (
    id STRING,
    date_added TIMESTAMP
) ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 's3://redshift-scratch/parquet_test';

DROP TABLE IF EXISTS parquet_format_test_parq;
CREATE EXTERNAL TABLE parquet_format_test_parq (
    id STRING,
    date_added TIMESTAMP
) STORED AS PARQUET
LOCATION 's3://yipit-kinesis-test/test_custom_parquet';

INSERT INTO TABLE parquet_format_test_parq
SELECT *
FROM parquet_format_test_json;

This simply maps two tables: one to the source JSON file and one to the output location of the Parquet file. The tables have different storage formats, and when we INSERT between them, Hive takes care of the conversion.

Creating Parquet with FastParquet

Here is the Python 3 script I used to convert the JSON input file to a pandas DataFrame, and then into a Parquet file.

import json
import pandas
import datetime
from fastparquet import write


FILE_LOCATION = "/Users/awgross/parquet_demo.json"
OUTPUT_LOCATION = "fastparquet.parq"

# Target dtypes for the DataFrame columns; date_added ends up as datetime64[s].
DATA_TYPES = {
  "id": "object",
  "date_added": "datetime64[s]",
}

def get_data(file_location):
    # Parse each JSON line and convert date_added to a Unix timestamp (float seconds).
    with open(file_location) as f:
        merged_data = []
        for line in f.readlines():
            data = json.loads(line)
            dt = datetime.datetime.strptime(data["date_added"], '%Y-%m-%d %H:%M:%S')
            data["date_added"] = dt.timestamp()
            merged_data.append(data)
        return merged_data

def create_data_frame(data):
    df = pandas.DataFrame.from_records(data)
    for k, v in DATA_TYPES.items():
        df[k] = df[k].astype(v)
    return df

def write_dataframe(df, location):
    write(location, df)

if __name__ == "__main__":
    data = get_data(FILE_LOCATION)
    df = create_data_frame(data)
    write_dataframe(df, OUTPUT_LOCATION)

Reading the Data in Athena

Athena is an AWS service based on Presto. It can read Parquet files and uses HiveQL to query data. I uploaded both Parquet files to S3 and queried their contents using Athena.

## Hive
CREATE EXTERNAL TABLE hive_parquet_test (
    id STRING,
    date_added TIMESTAMP
) STORED AS PARQUET
LOCATION 's3://yipit-test/test_hive_parquet';

SELECT * FROM hive_parquet_test;

Here is a screenshot of the Hive Parquet file loaded in Athena.

## Fastparquet
CREATE EXTERNAL TABLE fastparquet_test (
    id STRING,
    date_added TIMESTAMP
) STORED AS PARQUET
LOCATION 's3://yipit-test/test_fastparquet';

SELECT * FROM fastparquet_test;

Here is a screenshot of the Fastparquet file loaded in Athena.

As you can see, there is something wrong with the TIMESTAMP field.

Here are links to download the Parquet files and CSVs of the results.

Hive Files

Fastparquet Files

@martindurant
Member

Thanks for the detailed description, I'll look into it.

@martindurant
Member

OK, I am convinced I can make the appropriate converter for you, and that this can be an option when writing.

I am puzzled at how the Hive reader knows that the field should be interpreted as a datetime; do you always need to specify TIMESTAMP by hand? Parquet is supposed to be self-describing, and the relevant schema element says nothing. Do you think I need to presume that all columns encoded as int96 coming from parquet-mr are in fact timestamps, or is this yet another option the user needs to select?
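(As an aside, and not from the thread: one way to check what the written file itself declares is to open it back up with fastparquet; the file name here is the output of the script above.)

from fastparquet import ParquetFile

# Inspect the metadata stored in the written file.
pf = ParquetFile('fastparquet.parq')
print(pf.schema)   # the schema as recorded in the footer
print(pf.dtypes)   # the pandas dtypes fastparquet would use when reading it back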

@andrewgross
Author

Hive should know the field needs to be interpreted as a TIMESTAMP because that's how I specify the schema, but I am not reading the Parquet files in with Hive. I assume Athena is similar, because I must specify the schema as well, so it might be ignoring whatever is specified in the Parquet file.

To be honest, I am not sure about how to deal with the int96 fields. This is the first time I have heard of them used like this, but I am not very experienced with these formats, so it might be safer to force the user to specify conversion.

@andrewgross
Author

Another side note for Athena: if I set the fastparquet output file scheme to hive, I get a folder with separate metadata files instead of the single combined file. However, I can take just the data file from the output, load that, and get the same results from queries. This leads me to believe that Athena is not reading the schema from the Parquet file, but instead relying on what the user specifies when creating the EXTERNAL TABLE.

@martindurant
Member

On your question about multiple files: the schema plus the relevant row-group definition is copied into every data file, as well as stored in _metadata, so the data files can be read individually. The only difference is that _metadata contains all the row-groups and the relative paths linking to the other files, with offsets so that reading one of those files doesn't mean reading its copy of the metadata again.
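To make that concrete, a minimal sketch (mine, not from the thread) of the layout fastparquet produces with file_scheme='hive'; the data is just the sample from earlier.

import pandas as pd
from fastparquet import write

df = pd.DataFrame({"id": ["1", "2"],
                   "date_added": pd.to_datetime(["2016-08-01 23:08:01",
                                                 "2016-08-02 23:08:02"])})
write("fastparquet_hive_output", df, file_scheme="hive")

# Resulting directory, roughly:
# fastparquet_hive_output/
#     _metadata         <- schema, all row-groups, and relative paths to the data files
#     _common_metadata  <- schema only
#     part.0.parquet    <- data file; its footer repeats the schema and its own row-group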

@andrewgross
Author

I see, thanks for the clarification.

@martindurant
Member

By the way, a much better way of ingesting your data into pandas:

df = pd.DataFrame([json.loads(s) for s in t])  # t is open text file.
df = df.astype({'id': int, 'date_added': 'M8[ns]'})
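(A self-contained version of the snippet above, with the imports and file handling filled in; the file name is assumed from earlier in the thread, and pd.to_datetime is used as an equivalent way to get the M8[ns]/datetime64[ns] column.)

import json
import pandas as pd

# Read the newline-delimited JSON into a DataFrame.
with open("parquet_demo.json") as t:
    df = pd.DataFrame([json.loads(s) for s in t])

# Cast the columns: id to int, date_added to datetime64[ns].
df["id"] = df["id"].astype(int)
df["date_added"] = pd.to_datetime(df["date_added"])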

@martindurant
Member

...and a final thought: you could probably have gotten the right answer with the original fastparquet behaviour by declaring the field as an int rather than a timestamp and using whatever Athena's unix-time function is.

@andrewgross
Author

Hey,

Thanks for all the help on this. I have tried your suggestions, but with no luck.

  1. Unfortunately, using your sample code to ingest the data did not work. When setting the data types, it kept insisting that I pass a tuple of type info instead of a string, but it would not accept the formatted tuples either. Possibly an issue with my library versions? pandas==0.18.1 and numpy==1.11.1.

  2. I tried my original method but changed datetime64[ns] to M8[ns], with no luck; I had the same issue. Changing the endianness with <M8 and >M8 did not help either, it just garbled the data in new and interesting ways.

  3. I tried creating the table with date_added as an int, but when querying the service on a date range using to_unixtime(from_iso8601_timestamp('2016-08-02T00:00:00+00:00')) it gave me an Internal Error. This is more an issue with Athena, but it is still a blocker.

@martindurant
Member

martindurant commented Feb 9, 2017

  1. I have pandas 0.19, so that's probably the difference
  2. datetime64[ns] and M8[ns] are roughly equivalent (the former is the Pandas string version of the latter). I meant that you should output the parquet file using fastparquet as before, but do something like
CREATE EXTERNAL TABLE fastparquet_test (
    id STRING,
    date_added BIGINT
) STORED AS PARQUET
LOCATION 's3://yipit-test/test_fastparquet';

SELECT id, time_from_unix(date_added) FROM fastparquet_test;

where BIGINT and time_from_unix are my guesses at the appropriate Athena terms. HiveQL seems to need the integer in seconds, and the data has it in µs, so you would need from_unixtime(date_added / 1000000).

Have you tried the new output with MR-times in #83?
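For reference, a minimal sketch (not verbatim from the thread) of the write call with the option from #83; the spelling times='mr' is taken from the confirmation further down, and the DataFrame here is just the sample data.

import pandas as pd
from fastparquet import write

df = pd.DataFrame({"id": ["1", "2"],
                   "date_added": pd.to_datetime(["2016-08-01 23:08:01",
                                                 "2016-08-02 23:08:02"])})

# Write int96 ("parquet-mr" style) timestamps, which Hive/Impala/Athena read as TIMESTAMP.
write("fastparquet_mr.parq", df, times="mr")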

@andrewgross
Author

I will work on that now. Thanks a ton for all your help and advice on this!

@andrewgross
Author

So, it turns out I had failed to properly install your PR when testing. With the new code installed correctly and times='mr' when writing, I got it working! Thanks again for all of your help. I might spend some time tinkering with my setup to see if I can get this working on Python 2.7, but if it has too many issues I might just set up infrastructure for Python 3, since we will need it going forward anyway.

If you are ever around NYC let me know, I owe you dinner or some drinks.

@martindurant
Member

Excellent. PR #66 is almost ready for py2 support, and I expect both of these to be merged soon.

@andrewgross
Author

I merged the two PRs together on my own fork and it works in Python 2.7.

@otmezger

I'm having the same problem. What was the solution here? It is strange that this still happens two years after this thread; any ideas why?

@martindurant
Member

What isn't working? The above thread seems to give full instructions on what to do, and has been tested to work.

@otmezger

I have a similar pandas DataFrame with dates, and I'm using fastparquet to write it as a Parquet file. Then, in Athena, I see the dates inflated. Querying with from_unixtime(to_unixtime(seen_time)/1000) AS seen_time2 produces correct dates, which confirms I am hitting the same error @andrewgross was having here.

If I import the parquet files into pandas again, the dates appear correctly.

My problem is that I didn't understand what I need to do from pandas/Python 3.6 in order to produce a Parquet file that Athena can read without this limitation.

@martindurant
Member

I would say this is Athena's problem: it doesn't respect the Parquet convention on the datetime logical type (https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp). Since you can get the times correctly, is this a limitation?
