In [38]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark optimization").getOrCreate()

# DataFrames

## 1. Creating DataFrames

### Use Columns

In [39]:
# Data as a list of tuples
data = [('James', 34), ('Anna', 20), ('Lee', 30)]

columns = ['Name', 'Age']
df = spark.createDataFrame(data, schema=columns)

### Use Schema

In [40]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType

# Data as a list of tuples
data = [("James", 34), ("Anna", 20), ("Lee", 30)]


# Use Schema
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])
df = spark.createDataFrame(data, schema=schema)

### Use RDD

In [41]:
# Data as a list of tuples
data = [("James", 34), ("Anna", 20), ("Lee", 30)]

rdd = spark.sparkContext.parallelize(data)
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])
df = spark.createDataFrame(rdd, schema=schema)

In [42]:
df.show()

+-----+---+
| Name|Age|
+-----+---+
|James| 34|
| Anna| 20|
|  Lee| 30|
+-----+---+



## 2. Show Database

In [43]:
df.printSchema()
print(df.schema)
print(df.columns)
df.describe().show()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)

StructType([StructField('Name', StringType(), True), StructField('Age', IntegerType(), True)])
['Name', 'Age']
+-------+----+-----------------+
|summary|Name|              Age|
+-------+----+-----------------+
|  count|   3|                3|
|   mean|NULL|             28.0|
| stddev|NULL|7.211102550927978|
|    min|Anna|               20|
|    max| Lee|               34|
+-------+----+-----------------+



## 3. Select Columns

In [44]:
df.select(df[0]).show()
df.select(df.Name).show()
df.select(df["Name"]).show()
df.select("Name").show()

+-----+
| Name|
+-----+
|James|
| Anna|
|  Lee|
+-----+

+-----+
| Name|
+-----+
|James|
| Anna|
|  Lee|
+-----+

+-----+
| Name|
+-----+
|James|
| Anna|
|  Lee|
+-----+

+-----+
| Name|
+-----+
|James|
| Anna|
|  Lee|
+-----+



## 4. Filter Data

In [45]:
df.filter(df[1] > 25).show()
df.filter(df.Age > 25).show()
df.filter(df["Age"] > 25).show()
df.filter('Age > 25').show()

+-----+---+
| Name|Age|
+-----+---+
|James| 34|
|  Lee| 30|
+-----+---+

+-----+---+
| Name|Age|
+-----+---+
|James| 34|
|  Lee| 30|
+-----+---+

+-----+---+
| Name|Age|
+-----+---+
|James| 34|
|  Lee| 30|
+-----+---+

+-----+---+
| Name|Age|
+-----+---+
|James| 34|
|  Lee| 30|
+-----+---+



## 5. Write Dataframe to files

In [46]:
# Writing a JSON file
df.write.json("./output/test.json")

# Writing a Parquet file
df.write.parquet("./output/test.parquet")

## 6. Reading file to Dataframe

In [47]:
# Reading a JSON file
df_json = spark.read.json("./output/test.json")
df_json.show()
# Reading a Parquet file
df_parquet = spark.read.parquet("./output/test.parquet")
df_parquet.show()

+---+-----+
|Age| Name|
+---+-----+
| 34|James|
| 20| Anna|
| 30|  Lee|
+---+-----+

+-----+---+
| Name|Age|
+-----+---+
|James| 34|
| Anna| 20|
|  Lee| 30|
+-----+---+



## 7. Add new Column with new Complex DataType

In [48]:
from pyspark.sql.functions import struct

df2 = df.withColumn("NameAndAge", struct(df.Name, df.Age))
df2.show()
df2.printSchema()

+-----+---+-----------+
| Name|Age| NameAndAge|
+-----+---+-----------+
|James| 34|{James, 34}|
| Anna| 20| {Anna, 20}|
|  Lee| 30|  {Lee, 30}|
+-----+---+-----------+

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- NameAndAge: struct (nullable = false)
 |    |-- Name: string (nullable = true)
 |    |-- Age: integer (nullable = true)



