In [0]:
pip install pandas openpyxl xlsxwriter


Python interpreter will be restarted.
Collecting openpyxl
  Downloading openpyxl-3.1.5-py2.py3-none-any.whl (250 kB)
Collecting xlsxwriter
  Downloading XlsxWriter-3.2.0-py3-none-any.whl (159 kB)
Collecting et-xmlfile
  Downloading et_xmlfile-1.1.0-py3-none-any.whl (4.7 kB)
Installing collected packages: et-xmlfile, xlsxwriter, openpyxl
Successfully installed et-xmlfile-1.1.0 openpyxl-3.1.5 xlsxwriter-3.2.0
Python interpreter will be restarted.


aircrafts transform layer - Add a new attribute “Make” & retain all existing attribute 

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when



# Load the aircrafts CSV file
aircrafts_df = spark.read.format("csv").option("header", "true").load("/FileStore/tables/aircrafts-1.csv")

# Add the "Make" column based on the "AircraftName" column in the aircrafts DataFrame
aircrafts_df_with_make = aircrafts_df.withColumn(
    "Make",
    when(col("AircraftName").contains("Airbus"), "Airbus")
    .when(col("AircraftName").contains("Boeing"), "Boeing")
    .otherwise("Unknown")
)

# Display the updated aircrafts DataFrame with the "Make" column
aircrafts_df_with_make.show()

# Optionally, save the updated aircrafts DataFrame with the Make column
aircrafts_df_with_make.write.mode("overwrite").parquet("/path/to/save/aircrafts_with_make_final.parquet")



+------------+--------+----------+------+
|AircraftName|Capacity|TailNumber|  Make|
+------------+--------+----------+------+
| Airbus-A330|     131|     T2366|Airbus|
|  Boeing-747|     213|     O2349|Boeing|
| Airbus-A330|      42|     C2888|Airbus|
|  Boeing-747|     140|     C2896|Boeing|
|  Boeing-777|     191|     F2838|Boeing|
| Airbus-A340|     125|     Y2161|Airbus|
| Airbus-A330|     235|     C2990|Airbus|
|  Boeing-777|     168|     P2752|Boeing|
|  Boeing-787|     240|     T2335|Boeing|
| Airbus-A350|      94|     C2386|Airbus|
| Airbus-A300|      73|     O2378|Airbus|
| Airbus-A340|      48|     Z2357|Airbus|
|  Boeing-767|     209|     J2465|Boeing|
|  Boeing-747|     249|     R2852|Boeing|
| Airbus-A350|      24|     N2252|Airbus|
|  Boeing-787|     168|     H3059|Boeing|
| Airbus-A350|      96|     A2975|Airbus|
|  Boeing-777|     226|     C2750|Boeing|
|  Boeing-747|      29|     S2869|Boeing|
|  Boeing-767|     164|     G2971|Boeing|
+------------+--------+----------+

In [0]:
# Get the total number of rows in the resulting DataFrame
total_rows = aircrafts_df_with_make.count()

# Print the total number of rows
print(f"Total number of rows: {total_rows}")

Total number of rows: 20


TRIPS DATA 

transorm layer trips -  Origin & Destination should be transformed to city name 

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, concat, substring, lit, cast
from pyspark.sql.types import StringType, IntegerType, DoubleType

# Initialize Spark session
spark = SparkSession.builder.appName("Trips Data Processing").getOrCreate()

# Load the trips data
df_trips = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/FileStore/tables/Trips_Data-2.csv")

# Define UDFs for origin and destination mappings
origin = {
    "LAS": "Las Vegas", "DEN": "Denver", "SEA": "Seattle", "DFW": "Dallas",
    "SFO": "San Francisco", "ATL": "Atlanta", "ORD": "Chicago", "LAX": "Los Angeles",
    "MCO": "Orlando", "JFK": "New York"
}
destination = {
    "IAH": "Houston", "BOS": "Boston", "EWR": "Newark", "CLT": "Charlotte",
    "MIA": "Miami", "PHX": "Phoenix", "FLL": "Fort Lauderdale", "DTW": "Detroit",
    "MSP": "Minneapolis", "PHL": "Philadelphia"
}

origin_udf = udf(lambda x: origin.get(x, x), StringType())
destination_udf = udf(lambda x: destination.get(x, x), StringType())

# Add mapped origin and destination columns
df_trips = df_trips.withColumn("origin1", origin_udf(col("origin"))) \
                   .withColumn("dest1", destination_udf(col("destination")))

