In [1]:
import tempfile
import pathlib
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np

import pyarrow.compute as pc
from pyarrow import csv

import os
import polars as pl
import math

In [49]:
def get_size(path, unit='MB'):
    size = os.path.getsize(path)
    if unit == 'MB':
        print(f'Size: {size / (1024 * 1024)} MB')
    elif unit == 'GB':
        print(f'Size: {size / (1024 * 1024 * 1024)} GB')
    else:
        print(f'Size: {size} B')

path = './data/test/playlist_2010to2022.parquet'

get_size(path=path)

Size: 0.2697105407714844 MB


In [50]:
path = 'data/test2/audio_features.csv'
get_size(path, 'MB')

Size: 458.2281322479248 MB


In [58]:
pf = pq.ParquetFile(path)
nrows = pf.metadata.num_rows
n_partitions = math.ceil(os.path.getsize(path) / (1024 ** 2) / 50)
batch_size = nrows / n_partitions
first_n_rows = next(pf.iter_batches(batch_size = batch_size))

In [61]:
pf.metadata

<pyarrow._parquet.FileMetaData object at 0x7f410c1c6660>
  created_by: parquet-cpp-arrow version 13.0.0
  num_columns: 15
  num_rows: 4687104
  num_row_groups: 5
  format_version: 2.6
  serialized_size: 10131

### Using `yield` in iterator function to avoid returning entire dataframe

In [43]:
## setup
import polars as pl
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from pathlib import Path
import datetime


DATA_PATH = Path('/home/flemm0/school_stuff/USC_Fall_2023/DSCI551-Final_Project/data/')
TEST_DB_PATH = Path(DATA_PATH / 'test')
TEMP_DB_PATH = Path(DATA_PATH / 'temp')


## step 1: select
def read_table(table_name):
    dataset = ds.dataset(TEST_DB_PATH / table_name, format='parquet')
    for partition in dataset.files:
        partition = Path(partition)
        data = pq.read_table(partition)
        yield data, partition.stem


step = 0
query_id = 'query_' + datetime.datetime.now().strftime("%y%m%d_%H%M%S") 
query_step_dir = query_id + '_' + str(step)

curr_query_path = Path(TEMP_DB_PATH / query_step_dir)
if not curr_query_path.exists():
    Path.mkdir(curr_query_path)

for partition, name in read_table('audio_features'):
    where = (curr_query_path / name).with_suffix('.parquet')
    pq.write_table(table=partition, where=(curr_query_path / name).with_suffix('.parquet'))


## step 2: where
def filter(prev_query_path, filters):
    dataset = ds.dataset(prev_query_path, format='parquet')
    for partition in dataset.files:
        partition = Path(partition)
        data = pq.read_table(partition, filters=filters) # list of tuples e.g. ('acousticness', '<', 1)
        yield data, partition.stem

prev_query_path = curr_query_path
step += 1
query_step_dir = query_id + '_' + str(step)
curr_query_path = Path(TEMP_DB_PATH / query_step_dir)
if not curr_query_path.exists():
    Path.mkdir(curr_query_path)
    for partition, name in filter(prev_query_path=prev_query_path, filters=[('acousticness', '<', 1)]):
        pq.write_table(table=partition, where=(curr_query_path / name).with_suffix('.parquet'))


## step 3: projection
def projection(prev_query_path, selected_cols, new_col_names):
    '''
    Reads intermediate query results from prev_query_path and selects only specified columns. Assigns new column names.
    Yields filtered data partitions and partition name
    '''
    dataset = ds.dataset(prev_query_path, format='parquet')
    for partition in dataset.files:
        partition = Path(partition)
        data = pq.read_table(partition, columns=selected_cols) # list of column names
        data.rename_columns(new_col_names)
        yield data, partition.stem


columns = [['acousticness'], ['test']]
selected_cols = columns[0]
new_col_names = columns[1] # TODO set to selected_cols if no new names provided

