In [1]:
#sparksession

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as F

# Create (or reuse) the session for this notebook, running Spark in local mode.
spark = (
    SparkSession.builder
    .appName("spark_job")
    .master("local[*]")
    .getOrCreate()
)
spark

# "local[*]" uses all the cores present on the machine; the number of executors
# cannot be specified in local mode.
# For local development Spark runs in a single executor, but it can still
# process tasks in parallel across multiple cores.

In [2]:
# Grab the SparkContext behind the session so its settings can be inspected.
sc = spark.sparkContext
print("Number of cores:", sc.defaultParallelism) 

# Because the master is "local[*]", this prints the number of cores on the machine.

Number of cores: 10


In [3]:
# Read the transactions dataset from Parquet.
df_transactions = spark.read.parquet("spark-experiments/data/data_skew/transactions.parquet")

# Spark UI observations for this cell:
# job-1: a job is created even though no action is called, because Spark reads only the metadata here
# stages: 1/1
# tasks: 1/1
# input: None
# output: None
# shuffle_read: None
# shuffle_write: None

In [4]:
# Number of partitions backing the transactions DataFrame.
df_transactions.rdd.getNumPartitions()

12

In [5]:
# Display the first rows of the transactions DataFrame (this is an action).
df_transactions.show()

# Spark UI observations for this cell:
# job-1: a job is created because an action is called
# stages: 1/1
# tasks: 1/1
# input: 7.6 MiB (1 MiB = 1024 KiB, 1 MB = 1000 KB) [the amount of data read into Spark for processing in this job]
# output: None
# shuffle_read: None
# shuffle_write: None


# DAG details
# Scan parquet

# number of files read: 163
# scan time: 149 ms
# metadata time: 82 ms
# size of files read: 862.7 MiB
# number of output rows: 4,096

+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+------------+
|   cust_id|start_date|  end_date|         txn_id|      date|year|month|day| expense_type|   amt|        city|
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+------------+
|C0YDPQWPBJ|2010-07-01|2018-12-01|TZ5SMKZY9S03OQJ|2018-10-07|2018|   10|  7|Entertainment| 10.42|      boston|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYIAPPNU066CJ5R|2016-03-27|2016|    3| 27| Motor/Travel| 44.34|    portland|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TETSXIK4BLXHJ6W|2011-04-11|2011|    4| 11|Entertainment|  3.18|     chicago|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TQKL1QFJY3EM8LO|2018-02-22|2018|    2| 22|    Groceries|268.97| los_angeles|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYL6DFP09PPXMVB|2010-10-16|2010|   10| 16|Entertainment|  2.66|     chicago|
|C0YDPQWPBJ|2010-07-01|2018-12-01|T1SMX9EUG21BBSE|2015-02-11|2015|    2| 11|    Education| 54.14|    portland|
|

In [6]:
# Read the customers dataset from Parquet.
df_customers = spark.read.parquet("spark-experiments/data/data_skew/customers.parquet")

In [7]:
# Preview the customers data, then report how many rows it contains.
df_customers.show()
df_customers.count() # total number of rows in df_customers

+----------+--------------+---+------+----------+-----+------------+
|   cust_id|          name|age|gender|  birthday|  zip|        city|
+----------+--------------+---+------+----------+-----+------------+
|C007YEYTX9|  Aaron Abbott| 34|Female| 7/13/1991|97823|      boston|
|C00B971T1J|  Aaron Austin| 37|Female|12/16/2004|30332|     chicago|
|C00WRSJF1Q|  Aaron Barnes| 29|Female| 3/11/1977|23451|      denver|
|C01AZWQMF3| Aaron Barrett| 31|  Male|  7/9/1998|46613| los_angeles|
|C01BKUFRHA|  Aaron Becker| 54|  Male|11/24/1979|40284|   san_diego|
|C01RGUNJV9|    Aaron Bell| 24|Female| 8/16/1968|86331|      denver|
|C01USDV4EE|   Aaron Blair| 35|Female|  9/9/1974|80078|    new_york|
|C01WMZQ7PN|   Aaron Brady| 51|Female| 8/20/1994|52204|philadelphia|
|C021567NJZ|  Aaron Briggs| 57|  Male| 3/10/1990|22008|philadelphia|
|C023M6MKR3|   Aaron Bryan| 29|  Male| 4/10/1976|05915|philadelphia|
|C0248N0EK3|  Aaron Burton| 26|Female| 8/27/1964|50477| los_angeles|
|C02C54RPNL|  Aaron Burton| 46|  M

