Show Parquet / Pyarrow API.

## Imports

In [25]:
import logging
import os
import random

import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

_LOG = logging.getLogger(__name__)

In [26]:
def get_df() -> pd.DataFrame:
    """
    Create pandas random data, like:

    ```
                idx instr  val1  val2
    2000-01-01    0     A    99    30
    2000-01-02    0     A    54    46
    2000-01-03    0     A    85    86
    ```
    """
    instruments = "A B C D E".split()
    "id stock val1 val2".split()
    df_idx = pd.date_range(
        pd.Timestamp("2000-01-01"), pd.Timestamp("2000-01-15"), freq="1D"
    )
    # print(df_idx)
    random.seed(1000)

    df = []
    for idx, inst in enumerate(instruments):
        df_tmp = pd.DataFrame(
            {
                "idx": idx,
                "instr": inst,
                "val1": [random.randint(0, 100) for k in range(len(df_idx))],
                "val2": [random.randint(0, 100) for k in range(len(df_idx))],
            },
            index=df_idx,
        )
        # print(df_tmp)
        df.append(df_tmp)
    df = pd.concat(df)
    return df

In [27]:
def df_to_str(df: pd.DataFrame) -> str:
    txt = ""
    txt += "# df=\n%s" % df.head(3)
    txt += "\n# df.shape=\n%s" % str(df.shape)
    txt += "\n# df.dtypes=\n%s" % str(df.dtypes)
    return txt

# Save and load all data in one file

In [28]:
df = get_df()
# print(df.head())
print(df_to_str(df))

# df=
            idx instr  val1  val2
2000-01-01    0     A    99    30
2000-01-02    0     A    54    46
2000-01-03    0     A    85    86
# df.shape=
(75, 4)
# df.dtypes=
idx       int64
instr    object
val1      int64
val2      int64
dtype: object


In [29]:
table = pa.Table.from_pandas(df)

print("table=\n%s" % table)

table=
pyarrow.Table
idx: int64
instr: string
val1: int64
val2: int64
__index_level_0__: timestamp[ns]
----
idx: [[0,0,0,0,0,...,4,4,4,4,4]]
instr: [["A","A","A","A","A",...,"E","E","E","E","E"]]
val1: [[99,54,85,97,12,...,59,48,50,66,3]]
val2: [[30,46,86,62,25,...,9,17,70,34,26]]
__index_level_0__: [[2000-01-01 00:00:00.000000000,2000-01-02 00:00:00.000000000,2000-01-03 00:00:00.000000000,2000-01-04 00:00:00.000000000,2000-01-05 00:00:00.000000000,...,2000-01-11 00:00:00.000000000,2000-01-12 00:00:00.000000000,2000-01-13 00:00:00.000000000,2000-01-14 00:00:00.000000000,2000-01-15 00:00:00.000000000]]


In [30]:
# Save.
file_name = "df_in_one_file.pq"
pq.write_table(table, file_name)

In [31]:
# Load.
df2 = pq.read_table(file_name)
print(df2)

df2 = df2.to_pandas()
print(df_to_str(df2))

pyarrow.Table
idx: int64
instr: string
val1: int64
val2: int64
__index_level_0__: timestamp[us]
----
idx: [[0,0,0,0,0,...,4,4,4,4,4]]
instr: [["A","A","A","A","A",...,"E","E","E","E","E"]]
val1: [[99,54,85,97,12,...,59,48,50,66,3]]
val2: [[30,46,86,62,25,...,9,17,70,34,26]]
__index_level_0__: [[2000-01-01 00:00:00.000000,2000-01-02 00:00:00.000000,2000-01-03 00:00:00.000000,2000-01-04 00:00:00.000000,2000-01-05 00:00:00.000000,...,2000-01-11 00:00:00.000000,2000-01-12 00:00:00.000000,2000-01-13 00:00:00.000000,2000-01-14 00:00:00.000000,2000-01-15 00:00:00.000000]]
# df=
            idx instr  val1  val2
2000-01-01    0     A    99    30
2000-01-02    0     A    54    46
2000-01-03    0     A    85    86
# df.shape=
(75, 4)
# df.dtypes=
idx       int64
instr    object
val1      int64
val2      int64
dtype: object


## Read a subset of columns

In [32]:
df2 = pq.read_table(file_name, columns=["idx", "val1"])
print(df2)

df2 = df2.to_pandas()
print(df_to_str(df2))

pyarrow.Table
idx: int64
val1: int64
----
idx: [[0,0,0,0,0,...,4,4,4,4,4]]
val1: [[99,54,85,97,12,...,59,48,50,66,3]]
# df=
   idx  val1
0    0    99
1    0    54
2    0    85
# df.shape=
(75, 2)
# df.dtypes=
idx     int64
val1    int64
dtype: object


## Partitioned dataset

from https://arrow.apache.org/docs/python/dataset.html#reading-partitioned-data

