## Spark exercise


# Setup

In [8]:
import pandas as pd 

In [3]:
# Init pyspark
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Init sparksql -- Only used to format the output nicely!
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

rows = sc.textFile("/air_transit_2007.csv")
data = rows.map(lambda line: line.split(","))
# data.cache()



# Sample Query

In [4]:
sample_results = data.filter(lambda r: (r[16]=='ROC'and r[1]=='3' and r[2]=='12')) \
    .map(lambda r: (r[5] , r[17])) \
    .collect()

#format nicely!
sqlContext.createDataFrame(sample_results, ['Sched. Departure', 'Destination']).show(n=10)

+----------------+-----------+
|Sched. Departure|Destination|
+----------------+-----------+
|            1455|        CLE|
|            1840|        CLE|
|             659|        CLE|
|            1750|        EWR|
|             630|        EWR|
|            1650|        EWR|
|            1255|        EWR|
|            1025|        EWR|
|            1034|        IAD|
|            1607|        IAD|
+----------------+-----------+
only showing top 10 rows



## Q1
I have removed all the rows that NA

In [235]:
# Response...
data_removed_na = data.filter(lambda row: 'NA' not in row)
total_records = data_removed_na.count()
print("Total number: ", total_records)

Total number:  7275289


# Q2
I have removed all the rows that NA

In [236]:
data_removed_na = data.filter(lambda row: 'NA' not in row)

header = data_removed_na.first()
data_no_header_no_na = data_removed_na.filter(lambda row: row != header)


# Assuming the month is the second column in your data
flights_per_month = data_no_header_no_na.map(lambda r: (int(r[1]), 1)) \
                        .reduceByKey(lambda a, b: a + b) \
                        .sortByKey(True).collect()
# flights_per_month

sqlContext.createDataFrame(flights_per_month, ['Month', 'Total Flights']).show(n=50)



+-----+-------------+
|Month|Total Flights|
+-----+-------------+
|    1|       604582|
|    2|       538878|
|    3|       621057|
|    4|       602317|
|    5|       623326|
|    6|       609838|
|    7|       632904|
|    8|       638883|
|    9|       592718|
|   10|       621665|
|   11|       597989|
|   12|       591131|
+-----+-------------+



# Q3
##### Find the plane with the highest number of flights. Each plane has a unique TailNum.


In [237]:
data_removed_na = data.filter(lambda row: 'NA' not in row)

header = data_removed_na.first()
data_no_header_no_na = data_removed_na.filter(lambda row: row != header)

tailNum_data = data_no_header_no_na.map(lambda r: (r[10], 1)) 

# Reduce by key (TailNum here) to get the counts
tailNum_counts = tailNum_data.reduceByKey(lambda a, b: a+b)
top_planes = tailNum_counts.takeOrdered(50, key = lambda x: -x[1])

sqlContext.createDataFrame(top_planes, ['Tail Number', 'Total Flights']).show(n=25)

+-----------+-------------+
|Tail Number|Total Flights|
+-----------+-------------+
|     N655BR|         4457|
|     N479HA|         4359|
|     N651BR|         4324|
|     N478HA|         4316|
|     N654BR|         4251|
|     N480HA|         4225|
|     N485HA|         4203|
|     N484HA|         4126|
|     N693BR|         4088|
|     N481HA|         4045|
|     N487HA|         4038|
|     N477HA|         3958|
|     N810AL|         3938|
|     N837AL|         3881|
|     N475HA|         3854|
|     N486HA|         3820|
|     N476HA|         3685|
|     N836AL|         3679|
|     N808AL|         3676|
|     N824AL|         3671|
|     N646BR|         3502|
|     N828AL|         3493|
|     N295SW|         3462|
|     N226SW|         3462|
|     N835AL|         3440|
+-----------+-------------+
only showing top 25 rows



# Q4 
Removed all NAs and used isdigit. 

In [11]:
# Remove rows containing 'NA'
data_removed_na = data.filter(lambda row: 'NA' not in row)

