Show Parquet / Pyarrow API.

## Imports

In [1]:
import os
import random
import shutil

import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

In [16]:
def get_df(num_days: int = 15, seed: int = 1000) -> pd.DataFrame:
    """
    Create Pandas random data, like:

    ```
                idx instr  val1  val2
    2000-01-01    0     A    99    30
    2000-01-02    0     A    54    46
    2000-01-03    0     A    85    86
    ```

    One block of `num_days` daily rows, starting on 2000-01-01, is generated
    for each of the 5 instruments and the blocks are stacked, so the index
    values repeat once per instrument.

    :param num_days: number of consecutive days generated per instrument
    :param seed: seed of the private random generator; the default keeps the
        values shown above
    :return: df with `5 * num_days` rows and columns `idx`, `instr`, `val1`,
        `val2`, indexed by date
    """
    instruments = "A B C D E".split()
    start_idx = pd.Timestamp("2000-01-01")
    end_idx = start_idx + pd.Timedelta(days=num_days - 1)
    df_idx = pd.date_range(start_idx, end_idx, freq="1D")
    # Use a private generator so that calling this function does not reseed
    # the global `random` module as a side effect.
    rng = random.Random(seed)
    num_rows = len(df_idx)
    dfs = []
    for idx, inst in enumerate(instruments):
        # Draw all of `val1` before `val2`, so the sequence of random values
        # per instrument is stable.
        val1 = [rng.randint(0, 100) for _ in range(num_rows)]
        val2 = [rng.randint(0, 100) for _ in range(num_rows)]
        df_tmp = pd.DataFrame(
            {
                # Scalars are broadcast to every row of the block.
                "idx": idx,
                "instr": inst,
                "val1": val1,
                "val2": val2,
            },
            index=df_idx,
        )
        dfs.append(df_tmp)
    df = pd.concat(dfs)
    return df

# Display the default sample df as the cell output.
get_df()

Unnamed: 0,idx,instr,val1,val2
2000-01-01,0,A,99,30
2000-01-02,0,A,54,46
2000-01-03,0,A,85,86
2000-01-04,0,A,97,62
2000-01-05,0,A,12,25
...,...,...,...,...
2000-01-11,4,E,59,9
2000-01-12,4,E,48,17
2000-01-13,4,E,50,70
2000-01-14,4,E,66,34


In [80]:
def df_to_str(df: pd.DataFrame) -> str:
    """
    Summarize a df as text: first 3 rows, last 3 rows, shape, and dtypes.

    :param df: dataframe to summarize
    :return: multi-line string with the summary
    """
    head_txt = str(df.head(3))
    # Drop the first printed line of the tail (normally the column header
    # row), which the head section already shows.
    tail_lines = str(df.tail(3)).split("\n")[1:]
    sections = [
        "# df=\n" + head_txt,
        "\n...\n",
        "\n".join(tail_lines),
        "\n# df.shape=\n" + str(df.shape),
        "\n# df.dtypes=\n" + str(df.dtypes),
    ]
    return "".join(sections)

# Save and load all data in one file

In [78]:
# Build the default sample df (15 days per instrument) and print its summary.
df = get_df()
print(df_to_str(df))

# df=
            idx instr  val1  val2
2000-01-01    0     A    99    30
2000-01-02    0     A    54    46
2000-01-03    0     A    85    86
...
2000-01-13    4     E    50    70
2000-01-14    4     E    66    34
2000-01-15    4     E     3    26
# df.shape=
(75, 4)
# df.dtypes=
idx       int64
instr    object
val1      int64
val2      int64
dtype: object


In [81]:
# Save the df as CSV, to compare its size with the Parquet file written below.
df.to_csv("df.csv")

# Show the size of the CSV file on disk.
!ls -lh df.csv

-rw-r--r-- 1 root root 1.6K Feb  6 10:22 df.csv


In [39]:
# Convert the Pandas df into a PyArrow table and show its schema and data.
table = pa.Table.from_pandas(df)

