## Imports

In [1]:
# Import Spark
import pyspark
from pyspark.sql import SparkSession, functions as F, types as T

In [2]:

spark = SparkSession.builder.master("local[*]").getOrCreate()
print(spark.version)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/07 09:05:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


4.0.0


## Instantiate a Spark session

In [3]:
# Create a Spark session
spark = (SparkSession.builder
         # Give a name to the Spark application
         .appName("spark-fundamentals-lab")
         # Execute the Spark using the resources of the local device
         .master("local[*]")
         # Activate AQE to optimize shuffle and join strategy
         .config("spark.sql.adaptive.enabled", "true")
         # Looks for another Spark session to activate it, or create one if needed
         .getOrCreate())

25/09/07 09:05:49 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
# Print the Spark version
print("Spark version:", spark.version)

Spark version: 4.0.0


## Load the dataframe and write it in Parquet

### Load the dataframe

In [5]:
# Create a plan to load the csv, in multiple steps (nothing is loaded until an action is performed)
df = (
    spark.read
    # Set the first rows as headers
    .option("header", True)
    # Deduct data types
    .option("inferSchema", True)
    # Import the csv 
    .csv("data/airlines_flights_data.csv")
)

                                                                                

In [6]:
# Print the schema
df.printSchema()

root
 |-- index: integer (nullable = true)
 |-- airline: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- source_city: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- stops: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- class: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- days_left: integer (nullable = true)
 |-- price: integer (nullable = true)



In [7]:
# Execute an action to load the data: load the 5 first rows of the dataframe
df.show(5, truncate=False) # truncate=False is to be sure we print the entire content of a cell

+-----+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|index|airline |flight |source_city|departure_time|stops|arrival_time |destination_city|class  |duration|days_left|price|
+-----+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|0    |SpiceJet|SG-8709|Delhi      |Evening       |zero |Night        |Mumbai          |Economy|2.17    |1        |5953 |
|1    |SpiceJet|SG-8157|Delhi      |Early_Morning |zero |Morning      |Mumbai          |Economy|2.33    |1        |5953 |
|2    |AirAsia |I5-764 |Delhi      |Early_Morning |zero |Early_Morning|Mumbai          |Economy|2.17    |1        |5956 |
|3    |Vistara |UK-995 |Delhi      |Morning       |zero |Afternoon    |Mumbai          |Economy|2.25    |1        |5955 |
|4    |Vistara |UK-963 |Delhi      |Morning       |zero |Morning      |Mumbai          |Economy|2.33    |1        |5955 |
+-----+--------+-------+

### Write the dataframe in Parquet

In [8]:
# Write in Parquet
(
    df.write
      # Removes and entirely reloads data
      .mode("overwrite")
      # Convert the dataframe to Parquet and export it to the data folder
      .parquet("data/silver/airlines_flights_data")
)

                                                                                

### Reload the data in Parquet

In [9]:
dfp = spark.read.parquet("data/silver/airlines_flights_data")

In [10]:
# Print the schema (columns and their respective types)
dfp.printSchema()

root
 |-- index: integer (nullable = true)
 |-- airline: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- source_city: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- stops: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- class: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- days_left: integer (nullable = true)
 |-- price: integer (nullable = true)



In [11]:
# Count the number of rows in the dataframe (equivalent of len in pandas)
dfp.count()

300153

In [12]:
# Print the main statistics on the dataframe (equivalent of describe in pandas)
dfp.summary().show()

25/09/07 09:05:52 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+-----------------+-------+--------------------+-----------+--------------+------+------------+----------------+--------+------------------+------------------+------------------+
|summary|            index|airline|              flight|source_city|departure_time| stops|arrival_time|destination_city|   class|          duration|         days_left|             price|
+-------+-----------------+-------+--------------------+-----------+--------------+------+------------+----------------+--------+------------------+------------------+------------------+
|  count|           300153| 300153|              300153|     300153|        300153|300153|      300153|          300153|  300153|            300153|            300153|            300153|
|   mean|         150076.0|   NULL|5.427411873908628...|       NULL|          NULL|  NULL|        NULL|            NULL|    NULL|12.221020812718939|26.004750910369044|20889.660523133203|
| stddev|86646.85201148395|   NULL|1.803651814074487...|       NU

                                                                                