# Remove the header
header = data_removed_na.first()
data_no_header_no_na = data_removed_na.filter(lambda row: row != header)

# Filter out rows with non-numeric values in the flight time and map the results
flightTime_data = data_no_header_no_na.filter(
    lambda r: all(i.replace('.', '', 1).isdigit() for i in r[13].split())  # Check if the value is numeric
).map(
    lambda r: (r[10], int(r[13]))  # Map the results to tuples
)

# Sum flight times per tail number
flightTime_totals = flightTime_data.reduceByKey(lambda a, b: a+b)

# Get the top 100 planes with the most total flight time
top_flightTimes = flightTime_totals.takeOrdered(100, key=lambda x: -x[1])

# Display the results
sqlContext.createDataFrame(top_flightTimes, ['Tail Number', 'Flight Time']).show(n=25)

+-----------+-----------+
|Tail Number|Flight Time|
+-----------+-----------+
|     N556AS|     532213|
|     N557UA|     259376|
|     N597UA|     254760|
|     N636JB|     254357|
|     N637JB|     253562|
|     N590NW|     253079|
|     N607JB|     252862|
|     N590UA|     252847|
|     N505UA|     252382|
|     N554UA|     252378|
|     N558AS|     251992|
|     N212UA|     251816|
|     N598UA|     250612|
|     N624JB|     250327|
|     N646JB|     249865|
|     N625JB|     249089|
|     N543UA|     248784|
|     N599JB|     248747|
|     N565AS|     248648|
|     N640JB|     248571|
|     N618JB|     248465|
|     N666UA|     248352|
|     N595UA|     248135|
|     N639JB|     247980|
|     N649JB|     247807|
+-----------+-----------+
only showing top 25 rows



# Q5
Removed all NAs

In [239]:
from operator import add

data_removed_na = data.filter(lambda row: 'NA' not in row)

header = data_removed_na.first()
data_no_header_no_na = data_removed_na.filter(lambda row: row != header)


# Assuming 'Month' is the 2nd column, 'Origin' is the 17th column, and 'Dest' is the 18th column
departures = data_no_header_no_na.map(lambda r: ((r[1], r[16]), 1))
arrivals = data_no_header_no_na.map(lambda r: ((r[1], r[17]), 1))

# Merge the two datasets and reduce
all_flights = departures.union(arrivals)


total_flights = all_flights.reduceByKey(add)


# Reshape ((Month, Airport), Count) to (Month, (Airport, Count)) and get the busiest airport for each month
busiest_airports = total_flights.map(lambda x: (x[0][0],(x[0][1], x[1]))).reduceByKey(lambda a, b: a if a[1] > b[1] else b)



results = busiest_airports.map(lambda x: (int(x[0]), x[1][0], x[1][1])).sortBy(lambda x: x[0]).collect()

sqlContext.createDataFrame(results, ['Month', 'Airport', 'Flights']).show(n=13)


+-----+-------+-------+
|Month|Airport|Flights|
+-----+-------+-------+
|    1|    ATL|  62801|
|    2|    ATL|  57583|
|    3|    ATL|  66647|
|    4|    ATL|  64838|
|    5|    ATL|  67819|
|    6|    ATL|  69936|
|    7|    ATL|  72121|
|    8|    ATL|  72453|
|    9|    ATL|  67598|
|   10|    ATL|  73016|
|   11|    ATL|  68793|
|   12|    ATL|  67502|
+-----+-------+-------+



# Q6
Find the airline with highest average delay of each type in March 2007. Note: do not write 
separate code for each error type. You should compute a single RDD where each row contains
the delay type, the airline that is worst regarding that delay type, and its average delay of tha 
type in minutes. 

In [240]:
data_removed_na = data.filter(lambda row: 'NA' not in row)

header = data_removed_na.first()
data_no_header_no_na = data_removed_na.filter(lambda row: row != header)