print("# table=", table, sep="\n")

# table=
pyarrow.Table
idx: int64
instr: string
val1: int64
val2: int64
__index_level_0__: timestamp[ns]
----
idx: [[0,0,0,0,0,...,4,4,4,4,4]]
instr: [["A","A","A","A","A",...,"E","E","E","E","E"]]
val1: [[99,54,85,97,12,...,59,48,50,66,3]]
val2: [[30,46,86,62,25,...,9,17,70,34,26]]
__index_level_0__: [[2000-01-01 00:00:00.000000000,2000-01-02 00:00:00.000000000,2000-01-03 00:00:00.000000000,2000-01-04 00:00:00.000000000,2000-01-05 00:00:00.000000000,...,2000-01-11 00:00:00.000000000,2000-01-12 00:00:00.000000000,2000-01-13 00:00:00.000000000,2000-01-14 00:00:00.000000000,2000-01-15 00:00:00.000000000]]


In [40]:
# Save.
file_name = "df_in_one_file.pq"
pq.write_table(table, file_name)

!ls -lh df_in_one_file.pq

-rw-r--r-- 1 root root 4.7K Feb  6 10:10 df_in_one_file.pq


In [41]:
# Load the Parquet file and convert it to Pandas in one step: the types are
# conserved.
df2 = pq.read_table(file_name).to_pandas()
print(df_to_str(df2))

# df=
            idx instr  val1  val2
2000-01-01    0     A    99    30
2000-01-02    0     A    54    46
2000-01-03    0     A    85    86
# df.shape=
(75, 4)
# df.dtypes=
idx       int64
instr    object
val1      int64
val2      int64
dtype: object


## Scalability

In [35]:
for num_rows in (15, 100, 1000, 10000):
    print("# num_rows=", num_rows)
    #
    df = get_df(num_rows)
    df.to_csv("df_tmp.csv")
    !ls -lh df_tmp.csv
    #
    df.to_csv("df_tmp.csv.gz")
    !ls -lh df.csv.gz
    #
    file_name = "df_tmp_in_one_file.pq"
    table = pa.Table.from_pandas(df)
    pq.write_table(table, file_name)
    !ls -lh df_in_one_file.pq

# num_rows= 15
-rw-r--r-- 1 root root 1.6K Feb  6 10:10 df_tmp.csv
-rw-r--r-- 1 root root 241K Feb  6 10:10 df.csv.gz
-rw-r--r-- 1 root root 223K Feb  6 10:10 df_in_one_file.pq
# num_rows= 100
-rw-r--r-- 1 root root 11K Feb  6 10:10 df_tmp.csv
-rw-r--r-- 1 root root 241K Feb  6 10:10 df.csv.gz
-rw-r--r-- 1 root root 223K Feb  6 10:10 df_in_one_file.pq
# num_rows= 1000
-rw-r--r-- 1 root root 102K Feb  6 10:10 df_tmp.csv
-rw-r--r-- 1 root root 241K Feb  6 10:10 df.csv.gz
-rw-r--r-- 1 root root 223K Feb  6 10:10 df_in_one_file.pq
# num_rows= 10000
-rw-r--r-- 1 root root 1017K Feb  6 10:10 df_tmp.csv
-rw-r--r-- 1 root root 241K Feb  6 10:10 df.csv.gz
-rw-r--r-- 1 root root 223K Feb  6 10:10 df_in_one_file.pq


## Read a subset of columns

In [43]:
# Load only two columns and convert the result to Pandas.
columns = ["idx", "val1"]
df2 = pq.read_table(file_name, columns=columns).to_pandas()
print(df_to_str(df2))

# df=
   idx  val1
0    0    99
1    0    54
2    0    85
# df.shape=
(75, 2)
# df.dtypes=
idx     int64
val1    int64
dtype: object


# Partitioned dataset

