How to append to parquet file periodically and read intermediate data - pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer. #3203

Closed
dazzag24 opened this Issue Dec 17, 2018 · 7 comments

@dazzag24
commented Dec 17, 2018

I'm adding new data to a parquet file every 60 seconds using this code:

import os
import json
import time
import requests
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

api_url = 'https://opensky-network.org/api/states/all'

cols = ['icao24', 'callsign', 'origin', 'time_position',
        'last_contact', 'longitude', 'latitude',
        'baro_altitude', 'on_ground', 'velocity', 'true_track',
        'vertical_rate', 'sensors', 'geo_altitude', 'squawk',
        'spi', 'position_source']

def get_new_flight_info(writer):
    print("Requesting new data")
    req = requests.get(api_url)
    content = req.json()

    states = content['states']
    df = pd.DataFrame(states, columns = cols)
    df['timestamp'] = content['time']
    print("Found {} new items".format(len(df)))

    table = pa.Table.from_pandas(df)
    if writer is None:
        writer = pq.ParquetWriter('openskyflights.parquet', table.schema)
    writer.write_table(table=table)
    return writer

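if __name__ == '__main__':
    writer = None
    # Poll every 60 seconds until a file named 'opensky.STOP' appears.
    while not os.path.exists('opensky.STOP'):
        writer = get_new_flight_info(writer)
        time.sleep(60)

    # The Parquet footer is only written here, when the writer is closed.
    if writer is not None:
        writer.close()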

This works fine, and the file grows every 60 seconds.
However, unless I force the loop to exit, I am unable to read the parquet file. In a separate terminal I try to access it while the writer is still running, using this code:

import pandas as pd
import pyarrow.parquet as pq

table = pq.read_table("openskyflights.parquet")
df = table.to_pandas()
print(len(df))

which results in this error:

Traceback (most recent call last):
  File "checkdownloadsize.py", line 7, in <module>
    table = pq.read_table("openskyflights.parquet")
  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 1074, in read_table
    use_pandas_metadata=use_pandas_metadata)
  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/filesystem.py", line 182, in read_parquet
    filesystem=self)
  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 882, in __init__
    self.validate_schemas()
  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 895, in validate_schemas
    self.schema = self.pieces[0].get_metadata(open_file).schema
  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 453, in get_metadata
    return self._open(open_file_func).metadata
  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 459, in _open
    reader = open_file_func(self.path)
  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 984, in open_file
    common_metadata=self.common_metadata)
  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 102, in __init__
    self.reader.open(source, metadata=metadata)
  File "pyarrow/_parquet.pyx", line 639, in pyarrow._parquet.ParquetReader.open
  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer.

Is there a way to read the data written so far while the writer is still running?
I assume that calling writer.close() inside the while loop would prevent any further data from being written to the file. Is there some kind of "flush" operation that ensures all data is written to disk and available to other processes or threads that want to read it?

Thanks
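
For context on the error: a Parquet file's footer, which holds the file metadata, is written only when the writer is closed, so a file that is still open for writing cannot be read, and ParquetWriter has no mode for reopening a closed file to append. One workaround, sketched here as an assumption rather than something proposed in this thread, is to write each 60-second batch as its own closed file in a directory and rename it into place once it is complete. The helper names `publish_atomically` and `write_batch`, the `batch-N.parquet` naming scheme, and the `flights` directory mentioned below are all hypothetical.

```python
import os
import tempfile


def publish_atomically(final_path, write_fn):
    """Create a file via write_fn(tmp_path), then rename it into place.

    Readers never observe a half-written file: os.replace is atomic when
    the temporary file and final_path live on the same filesystem.
    """
    directory = os.path.dirname(os.path.abspath(final_path))
    os.makedirs(directory, exist_ok=True)
    # A leading dot keeps the temporary file out of "batch-*.parquet" globs.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-")
    os.close(fd)
    try:
        write_fn(tmp_path)
        os.replace(tmp_path, final_path)
    except BaseException:
        # Clean up the temporary file if writing or renaming failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def write_batch(df, dataset_dir, batch_id):
    """Store one polling result as its own, fully closed Parquet file."""
    # Imported lazily so publish_atomically stays usable without pyarrow.
    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.Table.from_pandas(df)
    final_path = os.path.join(dataset_dir, "batch-{}.parquet".format(batch_id))
    # pq.write_table opens, writes, and closes the file, footer included.
    publish_atomically(final_path, lambda tmp: pq.write_table(table, tmp))
    return final_path
```

In the polling loop, `write_batch(df, 'flights', content['time'])` would take the place of `writer.write_table(...)`. A reader in another process could then load each path matching `flights/batch-*.parquet` with `pq.read_table` and concatenate the results, since every file it sees already has a complete footer.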

@wesm

Member

commented Dec 17, 2018

Can you write to the mailing list?

@dazzag24

Author

commented Dec 17, 2018

Sorry, which mailing list is that?

@wesm

Member

commented Dec 17, 2018

@dazzag24

Author

commented Dec 18, 2018

Done

@fsaintjacques

Contributor

commented Dec 18, 2018

I don't see any email?

@wesm

Member

commented Dec 18, 2018

I don't think I did either. @dazzag24, make sure to first subscribe to the mailing list you write to.

@dazzag24

Author

commented Dec 18, 2018

@wesm wesm closed this Jan 27, 2019
