# Apache Spark - Optimization 
by Mohamed Rachidi


To set up your environment for this workshop, follow these two blog posts:
- https://mohamed-rachidi.medium.com/part-1-database-setup-for-tpc-ds-e4a684731bed
- https://mohamed-rachidi.medium.com/part-2-setup-jupyter-notebook-and-spark-with-scala-locally-9d46a9423ba0

## Initial Setup



In [4]:

import glob
import os
import sys

# Spark 3.0.1 installed under the Homebrew Cellar on this machine; adjust for other setups.
SPARK_HOME = "/usr/local/Cellar/apache-spark/3.0.1/libexec"
os.environ["SPARK_HOME"] = SPARK_HOME

# PYSPARK_PYTHON must name a Python *interpreter executable*, not Spark's python/ source
# directory. Use the interpreter running this kernel so driver and any Python workers match.
os.environ["PYSPARK_PYTHON"] = sys.executable

# Make the bundled pyspark package and py4j importable. The py4j zip's version changes
# between Spark releases (0.10.4 is not what Spark 3.0.x ships), so discover it instead
# of pinning the file name.
spark_python_dir = os.path.join(SPARK_HOME, "python")
sys.path.append(spark_python_dir)
sys.path.extend(sorted(glob.glob(os.path.join(spark_python_dir, "lib", "py4j-*-src.zip"))))

from pyspark.sql import SparkSession  # noqa: E402  (needs the sys.path entries above)

# Every later cell uses `spark`. getOrCreate() returns the already-running session if the
# kernel started one; otherwise it starts a new one, so the notebook survives Restart & Run All.
spark = SparkSession.builder.appName("spark-optimization").getOrCreate()



import os

# Directory holding the pipe-delimited TPC-DS .dat files generated with tpcds-kit.
# Override with the TPCDS_DIR environment variable instead of editing the path.
TPCDS_DIR = os.environ.get("TPCDS_DIR", "/Users/mrachidi/Code/tpcds-kit/tools")


def _read_tpcds_dat(table, columns):
    """Read ``<TPCDS_DIR>/<table>.dat`` into a DataFrame with the given column names.

    The files are '|'-delimited with no quoting. No schema or inference is configured,
    so every column is read as a string.

    Args:
        table: base file name without the ``.dat`` extension.
        columns: one name per field, in file order. This includes a final placeholder
            (``useless*``) for the empty field after the trailing ``|`` on each line.

    Returns:
        A lazily-evaluated Spark DataFrame.
    """
    return (
        spark.read.format("csv")
        .option("quote", "")
        .option("delimiter", "|")
        .option("ignoreTrailingWhiteSpace", True)
        .load(f"{TPCDS_DIR}/{table}.dat")
        .toDF(*columns)
    )


store_sales = _read_tpcds_dat("store_sales", [
    "ss_sold_date_sk", "ss_sold_time_sk", "ss_item_sk", "ss_customer_sk",
    "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk",
    "ss_ticket_number", "ss_quantity", "ss_wholesale_cost", "ss_list_price",
    "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost",
    "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax",
    "ss_net_profit", "useless1",
])
date_dim = _read_tpcds_dat("date_dim", [
    "d_date_sk", "d_date_id", "d_date", "d_month_seq", "d_week_seq",
    "d_quarter_seq", "d_year", "d_dow", "d_moy", "d_dom", "d_qoy", "d_fy_year",
    "d_fy_quarter_seq", "d_fy_week_seq", "d_day_name", "d_quarter_name", "d_holiday",
    "d_weekend", "d_following_holiday", "d_first_dom", "d_last_dom", "d_same_day_ly",
    "d_same_day_lq", "d_current_day", "d_current_week", "d_current_month", "d_current_quarter",
    "d_current_year", "useless2",
])
item = _read_tpcds_dat("item", [
    "i_item_sk", "i_item_id", "i_rec_start_date", "i_rec_end_date",
    "i_item_desc", "i_current_price", "i_wholesale_cost", "i_brand_id",
    "i_brand", "i_class_id", "i_class", "i_category_id", "i_category", "i_manufact_id",
    "i_manufact", "i_size", "i_formulation", "i_color", "i_units", "i_container",
    "i_manager_id", "i_product_name", "useless3",
])
inv = _read_tpcds_dat("inventory", [
    "inv_date_sk", "inv_item_sk", "inv_warehouse_sk", "inv_quantity_on_hand", "useless4",
])