# Load the updated aircrafts data with the Make column
df_aircrafts = spark.read.format("parquet").load("/path/to/save/aircrafts_with_make_final.parquet")

# Join trips data with aircrafts data
df_trips = df_trips.join(df_aircrafts, df_trips["tailnumber"] == df_aircrafts["tailnumber"]) \
                   .select(df_trips["*"], df_aircrafts["Make"].alias("aircraftname"))

# Select and cast necessary columns
df_trips = df_trips.select(
    col("tailnumber"),
    col("origin1").alias("origin"),
    col("dest1").alias("destination"),
    col("traveldate"),
    col("totalflighttime").cast(IntegerType()),
    col("revenue").cast(DoubleType()),
    col("firstname"),
    col("lastname"),
    col("passport"),
    col("aircraftname"),
    concat(
        substring(col("traveldate"), 6, 2), lit("-"),  # Month
        substring(col("traveldate"), 1, 4)             # Year
    ).alias("totalmonth")
)

# Display the resulting DataFrame
df_trips.show()

# Save the resulting DataFrame as needed
df_trips.write.mode("overwrite").parquet("/path/to/output/directory/trips_data")


+----------+-----------+---------------+----------+---------------+-------+---------+-----------+----------+------------+----------+
|tailnumber|     origin|    destination|traveldate|totalflighttime|revenue|firstname|   lastname|  passport|aircraftname|totalmonth|
+----------+-----------+---------------+----------+---------------+-------+---------+-----------+----------+------------+----------+
|     C2888|   New York|Fort Lauderdale|2021-04-07|              9| 384.93|      Jay|      Singh|P-64549526|      Airbus|   04-2021|
|     H3059|     Denver|        Houston|2021-08-17|              2| 216.11|  Russell|Constantine|H-44102824|      Boeing|   08-2021|
|     J2465|    Seattle|        Detroit|2021-02-03|              6| 195.73|      Jay|      Singh|P-64549526|      Boeing|   02-2021|
|     A2975|    Orlando|Fort Lauderdale|2021-08-12|              1| 351.65|     Phil|    Hawkins|R-68106487|      Airbus|   08-2021|
|     C2386|    Seattle|Fort Lauderdale|2021-06-29|              2| 2

In [0]:
total_rows = df_trips.count()
print(f"Total number of rows: {total_rows}")



Total number of rows: 2999


aggregate layer trips

a) Total Revenue by Travel month.

In [0]:

from pyspark.sql.functions import sum, month

# Group by the month extracted from 'totalmonth' and calculate the total revenue for each month
total_revenue_by_month = df_trips.groupBy(month("totalmonth").alias("travel_month")).agg(sum("revenue").alias("total_revenue"))

# Show the result
total_revenue_by_month.show()

# Convert the PySpark DataFrame to a Pandas DataFrame
result_pdtripsrevenue_df = total_revenue_by_month.toPandas()

# Define the local path to save the file
local_path = "/tmp/tripsrevenue.xlsx"

# Write the Pandas DataFrame to an Excel file in the local file system
result_pdtripsrevenue_df.to_excel(local_path, index=False)

# Define the DBFS path where the file will be saved
dbfs_path = "dbfs:/FileStore/tables/tripsrevenue.xlsx"

# Copy the file from the local file system to DBFS
dbutils.fs.cp(f"file:{local_path}", dbfs_path)

# Print the URL to download the file
print(f"Download URL: https://community.cloud.databricks.com/files/tables/tripsrevenue.xlsx")


+------------+-----------------+
|travel_month|    total_revenue|
+------------+-----------------+
|          12|         73236.44|
|           1|         69188.76|
|           6|58494.95000000003|
|           3|65765.44999999998|
|           5|65789.93000000002|
|           9|66490.07999999999|
|           4|61736.66000000001|
|           8|80308.53999999998|
|           7|69838.15999999997|
|          10|67833.41999999994|
|          11|71063.48999999996|
|           2|61803.07000000001|
+------------+-----------------+

Download URL: https://community.cloud.databricks.com/files/tables/tripsrevenue.xlsx


b) Total Revenue by FirstName, LastName.

In [0]:
# Group by 'firstname' and 'lastname' and calculate the total revenue for each customer
total_revenue_by_customer = df_trips.groupBy("firstname", "lastname").agg(sum("revenue").alias("total_revenue"))

