Set up the environment

In [1]:
import os
import sys

os.environ["JAVA_HOME"] = "JDK 8"
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

Import pyspark

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType
import pyspark.sql.functions as F

Create spark session

In [3]:
# Create SparkSession
spark = SparkSession.builder \
      .master("local[*]") \
      .appName("PySparkApplication") \
      .getOrCreate()  

Inferring the schema -- BAD because the whole file needs to be read after loading and before transforming to a dataframe

In [4]:
df = spark.read.option("header", True).csv("sales.csv")

In [5]:
df.explain()

== Physical Plan ==
FileScan csv [order_id#17,customer_id#18,product_name#19,category#20,quantity#21,unit_price#22,order_date#23,sales_rep#24,region#25] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/JoseIungo/Desktop/Github/iungo/product/Optimizing Data ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<order_id:string,customer_id:string,product_name:string,category:string,quantity:string,uni...




We make the schema explicit causing a 3.2 second increase in the speed of reading the csv file

In [4]:
schema = StructType([StructField("order_id", IntegerType(), True),
                     StructField("customer_id", StringType(), True),
                     StructField("product_name", StringType(), True),
                     StructField("category", StringType(), True),
                     StructField("quantity", IntegerType(), True),
                     StructField("unit_price", DoubleType(), True),
                     StructField("order_date", DateType(), True),
                     StructField("sales_rep", StringType(), True),
                     StructField("region", StringType(), True)])

In [5]:
new_df = spark.read.schema(schema).option("header", True).csv("sales.csv")

In [10]:
new_df.explain()

== Physical Plan ==
FileScan csv [order_id#35,customer_id#36,product_name#37,category#38,quantity#39,unit_price#40,order_date#41,sales_rep#42,region#43] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/JoseIungo/Desktop/Github/iungo/product/Optimizing Data ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<order_id:int,customer_id:string,product_name:string,category:string,quantity:int,unit_pric...




In [11]:
query = new_df.groupBy("region", "category").agg(F.sum("quantity").alias("total_quantity"), F.avg("unit_price").alias("avg_price")).filter(F.col("region") == "North America")

In [12]:
query.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[region#8, category#3], functions=[sum(quantity#4), avg(unit_price#5)])
   +- Exchange hashpartitioning(region#8, category#3, 200), ENSURE_REQUIREMENTS, [plan_id=17]
      +- HashAggregate(keys=[region#8, category#3], functions=[partial_sum(quantity#4), partial_avg(unit_price#5)])
         +- Filter (isnotnull(region#8) AND (region#8 = North America))
            +- FileScan csv [category#3,quantity#4,unit_price#5,region#8] Batched: false, DataFilters: [isnotnull(region#8), (region#8 = North America)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/JoseIungo/Desktop/Github/iungo/product/Optimizing Data ..., PartitionFilters: [], PushedFilters: [IsNotNull(region), EqualTo(region,North America)], ReadSchema: struct<category:string,quantity:int,unit_price:double,region:string>




In [13]:
query.show()

+-------------+-----------+--------------+-----------------+
|       region|   category|total_quantity|        avg_price|
+-------------+-----------+--------------+-----------------+
|North America| Appliances|             2|           189.99|
|North America|Electronics|             4|949.9900000000001|
|North America|  Furniture|             2|           699.99|
+-------------+-----------+--------------+-----------------+



In [19]:
new_query = new_df.filter(F.col("region") == "North America").groupBy("region", "category").agg(F.sum("quantity").alias("total_quantity"), F.avg("unit_price").alias("avg_price"))

In [24]:
new_query.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[region#8, category#3], functions=[sum(quantity#4), avg(unit_price#5)])
   +- Exchange hashpartitioning(region#8, category#3, 200), ENSURE_REQUIREMENTS, [plan_id=180]
      +- HashAggregate(keys=[region#8, category#3], functions=[partial_sum(quantity#4), partial_avg(unit_price#5)])
         +- Filter (isnotnull(region#8) AND (region#8 = North America))
            +- FileScan csv [category#3,quantity#4,unit_price#5,region#8] Batched: false, DataFilters: [isnotnull(region#8), (region#8 = North America)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/JoseIungo/Desktop/Github/iungo/product/Optimizing Data ..., PartitionFilters: [], PushedFilters: [IsNotNull(region), EqualTo(region,North America)], ReadSchema: struct<category:string,quantity:int,unit_price:double,region:string>




### Lazy Evaluation#

Lazy evaluation means that we do not do any work until it is necessary.

Some functions that force evaluation:

df.show()
df.count()
df.collect()

In [23]:
new_query.show()

+-------------+-----------+--------------+-----------------+
|       region|   category|total_quantity|        avg_price|
+-------------+-----------+--------------+-----------------+
|North America| Appliances|             2|           189.99|
|North America|Electronics|             4|949.9900000000001|
|North America|  Furniture|             2|           699.99|
+-------------+-----------+--------------+-----------------+



### CPU COUNT
Spark uses all available CPU's when directed to with the opening argument local[*]. However if you need to limit the amount you can use the following code

In [25]:
num = os.cpu_count()
print(num)

16


In [27]:
import math

In [29]:
new_query.repartition(math.floor(num / 8))

DataFrame[region: string, category: string, total_quantity: bigint, avg_price: double]

In [30]:
new_query.show()

+-------------+-----------+--------------+-----------------+
|       region|   category|total_quantity|        avg_price|
+-------------+-----------+--------------+-----------------+
|North America| Appliances|             2|           189.99|
|North America|Electronics|             4|949.9900000000001|
|North America|  Furniture|             2|           699.99|
+-------------+-----------+--------------+-----------------+



### Using a dimension table to broadcast joins 

When you are joining tables in a distributed context, it is better to broadcast the join axcross the network so that each node gets its own copy of the table so as to avoid unneccessary shuffling (where data is moved across nodes)

In [32]:
regions = [("North America", "NA"), ("Europe", "EU"), ("Asia Pacific", "AP")]
regions_df = spark.createDataFrame(regions, ["region", "region_code"])

In [33]:
join = new_df.join(regions_df, "region")

In [36]:
join.show()

+-------------+--------+-----------+-------------------+-----------+--------+----------+----------+-------------+-----------+
|       region|order_id|customer_id|       product_name|   category|quantity|unit_price|order_date|    sales_rep|region_code|
+-------------+--------+-----------+-------------------+-----------+--------+----------+----------+-------------+-----------+
|North America|    1017|       C006|          Microwave| Appliances|       1|    179.99|2024-02-01| David Wilson|         NA|
|North America|    1016|       C011|       Dining Table|  Furniture|       1|    799.99|2024-01-30|Alice Johnson|         NA|
|North America|    1011|       C008|            Blender| Appliances|       1|    199.99|2024-01-25| David Wilson|         NA|
|North America|    1010|       C003|         Tablet Pro|Electronics|       1|    649.99|2024-01-24|Alice Johnson|         NA|
|North America|    1005|       C004|       Smartphone X|Electronics|       1|    899.99|2024-01-19| David Wilson|     

In [34]:
broad_join = new_df.join(F.broadcast(regions_df), "region")

In [35]:
broad_join.show()

+-------------+--------+-----------+-------------------+-----------+--------+----------+----------+-------------+-----------+
|       region|order_id|customer_id|       product_name|   category|quantity|unit_price|order_date|    sales_rep|region_code|
+-------------+--------+-----------+-------------------+-----------+--------+----------+----------+-------------+-----------+
|North America|    1001|       C001|      Laptop Pro 15|Electronics|       2|   1299.99|2024-01-15|Alice Johnson|         NA|
|       Europe|    1002|       C002|Wireless Headphones|Electronics|       1|    199.99|2024-01-16|    Bob Smith|         EU|
| Asia Pacific|    1003|       C003|       Office Chair|  Furniture|       3|    299.99|2024-01-17|  Carol Davis|         AP|
|North America|    1004|       C001|      Standing Desk|  Furniture|       1|    599.99|2024-01-18|Alice Johnson|         NA|
|North America|    1005|       C004|       Smartphone X|Electronics|       1|    899.99|2024-01-19| David Wilson|     