In [0]:
configs = {"fs.azure.account.auth.type": "OAuth",
          "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
          "fs.azure.account.oauth2.client.id": "33e557d4-d040-46fa-b9e2-6847bfe22386",
          "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="adf1",key="FlightADLSKEY"),
          "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/b71022e1-db54-4ee1-a8bb-eb361d24e2ea/oauth2/token"}
#dbutils.fs.mount(
  #source = "abfss://flightdata@indiaflightdata.dfs.core.windows.net/",
  #mount_point = "/mnt/flightdata",
  #extra_configs = configs)

#dbutils.fs.mount(
  #source = "abfss://bronzezoneconversions@indiaflightdata.dfs.core.windows.net/",
  #mount_point = "/mnt/bronzezoneconversions",
  #extra_configs = configs)

#dbutils.fs.mount(
  #source = "abfss://goldzoneflightstats@indiaflightdata.dfs.core.windows.net/",
  #mount_point = "/mnt/goldzoneflightstats",
  #extra_configs = configs)

#dbutils.fs.mount(
  #source = "abfss://silverzoneflightdata@indiaflightdata.dfs.core.windows.net/",
  #mount_point = "/mnt/silverzoneflightdata",
  #extra_configs = configs)


In [0]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Null Handling") \
    .getOrCreate()

# Define the file path
file_path = "/mnt/flightdata/Flight_Schedule.csv"

# Read the CSV file into a DataFrame
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Define a dictionary to store data types for each column
column_data_types = {
    "airline": "STRING",
    "flightNumber": "INT",
    "origin": "STRING",
    "destination": "STRING",
    "daysOfWeek": "STRING",
    "scheduledDepartureTime": "TIMESTAMP",
    "scheduledArrivalTime": "TIMESTAMP",
    "timezone": "STRING",
    "validFrom": " DateType",
    "validTo": " DateType",
    "lastUpdated": " DateType"
}

# Iterate through each column and apply null handling based on its data type
for col, data_type in column_data_types.items():
    if data_type == "INT":
        df = df.withColumn(col, F.when(F.col(col).isNotNull(), F.col(col)).otherwise(-1).cast(IntegerType()))
    elif data_type == "TIMESTAMP":
         df = df.withColumn(col, F.when(F.col(col).isNotNull(), F.to_timestamp(F.concat(F.lit("1970-01-01 "), F.col(col)), "yyyy-MM-dd HH:mm")).otherwise(None))
    elif data_type == "STRING":
        df = df.withColumn(col, F.when(F.col(col).isNotNull(), F.col(col)).otherwise("NA").cast(StringType()))

# Write the DataFrame as a Parquet file
parquet_output_path = "/mnt/bronzezoneconversions/Flight_Schedule.parquet"
df.write.parquet(parquet_output_path, mode="overwrite")

# Show the resulting DataFrame
df.display()