# Query: Grouping and Aggregation

In [49]:
# Create a DataFrame
data = [("James", "Sales", 3000), 
        ("Michael", "Sales", 4600), 
        ("Robert", "Sales", 4100),
        ("Maria", "Finance", 3000),
        ("James", "Sales", 3000),
        ("Scott", "Finance", 3300),
        ("Jen", "Finance", 3900),
        ("Jeff", "Marketing", 3000),
        ("Kumar", "Marketing", 2000),
        ("Saif", "Sales", 4100)]
columns = ["employee_name", "department", "salary"]
df = spark.createDataFrame(data, schema=columns)
df.show()

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+



## 1. count()

In [50]:
from pyspark.sql.functions import count

grouped_df = df.groupBy("department").count()
grouped_df.show()

+----------+-----+
|department|count|
+----------+-----+
|     Sales|    5|
|   Finance|    3|
| Marketing|    2|
+----------+-----+



## max(), min(), avg(), and sum()

In [51]:
# Perform GroupBy and Max
max_df = df.groupBy("department").max("salary").alias("max_salary")
max_df.show()

+----------+-----------+
|department|max(salary)|
+----------+-----------+
|     Sales|       4600|
|   Finance|       3900|
| Marketing|       3000|
+----------+-----------+



In [52]:
# Perform GroupBy and Max
max_df = df.groupBy("department").min("salary").alias("min_salary")
max_df.show()

+----------+-----------+
|department|min(salary)|
+----------+-----------+
|     Sales|       3000|
|   Finance|       3000|
| Marketing|       2000|
+----------+-----------+



In [53]:
# Perform GroupBy and avg
max_df = df.groupBy("department").avg("salary").alias("avg_salary")
max_df.show()

+----------+-----------+
|department|avg(salary)|
+----------+-----------+
|     Sales|     3760.0|
|   Finance|     3400.0|
| Marketing|     2500.0|
+----------+-----------+



In [54]:
# Perform GroupBy and Max
max_df = df.groupBy("department").sum("salary").alias("sum_salary")
max_df.show()

+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|     Sales|      18800|
|   Finance|      10200|
| Marketing|       5000|
+----------+-----------+



# 3. agg() + F.max(), F.count() etc…

In [55]:
from pyspark.sql import functions as F

# Perform multiple aggregations
agg_df = df.groupBy("department").agg(
    F.count("salary").alias("count"),
    F.max("salary").alias("max_salary"),
    F.min("salary").alias("min_salary"),
    F.sum("salary").alias("total_salary"),
    F.avg("salary").alias("average_salary")
)
agg_df.show()

+----------+-----+----------+----------+------------+--------------+
|department|count|max_salary|min_salary|total_salary|average_salary|
+----------+-----+----------+----------+------------+--------------+
|     Sales|    5|      4600|      3000|       18800|        3760.0|
|   Finance|    3|      3900|      3000|       10200|        3400.0|
| Marketing|    2|      3000|      2000|        5000|        2500.0|
+----------+-----+----------+----------+------------+--------------+



## 4. agg() + collect_list() and collect_set()

In [56]:
from pyspark.sql.functions import collect_list, collect_set

# Perform GroupBy and collect list
collected_list_df = df.groupBy("department").agg(
  collect_list("salary"),
  collect_set("salary") # drop duplicates
)
collected_list_df.show(truncate=False)

+----------+------------------------------+-------------------+
|department|collect_list(salary)          |collect_set(salary)|
+----------+------------------------------+-------------------+
|Sales     |[3000, 4600, 4100, 3000, 4100]|[4600, 3000, 4100] |
|Finance   |[3000, 3300, 3900]            |[3900, 3000, 3300] |
|Marketing |[3000, 2000]                  |[3000, 2000]       |
+----------+------------------------------+-------------------+



## 5. agg() + User-Defined Aggregation Functions (UDAF)

In [57]:
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("double")
def mean_salary(s: pd.Series) -> float:
 return s.mean()