# Then, create a new RDD where each row contains 'DelayType', 'Delay', 'UniqueCarrier' and 'Month'
delay_types = ['CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay','ArrDelay', 'DepDelay']
delay_indices = [24, 25, 26, 27, 28, 14, 15]

delay_data = data_no_header_no_na.flatMap(lambda r: [((delay_type, r[8]), float(r[delay_index])) for delay_type, delay_index in zip(delay_types, delay_indices) if r[delay_index] != 'NA'])

# Compute averages for each 'DelayType' and 'UniqueCarrier'
delay_averages = delay_data.mapValues(lambda v: (v, 1)).reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])).mapValues(lambda v: v[0]/v[1])

# Find the airline with the highest average delay for each type
worst_airlines = delay_averages.map(lambda x: (x[0][0], (x[0][1], x[1]))).reduceByKey(lambda a, b: a if a[1] > b[1] else b)

# Print the results
results = worst_airlines.collect()
for result in results:
    print(f'Delay Type: {result[0]}, Worst Airline: {result[1][0]}, Average Delay: {round(result[1][1], 3)} minutes')

Delay Type: WeatherDelay, Worst Airline: OH, Average Delay: 3.633 minutes
Delay Type: NASDelay, Worst Airline: B6, Average Delay: 7.662 minutes
Delay Type: LateAircraftDelay, Worst Airline: B6, Average Delay: 7.861 minutes
Delay Type: ArrDelay, Worst Airline: EV, Average Delay: 17.196 minutes
Delay Type: CarrierDelay, Worst Airline: EV, Average Delay: 10.239 minutes
Delay Type: SecurityDelay, Worst Airline: AS, Average Delay: 0.086 minutes
Delay Type: DepDelay, Worst Airline: EV, Average Delay: 20.272 minutes


In [241]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Define the DataFrame schema as a list of column names and types
schema = ["Delay Type", "Airline", "Average Delay"]
flat_results = [(result[0], result[1][0], round(result[1][1], 2)) for result in results]

# Convert the results into DataFrame
df = spark.createDataFrame(flat_results, schema)

# Show the DataFrame
df.show()

+-----------------+-------+-------------+
|       Delay Type|Airline|Average Delay|
+-----------------+-------+-------------+
|     WeatherDelay|     OH|         3.63|
|         NASDelay|     B6|         7.66|
|LateAircraftDelay|     B6|         7.86|
|         ArrDelay|     EV|         17.2|
|     CarrierDelay|     EV|        10.24|
|    SecurityDelay|     AS|         0.09|
|         DepDelay|     EV|        20.27|
+-----------------+-------+-------------+



# Q7
Compute median, mean, and mode of columns 12-16, 19-21 and 25-29 for the flights in the third 
week of 2007. Exclude the non-numeric values.


In [274]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import col
from statistics import mean, median
from scipy import stats


data_removed_na = data.filter(lambda row: 'NA' not in row)

header = data_removed_na.first()
data_no_header_no_na = data_removed_na.filter(lambda row: row != header)


# Assume that SparkContext is available as sc
spark = SparkSession(sc)

# Filter data for the third week of 2007
data_3rd_week_2007 = data_no_header_no_na.filter(lambda row: row[0] == '2007' and int(row[1]) == 1 and int(row[2]) >= 15 and int(row[2]) <= 21)

# Define function to compute median, mean and mode
def compute_statistics(column_index):
    column_data = data_3rd_week_2007.map(lambda row: int(row[column_index])).filter(lambda x: x is not None).collect()
    return median(column_data), mean(column_data), int(stats.mode(column_data)[0][0])

# Define columns to compute statistics
columns = [11, 12, 13, 14, 15, 18, 19, 20, 24, 25, 26, 27, 28]

## BETTER DESIGN USING PYTHON
column_names = [
    'ActualElapsedTime',
    'CRSElapsedTime',
    'AirTime',
    'ArrDelay',
    'DepDelay',
    'Distance',
    'TaxiIn',
    'TaxiOut',
    'CarrierDelay',
    'WeatherDelay',
    'NASDelay',
    'SecurityDelay',
    'LateAircraftDelay'
]