from https://arrow.apache.org/docs/python/dataset.html#reading-partitioned-data

- A dataset can exploit a nested structure, where the sub-dir names hold information about which subset of the data is stored in that dir
- E.g., the "hive" partitioning scheme uses "key=value" dir names

In [89]:
# Rebuild the default sample df used by the partitioning examples below.
df = get_df()
#display(df)
print(df_to_str(df))

# df=
            idx instr  val1  val2
2000-01-01    0     A    99    30
2000-01-02    0     A    54    46
2000-01-03    0     A    85    86
...
2000-01-13    4     E    50    70
2000-01-14    4     E    66    34
2000-01-15    4     E     3    26
# df.shape=
(75, 4)
# df.dtypes=
idx       int64
instr    object
val1      int64
val2      int64
dtype: object


In [107]:
# Clean up dir.
base = "."
dir_name = os.path.join(base, "pq_partitioned1")
os.system("rm -rf %s" % dir_name)

# Save data in a partitioned ways using `idx` as partitioning key.
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, dir_name, partition_cols=["idx"])

!find {dir_name}

./pq_partitioned1
./pq_partitioned1/idx=0
./pq_partitioned1/idx=0/d287f659e7e54d1f886494532187eb03-0.parquet
./pq_partitioned1/idx=1
./pq_partitioned1/idx=1/d287f659e7e54d1f886494532187eb03-0.parquet
./pq_partitioned1/idx=2
./pq_partitioned1/idx=2/d287f659e7e54d1f886494532187eb03-0.parquet
./pq_partitioned1/idx=3
./pq_partitioned1/idx=3/d287f659e7e54d1f886494532187eb03-0.parquet
./pq_partitioned1/idx=4
./pq_partitioned1/idx=4/d287f659e7e54d1f886494532187eb03-0.parquet


In [96]:
# Open the partitioned dir as a dataset and list the files it is made of.
dataset = ds.dataset(dir_name, format="parquet", partitioning="hive")
print(*dataset.files, sep="\n")

./pq_partitioned1/idx=0/26957e442e9548e79e15923b40b336a5-0.parquet
./pq_partitioned1/idx=1/26957e442e9548e79e15923b40b336a5-0.parquet
./pq_partitioned1/idx=2/26957e442e9548e79e15923b40b336a5-0.parquet
./pq_partitioned1/idx=3/26957e442e9548e79e15923b40b336a5-0.parquet
./pq_partitioned1/idx=4/26957e442e9548e79e15923b40b336a5-0.parquet


In [97]:
# Read everything: load all the partitions into one table and convert it to
# Pandas.
df2 = dataset.to_table().to_pandas()

print(df_to_str(df2))

# df=
           instr  val1  val2  idx
2000-01-01     A    99    30    0
2000-01-02     A    54    46    0
2000-01-03     A    85    86    0
...
2000-01-13     E    50    70    4
2000-01-14     E    66    34    4
2000-01-15     E     3    26    4
# df.shape=
(75, 4)
# df.dtypes=
instr    object
val1      int64
val2      int64
idx       int32
dtype: object


In [98]:
# Load a subset of rows of the data: only the rows with `idx == 1`.
idx_filter = ds.field("idx") == 1
df2 = dataset.to_table(filter=idx_filter).to_pandas()
display(df2)
# Note that type of partitioning key sometimes is slightly changed.
print(df_to_str(df2))

Unnamed: 0,instr,val1,val2,idx
2000-01-01,B,18,22,1
2000-01-02,B,59,89,1
2000-01-03,B,91,90,1
2000-01-04,B,90,13,1
2000-01-05,B,53,32,1
2000-01-06,B,88,20,1
2000-01-07,B,25,36,1
2000-01-08,B,2,11,1
2000-01-09,B,97,0,1
2000-01-10,B,68,40,1


# df=
           instr  val1  val2  idx