udaf_df = df.groupBy("department").agg(
  mean_salary(df["salary"]).alias("average_salary")
)
udaf_df.show()

[Stage 197:>                                                        (0 + 1) / 1]

+----------+--------------+
|department|average_salary|
+----------+--------------+
|   Finance|        3400.0|
| Marketing|        2500.0|
|     Sales|        3760.0|
+----------+--------------+



                                                                                

## 6. agg() + Complex Conditions: when()

In [58]:
df.show()

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+



In [59]:
from pyspark.sql.functions import sum, when, col

# Conditional Aggregation
conditional_agg_df = df.groupBy("department").agg(
    # sum(when(df["salary"] > 3000, df["salary"]).otherwise(0)).alias("sum_high_salaries")
    sum(when(col("salary") > 3000, col("salary")).otherwise(0)).alias("sum_high_salaries")

)
conditional_agg_df.show()

+----------+-----------------+
|department|sum_high_salaries|
+----------+-----------------+
|     Sales|            12800|
|   Finance|             7200|
| Marketing|                0|
+----------+-----------------+



## 6. Using RDD Map Function with GroupBy after agg()

In [60]:
# Applying map operation after GroupBy
result_rdd = df.groupBy("department").agg(
  collect_list("salary")
).rdd.map(
  lambda x: (x[0], max(x[1]))
)
print(result_rdd.collect())

result_df = spark.createDataFrame(result_rdd, ["department", "max_salary"])
result_df.show()

[('Sales', 4600), ('Finance', 3900), ('Marketing', 3000)]
+----------+----------+
|department|max_salary|
+----------+----------+
|     Sales|      4600|
|   Finance|      3900|
| Marketing|      3000|
+----------+----------+



# Query: Something Else

## 1. rollup() and cube()

In [61]:
# Rollup() generates a multidimensional aggregation and provides a hierarchical summary, akin to subtotals in Excel.
rollup_df = df.rollup("department", "employee_name").sum("salary")
rollup_df.show()



+----------+-------------+-----------+
|department|employee_name|sum(salary)|
+----------+-------------+-----------+
|     Sales|        James|       6000|
|      NULL|         NULL|      34000|
|     Sales|         NULL|      18800|
|     Sales|      Michael|       4600|
|     Sales|       Robert|       4100|
|   Finance|         NULL|      10200|
|   Finance|        Maria|       3000|
|   Finance|        Scott|       3300|
|   Finance|          Jen|       3900|
| Marketing|         NULL|       5000|
| Marketing|         Jeff|       3000|
| Marketing|        Kumar|       2000|
|     Sales|         Saif|       4100|
+----------+-------------+-----------+



                                                                                

In [62]:
# Cube(): Cube generates a multidimensional aggregation and provides insights across multiple combinations of the specified grouping columns.
cube_df = df.cube("department", "employee_name").sum("salary")
cube_df.show()

+----------+-------------+-----------+
|department|employee_name|sum(salary)|
+----------+-------------+-----------+
|      NULL|        James|       6000|
|     Sales|        James|       6000|
|      NULL|         NULL|      34000|
|     Sales|         NULL|      18800|
|      NULL|      Michael|       4600|
|     Sales|      Michael|       4600|
|     Sales|       Robert|       4100|
|      NULL|       Robert|       4100|
|   Finance|         NULL|      10200|
|   Finance|        Maria|       3000|
|      NULL|        Maria|       3000|
|      NULL|        Scott|       3300|
|   Finance|        Scott|       3300|
|      NULL|          Jen|       3900|
|   Finance|          Jen|       3900|
| Marketing|         NULL|       5000|
| Marketing|         Jeff|       3000|
|      NULL|         Jeff|       3000|
| Marketing|        Kumar|       2000|
|      NULL|        Kumar|       2000|
+----------+-------------+-----------+
only showing top 20 rows



## 2. groupBy() + pivot()

In [63]:
pivot_df = df.groupBy('department').pivot('employee_name').sum('salary')
pivot_df.show()