# Compute statistics and store them in a list of Rows
results = []
for column, column_name in zip(columns, column_names):
    median_val, mean_val, mode_val = compute_statistics(column)
    results.append(Row(column_name=column_name,mean=round(mean_val,2), median=round(median_val,0), mode=mode_val))

# Convert list of Rows to DataFrame
df = spark.createDataFrame(results)

# Show DataFrame
df.show()


  return median(column_data), mean(column_data), int(stats.mode(column_data)[0][0])


+-----------------+------+------+----+
|      column_name|  mean|median|mode|
+-----------------+------+------+----+
|ActualElapsedTime|128.31| 110.0|  80|
|   CRSElapsedTime|127.31| 109.0|  75|
|          AirTime|103.11|  84.0|  59|
|         ArrDelay| 15.97|   2.0|  -5|
|         DepDelay| 14.97|   0.0|   0|
|         Distance| 714.0| 557.0| 337|
|           TaxiIn|  7.07|   6.0|   4|
|          TaxiOut| 18.13|  14.0|  10|
|     CarrierDelay|  4.23|   0.0|   0|
|     WeatherDelay|  1.73|   0.0|   0|
|         NASDelay|  5.69|   0.0|   0|
|    SecurityDelay|  0.01|   0.0|   0|
|LateAircraftDelay|  7.37|   0.0|   0|
+-----------------+------+------+----+



# Q8 
#### Assumption

I have calculated the possible number of combinations with all the time constraints and the same carrier. Added the constraint of the same day e.g. depart 23:50 will arrive the same day at 5am.
This question was slightly confusing and the exact definition of flights was not indicated. Therefore, I have used combinations of possible flights using the same airline. 


This specific instance is using the cartesian command, which will parse all possible combinations of flights that will give us a definitive answer. However, it might take some time to run.

In [5]:
rows = sc.textFile("/air_transit_2007.csv")
data = rows.map(lambda line: line.split(","))

header = data.first() #extract header
no_header_data = data.filter(lambda row : row != header)

In [9]:
# Index for readability
month_index = 1
year_index = 0
day_index = 2
origin_index = 16
dest_index = 17
crsdeptime_index = 5
crsarrtime_index = 7 
carrier_index = 8

data_q8 = no_header_data.filter(lambda line: line[month_index] == '2' and line[year_index] == '2007')

# Filter for the flights from PHL to LAX after 05:59
outbound_flights = data_q8.filter(lambda line: line[origin_index] == 'PHL' and line[dest_index] == 'LAX' and int(line[crsdeptime_index]) > 559)

# Filter for the flights from LAX to PHL before 23:00
inbound_flights = data_q8.filter(lambda line: line[origin_index] == 'LAX' and line[dest_index] == 'PHL' and int(line[crsarrtime_index]) < 2300)

# Ensure there is at least 3 hours and 1 minute (181 minutes) layover time and it is the same day
flights_combination = outbound_flights.cartesian(inbound_flights).filter(
    lambda x: (int(x[0][crsarrtime_index]) - int(x[1][crsdeptime_index]) <= 181) and
              (x[1][day_index] == x[0][day_index])
)

#Grab the proof - next cells 
first_10_rows = flights_combination.take(10)
df = pd.DataFrame(first_10_rows)

# Filter for the same carrier combinations
flights_combination_same_carrier = flights_combination.filter(lambda x: x[0][carrier_index] == x[1][carrier_index])

# Count the combinations for each carrier - reduce
carrier_combinations = flights_combination_same_carrier.map(lambda x: (x[0][8], 1)).reduceByKey(lambda a, b: a + b)

# Collect the results
results = carrier_combinations.collect()
#Visual
sqlContext.createDataFrame(results, ['Carrier', 'Combinations']).show(n=13)

+-------+------------+
|Carrier|Combinations|
+-------+------------+
|     US|         156|
|     WN|          28|
|     UA|          97|
+-------+------------+



#### The highest possible combinations only using the same carrier in a month is US with 156 possible ones. 