airline,flightNumber,origin,destination,daysOfWeek,scheduledDepartureTime,scheduledArrivalTime,timezone,validFrom,validTo,lastUpdated
GoAir,425,Delhi,Hyderabad,"Sunday,Monday,Tuesday,Wednesday,Thursday,Friday,Saturday",1970-01-01T05:45:00Z,,2019-03-30,2018-10-28,2019-03-30,2023-11-05
GoAir,423,Delhi,Hyderabad,Saturday,1970-01-01T07:30:00Z,,2018-10-28,2018-10-28,2018-10-28,2023-11-05
GoAir,423,Delhi,Hyderabad,Friday,1970-01-01T07:30:00Z,,2018-12-01,2018-11-03,2018-12-01,2023-11-05
GoAir,423,Delhi,Hyderabad,Friday,1970-01-01T07:30:00Z,,2019-03-30,2019-02-02,2019-03-30,2023-11-05
GoAir,423,Delhi,Hyderabad,"Sunday,Monday,Tuesday,Wednesday,Thursday,Saturday",1970-01-01T07:30:00Z,,2018-11-30,2018-10-29,2018-11-30,2023-11-05
GoAir,423,Delhi,Hyderabad,"Sunday,Monday,Tuesday,Wednesday,Thursday,Saturday",1970-01-01T07:30:00Z,,2019-03-29,2019-02-01,2019-03-29,2023-11-05
GoAir,423,Delhi,Hyderabad,"Sunday,Monday,Tuesday,Wednesday,Thursday,Friday,Saturday",1970-01-01T07:30:00Z,,2019-01-31,2018-12-02,2019-01-31,2023-11-05
GoAir,422,Delhi,Hyderabad,"Sunday,Monday,Tuesday,Wednesday,Thursday,Friday,Saturday",1970-01-01T20:55:00Z,,2019-03-30,2018-10-28,2019-03-30,2023-11-05
GoAir,559,Lucknow,Hyderabad,"Sunday,Tuesday,Wednesday,Thursday,Friday,Saturday",1970-01-01T15:45:00Z,1970-01-01T17:45:00Z,2019-03-30,2018-10-28,2019-03-30,2023-11-05
GoAir,580,Kochi,Hyderabad,"Sunday,Monday,Tuesday,Wednesday,Thursday,Friday,Saturday",1970-01-01T04:45:00Z,1970-01-01T06:15:00Z,2019-03-30,2018-10-28,2019-03-30,2023-11-05


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Null Handling") \
    .getOrCreate()

# Define the file path
file_path = "/mnt/bronzezoneconversions/Flight_Schedule.parquet"

# Read the Parquet file into a DataFrame
df = spark.read.parquet(file_path)

# Calculate the count of null values in each column
null_counts = df.select([spark_sum(col(column).isNull().cast('int')).alias(column) for column in df.columns])

# Show the count of null values
print("Count of null values in each column:")
null_counts.display()

Count of null values in each column:


airline,flightNumber,origin,destination,daysOfWeek,scheduledDepartureTime,scheduledArrivalTime,timezone,validFrom,validTo,lastUpdated
0,415,0,0,0,30686,30968,0,0,0,0


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, round

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Airlines Data Analysis") \
    .getOrCreate()

# Read the Parquet file into a DataFrame
parquet_file_path = "/mnt/bronzezoneconversions/Flight_Schedule.parquet"
df = spark.read.parquet(parquet_file_path)

# Group the data by airline to set it as the primary key
airline_grouped = df.groupBy("airline")

# Count the number of flights operated by each airline
airline_counts = airline_grouped.agg(count("*").alias("total_flights"))

# Calculate the total number of days
total_days = df.select("daysOfWeek").distinct().count()

# Calculate the average number of flights per day for each airline
avg_flights_per_day = airline_grouped.agg((count("*") / total_days).alias("avg_flights_per_day"))

# Calculate the total number of flights
total_flights = df.count()

# Calculate the market share for each airline as a percentage
airline_market_share = airline_grouped.agg((count("*") / total_flights * 100).alias("market_share"))

# Identify the top airlines with the highest number of flights
top_airlines = airline_counts.orderBy(col("total_flights").desc())

# Rename the columns to avoid duplicate columns after join
airline_counts = airline_counts.withColumnRenamed("total_flights", "total_flights_count")
avg_flights_per_day = avg_flights_per_day.withColumnRenamed("avg_flights_per_day", "avg_flights_per_day_count")
airline_market_share = airline_market_share.withColumnRenamed("market_share", "market_share_percentage")
top_airlines = top_airlines.withColumnRenamed("total_flights", "top_airlines_total_flights")

# Combine all tables into one by joining on the airline key
combined_table = airline_counts.join(avg_flights_per_day, "airline") \
    .join(airline_market_share, "airline") \
    .join(top_airlines, "airline")

# Save the combined table in Delta format to the specified location
output_path = "/mnt/silverzoneflightdata"
combined_table.write.format("delta").mode("overwrite").save(output_path + "/Flight Data")