# Visualize Data

In [2]:
# Pull a small preview of date_dim to the driver as a pandas frame.
date_dim_pd = date_dim.limit(10).toPandas()

# The same TPC-DS store_sales table, read from the local PostgreSQL database over JDBC.
# Unlike the CSV reads, this comes back with typed columns (see the schema printed below).
df = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/tpcds")
    .option("dbtable", "store_sales")
    .option("user", "postgres")
    .option("driver", "org.postgresql.Driver")
    .load()
)

df.printSchema()

# date_dim_pd already holds exactly 10 rows. `.sample(10)` would only reshuffle them with no
# seed, so the displayed order changed on every run; show the preview as-is instead.
date_dim_pd

root
 |-- ss_sold_date_sk: integer (nullable = true)
 |-- ss_sold_time_sk: integer (nullable = true)
 |-- ss_item_sk: integer (nullable = true)
 |-- ss_customer_sk: integer (nullable = true)
 |-- ss_cdemo_sk: integer (nullable = true)
 |-- ss_hdemo_sk: integer (nullable = true)
 |-- ss_addr_sk: integer (nullable = true)
 |-- ss_store_sk: integer (nullable = true)
 |-- ss_promo_sk: integer (nullable = true)
 |-- ss_ticket_number: integer (nullable = true)
 |-- ss_quantity: integer (nullable = true)
 |-- ss_wholesale_cost: decimal(7,2) (nullable = true)
 |-- ss_list_price: decimal(7,2) (nullable = true)
 |-- ss_sales_price: decimal(7,2) (nullable = true)
 |-- ss_ext_discount_amt: decimal(7,2) (nullable = true)
 |-- ss_ext_sales_price: decimal(7,2) (nullable = true)
 |-- ss_ext_wholesale_cost: decimal(7,2) (nullable = true)
 |-- ss_ext_list_price: decimal(7,2) (nullable = true)
 |-- ss_ext_tax: decimal(7,2) (nullable = true)
 |-- ss_coupon_amt: decimal(7,2) (nullable = true)
 |-- ss_net_p

Unnamed: 0,d_date_sk,d_date_id,d_date,d_month_seq,d_week_seq,d_quarter_seq,d_year,d_dow,d_moy,d_dom,...,d_first_dom,d_last_dom,d_same_day_ly,d_same_day_lq,d_current_day,d_current_week,d_current_month,d_current_quarter,d_current_year,useless2
2,2415024,AAAAAAAAALJNECAA,1900-01-04,0,1,1,1900,3,1,4,...,2415021,2415020,2414659,2414932,N,N,N,N,N,
6,2415028,AAAAAAAAELJNECAA,1900-01-08,0,1,1,1900,0,1,8,...,2415021,2415020,2414663,2414936,N,N,N,N,N,
3,2415025,AAAAAAAABLJNECAA,1900-01-05,0,1,1,1900,4,1,5,...,2415021,2415020,2414660,2414933,N,N,N,N,N,
1,2415023,AAAAAAAAPKJNECAA,1900-01-03,0,1,1,1900,2,1,3,...,2415021,2415020,2414658,2414931,N,N,N,N,N,
8,2415030,AAAAAAAAGLJNECAA,1900-01-10,0,2,1,1900,2,1,10,...,2415021,2415020,2414665,2414938,N,N,N,N,N,
7,2415029,AAAAAAAAFLJNECAA,1900-01-09,0,2,1,1900,1,1,9,...,2415021,2415020,2414664,2414937,N,N,N,N,N,
9,2415031,AAAAAAAAHLJNECAA,1900-01-11,0,2,1,1900,3,1,11,...,2415021,2415020,2414666,2414939,N,N,N,N,N,
4,2415026,AAAAAAAACLJNECAA,1900-01-06,0,1,1,1900,5,1,6,...,2415021,2415020,2414661,2414934,N,N,N,N,N,
5,2415027,AAAAAAAADLJNECAA,1900-01-07,0,1,1,1900,6,1,7,...,2415021,2415020,2414662,2414935,N,N,N,N,N,
0,2415022,AAAAAAAAOKJNECAA,1900-01-02,0,1,1,1900,1,1,2,...,2415021,2415020,2414657,2414930,N,N,N,N,N,