- A dataset can exploit a nested structure, where the sub-dir names hold information about which subset of the data is stored in that dir
- E.g., "Hive" patitioning scheme "key=vale" dir names

In [33]:
df = get_df()
print(df_to_str(df))

# df=
            idx instr  val1  val2
2000-01-01    0     A    99    30
2000-01-02    0     A    54    46
2000-01-03    0     A    85    86
# df.shape=
(75, 4)
# df.dtypes=
idx       int64
instr    object
val1      int64
val2      int64
dtype: object


In [34]:
base = "."
dir_name = os.path.join(base, "parquet_dataset_partitioned")
os.system("rm -rf %s" % dir_name)

pq.write_to_dataset(table, dir_name, partition_cols=["idx"])

In [35]:
!ls parquet_dataset_partitioned

'idx=0'  'idx=1'  'idx=2'  'idx=3'  'idx=4'


In [36]:
# Read data back.
dataset = ds.dataset(dir_name, format="parquet", partitioning="hive")

print("\n".join(dataset.files))

./parquet_dataset_partitioned/idx=0/6a4868ebbe7944f6b52b70eb78ee236c-0.parquet
./parquet_dataset_partitioned/idx=1/6a4868ebbe7944f6b52b70eb78ee236c-0.parquet
./parquet_dataset_partitioned/idx=2/6a4868ebbe7944f6b52b70eb78ee236c-0.parquet
./parquet_dataset_partitioned/idx=3/6a4868ebbe7944f6b52b70eb78ee236c-0.parquet
./parquet_dataset_partitioned/idx=4/6a4868ebbe7944f6b52b70eb78ee236c-0.parquet


In [37]:
# Read everything.
df2 = dataset.to_table().to_pandas()

print(df_to_str(df2))

# df=
           instr  val1  val2  idx
2000-01-01     A    99    30    0
2000-01-02     A    54    46    0
2000-01-03     A    85    86    0
# df.shape=
(75, 4)
# df.dtypes=
instr    object
val1      int64
val2      int64
idx       int32
dtype: object


In [38]:
# Load part of the data.

df2 = dataset.to_table(filter=ds.field("idx") == 1).to_pandas()
print(df_to_str(df2))

df2 = dataset.to_table(filter=ds.field("idx") < 3).to_pandas()
print(df_to_str(df2))

# df=
           instr  val1  val2  idx
2000-01-01     B    18    22    1
2000-01-02     B    59    89    1
2000-01-03     B    91    90    1
# df.shape=
(15, 4)
# df.dtypes=
instr    object
val1      int64
val2      int64
idx       int32
dtype: object
# df=
           instr  val1  val2  idx
2000-01-01     A    99    30    0
2000-01-02     A    54    46    0
2000-01-03     A    85    86    0
# df.shape=
(45, 4)
# df.dtypes=
instr    object
val1      int64
val2      int64
idx       int32
dtype: object


## Add year-month partitions

In [39]:
df = get_df()
df["year"] = df.index.year
df["month"] = df.index.month

print(df_to_str(df))

# df=
            idx instr  val1  val2  year  month
2000-01-01    0     A    99    30  2000      1
2000-01-02    0     A    54    46  2000      1
2000-01-03    0     A    85    86  2000      1
# df.shape=
(75, 6)
# df.dtypes=
idx       int64
instr    object
val1      int64
val2      int64
year      int64
month     int64
dtype: object


In [40]:
table = pa.Table.from_pandas(df)

print("table=\n%s" % table)

table=
pyarrow.Table
idx: int64
instr: string
val1: int64
val2: int64
year: int64
month: int64
__index_level_0__: timestamp[ns]
----
idx: [[0,0,0,0,0,...,4,4,4,4,4]]
instr: [["A","A","A","A","A",...,"E","E","E","E","E"]]
val1: [[99,54,85,97,12,...,59,48,50,66,3]]
val2: [[30,46,86,62,25,...,9,17,70,34,26]]
year: [[2000,2000,2000,2000,2000,...,2000,2000,2000,2000,2000]]
month: [[1,1,1,1,1,...,1,1,1,1,1]]
__index_level_0__: [[2000-01-01 00:00:00.000000000,2000-01-02 00:00:00.000000000,2000-01-03 00:00:00.000000000,2000-01-04 00:00:00.000000000,2000-01-05 00:00:00.000000000,...,2000-01-11 00:00:00.000000000,2000-01-12 00:00:00.000000000,2000-01-13 00:00:00.000000000,2000-01-14 00:00:00.000000000,2000-01-15 00:00:00.000000000]]


In [41]:
base = "."
dir_name = os.path.join(base, "pq_partitioned2")
os.system("rm -rf %s" % dir_name)

pq.write_to_dataset(table, dir_name, partition_cols=["idx", "year", "month"])

In [42]:
!ls $dir_name

'idx=0'  'idx=1'  'idx=2'  'idx=3'  'idx=4'


