In [1]:
import time
from harpy import (
    Session,
    MapTask,
    ReduceTask,
    TransformTask,
    Result,
    TaskSetResults
)

from deltalake import DeltaTable, write_deltalake
from harpy.quack import QuackContext
import pandas as pd
import pyarrow as pa

# Two timestamps bracket session creation so the final timing report can show
# the run time both including and excluding session start-up.
start = time.time()
session = Session().create_session()
start_after_session = time.time()

In [2]:
# This example utilizes the motor_collisions dataset from the NYC Open Data portal
# https://catalog.data.gov/dataset/motor-vehicle-collisions-crashes
# `root` is the base directory that every input and output path below is built from.
root = "/Volumes/data/"
#session.fs.ls("../_example_data/motor_colisions/")
# List what is currently under `root`; as the cell's last expression the listing is displayed.
session.fs.ls(root)

Unnamed: 0,fileName,filePath,sizeInBytes
0,motor_colisions,/Volumes/data/motor_colisions,4096
1,test-big-data,/Volumes/data/test-big-data,4096
2,Motor_Vehicle_Collisions_-_Crashes.csv,/Volumes/data/Motor_Vehicle_Collisions_-_Crash...,449301967
3,motor_colisions_silver,/Volumes/data/motor_colisions_silver,4096
4,Flights_1m.parquet,/Volumes/data/Flights_1m.parquet,12942235


In [3]:
# Start from a clean slate: remove the two output directories that later cells
# recreate (mkdir) and fill (parquet split, silver Delta table).
# NOTE(review): what rm does when a directory does not exist yet (first run) is
# not visible here — confirm it tolerates a missing path.
session.fs.rm(root + "motor_colisions_silver", recursive=True)
session.fs.rm(root + "motor_colisions", recursive=True)

True

In [4]:
# Create the directory that the split step writes its file_{index}.parquet files into.
session.fs.mkdir(root + "motor_colisions")

True

In [5]:
# Size the CSV -> Parquet split: N partitions of near-equal row count.
N = 10  # how many parquet files the CSV is divided into

# One COUNT(*) over the CSV gives the total number of rows to distribute.
df_count = session.sql(
    f"SELECT COUNT(*) as count FROM read_csv('{root}Motor_Vehicle_Collisions_-_Crashes.csv')"
)
total_count = df_count.iloc[0]['count']

# Base rows per partition, and the leftover rows that do not divide evenly.
partition_size, remainder = divmod(total_count, N)

In [6]:
# Repartition the data
def repart_map(location: str, write_location: str, partition_size: int, index: int, remainder: int) -> None:
    """Write one contiguous slice of a CSV file to its own Parquet file.

    The CSV's rows are divided into consecutive partitions. The first
    ``remainder`` partitions hold ``partition_size + 1`` rows and the rest hold
    ``partition_size`` rows, so together the partitions cover every row exactly
    once.

    Args:
        location: Path of the source CSV, interpolated into ``read_csv``.
        write_location: Directory that receives ``file_{index}.parquet``.
        partition_size: Base row count per partition (total rows // partition count).
        index: Zero-based number of the partition handled by this call.
        remainder: Rows left over after the even split (total rows % partition count).

    Returns:
        None. The effects are two progress messages and one COPY statement.
    """
    # Every earlier partition that was widened by one row pushes this slice's
    # start one row further. Without this shift, neighbouring slices overlap by
    # one row at each widened boundary and the last `remainder` rows of the CSV
    # are never written (the total row count still matches, hiding the error).
    offset = partition_size * index + min(index, remainder)
    limit = partition_size + (1 if index < remainder else 0)
    # NOTE(review): LIMIT/OFFSET without ORDER BY relies on the engine returning
    # CSV rows in the same order for every task — confirm this holds.
    sql = f"""
        COPY (
            SELECT * FROM read_csv('{location}', ALL_VARCHAR=True) LIMIT {limit} OFFSET {offset}
        ) TO '{write_location}/file_{index}.parquet' (FORMAT PARQUET, ROW_GROUP_SIZE 1024, COMPRESSION SNAPPY)
    """
    print(f"Processing partition {index} with offset {offset} and limit {limit}")
    print(f"Writing to {write_location}/file_{index}.parquet")

    with QuackContext() as q:
        q.sql(sql)

# Fan the CSV out into N parquet files: one "split" map task per partition index,
# each receiving the same source/target paths and sizing, and its own index.
ts = session.create_task_set()
ts.add_maps([
    MapTask(
        "split",
        repart_map,
        args=[],
        kwargs=dict(
            location=f'{root}Motor_Vehicle_Collisions_-_Crashes.csv',
            write_location=root + 'motor_colisions',
            partition_size=partition_size,
            index=i,
            remainder=remainder,
        ),
    )
    for i in range(N)
])
ts.run()

