In [2]:
import findspark # looking for spark in my system
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,isnan, when, count



In [3]:
# Reuse the Spark session already active in this kernel, or start a new one
# with the default configuration if none exists yet.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/13 10:44:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/06/13 10:44:45 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
# Load both airport CSV files. The first row of each file is treated as the
# header, and Spark infers the column types from the data.
df_alist = spark.read.csv("airportList.csv", header=True, inferSchema=True)
df_alocation = spark.read.csv("airportsLocation.csv", header=True, inferSchema=True)

                                                                                

In [5]:
# For each DataFrame, print its row count followed by its inferred schema.
for heading, frame in (
    ("Airports List Schema, Data count: {}", df_alist),
    ("Airport Locations Schema, Data count: {}", df_alocation),
):
    print(heading.format(frame.count()))
    frame.printSchema()

Airports List Schema, Data count: 8107
root
 |-- airport_id: integer (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- city_airport_location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- airport_code: string (nullable = true)

Airport Locations Schema, Data count: 8107
root
 |-- airport_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- altitude: integer (nullable = true)
 |-- timezone: double (nullable = true)
 |-- dst: string (nullable = true)
 |-- olson_timezone: string (nullable = true)



In [6]:
# Count, per column, the values that look missing: SQL NULL, NaN, an empty
# string, or text containing 'None' / 'NULL'.
# Note: contains() is a substring match, so any value that merely includes
# 'None' or 'NULL' somewhere in its text is counted as well.
df_alist.select(
    [
        count(
            when(
                col(c).isNull()
                | isnan(c)
                | (col(c) == "")
                | col(c).contains("None")
                | col(c).contains("NULL"),
                c,
            )
        ).alias(c)
        for c in df_alist.columns
    ]
).show()

# Result: airport_name, city_airport_location and airport_code contain NULL or NaN values.

+----------+------------+---------------------+-------+------------+
|airport_id|airport_name|city_airport_location|country|airport_code|
+----------+------------+---------------------+-------+------------+
|         0|           1|                    1|      0|        2228|
+----------+------------+---------------------+-------+------------+



In [7]:
# Count the rows whose airport_code is NULL.
df_alist.where(col("airport_code").isNull()).count()

2227

In [8]:
# Option 1 - drop through the NA helper: removes the rows whose airport_code
# is NULL (2227 of them in the recorded run). NaN values are still present
# in the result.
df_alist_dropna_filtered = df_alist.na.drop(subset=["airport_code"])
df_alist_dropna_filtered.count()

5880

In [9]:
# Option 2 - explicit isNotNull filter: removes the same NULL airport_code
# rows (2227 in the recorded run). NaN values are still present in the result.
df_alist_filtered = df_alist.where(col("airport_code").isNotNull())
df_alist_filtered.count()

5880

In [10]:
# Count, per column, the NaN values that remain in df_alist_filtered - the
# NULL-free frame that the join and the later filters are built on. Add the
# per-column counts to get the total.
# For reference, the unfiltered df_alist holds 3 NaN values (one each in
# airport_name, city_airport_location and airport_code); querying it here
# would also count rows that the NULL filter has already removed.
df_alist_filtered.select(
    [count(when(isnan(c), c)).alias(c) for c in df_alist_filtered.columns]
).show()

+----------+------------+---------------------+-------+------------+
|airport_id|airport_name|city_airport_location|country|airport_code|
+----------+------------+---------------------+-------+------------+
|         0|           1|                    1|      0|           1|
+----------+------------+---------------------+-------+------------+



In [11]:
# Left-join the location data onto the NULL-free airport list by airport_code,
# so every airport row carries the latitude used by the filter that follows.
# NOTE(review): the recorded run shows 5884 joined rows against 5880 input
# rows, so some airport_code values match more than one row in df_alocation.
df_join = df_alist_filtered.join(df_alocation, on="airport_code", how="left")

print("Join DataFrame Schema, Data count: {}".format(df_join.count()))
df_join.printSchema()

Join DataFrame Schema, Data count: 5884
root
 |-- airport_code: string (nullable = true)
 |-- airport_id: integer (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- city_airport_location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- altitude: integer (nullable = true)
 |-- timezone: double (nullable = true)
 |-- dst: string (nullable = true)
 |-- olson_timezone: string (nullable = true)



In [12]:
# Airports confirmed as having issues: rows whose latitude is at least 40.
# NOTE(review): this filter is inclusive (>= 40), while the notes for this
# step say "latitude >40" - confirm which bound is intended.
confirmed_airports = df_join.where(col("latitude") >= 40)

print("Aeropuertos confirmados con trafico de armas: {} \n".format(confirmed_airports.count()))
confirmed_airports.select(
    ["airport_code", "airport_name", "country", "city_airport_location", "latitude"]
).show()

Aeropuertos confirmados con trafico de armas: 2252 

+------------+--------------------+---------+---------------------+---------+
|airport_code|        airport_name|  country|city_airport_location| latitude|
+------------+--------------------+---------+---------------------+---------+
|         UAK|          Narsarsuaq|Greenland|         Narssarssuaq|61.160517|
|         GOH|                Nuuk|Greenland|             Godthaab|64.190922|
|         SFJ|   Sondre Stromfjord|Greenland|          Sondrestrom|67.016969|
|         THU|      Thule Air Base|Greenland|                Thule|76.531203|
|         AEY|            Akureyri|  Iceland|             Akureyri|65.659994|
|         EGS|         Egilsstadir|  Iceland|          Egilsstadir|65.283333|
|         HFN|        Hornafjordur|  Iceland|                 Hofn|64.295556|
|         HZK|             Husavik|  Iceland|              Husavik|65.952328|
|         IFJ|          Isafjordur|  Iceland|           Isafjordur|66.058056|
|         K

In [13]:
# Suspect airports: keep a row when airport_id leaves a positive remainder
# after division by 2 (odd ids) OR when its country is one of the listed
# target countries. The two conditions are OR-ed, not AND-ed.
targered_countries = df_alist_filtered.where(
    (col("airport_id") % 2 > 0)
    | col("country").isin("United States", "Mexico", "Brazil", "Canada", "Japan")
)

In [14]:
# Report how many suspect airports were selected, then preview the rows.
print(
    "Aeropuertos sospechosos: {} \nid-impar y lista de países objetivo".format(
        targered_countries.count()
    )
)
targered_countries.show()

Aeropuertos sospechosos: 3987 
id-impar y lista de países objetivo
+----------+--------------------+---------------------+----------------+------------+
|airport_id|        airport_name|city_airport_location|         country|airport_code|
+----------+--------------------+---------------------+----------------+------------+
|         1|              Goroka|               Goroka|Papua New Guinea|         GKA|
|         3|         Mount Hagen|          Mount Hagen|Papua New Guinea|         HGU|
|         5|Port Moresby Jack...|         Port Moresby|Papua New Guinea|         POM|
|         7|          Narsarsuaq|         Narssarssuaq|       Greenland|         UAK|
|         9|   Sondre Stromfjord|          Sondrestrom|       Greenland|         SFJ|
|        11|            Akureyri|             Akureyri|         Iceland|         AEY|
|        13|        Hornafjordur|                 Hofn|         Iceland|         HFN|
|        15|          Isafjordur|           Isafjordur|         Iceland| 