**Airline Market Analysis**<br>



In [2]:
dbutils.fs.unmount("/mnt/flight_data")

In [3]:
import urllib

#CONNECTING WITH AWS IN ORDER TO ACCESS S3 FILES.
ACCESS_KEY = "XXXX"
SECRET_KEY = "XXXX"
ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
AWS_BUCKET_NAME = "airplane-data-bigdata"
MOUNT_NAME = "flight_data"

dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)
display(dbutils.fs.ls("/mnt/%s" % MOUNT_NAME))
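
Replacing only `/` leaves other reserved characters in the secret key unescaped (AWS secret keys can also contain characters such as `+`). A minimal sketch, assuming the standard-library `urllib.parse.quote` is acceptable here; `build_s3a_uri` is a hypothetical helper name, not part of `dbutils`, and the credentials are placeholders.

```python
from urllib.parse import quote


def build_s3a_uri(access_key: str, secret_key: str, bucket: str) -> str:
    """Return an s3a:// URI with the secret key percent-encoded.

    Hypothetical helper for illustration. safe="" makes quote() escape "/"
    too, which it would otherwise leave untouched.
    """
    encoded_secret = quote(secret_key, safe="")
    return "s3a://%s:%s@%s" % (access_key, encoded_secret, bucket)


# Placeholder credentials, for illustration only.
uri = build_s3a_uri("XXXX", "ab/cd+ef", "airplane-data-bigdata")
print(uri)
```

The resulting string could be passed as the first argument of `dbutils.fs.mount` in place of the hand-formatted URI.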

In [4]:
#LIST OF FILES 
display(dbutils.fs.ls("/mnt/flight_data/airplane_data"))

In [5]:
flight_merged_data = spark.read.format("csv").option("header", "true").load("/mnt/flight_data/airplane_data/airOT20*.csv")

In [6]:
flight_merged_data.count()

In [7]:
flight_merged_data.select('YEAR').distinct().show()

The count and distinct queries above are slow because each one scans the entire dataset, so the DataFrame needs to be optimized.

In [9]:
flight_merged_data.rdd.getNumPartitions()

The value above is the default number of partitions chosen by Spark; the partition count needs to be tuned for this DataFrame.
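
One common rule of thumb (an assumption here, not necessarily how the value of 63 used below was chosen) is to aim for partitions of roughly 128 MB and to round the count up to a multiple of the cluster's total cores, so that every core has work in each wave of tasks. A minimal sketch; `suggest_partitions`, the 128 MB target, and the example sizes are all illustrative.

```python
import math


def suggest_partitions(total_bytes: int, total_cores: int,
                       target_partition_bytes: int = 128 * 1024 * 1024) -> int:
    """Suggest a partition count from data size and core count (heuristic)."""
    if total_bytes <= 0 or total_cores <= 0:
        raise ValueError("total_bytes and total_cores must be positive")
    # Partitions needed to keep each one near the target size.
    by_size = math.ceil(total_bytes / target_partition_bytes)
    # Round up to a whole number of waves across all cores.
    waves = math.ceil(by_size / total_cores)
    return waves * total_cores


# Hypothetical example: 5 GiB of data on a cluster with 8 cores.
print(suggest_partitions(5 * 1024**3, total_cores=8))
```

The result is only a starting point; the right value depends on the cluster and on how skewed the data is.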

In [11]:
flight_merged_data.repartition(63).createOrReplaceTempView("flight_merged_view")

Cache the DataFrame in memory with the updated number of partitions for faster execution.

In [13]:
spark.catalog.cacheTable('flight_merged_view')

In [14]:
spark.table("flight_merged_view").count()

`spark.catalog.cacheTable` is lazy: the cache is populated only when the first action (here, the `count()` above) runs on the table.
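
The lazy behaviour can be pictured with a small pure-Python model. This is a toy analogy, not Spark's implementation; `LazyCachedTable` and its methods are hypothetical names. Requesting the cache does no work, the first action materializes the rows, and later actions reuse them.

```python
class LazyCachedTable:
    """Toy model of lazy caching: cache() only sets a flag."""

    def __init__(self, load_rows):
        self._load_rows = load_rows    # expensive function that reads the data
        self._cache_requested = False
        self._cached_rows = None
        self.loads = 0                 # how many times the data was actually read

    def cache(self):
        # Nothing is read here; the request is only recorded.
        self._cache_requested = True
        return self

    def _rows(self):
        if self._cached_rows is not None:
            return self._cached_rows
        self.loads += 1
        rows = list(self._load_rows())
        if self._cache_requested:
            self._cached_rows = rows
        return rows

    def count(self):
        # An action: forces the data to be read, or served from the cache.
        return len(self._rows())


table = LazyCachedTable(lambda: range(1000)).cache()
print(table.loads)   # caching was requested, but nothing has been read yet
table.count()
table.count()
print(table.loads)   # the first count filled the cache, the second reused it
```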

In [16]:
flight_merged_DF = spark.table("flight_merged_view")

Now that the final DataFrame with the tuned number of partitions is ready, we save it as a Parquet file so that we can continue from the same data even if the cluster terminates.

In [18]:
flight_merged_DF.write.format('parquet').save('/tmp/flight_merged_parquet/')

In [19]:
display(dbutils.fs.ls("/tmp/flight_merged_parquet"))

Setting the default number of shuffle partitions.

In [21]:
spark.conf.get("spark.sql.shuffle.partitions")

In [22]:
spark.conf.set("spark.sql.shuffle.partitions",63)

Having created the merged, optimized DataFrame, we saved it as a Parquet file with the updated number of partitions. Parquet is a columnar file format, supported by many data processing systems, that speeds up queries and is far more efficient than CSV or JSON.
<br>
If the cluster terminates later, we can reload the Parquet file with the updated number of partitions.

# Exploratory Data Analysis

Count of flights for each destination state in the United States, Jan 2000 - Feb 2020.
- California and Texas have the most incoming flights.

In [26]:
query1 = spark.sql("SELECT COUNT(*) AS FLIGHTS, DEST_STATE_ABR FROM flight_merged_view GROUP BY DEST_STATE_ABR ORDER BY FLIGHTS DESC LIMIT 20")
display(query1)

FLIGHTS,DEST_STATE_ABR
15425336,CA
14449778,TX
8989110,FL
8486472,IL
7884760,GA
5902895,NY
4689134,CO
4140333,NC
4111950,VA
4082632,AZ


Comparing the possible causes of flight delays.

In [28]:
query3 = spark.sql("SELECT 'CARRIER_DELAY' AS FACTOR, (SELECT SUM(CARRIER_DELAY) FROM flight_merged_view) AS FLIGHT_DELAYS UNION SELECT 'WEATHER_DELAY' AS FACTOR, (SELECT SUM(WEATHER_DELAY) FROM flight_merged_view) AS FLIGHT_DELAYS UNION SELECT 'NAS_DELAY' AS FACTOR, (SELECT SUM(NAS_DELAY) FROM flight_merged_view) AS FLIGHT_DELAYS UNION SELECT 'SECURITY_DELAY' AS FACTOR, (SELECT SUM(SECURITY_DELAY) FROM flight_merged_view) AS FLIGHT_DELAYS UNION SELECT 'LATE_AIRCRAFT_DELAY' AS FACTOR, (SELECT SUM(LATE_AIRCRAFT_DELAY) FROM flight_merged_view) AS FLIGHT_DELAYS ORDER BY FLIGHT_DELAYS DESC")
display(query3)

FACTOR,FLIGHT_DELAYS
LATE_AIRCRAFT_DELAY,465640739.0
CARRIER_DELAY,358894745.0
NAS_DELAY,327300129.0
WEATHER_DELAY,62387678.0
SECURITY_DELAY,1942565.0
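
The raw totals are easier to compare as shares of the combined delay. The sketch below reuses the numbers from the query output above; `delay_shares` is a hypothetical helper. Late-aircraft, carrier, and NAS delays together make up roughly 95% of the total.

```python
# Totals copied from the query output above.
delay_totals = {
    "LATE_AIRCRAFT_DELAY": 465640739.0,
    "CARRIER_DELAY": 358894745.0,
    "NAS_DELAY": 327300129.0,
    "WEATHER_DELAY": 62387678.0,
    "SECURITY_DELAY": 1942565.0,
}


def delay_shares(totals):
    """Return (factor, percent_of_total) pairs, largest first."""
    grand_total = sum(totals.values())
    ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
    return [(factor, 100.0 * value / grand_total) for factor, value in ranked]


for factor, share in delay_shares(delay_totals):
    print(f"{factor:<20} {share:5.1f}%")
```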


**Analysing the number of cancelled flights by day of the week.**<br>

- Saturday has fewer cancelled flights than any other day of the week.

In [30]:
def intToDay(dayOfWeek):
  # The BTS on-time data encodes DAY_OF_WEEK as 1 = Monday ... 7 = Sunday.
  switcher = {
    1: 'MON',
    2: 'TUE',
    3: 'WED',
    4: 'THU',
    5: 'FRI',
    6: 'SAT',
    7: 'SUN'
  }
  return switcher.get(int(dayOfWeek), 'Invalid DAY_OF_WEEK')

spark.udf.register("intToDay", intToDay)
query4 = spark.sql("SELECT intToDay(DAY_OF_WEEK) AS WEEKDAY, DAY_OF_WEEK, SUM(CANCELLED) AS FLIGHTS_CANCELLED FROM flight_merged_view GROUP BY DAY_OF_WEEK ORDER BY FLIGHTS_CANCELLED DESC")
display(query4)

WEEKDAY,DAY_OF_WEEK,FLIGHTS_CANCELLED
TUE,2,378592.0
WED,3,376648.0
THU,4,363834.0
MON,1,351376.0
FRI,5,339440.0
SUN,7,288615.0
SAT,6,236335.0


**Total number of flights in each month (Jan 2000 - Feb 2020)**<br>

- June and July have the most flights, probably because of the summer holidays.

In [32]:
def intToMonth(month):
  switcher = {
    1: 'Jan',
    2: 'Feb',
    3: 'Mar',
    4: 'Apr',
    5: 'May',
    6: 'Jun',
    7: 'Jul',
    8: 'Aug',
    9: 'Sep',
    10: 'Oct',
    11: 'Nov',
    12: 'Dec'
  }
  return switcher.get(int(month), 'Invalid MONTH')

spark.udf.register("intToMonth", intToMonth)
query5 = spark.sql("SELECT COUNT(*) AS FLIGHTS, intToMonth(MONTH) AS MONTH_NAME, MONTH FROM flight_merged_view GROUP BY MONTH ORDER BY FLIGHTS DESC")
display(query5)

FLIGHTS,MONTH_NAME,MONTH
11937664,Jun,6
11376394,Jul,7
11281939,Sep,9
11135746,Nov,11
11119794,Jan,1
11053120,Mar,3
10952679,May,5
10617396,Apr,4
10311516,Aug,8
9784417,Oct,10
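
A Python UDF sends every row through the Python interpreter, which tends to be slower than a built-in SQL expression. One alternative, sketched below under the assumption that `MONTH` only holds the values 1 to 12, is to generate a `CASE` expression with the standard `calendar` module and splice it into the query text. `month_case_expr` and `query5_sql` are hypothetical names.

```python
import calendar


def month_case_expr(column: str = "MONTH") -> str:
    """Build a SQL CASE expression mapping 1..12 to 'Jan'..'Dec'.

    calendar.month_abbr follows the process locale; the names below
    assume the default (English) locale.
    """
    branches = " ".join(
        f"WHEN {number} THEN '{calendar.month_abbr[number]}'"
        for number in range(1, 13)
    )
    return f"CASE CAST({column} AS INT) {branches} ELSE 'Invalid MONTH' END"


query5_sql = (
    f"SELECT COUNT(*) AS FLIGHTS, {month_case_expr()} AS MONTH_NAME, MONTH "
    "FROM flight_merged_view GROUP BY MONTH ORDER BY FLIGHTS DESC"
)
print(query5_sql)
```

In the notebook, the string could then be run with `display(spark.sql(query5_sql))`, with no UDF registration needed.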


## Impact of the Global Recession on the Airline Industry

<br>
When housing prices fell and homeowners began to walk away from their mortgages, the value of mortgage-backed securities held by investment banks declined in 2007–2008, causing several of them to collapse and hitting the United States economy in 2008 and the years that followed.

Source: Wikipedia

**State-wise frequency of flights.**
- Consistent fall in the number of flights in the big US states:<br>
1. California<br>
2. Texas<br>
3. Illinois<br>
4. Florida<br>
5. New York<br>

In [35]:
#STATEWISE DISTRIBUTION FOR THE IMPACTED YEARS
display(spark.sql('select DEST_STATE_ABR,count(DEST_STATE_ABR) as Frequency_Flights,year from flight_merged_view where CAST(YEAR AS INT) IN (2007,2008,2009) group by YEAR,DEST_STATE_ABR ORDER BY count(DEST_STATE_ABR)'))

**Month-wise analysis of the global recession period.**<br>
- November 2008 and January 2009 saw a major fall in the number of flights.

In [37]:
display(spark.sql('select CAST(MONTH AS INT),COUNT(CAST(MONTH AS INT)) as NUMBER_OF_FLIGHTS,CAST(YEAR AS INT) from flight_merged_view WHERE CAST(YEAR AS INT) IN (2008,2009) GROUP BY CAST(MONTH AS INT),CAST(YEAR AS INT) ORDER BY CAST(YEAR AS INT),CAST(MONTH AS INT)'))
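
A fall is easier to judge when each month is compared with the same month a year earlier, since that removes the seasonal pattern. A minimal sketch of that comparison; `year_over_year_change` is a hypothetical helper, and the counts are placeholders, not results from the dataset.

```python
def year_over_year_change(previous: dict, current: dict) -> dict:
    """Percent change per month between two {month: flights} mappings."""
    changes = {}
    for month, before in previous.items():
        # Skip months missing from either year and avoid dividing by zero.
        if month in current and before:
            changes[month] = 100.0 * (current[month] - before) / before
    return changes


# Placeholder counts for illustration only, NOT results from the dataset.
flights_2008 = {1: 1000, 2: 900}
flights_2009 = {1: 900, 2: 855}
print(year_over_year_change(flights_2008, flights_2009))
```

The two input mappings could be built from the rows returned by the query above, one per year.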

**Identifying trends in the frequency of flights for the airline carriers from 2007 to 2009.**

In [39]:
display(spark.sql('select UNIQUE_CARRIER,COUNT(UNIQUE_CARRIER) as FREQUENCY,YEAR from flight_merged_view WHERE CAST(YEAR AS INT) IN (2007,2008,2009) group by UNIQUE_CARRIER,YEAR ORDER BY YEAR,COUNT(UNIQUE_CARRIER) DESC'))

# Fraud Data Analysis over Airline Booking Transactions.

In [41]:
Fraud_df = spark.read.format("csv").option("header", "true").load("/mnt/flight_data/airplane_data/Fraud_Cases.csv")

In [42]:
display(Fraud_df)

In [43]:
Fraud_df.count()

In [44]:
Fraud_df.createOrReplaceTempView('fraud_data')

**Comparison of fraudulent risk in male (i.e., 1) and female (i.e., 2)**

In [46]:
#--Comparison of fraudulent risk in male (i.e., 1) and female (i.e., 2)
display(spark.sql('select gender, count(*) as count from fraud_data f where fraudRisk=1 group by gender'))

**Comparison of cardholders that are fraudulent cases**

In [48]:
#Comparison of cardholders that are fraudulent cases
display(spark.sql('select count(cardholder) as card, cardholder from fraud_data where fraudRisk=1 group by cardholder'))

**Finding states which are more prone to fraudulent activities**

In [50]:
#Finding states which are more prone to fraudulent activities
display(spark.sql('select state, count(state) as state_count from fraud_data where fraudRisk = 1 group by state order by state_count desc'))
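
Raw counts favour states with many transactions, so "more prone" may be better expressed as the fraction of a state's transactions that are flagged. A minimal sketch of that normalization, assuming `fraudRisk` is 0 or 1; `fraud_rate_by_state` is a hypothetical helper and the rows are placeholders, not values from `Fraud_Cases.csv`.

```python
from collections import defaultdict


def fraud_rate_by_state(rows):
    """rows: iterable of (state, fraud_risk) pairs, fraud_risk being 0 or 1.

    Returns (state, fraud_rate) pairs sorted by rate, highest first.
    """
    totals = defaultdict(int)
    frauds = defaultdict(int)
    for state, fraud_risk in rows:
        totals[state] += 1
        frauds[state] += int(fraud_risk)   # int() also accepts "0"/"1" strings
    rates = {state: frauds[state] / totals[state] for state in totals}
    return sorted(rates.items(), key=lambda kv: kv[1], reverse=True)


# Placeholder rows for illustration only, NOT taken from the fraud dataset.
sample = [("CA", 1), ("CA", 0), ("CA", 0), ("CA", 0), ("TX", 1), ("TX", 0)]
print(fraud_rate_by_state(sample))
```

In this made-up sample both states have one flagged transaction, yet the rate ranks TX above CA because TX has fewer transactions overall.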

***********END*********