# Show the combined table
combined_table.display()

combined_table.printSchema()


airline,total_flights_count,avg_flights_per_day_count,market_share_percentage,top_airlines_total_flights
Akasa Air,438,3.448818897637796,0.4922343844822548,438
FlyBig,214,1.68503937007874,0.2404980782630194,214
Air India,8185,64.44881889763779,9.198489582162685,8185
GoAir,11666,91.85826771653544,13.110516733721427,11666
SpiceJet,10824,85.22834645669292,12.164257939808053,10824
,675,5.31496062992126,0.7585803870445708,675
Jet Airways,1307,10.291338582677165,1.4688363938774134,1307
Star Air,472,3.716535433070866,0.5304443595333888,472
AirAsia India,5227,41.15748031496063,5.874221752714033,5227
IndiGo,41291,325.12598425196853,46.40376705401093,41291


root
 |-- airline: string (nullable = true)
 |-- total_flights_count: long (nullable = false)
 |-- avg_flights_per_day_count: double (nullable = true)
 |-- market_share_percentage: double (nullable = true)
 |-- top_airlines_total_flights: long (nullable = false)



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, explode, split

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Cities Data Analysis") \
    .getOrCreate()

# Read the Parquet file into a DataFrame
parquet_file_path = "/mnt/bronzezoneconversions/Flight_Schedule.parquet"
df = spark.read.parquet(parquet_file_path)

# Split the "origin" and "destination" columns and explode them to create a row for each city
df = df.withColumn("origin_city", explode(split(col("origin"), ","))) \
    .withColumn("destination_city", explode(split(col("destination"), ",")))

# Group the data by city (origin and destination) and count the number of inbound and outbound flights for each city
city_flights = df.groupBy("origin_city").agg(
    count("*").alias("inbound_flights"),
    count("*").alias("outbound_flights")
).withColumnRenamed("origin_city", "city")

# Calculate the total number of flights for each city
city_flights = city_flights.withColumn("total_flights", city_flights["inbound_flights"] + city_flights["outbound_flights"])

# Identify the top cities with the highest number of flights
top_cities = city_flights.orderBy(col("total_flights").desc())

# Define the output path
output_path = "/mnt/silverzoneflightdata"

# Save the DataFrame as a Delta table with overwrite option set to true
city_flights.write.format("delta").mode("overwrite").save(output_path + "/city_flights")

# Show the results
print("City flights (inbound, outbound, total):")
city_flights.display()

print("Top cities with the highest number of flights (sorted by descending order):")
top_cities.display()

city_flights.printSchema()

City flights (inbound, outbound, total):


city,inbound_flights,outbound_flights,total_flights
Udaipur,327,327,654
Dimapur,102,102,204
Kochi,1258,1258,2516
Aurangabad,138,138,276
Bhubaneswar,1085,1085,2170
Aizwal,183,183,366
Mysore,154,154,308
MIHAN,967,967,1934
Jammu,823,823,1646
Jalgaon,43,43,86


Top cities with the highest number of flights (sorted by descending order):


city,inbound_flights,outbound_flights,total_flights
Delhi,13332,13332,26664
Bengaluru,8941,8941,17882
Mumbai,8301,8301,16602
Kolkata,6120,6120,12240
Hyderabad,5678,5678,11356
Chennai,4824,4824,9648
Ahmedabad,3560,3560,7120
Pune,2726,2726,5452
Goa,2251,2251,4502
Srinagar,1897,1897,3794


root
 |-- city: string (nullable = false)
 |-- inbound_flights: long (nullable = false)
 |-- outbound_flights: long (nullable = false)
 |-- total_flights: long (nullable = false)



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, explode, split, max, struct

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Major Airlines in Cities") \
    .getOrCreate()

# Read the Parquet file into a DataFrame
parquet_file_path = "/mnt/bronzezoneconversions/Flight_Schedule.parquet"
df = spark.read.parquet(parquet_file_path)