prev_query_path = curr_query_path
step += 1
query_step_dir = query_id + '_' + str(step)
curr_query_path = Path(TEMP_DB_PATH / query_step_dir)
if not curr_query_path.exists():
    Path.mkdir(curr_query_path)
for partition, name in projection(prev_query_path=prev_query_path, selected_cols=selected_cols, new_col_names=new_col_names):
    pq.write_table(table=partition, where=(curr_query_path / name).with_suffix('.parquet'))

In [None]:
## test 

dataset = ds.dataset(curr_query_path)
for f in dataset.files:
    data = pq.read_table(f)
    print(pl.DataFrame._from_arrow(data))

## filter works!!
## projection works!!

`pyarrow` tabular datasets

In [74]:
import pyarrow.dataset as ds
import pathlib
import sys

dataset = ds.dataset(source=['data/test2/audio_features_0.parquet', 'data/test2/audio_features_1.parquet'], format='parquet')
batches = dataset.to_batches()


# print(f'dataset object size in main memory: {sys.getsizeof(dataset) / (2 ** 20)} MB')
# print(f'batches object size in main memory: {sys.getsizeof(batches) / (2 ** 20)} MB')
# print(f'first batch size in main memory: {sys.getsizeof(next(batches)) / (2 ** 20)} MB')

def select_all_from_table(dataset):
    batches = dataset.to_batches()
    truncated_dataset = dataset.head(100)
    head = pl.DataFrame._from_arrow(truncated_dataset)

    pl.Config.set_tbl_hide_dataframe_shape(True)
    print(f'shape: ({dataset.count_rows()}, {df.shape[1]})')
    print(head)
    #pl.Config.set_tbl_hide_dataframe_shape(False)



In [75]:
select_all_from_table(dataset)

shape: (1874842, 15)
shape: (100, 15)
┌────────────┬────────────┬────────────┬────────────┬───┬───────┬────────────┬─────────┬───────────┐
│ isrc       ┆ acousticne ┆ danceabili ┆ duration_m ┆ … ┆ tempo ┆ time_signa ┆ valence ┆ updated_o │
│ ---        ┆ ss         ┆ ty         ┆ s          ┆   ┆ ---   ┆ ture       ┆ ---     ┆ n         │
│ str        ┆ ---        ┆ ---        ┆ ---        ┆   ┆ i64   ┆ ---        ┆ f64     ┆ ---       │
│            ┆ f64        ┆ f64        ┆ i64        ┆   ┆       ┆ i64        ┆         ┆ str       │
╞════════════╪════════════╪════════════╪════════════╪═══╪═══════╪════════════╪═════════╪═══════════╡
│ AD4X657521 ┆ 0.906      ┆ 0.65       ┆ 296733     ┆ … ┆ 110   ┆ 4          ┆ 0.336   ┆ 2023-08-2 │
│ 84         ┆            ┆            ┆            ┆   ┆       ┆            ┆         ┆ 4         │
│            ┆            ┆            ┆            ┆   ┆       ┆            ┆         ┆ 09:27:00  │
│ AEA0D19911 ┆ 0.00095    ┆ 0.621      ┆ 191989     ┆

In [1]:
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pathlib
import polars as pl

base = pathlib.Path('/home/flemm0/school_stuff/USC_Fall_2023/DSCI551-Final_Project/data/test/')

dataset = ds.dataset(base / 'audio_features', format='parquet')

dataset.files

['/home/flemm0/school_stuff/USC_Fall_2023/DSCI551-Final_Project/data/test/audio_features/audio_features_0.parquet',
 '/home/flemm0/school_stuff/USC_Fall_2023/DSCI551-Final_Project/data/test/audio_features/audio_features_1.parquet',
 '/home/flemm0/school_stuff/USC_Fall_2023/DSCI551-Final_Project/data/test/audio_features/audio_features_2.parquet',
 '/home/flemm0/school_stuff/USC_Fall_2023/DSCI551-Final_Project/data/test/audio_features/audio_features_3.parquet',
 '/home/flemm0/school_stuff/USC_Fall_2023/DSCI551-Final_Project/data/test/audio_features/audio_features_4.parquet']

