## Snowflake configurations

In [0]:


# Snowflake connection options shared by every Spark read/write in this notebook.
# The user name and password come from the Azure Key Vault-backed secret scope rather
# than being hard-coded in the notebook. If several databases are needed, parameterize
# `dbname` (and the other connection names below).
snowflake_url = "sk94588.east-us-2.azure.snowflakecomputing.com"
warehouse = "interview_wh"
dbname = "user_deepa"
schema = "airlines_dbx"

snowflake_user = dbutils.secrets.get(scope="azuredevkeyvault", key="snowflake_user")
snowflake_password = dbutils.secrets.get(scope="azuredevkeyvault", key="snowflake_password")

options = {
    "sfUrl": snowflake_url,
    "sfUser": snowflake_user,
    "sfPassword": snowflake_password,
    "sfDatabase": dbname,
    "sfSchema": schema,
    "sfWarehouse": warehouse,
}

## Azure Storage configurations

In [0]:
# Mount the `airlines` ADLS Gen2 container of storage account `interview1` at
# MOUNT_POINT, authenticating as an AAD service principal via OAuth client credentials.
# The service principal must also be granted access to the storage account through IAM.
MOUNT_POINT = "/mnt/interview"

# The client secret is read from the Key Vault-backed scope. Previously this lookup was
# commented out and the config referenced an undefined name `Secret`.
storage_client_secret = dbutils.secrets.get(scope="azuredevkeyvault", key="devstoragesecret")

configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "99e27f80-c56f-4bd1-8ca1-8d9e2679c1c3",  # service principal (AAD app) client ID
    "fs.azure.account.oauth2.client.secret": storage_client_secret,
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/23939cd9-c188-457e-afa9-ce7fe2209e73/oauth2/token",
    "fs.azure.createRemoteFileSystemDuringInitialization": "true",
}

# Unmount only when something is already mounted there. Unconditionally listing or
# unmounting a path that is not mounted fails on a fresh cluster. This guard keeps the
# cell re-runnable.
if any(mount.mountPoint == MOUNT_POINT for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(MOUNT_POINT)

dbutils.fs.mount(
    source="abfss://airlines@interview1.dfs.core.windows.net/",
    mount_point=MOUNT_POINT,
    extra_configs=configs,
)

# Sanity check: list the freshly mounted container.
dbutils.fs.ls(MOUNT_POINT)

## Data Load

In [0]:
# Load the airports CSV (header row, column types inferred) from the ADLS mount and
# overwrite the Snowflake `airports` table with it.
df_airport = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("/mnt/interview/rawdata/airports.csv")
)

(
    df_airport.write.format("snowflake")
    .options(**options)
    .option("dbtable", "airports")
    .mode("overwrite")
    .save()
)

In [0]:
# Load the airlines CSV (header row, column types inferred) from the ADLS mount and
# overwrite the Snowflake `airlines` table with it.
df_airlines = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("/mnt/interview/rawdata/airlines.csv")
)

(
    df_airlines.write.format("snowflake")
    .options(**options)
    .option("dbtable", "airlines")
    .mode("overwrite")
    .save()
)

In [0]:
# Load the flights data from the ADLS mount and overwrite the Snowflake `flights` table.
# The path is a directory rather than a single file, so Spark reads every CSV under it.
df_flights = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("/mnt/interview/rawdata/flights")
)

(
    df_flights.write.format("snowflake")
    .options(**options)
    .option("dbtable", "flights")
    .mode("overwrite")
    .save()
)

## Northwoods Data Reports

In [0]:
# Report 1: total number of flights by airline and origin airport, per calendar month.
# - count(*) counts flights. The former count(distinct flight_number) counted distinct
#   flight numbers, collapsing every repeat of the same flight number into one.
# - The month is derived with date_from_parts instead of parsing a concatenated
#   'year-month-day' string.
# NOTE(review): grouping is by month *name* only, so data spanning several years would be
# merged per month. The inner joins also drop flights whose airline / origin code has no
# match in `airlines` / `airports`.
report1_sql = """
select monthname(date_from_parts(a.year, a.month, 1)) as month,
       c.airline,
       b.airport,
       count(*) as flight_cnt
from flights a
inner join airlines c on a.airline = c.IATA_CODE
inner join airports b on a.origin_airport = b.IATA_CODE
group by monthname(date_from_parts(a.year, a.month, 1)), c.airline, b.airport
"""

df_report1 = (
    spark.read.format("snowflake")
    .options(**options)
    .option("query", report1_sql)
    .load()
)

(
    df_report1.write.format("snowflake")
    .options(**options)
    .option("dbtable", "airlines_agg.report1")
    .mode("overwrite")
    .save()
)

In [0]:
# Report 2: on-time percentage of each airline for 2015. This is the share of that
# airline's 2015 flights with arrival_delay <= 0, truncated to 2 decimals.
# - Counts flights (count_if / count(*)). The former version counted *distinct flight
#   numbers* on both sides, so a flight number that was on time even once counted as on time.
# - Both counts come from one aggregation pass. Airlines with no on-time flights therefore
#   get 0.00 instead of being dropped by the old inner join of two sub-aggregates.
# NOTE(review): the denominator includes flights whose arrival_delay is NULL (presumably
# cancelled/diverted flights). Confirm that is the intended definition of "on time %".
report2_sql = """
select c.airline,
       trunc(100 * count_if(a.arrival_delay <= 0) / count(*), 2) as percentage
from flights a
inner join airlines c on a.airline = c.IATA_CODE
where a.year = 2015
group by c.airline
"""