+----------+-----+----+----+-----+-----+-------+------+----+-----+
|department|James|Jeff| Jen|Kumar|Maria|Michael|Robert|Saif|Scott|
+----------+-----+----+----+-----+-----+-------+------+----+-----+
|     Sales| 6000|NULL|NULL| NULL| NULL|   4600|  4100|4100| NULL|
|   Finance| NULL|NULL|3900| NULL| 3000|   NULL|  NULL|NULL| 3300|
| Marketing| NULL|3000|NULL| 2000| NULL|   NULL|  NULL|NULL| NULL|
+----------+-----+----+----+-----+-----+-------+------+----+-----+



## 3. Window Functions: partitionBy() + row_number()/rank().over(w)

In [64]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

windowSpec = Window.partitionBy("department").orderBy(col("salary").asc())
df_with_row_number = df.withColumn("row_number", row_number().over(windowSpec))
df_with_row_number.show()

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         2|
|       Robert|     Sales|  4100|         3|
|         Saif|     Sales|  4100|         4|
|      Michael|     Sales|  4600|         5|
+-------------+----------+------+----------+



In [65]:
from pyspark.sql.functions import rank
windowSpec = Window.partitionBy("department").orderBy(col("salary").desc())
df_with_rank = df.withColumn("rank", rank().over(windowSpec))
df_with_rank.show()

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|          Jen|   Finance|  3900|   1|
|        Scott|   Finance|  3300|   2|
|        Maria|   Finance|  3000|   3|
|         Jeff| Marketing|  3000|   1|
|        Kumar| Marketing|  2000|   2|
|      Michael|     Sales|  4600|   1|
|       Robert|     Sales|  4100|   2|
|         Saif|     Sales|  4100|   2|
|        James|     Sales|  3000|   4|
|        James|     Sales|  3000|   4|
+-------------+----------+------+----+



# Optimizations I: Lessen Weight

## 1. Cache a DataFrame when it is accessed multiple times.

In [66]:
df.cache()
df.count()

10

## 2. Using Appropriate File Formats

In [69]:
import shutil
import os

In [78]:
# compressed files can save the files I/O and memory
output_dir = "./output/output.parquet"
df.coalesce(1).write \
    .mode("overwrite") \
    .parquet(output_dir)

# Find the single generated parquet file and rename it
parquet_file = [f for f in os.listdir(output_dir) if f.endswith('.parquet')][0]
print(parquet_file)
shutil.move(os.path.join(output_dir, parquet_file), './output/out.parquet')

# Remove the folder created by Spark
shutil.rmtree(output_dir)


part-00000-4a511f89-6f29-419b-a434-e7ecbfae61b4-c000.snappy.parquet


## 3. Specifying Schema Manually

In [93]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df = spark.read.schema(schema).csv("../data/users.csv")
df.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Arnol| 34|
|  2| Camila| 35|
|  3|Mathieu|  4|
|  4|  Maite|  3|
+---+-------+---+



## 4. Selecting Columns Early

In [88]:
# Select only the necessary columns early in your data processing pipeline to reduce memory usage.
df.select("name", "age").filter("age >= 10").show()
df.select("name", "age").filter(df.age >= 10).show()

+------+---+
|  name|age|
+------+---+
| Arnol| 34|
|Camila| 35|
+------+---+

+------+---+
|  name|age|
+------+---+
| Arnol| 34|
|Camila| 35|
+------+---+



## 5. Apply filters Early, especially before Joins and Aggregations.

In [94]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("id_user", IntegerType(), True),
    StructField("sport", StringType(), True)
])
df_sport = spark.read.schema(schema).csv("../data/user_sports.csv")
df.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Arnol| 34|
|  2| Camila| 35|
|  3|Mathieu|  4|
|  4|  Maite|  3|
+---+-------+---+



In [98]:
df.filter("age > 10").join(df_sport, df["id"] == df_sport["id_user"]).show()

+---+------+---+---+-------+----------+
| id|  name|age| id|id_user|     sport|
+---+------+---+---+-------+----------+
|  1| Arnol| 34|  1|      1|  football|
|  2|Camila| 35|  3|      2|basketball|
+---+------+---+---+-------+----------+