## Explore the data

### First rows

In [13]:
dfp.show(5, truncate=False)

+-----+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|index|airline |flight |source_city|departure_time|stops|arrival_time |destination_city|class  |duration|days_left|price|
+-----+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|0    |SpiceJet|SG-8709|Delhi      |Evening       |zero |Night        |Mumbai          |Economy|2.17    |1        |5953 |
|1    |SpiceJet|SG-8157|Delhi      |Early_Morning |zero |Morning      |Mumbai          |Economy|2.33    |1        |5953 |
|2    |AirAsia |I5-764 |Delhi      |Early_Morning |zero |Early_Morning|Mumbai          |Economy|2.17    |1        |5956 |
|3    |Vistara |UK-995 |Delhi      |Morning       |zero |Afternoon    |Mumbai          |Economy|2.25    |1        |5955 |
|4    |Vistara |UK-963 |Delhi      |Morning       |zero |Morning      |Mumbai          |Economy|2.33    |1        |5955 |
+-----+--------+-------+

### Filtered rows

In [14]:
# filter is a transformation --> By executing only this command, we only get a plan
dfp.filter("airline=='SpiceJet'")

# By adding the action show, we can execute a query and print a result dataframe
dfp.filter("airline=='SpiceJet'").show(5, truncate=False)

# Explain the execution plan of the query
dfp.filter("airline=='SpiceJet'").explain()

# Print the number of rows matching with the condition
dfp.filter("airline=='SpiceJet'").count()

+-----+--------+-------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+
|index|airline |flight |source_city|departure_time|stops|arrival_time|destination_city|class  |duration|days_left|price|
+-----+--------+-------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+
|0    |SpiceJet|SG-8709|Delhi      |Evening       |zero |Night       |Mumbai          |Economy|2.17    |1        |5953 |
|1    |SpiceJet|SG-8157|Delhi      |Early_Morning |zero |Morning     |Mumbai          |Economy|2.33    |1        |5953 |
|28   |SpiceJet|SG-8169|Delhi      |Evening       |zero |Night       |Mumbai          |Economy|2.33    |1        |10260|
|38   |SpiceJet|SG-2976|Delhi      |Evening       |one  |Night       |Mumbai          |Economy|4.5     |1        |12123|
|39   |SpiceJet|SG-2976|Delhi      |Evening       |one  |Morning     |Mumbai          |Economy|15.25   |1        |12123|
+-----+--------+-------+--------

9011

### Query the data using SQL

In [15]:
# Create a temporary view
df.createOrReplaceTempView("flights")

# Query this temporary view by using SQL
spark.sql(
    "SELECT * FROM flights LIMIT 10"
).show()

+-----+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|index| airline| flight|source_city|departure_time|stops| arrival_time|destination_city|  class|duration|days_left|price|
+-----+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|    0|SpiceJet|SG-8709|      Delhi|       Evening| zero|        Night|          Mumbai|Economy|    2.17|        1| 5953|
|    1|SpiceJet|SG-8157|      Delhi| Early_Morning| zero|      Morning|          Mumbai|Economy|    2.33|        1| 5953|
|    2| AirAsia| I5-764|      Delhi| Early_Morning| zero|Early_Morning|          Mumbai|Economy|    2.17|        1| 5956|
|    3| Vistara| UK-995|      Delhi|       Morning| zero|    Afternoon|          Mumbai|Economy|    2.25|        1| 5955|
|    4| Vistara| UK-963|      Delhi|       Morning| zero|      Morning|          Mumbai|Economy|    2.33|        1| 5955|
|    5| Vistara| UK-945|

### Partition the data