# Split the "origin" and "destination" columns and explode them to create a row for each city
df = df.withColumn("origin_city", explode(split(col("origin"), ","))) \
    .withColumn("destination_city", explode(split(col("destination"), ",")))

# Group the data by airline, origin city, and destination city to count the number of flights operated by each airline in each city
airline_city_counts = df.groupBy("airline", "origin_city", "destination_city").agg(count("*").alias("flights_count"))

# Find the major player (airline with the highest number of flights) in each city
major_airlines_in_cities = airline_city_counts.groupBy("origin_city", "destination_city").agg(
    max(struct(col("flights_count"), col("airline"))).alias("major_airline_info")
) \
    .select(
    "origin_city",
    "destination_city",
    "major_airline_info.airline",
    "major_airline_info.flights_count"
)

# Count the number of flights per day for the majority airline in each city
total_days = df.select("daysOfWeek").distinct().count()
major_airline_flights_per_day = major_airlines_in_cities.withColumn(
    "Average_flights_per_day", col("flights_count") / total_days
)

# Define the output path
output_path = "/mnt/silverzoneflightdata"

# Save the DataFrame as a Delta table with overwrite option set to true
city_flights.write.format("delta").mode("overwrite").save(output_path + "/Citywise_stats")

# Show the results
print("Major Airlines in Cities with Flight Counts and Flights per Day:")
major_airline_flights_per_day.display()

major_airline_flights_per_day.printSchema()

Major Airlines in Cities with Flight Counts and Flights per Day:


origin_city,destination_city,airline,flights_count,Average_flights_per_day
Adampur,Delhi,SpiceJet,6,0.0472440944881889
Adampur,Jaipur,SpiceJet,3,0.0236220472440944
Adampur,Mumbai,SpiceJet,4,0.0314960629921259
Agartala,Aizwal,IndiGo,10,0.0787401574803149
Agartala,Bengaluru,IndiGo,23,0.1811023622047244
Agartala,Chennai,IndiGo,2,0.0157480314960629
Agartala,Delhi,IndiGo,10,0.0787401574803149
Agartala,Dibrugarh,FlyBig,4,0.0314960629921259
Agartala,Guwahati,IndiGo,32,0.2519685039370078
Agartala,Hyderabad,IndiGo,2,0.0157480314960629


root
 |-- origin_city: string (nullable = false)
 |-- destination_city: string (nullable = false)
 |-- airline: string (nullable = true)
 |-- flights_count: long (nullable = true)
 |-- Average_flights_per_day: double (nullable = true)



#Oldest Record in the dataset and Newest

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import min, max, to_date, col

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Dataset Date Range") \
    .getOrCreate()

# Read the Parquet file into a DataFrame
parquet_file_path = "/mnt/bronzezoneconversions/Flight_Schedule.parquet"
df = spark.read.parquet(parquet_file_path)

# Define the relevant date columns
date_columns = ["validFrom", "validTo", "lastUpdated"]

# Convert date columns to date type if they are not already in date format
for column in date_columns:
    df = df.withColumn(column, to_date(col(column), "dd-MM-yyyy"))

# Find the oldest and newest dates in the dataset
oldest_date = df.select(min(col("validFrom"))).first()[0]
newest_date = df.select(max(col("validTo"))).first()[0]

# Show the result
print("Oldest date in the dataset:", oldest_date)
print("Newest date in the dataset:", newest_date)


Oldest date in the dataset: 2018-10-28
Newest date in the dataset: 2023-10-28


In [0]:
# Read delta tables into DataFrames
flight_data_df = spark.read.format("delta").load("/mnt/silverzoneflightdata/Flight Data")
city_flights_df = spark.read.format("delta").load("/mnt/silverzoneflightdata/city_flights")
city_wise_stats_df = spark.read.format("delta").load("/mnt/silverzoneflightdata/Citywise_stats")