In [None]:
batches = dataset.to_batches()

nxt = next(batches)

pl.DataFrame._from_arrow(nxt).rows()

In [55]:
pq.ParquetFile(dataset.files[0]).metadata.num_row_groups

3

## Hash Join

In [13]:
import polars as pl
from collections import defaultdict
import sys
from pprint import pprint

def hash_join(table1: pl.DataFrame, index1, table2: pl.DataFrame, index2):
    new_headers = []
    for c in table1.columns:
        if c in table2.columns:
            new_headers.append(c + '_l')
        else:
            new_headers.append(c)
    for c in table2.columns:
        if c in table1.columns:
            new_headers.append(c + '_r')
        else:
            new_headers.append(c)

    table1, table2 = table1.rows(), table2.rows()
    h = defaultdict(list)
    # hash phase
    for s in table1:
        h[s[index1]].append(s)
    # join phase
    pprint(h)
    res = [(s + r) for r in table2 for s in h[r[index2]]]

    return pl.DataFrame._from_records(res, schema=new_headers)
    

df1 = pl.DataFrame({
    'age': [27, 18, 28, 18, 28],
    'name': ["Jonah", "Alan", "Glory", "Popeye", "Alan"]
    })

df2 = pl.DataFrame({
    'name': ["Jonah", "Jonah", "Alan", "Alan", "Glory"],
    'word': ['Whales', 'Spiders', 'Ghosts', 'Zombies', 'Buffy']
})

hash_join(df1, 1, df2, 0)

defaultdict(<class 'list'>,
            {'Alan': [(18, 'Alan'), (28, 'Alan')],
             'Glory': [(28, 'Glory')],
             'Jonah': [(27, 'Jonah')],
             'Popeye': [(18, 'Popeye')]})


age,name_l,name_r,word
i64,str,str,str
27,"""Jonah""","""Jonah""","""Whales"""
27,"""Jonah""","""Jonah""","""Spiders"""
18,"""Alan""","""Alan""","""Ghosts"""
28,"""Alan""","""Alan""","""Ghosts"""
18,"""Alan""","""Alan""","""Zombies"""
28,"""Alan""","""Alan""","""Zombies"""
28,"""Glory""","""Glory""","""Buffy"""


In [None]:
def hash_join_with_partitions(table1: pyarrow.dataset, index1, table2: pyarrow.dataset, index2):
    '''implement hash join that accepts table partitions
    
    the hash phase should wrap a for loop above `for s in table1` for all the partitions and store the join values in the hash
    '''

    hash_table = defaultdict(list)
    result = []
    # hash phase
    for batch in table1.to_batches():
        rows = pl.DataFrame._from_arrow(batch).rows()
        for row in rows:
            hash_table[row[index1]].append(row)

    # join phase
    for batch in table2.to_batches():
        rows = pl.DataFrame._from_arrow(batch).rows()
        for row in rows:
            for entry in hash_table[row[index2]]:
                result.append(entry + row)

## External Merge Sort

In [170]:
import names
import random

random.seed(123)
data_1 = [(names.get_first_name(), random.randint(0, 100)) for i in range(10)]
data_2 = [(names.get_first_name(), random.randint(0, 100)) for i in range(10)]
data_3 = [(names.get_first_name(), random.randint(0, 100)) for i in range(10)]
data_4 = [(names.get_first_name(), random.randint(0, 100)) for i in range(10)]

data_files = [data_1, data_2, data_3, data_4]

# sort phase
for data in data_files:
    data.sort(key=lambda x: x[1])

# merge phase
def current_tuple(idx, data: list) -> list:
    if idx < len(data):
        return [data[idx]]
    else:
        return []

i = j = k = l = 0
out_buffer = []
while i < len(data_1) or j < len(data_2) or k < len(data_3) or l < len(data_4):
    current_min = min(current_tuple(i, data_1) + current_tuple(j, data_2) + \
        current_tuple(k, data_3) + current_tuple(l, data_4), key=lambda x: x[1])
    if i < len(data_1) and current_min == data_1[i]:
        out_buffer.append(data_1[i])
        i += 1
    elif j < len(data_2) and current_min == data_2[j]:
        out_buffer.append(data_2[j])
        j += 1
    elif k < len(data_3) and current_min == data_3[k]:
        out_buffer.append(data_3[k])
        k += 1
    else:
        out_buffer.append(data_4[l])
        l += 1