5000

In [8]:
# Number of partitions backing the customers DataFrame.
df_customers.rdd.getNumPartitions()

1

# Narrow Transformations
- `filter` rows where `city='boston'`
- `add` a new column: adding `first_name` and `last_name`
- `alter` an existing column: adding 5 to `age` column
- `select` relevant columns

In [9]:
from pyspark.sql import functions as F
# Narrow transformations on the customers data: filter rows, derive columns,
# then project the relevant columns.
# The derived columns are kept in the final select; previously `age_plus_5`
# was computed and then immediately dropped by the projection.
name_parts = F.split(F.col("name"), " ")

df_narrow_transformations = (
    df_customers
    .filter(F.col("city") == "boston")
    # First token of the full name.
    .withColumn("first_name", name_parts.getItem(0))
    # Last token of the full name (index -1 selects the final array element).
    .withColumn("last_name", F.element_at(name_parts, -1))
    .withColumn("age_plus_5", F.col("age") + 5)
    .select("cust_id", "name", "first_name", "last_name", "age", "age_plus_5", "gender")
)

# Nothing executes in this cell: transformations are lazy.
# Once an action runs, this chain is treated as 1 job, 1 stage and 1 task
# (df_customers has a single partition).

In [10]:
# Persist the narrow-transformation result as Parquet, replacing any previous output.
(
    df_narrow_transformations.write
    .format("parquet")
    .mode("overwrite")
    .save("spark-experiments/data/data_skew/modified.parquet")
)

# Job details (Spark UI, as recorded when this cell was run):
# input  - 143.2 KiB [the whole customers file is read]
# output - 11.9 KiB  [size of the data written out after filtering]

# Wide Transformations
1. Joins
   - Sort Merge Join
   - Broadcast Join
2. GroupBy
   - `count`
   - `countDistinct`
   - `sum`

### Sort merge join

In [11]:
# Disable automatic broadcast joins for the sort-merge join experiment.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)  

# spark.sql.autoBroadcastJoinThreshold controls the maximum size (in bytes) of a DataFrame that Spark is allowed to broadcast when performing a join.
# The default is 10 MB (10 * 1024 * 1024 bytes): Spark automatically broadcasts DataFrames smaller than this during a join.

# Setting it to -1 disables automatic broadcasting of any DataFrame, regardless of its size.
# Spark will then broadcast only when explicitly asked to (via F.broadcast()), even for small DataFrames.


In [12]:
# Inner equi-join of transactions with customers on the shared cust_id key.
df_sort_merge = df_transactions.join(
    df_customers,
    on="cust_id",
    how="inner",
)

In [13]:
# Distinct customer ids on each side of the join, and the ids present in both.
unique_cust_ids_transactions = (
    df_transactions
    .select("cust_id")
    .distinct()
)
unique_cust_ids_customers = (
    df_customers
    .select("cust_id")
    .distinct()
)
common_cust_ids = unique_cust_ids_transactions.intersect(unique_cust_ids_customers)

# Print the three counts in order: transactions, customers, common.
for cust_ids in (unique_cust_ids_transactions, unique_cust_ids_customers, common_cust_ids):
    print(cust_ids.count())

2958
5000
2958


In [14]:
# Trigger the join and display the first 5 joined rows.
df_sort_merge.show(5)
# 3 jobs are created here:
# job-1: 1 stage, 12 tasks [since df_transactions.rdd.getNumPartitions() is 12]; reads the entire df_transactions data [input: 871.6 MiB] and then shuffles it [shuffle write: 2.4 GiB]
# job-2: 1 stage, 1 task [since df_customers.rdd.getNumPartitions() is 1]; reads the entire df_customers data [input: 157.2 KiB] and then shuffles it [shuffle write: 383.8 KiB]
# job-3: 1 stage, 1 task; executes the join [shuffle read: 60.0 MiB]