total_revenue_by_customer.show(10)
# Convert the PySpark DataFrame to a Pandas DataFrame
total_revenue_by_customer_df = total_revenue_by_customer.toPandas()

# Define the local path to save the file
local_path = "/tmp/total_revenue_by_customer.xlsx"

# Write the Pandas DataFrame to an Excel file in the local file system
total_revenue_by_customer_df.to_excel(local_path, index=False)
# Define the DBFS path where the file will be saved
dbfs_path = "dbfs:/FileStore/tables/total_revenue_by_customer.xlsx"

# Copy the file from the local file system to DBFS
dbutils.fs.cp(f"file:{local_path}", dbfs_path)


+---------+-----------+------------------+
|firstname|   lastname|     total_revenue|
+---------+-----------+------------------+
|  Russell|Constantine| 94070.26999999996|
|    Billy|      Kohli|104907.63000000006|
|     Mary|      Singh|101128.36000000002|
|   Robert|     Banner|          98323.88|
|      Jay|      Singh|105116.43999999996|
|     Phil|    Hawkins|100715.76000000005|
|  Heather|      Singh|105923.50999999995|
|     Phil|      Singh|101363.09999999996|
+---------+-----------+------------------+

Out[6]: True

d) Show top 5 customer who paid maximum in total revenue by each Travel month

In [0]:
from pyspark.sql.functions import sum, rank, month, concat, lit

# Group by 'totalmonth', 'firstname', and 'lastname', then calculate total revenue for each customer per month
revenue_by_customer_month = df_trips.groupBy("totalmonth", "firstname", "lastname").agg(sum("revenue").alias("total_revenue"))

# Extract month from 'totalmonth' and create a new column 'month'
revenue_by_customer_month = revenue_by_customer_month.withColumn("month", month(col("totalmonth")))

# Combine firstname and lastname into a single column 'name'
revenue_by_customer_month = revenue_by_customer_month.withColumn("name", concat(col("firstname"), lit(" "), col("lastname")))

# Define a window specification for ranking customers within each month based on total revenue
window_spec = Window.partitionBy("month").orderBy(col("total_revenue").desc())

# Add a rank column to rank customers within each month
ranked_customers = revenue_by_customer_month.withColumn("rank", rank().over(window_spec))

# Filter to get the top 5 customers by revenue for each month
top_5_customers_per_month = ranked_customers.filter(col("rank") <= 5)

# Select the desired columns
result_df = top_5_customers_per_month.select("month", "name", "total_revenue", "rank")

result_df.show()

# Convert the PySpark DataFrame to a Pandas DataFrame
top_5_customers_per_month_df = result_df.toPandas()

# Define the local path to save the file
local_path = "/tmp/top_5_customers_per_month.xlsx"

# Write the Pandas DataFrame to an Excel file in the local file system
top_5_customers_per_month_df.to_excel(local_path, index=False)
# Define the DBFS path where the file will be saved
dbfs_path = "dbfs:/FileStore/tables/top_5_customers_per_month.xlsx"

# Copy the file from the local file system to DBFS
dbutils.fs.cp(f"file:{local_path}", dbfs_path)



+-----+-------------------+------------------+----+
|month|               name|     total_revenue|rank|
+-----+-------------------+------------------+----+
|    1|       Phil Hawkins|1777.3500000000001|   1|
|    1|         Mary Singh|           1279.89|   2|
|    1|         Phil Singh|           1246.78|   3|
|    1|         Mary Singh|           1233.27|   4|
|    1|       Phil Hawkins|           1178.33|   5|
|    2|Russell Constantine|1154.6699999999998|   1|
|    2|         Mary Singh|           1088.71|   2|
|    2|       Phil Hawkins|           1079.17|   3|
|    2|       Phil Hawkins| 994.3399999999999|   4|
|    2|Russell Constantine|            983.87|   5|
|    3|Russell Constantine|           1609.96|   1|
|    3|      Heather Singh|           1078.05|   2|
|    3|         Mary Singh|           1040.87|   3|
|    3|      Robert Banner|            967.79|   4|
|    3|Russell Constantine|            957.39|   5|
|    4|         Mary Singh|           1192.19|   1|
|    4|     

c) Total Flying Hours by TailNumber.

In [0]:
# Group by 'tailnumber' and calculate the total flight time for each aircraft
total_flying_hours_by_tailnumber = df_trips.groupBy("tailnumber").agg(sum("totalflighttime").alias("total_flying_hours"))

