
# Writing partitioned datasets
* Arrow can split a large dataset into multiple separate files using pyarrow.dataset.write_dataset().
* The partitioning argument of pyarrow.dataset.write_dataset() specifies which column(s) to split on. Each distinct value of the partitioning column is written to its own directory.

## Illustration
* Birthdays of 1000 individuals spanning the years 2010 to 2019 (100 individuals per year)


In [2]:
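import numpy.random
import pyarrow as pa

# randint's upper bound is exclusive: days fall in 1..31 and months in 1..12
# (day/month combinations are not validated, so e.g. 31 February can occur)
data = pa.table({"day": numpy.random.randint(1, 32, size=1000),
                 "month": numpy.random.randint(1, 13, size=1000),
                 # rows 0-99 -> 2010, rows 100-199 -> 2011, ..., rows 900-999 -> 2019
                 "year": [2010 + x // 100 for x in range(1000)]})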

In [3]:
data

pyarrow.Table
day: int64
month: int64
year: int64
----
day: [[9,21,12,29,19,13,6,25,7,23,...,10,22,4,3,19,28,10,19,19,21]]
month: [[5,8,10,10,4,5,5,6,5,10,...,10,1,2,1,5,10,9,10,5,7]]
year: [[2010,2010,2010,2010,2010,2010,2010,2010,2010,2010,...,2019,2019,2019,2019,2019,2019,2019,2019,2019,2019]]

In [4]:
import pyarrow.dataset as ds
# partition by year column
ds.write_dataset(data, "./partitioned", format="parquet",
                 partitioning=ds.partitioning(pa.schema([("year", pa.int16())])))

In [5]:
!ls

partitioned  sample_data


In [6]:
# Check the partitioned directory
from pyarrow import fs

localfs = fs.LocalFileSystem()
partitioned_dir_content = localfs.get_file_info(fs.FileSelector("./partitioned", recursive=True))
files = sorted((f.path for f in partitioned_dir_content if f.type == fs.FileType.File))

for file in files:
    print(file)

./partitioned/2010/part-0.parquet
./partitioned/2011/part-0.parquet
./partitioned/2012/part-0.parquet
./partitioned/2013/part-0.parquet
./partitioned/2014/part-0.parquet
./partitioned/2015/part-0.parquet
./partitioned/2016/part-0.parquet
./partitioned/2017/part-0.parquet
./partitioned/2018/part-0.parquet
./partitioned/2019/part-0.parquet


## Reading Partitioned data
* When a dataset is composed of multiple separate files, each containing a piece of the data, the pyarrow.dataset.dataset() function discovers all of those files and exposes them as a single dataset.

In [8]:
# View the files
dataset = ds.dataset("./partitioned", format="parquet")
print(dataset.files)

['./partitioned/2010/part-0.parquet', './partitioned/2011/part-0.parquet', './partitioned/2012/part-0.parquet', './partitioned/2013/part-0.parquet', './partitioned/2014/part-0.parquet', './partitioned/2015/part-0.parquet', './partitioned/2016/part-0.parquet', './partitioned/2017/part-0.parquet', './partitioned/2018/part-0.parquet', './partitioned/2019/part-0.parquet']


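* The whole dataset can be viewed as a single table using pyarrow.dataset.Dataset.to_table().
* Each Parquet file contains 100 rows (one year of birthdays); converting the dataset to a table combines all ten files into a single Table of 1000 rows, with one chunk per file.
* Because the dataset below is opened without a partitioning argument, the year values stored only in the directory names are not turned into a column, so the table contains just day and month.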

In [9]:
# Read the data from partitioned files
table = dataset.to_table()
print(table)

pyarrow.Table
day: int64
month: int64
----
day: [[9,21,12,29,19,13,6,25,7,23,...,8,11,2,6,22,27,5,26,16,22],[26,15,20,22,12,26,22,24,23,21,...,19,7,3,1,25,29,7,8,20,12],[27,17,9,8,24,21,24,9,10,1,...,28,8,13,23,5,18,18,28,21,1],[16,13,12,30,21,3,17,22,12,21,...,10,30,16,6,12,11,7,25,30,28],[20,29,11,6,30,4,3,14,14,14,...,25,17,30,24,10,12,28,25,23,12],[20,3,25,3,25,29,7,18,5,21,...,22,29,16,27,21,4,9,2,27,11],[11,5,23,1,16,1,17,8,10,24,...,13,3,21,1,3,26,19,18,11,28],[30,27,27,1,29,22,25,25,6,7,...,25,4,6,1,10,14,27,19,11,20],[8,2,1,10,28,10,16,21,17,13,...,6,1,15,10,5,20,9,20,12,1],[22,6,9,30,8,11,5,27,20,27,...,10,22,4,3,19,28,10,19,19,21]]
month: [[5,8,10,10,4,5,5,6,5,10,...,11,2,4,11,10,1,4,5,9,6],[6,3,10,4,4,1,10,3,11,8,...,4,5,6,5,5,3,7,2,6,10],[8,11,11,3,9,5,1,3,6,4,...,5,9,11,11,8,10,6,5,3,4],[9,7,4,5,1,8,9,5,1,2,...,1,6,6,6,9,4,3,5,6,1],[8,11,6,5,9,9,5,4,1,5,...,3,10,10,1,10,8,10,10,8,1],[11,6,9,10,5,3,5,9,9,4,...,3,4,11,7,10,8,8,11,8,1],[7,8,4,9,8,4,10,8,11,5,...,4,9,6,7,8,8,

## Batches: Limitation of converting a dataset to pyarrow.Table
* to_table() loads the entire dataset into memory at once.
* For large datasets this can occupy a lot of memory (or more than is available), so reading in batches is preferable.
* The pyarrow.dataset.Dataset.to_batches() method iterates over the dataset, loading one chunk of data at a time and returning a pyarrow.RecordBatch for each chunk.

In [14]:
dataset = ds.dataset("./partitioned", format="parquet")
print(dataset.files)

# Each iteration yields one pyarrow.RecordBatch; only that batch is held in memory
for record_batch in dataset.to_batches():
    for name in ("day", "month"):
        col = record_batch.column(name)
        # Print the first and last value of the column in this batch
        print(f"{name} = {col[0]} .. {col[-1]}")

['./partitioned/2010/part-0.parquet', './partitioned/2011/part-0.parquet', './partitioned/2012/part-0.parquet', './partitioned/2013/part-0.parquet', './partitioned/2014/part-0.parquet', './partitioned/2015/part-0.parquet', './partitioned/2016/part-0.parquet', './partitioned/2017/part-0.parquet', './partitioned/2018/part-0.parquet', './partitioned/2019/part-0.parquet']
day = 9 .. 22
month = 5 .. 6
day = 26 .. 12
month = 6 .. 10
day = 27 .. 1
month = 8 .. 4
day = 16 .. 28
month = 9 .. 1
day = 20 .. 12
month = 8 .. 1
day = 20 .. 11
month = 11 .. 1
day = 11 .. 28
month = 7 .. 10
day = 30 .. 20
month = 4 .. 10
day = 8 .. 1
month = 9 .. 3
day = 22 .. 21
month = 11 .. 7