## 6. Using limit() to avoid Collecting Large DataSets

In [100]:
# Avoid using collect() on large datasets to prevent out of memory errors.
df.filter("age < 10").limit(1).collect()

[Row(id=3, name='Mathieu', age=4)]

## 7. Using spark.sql(): Catalyst optimizer for Complex Queries

In [101]:
# Leverage Spark SQL for complex queries, which might be more readable and can benefit from Catalyst optimizer.
df.createOrReplaceTempView("table") 
spark.sql("SELECT count(*) FROM table").show()

+--------+
|count(1)|
+--------+
|       4|
+--------+



## 8. Using RDD: Aggregate with ReduceByKey

In [106]:
rdd = df.rdd.map(lambda x: (x[0], x[1]))
print(rdd.collect())
reduced = rdd.reduceByKey(lambda a, b: a + b)
print(reduced.collect())
reduced.toDF(["key", "value"]).show()

[(1, 'Arnol'), (2, 'Camila'), (3, 'Mathieu'), (4, 'Maite')]
[(1, 'Arnol'), (2, 'Camila'), (3, 'Mathieu'), (4, 'Maite')]
+---+-------+
|key|  value|
+---+-------+
|  1|  Arnol|
|  2| Camila|
|  3|Mathieu|
|  4|  Maite|
+---+-------+



# Optimizations II: No Partitions, No Parallelized

## 1. Partitioning Tables: partitionBy()

In [108]:
df_games = spark.read.option("header",True).csv("../data/juegos.csv")
df_games.show()

+---+-------------+-----+---------+--------------------+
| id| nombre_juego|annio|temporada|              ciudad|
+---+-------------+-----+---------+--------------------+
|  1|  1896 Verano| 1896|   Verano|              Athina|
|  2|  1900 Verano| 1900|   Verano|               Paris|
|  3|  1904 Verano| 1904|   Verano|           St. Louis|
|  4|  1906 Verano| 1906|   Verano|              Athina|
|  5|  1908 Verano| 1908|   Verano|              London|
|  6|  1912 Verano| 1912|   Verano|           Stockholm|
|  7|  1920 Verano| 1920|   Verano|           Antwerpen|
|  8|1924 Invierno| 1924| Invierno|            Chamonix|
|  9|  1924 Verano| 1924|   Verano|               Paris|
| 10|1928 Invierno| 1928| Invierno|        Sankt Moritz|
| 11|  1928 Verano| 1928|   Verano|           Amsterdam|
| 12|1932 Invierno| 1932| Invierno|         Lake Placid|
| 13|  1932 Verano| 1932|   Verano|         Los Angeles|
| 14|1936 Invierno| 1936| Invierno|Garmisch-Partenki...|
| 15|  1936 Verano| 1936|   Ver

In [109]:
# Use partitioning when saving DataFrames to disk for faster subsequent reads.
df_games.write.partitionBy("temporada", "annio").parquet("./output/games.parquet")

                                                                                

## Manage Skew

+ Data skew happens during a join operation when one or more key values have significantly more data than others.
+ For example, if you’re joining on a “customer_id” and most of your transactions belong to a small number of customers, these few keys will end up with a large amount of data compared to other keys. This causes certain tasks (those processing the large keys) to take much longer, resulting in a bottleneck.
+ **This solution** : Add a random prefix to keys to manage skewed data.

In [112]:
from pyspark.sql.functions import monotonically_increasing_id, expr 
df.withColumn("salted_key", 
    expr("concat(name, '_', (monotonically_increasing_id() % 10))")
).groupBy("salted_key").count().select(sum("count")).show()

[Stage 308:>                                                        (0 + 1) / 1]

+----------+
|sum(count)|
+----------+
|         4|
+----------+



                                                                                

In [113]:
# How it balance the loading of data?
from pyspark.sql.functions import monotonically_increasing_id, expr 
df.withColumn("salted_key", 
    expr("concat(name, '_', (monotonically_increasing_id() % 10))")
).groupBy("salted_key").count().show()