+----------+----------+--------+---------------+----------+----+-----+---+-------------+-------+-------------+------------+---+------+---------+-----+------+
|   cust_id|start_date|end_date|         txn_id|      date|year|month|day| expense_type|    amt|         city|        name|age|gender| birthday|  zip|  city|
+----------+----------+--------+---------------+----------+----+-----+---+-------------+-------+-------------+------------+---+------+---------+-----+------+
|C00WRSJF1Q|2012-11-01|    null|TXNU40MYVB3QXBU|2018-11-01|2018|   11|  1| Motor/Travel|2129.82|    san_diego|Aaron Barnes| 29|Female|3/11/1977|23451|denver|
|C00WRSJF1Q|2012-11-01|    null|TKGK0XNNTDI0MPX|2014-08-06|2014|    8|  6|    Groceries| 126.65|       boston|Aaron Barnes| 29|Female|3/11/1977|23451|denver|
|C00WRSJF1Q|2012-11-01|    null|T1QLRMJWEYOP8YD|2015-09-10|2015|    9| 10|Entertainment|  28.94|     new_york|Aaron Barnes| 29|Female|3/11/1977|23451|denver|
|C00WRSJF1Q|2012-11-01|    null|T7YCEUYHV6FCVRR|2020

### Broadcast Join

In [15]:
# Re-enable automatic broadcast joins by restoring the 10 MB threshold.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)

# The value is the maximum size (in bytes) for a table to be eligible for a broadcast join.
# Here: 10 * 1024 * 1024 = 10485760 bytes = 10 MB

In [16]:
# Mark the small customers table for broadcasting, then join it to the transactions.
customers_broadcast_hint = F.broadcast(df_customers)
df_broadcast_transations = df_transactions.join(
    customers_broadcast_hint,
    on="cust_id",
    how="inner",
)

# A broadcast join optimizes away shuffling by sending the small table to all worker nodes;
# even so, this notebook still treats it as a wide transformation.

In [17]:
# Trigger the broadcast join and display the first 5 joined rows.
df_broadcast_transations.show(5)

# 2 jobs are created here:
# job-1: job name "broadcast exchange" - 1 stage, 1 task [only 1 partition]; reads all the customers data. Input - 157.2 KiB, no shuffling of data here
# job-2: after the broadcast exchange, the smaller DataFrame is sent to the BroadcastHashJoin


+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+--------+---+------+---------+-----+------+
|   cust_id|start_date|  end_date|         txn_id|      date|year|month|day| expense_type|   amt|       city|    name|age|gender| birthday|  zip|  city|
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+--------+---+------+---------+-----+------+
|C0YDPQWPBJ|2010-07-01|2018-12-01|TZ5SMKZY9S03OQJ|2018-10-07|2018|   10|  7|Entertainment| 10.42|     boston|Ada Lamb| 32|Female|9/29/2005|22457|denver|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYIAPPNU066CJ5R|2016-03-27|2016|    3| 27| Motor/Travel| 44.34|   portland|Ada Lamb| 32|Female|9/29/2005|22457|denver|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TETSXIK4BLXHJ6W|2011-04-11|2011|    4| 11|Entertainment|  3.18|    chicago|Ada Lamb| 32|Female|9/29/2005|22457|denver|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TQKL1QFJY3EM8LO|2018-02-22|2018|    2| 22|    Gr

### GroupBy Count

In [8]:
# Number of transaction rows per city.
df_city_counts = (
    df_transactions
    .groupBy("city")
    .count()
)


In [9]:
# Trigger the aggregation and display the first 5 city counts.
df_city_counts.show(5)

# 2 jobs are created here

# job-1: reads all of the df_transactions data, input - 21.7 MiB, shuffle write - 8.4 KiB

# job-2: HashAggregate happens here, shuffle read - 8.4 KiB
# 2 HashAggregates happen in total [the 1st before the shuffle and the 2nd after the shuffle]
# The first HashAggregate runs within each partition, e.g. [partition 1 has cities A,A,B -> A-2, B-1; partition 2 has A,A,A,B -> A-3, B-1]
# Then the data is shuffled so that rows with the same key land in the same partition (if the data is small, AQE coalesces all the partitions into 1)
# The second HashAggregate then produces the final count, since A-2 and A-3 combine to give A-5



+---------+-------+
|     city|  count|
+---------+-------+
|san_diego|3977780|
|  chicago|3979023|
|   denver|3980274|
|   boston|3978268|
|  seattle|3980022|
+---------+-------+
only showing top 5 rows



