## Installing PySpark

In [2]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


## PySpark Setup with db and basic operations on CSV

In [8]:
from pyspark.sql import SparkSession

In [9]:
import sqlite3

# Reconnect to the SQLite database
conn = sqlite3.connect('Airlines_data.sqlite') #It contains 8 tables

conn.close()

In [10]:
import os

# Initialize Spark session
from pyspark.sql import SparkSession

spark= (SparkSession.builder
    .master("local")
    .appName("AirlineDataAnalysis")
    .config(
        "spark.jars",
        "{}/sqlite-jdbc-3.34.0.jar".format(os.getcwd()))
    .config(
        "spark.driver.extraClassPath",
        "{}/sqlite-jdbc-3.34.0.jar".format(os.getcwd()))
    .getOrCreate())

#### operation on aircrafts_data 

In [64]:
# Load data into PySpark DataFrames

aircrafts_data = spark.read.csv('AIR_CSV/aircrafts_data.csv', header=True, inferSchema=True)

In [6]:
# Show schema and data
aircrafts_data.printSchema()

root
 |-- aircraft_code: string (nullable = true)
 |-- model: string (nullable = true)
 |-- range: string (nullable = true)



In [7]:
#display the contents of a DataFrame in a tabular format

aircrafts_data.show(5)

+-------------+--------------------+--------------------+
|aircraft_code|               model|               range|
+-------------+--------------------+--------------------+
|          773|"{""en"": ""Boein...| ""ru"": ""Боинг ...|
|          763|"{""en"": ""Boein...| ""ru"": ""Боинг ...|
|          SU9|"{""en"": ""Sukho...| ""ru"": ""Сухой ...|
|          320|"{""en"": ""Airbu...| ""ru"": ""Аэробу...|
|          321|"{""en"": ""Airbu...| ""ru"": ""Аэробу...|
+-------------+--------------------+--------------------+
only showing top 5 rows



In [8]:
# Select distinct values for key columns
aircrafts_data.select('aircraft_code').distinct().show()

+-------------+
|aircraft_code|
+-------------+
|          SU9|
|          320|
|          733|
|          773|
|          319|
|          CN1|
|          763|
|          321|
|          CR2|
+-------------+



#### operation on airports_data

In [23]:
# Load data into PySpark DataFrames

airports_data = spark.read.csv('AIR_CSV/airports_data.csv', header=True, inferSchema=True)

In [11]:
# Show schema and data
airports_data.printSchema()