out_buffer == sorted(data_1 + data_2 + data_3 + data_4, key=lambda x: x[1])

True

implement with polars dataframes

In [10]:
import polars as pl
import names
import random
import os

## setup
random.seed(999)

df1 = pl.DataFrame({
    'name': [names.get_first_name() for i in range(10)],
    'age': [random.randint(0, 100) for i in range(10)]
})
df2 = pl.DataFrame({
    'name': [names.get_first_name() for i in range(10)],
    'age': [random.randint(0, 100) for i in range(10)]
})
df3 = pl.DataFrame({
    'name': [names.get_first_name() for i in range(10)],
    'age': [random.randint(0, 100) for i in range(10)]
})
df4 = pl.DataFrame({
    'name': [names.get_first_name() for i in range(10)],
    'age': [random.randint(0, 100) for i in range(10)]
})

if not os.path.exists('./data_1.parquet'):
    df1.write_parquet('./data_1.parquet')
if not os.path.exists('./data_2.parquet'):
    df2.write_parquet('./data_2.parquet')
if not os.path.exists('./data_3.parquet'):
    df3.write_parquet('./data_3.parquet')
if not os.path.exists('./data_4.parquet'):
    df4.write_parquet('./data_4.parquet')

#############--------------###################

data_files = sorted([f for f in os.listdir('.') if f.endswith('parquet')])
schema = list(pl.read_parquet_schema(data_files[0]).keys()) # get schema/column names

'''
# sort
for file in data_files:
    data = pl.read_parquet('./' + file).rows()
    data.sort(key=lambda x: x[1])
    data = pl.DataFrame(data, schema=schema)
    if not os.path.exists('./' + file.split('.')[0] + '_sorted.parquet'):
        data.write_parquet('./' + file.split('.')[0] + '_sorted.parquet')
'''

# merge
n_buffers = 4
len_file = 10
batch_size = len_file / (n_buffers + 1)

In [59]:
import pyarrow as pa
import pyarrow.parquet as pq

pf1 = pq.ParquetFile('./data_1_sorted.parquet')
iterator_1 = pf1.iter_batches(batch_size=1)

pf2 = pq.ParquetFile('./data_2_sorted.parquet')
iterator_2 = pf2.iter_batches(batch_size=1)

merged_data = []


data_1, data_2 = next(iterator_1, False), next(iterator_2, False)

while data_1 and data_2:
    current_min = min(data_1.column('age')[0].as_py(), data_2.column('age')[0].as_py())
    if current_min == (data_1.column('age')[0].as_py()):
        merged_data.append(pl.from_arrow(data_1).rows()[0])
        data_1 = next(iterator_1, False)
    else:
        merged_data.append(pl.from_arrow(data_1).rows()[0])
        data_2 = next(iterator_2, False)

In [60]:
merged_data

[('Jamie', 11),
 ('Jamie', 11),
 ('Jamie', 11),
 ('Christina', 21),
 ('Christina', 21),
 ('Clarence', 23),
 ('Clarence', 23),
 ('Elsie', 26),
 ('Chad', 42),
 ('Chad', 42),
 ('Chad', 42),
 ('Chad', 42),
 ('Gladys', 64),
 ('Jennifer', 70),
 ('Jennifer', 70),
 ('Rocco', 76),
 ('Rocco', 76),
 ('Jean', 82)]

In [61]:
pl.DataFrame(merged_data)

column_0,column_1
str,i64
"""Jamie""",11
"""Jamie""",11
"""Jamie""",11
"""Christina""",21
"""Christina""",21
"""Clarence""",23
"""Clarence""",23
"""Elsie""",26
"""Chad""",42
"""Chad""",42