# Running a Large Query

In [115]:
%%time
query_no_lazy = store_sales.join(date_dim.withColumnRenamed("d_date_sk","ss_sold_date_sk"),"ss_sold_date_sk")\
                    .join(item.withColumnRenamed("i_item_sk","ss_item_sk"),"ss_item_sk")\
                    .join(inv.withColumnRenamed("inv_item_sk","ss_item_sk"),"ss_item_sk")
spark.conf.set("spark.sql.shuffle.partitions", 500)
query_no_lazy.write.mode("overwrite").format("parquet").save("/tmp/mohamed/scrates/query_no_lazy")

CPU times: user 367 ms, sys: 212 ms, total: 579 ms
Wall time: 1h 5min 28s


# Use Lazy Loading or Data Skipping 

In [None]:
%%time

from pyspark.sql import functions as F
import pandas as pd
query_no_lazy = store_sales.join(\
                                 date_dim.filter(F.col("d_date").between(pd.to_datetime("2000"),\
                                                                         pd.to_datetime("2001")))\
                                 ,date_dim.d_date_sk == store_sales.ss_sold_date_sk)\
                    .join(item.withColumnRenamed("i_item_sk","ss_item_sk"),"ss_item_sk")\
                    .join(inv, (inv.inv_item_sk == store_sales.ss_item_sk) & \
                          (inv.inv_date_sk == store_sales.ss_sold_date_sk))
spark.conf.set("spark.sql.shuffle.partitions", 500)
query_no_lazy.write.mode("overwrite").format("parquet").save("/tmp/mohamed/scrates/query_no_lazy")

# Tuning the Number of Shuffle Partitions

In [117]:
%%time

spark.conf.set("spark.sql.shuffle.partitions", 20)
query_no_lazy.write.mode("overwrite").format("parquet").save("/tmp/mohamed/scrates/query_no_lazy")

CPU times: user 3.28 ms, sys: 2.23 ms, total: 5.51 ms
Wall time: 25.1 s


# Tuning Input Partitions 

In [118]:
%%time


spark.conf.set("spark.sql.files.maxPartitionBytes",13417728)
parquet_read = spark.read.parquet("/tmp/mohamed/scrates/query_no_lazy")
parquet_read.rdd.getNumPartitions()

CPU times: user 1.39 ms, sys: 997 µs, total: 2.39 ms
Wall time: 103 ms


20

In [119]:
%%time
spark.conf.set("spark.sql.files.maxPartitionBytes",   16)
parquet_read = spark.read.parquet("/tmp/mohamed/scrates/query_no_lazy")
parquet_read.rdd.getNumPartitions()

CPU times: user 2.06 ms, sys: 1.5 ms, total: 3.56 ms
Wall time: 2.66 s


3173224

# Optimization: Broadcast Join


In [123]:
%%time
from pyspark.sql.functions import broadcast



query_no_lazy = store_sales.join(\
                                 broadcast(date_dim.filter(F.col("d_date").between(pd.to_datetime("2000"),\
                                                                         pd.to_datetime("2001"))))\
                                 ,date_dim.d_date_sk == store_sales.ss_sold_date_sk)\
                    .join(item.withColumnRenamed("i_item_sk","ss_item_sk"),"ss_item_sk")\
                    .join(inv, (inv.inv_item_sk == store_sales.ss_item_sk) & \
                          (inv.inv_date_sk == store_sales.ss_sold_date_sk))

query_no_lazy.write.mode("overwrite").format("parquet").save("/tmp/mohamed/scrates/query_no_lazy")

KeyboardInterrupt: 