root
 |-- airport_code: string (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- timezone: string (nullable = true)



In [12]:
#display the contents of a DataFrame in a tabular format

airports_data.show(5)

+------------+--------------------+--------------------+--------------------+--------------------+
|airport_code|        airport_name|                city|         coordinates|            timezone|
+------------+--------------------+--------------------+--------------------+--------------------+
|         YKS|"{""en"": ""Yakut...| ""ru"": ""Якутск...|"{""en"": ""Yakut...| ""ru"": ""Якутск...|
|         MJZ|"{""en"": ""Mirny...| ""ru"": ""Мирный...|"{""en"": ""Mirnyj""| ""ru"": ""Мирный...|
|         KHV|"{""en"": ""Khaba...| ""ru"": ""Хабаро...|"{""en"": ""Khaba...| ""ru"": ""Хабаро...|
|         PKC|"{""en"": ""Yeliz...| ""ru"": ""Елизов...|"{""en"": ""Petro...| ""ru"": ""Петроп...|
|         UUS|"{""en"": ""Yuzhn...| ""ru"": ""Хомуто...|"{""en"": ""Yuzhn...| ""ru"": ""Южно-С...|
+------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [13]:
# Select distinct values for key columns
airports_data.select('airport_code').distinct().show()

+------------+
|airport_code|
+------------+
|         SCW|
|         NBC|
|         ULV|
|         NYM|
|         KHV|
|         UUA|
|         KZN|
|         CEK|
|         KJA|
|         OSW|
|         ABA|
|         ASF|
|         UFA|
|         NYA|
|         TBW|
|         ESL|
|         RGK|
|         REN|
|         NFG|
|         VOG|
+------------+
only showing top 20 rows



#### operation on boarding_passes.csv

In [17]:
# Load data into PySpark DataFrames

boarding_passes = spark.read.csv('AIR_CSV/boarding_passes.csv', header=True, inferSchema=True)

In [18]:
# Show schema and data
boarding_passes.printSchema()

root
 |-- ticket_no: long (nullable = true)
 |-- flight_id: integer (nullable = true)
 |-- boarding_no: integer (nullable = true)
 |-- seat_no: string (nullable = true)



In [25]:
#display the contents of a DataFrame in a tabular format

boarding_passes.show()

+----------+---------+-----------+-------+
| ticket_no|flight_id|boarding_no|seat_no|
+----------+---------+-----------+-------+
|5435212351|    30625|          1|     2D|
|5435212386|    30625|          2|     3G|
|5435212381|    30625|          3|     4H|
|5432211370|    30625|          4|     5D|
|5435212357|    30625|          5|    11A|
|5435212360|    30625|          6|    11E|
|5435212393|    30625|          7|    11H|
|5435212374|    30625|          8|    12E|
|5435212365|    30625|          9|    13D|
|5435212378|    30625|         10|    14H|
|5435212362|    30625|         11|    15E|
|5435212334|    30625|         12|    15F|
|5435212370|    30625|         13|    15K|
|5435212329|    30625|         14|    15H|
|5435725513|    30625|         15|    16D|
|5435212328|    30625|         16|    16C|
|5435630915|    30625|         17|    16E|
|5435212388|    30625|         18|    17E|
|5432159775|    30625|         19|    17D|
|5435212382|    30625|         20|    17H|
+----------

In [20]:
# Select distinct values for key columns

boarding_passes.select('ticket_no').distinct().show()

+----------+
| ticket_no|
+----------+
|5435212382|
|5435212330|
|5433367230|
|5434406691|
|5433524850|
|5434586171|
|5433524848|
|5435824169|
|5435476635|
|5434022532|
|5432525710|
|5432525640|
|5434343604|
|5435423639|
|5435884414|
|5434149521|
|5434755718|
|5434600341|
|5435326497|
|5435681423|
+----------+
only showing top 20 rows



#### operation on bookings.csv

In [17]:
# Load data into PySpark DataFrames

bookings = spark.read.csv('AIR_CSV/bookings.csv', header=True, inferSchema=True)

In [23]:
# Show schema and data
bookings.printSchema()

root
 |-- book_ref: string (nullable = true)
 |-- book_date: timestamp (nullable = true)
 |-- total_amount: integer (nullable = true)



In [26]:
#display the contents of a DataFrame in a tabular format
boarding_passes.show()

+----------+---------+-----------+-------+
| ticket_no|flight_id|boarding_no|seat_no|
+----------+---------+-----------+-------+
|5435212351|    30625|          1|     2D|
|5435212386|    30625|          2|     3G|
|5435212381|    30625|          3|     4H|
|5432211370|    30625|          4|     5D|
|5435212357|    30625|          5|    11A|
|5435212360|    30625|          6|    11E|
|5435212393|    30625|          7|    11H|
|5435212374|    30625|          8|    12E|
|5435212365|    30625|          9|    13D|
|5435212378|    30625|         10|    14H|
|5435212362|    30625|         11|    15E|
|5435212334|    30625|         12|    15F|
|5435212370|    30625|         13|    15K|
|5435212329|    30625|         14|    15H|
|5435725513|    30625|         15|    16D|
|5435212328|    30625|         16|    16C|
|5435630915|    30625|         17|    16E|
|5435212388|    30625|         18|    17E|
|5432159775|    30625|         19|    17D|
|5435212382|    30625|         20|    17H|
+----------

In [22]:
# Select distinct values for key columns

bookings.select('book_ref').distinct().show()

+--------+
|book_ref|
+--------+
|  00054E|
|  004EDA|
|  00597A|
|  009C80|
|  00A164|
|  00DD00|
|  01342C|
|  01524D|
|  01537F|
|  016310|
|  016602|
|  01BE2B|
|  020766|
|  021518|
|  023288|
|  023814|
|  024548|
|  02BBB5|
|  02CD0E|
|  02E019|
+--------+
only showing top 20 rows



#### operation on flights.csv

In [11]:
# Load data into PySpark DataFrames

flights = spark.read.csv('AIR_CSV/flights.csv', header=True, inferSchema=True)

In [14]:
# Show schema and data
flights.printSchema()

root
 |-- flight_id: integer (nullable = true)
 |-- flight_no: string (nullable = true)
 |-- scheduled_departure: timestamp (nullable = true)
 |-- scheduled_arrival: timestamp (nullable = true)
 |-- departure_airport: string (nullable = true)
 |-- arrival_airport: string (nullable = true)
 |-- status: string (nullable = true)
 |-- aircraft_code: string (nullable = true)
 |-- actual_departure: string (nullable = true)
 |-- actual_arrival: string (nullable = true)



In [29]:
#display the contents of a DataFrame in a tabular format
flights.show()

+---------+---------+-------------------+-------------------+-----------------+---------------+---------+-------------+----------------+--------------+
|flight_id|flight_no|scheduled_departure|  scheduled_arrival|departure_airport|arrival_airport|   status|aircraft_code|actual_departure|actual_arrival|
+---------+---------+-------------------+-------------------+-----------------+---------------+---------+-------------+----------------+--------------+
|     1185|   PG0134|2017-09-10 12:20:00|2017-09-10 17:25:00|              DME|            BTK|Scheduled|          319|              \N|            \N|
|     3979|   PG0052|2017-08-25 17:20:00|2017-08-25 20:05:00|              VKO|            HMA|Scheduled|          CR2|              \N|            \N|
|     4739|   PG0561|2017-09-05 15:00:00|2017-09-05 16:45:00|              VKO|            AER|Scheduled|          763|              \N|            \N|
|     5502|   PG0529|2017-09-12 12:20:00|2017-09-12 13:50:00|              SVO|         

In [32]:
# Select distinct values for key columns

flights.select('flight_id').distinct().show()

+---------+
|flight_id|
+---------+
|      148|
|      463|
|      471|
|      496|
|      833|
|     1088|
|     1238|
|     1342|
|     1580|
|     1591|
|     1645|
|     1829|
|     1959|
|     2122|
|     2142|
|     2366|
|     2659|
|     2866|
|     3175|
|     3749|
+---------+
only showing top 20 rows



#### operation on seats.csv

In [33]:
# Load data into PySpark DataFrames

seats = spark.read.csv('AIR_CSV/seats.csv', header=True, inferSchema=True)

In [34]:
# Show schema and data
seats.printSchema()

root
 |-- aircraft_code: string (nullable = true)
 |-- seat_no: string (nullable = true)
 |-- fare_conditions: string (nullable = true)



In [35]:
#display the contents of a DataFrame in a tabular format
seats.show()

+-------------+-------+---------------+
|aircraft_code|seat_no|fare_conditions|
+-------------+-------+---------------+
|          319|     2A|       Business|
|          319|     2C|       Business|
|          319|     2D|       Business|
|          319|     2F|       Business|
|          319|     3A|       Business|
|          319|     3C|       Business|
|          319|     3D|       Business|
|          319|     3F|       Business|
|          319|     4A|       Business|
|          319|     4C|       Business|
|          319|     4D|       Business|
|          319|     4F|       Business|
|          319|     5A|       Business|
|          319|     5C|       Business|
|          319|     5D|       Business|
|          319|     5F|       Business|
|          319|     6A|        Economy|
|          319|     6B|        Economy|
|          319|     6C|        Economy|
|          319|     6D|        Economy|
+-------------+-------+---------------+
only showing top 20 rows



In [36]:
# Select distinct values for key columns

seats.select('seat_no').distinct().show()


+-------+
|seat_no|
+-------+
|    20E|
|    13C|
|    23G|
|    17A|
|     3K|
|    21K|
|    43C|
|    21E|
|    13G|
|    14D|
|    16C|
|    31F|
|    14G|
|    16H|
|    17H|
|    33C|
|     1A|
|    12K|
|    26B|
|    43D|
+-------+
only showing top 20 rows



#### operation on ticket_flights.csv

In [66]:
# Load data into PySpark DataFrames

ticket_flights = spark.read.csv('AIR_CSV/ticket_flights.csv', header=True, inferSchema=True)

In [40]:
# Show schema and data
ticket_flights.printSchema()

root
 |-- ticket_no: long (nullable = true)
 |-- flight_id: integer (nullable = true)
 |-- fare_conditions: string (nullable = true)
 |-- amount: integer (nullable = true)



In [42]:
#display the contents of a DataFrame in a tabular format
ticket_flights.show()

+----------+---------+---------------+------+
| ticket_no|flight_id|fare_conditions|amount|
+----------+---------+---------------+------+
|5432159776|    30625|       Business| 42100|
|5435212351|    30625|       Business| 42100|
|5435212386|    30625|       Business| 42100|
|5435212381|    30625|       Business| 42100|
|5432211370|    30625|       Business| 42100|
|5435212357|    30625|        Comfort| 23900|
|5435212360|    30625|        Comfort| 23900|
|5435212393|    30625|        Comfort| 23900|
|5435212374|    30625|        Comfort| 23900|
|5435212365|    30625|        Comfort| 23900|
|5435212378|    30625|        Comfort| 23900|
|5435212362|    30625|        Comfort| 23900|
|5435212334|    30625|        Comfort| 23900|
|5435212329|    30625|        Comfort| 23900|
|5435212370|    30625|        Comfort| 23900|
|5435212328|    30625|        Comfort| 23900|
|5435725513|    30625|        Comfort| 23900|
|5435630915|    30625|        Comfort| 23900|
|5435212388|    30625|        Econ

In [39]:
# Select distinct values for key columns

ticket_flights.select('ticket_no').distinct().show()

+----------+
| ticket_no|
+----------+
|5435212382|
|5435212330|
|5433367230|
|5434406691|
|5433524850|
|5434586171|
|5433524848|
|5435824169|
|5435476635|
|5434022532|
|5432525710|
|5432525640|
|5434343604|
|5435423639|
|5435884414|
|5434149521|
|5434755718|
|5434600341|
|5435326497|
|5435681423|
+----------+
only showing top 20 rows



#### operation on tickets.csv

In [43]:
# Load data into PySpark DataFrames

tickets = spark.read.csv('AIR_CSV/tickets.csv', header=True, inferSchema=True)

In [45]:
# Show schema and data
tickets.printSchema()

root
 |-- ticket_no: long (nullable = true)
 |-- book_ref: string (nullable = true)
 |-- passenger_id: string (nullable = true)



In [46]:
#display the contents of a DataFrame in a tabular format
tickets.show()

+----------+--------+------------+
| ticket_no|book_ref|passenger_id|
+----------+--------+------------+
|5432000987|  06B046| 8149 604011|
|5432000988|  06B046| 8499 420203|
|5432000989|  E170C3| 1011 752484|
|5432000990|  E170C3| 4849 400049|
|5432000991|  F313DD| 6615 976589|
|5432000992|  F313DD| 2021 652719|
|5432000993|  F313DD| 0817 363231|
|5432000994|  CCC5CB| 2883 989356|
|5432000995|  CCC5CB| 3097 995546|
|5432000996|  1FB1E4| 6866 920231|
|5432000997|  DE3EA6| 6030 369450|
|5432000998|  4B75D1| 8675 588663|
|5432000999|  9E60AA| 0764 728785|
|5432001000|  69DAD1| 8954 972101|
|5432001001|  69DAD1| 6772 748756|
|5432001002|  69DAD1| 7364 216524|
|5432001003|  08A2A5| 3635 182357|
|5432001004|  08A2A5| 8252 507584|
|5432001005|  C2CAB7| 1026 982766|
|5432001006|  C6DA66| 7107 950192|
+----------+--------+------------+
only showing top 20 rows



In [47]:
# Select distinct values for key columns

tickets.select('ticket_no').distinct().show()

+----------+
| ticket_no|
+----------+
|5432001017|
|5432003796|
|5432005211|
|5432006769|
|5432006840|
|5432019789|
|5432019892|
|5432020100|
|5432020176|
|5432020194|
|5432020454|
|5432020457|
|5432034555|
|5432035038|
|5432035620|
|5432042498|
|5432042933|
|5432042941|
|5432042958|
|5432044490|
+----------+
only showing top 20 rows



### Join relevant tables

In [48]:
joined_df = bookings.join(tickets, 'book_ref') \
    .join(ticket_flights, 'ticket_no') \
    .join(flights, 'flight_id')
joined_df.select('book_ref', 'book_date', 'total_amount', 'passenger_id', 'flight_id', 'flight_no', 'scheduled_departure').show()

+--------+-------------------+------------+------------+---------+---------+-------------------+
|book_ref|          book_date|total_amount|passenger_id|flight_id|flight_no|scheduled_departure|
+--------+-------------------+------------+------------+---------+---------+-------------------+
|  06B046|2017-07-05 22:49:00|       12400| 8149 604011|    28935|   PG0242|2017-07-16 14:35:00|
|  06B046|2017-07-05 22:49:00|       12400| 8499 420203|    28935|   PG0242|2017-07-16 14:35:00|
|  E170C3|2017-06-29 04:25:00|       24700| 1011 752484|    28939|   PG0242|2017-07-17 14:35:00|
|  E170C3|2017-06-29 04:25:00|       24700| 4849 400049|    28939|   PG0242|2017-07-17 14:35:00|
|  F313DD|2017-07-03 07:07:00|       30900| 6615 976589|    28913|   PG0242|2017-07-18 14:35:00|
|  F313DD|2017-07-03 07:07:00|       30900| 2021 652719|    28913|   PG0242|2017-07-18 14:35:00|
|  F313DD|2017-07-03 07:07:00|       30900| 0817 363231|    28913|   PG0242|2017-07-18 14:35:00|
|  CCC5CB|2017-07-07 05:33:00|

### Summary statistics

In [49]:
flights.count()

33121

In [50]:
bookings.agg({'total_amount': 'avg'}).show()

+-----------------+
|avg(total_amount)|
+-----------------+
|79025.60581152869|
+-----------------+



In [55]:
aircrafts_data.agg({'range': 'min'}).show()

+--------------------+
|          min(range)|
+--------------------+
| ""ru"": ""Аэробу...|
+--------------------+



### 15 PYSPARK SOLUTION

In [15]:
# 1. Retrieve the total number of flights in the dataset.

flights.count()

33121

In [18]:
#2. Find the average booking amount.

bookings.agg({'total_amount': 'avg'}).show()


+-----------------+
|avg(total_amount)|
+-----------------+
|79025.60581152869|
+-----------------+



In [21]:
#3. List distinct aircraft codes.

aircrafts_data.select('aircraft_code').distinct().show()

+-------------+
|aircraft_code|
+-------------+
|          SU9|
|          320|
|          733|
|          773|
|          319|
|          CN1|
|          763|
|          321|
|          CR2|
+-------------+



In [24]:
#4. Identify distinct airport codes.

airports_data.select('airport_code').distinct().show()

+------------+
|airport_code|
+------------+
|         SCW|
|         NBC|
|         ULV|
|         NYM|
|         KHV|
|         UUA|
|         KZN|
|         CEK|
|         KJA|
|         OSW|
|         ABA|
|         ASF|
|         UFA|
|         NYA|
|         TBW|
|         ESL|
|         RGK|
|         REN|
|         NFG|
|         VOG|
+------------+
only showing top 20 rows



In [30]:
# 5. Count the number of flights per status.

flights.groupBy('status').count().show()


+---------+-----+
|   status|count|
+---------+-----+
|  Arrived|16707|
|  On Time|  518|
| Departed|   58|
|Cancelled|  414|
|Scheduled|15383|
|  Delayed|   41|
+---------+-----+



In [31]:
# 6. Calculate the total booking amount per day.

bookings.groupBy(bookings.book_date.cast('date')).agg({'total_amount': 'sum'}).show()


+-----------------------+-----------------+
|CAST(book_date AS DATE)|sum(total_amount)|
+-----------------------+-----------------+
|             2017-08-11|        499785400|
|             2017-06-29|        145775400|
|             2017-07-31|        448733300|
|             2017-08-14|        526163900|
|             2017-08-10|        480165900|
|             2017-06-30|        200531600|
|             2017-07-06|        433437900|
|             2017-06-28|         87814000|
|             2017-08-08|        471848900|
|             2017-07-03|        380211300|
|             2017-07-30|        426864800|
|             2017-08-04|        450783300|
|             2017-07-10|        447481900|
|             2017-06-25|         14024000|
|             2017-07-15|        443461600|
|             2017-08-07|        448206600|
|             2017-07-26|        435854700|
|             2017-08-06|        453079200|
|             2017-07-11|        437480000|
|             2017-07-12|       

In [33]:
# 7. Retrieve flights with a specific aircraft code.|

flights.filter(flights.aircraft_code == 'CR2').show()

+---------+---------+-------------------+-------------------+-----------------+---------------+---------+-------------+--------------------+--------------------+
|flight_id|flight_no|scheduled_departure|  scheduled_arrival|departure_airport|arrival_airport|   status|aircraft_code|    actual_departure|      actual_arrival|
+---------+---------+-------------------+-------------------+-----------------+---------------+---------+-------------+--------------------+--------------------+
|     3979|   PG0052|2017-08-25 17:20:00|2017-08-25 20:05:00|              VKO|            HMA|Scheduled|          CR2|                  \N|                  \N|
|     7784|   PG0667|2017-09-10 17:30:00|2017-09-10 20:00:00|              SVO|            KRO|Scheduled|          CR2|                  \N|                  \N|
|     9478|   PG0360|2017-08-28 11:30:00|2017-08-28 14:05:00|              LED|            REN|Scheduled|          CR2|                  \N|                  \N|
|    12012|   PG0621|2017-08

In [34]:
# 8. List the top 5 busiest airports based on departures.

flights.groupBy('departure_airport').count().orderBy('count', ascending=False).limit(5).show()

+-----------------+-----+
|departure_airport|count|
+-----------------+-----+
|              DME| 3217|
|              SVO| 2981|
|              LED| 1900|
|              VKO| 1719|
|              OVB| 1055|
+-----------------+-----+



In [35]:
# 9. Find flights that departed on time.

flights.filter(flights.status == 'On Time').show()

+---------+---------+-------------------+-------------------+-----------------+---------------+-------+-------------+----------------+--------------+
|flight_id|flight_no|scheduled_departure|  scheduled_arrival|departure_airport|arrival_airport| status|aircraft_code|actual_departure|actual_arrival|
+---------+---------+-------------------+-------------------+-----------------+---------------+-------+-------------+----------------+--------------+
|        5|   PG0405|2017-08-16 12:05:00|2017-08-16 13:00:00|              DME|            LED|On Time|          321|              \N|            \N|
|        8|   PG0402|2017-08-16 14:55:00|2017-08-16 15:50:00|              DME|            LED|On Time|          321|              \N|            \N|
|       47|   PG0404|2017-08-15 21:35:00|2017-08-15 22:30:00|              DME|            LED|On Time|          321|              \N|            \N|
|      304|   PG0222|2017-08-16 13:35:00|2017-08-16 17:00:00|              DME|            OVB|On Ti

In [3]:
from pyspark.sql.functions import unix_timestamp, expr, avg

In [14]:
# 10. Calculate the average delay time per airline.
flights.withColumn('delay', (unix_timestamp('actual_arrival') - unix_timestamp('scheduled_arrival')) / 60).groupBy('airline').avg('delay').show()


In [51]:
# 11. Identify the day with the highest number of cancellations.

flights.filter(flights.status == 'Cancelled').groupBy(flights.scheduled_departure.cast('date')).count().orderBy('count', ascending=False).limit(1).show()


+---------------------------------+-----+
|CAST(scheduled_departure AS DATE)|count|
+---------------------------------+-----+
|                       2017-09-14|  287|
+---------------------------------+-----+



In [59]:
# 12. Calculate the percentage of on-time arrivals per airport.
flights.groupBy('arrival_airport').agg(expr("round(sum(case when status = 'On Time' then 1 else 0 end) / count(*) * 100, 2)").alias('on_time_percentage')).show()


+---------------+------------------+
|arrival_airport|on_time_percentage|
+---------------+------------------+
|            SCW|              1.58|
|            NBC|              1.33|
|            ULV|              1.53|
|            NYM|              1.43|
|            UUA|              1.09|
|            KHV|              1.74|
|            KZN|              1.91|
|            CEK|              1.46|
|            KJA|              1.56|
|            ABA|              1.28|
|            OSW|              1.43|
|            ASF|              1.35|
|            UFA|              1.82|
|            TBW|              1.64|
|            NYA|               0.0|
|            ESL|              2.03|
|            RGK|              3.41|
|            REN|              1.64|
|            VOG|              1.27|
|            NFG|              2.27|
+---------------+------------------+
only showing top 20 rows



In [60]:
# 13. Find the longest flight (based on scheduled time).

flights.withColumn('flight_duration', expr("(unix_timestamp(scheduled_arrival) - unix_timestamp(scheduled_departure)) / 60")).orderBy('flight_duration', ascending=False).limit(1).show()

+---------+---------+-------------------+-------------------+-----------------+---------------+---------+-------------+----------------+--------------+---------------+
|flight_id|flight_no|scheduled_departure|  scheduled_arrival|departure_airport|arrival_airport|   status|aircraft_code|actual_departure|actual_arrival|flight_duration|
+---------+---------+-------------------+-------------------+-----------------+---------------+---------+-------------+----------------+--------------+---------------+
|     1247|   PG0168|2017-09-13 22:35:00|2017-09-14 07:25:00|              DME|            UUS|Scheduled|          319|              \N|            \N|          530.0|
+---------+---------+-------------------+-------------------+-----------------+---------------+---------+-------------+----------------+--------------+---------------+



In [62]:
# 14. Determine the average flight range for each aircraft model.

aircrafts_data.groupBy('model').agg({'range': 'avg'}).show()

+--------------------+----------+
|               model|avg(range)|
+--------------------+----------+
|"{""en"": ""Airbu...|      NULL|
|"{""en"": ""Bomba...|      NULL|
|"{""en"": ""Boein...|      NULL|
|"{""en"": ""Airbu...|      NULL|
|"{""en"": ""Cessn...|      NULL|
|"{""en"": ""Sukho...|      NULL|
|"{""en"": ""Airbu...|      NULL|
|"{""en"": ""Boein...|      NULL|
|"{""en"": ""Boein...|      NULL|
+--------------------+----------+



In [67]:
# 15. Calculate the total revenue generated by each aircraft model.

aircrafts_data.join(flights, 'aircraft_code').join(ticket_flights, 'flight_id').groupBy('model').agg({'amount': 'sum'}).show()

+--------------------+-----------+
|               model|sum(amount)|
+--------------------+-----------+
|"{""en"": ""Bomba...| 1982760500|
|"{""en"": ""Boein...| 1426552100|
|"{""en"": ""Airbu...| 2706163100|
|"{""en"": ""Cessn...|   96373800|
|"{""en"": ""Sukho...| 5114484700|
|"{""en"": ""Airbu...| 1638164100|
|"{""en"": ""Boein...| 3431205500|
|"{""en"": ""Boein...| 4371277100|
+--------------------+-----------+



## END