df_report2 = (
    spark.read.format("snowflake")
    .options(**options)
    .option("query", report2_sql)
    .load()
)

(
    df_report2.write.format("snowflake")
    .options(**options)
    .option("dbtable", "airlines_agg.report2")
    .mode("overwrite")
    .save()
)


In [0]:
# Report 3: number of delayed flights per airline, largest first.
# "Delayed" is arrival_delay > 0, the exact complement of report 2's on-time rule
# (arrival_delay <= 0). The previous threshold (> 1) meant flights with
# 0 < arrival_delay <= 1 were counted as neither on time nor delayed.
# NOTE(review): ORDER BY only orders the query result. Row order is not guaranteed once
# it is persisted to the report3 table, so sort again when reading it back.
report3_sql = """
select c.airline, count(*) as cnt
from flights a
inner join airlines c on a.airline = c.IATA_CODE
where a.arrival_delay > 0
group by c.airline
order by cnt desc
"""

df_report3 = (
    spark.read.format("snowflake")
    .options(**options)
    .option("query", report3_sql)
    .load()
)

(
    df_report3.write.format("snowflake")
    .options(**options)
    .option("dbtable", "airlines_agg.report3")
    .mode("overwrite")
    .save()
)


In [0]:
# Report 4: number of flights per (origin airport name, cancellation reason).
# - `cancellation_reason <> 'NULL'` compares against the *string* 'NULL'. Rows whose
#   cancellation_reason is SQL NULL are excluded too, because the comparison evaluates to
#   NULL. NOTE(review): if only SQL NULLs occur, `is not null` would state the intent
#   directly. Confirm whether literal 'NULL' strings exist in the data.
# - The join to `airlines` (alias c) contributes no selected column. It only restricts
#   rows to airline codes present in `airlines`.
# - The count column is unaliased, so its name is whatever Snowflake generates for count(*).
report4_sql = (
    "select b.airport , a.cancellation_reason , count(*) from flights a inner join airlines c on a.airline =c.IATA_CODE "
    "inner join airports b on a.origin_airport = b.IATA_CODE "
    "where cancellation_reason <> 'NULL' "
    "group by b.airport , a.cancellation_reason"
)

df_report4 = (
    spark.read.format("snowflake")
    .options(**options)
    .option("query", report4_sql)
    .load()
)

(
    df_report4.write.format("snowflake")
    .options(**options)
    .option("dbtable", "airlines_agg.report4")
    .mode("overwrite")
    .save()
)

In [0]:
# Report 5: delay reasons by origin airport code.
# - The innermost query keeps flights where at least one of the five per-cause delay
#   columns is non-null.
# - UNPIVOT turns those five columns into rows: `delay` holds the source column name and
#   `airdelay` holds its value.
# - Rows with airdelay = 0 are dropped, and the rest are counted per (origin_airport, delay).
# NOTE(review): Snowflake's UNPIVOT skips NULL values by default, so the inner
# not-null pre-filter looks redundant. It is kept here unchanged.
report5_sql = (
    "select origin_airport ,delay, count(*) as count from  "
    "(select origin_airport, airdelay ,delay from (select * from  flights where "
    "air_system_delay is not null  or security_delay is not null or airline_delay is not null "
    "or late_aircraft_delay is not null or weather_delay is not null  ) "
    "unpivot(airdelay for delay in (air_system_delay, security_delay,airline_delay,late_aircraft_delay,weather_delay )) "
    "where airdelay <> 0) a "
    "group by origin_airport, delay"
)

df_report5 = (
    spark.read.format("snowflake")
    .options(**options)
    .option("query", report5_sql)
    .load()
)

(
    df_report5.write.format("snowflake")
    .options(**options)
    .option("dbtable", "airlines_agg.report5")
    .mode("overwrite")
    .save()
)

In [0]:
# Report 6: number of distinct routes per airline, largest first.
# A route is the string 'origin_airport-destination_airport'. Distinct
# (airline, route) pairs are taken first and then counted per airline.
# NOTE(review): the join to `airports` (alias b) contributes no selected column. It only
# drops routes whose origin code is missing from `airports`. The count column is
# unaliased, and ORDER BY is not preserved in the persisted table.
report6_sql = (
    "select airline, count(*) from "
    "(select  distinct c.airline , a.origin_airport || '-' || a.destination_airport as unique_route "
    "from flights a inner join airlines c on a.airline =c.IATA_CODE "
    "inner join airports b on a.origin_airport = b.IATA_CODE  ) "
    "group by airline "
    "order by 2 desc"
)

df_report6 = (
    spark.read.format("snowflake")
    .options(**options)
    .option("query", report6_sql)
    .load()
)

(
    df_report6.write.format("snowflake")
    .options(**options)
    .option("dbtable", "airlines_agg.report6")
    .mode("overwrite")
    .save()
)