In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

### **Exploration**

In [0]:
%sql
-- Exploration: query the airports dimension CSV directly from the volume path.
-- NOTE(review): no reader options are passed here, so the header row is presumably
-- returned as a data row (columns _c0, _c1, ...) — confirm against the cell output.
SELECT * FROM csv.`/Volumes/interview_catalog/source/ansh_volume/pyspark/dim_airports.csv`

In [0]:
%sql
-- Exploration: query the passengers dimension CSV directly from the volume path.
-- NOTE(review): no reader options are passed here, so the header row is presumably
-- returned as a data row (columns _c0, _c1, ...) — confirm against the cell output.
SELECT * FROM csv.`/Volumes/interview_catalog/source/ansh_volume/pyspark/dim_passengers.csv`

In [0]:
%sql
-- Exploration: query the bookings fact CSV directly from the volume path.
-- NOTE(review): no reader options are passed here, so the header row is presumably
-- returned as a data row (columns _c0, _c1, ...) — confirm against the cell output.
SELECT * FROM csv.`/Volumes/interview_catalog/source/ansh_volume/pyspark/fact_bookings.csv`

### Ques-11
You work as a Data Engineer at an airline company. The business team wants to understand how flight ticket sales are trending over time to plan pricing and promotions.
Using PySpark, how would you analyze the trend of flight ticket sales over time (daily or monthly)?

In [0]:
# Load the bookings fact CSV: the first row supplies the column names and
# the column types are inferred from the file contents.
df = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("/Volumes/interview_catalog/source/ansh_volume/pyspark/fact_bookings.csv")
)

display(df)

In [0]:
# Daily sales trend: total ticket revenue per booking date, oldest date first.
# `round` and `sum` are the Spark column functions brought in by the star import
# at the top of the notebook, not the Python builtins.
#
# The result is bound to its own name instead of overwriting `df`: reassigning
# `df` made this cell fail on a second run, because the aggregated frame no
# longer has an `amount` column.
df_daily_sales = (
    df.groupBy("booking_date")
    .agg(round(sum("amount"), 2).alias("total_amount"))
    .sort("booking_date")
)
display(df_daily_sales)

Databricks visualization. Run in Databricks to view.

### Ques-12
You work as a Data Engineer for an airline analytics team. The business wants to identify the top 5 airports based on total flight ticket sales for the last few months.
How would you calculate the top 5 airports by total sales revenue?
Explain the logic.

In [0]:
# Load the three source CSVs. Each read treats the first row as the header
# and infers the column types from the data.
df_airport = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("/Volumes/interview_catalog/source/ansh_volume/pyspark/dim_airports.csv")
)

df_booking = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("/Volumes/interview_catalog/source/ansh_volume/pyspark/fact_bookings.csv")
)

df_passenger = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("/Volumes/interview_catalog/source/ansh_volume/pyspark/dim_passengers.csv")
)


In [0]:
# Attach the airport name to every booking. The left join keeps bookings whose
# airport_id has no match in the airports dimension, so their revenue is still
# counted (their airport_name is NULL).
df_join = (
    df_booking.join(df_airport, df_booking["airport_id"] == df_airport["airport_id"], how="left")
    .select(df_booking["airport_id"].alias("airport_id"), col("amount"), col("airport_name"))
)

# Total revenue per airport, highest first, top 5.
# Grouping on the airport key as well as the name keeps two airports that share
# a name apart, and stops all unmatched bookings (NULL name) from collapsing into
# a single group that could take a top-5 slot. The secondary sort key makes the
# cut-off deterministic when totals tie.
# NOTE(review): the question asks for "the last few months", but no date filter is
# applied here — add a booking_date filter once the look-back window is agreed.
df_group = (
    df_join.groupBy("airport_id", "airport_name")
    .agg(sum("amount").alias("total_amount"))
    .sort(col("total_amount").desc(), col("airport_id").asc())
    .limit(5)
)

display(df_group)



Databricks visualization. Run in Databricks to view.

### Ques-13
Every booking generates a sale_amount for a flight. Management wants to track monthly performance trends, so they ask you to calculate a running total of sales for each flight, but the total should reset at the start of every month.

In [0]:
# Preview the bookings DataFrame (df_booking) that the running-total cell below works on.
display(df_booking)

In [0]:
from pyspark.sql.window import Window

# The running total must restart at the start of every calendar month, so the
# window is partitioned by year AND month. Partitioning by month alone would put
# e.g. January of two different years into the same running total.
partition_cols = [year("booking_date"), month("booking_date")]

