In [1]:
import pyarrow as pa

In [2]:
# Memory-map the raw CSV read-only; Arrow's memory pool allocates nothing for the mapping itself.
csv_path = "/data/nyc-taxi/yellow_tripdata_2015-01.csv"
mmap = pa.memory_map(csv_path, mode="r")

In [3]:
# pa.total_allocated_bytes() reports bytes held by Arrow's default memory pool,
# NOT the process RSS: memory-mapping the 1.9G CSV above allocates nothing from it.
# `>> 20` converts bytes to whole MiB (floored).
print("Arrow allocated: {} MiB".format(pa.total_allocated_bytes() >> 20))

RSS: 0 MB


In [4]:
# Position of the memory map before any reader has consumed it.
start_position = mmap.tell()
start_position

0

In [5]:
from pyarrow.csv import open_csv

# Streaming CSV reader over the memory map: batches are pulled on demand with
# read_next_batch() instead of parsing the whole file up front.
reader = open_csv(input_file=mmap)
reader

<pyarrow._csv.CSVStreamingReader at 0x7f2c72482720>

In [6]:
# Position after open_csv(): the reader has already consumed some input,
# presumably the first block it reads to infer column types.
position_after_open = mmap.tell()
position_after_open

35651584

In [7]:
# Column types the CSV reader settled on (default convert options, i.e. inferred).
csv_schema = reader.schema
csv_schema

VendorID: int64
tpep_pickup_datetime: timestamp[s]
tpep_dropoff_datetime: timestamp[s]
passenger_count: int64
trip_distance: double
pickup_longitude: double
pickup_latitude: double
RateCodeID: int64
store_and_fwd_flag: string
dropoff_longitude: double
dropoff_latitude: double
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double

In [8]:
import pyarrow.parquet as pq

In [9]:
!mkdir /data/test-output

In [10]:
# Stream the CSV into Parquet one record batch at a time, so Arrow memory stays
# roughly flat (see the printed allocation) instead of growing with the file.
# Note: Parquet has no second-resolution timestamp type, so the timestamp[s]
# columns from the CSV reader come back as timestamp[ms] when reloaded.
REPORT_EVERY = 100  # batches between memory reports

ii = 0
with pq.ParquetWriter("/data/test-output/yellow_tripdata_2015-01.parquet", reader.schema) as parquet_writer:
    # The reader is iterable and stops cleanly at end of input, which keeps
    # StopIteration handling out of the write/report path (the old try block
    # would have treated a StopIteration from write_batch as end of file).
    for ii, batch in enumerate(reader, start=1):
        parquet_writer.write_batch(batch)
        if ii % REPORT_EVERY == 0:
            # Arrow default-pool bytes, not process RSS.
            print("Arrow allocated: {} MiB".format(pa.total_allocated_bytes() >> 20))
    print("End of file!")

RSS: 9 MB
RSS: 9 MB
RSS: 9 MB
RSS: 9 MB
RSS: 9 MB
RSS: 9 MB
RSS: 9 MB
RSS: 9 MB
RSS: 9 MB
RSS: 9 MB
RSS: 9 MB
RSS: 9 MB
RSS: 9 MB
RSS: 9 MB
RSS: 9 MB
RSS: 9 MB
RSS: 9 MB
RSS: 9 MB
End of file!


In [15]:
# On-disk size of the Parquet output, for comparison with the source CSV size.
!du -h /data/test-output/*

479M	/data/test-output/yellow_tripdata_2015-01.parquet


In [16]:
# On-disk size of the source CSV file(s), for comparison with the Parquet output.
!du -h /data/nyc-taxi/*

1.9G	/data/nyc-taxi/yellow_tripdata_2015-01.csv


In [17]:
# Read the freshly written Parquet file back into an in-memory Arrow table.
parquet_path = "/data/test-output/yellow_tripdata_2015-01.parquet"
reloaded_nyc = pq.read_table(source=parquet_path)

In [18]:
# Schema after the Parquet round trip; compare the timestamp units with the
# CSV reader's schema.
reloaded_schema = reloaded_nyc.schema
reloaded_schema

VendorID: int64
tpep_pickup_datetime: timestamp[ms]
tpep_dropoff_datetime: timestamp[ms]
passenger_count: int64
trip_distance: double
pickup_longitude: double
pickup_latitude: double
RateCodeID: int64
store_and_fwd_flag: string
dropoff_longitude: double
dropoff_latitude: double
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double

In [19]:
import io

# Serialize a small sample of the reloaded table as an Arrow IPC stream held in
# an in-memory buffer, to demonstrate the streaming-format round trip.
N_SAMPLE_BATCHES = 7  # matches the previous `if index > 5: break` cutoff, which wrote 7 batches

buf = io.BytesIO()

with pa.ipc.new_stream(buf, reloaded_nyc.schema) as writer:
    # to_batches() returns a list, so slicing states the sample size directly
    # instead of encoding it in a write-then-check break condition.
    for batch in reloaded_nyc.to_batches()[:N_SAMPLE_BATCHES]:
        writer.write_batch(batch)

print(writer.stats)

WriteStats(num_messages=8, num_record_batches=7, num_dictionary_batches=0, num_dictionary_deltas=0, num_replaced_dictionaries=0)


In [20]:
buf.seek(0)  # rewind: writing left the buffer position at the end of the stream

with pa.ipc.open_stream(buf) as reader:
    schema = reader.schema
    batches = list(reader)

# Pass the stream's schema explicitly: from_batches() cannot infer a schema from
# an empty batch list, and this pins the table to the schema the stream declared.
pa.Table.from_batches(batches, schema=schema)

pyarrow.Table
VendorID: int64
tpep_pickup_datetime: timestamp[ms]
tpep_dropoff_datetime: timestamp[ms]
passenger_count: int64
trip_distance: double
pickup_longitude: double
pickup_latitude: double
RateCodeID: int64
store_and_fwd_flag: string
dropoff_longitude: double
dropoff_latitude: double
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
----
VendorID: [[2,1,1,1,1,1,1,1,1,1,...,2,2,2,2,2,2,2,2,2,2],[2,2,1,1,1,1,1,1,1,1,...,2,2,2,2,2,2,2,2,2,2],[2,2,2,2,2,2,2,2,2,2,...,2,2,2,2,2,2,2,2,2,2],[2,2,2,2,2,2,2,2,2,2,...,2,2,2,2,1,1,1,2,2,1],[1,1,1,2,2,2,2,1,1,1,...,1,1,1,1,1,1,1,1,1,1],[1,1,1,1,1,1,1,1,2,2,...,1,1,1,1,1,1,1,1,1,1],[1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1]]
tpep_pickup_datetime: [[2015-01-15 19:05:39.000,2015-01-10 20:33:38.000,2015-01-10 20:33:38.000,2015-01-10 20:33:39.000,2015-01-10 20:33:39.000,2015-01-10 20:33:39.000,2015-01-10 20:33:39.000,2015-01-10 20:3