In [2]:
import findspark
findspark.init()

In [3]:
# Time analysis for pandas read and write operations

import pandas as pd
import time
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder \
    .appName("My Spark Application") \
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.7") \
    .config("spark.driver.memory", "6g") \
    .config("spark.executor.memory", "6g") \
    .config("spark.executor.memoryOverhead", "1g") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.sql.debug.maxToStringFields", 1000) \
    .getOrCreate()

24/05/27 15:45:48 WARN Utils: Your hostname, Abhimanyus-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.191.2.98 instead (on interface en0)
24/05/27 15:45:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/abhi/.ivy2/cache
The jars for the packages stored in: /Users/abhi/.ivy2/jars
com.crealytics#spark-excel_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-383c2687-0cc3-4d2f-8eda-5ad57acdcb2a;1.0
	confs: [default]


:: loading settings :: url = jar:file:/Users/abhi/miniconda3/envs/bigdata/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found com.crealytics#spark-excel_2.12;0.13.7 in central
	found org.apache.poi#poi;4.1.2 in central
	found commons-codec#commons-codec;1.13 in central
	found org.apache.commons#commons-collections4;4.4 in central
	found org.apache.commons#commons-math3;3.6.1 in central
	found com.zaxxer#SparseBitSet;1.2 in central
	found org.apache.poi#poi-ooxml;4.1.2 in central
	found org.apache.poi#poi-ooxml-schemas;4.1.2 in central
	found org.apache.xmlbeans#xmlbeans;3.1.0 in central
	found com.github.virtuald#curvesapi;1.06 in central
	found com.norbitltd#spoiwo_2.12;1.8.0 in central
	found org.scala-lang.modules#scala-xml_2.12;1.3.0 in central
	found com.github.pjfanning#excel-streaming-reader;2.3.6 in central
	found com.github.pjfanning#poi-shared-strings;1.0.4 in central
	found com.h2database#h2;1.4.200 in central
	found org.apache.commons#commons-text;1.8 in central
	found org.apache.commons#commons-lang3;3.9 in central
	found xml-apis#xml-apis;1.4.01 in central
	found org.slf4j#slf4j-api;1.7.3

24/05/27 15:46:01 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


# analysis with pandas

In [6]:
# Start the timer
start_time = time.time()

# Read the Excel file
df_layoff_pd = pd.read_excel('tech_layoffs.xlsx')

# Read the TSV file
df_housing_pd = pd.read_csv("latest_weekly_housing_market_data_most_recent.tsv000", sep='\t')

# Stop the timer
end_time = time.time()

# Calculate total duration
total_duration = end_time - start_time

print(f"Total time to run the two lines of code: {total_duration} seconds")

Total time to run the two lines of code: 55.639257192611694 seconds


# analysis with Pyspark

In [7]:
# Start timer for both operations
start_time = time.time()

# Read Excel file
df_layoff_sk = spark.read.format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("dataAddress", "'Sheet1'!A1") \
    .load('tech_layoffs.xlsx')

# Read CSV file
df_housing_sk = spark.read.csv('latest_weekly_housing_market_data_most_recent.tsv000', header=True, sep='\t')

# Calculate and print the total time taken for both read operations
total_time_taken = time.time() - start_time
print(f"Total time taken to read both files: {total_time_taken} seconds")

                                                                                

Total time taken to read both files: 5.870390892028809 seconds


# Parquet write and read with pandas

In [8]:
import time

# Start the timer
start_time = time.time()

# Write the DataFrame from the Excel file to a Parquet file
df_layoff_pd.to_parquet('tech_layoffs_pd.parquet')

# Write the DataFrame from the TSV file to a Parquet file
df_housing_pd.to_parquet('housing_market_pd.parquet')

# Stop the timer
end_time = time.time()

# Calculate total duration
total_duration = end_time - start_time

print(f"Total time to write both DataFrames to Parquet files: {total_duration} seconds")

Total time to write both DataFrames to Parquet files: 12.101958990097046 seconds


# parallel write to parquet with pyspark

In [33]:
#TODO: experimentation write parallel and more memory

In [9]:
# Start the timer
start_time = time.time()

# Read the Parquet file into a DataFrame
df_layoff_pd_pq = pd.read_parquet('tech_layoffs_pd.parquet')
df_housing_pd_pq = pd.read_parquet('housing_market_pd.parquet')

# Stop the timer
end_time = time.time()

# Calculate total duration
total_duration = end_time - start_time

print(f"Total time to read both Parquet files into DataFrames: {total_duration} seconds")

Total time to read both Parquet files into DataFrames: 4.315072059631348 seconds


# Parquet write and read with pyspark

In [10]:
# Start timer for both write operations
start_time = time.time()

# Write the DataFrame to a Parquet file
df_layoff_sk.write.parquet('tech_layoffs_sk.parquet', mode='overwrite')

# Write the DataFrame to a Parquet file
df_housing_sk.write.parquet('housing_market_sk.parquet', mode='overwrite')

# Calculate and print the total time taken for both write operations
total_time_taken = time.time() - start_time
print(f"Total time taken to write both DataFrames to Parquet files: {total_time_taken} seconds")


                                                                                

Total time taken to write both DataFrames to Parquet files: 44.41936421394348 seconds


In [11]:
# Start timer for both read operations
start_time = time.time()

# Read the Parquet file into a DataFrame
df_layoff_sk_pq = spark.read.parquet('tech_layoffs_sk.parquet')

# Read the Parquet file into a DataFrame
df_housing_sk_pq = spark.read.parquet('housing_market_sk.parquet')

# Calculate and print the total time taken for both read operations
total_time_taken = time.time() - start_time
print(f"Total time taken to read both DataFrames from Parquet files: {total_time_taken} seconds")

Total time taken to read both DataFrames from Parquet files: 0.1904592514038086 seconds