# The requirement is a running total per flight. "flight_id" is an assumed column
# name — TODO confirm against the fact_bookings schema. It is only added when the
# column exists, so the cell still runs if the flight key is named differently.
if "flight_id" in df_booking.columns:
    partition_cols.insert(0, col("flight_id"))

# Rows are accumulated in booking_date order from the start of the partition up
# to the current row. Rows sharing a booking_date have no defined order between
# them, so their individual running values may vary between runs.
monthly_window = (
    Window.partitionBy(*partition_cols)
    .orderBy("booking_date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df_run = df_booking.withColumn("running_total", sum("amount").over(monthly_window))

display(df_run)

### Ques-14
You are assigned to find the total revenue generated by each city so far and tag each city as high, mid, or low based on the following revenue buckets:
- Less than 1000 (low)
- Between 1000 - 20,000 (mid)
- More than 20,000 (high)



In [0]:
# Attach the airport's city to every booking. The left join keeps bookings whose
# airport_id has no match in the airports dimension; they end up under a NULL city.
df_city_sales = (
    df_booking.join(df_airport, df_booking["airport_id"] == df_airport["airport_id"], how="left")
    .select(col("city"), col("amount"))
)

# Total revenue per city.
df_city_revenue = df_city_sales.groupBy("city").agg(sum("amount").alias("total_amount"))

# Bucket each city by revenue, using the labels from the requirement:
#   low  : less than 1000
#   mid  : 1000 up to and including 20,000
#   high : more than 20,000
# Every bucket has an explicit condition and there is no catch-all `otherwise`,
# so a city whose total is NULL gets a NULL tag instead of being labelled "high".
df_city_tagged = df_city_revenue.withColumn(
    "tag",
    when(col("total_amount") < 1000, lit("low"))
    .when(col("total_amount") <= 20000, lit("mid"))
    .when(col("total_amount") > 20000, lit("high")),
)

display(df_city_tagged)

### Ques-15
Multiple source systems send data with different schemas, but all of them contain some common numeric columns that need aggregation.
The business frequently changes:
- Which columns to aggregate
- Which aggregation functions to apply (sum, avg, max, count, etc.)

Hard-coding the logic every time is not acceptable; the solution must be reusable and driven by parameters.

In [0]:
class my_utility:
  """Reusable, parameter-driven aggregation helper for DataFrames."""

  def __init__(self):
    # Stateless helper: nothing to initialise.
    pass

  @staticmethod
  def _as_list(value):
    """Return ``value`` as a list, wrapping anything that is not a list/tuple."""
    return list(value) if isinstance(value, (list, tuple)) else [value]

  @staticmethod
  def _resolve_agg(agg):
    """Return the aggregation callable for ``agg`` (a callable or a function name)."""
    if callable(agg):
      return agg
    if isinstance(agg, str):
      # Imported lazily: only needed when a function is given by name.
      from pyspark.sql import functions as F
      resolved = getattr(F, agg, None)
      if callable(resolved):
        return resolved
    raise ValueError(f"Unsupported aggregation: {agg!r}")

  def aggregator(self, df, groupby_col, aggregator_col, agg_type):
    """Group ``df`` and apply one or more aggregations to one or more columns.

    Args:
      df: DataFrame-like object exposing ``groupBy(...).agg(...)``.
      groupby_col: a grouping column, or a list/tuple of grouping columns.
      aggregator_col: a column to aggregate, or a list/tuple of columns.
      agg_type: an aggregation callable (e.g. the Spark ``sum`` function), the
        name of a function in ``pyspark.sql.functions`` (e.g. ``"avg"``), or a
        list/tuple mixing both.

    Returns:
      The aggregated DataFrame. Every requested function is applied to every
      requested column, ordered column by column. Output columns keep the
      names the aggregation functions give them (no aliases are added), so a
      single column with a single function yields the same result as before.

    Raises:
      ValueError: if no column or no function is supplied, or if an
        aggregation is neither a callable nor a known function name.
    """
    group_cols = self._as_list(groupby_col)
    value_cols = self._as_list(aggregator_col)
    agg_funcs = [self._resolve_agg(spec) for spec in self._as_list(agg_type)]

    if not value_cols or not agg_funcs:
      raise ValueError("At least one column and one aggregation function are required")

    # Cross product: each function applied to each column.
    expressions = [func(column) for column in value_cols for func in agg_funcs]

    return df.groupBy(*group_cols).agg(*expressions)

In [0]:
# Example: total booking amount per airport, using the generic aggregator.
# `sum` is the Spark column function brought in by the star import.
obj = my_utility()

df_results = obj.aggregator(
    df=df_booking,
    groupby_col='airport_id',
    aggregator_col='amount',
    agg_type=sum,
)
display(df_results)