# Partitions Basics

- Partitions are logical divisions of your data
- Each node in the cluster contains one or more partitions

In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, count, format_number
from pyspark.sql import functions as f

In [32]:
# Build (or reuse) the notebook's SparkSession: local master with 4 worker
# threads, driver bound to localhost, Hive support enabled.
spark = (
    SparkSession.builder
    .appName("Spark Tutorial")
    .master("local[4]")
    .config("spark.driver.host", "localhost")
    .enableHiveSupport()
    .getOrCreate()
)

In [33]:
# Show the URL of the Spark web UI for this session, where the jobs labelled
# with setJobDescription below can be inspected.
spark.sparkContext.uiWebUrl

'http://localhost:4040'

##### Default Behavior
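By default, Spark derives its parallelism from the number of available cores (4 here, because the session uses `local[4]`). This is only a default: the number of partitions can be changed explicitly, as shown later with `repartition`.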

In [46]:
# Report the default parallelism of the SparkContext; with the local[4] master
# configured above, the output below shows 4.
print(f"Default parallelism: {spark.sparkContext.defaultParallelism}")

Default parallelism: 4


## Dataset

- Synthetic dataset obtained from Kaggle [here](https://www.kaggle.com/datasets/iamsouravbanerjee/airline-dataset/data?select=Airline+Dataset+Updated+-+v2.csv)

In [35]:
# Load the airline CSV: the first line is the header and column types are
# inferred from the data. Then preview the first 10 rows.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('airlines.csv')
)
df.show(10)

+------------+----------+---------+------+---+--------------------+--------------------+--------------------+--------------+-----------------+-------------+--------------+---------------+-------------------+-------------+
|Passenger ID|First Name|Last Name|Gender|Age|         Nationality|        Airport Name|Airport Country Code|  Country Name|Airport Continent|   Continents|Departure Date|Arrival Airport|         Pilot Name|Flight Status|
+------------+----------+---------+------+---+--------------------+--------------------+--------------------+--------------+-----------------+-------------+--------------+---------------+-------------------+-------------+
|      ABVWIg|    Edithe|   Leggis|Female| 62|               Japan|    Coldfoot Airport|                  US| United States|              NAM|North America|     6/28/2022|            CXF|Fransisco Hazeldine|      On Time|
|      jkXXAX|    Elwood|     Catt|  Male| 62|           Nicaragua|   Kugluktuk Airport|                  CA|   

In [36]:
# Inspect the inferred schema. In the output below only 'Age' is inferred as
# int; every other column, including 'Departure Date', stays a string.
df.dtypes

[('Passenger ID', 'string'),
 ('First Name', 'string'),
 ('Last Name', 'string'),
 ('Gender', 'string'),
 ('Age', 'int'),
 ('Nationality', 'string'),
 ('Airport Name', 'string'),
 ('Airport Country Code', 'string'),
 ('Country Name', 'string'),
 ('Airport Continent', 'string'),
 ('Continents', 'string'),
 ('Departure Date', 'string'),
 ('Arrival Airport', 'string'),
 ('Pilot Name', 'string'),
 ('Flight Status', 'string')]

In [37]:
# Number of partitions backing the freshly loaded DataFrame.
print(f'Number of partitions {df.rdd.getNumPartitions()}')
# count() is an action: it runs a Spark job over the whole dataset.
print(f'Number of rows {df.count()}')

Number of partitions 4
Number of rows 98619


## Percentage of cancelled flights

In [38]:
def calculate_cancellation_percentages(df):
    """Compute the share of cancelled flights for each country.

    Args:
        df: Spark DataFrame with at least the 'Country Name' and
            'Flight Status' columns.

    Returns:
        A DataFrame with one row per 'Country Name' and the columns
        'cancellation_percentage' and 'total_flights', sorted by the share of
        cancelled flights in descending order.

        'cancellation_percentage' is a *string* produced by ``format_number``
        and holds the cancelled/total ratio as a fraction (e.g. '0.33'),
        not a value scaled to 0-100.
    """
    # Rows whose status is NULL evaluate to NULL here and are therefore not
    # counted as cancelled (count() skips NULLs).
    is_cancelled = col('Flight Status') == 'Cancelled'

    # Cancelled / total per country, kept numeric for sorting.
    ratios = df.groupBy('Country Name').agg(
        (count(when(is_cancelled, True)) / count('*')).alias('cancellation_ratio'),
        count('*').alias('total_flights'),
    )

    # Sort on the numeric ratio *before* formatting: format_number returns a
    # string, so ordering by the formatted column would compare text and
    # lose the precision beyond two decimals.
    return ratios.orderBy(col('cancellation_ratio').desc()).select(
        'Country Name',
        format_number('cancellation_ratio', 2).alias('cancellation_percentage'),
        'total_flights',
    )

In [39]:
# Label the jobs triggered below so they are easy to find in the Spark UI.
spark.sparkContext.setJobDescription("Calculating cancellation percentage for each country")
# Building `result` is lazy; show() is the action that actually runs the aggregation.
result = calculate_cancellation_percentages(df)
result.show()

+--------------------+-----------------------+-------------+
|        Country Name|cancellation_percentage|total_flights|
+--------------------+-----------------------+-------------+
|             Andorra|                   0.57|            7|
|            Barbados|                   0.54|           13|
|            Djibouti|                   0.52|           44|
|            Anguilla|                   0.50|           14|
|Cocos (Keeling) I...|                   0.50|            8|
|       Guinea-Bissau|                   0.50|           10|
|             Georgia|                   0.49|           35|
|             Grenada|                   0.48|           23|
|      American Samoa|                   0.47|           45|
|                Guam|                   0.46|           26|
|             Réunion|                   0.45|           20|
|             Hungary|                   0.45|           65|
|              Serbia|                   0.44|           43|
|               Aruba|  

In [40]:
# Tag the next job, then display only the row for the United States.
spark.sparkContext.setJobDescription("Filtering results for United States")
result.where(col("Country Name") == "United States").show()

+-------------+-----------------------+-------------+
| Country Name|cancellation_percentage|total_flights|
+-------------+-----------------------+-------------+
|United States|                   0.33|        22104|
+-------------+-----------------------+-------------+



In [41]:
# Label the jobs triggered by the partition check on `result` further down in this cell.
spark.sparkContext.setJobDescription("Checking partitions uniformity for source 'result_df'")

def rows_per_partition(sdf) -> None:
    """Print how the rows of ``sdf`` are spread across its partitions.

    Shows one line per partition id with the number of rows it holds
    ('count') and that number as a percentage of all rows ('count_perc'),
    ordered by partition id.

    Args:
        sdf: Spark DataFrame to inspect.
    """
    # NOTE: count() is an action, so this runs a separate Spark job over
    # `sdf` in addition to the one triggered by show() below.
    total_rows = sdf.count()

    distribution = (
        sdf.withColumn("partition_id", f.spark_partition_id())
        .groupBy("partition_id")
        .count()
        .withColumn("count_perc", 100 * f.col("count") / total_rows)
        .orderBy("partition_id")
    )
    distribution.show()


# Inspect the aggregated result: the output below shows all 235 rows sitting
# in a single partition.
rows_per_partition(result)

+------------+-----+----------+
|partition_id|count|count_perc|
+------------+-----+----------+
|           0|  235|     100.0|
+------------+-----+----------+



In [42]:
# Same check on the raw DataFrame. The output below is skewed: three
# partitions hold ~33% of the rows each and the fourth only ~0.2%.
spark.sparkContext.setJobDescription("Checking partitions uniformity for source 'DF'")
rows_per_partition(df)

+------------+-----+------------------+
|partition_id|count|        count_perc|
+------------+-----+------------------+
|           0|32795| 33.25424106916517|
|           1|32796| 33.25525507255194|
|           2|32801|  33.2603250894858|
|           3|  227|0.2301787687970878|
+------------+-----+------------------+



In [43]:
spark.sparkContext.setJobDescription("Repartitioning the DataFrame")
# Redistribute the rows into 4 partitions; `df` itself is left untouched.
df_repartitioned = df.repartition(4)
# The output below shows the rows now spread almost evenly (~25% per partition).
rows_per_partition(df_repartitioned)

+------------+-----+------------------+
|partition_id|count|        count_perc|
+------------+-----+------------------+
|           0|24655|25.000253500846693|
|           1|24655|25.000253500846693|
|           2|24654| 24.99923949745992|
|           3|24655|25.000253500846693|
+------------+-----+------------------+



In [44]:
spark.sparkContext.setJobDescription("Calculating cancellation percentage for each country - AFTER REPARTITIONING")
# Run the aggregation on the *repartitioned* DataFrame. Passing the original
# `df` here would simply repeat the earlier run and make the before/after
# comparison in the Spark UI meaningless.
result = calculate_cancellation_percentages(df_repartitioned)
result.show()

+--------------------+-----------------------+-------------+
|        Country Name|cancellation_percentage|total_flights|
+--------------------+-----------------------+-------------+
|             Andorra|                   0.57|            7|
|            Barbados|                   0.54|           13|
|            Djibouti|                   0.52|           44|
|            Anguilla|                   0.50|           14|
|Cocos (Keeling) I...|                   0.50|            8|
|       Guinea-Bissau|                   0.50|           10|
|             Georgia|                   0.49|           35|
|             Grenada|                   0.48|           23|
|      American Samoa|                   0.47|           45|
|                Guam|                   0.46|           26|
|             Réunion|                   0.45|           20|
|             Hungary|                   0.45|           65|
|              Serbia|                   0.44|           43|
|               Aruba|  