In [128]:
#import findspark, pyspark
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import *
from datetime import datetime
from functools import reduce
from pyspark.sql.functions import col

#sc = pyspark.SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

file_attack = 'gtd_14to17_0718dist'
file_airport = 'airports-extended'
file_largest_airport = 'largest-global-airports-passenger-traffic'
file_passengers = 'avia_par_be'

# Fetch Data

In [129]:
#data_path = 'D:/DataMining/taba/'
data_path = '../data/Datasets/'

attack_data = sc.textFile(data_path + 'gtd-data/' + file_attack + '.csv')
airport_data = sc.textFile(data_path + file_airport + '.csv')
passenger_data_brussels = sc.textFile(data_path + file_passengers + '.csv')

# Prepare Attack Data

In [130]:
# Define columns for dataframe
new_columns = attack_data.first()
new_columns = new_columns.split(";")
attack_data = attack_data.filter(lambda l: l != new_columns)
df_terror_data = attack_data.map(lambda x: x.split(';')).toDF()
old_columns = df_terror_data.schema.names

In [131]:
# apply new column names
df_terror_data = reduce(lambda data, idx: data.withColumnRenamed(old_columns[idx], new_columns[idx]), range(len(old_columns)), df_terror_data)

# Query Attack Data

In [132]:
#query_result = df_terror_data.filter(col("country_txt") == "Belgium").select('iyear', 'imonth', 'iday','country_txt', 'summary')
query_result = df_terror_data.filter(df_terror_data.country_txt == "Belgium").select('iyear', 'imonth', 'iday','country_txt', 'summary')

In [133]:
query_result.show(3)

+-----+------+----+-----------+--------------------+
|iyear|imonth|iday|country_txt|             summary|
+-----+------+----+-----------+--------------------+
| 2014|     5|  24|    Belgium|05/24/2014: Assai...|
| 2014|     9|  16|    Belgium|09/16/2014: Assai...|
| 2016|     3|  22|    Belgium|03/22/2016: Two s...|
| 2016|     3|  22|    Belgium|03/22/2016: A sui...|
| 2016|     8|   6|    Belgium|08/06/2016: An as...|
| 2016|     8|  29|    Belgium|08/29/2016: Assai...|
| 2016|    10|   5|    Belgium|10/05/2016: An as...|
| 2016|    12|  23|    Belgium|12/23/2016: Secur...|
| 2017|     5|   1|    Belgium|"05/01/2017: An a...|
| 2017|     6|  20|    Belgium|"06/20/2017: A su...|
+-----+------+----+-----------+--------------------+
only showing top 10 rows