+----------+-----+
|salted_key|count|
+----------+-----+
| Mathieu_2|    1|
|  Camila_1|    1|
|   Arnol_0|    1|
|   Maite_3|    1|
+----------+-----+



# Optimizations III: Strategies to Minimize Shuffling

### Strategies to Minimize Shuffling
- **Broadcast Variables**
  - Use for small datasets to avoid shuffling by broadcasting to all nodes.
- **Partition Tuning**
  - Adjust the number of partitions to match the scale of the task and data.
- **Optimize Transformations**
  - Plan operations to minimize wide transformations that require shuffling.

## Shuffling
+ Shuffling is a process where data is redistributed **across different PARTITIONS**.
+ It involves moving data across executors or even **across MACHINES**.
+ It is one of the most expensive operations in terms of network and disk I/O.

### Purpose of Shuffling
+ **Data Redistribution**: Facilitates wide transformations such as: Joins, GroupBy, Aggregations, and Repartitioning
+ **Load Balancing**: Ensures even data and workload distribution across the cluster.
+ **Concurrency**: Enhances parallel processing and optimizes resource utilization.
+ **Optimizing Data Locality**: Moves data closer to where it will be processed. Reduces network traffic.

### Painpoints of Shuffling
+ **Resource Intensive**: Consumes significant network bandwidth and disk I/O.
+ **Increased Latency**: Adds substantial processing time, especially with large datasets.
+ **Potential Bottlenecks**: Can slow down overall system performance if not managed properly.


## 1. Use broadcast join to minimize data shuffling when joining a small DataFrame with a large one.

In [114]:
from pyspark.sql.functions import broadcast

# Create a larger DataFrame for Employees
data_employees = [(1, "John", 101),
                  (2, "Jane", 102),
                  (3, "Joe", 103),
                  (4, "Jill", 101),
                  # Assume many more records exist
                  ]
columns_employees = ["emp_id", "name", "dept_id"]
df_employees = spark.createDataFrame(data_employees, columns_employees)
# Create a small DataFrame for Departments
data_departments = [(101, "HR"),
                    (102, "Marketing"),
                    (103, "Finance"),
                    (104, "IT"),
                    (105, "Support")
                    ]
columns_departments = ["dept_id", "dept_name"]
df_departments = spark.createDataFrame(data_departments, columns_departments)
# Perform a broadcast join
# The goal is to join these datasets on the department ID without causing a large shuffle of the departments dataset.
df_joined = df_employees.join(broadcast(df_departments), "dept_id")
df_joined.show()

                                                                                

+-------+------+----+---------+
|dept_id|emp_id|name|dept_name|
+-------+------+----+---------+
|    101|     1|John|       HR|
|    102|     2|Jane|Marketing|
|    103|     3| Joe|  Finance|
|    101|     4|Jill|       HR|
+-------+------+----+---------+



## 2. Partition Tuning: Repartition to Increase Parallelism

In [115]:
# Repartition a DataFrame to increase or decrease the number of partitions, improving parallelism or reducing shuffle costs.
#But it may still trigger a full shuffle

df = spark.createDataFrame([
  (1, 'foo'), (2, 'bar'), (3, 'baz'), (4, 'qux')
], ["id", "value"])
df_repartitioned = df.repartition(10)  # Increasing the number of partitions

## 3. Partition Tuning: Coalesce to Reduce Partitions

+ **Avoiding Full Shuffle** : coalesce is optimal when you need to reduce the number of partitions after filtering down a large dataset, and you want to avoid the cost of shuffling.
+ **Typical Usage**: This method is often used after filtering a large DataFrame, where many partitions might end up being partially filled or empty. coalesce helps in managing resources more efficiently without the costly network overhead.

In [116]:
# Example of coalesce to reduce partitions without extensive shuffling
df_filtered = df.filter("id > 1")
df_coalesced = df_filtered.coalesce(2)  # Reducing the number of partitions

## 4. Minimize data shuffling by optimizing Transformations