In [16]:
df.repartition(10, "source_city").show()

+-----+---------+-------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+
|index|  airline| flight|source_city|departure_time|stops|arrival_time|destination_city|  class|duration|days_left|price|
+-----+---------+-------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+
|84065|  AirAsia| I5-996|  Bangalore|       Evening| zero|       Night|           Delhi|Economy|    2.83|        1| 7489|
|84066|  Vistara| UK-820|  Bangalore|       Evening| zero|       Night|           Delhi|Economy|    2.67|        1| 7489|
|84067|  Vistara| UK-802|  Bangalore|       Evening| zero|       Night|           Delhi|Economy|    2.75|        1| 7489|
|84068|   Indigo|6E-6139|  Bangalore|     Afternoon| zero|     Evening|           Delhi|Economy|    2.75|        1| 7488|
|84069|   Indigo|6E-2514|  Bangalore|       Evening| zero|     Evening|           Delhi|Economy|    2.83|        1| 7488|
|84070|   Indigo|6E-2027

## Basic analyses

In [None]:
# Print the trip with the highest average price
# Initiate a SQL query in a string variable
sql_query_1 = "SELECT " \
"                   source_city, " \
"                   destination_city, " \
"                   AVG(price) AS average_price " \
"            FROM " \
"                   flights " \
"            GROUP BY " \
"                   source_city, " \
"                   destination_city " \
"            ORDER BY " \
"                   average_price DESC"

# Execute the query using Spark SQL and put into a list
output_1 = spark.sql(
    sql_query_1
).collect() 

print(output_1)

# The most expensive flight seems to be the one between Chennai and Bangalore
# WARNING: Data is not processed yet, so this is just an assumption that needs to be checked

print(f"The most expensive flight seems to be the one between {output_1[0][0]} and {output_1[0][1]}, with a price of {round(output_1[0][2])}")

print("WARNING: Data is not processed yet, so this is just an assumption that needs to be checked")

[Row(source_city='Chennai', destination_city='Bangalore', average_price=25081.85045433544), Row(source_city='Kolkata', destination_city='Chennai', average_price=23660.36104013227), Row(source_city='Bangalore', destination_city='Kolkata', average_price=23500.061228560033), Row(source_city='Bangalore', destination_city='Chennai', average_price=23321.85007800312), Row(source_city='Mumbai', destination_city='Bangalore', average_price=23147.87380675204), Row(source_city='Bangalore', destination_city='Mumbai', average_price=23128.618672231238), Row(source_city='Mumbai', destination_city='Chennai', average_price=22781.89911154985), Row(source_city='Chennai', destination_city='Mumbai', average_price=22765.849646605267), Row(source_city='Kolkata', destination_city='Bangalore', average_price=22744.80842833876), Row(source_city='Chennai', destination_city='Kolkata', average_price=22669.93240727481), Row(source_city='Mumbai', destination_city='Kolkata', average_price=22379.146722742422), Row(sourc

In [44]:
# Print the most represented airline
# Initiate a SQL query in a string variable
sql_query_2 = "SELECT " \
"                   airline," \
"                   COUNT(*) AS number_of_occurrences" \
"              FROM" \
"                   flights" \
"              GROUP BY " \
"                   airline" \
"              ORDER BY " \
"                   number_of_occurrences DESC" \
"               LIMIT " \
"                   1"

# Execute the query using Spark SQL and put into a list
output_2 = spark.sql(
    sql_query_2
).collect() 

print(output_2)

# Vistara seems to be the most represented airline, with 127,589 occurrences
# WARNING: Data is not processed yet, so this is just an assumption that needs to be checked

print(f"The most represented airline seems to be {output_2[0][0]}, with {output_2[0][1]} occurrences.")
print("WARNING: Data is not processed yet, so this is just an assumption that needs to be checked")

[Row(airline='Vistara', number_of_occurrences=127859)]
The most represented airline seems to be Vistara, with 127859 occurrences.