In [134]:
query_result.select('summary').show(2, False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|summary                                                                                                                                                                                                                                                                      

# Prepare airport data

In [135]:
fields = []
fields.append(StructField('airport_id', IntegerType(), True)) 
fields.append(StructField('airport_name', StringType(), True))
fields.append(StructField('city', StringType(), True))
fields.append(StructField('country', StringType(), True))
fields.append(StructField('IATA', StringType(), True)) # kind of airport identifier
fields.append(StructField('ICAO', StringType(), True)) # different kind of airport identifier
fields.append(StructField('latitude', FloatType(), True))
fields.append(StructField('longitude', FloatType(), True))
fields.append(StructField('unknown1', IntegerType(), True))
fields.append(StructField('unknown2', StringType(), True))
fields.append(StructField('unknown3', StringType(), True))
fields.append(StructField('continent_city', StringType(), True))
fields.append(StructField('idk_column', StringType(), True))
fields.append(StructField('source', StringType(), True))
schema = StructType(fields)

clean_data = (airport_data
              .map(lambda line: line.split(';'))
              .map(lambda line: [int(line[0]),line[1],line[2],line[3],line[4],line[5],float(line[6]),float(line[7]),int(line[8]),line[9],line[10],line[11],line[12],line[13]])
             )
df_airport_data = sqlContext.createDataFrame(clean_data, schema)

# Query airport data

In [136]:
(df_airport_data
 .select(df_airport_data.airport_name, df_airport_data.city,df_airport_data.continent_city)
 .filter(df_airport_data.continent_city == 'Europe/Brussels')
 .show(df_airport_data.count(),False))

+--------------------------------------+-----------------+---------------+
|airport_name                          |city             |continent_city |
+--------------------------------------+-----------------+---------------+
|Antwerp International Airport (Deurne)|Antwerp          |Europe/Brussels|
|Beauvechain Air Base                  |Beauvechain      |Europe/Brussels|
|Kleine Brogel Air Base                |Kleine Brogel    |Europe/Brussels|
|Brussels Airport                      |Brussels         |Europe/Brussels|
|Jehonville Air Base                   |Bertrix          |Europe/Brussels|
|Brussels South Charleroi Airport      |Charleroi        |Europe/Brussels|
|Chièvres Air Base                     |Chievres         |Europe/Brussels|
|Koksijde Air Base                     |Koksijde         |Europe/Brussels|
|Florennes Air Base                    |Florennes        |Europe/Brussels|
|Wevelgem Airport                      |Kortrijk-vevelgem|Europe/Brussels|
|Liège Airport           

# Prepare passenger data

## Cleaning

In [137]:
international_passengers_file =  'international-passengers-'
international_passengers_2013 = sc.textFile(data_path + international_passengers_file + '2013' + '.csv')
international_passengers_2014 = sc.textFile(data_path + international_passengers_file + '2014' + '.csv')

total_passengers_file = 'total-passengers-'
total_passengers_2013 = sc.textFile(data_path + total_passengers_file + '2013' + '.csv')
total_passengers_2014 = sc.textFile(data_path + total_passengers_file + '2014' + '.csv')

all_passenger_datasets_except2015 = [international_passengers_2013,international_passengers_2014,total_passengers_2013, total_passengers_2014]

counter = sc.accumulator(0)
def clean_passenger_data(rdd):
    global counter
    
    header = rdd.first()
    rdd = rdd.filter(lambda l: l != header)
    rdd = rdd.map(lambda l: l.split(";"))

    new_columns = rdd.map(lambda l: l[3].split("/"))

    rdd = rdd.map(lambda l: l[0:3] + l[4:])

    rdd = rdd.zip(new_columns)

    rdd = rdd.map(lambda l: l[0] + l[1])
    
    all_passenger_datasets_except2015[counter.value] = rdd
    
    counter +=  1
    
    
#    print(rdd.take(2))
#    print()
#    print("+++++++++++++++++++++++++++++++++")
#    print()
    
for dataset in all_passenger_datasets_except2015:
    clean_passenger_data(dataset)
    
print(all_passenger_datasets_except2015[0].take(2))

[['1', 'London Heathrow Airport', 'Hillingdon, Greater London, United Kingdom', '66689466', '4%', 'LHR', 'EGLL'], ['2', 'Dubai International Airport', 'Garhoud, Dubai, United Arab Emirates', '65872250', '15%', 'DXB', 'OMDB']]


## creating dataframe

### make the dataframes for the other datasets

In [138]:
fields = []
fields.append(StructField('rank', StringType(), True))
fields.append(StructField('airport_name', StringType(), True))
fields.append(StructField('location', StringType(), True))
fields.append(StructField('total_passengers', StringType(), True))
fields.append(StructField('change', StringType(), True))
fields.append(StructField('IATA', StringType(), True))
fields.append(StructField('ICAO', StringType(), True))
schema_passengers_data = StructType(fields)


df_international_passengers_2013 = sqlContext.createDataFrame(all_passenger_datasets_except2015[0], schema_passengers_data)
df_international_passengers_2014 = sqlContext.createDataFrame(all_passenger_datasets_except2015[1], schema_passengers_data)
df_total_passengers_2013 = sqlContext.createDataFrame(all_passenger_datasets_except2015[2], schema_passengers_data)
df_total_passengers_2014 = sqlContext.createDataFrame(all_passenger_datasets_except2015[3], schema_passengers_data)

datasets = [df_international_passengers_2013,df_international_passengers_2014,df_total_passengers_2013,df_total_passengers_2014]
for df in datasets:
    df.show(2)
    print("+++++++++++++++++++++++++++++++++")
    print()

+----+--------------------+--------------------+----------------+------+----+----+
|rank|        airport_name|            location|total_passengers|change|IATA|ICAO|
+----+--------------------+--------------------+----------------+------+----+----+
|   1|London Heathrow A...|Hillingdon, Great...|        66689466|    4%| LHR|EGLL|
|   2|Dubai Internation...|Garhoud, Dubai, U...|        65872250|   15%| DXB|OMDB|
+----+--------------------+--------------------+----------------+------+----+----+
only showing top 2 rows

+++++++++++++++++++++++++++++++++

+----+--------------------+--------------------+----------------+------+----+----+
|rank|        airport_name|            location|total_passengers|change|IATA|ICAO|
+----+--------------------+--------------------+----------------+------+----+----+
|   1|Dubai Internation...|Garhoud, Dubai, U...|        69954392|  6.2%| DXB|OMDB|
|   2|London Heathrow A...|Hillingdon, Great...|        68091095|  1.9%| LHR|EGLL|
+----+--------------------+

### fix the data from 2015 missing ICAO column

In [139]:
fields = []
fields.append(StructField('rank', StringType(), True))
fields.append(StructField('airport_name', StringType(), True))
fields.append(StructField('location', StringType(), True))
fields.append(StructField('country', StringType(), True))
fields.append(StructField('total_passengers', StringType(), True))
fields.append(StructField('change', StringType(), True))
fields.append(StructField('IATA', StringType(), True))
schema_total_passengers_2015 = StructType(fields)

fields = []
fields.append(StructField('rank', StringType(), True))
fields.append(StructField('airport_name', StringType(), True))
fields.append(StructField('location', StringType(), True))
fields.append(StructField('total_passengers', StringType(), True))
fields.append(StructField('change', StringType(), True))
fields.append(StructField('IATA', StringType(), True))
schema_international_passengers_2015 = StructType(fields)

In [140]:
total_passengers_2015 = sc.textFile(data_path + total_passengers_file + '2015' + '.csv')
international_passengers_2015 = sc.textFile(data_path + international_passengers_file + '2015' + '.csv')


# remove header
header = total_passengers_2015.first()
total_passengers_2015 = total_passengers_2015.filter(lambda l: l != header)
# create dataframe
df_total_passengers_2015 = sqlContext.createDataFrame(total_passengers_2015.map(lambda l: l.split(";")),schema_total_passengers_2015)
# add missing column by join using previous data (df_airport_data)
df_total_passengers_2015 = df_total_passengers_2015.join(df_airport_data.select(df_airport_data.IATA, df_airport_data.ICAO),"IATA")

# repeat for second dataset from 2015
international_passengers_2015 = sc.textFile(data_path + international_passengers_file + '2015' + '.csv')
header = international_passengers_2015.first()
international_passengers_2015 = international_passengers_2015.filter(lambda l: l != header)
temp = international_passengers_2015.map(lambda l: l.split(";"))
df_international_passengers_2015 = sqlContext.createDataFrame(international_passengers_2015.map(lambda l: l.split(";")),schema_international_passengers_2015)
df_international_passengers_2015 = df_international_passengers_2015.join(df_airport_data.select(df_airport_data.IATA, df_airport_data.ICAO),"IATA")

# print results
print("total passenger data 2015")
df_total_passengers_2015.show(2)
print()
print("++++++++++++++++++++++++++++++")
print()
print("international passenger data 2015")
df_international_passengers_2015.show(2)

total passenger data 2015
+----+----+--------------------+---------+-------+----------------+------+----+
|IATA|rank|        airport_name| location|country|total_passengers|change|ICAO|
+----+----+--------------------+---------+-------+----------------+------+----+
| FRA|  12|Flughafen Frankfu...|Frankfurt|Germany|        61032022|   2.5|EDDF|
| IST|  11|Atatürk Internati...| Istanbul| Turkey|        61346229|   8.2|LTBA|
+----+----+--------------------+---------+-------+----------------+------+----+
only showing top 2 rows


++++++++++++++++++++++++++++++

international passenger data 2015
+----+----+--------------------+-----------------+----------------+------+----+
|IATA|rank|        airport_name|         location|total_passengers|change|ICAO|
+----+----+--------------------+-----------------+----------------+------+----+
| PMI|  36|Aeropuerto de Pal...|Palma De Mallorca|        18107070|   0.6|LEPA|
| HEL|  48|Helsinki-Vantaa A...|         Helsinki|        13826868|   2.9|EFHK|
+-

# Prepare passenger data Brussels Airport

In [141]:
passenger_data_brussels = passenger_data_brussels.map(lambda l: l.split(";"))

header = passenger_data_brussels.first()
passenger_data_brussels = passenger_data_brussels.filter(lambda l: l != header)
df_passenger_data_brussels = passenger_data_brussels.toDF(header)

df_passenger_data_brussels.show(2)

+----------------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|destination ICAO|2004M03|2004M04|2005M03|2005M04|2006M03|2006M04|2007M03|2007M04|2008M03|2008M04|2009M03|2009M04|2010M03|2010M04|2011M03|2011M04|2012M03|2012M04|2013M03|2013M04|2014M03|2014M04|2015M03|2015M04|2016M03|2016M04|2017M03|2017M04|2018M03|2018M04|
+----------------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|            LOWW|  29916|  31346|  27747|  29958|  32884|  31902|  35712|  40232|  43480|  42864|  37655|  37727|  32659|  26410|  35922|  34845|  35000|  34440|  33190|  35530|  35638|  41863|  34789|  39449|  24226|  236