+ Minimizing shuffling in Apache Spark through optimized transformations is a crucial aspect of enhancing the performance of Spark applications.
+ Optimizing transformations involves structuring your data processing operations to reduce unnecessary data movement across the cluster, which can be resource-intensive and slow down execution.

### 4–1. Filter Early

Apply filters as early as possible in your data processing pipeline to reduce the volume of data that needs to be shuffled later in operations like joins or aggregations.

In [117]:
# Create DataFrame
data = [("John", "Finance", 3000), ("Jane", "Marketing", 4000), ("Joe", "Marketing", 2800), ("Jill", "Finance", 3900)]
columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, schema=columns)

# Filter early before any wide transformation
filtered_df = df.filter(df["Salary"] > 3000)

# Now perform aggregation
aggregated_df = filtered_df.groupBy("Department").avg("Salary")
aggregated_df.show()



+----------+-----------+
|Department|avg(Salary)|
+----------+-----------+
| Marketing|     4000.0|
|   Finance|     3900.0|
+----------+-----------+



                                                                                

### 4–2. Use RDD/Narrow Transformations Where Possible

**Narrow transformations, such as `map` and `filter`, operate on individual partitions and do not require data shuffling. Use these operations instead of wide transformations when possible**.

In [119]:
# Use map to create a new column without causing a shuffle
rdd = df.rdd.map(lambda x: (x.Name, x.Salary * 1.1))
print(rdd.collect())
updated_salaries_df = spark.createDataFrame(
  rdd, schema=["Name", "UpdatedSalary"]
)
updated_salaries_df.show()

                                                                                

[('John', 3300.0000000000005), ('Jane', 4400.0), ('Joe', 3080.0000000000005), ('Jill', 4290.0)]


[Stage 334:>                                                        (0 + 7) / 7]

+----+------------------+
|Name|     UpdatedSalary|
+----+------------------+
|John|3300.0000000000005|
|Jane|            4400.0|
| Joe|3080.0000000000005|
|Jill|            4290.0|
+----+------------------+



                                                                                

### 4–3. Avoid Unnecessary Shuffles with `join` by Boardcasting join

In [120]:
# For joins, use broadcast joins when one of the datasets is significantly smaller than the other to avoid shuffling the larger dataset.

# Assuming df_small is much smaller than df_large
df_small = spark.createDataFrame(
  [(1, "HR"), (2, "Marketing")], ["id", "Dept"]
)
df_large = spark.createDataFrame(
  [(1, "John"), (2, "Jane"), (1, "Joe"), (2, "Jill")],
  ["DeptId", "Name"]
)
# Broadcast the smaller DataFrame to optimize the join
optimized_join_df = df_large.join(broadcast(df_small), df_large.DeptId == df_small.id)
optimized_join_df.show()

                                                                                

+------+----+---+---------+
|DeptId|Name| id|     Dept|
+------+----+---+---------+
|     1|John|  1|       HR|
|     2|Jane|  2|Marketing|
|     1| Joe|  1|       HR|
|     2|Jill|  2|Marketing|
+------+----+---+---------+



### 4–4. Repartition Strategically

If you have to use wide transformations, repartition data based on keys that you will join or aggregate on later. This strategy can reduce shuffling by co-locating rows with the same key on the same partition.

In [121]:
# Repartition before an aggregation to minimize shuffling
repartitioned_df = df.repartition("Department")
aggregated_df = repartitioned_df.groupBy("Department").avg("Salary")
aggregated_df.show()

[Stage 339:>                                                      (0 + 12) / 12]

+----------+-----------+
|Department|avg(Salary)|
+----------+-----------+
|   Finance|     3450.0|
| Marketing|     3400.0|
+----------+-----------+



                                                                                

# Performance Monitoring and Fine-Tuning

## 1. Memory Management

In [122]:
spark.conf.set('spark.executor.memory', '4g')
spark.conf.set('spark.driver.memory', '2g')

## 2. Monitoring Tasks and Stages

