The data used here comes from the top of this page: http://anson.ucdavis.edu/~clarkf/

I used conda to install pyarrow: https://anaconda.org/conda-forge/pyarrow

# Parquet

Summarizing the [parquet documentation](https://parquet.apache.org/documentation/latest/):

- The goal is interoperability across the Hadoop ecosystem
- Compression can be specified per column
- Handles complex nested data structures (similar to XML)
- Uses the record shredding and assembly algorithm from the 2010 Google Dremel paper

Column chunks live in a particular row group and are guaranteed to be contiguous in the file. Here's the hierarchy for a table with 2 columns; there is also metadata at each level.
```
File
    Row Group 1
        Column Chunk 1
        Column Chunk 2
    Row Group 2
        Column Chunk 1
        Column Chunk 2        
    Row Group 3
        ...
```
The docs recommend large row groups (512 MB to 1 GB), ideally 1 GB row groups stored in 1 GB HDFS blocks so that a whole row group fits in one block.

- MapReduce parallelizes across files / row groups.
- IO parallelizes across column chunks.

Advantages include query pushdowns, i.e. reading only the columns a query needs (e.g. a single column).

In [1]:
import os

import pandas as pd
import pyarrow.parquet as pq

Reading the dataset as below is cheap because it only reads the metadata.

The metadata is serialized with Apache Thrift.

In [2]:
pems = pq.ParquetDataset("/Users/clark/data/pems/pems_sorted/")

The schema and partitioning from the database (Hive) are preserved.

In [3]:
pems.schema

<pyarrow._parquet.ParquetSchema object at 0x1149141c8>
timeperiod: BYTE_ARRAY UTF8
flow1: INT32
occupancy1: DOUBLE
speed1: DOUBLE
flow2: INT32
occupancy2: DOUBLE
speed2: DOUBLE
flow3: INT32
occupancy3: DOUBLE
speed3: DOUBLE
flow4: INT32
occupancy4: DOUBLE
speed4: DOUBLE
flow5: INT32
occupancy5: DOUBLE
speed5: DOUBLE
flow6: INT32
occupancy6: DOUBLE
speed6: DOUBLE
flow7: INT32
occupancy7: DOUBLE
speed7: DOUBLE
flow8: INT32
occupancy8: DOUBLE
speed8: DOUBLE
 

## Nulls

This is quite different from marking missing values with IEEE NaN or a special sentinel bit pattern.

> Nullity is encoded in the definition levels (which is run-length encoded). NULL values are not encoded in the data. For example, in a non-nested schema, a column with 1000 NULLs would be encoded with run-length encoding (0, 1000 times) for the definition levels and nothing else.

This is much more compact for sparse data.
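The idea in the quote can be modelled in a few lines of plain Python for a flat (non-nested) nullable column. This is a deliberately simplified sketch: the helper `shred_flat_column` is hypothetical, and real Parquet stores the levels with an RLE/bit-packing hybrid encoding, not Python tuples. Definition level 1 means "value present" and 0 means NULL. Only non-null values go into the data section.

```python
from itertools import groupby


def shred_flat_column(values):
    """Hypothetical, simplified model of a flat nullable Parquet column.

    Returns run-length encoded definition levels as (level, run_length)
    pairs, plus the list of non-null values.
    """
    levels = [0 if v is None else 1 for v in values]
    rle_levels = [(level, len(list(run))) for level, run in groupby(levels)]
    data = [v for v in values if v is not None]
    return rle_levels, data


# 1000 NULLs: one run of level 0 and no data at all
print(shred_flat_column([None] * 1000))  # → ([(0, 1000)], [])

# Sparse column: the data section holds only the real values
print(shred_flat_column([None, None, 7, None, 9]))  # → ([(0, 2), (1, 1), (0, 1), (1, 1)], [7, 9])
```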

In [4]:
onefile = pq.ParquetFile("/Users/clark/data/pems/pems_sorted/station=402260/part-r-00000-ddaee723-f3f6-4f25-a34b-3312172aa6d7.snappy.parquet")

In [5]:
onefile.num_row_groups

1

In [6]:
onefile.metadata

<pyarrow._parquet.FileMetaData object at 0x11694d598>
  created_by: parquet-mr version 1.6.0
  num_columns: 25
  num_rows: 2575
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 3389

In [7]:
onefile.metadata.metadata

{b'org.apache.spark.sql.parquet.row.metadata': b'{"type":"struct","fields":[{"name":"timeperiod","type":"string","nullable":true,"metadata":{}},{"name":"flow1","type":"integer","nullable":true,"metadata":{}},{"name":"occupancy1","type":"double","nullable":true,"metadata":{}},{"name":"speed1","type":"double","nullable":true,"metadata":{}},{"name":"flow2","type":"integer","nullable":true,"metadata":{}},{"name":"occupancy2","type":"double","nullable":true,"metadata":{}},{"name":"speed2","type":"double","nullable":true,"metadata":{}},{"name":"flow3","type":"integer","nullable":true,"metadata":{}},{"name":"occupancy3","type":"double","nullable":true,"metadata":{}},{"name":"speed3","type":"double","nullable":true,"metadata":{}},{"name":"flow4","type":"integer","nullable":true,"metadata":{}},{"name":"occupancy4","type":"double","nullable":true,"metadata":{}},{"name":"speed4","type":"double","nullable":true,"metadata":{}},{"name":"flow5","type":"integer","nullable":true,"metadata":{}},{"name":
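The key-value metadata is a dict of bytes, and Spark stores its own schema as JSON under the key shown above. Below is a minimal sketch of decoding it. It uses a hypothetical, shortened copy of that metadata so it runs standalone. On the real file, `onefile.metadata.metadata` would take the place of `kv`.

```python
import json

# Hypothetical, shortened copy of the key-value metadata shown above
kv = {b"org.apache.spark.sql.parquet.row.metadata":
      b'{"type":"struct","fields":['
      b'{"name":"timeperiod","type":"string","nullable":true,"metadata":{}},'
      b'{"name":"flow1","type":"integer","nullable":true,"metadata":{}}]}'}

# json.loads accepts bytes directly
spark_schema = json.loads(kv[b"org.apache.spark.sql.parquet.row.metadata"])
for field in spark_schema["fields"]:
    print(field["name"], field["type"], field["nullable"])
```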

# Apache Arrow

> Powering Columnar In-Memory Analytics

A bold claim... R, Python (NumPy), and Julia all compete in this space.

Source: [Arrow Docs](https://arrow.apache.org/)

Essentially, Arrow is a specification for a columnar in-memory layout, along with high-performance C++ and Java implementations.

My initial experiments using it for shared memory were positive. Shared memory would be very nice for interoperability between different languages.

![Common memory](common_memory.png)

## Some thoughts

To load data from parquet into a high-level language, one goes from parquet to arrow to the data structures of language X. Why not load directly from parquet into language X?

In [8]:
pems_table = pems.read(["timeperiod", "flow1", "occupancy1", "speed1"])
pems_table

pyarrow.Table
timeperiod: string
flow1: int32
occupancy1: double
speed1: double
station: dictionary<values=int64, indices=int32>
-- metadata --
org.apache.spark.sql.parquet.row.metadata: {"type":"struct","fields":[{"name":"timeperiod","type":"string","nullable":true,"metadata":{}},{"name":"flow1","type":"integer","nullable":true,"metadata":{}},{"name":"occupancy1","type":"double","nullable":true,"metadata":{}},{"name":"speed1","type":"double","nullable":true,"metadata":{}},{"name":"flow2","type":"integer","nullable":true,"metadata":{}},{"name":"occupancy2","type":"double","nullable":true,"metadata":{}},{"name":"speed2","type":"double","nullable":true,"metadata":{}},{"name":"flow3","type":"integer","nullable":true,"metadata":{}},{"name":"occupancy3","type":"double","nullable":true,"metadata":{}},{"name":"speed3","type":"double","nullable":true,"metadata":{}},{"name":"flow4","type":"integer","nullable":true,"metadata":{}},{"name":"occupancy4","type":"double","nullable":true,"metadata":{}

In [9]:
# Metadata tells us things like the shape
pems_table.shape

(3932049, 5)

In [10]:
# Rough size in MB of all 25 columns fully in memory, assuming 8 bytes per value
pems_table.shape[0] * 25 * 8 / 1e6

786.4098

In [21]:
# Compare to size on disk:
! du -h ~/data/pems/pems_sorted

7.2M	/Users/clark/data/pems/pems_sorted/station=402260
7.0M	/Users/clark/data/pems/pems_sorted/station=402261
5.4M	/Users/clark/data/pems/pems_sorted/station=402263
8.2M	/Users/clark/data/pems/pems_sorted/station=402264
8.7M	/Users/clark/data/pems/pems_sorted/station=402265
 41M	/Users/clark/data/pems/pems_sorted


In [12]:
# We can pull out the underlying pieces, e.g. column 1 (flow1)

p1 = pems_table[1]
p1

<pyarrow.lib.Column at 0x1148952d0>

In [13]:
p1.type

DataType(int32)

In [14]:
p1.data

<pyarrow.lib.ChunkedArray at 0x1148953f0>

In [15]:
p1.data.num_chunks

1420

In [16]:
p1.data.chunk(0)

<pyarrow.lib.Int32Array object at 0x1169602c8>
[
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  ...
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0
]

In [17]:
p1.data.null_count

0

In [18]:
p1.to_pandas()

0          0
1          0
2          0
3          0
4          0
5          0
6          0
7          0
8          0
9          0
10         0
11         0
12         0
13         0
14         0
15         0
16         0
17         0
18         0
19         0
20         0
21         0
22         0
23         0
24         0
25         0
26         0
27         0
28         0
29         0
          ..
3932019    0
3932020    0
3932021    0
3932022    0
3932023    0
3932024    0
3932025    0
3932026    0
3932027    0
3932028    0
3932029    0
3932030    0
3932031    0
3932032    0
3932033    0
3932034    0
3932035    0
3932036    0
3932037    0
3932038    0
3932039    0
3932040    0
3932041    0
3932042    0
3932043    0
3932044    0
3932045    0
3932046    0
3932047    0
3932048    0
Name: flow1, dtype: int32

### Conversion to a pandas DataFrame


In [19]:
pems_df = pems_table.to_pandas()

In [20]:
pems_df.head()

|   | timeperiod | flow1 | occupancy1 | speed1 | station |
|---|---|---|---|---|---|
| 0 | 01/01/2016 00:00:05 | 0 | 0.0 | 0.0 | 402260 |
| 1 | 01/01/2016 00:00:35 | 0 | 0.0 | 0.0 | 402260 |
| 2 | 01/01/2016 00:01:06 | 0 | 0.0 | 0.0 | 402260 |
| 3 | 01/01/2016 00:01:35 | 0 | 0.0 | 0.0 | 402260 |
| 4 | 01/01/2016 00:02:05 | 0 | 0.0 | 0.0 | 402260 |
