In [1]:
from pyspark.sql.types import *
import time
from pyspark.sql import SparkSession
import json
from pyspark.sql.functions import col, to_timestamp, date_format, countDistinct, unix_timestamp, from_unixtime, count, asc, desc, split, explode, min, max
import argparse
from pyspark.sql import DataFrame as SparkDataFrame
import glob
import shutil
import os

TypeError: an integer is required (got type bytes)

In [3]:
spark = (
            SparkSession.builder.appName("Spark Benchmarking")
            .master("local[*]")
            .config('spark.driver.host', '127.0.0.1')
            .config("spark.driver.memory", "16g")
            .config("spark.driver.maxResultSize", "16g")
            .getOrCreate()
        )


In [4]:
def get_inventory(spark: SparkSession, file_type:str) -> SparkDataFrame:
    df = ""
    if file_type == 'parquet':
        return spark.read.parquet("./data/checkouts/inventory_parquet").select("BibNum", "Title","Author", "Subjects").dropDuplicates(['BibNum'])
    else:
        return spark.read.option("header","true").csv("./data/checkouts/Library_Collection_Inventory.csv").select("BibNum", "Title","Author", "Subjects").dropDuplicates(['BibNum'])

def get_checkouts(spark: SparkSession, file_type:str) -> SparkDataFrame:
    df = ''
    if file_type == 'multi-csv':
        return spark.read.option("header","true").csv("./data/checkouts/Checkouts_By_Title_Data_Lens_*.csv")
    elif file_type == 'single-csv':
        return spark.read.option("header","true").csv("./data/checkouts/Checkouts.csv")
    else:
        return spark.read.option("header","true").parquet("./data/checkouts/checkouts_parquet")


def writter_csv(df: SparkDataFrame, single_file: bool) -> None:
    if single_file:
        df.coalesce(1).write.format('csv').mode("overwrite").save(f"./output/output_folder")
        for filename in glob.glob('./output/output_folder/*.csv'):
            shutil.move(filename, './output/output.csv')
    else:
        df.write.format('csv').mode("overwrite").save(f"./output/output_folder")
        
def writter_parquet(df: SparkDataFrame, single_file: bool) ->None:
    df.write.format("parquet").mode("overwrite").save(f"./output/parquet_output")


In [12]:
file_types = ["parquet", "multi-csv", "single-csv", ]
wr = [[writter_parquet,False], [writter_csv, False], [writter_csv, True]]

In [13]:
def scenario1(writter, single_file, file_type):
    df = get_checkouts(spark, file_type)
    writter(df, single_file)
#Read and write
def scenario1_5(writter, single_file, file_type):
    df = get_inventory(spark, file_type)
    writter(df, single_file)

In [14]:
for file_type in file_types:
    for writter in wr:
        print(f"Checkouts. file_type: {file_type}, writter: {writter[0].__name__}, single_file = {writter[1]}")
        %timeit -n 1 -r 1 scenario1(writter[0], writter[1], file_type)
        print(f"Inventory. file_type: {file_type}, writter: {writter[0].__name__}, single_file = {writter[1]}")
        %timeit -n 1 -r 1 scenario1_5(writter[0], writter[1], file_type)

Checkouts. file_type: parquet, writter: writter_parquet, single_file = False
1min 1s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
Inventory. file_type: parquet, writter: writter_parquet, single_file = False
8.17 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
Checkouts. file_type: parquet, writter: writter_csv, single_file = False
57.2 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
Inventory. file_type: parquet, writter: writter_csv, single_file = False
7.73 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
Checkouts. file_type: parquet, writter: writter_csv, single_file = True
2min 2s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
Inventory. file_type: parquet, writter: writter_csv, single_file = True
8.76 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
Checkouts. file_type: multi-csv, writter: writter_parquet, single_file = False
1min 9s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
Inventory. file_type: mult

In [15]:
def add_formatted_checkout(spark:SparkSession, file_type:str)->SparkDataFrame:
    df = get_checkouts(spark, file_type)
    df = df.withColumn("CheckoutTime_formated", to_timestamp(from_unixtime(unix_timestamp(col(('CheckoutDateTime')), "MM/dd/yyyy hh:mm:ss aa"))))
    return df

In [16]:
#read add a column and write
def scenario2(writter, single_flag, file_type):
    df = add_formatted_checkout(spark,file_type)
    writter(df, single_flag)
    

In [17]:
for file_type in file_types:
    for writter in wr:
        print(f"file_type: {file_type}, writter: {writter[0].__name__}, single_file = {writter[1]}")
        %timeit -n 1 -r 1 scenario2(writter[0], writter[1], file_type)
        

file_type: parquet, writter: writter_parquet, single_file = False
2min 33s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: parquet, writter: writter_csv, single_file = False
2min 27s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: parquet, writter: writter_csv, single_file = True
7min 40s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: multi-csv, writter: writter_parquet, single_file = False
2min 59s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: multi-csv, writter: writter_csv, single_file = False
2min 44s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: multi-csv, writter: writter_csv, single_file = True
10min 25s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: single-csv, writter: writter_parquet, single_file = False
2min 57s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: single-csv, writter: writter_csv, single_file = False
2min 54s ± 0 ns per

In [28]:
#read add a column and write
def scenario3(writter, single_flag, file_type):
    df = add_formatted_checkout(spark,file_type)
    df = df.withColumn('month', date_format('CheckoutTime_formated','yyyy-MM'))
    df.groupby('month').count().collect()

    