In [43]:
!ls $dir_name/idx=0/year=2000/month=1

11f5e2911e294dd8882e3fb14a035ba2-0.parquet


In [44]:
# Read data back.
dataset = ds.dataset(dir_name, format="parquet", partitioning="hive")

print("\n".join(dataset.files))

./pq_partitioned2/idx=0/year=2000/month=1/11f5e2911e294dd8882e3fb14a035ba2-0.parquet
./pq_partitioned2/idx=1/year=2000/month=1/11f5e2911e294dd8882e3fb14a035ba2-0.parquet
./pq_partitioned2/idx=2/year=2000/month=1/11f5e2911e294dd8882e3fb14a035ba2-0.parquet
./pq_partitioned2/idx=3/year=2000/month=1/11f5e2911e294dd8882e3fb14a035ba2-0.parquet
./pq_partitioned2/idx=4/year=2000/month=1/11f5e2911e294dd8882e3fb14a035ba2-0.parquet


In [45]:
# Read data back.
dataset = ds.dataset(dir_name, format="parquet", partitioning="hive")

df2 = dataset.to_table(filter=ds.field("idx") == 2).to_pandas()
print(df_to_str(df2))

# df=
           instr  val1  val2  idx  year  month
2000-01-01     C    99    37    2  2000      1
2000-01-02     C    98    48    2  2000      1
2000-01-03     C    70    58    2  2000      1
# df.shape=
(15, 6)
# df.dtypes=
instr    object
val1      int64
val2      int64
idx       int32
year      int32
month     int32
dtype: object


In [46]:
# We could scan manually and create the dirs manually if we don't want to add
# add a new dir.
base = "."
dir_name = os.path.join(base, "parquet_dataset_partitioned2")
os.system("rm -rf %s" % dir_name)

schemas = []

schema = pa.Table.from_pandas(df).schema
print(schema)
# assert 0
# idx: int64
# instr: string
# val1: int64
# val2: int64
# year: int64
# month: int64

# grouped = df.groupby(lambda x: x.day)
group_by_idx = df.groupby("idx")
for idx, df_tmp in group_by_idx:
    _LOG.debug("idx=%s -> df.shape=%s", idx, str(df_tmp.shape))
    #
    group_by_year = df_tmp.groupby(lambda x: x.year)
    for year, df_tmp2 in group_by_year:
        _LOG.debug("year=%s -> df.shape=%s", year, str(df_tmp2.shape))
        #
        group_by_month = df_tmp2.groupby(lambda x: x.month)
        for month, df_tmp3 in group_by_month:
            _LOG.debug("month=%s -> df.shape=%s", month, str(df_tmp3.shape))
            # file_name = "df_in_one_file.pq"
            # pq.write_table(table, file_name)
            # /app/data/idx=0/year=2000/month=1/02e3265d515e4fb88ebe1a72a405fc05.parquet
            subdir_name = os.path.join(
                dir_name, f"idx={idx}", f"year={year}", f"month={month}"
            )
            table = pa.Table.from_pandas(df_tmp3, schema=schema)
            schemas.append(table.schema)
            # print(df_tmp3)
            # print(table.schema)
            #             pq.write_to_dataset(table,
            #                     subdir_name, schema=schema)
            file_name = os.path.join(subdir_name, "df_out.pq")
            #hio.create_enclosing_dir(file_name)
            os.makedirs(os.path.dirname(file_name), exist_ok=True)
            pq.write_table(table, file_name)

idx: int64
instr: string
val1: int64
val2: int64
year: int64
month: int64
__index_level_0__: timestamp[ns]
-- schema metadata --
pandas: '{"index_columns": ["__index_level_0__"], "column_indexes": [{"na' + 976


In [47]:
#!ls $dir_name/idx=0/year=2000/month=1

In [48]:
# Read data back.
# https://github.com/dask/dask/issues/4194
# src_dir = f"{dir_name}/idx=0/year=2000/month=1"
src_dir = f"{dir_name}/idx=0/year=2000"
dataset = ds.dataset(src_dir, format="parquet", partitioning="hive")

df2 = dataset.to_table().to_pandas()
# print(df_to_str(df2))
print("\n".join(dataset.files))

ArrowInvalid: Unable to merge: Field month has incompatible types: int64 vs int32

## Partition manually

In [None]:
from pyarrow.dataset import DirectoryPartitioning

partitioning = DirectoryPartitioning(
    pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int8())])
)
print(partitioning.parse("/2009/11/3"))

# partitioning.discover()

In [None]:
!ls /data

In [None]:
dir_name = "/app/data"

# Read data back.
dataset = ds.dataset(dir_name, format="parquet", partitioning="hive")

print("\n".join(dataset.files))

In [None]:
# Read everything.
df2 = dataset.to_table().to_pandas()

print(df_to_str(df2))

In [None]:
print(df2["instr"].unique())
print(df2.index)