2000-01-01     B    18    22    1
2000-01-02     B    59    89    1
2000-01-03     B    91    90    1
...
2000-01-13     B    98    32    1
2000-01-14     B    27    37    1
2000-01-15     B    69    66    1
# df.shape=
(15, 4)
# df.dtypes=
instr    object
val1      int64
val2      int64
idx       int32
dtype: object


In [99]:
# Filters are not limited to equality: load the rows with `idx < 3`.
idx_filter = ds.field("idx") < 3
df2 = dataset.to_table(filter=idx_filter).to_pandas()
display(df2)
print(df_to_str(df2))

Unnamed: 0,instr,val1,val2,idx
2000-01-01,A,99,30,0
2000-01-02,A,54,46,0
2000-01-03,A,85,86,0
2000-01-04,A,97,62,0
2000-01-05,A,12,25,0
2000-01-06,A,50,87,0
2000-01-07,A,45,85,0
2000-01-08,A,8,46,0
2000-01-09,A,59,29,0
2000-01-10,A,21,58,0


# df=
           instr  val1  val2  idx
2000-01-01     A    99    30    0
2000-01-02     A    54    46    0
2000-01-03     A    85    86    0
...
2000-01-13     C    79    57    2
2000-01-14     C    62    83    2
2000-01-15     C    15    91    2
# df.shape=
(45, 4)
# df.dtypes=
instr    object
val1      int64
val2      int64
idx       int32
dtype: object


## Add year-month partitions

In [102]:
# Build 100 days of data and derive the `year` and `month` columns from the
# date index, to use them as partitioning keys.
df = get_df(num_days=100).assign(
    year=lambda df_: df_.index.year,
    month=lambda df_: df_.index.month,
)

print(df_to_str(df))

# df=
            idx instr  val1  val2  year  month
2000-01-01    0     A    99    64  2000      1
2000-01-02    0     A    54    42  2000      1
2000-01-03    0     A    85    44  2000      1
...
2000-04-07    4     E    35    77  2000      4
2000-04-08    4     E    53    26  2000      4
2000-04-09    4     E    54    82  2000      4
# df.shape=
(500, 6)
# df.dtypes=
idx       int64
instr    object
val1      int64
val2      int64
year      int64
month     int64
dtype: object


In [109]:
# Convert to Pyarrow.
table = pa.Table.from_pandas(df)
#print("table=\n%s" % table)

# Clean up dir.
base = "."
dir_name = os.path.join(base, "pq_partitioned2")
os.system("rm -rf %s" % dir_name)

# Save it using 3 partitioning keys.
pq.write_to_dataset(table, dir_name, partition_cols=["idx", "year", "month"])

# Show data structure.
!find $dir_name

./pq_partitioned2
./pq_partitioned2/idx=0
./pq_partitioned2/idx=0/year=2000
./pq_partitioned2/idx=0/year=2000/month=1
./pq_partitioned2/idx=0/year=2000/month=1/62a28645c8b34a6b9e9495d39bfe61ef-0.parquet
./pq_partitioned2/idx=0/year=2000/month=2
./pq_partitioned2/idx=0/year=2000/month=2/62a28645c8b34a6b9e9495d39bfe61ef-0.parquet
./pq_partitioned2/idx=0/year=2000/month=3
./pq_partitioned2/idx=0/year=2000/month=3/62a28645c8b34a6b9e9495d39bfe61ef-0.parquet
./pq_partitioned2/idx=0/year=2000/month=4
./pq_partitioned2/idx=0/year=2000/month=4/62a28645c8b34a6b9e9495d39bfe61ef-0.parquet
./pq_partitioned2/idx=1
./pq_partitioned2/idx=1/year=2000
./pq_partitioned2/idx=1/year=2000/month=1
./pq_partitioned2/idx=1/year=2000/month=1/62a28645c8b34a6b9e9495d39bfe61ef-0.parquet
./pq_partitioned2/idx=1/year=2000/month=2
./pq_partitioned2/idx=1/year=2000/month=2/62a28645c8b34a6b9e9495d39bfe61ef-0.parquet
./pq_partitioned2/idx=1/year=2000/month=3
./pq_partitioned2/idx=1/year=2000/month=3/62a28645c8b34a6b9e94

