#Optimized PySpark Coding
Optimizing PySpark code involves several techniques that can improve its performance. Below are some of the most common ones.
Let's use the Titanic dataset and apply these PySpark optimization techniques step by step:

##1. Use Broadcast Variables
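In this example, we load the Titanic dataset and create a small lookup table containing the Name and Age columns. We then mark the lookup table with the broadcast function and join it with the Titanic dataset. Strictly speaking, broadcast is a join hint on a DataFrame rather than a broadcast variable created with SparkContext.broadcast, but the idea is the same: Spark sends one full copy of the small table to every executor, so the larger table does not have to be shuffled across the cluster for the join, which usually makes the join faster.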

In [0]:
from pyspark.sql.functions import broadcast

# Load the Titanic dataset
titanic_df = spark.read.csv("dbfs:/FileStore/shared_uploads/purvajainpj123@gmail.com/train.csv", header=True, inferSchema=True)
titanic_df.show(5)

# Create a small lookup table
lookup_df = titanic_df.select("Name", "Age").distinct()
lookup_df.show(5)

# Broadcast the lookup table to all nodes in the cluster
joined_df = titanic_df.join(broadcast(lookup_df), on="Name")
joined_df.show(5)


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+

##2. Avoid Using UDFs
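In this example, we reuse the Titanic DataFrame loaded above and convert the Age column to a float data type using the built-in cast function instead of a UDF. A Python UDF forces Spark to serialize each row, send it from the JVM to a Python worker, and deserialize the result, and it hides the logic from the query optimizer. Built-in functions run inside the JVM and avoid that overhead, resulting in faster processing times.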

In [0]:
from pyspark.sql.functions import col

# Convert a column to a different data type using the cast function
titanic_df = titanic_df.withColumn("Age", col("Age").cast("float"))
titanic_df.show(5)


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+

##3. Use Caching
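Here we cache the Titanic DataFrame using the cache function. We then perform multiple operations on the cached DataFrame, including filtering by gender and grouping by passenger class. Note that cache is lazy: the data is stored only when the first action runs, and later actions reuse it. By caching the DataFrame, we avoid recomputing it (here, re-reading and re-parsing the CSV file) for each operation, resulting in faster processing times.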

In [0]:
titanic_df = titanic_df.cache()

# Perform multiple operations on the cached DataFrame
df1 = titanic_df.filter("Sex = 'male'")
df2 = titanic_df.filter("Sex = 'female'")
df3 = titanic_df.groupBy("Pclass").count()

df1.show(5)
df2.show(5)
df3.show(5)


+-----------+--------+------+--------------------+----+----+-----+-----+---------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|   Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+---------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|male|22.0|    1|    0|A/5 21171|   7.25| null|       S|
|          5|       0|     3|Allen, Mr. Willia...|male|35.0|    0|    0|   373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|male|null|    0|    0|   330877| 8.4583| null|       Q|
|          7|       0|     1|McCarthy, Mr. Tim...|male|54.0|    0|    0|    17463|51.8625|  E46|       S|
|          8|       0|     3|Palsson, Master. ...|male| 2.0|    3|    1|   349909| 21.075| null|       S|
+-----------+--------+------+--------------------+----+----+-----+-----+---------+-------+-----+--------+
only showing top 5 rows


##4. Repartition Your Data
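Here we repartition the DataFrame into 10 partitions using the repartition function and then perform a join operation on the repartitioned DataFrame. Keep in mind that repartition itself triggers a full shuffle, so it pays off mainly when the existing partitions are skewed or too few to keep every core busy. Spreading the rows evenly balances the workload across nodes in the cluster. To cut down on shuffling in a later join or aggregation, repartition on the key column instead, for example repartition(10, "PassengerId"), which can let Spark reuse that partitioning.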

In [0]:
titanic_df = titanic_df.repartition(10)

# Perform a join operation on the repartitioned DataFrame
joined_df = titanic_df.join(df2, on="PassengerId")
joined_df.limit(5).toPandas()


Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,...,Pclass.1,Name.1,Sex.1,Age.1,SibSp.1,Parch.1,Ticket.1,Fare.1,Cabin,Embarked
0,701,1,1,"Astor, Mrs. John Jacob (Madeleine Talmadge Force)",female,18.0,1,0,PC 17757,227.525,...,1,"Astor, Mrs. John Jacob (Madeleine Talmadge Force)",female,18.0,1,0,PC 17757,227.525,C62 C64,C
1,581,1,2,"Christy, Miss. Julie Rachel",female,25.0,1,1,237789,30.0,...,2,"Christy, Miss. Julie Rachel",female,25.0,1,1,237789,30.0,,S
2,441,1,2,"Hart, Mrs. Benjamin (Esther Ada Bloomfield)",female,45.0,1,1,F.C.C. 13529,26.25,...,2,"Hart, Mrs. Benjamin (Esther Ada Bloomfield)",female,45.0,1,1,F.C.C. 13529,26.25,,S
3,536,1,2,"Hart, Miss. Eva Miriam",female,7.0,0,2,F.C.C. 13529,26.25,...,2,"Hart, Miss. Eva Miriam",female,7.0,0,2,F.C.C. 13529,26.25,,S
4,597,1,2,"Leitch, Miss. Jessie Wills",female,,0,0,248727,33.0,...,2,"Leitch, Miss. Jessie Wills",female,,0,0,248727,33.0,,S


##5. Use Lazy Evaluation
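Spark transformations such as groupBy are lazy: they only build up an execution plan, and nothing runs until an action such as show, count, or collect is called. Here we chain a groupBy and a count aggregation and trigger them with a single show. Because Spark sees the whole chain before executing it, it can optimize the plan as a unit and make one pass over the data instead of materializing intermediate results, resulting in faster processing times.

In [0]:
# Chain groupBy and count; nothing executes until show() is called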

titanic_df.groupBy("Sex").count().show()


+------+-----+
|   Sex|count|
+------+-----+
|female|  314|
|  male|  577|
+------+-----+



##6. Use the Correct Data Types
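Here we convert the Fare column to a decimal data type using the cast function and then perform a groupBy and sum operation on it. A decimal(10,2) column stores exactly two decimal places, so the cast rounds each fare to two places and the sums are free of binary floating-point error, which matters for monetary values. Choosing the right type up front ensures that the data is processed correctly and avoids repeated casts in later steps.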

In [0]:
from pyspark.sql.functions import col

# convert the Fare column to a decimal data type

titanic_df = titanic_df.withColumn("Fare", col("Fare").cast("decimal(10,2)"))

# Perform a groupBy and sum operation on the Fare column
titanic_df.groupBy("Name").sum("Fare").show(5)


+--------------------+---------+
|                Name|sum(Fare)|
+--------------------+---------+
|Soholt, Mr. Peter...|     7.65|
|Hagland, Mr. Ingv...|    19.97|
|Nysveen, Mr. Joha...|     6.24|
|Goldenberg, Mrs. ...|    89.10|
|Sinkkonen, Miss. ...|    13.00|
+--------------------+---------+
only showing top 5 rows



##7. Use Efficient Join Algorithms

####Broadcast Hash Join: 
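This algorithm is efficient when one DataFrame is much smaller than the other and fits comfortably in memory, so that it can be broadcast to all worker nodes. Each executor builds a hash table from its copy of the small DataFrame and probes it with rows of the large one, so the large DataFrame is never shuffled. Here's an example of how to use a broadcast hash join to join the Titanic dataset with a lookup DataFrame based on the PassengerId column: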

In [0]:
from pyspark.sql.functions import broadcast

# Load the Titanic dataset
titanic_df = spark.read.csv("dbfs:/FileStore/shared_uploads/purvajainpj123@gmail.com/train.csv", header=True, inferSchema=True)

# Create a small lookup table
lookup_df = titanic_df.select("PassengerId", "Name", "Age").distinct()

# Use broadcast hash join: the hint asks Spark to copy lookup_df to every executor
joined_df = titanic_df.join(broadcast(lookup_df), "PassengerId", "inner")
joined_df.limit(5).toPandas()


Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,Name.1,Age.1
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S,"Braund, Mr. Owen Harris",22.0
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C,"Cumings, Mrs. John Bradley (Florence Briggs Th...",38.0
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S,"Heikkinen, Miss. Laina",26.0
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",35.0
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S,"Allen, Mr. William Henry",35.0


####Sort Merge Join: 
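This algorithm is Spark's usual choice for joining two large DataFrames that are too big to broadcast. Spark shuffles both sides by the join key, sorts each partition, and merges the sorted streams. Sorting the DataFrames yourself beforehand is not what selects this strategy, and Spark will generally still shuffle and sort as needed for the join. With a lookup table this small, Spark would normally pick a broadcast join on its own, so the example below requests a sort merge join explicitly with the merge join hint (available from Spark 3.0). Here's an example of how to use a sort merge join to join the Titanic dataset with a lookup DataFrame based on the PassengerId column:

In [0]:
# Request a sort merge join with the "merge" hint; Spark performs the required sort itself
joined_df = titanic_df.join(lookup_df.hint("merge"), "PassengerId", "inner")

# Order the result by the join key for display
joined_df.orderBy("PassengerId").show(5)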


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------------------+----+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|                Name| Age|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------------------+----+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|Braund, Mr. Owen ...|22.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|Cumings, Mrs. Joh...|38.0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|Heikkinen, Miss. ...|26.0|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|Futrelle, Mrs. Ja...|35.0|
|          5|

####Bucketed Sorted Join: 
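This approach is efficient when both DataFrames are bucketed and sorted on the join key, because matching keys already sit in matching buckets and Spark can skip the shuffle and the sort at join time. True bucketing is defined when a table is written, using DataFrameWriter.bucketBy and sortBy together with saveAsTable. The example below approximates the idea in memory instead: it range-partitions both DataFrames on PassengerId and sorts each partition, which lays the data out in a similar way but does not by itself guarantee that Spark skips the shuffle. Here's an example of how to prepare and join the Titanic dataset with a lookup DataFrame based on the PassengerId column:

In [0]:
# Range-partition both DataFrames on the join key (an in-memory stand-in for bucketing)
bucketed_titanic_df = titanic_df.repartitionByRange(100, "PassengerId")
bucketed_lookup_df = lookup_df.repartitionByRange(100, "PassengerId")

# Sort each partition of both DataFrames on the join key
sorted_titanic_df = bucketed_titanic_df.sortWithinPartitions("PassengerId")
sorted_lookup_df = bucketed_lookup_df.sortWithinPartitions("PassengerId")

# Join the partitioned, sorted DataFrames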
joined_df = sorted_titanic_df.join(sorted_lookup_df, "PassengerId", "inner")
joined_df.limit(5).toPandas()


Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,Name.1,Age.1
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S,"Braund, Mr. Owen Harris",22.0
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C,"Cumings, Mrs. John Bradley (Florence Briggs Th...",38.0
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S,"Heikkinen, Miss. Laina",26.0
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",35.0
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S,"Allen, Mr. William Henry",35.0


##8. Use Filters to Reduce Data Size
Here we filter rows by passenger class using the filter function. Filtering as early as possible reduces the number of rows that every later step, such as a groupBy, a count, or a join, has to process. By using filters to reduce the size of the data, we reduce the workload and avoid processing unnecessary data, resulting in faster processing times.

In [0]:
# Keep only first-class passengers; Pclass is an integer column, so compare against an int
df1 = titanic_df.filter(col("Pclass") == 1)
df1.show(5)


+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|PC 17599|71.2833|  C85|       C|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|  113803|   53.1| C123|       S|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|   17463|51.8625|  E46|       S|
|         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|  113783|  26.55| C103|       S|
|         24|       1|     1|Sloper, Mr. Willi...|  male|28.0|    0|    0|  113788|   35.5|   A6|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
only showing top 5 rows