In [29]:
for file_type in file_types:
    for writter in wr:
        print(f"file_type: {file_type}, writter: {writter[0].__name__}, single_file = {writter[1]}")
        %timeit -n 1 -r 1 scenario3(writter[0], writter[1], file_type)
        

file_type: parquet, writter: writter_parquet, single_file = False
1min 40s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: parquet, writter: writter_csv, single_file = False
1min 35s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: parquet, writter: writter_csv, single_file = True
1min 33s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: multi-csv, writter: writter_parquet, single_file = False
2min 8s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: multi-csv, writter: writter_csv, single_file = False
2min 5s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: multi-csv, writter: writter_csv, single_file = True
2min 10s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: single-csv, writter: writter_parquet, single_file = False
2min 7s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: single-csv, writter: writter_csv, single_file = False
2min 8s ± 0 ns per loop

In [32]:
# read and get row count Scenario3
for file_type in file_types:
    print(f"file_type: {file_type}")
    %timeit -n 1 -r 1 print(get_checkouts(spark, file_type).count())

file_type: parquet
94194815
240 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: multi-csv
91980693
9.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: single-csv
91980693
9.83 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [37]:
#read, column transform + min value of new column compute
for file_type in file_types:
    print(f"file_type: {file_type}")
    %timeit -n 1 -r 1 print(add_formatted_checkout(spark, file_type).select(min("CheckoutTime_formated")).collect())



file_type: parquet
[Row(min(CheckoutTime_formated)=datetime.datetime(2005, 4, 13, 8, 0))]
1min 51s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: multi-csv
[Row(min(CheckoutTime_formated)=datetime.datetime(2005, 4, 13, 8, 0))]
1min 39s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: single-csv
[Row(min(CheckoutTime_formated)=datetime.datetime(2005, 4, 13, 8, 0))]
1min 33s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [46]:
#read, join, write
def scenario4(writter, single_flag, file_type):
    checkouts = get_checkouts(spark, file_type).select("BibNumber","ItemBarcode")
    inventory = get_inventory(spark, file_type).select("BibNum", "Author")
    merged = checkouts.join(inventory, inventory.BibNum==checkouts.BibNumber, how="inner")
    # row count of merge: 62545918
    writter(merged, single_flag)

In [47]:
for file_type in file_types:
    for writter in wr:
        print(f"file_type: {file_type}, writter: {writter[0].__name__}, single_file = {writter[1]}")
        %timeit -n 1 -r 1 scenario4(writter[0], writter[1], file_type)

file_type: parquet, writter: writter_parquet, single_file = False
29.6 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: parquet, writter: writter_csv, single_file = False
29.6 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: parquet, writter: writter_csv, single_file = True
1min 40s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: multi-csv, writter: writter_parquet, single_file = False
1min 5s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: multi-csv, writter: writter_csv, single_file = False
1min 9s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: multi-csv, writter: writter_csv, single_file = True
2min 29s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: single-csv, writter: writter_parquet, single_file = False
1min 3s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: single-csv, writter: writter_csv, single_file = False
1min 8s ± 0 ns per loop (me

In [50]:
#read, join, group by + count , compute top 10, 
def scenario5(writter, single_flag, file_type):
    checkouts = get_checkouts(spark, file_type).select("BibNumber","ItemBarcode")
    inventory = get_inventory(spark, file_type).select("BibNum", "Author")
    merged = checkouts.join(inventory, inventory.BibNum==checkouts.BibNumber, how="inner")
    merged = (merged.select("BibNum", "Author")
              .groupby('Author')
              .agg(count('BibNum').alias("times_taken"))
              .orderBy(desc("times_taken")).head(10))

In [51]:
for file_type in file_types:
    print(f"file_type: {file_type}, writter: {writter[0].__name__}, single_file = {writter[1]}")
    %timeit -n 1 -r 1 scenario4(writter[0], writter[1], file_type)

file_type: parquet, writter: writter_csv, single_file = True
1min 43s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: multi-csv, writter: writter_csv, single_file = True
2min 29s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: single-csv, writter: writter_csv, single_file = True
2min 30s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [54]:
#read, string split, explode, write
def scenario7(writter, single_flag, file_type):
    inventory = get_inventory(spark, file_type).select("BibNum", "Subjects")
    inventory = inventory.withColumn("Subject", explode(split(col("Subjects"), ",").alias("Subject")))
    inventory = inventory.groupby('Subject').agg(countDistinct('BibNum').alias("subject_occurence"))
    writter(inventory, single_flag)


In [55]:
for file_type in file_types:
    for writter in wr:
        print(f"file_type: {file_type}, writter: {writter[0].__name__}, single_file = {writter[1]}")
        %timeit -n 1 -r 1 scenario7(writter[0], writter[1], file_type)
            

file_type: parquet, writter: writter_parquet, single_file = False
10.9 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: parquet, writter: writter_csv, single_file = False
9.72 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: parquet, writter: writter_csv, single_file = True
12.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: multi-csv, writter: writter_parquet, single_file = False
11.9 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: multi-csv, writter: writter_csv, single_file = False
11.4 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: multi-csv, writter: writter_csv, single_file = True
12.9 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: single-csv, writter: writter_parquet, single_file = False
11.1 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
file_type: single-csv, writter: writter_csv, single_file = False
11.2 s ± 0 ns per loop (mean ± std

In [6]:
spark.read.option("header","true").csv("./data/checkouts/Library_Collection_Inventory.csv").write.format("parquet").mode("overwrite").save(f"./data/checkouts/inventory_parquet")

In [8]:
get_inventory(spark, "parquet").count()

584391