In [67]:
# List the files stored in a single leaf partition dir.
!ls $dir_name/idx=0/year=2000/month=1

b3affcffc8f4457f956eef1b75a86d4c-0.parquet


In [110]:
# Open the 3-level partitioned dir as a dataset.
dataset = ds.dataset(dir_name, format="parquet", partitioning="hive")

# Read only one tile back: the rows with `idx == 2`.
idx_filter = ds.field("idx") == 2
df2 = dataset.to_table(filter=idx_filter).to_pandas()
print(df_to_str(df2))

# df=
           instr  val1  val2  idx  year  month
2000-01-01     C     3    80    2  2000      1
2000-01-02     C    45   100    2  2000      1
2000-01-03     C    62    94    2  2000      1
...
2000-04-07     C    10    42    2  2000      4
2000-04-08     C    90    70    2  2000      4
2000-04-09     C    46    91    2  2000      4
# df.shape=
(100, 6)
# df.dtypes=
instr    object
val1      int64
val2      int64
idx       int32
year      int32
month     int32
dtype: object


## Partition manually

In [116]:
# We could scan manually the df and create the dirs manually.
base = "."
dir_name = os.path.join(base, "pq_partitioned4")
# Remove the output of a previous run; a missing dir is not an error.
shutil.rmtree(dir_name, ignore_errors=True)

# Extract the schema of the full df: it is passed to every per-partition
# conversion below, so that all the files are written with the same schema.
schemas = []
schema = pa.Table.from_pandas(df).schema
print(schema)

# Split by `idx` column first, then by year and month of the date index.
group_by_idx = df.groupby("idx")
for idx, df_tmp in group_by_idx:
    print("idx=%s -> df.shape=%s" % (idx, str(df_tmp.shape)))
    #
    group_by_year = df_tmp.groupby(lambda x: x.year)
    for year, df_tmp2 in group_by_year:
        print("year=%s -> df.shape=%s" % (year, str(df_tmp2.shape)))
        #
        group_by_month = df_tmp2.groupby(lambda x: x.month)
        for month, df_tmp3 in group_by_month:
            print("month=%s -> df.shape=%s" % (month, str(df_tmp3.shape)))
            # Build the hive-style dir for this tile, e.g.,
            # `./pq_partitioned4/idx=0/year=2000/month=1`.
            subdir_name = os.path.join(
                dir_name, f"idx={idx}", f"year={year}", f"month={month}"
            )
            table = pa.Table.from_pandas(df_tmp3, schema=schema)
            # Keep the schema of each written table for inspection.
            schemas.append(table.schema)
            # Write one file per tile, creating the nested dirs as needed.
            file_name = os.path.join(subdir_name, "df_out.pq")
            os.makedirs(subdir_name, exist_ok=True)
            pq.write_table(table, file_name)

idx: int64
instr: string
val1: int64
val2: int64
year: int64
month: int64
__index_level_0__: timestamp[ns]
-- schema metadata --
pandas: '{"index_columns": ["__index_level_0__"], "column_indexes": [{"na' + 976
idx=0 -> df.shape=(100, 6)
year=2000 -> df.shape=(100, 6)
month=1 -> df.shape=(31, 6)
month=2 -> df.shape=(29, 6)
month=3 -> df.shape=(31, 6)
month=4 -> df.shape=(9, 6)
idx=1 -> df.shape=(100, 6)
year=2000 -> df.shape=(100, 6)
month=1 -> df.shape=(31, 6)
month=2 -> df.shape=(29, 6)
month=3 -> df.shape=(31, 6)
month=4 -> df.shape=(9, 6)
idx=2 -> df.shape=(100, 6)
year=2000 -> df.shape=(100, 6)
month=1 -> df.shape=(31, 6)
month=2 -> df.shape=(29, 6)
month=3 -> df.shape=(31, 6)
month=4 -> df.shape=(9, 6)
idx=3 -> df.shape=(100, 6)
year=2000 -> df.shape=(100, 6)
month=1 -> df.shape=(31, 6)
month=2 -> df.shape=(29, 6)
month=3 -> df.shape=(31, 6)
month=4 -> df.shape=(9, 6)
idx=4 -> df.shape=(100, 6)
year=2000 -> df.shape=(100, 6)
month=1 -> df.shape=(31, 6)
month=2 -> df.shape=(29, 6)