In [7]:
# Verify the output: the parquet files together must hold as many rows as the CSV.
count = session.sql(f"SELECT COUNT(*) as count FROM read_parquet('{root}motor_colisions/*.parquet')").iloc[0]['count']

print(f"Total input count: {total_count}")
print(f"Total output count: {count}")

# Stop the notebook on a mismatch instead of relying on someone reading the
# printed numbers. A matching count does not by itself rule out duplicated or
# skipped rows; it only catches a changed total.
assert count == total_count, (
    f"Row count mismatch: CSV has {total_count} rows, parquet output has {count}"
)

Total input count: 2126535
Total output count: 2126535


In [8]:
# Query that defines the silver table. It reads every partitioned parquet file,
# combines "CRASH DATE" and "CRASH TIME" into one parsed crash_datetime, and
# renames the remaining raw column headers to snake_case without casting them.
# This is an f-string: {root} is substituted when the cell runs, and the
# `--FROM ...` line is a SQL comment that stays inside the query text.
silver_sql = f"""
    SELECT 
        strptime(CONCAT("CRASH DATE", ' - ', "CRASH TIME"), '%m/%d/%Y - %H:%M') as crash_datetime,
        "BOROUGH" as borough,
        "ZIP CODE" as zip_code,
        "LATITUDE" as latitude,
        "LONGITUDE" as longitude,
        "LOCATION" as location,
        "ON STREET NAME" as on_street_name,
        "CROSS STREET NAME" as cross_street_name,
        "OFF STREET NAME" as off_street_name,
        "NUMBER OF PERSONS INJURED" as number_of_persons_injured,
        "NUMBER OF PERSONS KILLED" as number_of_persons_killed,
        "NUMBER OF PEDESTRIANS INJURED" as number_of_pedestrians_injured,
        "NUMBER OF PEDESTRIANS KILLED" as number_of_pedestrians_killed,
        "NUMBER OF CYCLIST INJURED" as number_of_cyclist_injured,
        "NUMBER OF CYCLIST KILLED" as number_of_cyclist_killed,
        "NUMBER OF MOTORIST INJURED" as number_of_motorist_injured,
        "NUMBER OF MOTORIST KILLED" as number_of_motorist_killed,
        "CONTRIBUTING FACTOR VEHICLE 1" as contributing_factor_vehicle_1,
        "CONTRIBUTING FACTOR VEHICLE 2" as contributing_factor_vehicle_2,
        "CONTRIBUTING FACTOR VEHICLE 3" as contributing_factor_vehicle_3,
        "CONTRIBUTING FACTOR VEHICLE 4" as contributing_factor_vehicle_4,
        "CONTRIBUTING FACTOR VEHICLE 5" as contributing_factor_vehicle_5,
        "COLLISION_ID" as collision_id,
        "VEHICLE TYPE CODE 1" as vehicle_type_code_1,
        "VEHICLE TYPE CODE 2" as vehicle_type_code_2,
        "VEHICLE TYPE CODE 3" as vehicle_type_code_3,
        "VEHICLE TYPE CODE 4" as vehicle_type_code_4,
        "VEHICLE TYPE CODE 5" as vehicle_type_code_5
    --FROM read_parquet('/Volumes/data/motor_colisions/*.parquet')
    FROM read_parquet('{root}motor_colisions/*.parquet')
"""

In [9]:
# Create the directory that the silver Delta table is written to and later scanned from.
session.fs.mkdir(root + "motor_colisions_silver")

True

In [10]:
from harpy.tasksets.tasks import taskset_from_sql, write_to_deltalake

# Build a task set from the silver query, register the silver directory as its
# Delta Lake destination, then run it.
# NOTE(review): presumably write_to_deltalake only configures the task set and
# the actual write happens in run() — confirm against the harpy helpers.
taskset = taskset_from_sql(silver_sql)
write_to_deltalake(taskset, root + "motor_colisions_silver")
taskset.run()

In [11]:
# Row count of the silver Delta table, displayed for comparison with the input count printed earlier.
session.sql(f"SELECT COUNT(*) as count FROM delta_scan('{root}motor_colisions_silver')").iloc[0]['count']

np.int64(2126535)

In [12]:
# Close the session; no later cell uses it (the timing cell below only reads timestamps).
session.close()

<harpy.session.Session at 0x7fdc51cfb490>

In [13]:
# Timing report. "Total wall-clock" is measured from before session creation;
# "Total time" is measured from just after it, so it excludes session start-up.
stop = time.time()
print(f"Total wall-clock: {stop - start}")
print(f"Total time: {stop - start_after_session}")

Total wall-clock: 56.2147855758667
Total time: 55.14059805870056