In [20]:
# Print the column names and types of the transactions DataFrame.
df_transactions.printSchema()

root
 |-- cust_id: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- txn_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- expense_type: string (nullable = true)
 |-- amt: string (nullable = true)
 |-- city: string (nullable = true)



### GroupBy Sum

In [21]:
# Total transaction amount per city.
# `amt` is stored as a string in this dataset, so it is cast to double explicitly
# instead of relying on an implicit string-to-number conversion inside sum().
df_txn_amt_sum = (
    df_transactions
    .groupBy("city")
    .agg(F.sum(F.col("amt").cast("double")).alias("txn_amt"))
)

# agg(F.sum(...)) is needed because amt is a string in the schema;
# if the column were numeric, groupBy(...).sum('amt') could be used directly.

In [22]:
df_txn_amt_sum.show(5)

# 2 jobs will be created here
#same as groupby Count

+---------+--------------------+
|     city|             txn_amt|
+---------+--------------------+
|san_diego| 3.297982686000007E8|
|  chicago|3.2988120044000095E8|
|   denver| 3.298814956400023E8|
|   boston| 3.301009563300014E8|
|  seattle|3.3019513776999813E8|
+---------+--------------------+
only showing top 5 rows



### GroupBy Count distinct       
[different job pattern compared to above two]

In [6]:
# For every customer, count the number of distinct cities appearing in their transactions.
df_txn_per_city = (
    df_transactions
    .groupBy("cust_id")
    .agg(F.countDistinct("city"))
)

In [10]:
# Trigger the distinct-count aggregation and display the first 20 rows.
df_txn_per_city.show()
# 2 jobs are created here
# job-1: reads all of the df_transactions data, input - 21.7 MiB, shuffle write - 8.4 KiB
# job-2: HashAggregate happens here, shuffle read - 8.4 KiB
# NOTE(review): these figures are identical to the ones recorded for the GroupBy Count cell,
# although this section is described as having a different job pattern - re-check them in the Spark UI.
# Always observe which keys and which functions each HashAggregate is operating on


#

+----------+-----------+
|   cust_id|count(city)|
+----------+-----------+
|CPP8BY8U93|         10|
|CYB8BX9LU1|         10|
|CFRT841CCD|         10|
|CA0TSNMYDK|         10|
|COZ8NONEVZ|         10|
|C46OCVH3WG|         10|
|C1QF29WCA6|         10|
|CTJBQB0OJ1|         10|
|CD0DXL8XTM|         10|
|CADBQ5OL5C|         10|
|CUCQ9LBQWW|         10|
|C3NH8CDGWM|         10|
|CEEPXNQ9NQ|         10|
|C7ALJDG81A|         10|
|CUDKFKPAFB|         10|
|C2L2984OZK|         10|
|CDDRDAEY13|         10|
|CIZT509YVA|         10|
|CSTJ6YYXE3|         10|
|CW1X1V0PRG|         10|
+----------+-----------+
only showing top 20 rows



In [None]:
# Topics still to cover: window functions, pivot.
#
# Example syntax, run against a tiny in-memory DataFrame so the cell executes
# on its own. The free-text notes are now comments (they were bare text and
# therefore syntax errors), `f.col` is corrected to `F.col`, and the empty
# placeholder column names are filled in with the sample columns.
# Requires `Window` from pyspark.sql.
df = spark.createDataFrame(
    [("alice", 1, 100), ("alice", 2, 150), ("bob", 1, 80), ("bob", 2, 120)],
    ["name", "month", "sales"],
)

# Running-total frame: all rows from the start of the partition up to the current row.
window_spec = (
    Window.partitionBy("name")
    .orderBy("month")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
# Same partitioning, with months in descending order.
window_desc = Window.partitionBy("name").orderBy(F.col("month").desc())

# Aggregate, then sort the aggregated result in descending month order.
df_monthly_sales = (
    df.groupBy("month")
    .agg(F.sum("sales").alias("total_sales"))
    .orderBy(F.col("month").desc())
)
df_monthly_sales.show()

# Window function: cumulative sales per name, ordered by month.
df_with_running_total = df.withColumn("running_total", F.sum("sales").over(window_spec))
df_with_running_total.show()

# Pivot: one row per name, one column per month, cells hold summed sales.
pivot_df = df.groupBy("name").pivot("month").sum("sales")
pivot_df.show()