In [117]:
# Show the dir structure created by the manual partitioning.
!find $dir_name

./pq_partitioned4
./pq_partitioned4/idx=0
./pq_partitioned4/idx=0/year=2000
./pq_partitioned4/idx=0/year=2000/month=1
./pq_partitioned4/idx=0/year=2000/month=1/df_out.pq
./pq_partitioned4/idx=0/year=2000/month=2
./pq_partitioned4/idx=0/year=2000/month=2/df_out.pq
./pq_partitioned4/idx=0/year=2000/month=3
./pq_partitioned4/idx=0/year=2000/month=3/df_out.pq
./pq_partitioned4/idx=0/year=2000/month=4
./pq_partitioned4/idx=0/year=2000/month=4/df_out.pq
./pq_partitioned4/idx=1
./pq_partitioned4/idx=1/year=2000
./pq_partitioned4/idx=1/year=2000/month=1
./pq_partitioned4/idx=1/year=2000/month=1/df_out.pq
./pq_partitioned4/idx=1/year=2000/month=2
./pq_partitioned4/idx=1/year=2000/month=2/df_out.pq
./pq_partitioned4/idx=1/year=2000/month=3
./pq_partitioned4/idx=1/year=2000/month=3/df_out.pq
./pq_partitioned4/idx=1/year=2000/month=4
./pq_partitioned4/idx=1/year=2000/month=4/df_out.pq
./pq_partitioned4/idx=2
./pq_partitioned4/idx=2/year=2000
./pq_partitioned4/idx=2/year=2000

In [None]:
#!ls $dir_name/idx=0/year=2000/month=1

In [None]:
# Read back only a sub-tree of the manually partitioned data.
# See https://github.com/dask/dask/issues/4194
# A deeper alternative: src_dir = f"{dir_name}/idx=0/year=2000/month=1"
src_dir = f"{dir_name}/idx=0/year=2000"
dataset = ds.dataset(src_dir, format="parquet", partitioning="hive")

df2 = dataset.to_table().to_pandas()
# List the files that make up the dataset rooted at `src_dir`.
print(*dataset.files, sep="\n")

## Directory partitioning

In [None]:
from pyarrow.dataset import DirectoryPartitioning

# Schema of the path segments, in order: year, month, day.
partition_schema = pa.schema(
    [("year", pa.int16()), ("month", pa.int8()), ("day", pa.int8())]
)
partitioning = DirectoryPartitioning(partition_schema)
# Parse a path according to the schema above and print the result.
print(partitioning.parse("/2009/11/3"))

# partitioning.discover()

In [None]:
# List the content of `/data`.
# NOTE(review): the next cell reads from `/app/data`, not `/data` -- confirm
# which dir is intended.
!ls /data

In [None]:
dir_name = "/app/data"

# Open the dir as a hive-partitioned Parquet dataset and list its files.
dataset = ds.dataset(dir_name, format="parquet", partitioning="hive")

print(*dataset.files, sep="\n")

In [None]:
# Read everything: load the whole dataset into one table and convert it to
# Pandas.
df2 = dataset.to_table().to_pandas()

print(df_to_str(df2))

In [None]:
# Show the distinct values of the `instr` column and the index of the loaded
# df.
print(df2["instr"].unique())
print(df2.index)