total_flying_hours_by_tailnumber.show()
# Convert the PySpark DataFrame to a Pandas DataFrame
total_flying_hours_by_tailnumber_df = total_flying_hours_by_tailnumber.toPandas()

# Define the local path to save the file
local_path = "/tmp/total_flying_hours_by_tailnumber.xlsx"

# Write the Pandas DataFrame to an Excel file in the local file system
total_flying_hours_by_tailnumber_df.to_excel(local_path, index=False)
# Define the DBFS path where the file will be saved
dbfs_path = "dbfs:/FileStore/tables/total_flying_hours_by_tailnumber.xlsx"

# Copy the file from the local file system to DBFS
dbutils.fs.cp(f"file:{local_path}", dbfs_path)


+----------+------------------+
|tailnumber|total_flying_hours|
+----------+------------------+
|     T2335|               878|
|     O2378|               838|
|     C2386|               878|
|     C2896|               888|
|     H3059|               737|
|     R2852|               750|
|     C2750|               809|
|     G2971|               871|
|     O2349|               784|
|     Y2161|               942|
|     A2975|               827|
|     N2252|               757|
|     P2752|               808|
|     Z2357|               874|
|     F2838|               856|
|     S2869|               765|
|     J2465|               895|
|     C2990|               863|
|     T2366|               683|
|     C2888|               792|
+----------+------------------+

Out[8]: True

In [0]:
dbutils.fs.mkdirs("/FileStore/tables/")


Out[9]: True

In [0]:
# Convert the PySpark DataFrame to a Pandas DataFrame
result_pd_df = aircrafts_df_with_make_final.toPandas()

# Define the local path to save the file
local_path = "/tmp/trips_with_aircraft_make.xlsx"

# Write the Pandas DataFrame to an Excel file in the local file system
result_pd_df.to_excel(local_path, index=False)


In [0]:
# Define the DBFS path where the file will be saved
dbfs_path = "dbfs:/FileStore/tables/trips_with_aircraft_make.xlsx"

# Copy the file from the local file system to DBFS
dbutils.fs.cp(f"file:{local_path}", dbfs_path)


Out[11]: True

In [0]:
# Convert the PySpark DataFrame to a Pandas DataFrame
result_pdtrips_df = df_trips.toPandas()

# Define the local path to save the file
local_path = "/tmp/trips.xlsx"

# Write the Pandas DataFrame to an Excel file in the local file system
result_pdtrips_df.to_excel(local_path, index=False)

In [0]:
# Define the DBFS path where the file will be saved
dbfs_path = "dbfs:/FileStore/tables/trips.xlsx"

# Copy the file from the local file system to DBFS
dbutils.fs.cp(f"file:{local_path}", dbfs_path)

Out[13]: True

transform layer trips - Retain only Date from TravelDate field. 

In [0]:
# Select the 'traveldate' column from the DataFrame
travel_date_df = df_trips.select("traveldate")

# Convert to Pandas DataFrame
travel_date_pd_df = travel_date_df.toPandas()

# Import pandas library
import pandas as pd

# Write the Pandas DataFrame to an Excel file
dbfs_path = "dbfs:/FileStore/tables/travel_date.xlsx"

dbutils.fs.cp(f"file:{local_path}", dbfs_path)
print("The 'traveldate' column has been saved to 'travel_dates.xlsx'")


The 'traveldate' column has been saved to 'travel_dates.xlsx'


agg layer aircraft - Find the total capacity by Make in current aircraft inventory

In [0]:
# Assuming the 'aircrafts_df_with_make' DataFrame already includes the 'Make' and 'Capacity' columns

# Calculate the total capacity by 'Make'
total_capacity_by_make = aircrafts_df_with_make.groupBy("Make").agg(sum("Capacity").alias("TotalCapacity"))

# Display the results
total_capacity_by_make.show()
# Convert to Pandas DataFrame
total_capacity_by_make_df = total_capacity_by_make.toPandas()
import pandas as pd

# Write the Pandas DataFrame to an Excel file
dbfs_path = "dbfs:/FileStore/tables/total_capacity_by_make.xlsx"

dbutils.fs.cp(f"file:{local_path}", dbfs_path)
print("The 'traveldate' column has been saved to 'total_capacity_by_make.xlsx'")


+------+-------------+
|  Make|TotalCapacity|
+------+-------------+
|Boeing|       1997.0|
|Airbus|        868.0|
+------+-------------+

The 'traveldate' column has been saved to 'total_capacity_by_make.xlsx'