# Create fact table (flight_data)
flight_data_df.createOrReplaceTempView("flight_data")
fact_table = spark.sql("""
    SELECT airline,
           total_flights_count,
           avg_flights_per_day_count,
           market_share_percentage,
           top_airlines_total_flights
    FROM flight_data
""")
fact_table.write.format("delta").save("/mnt/goldzoneflightstats/fact_flight_data")

# Create dimension table (city_flights)
city_flights_df.createOrReplaceTempView("city_flights")
dimension_table_city_flights = spark.sql("""
    SELECT city,
           inbound_flights,
           outbound_flights,
           total_flights
    FROM city_flights
""")
dimension_table_city_flights.write.format("delta").save("/mnt/goldzoneflightstats/dim_city_flights")




[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4382248656589254>, line 16[0m
[1;32m      7[0m flight_data_df[38;5;241m.[39mcreateOrReplaceTempView([38;5;124m"[39m[38;5;124mflight_data[39m[38;5;124m"[39m)
[1;32m      8[0m fact_table [38;5;241m=[39m spark[38;5;241m.[39msql([38;5;124m"""[39m
[1;32m      9[0m [38;5;124m    SELECT airline,[39m
[1;32m     10[0m [38;5;124m           total_flights_count,[39m
[0;32m   (...)[0m
[1;32m     14[0m [38;5;124m    FROM flight_data[39m
[1;32m     15[0m [38;5;124m"""[39m)
[0;32m---> 16[0m fact_table[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39msave([38;5;124m"[39m[38;5;124m/mnt/goldzoneflightstats/fact_flight_data[39m[38;5;124m"[39m)
[1;32m     18[0m [38;5;66;03m# Create dimension table (c

In [0]:
# Check the schema of city_wise_stats_df
city_wise_stats_df.printSchema()




In [0]:
# Read delta tables into DataFrames
flight_data_df = spark.read.format("delta").load("/mnt/silverzoneflightdata/Flight Data")
city_flights_df = spark.read.format("delta").load("/mnt/silverzoneflightdata/city_flights")
city_wise_stats_df = spark.read.format("delta").load("/mnt/silverzoneflightdata/Citywise_stats")

# Create fact table (flight_data)
flight_data_df.createOrReplaceTempView("flight_data")
fact_table = spark.sql("""
    SELECT airline,
           total_flights_count,
           avg_flights_per_day_count,
           market_share_percentage,
           top_airlines_total_flights
    FROM flight_data
""")
fact_table.write.format("delta").save("/mnt/goldzoneflightstats/fact_flight_data")

# Create dimension table (city_flights)
city_flights_df.createOrReplaceTempView("city_flights")
dimension_table_city_flights = spark.sql("""
    SELECT city,
           inbound_flights,
           outbound_flights,
           total_flights
    FROM city_flights
""")
dimension_table_city_flights.write.format("delta").save("/mnt/goldzoneflightstats/dim_city_flights")




[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2390626644991647>, line 16[0m
[1;32m      7[0m flight_data_df[38;5;241m.[39mcreateOrReplaceTempView([38;5;124m"[39m[38;5;124mflight_data[39m[38;5;124m"[39m)
[1;32m      8[0m fact_table [38;5;241m=[39m spark[38;5;241m.[39msql([38;5;124m"""[39m
[1;32m      9[0m [38;5;124m    SELECT airline,[39m
[1;32m     10[0m [38;5;124m           total_flights_count,[39m
[0;32m   (...)[0m
[1;32m     14[0m [38;5;124m    FROM flight_data[39m
[1;32m     15[0m [38;5;124m"""[39m)
[0;32m---> 16[0m fact_table[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39msave([38;5;124m"[39m[38;5;124m/mnt/goldzoneflightstats/fact_flight_data[39m[38;5;124m"[39m)
[1;32m     18[0m [38;5;66;03m# Create dimension table (c