In [224]:
# Display the proof table, where I have compared the actual numbers to verify the correctness. 0 is outbound, 1 is inbound
# df

# Q9

In this question I have used all the data, since none of the NA values appeared in our timetable.
I have added a different and more readible style for the table, as well as added the Status column, which uses simple if statements.on.

In [101]:
rows = sc.textFile("/air_transit_2007.csv")
data = rows.map(lambda line: line.split(","))

In [116]:
# Helper function to format time
def format_time(time_str):
    if time_str != '':
        formatted = time_str.zfill(4)
        return formatted[:2] + ':' + formatted[2:]
    else:
        return None

month_index = 1
day_index = 2
year_index = 0
origin_index = 16
deptime_index = 4
crsdeptime_index = 5
carrier_index = 8
dest_index = 17
        

header = data.first() #extract header
no_header_data = data.filter(lambda row : row != header)

# Filter data
la_flights = no_header_data.filter(lambda row: (row[origin_index] == 'LAX' and 
                                                row[year_index] == '2007' and 
                                                row[month_index] == '1' and 
                                                row[day_index] == '12' and 
                                                int(row[crsdeptime_index]) >= 1200 and 
                                                int(row[crsdeptime_index]) <= 1400))



# Sort by scheduled departure time
sorted_la_flights = la_flights.sortBy(lambda row: row[crsdeptime_index])

selected_la_flights = sorted_la_flights.map(lambda row: [format_time(row[crsdeptime_index]), 
                                                         'Departed' if int(row[deptime_index]) <= 1300 else ('Closed' if int(row[deptime_index]) <= 1303 else(
                                                         'LAST CALL' if int(row[deptime_index]) <= 1310 else ('Boarding' if int(row[deptime_index]) <= 1330 else 'Check-In'))), 
                                                         row[carrier_index], 
                                                         row[dest_index],
                                                         format_time(row[deptime_index])])

# sqlContext.createDataFrame(selected_la_flights, ['Scheduled DepTime', 'Status', 'Airline Code', 'Destination', 'Actual Time']).show(n=500)
## BETTER FORMATTING 
table = selected_la_flights.toDF(['Scheduled DepTime', 'Status', 'Airline Code', 'Destination', 'Actual Time'])
table = table.limit(100)  # limited at 100 as asked 

# Convert the DataFrame to a pandas DataFrame
pandas_df = table.toPandas()

# Define the headers
headers = ['Schedule', 'Status', 'Air', 'Dest', 'Actual Time']

# Apply ASCII formatting to the table
formatted_table = tabulate(pandas_df, headers, tablefmt='fancy_grid')

print(formatted_table)


╒════╤════════════╤═══════════╤═══════╤════════╤═══════════════╕
│    │ Schedule   │ Status    │ Air   │ Dest   │ Actual Time   │
╞════╪════════════╪═══════════╪═══════╪════════╪═══════════════╡
│  0 │ 12:00      │ Departed  │ WN    │ OAK    │ 12:56         │
├────┼────────────┼───────────┼───────┼────────┼───────────────┤
│  1 │ 12:00      │ Departed  │ AA    │ JFK    │ 11:59         │
├────┼────────────┼───────────┼───────┼────────┼───────────────┤
│  2 │ 12:00      │ Departed  │ CO    │ IAH    │ 12:13         │
├────┼────────────┼───────────┼───────┼────────┼───────────────┤
│  3 │ 12:05      │ Departed  │ AA    │ MIA    │ 12:06         │
├────┼────────────┼───────────┼───────┼────────┼───────────────┤
│  4 │ 12:11      │ Departed  │ NW    │ MSP    │ 12:54         │
├────┼────────────┼───────────┼───────┼────────┼───────────────┤
│  5 │ 12:14      │ Departed  │ OO    │ SAN    │ 12:17         │
├────┼────────────┼───────────┼───────┼────────┼───────────────┤
│  6 │ 12:15      │ Depar

## Thank you and have a good day! 