+ Use Spark UI to monitor the performance of tasks and stages within your application.
+ Access the Spark UI by navigating to: http://[your-spark-driver-host]:4040
+ **Analyzing Executor Metrics**: Monitoring the metrics for each executor can give insights into memory usage, disk spills, and garbage collection:

In [None]:
# Configure Spark to collect detailed executor metrics
spark.conf.set("spark.executor.metrics.pollingInterval", "5000")

## 3. Tuning SQL Performance

Leverage the `EXPLAIN` plan to understand and optimize the SQL execution plan.

In [125]:
df.explain("formatted")

== Physical Plan ==
* Scan ExistingRDD (1)


(1) Scan ExistingRDD [codegen id : 1]
Output [3]: [Name#3615, Department#3616, Salary#3617L]
Arguments: [Name#3615, Department#3616, Salary#3617L], MapPartitionsRDD[633] at applySchemaToPythonRDD at <unknown>:0, ExistingRDD, UnknownPartitioning(0)




## 4. Dynamic Allocation

Enable dynamic allocation to allow Spark to adjust the number of executors dynamically based on the workload.

In [None]:
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.minExecutors", "1")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "20")
spark.conf.set("spark.dynamicAllocation.executorIdleTimeout", "60s")
spark.conf.set("spark.shuffle.service.enabled", "true")

## 5. Data Locality

Optimize data locality by minimizing the distance data has to travel between the storage and processing units.

In [None]:
spark.conf.set("spark.locality.wait", "300ms")

## 6. Garbage Collection Tuning

Tune the garbage collector settings to optimize memory management and reduce pause times.

In [None]:
# Use G1GC for better latency
spark.conf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
# Set up explicit GC settings to optimize for short pauses
spark.conf.set("spark.executor.extraJavaOptions", "-XX:MaxGCPauseMillis=100")

## 7. Fine-Tuning Data Serialization

Data serialization plays a vital role in the performance of distributed applications. Spark supports two serializers

In [None]:
# Use Kryo serializer for better performance and efficiency
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.conf.set("spark.kryo.registrationRequired", "true")

# Register custom classes with Kryo
class MyClass:
    def __init__(self, name, id):
        self.name = name
        self.id = id
spark.sparkContext.getConf().registerKryoClasses([MyClass])

## 8. Optimizing Network Configuration

Network settings can significantly impact the performance, especially in large-scale deployments

In [None]:
# Tune network timeout settings to avoid unnecessary job failures in large clusters
spark.conf.set("spark.network.timeout", "800s")
spark.conf.set("spark.core.connection.ack.wait.timeout", "600s")

## Advanced Spark SQL Tuning

Leveraging the Catalyst optimizer and Tungsten execution engine can enhance the performance of Spark SQL

In [None]:
# Enable whole stage code generation for serialized processing
spark.conf.set("spark.sql.codegen.wholeStage", "true")

# Increase the max number of bytes for broadcasting a table, 
# which is useful for join optimization
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")  # 10 MB

## 10. Optimizing Data Partitioning

Fine-tune the data distribution to enhance query performance and reduce shuffle overhead

In [None]:
# Manually setting the number of shuffle partitions based on data size and operation

spark.conf.set("spark.sql.shuffle.partitions", "200") 
# Adjust based on your cluster size and data

## 11. Enabling Adaptive Query Execution

Adaptive Query Execution (AQE) is a feature that makes Spark SQL queries faster and more robust to data skew and other issues by adapting query plans in runtime.

In [None]:
# Enable Adaptive Query Execution, which can simplify configuration and improve performance
spark.conf.set('spark.sql.adaptive.enabled', 'true')
# AQE can adjust shuffle partitioning, handle skewed joins, and optimize sorts by adapting to the actual data.

## 12. Specifying Memory Management

Proper memory management can help prevent spillages and improve performance, especially for memory-intensive operations.

In [None]:
# Configure the memory fraction to be reserved for RDD storage
spark.conf.set('spark.memory.fraction', '0.6')
spark.conf.set('spark.memory.storageFraction', '0.5')