
# Project Title

### Data Engineering Capstone Project

#### Rama Iyer

#### Project Summary

This project is my capstone submission for the Udacity Data Engineering Nanodegree. It showcases what I learned in the program by taking large data sets and organizing them so that others can gain valuable insight from them.

This project creates an analytical data warehouse for use by the US Government. The warehouse can be used to understand immigration patterns and to inform immigration policy decisions, so that industries such as tourism, business and higher education can thrive.

The analytical warehouse is hosted on Amazon Redshift and the pipeline implementation was done using Apache Airflow.

The project follows these steps:
* Step 0: Imports
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [22]:
# Step 0 - Imports
# Do all imports and installs here
import configparser
import datetime as dt
import os
import dataMappings

import pandas as pd
from pyspark.sql import SparkSession, SQLContext, types
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import dayofweek, monotonically_increasing_id, from_unixtime
from pyspark.sql.types import (StructType, StructField, DoubleType, StringType,
                               IntegerType, TimestampType, DateType, FloatType)
# The wildcard import supplies to_date (used below) among others; note that it
# also shadows Python built-ins such as min, max, sum and round in this notebook.
from pyspark.sql.functions import *

In [23]:
config = configparser.ConfigParser()
config.read('/home/workspace/dl.cfg')

['/home/workspace/dl.cfg']

In [24]:
os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS','AWS_SECRET_ACCESS_KEY')

In [25]:
def create_spark_session():
    """Return a SparkSession object."""
    
    spark = SparkSession \
        .builder \
        .config("spark.jars.repositories", "https://repos.spark-packages.org/") \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11") \
        .config("spark.hadoop.fs.s3a.multiobjectdelete.enable","false") \
        .enableHiveSupport() \
        .getOrCreate()
    spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
    return spark


In [26]:
spark = create_spark_session()

### Step 1: Scope the Project and Gather Data
#### Scope
In this project, an analytical database is made available to the US government so that it can quickly gather insights from the data.

At a high level, the data is cleaned with Spark and written to S3 as Parquet files, which are then loaded into Redshift by a pipeline built with Apache Airflow.
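The final load step is a Redshift `COPY` from the Parquet files in S3. The Airflow task code is not part of this notebook, so the helper below is only a sketch of the SQL such a task might run; `build_copy_sql`, the table name, the bucket path and the IAM role ARN are all hypothetical placeholders.

```python
def build_copy_sql(table: str, s3_path: str, iam_role_arn: str) -> str:
    """Return a Redshift COPY statement that loads Parquet files from S3.

    All arguments are placeholders supplied by the caller; nothing here
    reflects the project's real resource names.
    """
    if not s3_path.startswith("s3://"):
        raise ValueError("s3_path must be an s3:// URI")
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        f"FORMAT AS PARQUET;"
    )


if __name__ == "__main__":
    print(build_copy_sql(
        "public.immigration",
        "s3://example-bucket/immigration/",
        "arn:aws:iam::123456789012:role/example-redshift-role",
    ))
```

For columnar formats such as Parquet, Redshift matches file columns to table columns by position, so the target table's column order has to follow the order in which Spark wrote the columns.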

#### Describe and Gather Data 
Data Sets Used:  
The following data sets were used to build the data warehouse:  

I94 Immigration Data:  
The immigration data comes from the [US National Travel and Tourism Office](https://www.trade.gov/national-travel-and-tourism-office). It includes information about people entering the United States, such as immigration year and month, arrival and departure dates, age and year of birth, arrival city and port, current state of residence, travel mode (air, sea, land), visa type, etc. Specifically, the data from April 2016 is used for this project: 3,096,313 records. The data is in Parquet format.  

U.S. City Demographic Data:  
The demographic data comes from [OpenDataSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/). It includes demographic information about US cities, such as the median age, total population, and specific populations (male vs. female, foreign-born, different races, etc.). The data is a semicolon-delimited CSV file.  

I94 Code Mappings:  
The following lookup tables come from I94_SAS_Labels_Descriptions.SAS, which was provided with the project. For each one, I used Spark to create a Parquet file.  
* Country Data: maps country names to the I94 codes found in the I94CIT (citizenship) and I94RES (residence) columns of the immigration table.  
* Visa Codes: decodes the I94VISA column.  
* State Code: decodes the I94ADDR column.  
* Travel Mode: decodes the I94MODE column.  
* Port Code: decodes the I94PORT column.  
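Each lookup table turns a coded column into a readable label. The sketch below decodes the first immigration record shown later in this notebook, using a handful of entries copied from the mappings; `decode` and the trimmed dictionaries are hypothetical illustrations. In Spark, the equivalent would be a join against the corresponding lookup DataFrame rather than a dict lookup.

```python
# A small subset of the mappings defined later in this notebook.
VISA_CODES = {1: "Business", 2: "Pleasure", 3: "Student"}
MODE_CODES = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
COUNTRY_CODES = {245: "CHINA, PRC", 438: "AUSTRALIA"}
STATE_CODES = {"CA": "CALIFORNIA", "NV": "NEVADA"}


def decode(record: dict) -> dict:
    """Replace I94 codes with their labels; unknown or missing codes map to None."""
    def num(value):
        # Numeric codes are stored as doubles (e.g. 1.0), so cast before lookup.
        return int(value) if value is not None else None

    return {
        # The same country table decodes both citizenship and residence.
        "citizenship": COUNTRY_CODES.get(num(record["i94cit"])),
        "residence": COUNTRY_CODES.get(num(record["i94res"])),
        "travel_mode": MODE_CODES.get(num(record["i94mode"])),
        "visa": VISA_CODES.get(num(record["i94visa"])),
        "state": STATE_CODES.get(record["i94addr"]),
    }


# Coded columns of the first April 2016 record.
row = {"i94cit": 245.0, "i94res": 438.0, "i94mode": 1.0,
       "i94visa": 1.0, "i94addr": "CA"}
print(decode(row))
```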

Airport Code Table: This is a simple table of airport codes and corresponding cities. It comes from [DataHub](https://datahub.io/core/airport-codes#data).  

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.


#### Read in the data here
Read the US city demographics file

In [6]:
df = spark.read.options(delimiter=";").csv("us-cities-demographics.csv",header=True)
# The file is semicolon-delimited; print the schema to see the columns (all read as strings)
df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [7]:
#show data 
df.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

In [8]:
#Create Temporary view on dataframe and explore
df.createOrReplaceTempView("CityTable")
# .show() prints the result and returns None, so there is nothing to assign
spark.sql("Select count(*) AS TotalRecords, count(distinct City) AS DistinctCity from CityTable").show()

+------------+------------+
|TotalRecords|DistinctCity|
+------------+------------+
|        2891|         567|
+------------+------------+



In [9]:
spark.sql('''
Select City, State, count(distinct Race) AS DistinctRace 
from CityTable Group by City, State
''').show(5)

+--------------------+--------+------------+
|                City|   State|DistinctRace|
+--------------------+--------+------------+
|          Cincinnati|    Ohio|           5|
|           Lynchburg|Virginia|           5|
|         Kansas City|  Kansas|           5|
|Louisville/Jeffer...|Kentucky|           5|
|              Dayton|    Ohio|           5|
+--------------------+--------+------------+
only showing top 5 rows



In [10]:
spark.sql("select count(distinct Race) from CityTable").show()

+--------------------+
|count(DISTINCT Race)|
+--------------------+
|                   5|
+--------------------+



Exploration of the demographics data shows a total of 2,891 records with 567 distinct city names and only 5 distinct Race values. Each row holds the count for one Race in one city, so a city's race counts are spread across several rows.
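The 567 figure counts distinct city *names*, not distinct cities: a name such as Kansas City can belong to more than one state. The small sketch below uses hypothetical rows to show the difference; in Spark SQL the pair count could be taken with `count(DISTINCT City, State)`.

```python
# Hypothetical sample rows (City, State, Race): one name, two states.
rows = [
    ("Kansas City", "Kansas", "White"),
    ("Kansas City", "Kansas", "Asian"),
    ("Kansas City", "Missouri", "White"),
    ("Dayton", "Ohio", "White"),
]

distinct_names = {city for city, _, _ in rows}
distinct_pairs = {(city, state) for city, state, _ in rows}

print(len(rows), len(distinct_names), len(distinct_pairs))  # 4 2 3
```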

#### Clean the data - Demographics

City Demographics: cast the columns to appropriate data types. Because there are only 5 distinct Race values, pivot the data by Race so that each city and state combination becomes a single row, and write this new data set to Parquet.
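`groupBy("City", "StateCode").pivot("Race").sum("Count")` turns each distinct Race into its own column. The plain-Python sketch below shows the same long-to-wide reshaping on a few illustrative rows (the counts are made up); `pivot_by_race` is a hypothetical helper and says nothing about how Spark executes a pivot. One difference: Spark fills a missing race with null, as in Missouri City's empty American Indian column in the output below, while this sketch simply omits the key.

```python
from collections import defaultdict

# Illustrative long-format rows: (City, StateCode, Race, Count).
long_rows = [
    ("Quincy", "MA", "White", 100),
    ("Quincy", "MA", "Asian", 40),
    ("Dayton", "OH", "White", 90),
    ("Dayton", "OH", "White", 10),  # a repeated key is summed, like sum("Count")
]


def pivot_by_race(rows):
    """Turn one row per (city, state, race) into one row per (city, state).

    Each distinct Race becomes a key in the inner dict, and counts that share
    the same (city, state, race) are added together.
    """
    wide = defaultdict(dict)
    for city, state, race, count in rows:
        cell = wide[(city, state)]
        cell[race] = cell.get(race, 0) + count
    return dict(wide)


for key, races in pivot_by_race(long_rows).items():
    print(key, races)
```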

In [14]:
# Rename columns and cast them to appropriate data types
dSQL = spark.sql("Select City, \
                         State, \
                         float(`Median Age`) MedianAge, \
                         int(`Male Population`) MalePopulation, \
                         int(`Female Population`) FemalePopulation , \
                         int(`Total Population`) TotalPopulation, \
                         int(`Number of Veterans`) NumberOfVeterans, \
                         int(`Foreign-born`) ForeignBorn, \
                         float(`Average Household Size`) AverageHouseholdSize, \
                         `State Code` StateCode, \
                         Race, \
                         int(Count) Count \
                         from CityTable")

# Pivot the dataframe by Race to get count in a single row
d_pivot_df = dSQL.groupBy("City","StateCode").pivot("Race").sum("Count")

#join the two datasets to create a single combined final dataset and drop duplicates
f_df = dSQL.join(d_pivot_df,["City","StateCode"]).drop("Race","Count").dropDuplicates()

# Rename the pivoted Race columns
f_df = f_df.withColumnRenamed("American Indian and Alaska Native","AmericanIndianandAlaskanNativePopulation") \
.withColumnRenamed("Asian","AsianPopulation") \
.withColumnRenamed("Black or African-American","BlackorAfricanAmericanPopulation") \
.withColumnRenamed("Hispanic or Latino","HispanicorLatinoPopulation") \
.withColumnRenamed("White", "WhitePopulation")
f_df.limit(5).toPandas()

|   | City | StateCode | State | MedianAge | MalePopulation | FemalePopulation | TotalPopulation | NumberOfVeterans | ForeignBorn | AverageHouseholdSize | AmericanIndianandAlaskanNativePopulation | AsianPopulation | BlackorAfricanAmericanPopulation | HispanicorLatinoPopulation | WhitePopulation |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Trenton | NJ | New Jersey | 33.299999 | 42581 | 41650 | 84231 | 2604 | 20428 | 3.04 | 98.0 | 1437 | 43471 | 30613 | 37683 |
| 1 | Saint Cloud | MN | Minnesota | 30.700001 | 34311 | 33942 | 68253 | 5012 | 5757 | 2.41 | 1169.0 | 2494 | 6401 | 2639 | 59794 |
| 2 | Cambridge | MA | Massachusetts | 31.5 | 55421 | 54981 | 110402 | 2495 | 27757 | 2.06 | 906.0 | 18441 | 11880 | 9433 | 80551 |
| 3 | Spring Hill | FL | Florida | 45.099998 | 48295 | 54902 | 103197 | 9737 | 7893 | 2.58 | 1254.0 | 2409 | 7689 | 15838 | 91742 |
| 4 | Missouri City | TX | Texas | 37.200001 | 34932 | 36846 | 71778 | 4274 | 18556 | 3.03 |  | 17854 | 28070 | 10007 | 20590 |


Read the immigration data file 

In [6]:
#read immigration data
df = spark.read.parquet("sas_data")
df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [16]:
df.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [17]:
#create temporary view to explore 
df.createOrReplaceTempView("ImmigTable")
spark.sql('''
SELECT count(*) AS TotalRecords, count(distinct cicid ) AS DistinctImmigrationCount 
FROM ImmigTable
''').show()

+------------+------------------------+
|TotalRecords|DistinctImmigrationCount|
+------------+------------------------+
|     3096313|                 3096313|
+------------+------------------------+



In [18]:
spark.sql('''
SELECT count(*) AS NoArrivalDate FROM ImmigTable WHERE arrdate is NULL
''').show()

+-------------+
|NoArrivalDate|
+-------------+
|            0|
+-------------+



In [20]:
spark.sql('''
SELECT count(*) AS IncorrectDeparture FROM ImmigTable WHERE depdate IS NULL
''').show()

+------------------+
|IncorrectDeparture|
+------------------+
|            142457|
+------------------+



In [24]:
spark.sql('''
SELECT min(i94yr) min_i94yr, max(i94yr) as max_i94yr, min(i94mon) as min_i94mon, max(i94mon)  as max_i94mon,
min(arrdate) as min_arrdate, min(depdate) as min_depdate
FROM ImmigTable 
''').show()

+---------+---------+----------+----------+-----------+-----------+
|min_i94yr|max_i94yr|min_i94mon|max_i94mon|min_arrdate|min_depdate|
+---------+---------+----------+----------+-----------+-----------+
|   2016.0|   2016.0|       4.0|       4.0|    20545.0|    15176.0|
+---------+---------+----------+----------+-----------+-----------+



In [25]:
spark.sql('''
SELECT count(*) AS IncorrectDeparture FROM ImmigTable WHERE depdate < arrdate
''').show()

+------------------+
|IncorrectDeparture|
+------------------+
|               375|
+------------------+



The immigration data has 3,096,313 records and the same number of distinct cicid values, so each row is unique. No records are missing an arrival date. Dates are stored as SAS date numbers (days since 1960-01-01) rather than as dates. There are 375 records where the departure date is earlier than the arrival date; these need to be cleaned. There are 142,457 records with no departure date; these are treated as valid and kept.

#### Clean the data - Immigration

The dates in the immigration data are stored as doubles and need to be converted to the date type, and the other columns are cast to suitable types as well. Rows whose departure year is before 2016 are filtered out. The following columns are not used for analysis and are dropped from the final data frame: DTADFILE, VISAPOST, OCCUP, ENTDEPA, ENTDEPD, ENTDEPU, DTADDTO (COUNT, INSNUM and FLTNO are likewise left out of the final selection).
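The UDF in the next cell adds each SAS date number to 1960-01-01. The same arithmetic in plain Python serves as a check against values seen above: the minimum arrdate 20545 should be 1 April 2016, and the first record's arrdate 20574 and depdate 20582 should be 30 April and 8 May 2016, matching the cleaned sample further down. `sas_to_date` is a hypothetical helper name.

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS date numbers count days from this date


def sas_to_date(days):
    """Convert a SAS date number (days since 1960-01-01) to a datetime.date."""
    if days is None:  # test for None, not truthiness, so 0 stays 1960-01-01
        return None
    return SAS_EPOCH + dt.timedelta(days=int(days))


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20574.0))  # 2016-04-30
print(sas_to_date(20582.0))  # 2016-05-08
```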

In [7]:
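# SAS dates count days since 1960-01-01; convert them to ISO date strings.
# Test for None explicitly so that a value of 0 (1960-01-01) is not treated as missing.
get_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(days=float(x))).isoformat() if x is not None else None)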
df = df.withColumn("arrival_date", get_date(df.arrdate))
df = df.withColumn("departure_date", get_date(df.depdate))

df = df.withColumn("arrival_date", to_date(df.arrival_date, 'yyyy-MM-dd'))
df = df.withColumn("departure_date", to_date(df.departure_date, 'yyyy-MM-dd'))

In [8]:
df.createOrReplaceTempView("ImmigData")
dSQL = spark.sql('''
                    select 
                        int(cicid) immigration_id,
                        int(i94yr) immigration_year,
                        int(i94mon) immigration_month,
                        int(i94cit) citizenship_country_code,
                        int(i94res) residency_country_code,
                        i94port port_code,
                        arrival_date,
                        int(i94mode) travel_mode,
                        i94addr current_state_code,
                        departure_date,
                        int(i94bir) age,
                        int(i94visa) visa_code,
                        matflag match_flag,
                        int(biryear) birth_year,
                        gender,
                        airline airline_code,
                        bigint(admnum) admission_num,
                        visatype visa_type,
                        int(i94yr) immigration_year_part,
                        int(i94mon) immigration_month_part
                    from ImmigData     
                    WHERE departure_date is null or year(departure_date) >= 2016
               ''')
dSQL.select(year("departure_date").alias("dep_year")).groupBy("dep_year").count().toPandas()

|   | dep_year | count |
|---|---|---|
| 0 | 2069.0 | 1 |
| 1 |  | 142457 |
| 2 | 2020.0 | 1 |
| 3 | 2016.0 | 2953826 |
| 4 | 2084.0 | 1 |
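The year-based filter above removes only 27 rows (3,096,313 - 2,953,826 - 142,457 - 3 = 27). All 27 depart before the April 2016 arrivals, but the exploration found 375 departures earlier than arrival, so 348 such rows remain, along with implausible years such as 2069 and 2084. A stricter rule would compare each departure with its own arrival; in Spark SQL that could be `departure_date IS NULL OR departure_date >= arrival_date`. The same rule in plain Python, on illustrative records (`keep_record` is a hypothetical helper):

```python
import datetime as dt


def keep_record(arrival, departure):
    """Keep rows with no departure date, or a departure on or after arrival."""
    return departure is None or departure >= arrival


records = [
    (dt.date(2016, 4, 30), dt.date(2016, 5, 8)),   # normal trip: kept
    (dt.date(2016, 4, 30), None),                  # no departure yet: kept
    (dt.date(2016, 4, 30), dt.date(2016, 4, 2)),   # departs before arrival: dropped
    (dt.date(2016, 4, 30), dt.date(2001, 7, 20)),  # departs in 2001: dropped
]
kept = [r for r in records if keep_record(*r)]
print(len(kept))  # 2
```

Even this rule keeps far-future departures such as 2084; removing those would need a separate, explicitly chosen cutoff.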


In [9]:
dSQL.limit(5).toPandas()

|   | immigration_id | immigration_year | immigration_month | citizenship_country_code | residency_country_code | port_code | arrival_date | travel_mode | current_state_code | departure_date | age | visa_code | match_flag | birth_year | gender | airline_code | admission_num | visa_type | immigration_year_part | immigration_month_part |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5748517 | 2016 | 4 | 245 | 438 | LOS | 2016-04-30 | 1 | CA | 2016-05-08 | 40 | 1 | M | 1976 | F | QF | 94953870030 | B1 | 2016 | 4 |
| 1 | 5748518 | 2016 | 4 | 245 | 438 | LOS | 2016-04-30 | 1 | NV | 2016-05-17 | 32 | 1 | M | 1984 | F | VA | 94955622830 | B1 | 2016 | 4 |
| 2 | 5748519 | 2016 | 4 | 245 | 438 | LOS | 2016-04-30 | 1 | WA | 2016-05-08 | 29 | 1 | M | 1987 | M | DL | 94956406530 | B1 | 2016 | 4 |
| 3 | 5748520 | 2016 | 4 | 245 | 438 | LOS | 2016-04-30 | 1 | WA | 2016-05-14 | 29 | 1 | M | 1987 | F | DL | 94956451430 | B1 | 2016 | 4 |
| 4 | 5748521 | 2016 | 4 | 245 | 438 | LOS | 2016-04-30 | 1 | WA | 2016-05-14 | 28 | 1 | M | 1988 | M | DL | 94956388130 | B1 | 2016 | 4 |


Map visa codes, country codes, travel modes, state codes and port codes from I94_SAS_Labels_Descriptions.SAS
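The dictionaries below were copied from I94_SAS_Labels_Descriptions.SAS. An alternative, sketched here under the assumption that each label line looks like `582 =  'MEXICO ...'` or `'ALC' = 'ALCAN, AK'`, is to parse them with a regular expression. The file holds several value blocks, so the lines for one block would need to be sliced out first; `LABEL_LINE` and `parse_labels` are hypothetical names.

```python
import re

# Assumed line shape: a code (optionally quoted), '=', then a quoted label.
# Labels that themselves contain a single quote would need a different pattern.
LABEL_LINE = re.compile(r"^\s*'?([^'=\s]+)'?\s*=\s*'([^']*)'")


def parse_labels(lines, numeric_keys=False):
    """Build a {code: label} dict from SAS value-label lines."""
    mapping = {}
    for line in lines:
        match = LABEL_LINE.match(line)
        if not match:
            continue  # skip block headers, comments and the closing ';'
        code, label = match.groups()
        key = int(code) if numeric_keys else code
        mapping[key] = label.strip()  # drop the padding seen in port labels
    return mapping


sample = [
    "  value i94cntyl",
    "   582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'",
    "   236 =  'AFGHANISTAN'",
    ";",
]
print(parse_labels(sample, numeric_keys=True))
```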

In [41]:
# visa codes dictionary
visa_codes = {
   1 : 'Business',
   2 : 'Pleasure',
   3 : 'Student'
}

df_visa_codes = spark.createDataFrame(list(map(list, visa_codes.items())),
                                        ["visa_code","visa_type"])
df_visa_codes.toPandas()

|   | visa_code | visa_type |
|---|---|---|
| 0 | 1 | Business |
| 1 | 2 | Pleasure |
| 2 | 3 | Student |


Country Code Mapping

In [42]:
cit_and_res_codes = {  
   582 :  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)',
   236 :  'AFGHANISTAN',
   101 :  'ALBANIA',
   316 :  'ALGERIA',
   102 :  'ANDORRA',
   324 :  'ANGOLA',
   529 :  'ANGUILLA',
   518 :  'ANTIGUA-BARBUDA',
   687 :  'ARGENTINA ',
   151 :  'ARMENIA',
   532 :  'ARUBA',
   438 :  'AUSTRALIA',
   103 :  'AUSTRIA',
   152 :  'AZERBAIJAN',
   512 :  'BAHAMAS',
   298 :  'BAHRAIN',
   274 :  'BANGLADESH',
   513 :  'BARBADOS',
   104 :  'BELGIUM',
   581 :  'BELIZE',
   386 :  'BENIN',
   509 :  'BERMUDA',
   153 :  'BELARUS',
   242 :  'BHUTAN',
   688 :  'BOLIVIA',
   717 :  'BONAIRE, ST EUSTATIUS, SABA' ,
   164 :  'BOSNIA-HERZEGOVINA',
   336 :  'BOTSWANA',
   689 :  'BRAZIL',
   525 :  'BRITISH VIRGIN ISLANDS',
   217 :  'BRUNEI',
   105 :  'BULGARIA',
   393 :  'BURKINA FASO',
   243 :  'BURMA',
   375 :  'BURUNDI',
   310 :  'CAMEROON',
   326 :  'CAPE VERDE',
   526 :  'CAYMAN ISLANDS',
   383 :  'CENTRAL AFRICAN REPUBLIC',
   384 :  'CHAD',
   690 :  'CHILE',
   245 :  'CHINA, PRC',
   721 :  'CURACAO' ,
   270 :  'CHRISTMAS ISLAND',
   271 :  'COCOS ISLANDS',
   691 :  'COLOMBIA',
   317 :  'COMOROS',
   385 :  'CONGO',
   467 :  'COOK ISLANDS',
   575 :  'COSTA RICA',
   165 :  'CROATIA',
   584 :  'CUBA',
   218 :  'CYPRUS',
   140 :  'CZECH REPUBLIC',
   723 :  'FAROE ISLANDS (PART OF DENMARK)'  ,
   108 :  'DENMARK',
   322 :  'DJIBOUTI',
   519 :  'DOMINICA',
   585 :  'DOMINICAN REPUBLIC',
   240 :  'EAST TIMOR',
   692 :  'ECUADOR',
   368 :  'EGYPT',
   576 :  'EL SALVADOR',
   399 :  'EQUATORIAL GUINEA',
   372 :  'ERITREA',
   109 :  'ESTONIA',
   369 :  'ETHIOPIA',
   604 :  'FALKLAND ISLANDS',
   413 :  'FIJI',
   110 :  'FINLAND',
   111 :  'FRANCE',
   601 :  'FRENCH GUIANA',
   411 :  'FRENCH POLYNESIA',
   387 :  'GABON',
   338 :  'GAMBIA',
   758 :  'GAZA STRIP' ,
   154 :  'GEORGIA',
   112 :  'GERMANY',
   339 :  'GHANA',
   143 :  'GIBRALTAR',
   113 :  'GREECE',
   520 :  'GRENADA',
   507 :  'GUADELOUPE',
   577 :  'GUATEMALA',
   382 :  'GUINEA',
   327 :  'GUINEA-BISSAU',
   603 :  'GUYANA',
   586 :  'HAITI',
   726 :  'HEARD AND MCDONALD IS.',
   149 :  'HOLY SEE/VATICAN',
   528 :  'HONDURAS',
   206 :  'HONG KONG',
   114 :  'HUNGARY',
   115 :  'ICELAND',
   213 :  'INDIA',
   759 :  'INDIAN OCEAN AREAS (FRENCH)' ,
   729 :  'INDIAN OCEAN TERRITORY' ,
   204 :  'INDONESIA',
   249 :  'IRAN',
   250 :  'IRAQ',
   116 :  'IRELAND',
   251 :  'ISRAEL',
   117 :  'ITALY',
   388 :  'IVORY COAST',
   514 :  'JAMAICA',
   209 :  'JAPAN',
   253 :  'JORDAN',
   201 :  'KAMPUCHEA',
   155 :  'KAZAKHSTAN',
   340 :  'KENYA',
   414 :  'KIRIBATI',
   732 :  'KOSOVO' ,
   272 :  'KUWAIT',
   156 :  'KYRGYZSTAN',
   203 :  'LAOS',
   118 :  'LATVIA',
   255 :  'LEBANON',
   335 :  'LESOTHO',
   370 :  'LIBERIA',
   381 :  'LIBYA',
   119 :  'LIECHTENSTEIN',
   120 :  'LITHUANIA',
   121 :  'LUXEMBOURG',
   214 :  'MACAU',
   167 :  'MACEDONIA',
   320 :  'MADAGASCAR',
   345 :  'MALAWI',
   273 :  'MALAYSIA',
   220 :  'MALDIVES',
   392 :  'MALI',
   145 :  'MALTA',
   472 :  'MARSHALL ISLANDS',
   511 :  'MARTINIQUE',
   389 :  'MAURITANIA',
   342 :  'MAURITIUS',
   760 :  'MAYOTTE (AFRICA - FRENCH)' ,
   473 :  'MICRONESIA, FED. STATES OF',
   157 :  'MOLDOVA',
   122 :  'MONACO',
   299 :  'MONGOLIA',
   735 :  'MONTENEGRO' ,
   521 :  'MONTSERRAT',
   332 :  'MOROCCO',
   329 :  'MOZAMBIQUE',
   371 :  'NAMIBIA',
   440 :  'NAURU',
   257 :  'NEPAL',
   123 :  'NETHERLANDS',
   508 :  'NETHERLANDS ANTILLES',
   409 :  'NEW CALEDONIA',
   464 :  'NEW ZEALAND',
   579 :  'NICARAGUA',
   390 :  'NIGER',
   343 :  'NIGERIA',
   470 :  'NIUE',
   275 :  'NORTH KOREA',
   124 :  'NORWAY',
   256 :  'OMAN',
   258 :  'PAKISTAN',
   474 :  'PALAU',
   743 :  'PALESTINE' ,
   504 :  'PANAMA',
   441 :  'PAPUA NEW GUINEA',
   693 :  'PARAGUAY',
   694 :  'PERU',
   260 :  'PHILIPPINES',
   416 :  'PITCAIRN ISLANDS',
   107 :  'POLAND',
   126 :  'PORTUGAL',
   297 :  'QATAR',
   748 :  'REPUBLIC OF SOUTH SUDAN',
   321 :  'REUNION',
   127 :  'ROMANIA',
   158 :  'RUSSIA',
   376 :  'RWANDA',
   128 :  'SAN MARINO',
   330 :  'SAO TOME AND PRINCIPE',
   261 :  'SAUDI ARABIA',
   391 :  'SENEGAL',
   142 :  'SERBIA AND MONTENEGRO',
   745 :  'SERBIA' ,
   347 :  'SEYCHELLES',
   348 :  'SIERRA LEONE',
   207 :  'SINGAPORE',
   141 :  'SLOVAKIA',
   166 :  'SLOVENIA',
   412 :  'SOLOMON ISLANDS',
   397 :  'SOMALIA',
   373 :  'SOUTH AFRICA',
   276 :  'SOUTH KOREA',
   129 :  'SPAIN',
   244 :  'SRI LANKA',
   346 :  'ST. HELENA',
   522 :  'ST. KITTS-NEVIS',
   523 :  'ST. LUCIA',
   502 :  'ST. PIERRE AND MIQUELON',
   524 :  'ST. VINCENT-GRENADINES',
   716 :  'SAINT BARTHELEMY' ,
   736 :  'SAINT MARTIN' ,
   749 :  'SAINT MAARTEN' ,
   350 :  'SUDAN',
   602 :  'SURINAME',
   351 :  'SWAZILAND',
   130 :  'SWEDEN',
   131 :  'SWITZERLAND',
   262 :  'SYRIA',
   268 :  'TAIWAN',
   159 :  'TAJIKISTAN',
   353 :  'TANZANIA',
   263 :  'THAILAND',
   304 :  'TOGO',
   417 :  'TONGA',
   516 :  'TRINIDAD AND TOBAGO',
   323 :  'TUNISIA',
   264 :  'TURKEY',
   161 :  'TURKMENISTAN',
   527 :  'TURKS AND CAICOS ISLANDS',
   420 :  'TUVALU',
   352 :  'UGANDA',
   162 :  'UKRAINE',
   296 :  'UNITED ARAB EMIRATES',
   135 :  'UNITED KINGDOM',
   695 :  'URUGUAY',
   163 :  'UZBEKISTAN',
   410 :  'VANUATU',
   696 :  'VENEZUELA',
   266 :  'VIETNAM',
   469 :  'WALLIS AND FUTUNA ISLANDS',
   757 :  'WEST INDIES (FRENCH)' ,
   333 :  'WESTERN SAHARA',
   465 :  'WESTERN SAMOA',
   216 :  'YEMEN',
   139 :  'YUGOSLAVIA',
   301 :  'ZAIRE',
   344 :  'ZAMBIA',
   315 :  'ZIMBABWE',
   403 :  'INVALID: AMERICAN SAMOA',
   712 :  'INVALID: ANTARCTICA' ,
   700 :  'INVALID: BORN ON BOARD SHIP',
   719 :  'INVALID: BOUVET ISLAND (ANTARCTICA/NORWAY TERR.)',
   574 :  'INVALID: CANADA',
   720 :  'INVALID: CANTON AND ENDERBURY ISLS' ,
   106 :  'INVALID: CZECHOSLOVAKIA',
   739 :  'INVALID: DRONNING MAUD LAND (ANTARCTICA-NORWAY)' ,
   394 :  'INVALID: FRENCH SOUTHERN AND ANTARCTIC',
   501 :  'INVALID: GREENLAND',
   404 :  'INVALID: GUAM',
   730 :  'INVALID: INTERNATIONAL WATERS' ,
   731 :  'INVALID: JOHNSON ISLAND' ,
   471 :  'INVALID: MARIANA ISLANDS, NORTHERN',
   737 :  'INVALID: MIDWAY ISLANDS' ,
   753 :  'INVALID: MINOR OUTLYING ISLANDS - USA',
   740 :  'INVALID: NEUTRAL ZONE (S. ARABIA/IRAQ)' ,
   710 :  'INVALID: NON-QUOTA IMMIGRANT',
   505 :  'INVALID: PUERTO RICO',
    0  :  'INVALID: STATELESS',
   705 :  'INVALID: STATELESS',
   583 :  'INVALID: UNITED STATES',
   407 :  'INVALID: UNITED STATES',
   999 :  'INVALID: UNKNOWN',
   239 :  'INVALID: UNKNOWN COUNTRY',
   134 :  'INVALID: USSR',
   506 :  'INVALID: U.S. VIRGIN ISLANDS',
   755 :  'INVALID: WAKE ISLAND'  ,
   311 :  'Collapsed Tanzania (should not show)',
   741 :  'Collapsed Curacao (should not show)',
    54 :  'No Country Code (54)',
   100 :  'No Country Code (100)',
   187 :  'No Country Code (187)',
   190 :  'No Country Code (190)',
   200 :  'No Country Code (200)',
   219 :  'No Country Code (219)',
   238 :  'No Country Code (238)',
   277 :  'No Country Code (277)',
   293 :  'No Country Code (293)',
   300 :  'No Country Code (300)',
   319 :  'No Country Code (319)',
   365 :  'No Country Code (365)',
   395 :  'No Country Code (395)',
   400 :  'No Country Code (400)',
   485 :  'No Country Code (485)',
   503 :  'No Country Code (503)',
   589 :  'No Country Code (589)',
   592 :  'No Country Code (592)',
   791 :  'No Country Code (791)',
   849 :  'No Country Code (849)',
   914 :  'No Country Code (914)',
   944 :  'No Country Code (944)',
   996 :  'No Country Code (996)'
}

df_city_res_codes = spark.createDataFrame(list(map(list, cit_and_res_codes.items())),
                                        ["country_code","country"])
df_city_res_codes.limit(5).toPandas()

|   | country_code | country |
|---|---|---|
| 0 | 582 | MEXICO Air Sea, and Not Reported (I-94, no lan... |
| 1 | 236 | AFGHANISTAN |
| 2 | 101 | ALBANIA |
| 3 | 316 | ALGERIA |
| 4 | 102 | ANDORRA |


Travel Mode Mapping

In [43]:
mode_codes = {
    1 : 'Air',
    2 : 'Sea',
    3 : 'Land',
    9 : 'Not reported'
}
df_travel_mode = spark.createDataFrame(list(map(list, mode_codes.items())),
                                        ["travel_mode_code","travel_code"])
df_travel_mode = df_travel_mode.withColumn("travel_mode_code",df_travel_mode["travel_mode_code"].cast(IntegerType()))
df_travel_mode.toPandas()    

|   | travel_mode_code | travel_code |
|---|---|---|
| 0 | 1 | Air |
| 1 | 2 | Sea |
| 2 | 3 | Land |
| 3 | 9 | Not reported |


State Code Mapping

In [44]:
state_codes = {
    'AL' : 'ALABAMA',
    'AK' : 'ALASKA',
    'AZ' : 'ARIZONA',
    'AR' : 'ARKANSAS',
    'CA' : 'CALIFORNIA',
    'CO' : 'COLORADO',
    'CT' : 'CONNECTICUT',
    'DE' : 'DELAWARE',
    'DC' :'DIST. OF COLUMBIA',
    'FL' :'FLORIDA',
    'GA' :'GEORGIA',
    'GU' :'GUAM',
    'HI' :'HAWAII',
    'ID' :'IDAHO',
    'IL' :'ILLINOIS',
    'IN' :'INDIANA',
    'IA' :'IOWA',
    'KS' :'KANSAS',
    'KY' :'KENTUCKY',
    'LA' :'LOUISIANA',
    'ME' :'MAINE',
    'MD' :'MARYLAND',
    'MA' :'MASSACHUSETTS',
    'MI' :'MICHIGAN',
    'MN' :'MINNESOTA',
    'MS' :'MISSISSIPPI',
    'MO' :'MISSOURI',
    'MT' :'MONTANA',
    'NC' :'N. CAROLINA',
    'ND' :'N. DAKOTA',
    'NE' :'NEBRASKA',
    'NV' :'NEVADA',
    'NH' :'NEW HAMPSHIRE',
    'NJ' :'NEW JERSEY',
    'NM' :'NEW MEXICO',
    'NY' :'NEW YORK',
    'OH' :'OHIO',
    'OK' :'OKLAHOMA',
    'OR' :'OREGON',
    'PA' :'PENNSYLVANIA',
    'PR' :'PUERTO RICO',
    'RI' :'RHODE ISLAND',
    'SC' :'S. CAROLINA',
    'SD' :'S. DAKOTA',
    'TN' :'TENNESSEE',
    'TX' :'TEXAS',
    'UT' :'UTAH',
    'VT' :'VERMONT',
    'VI' :'VIRGIN ISLANDS',
    'VA' :'VIRGINIA',
    'WV' :'W. VIRGINIA',
    'WA' :'WASHINGTON',
    'WI' :'WISCONSON',
    'WY' :'WYOMING' ,
    '99' :'All Other Codes' 
}
df_state_codes = spark.createDataFrame(list(map(list, state_codes.items())),
                                        ["state_code","state"])
df_state_codes.limit(5).toPandas()

|   | state_code | state |
|---|---|---|
| 0 | AL | ALABAMA |
| 1 | AK | ALASKA |
| 2 | AZ | ARIZONA |
| 3 | AR | ARKANSAS |
| 4 | CA | CALIFORNIA |
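The port labels in the next mapping combine a city and a state code, padded with spaces (for example `'LOS ANGELES, CA       '`). If ports were to be linked with the demographics table on city and state, a split along these lines could be a starting point. `split_port_label` is a hypothetical helper. The demographics data uses mixed case (`Silver Spring`) while port labels are upper case, so any comparison would also need to ignore case.

```python
def split_port_label(label):
    """Split a port label such as 'LOS ANGELES, CA       ' into (city, state).

    The last comma is used as the separator and only the first token after it
    is kept, so suffixes like '#ARPT' or '(BPS)' are dropped. Labels without a
    comma (e.g. 'MARIPOSA AZ') return (label, None) rather than guessing.
    """
    label = label.strip()
    if "," not in label:
        return label, None
    city, rest = label.rsplit(",", 1)
    tokens = rest.split()
    state = tokens[0] if tokens else None
    return city.strip(), state


for raw in ["LOS ANGELES, CA       ", "ASPEN, CO #ARPT",
            "PACIFIC, HWY. STATION, CA ", "MARIPOSA AZ           "]:
    print(split_port_label(raw))
```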


Port Code Mapping

In [45]:
ports_codes = {
   'ALC'    :    'ALCAN, AK             ',
   'ANC'    :    'ANCHORAGE, AK         ',
   'BAR'    :    'BAKER AAF - BAKER ISLAND, AK',
   'DAC'    :    'DALTONS CACHE, AK     ',
   'PIZ'    :    'DEW STATION PT LAY DEW, AK',
   'DTH'    :    'DUTCH HARBOR, AK      ',
   'EGL'    :    'EAGLE, AK             ',
   'FRB'    :    'FAIRBANKS, AK         ',
   'HOM'    :    'HOMER, AK             '           ,
   'HYD'    :    'HYDER, AK             ',
   'JUN'    :    'JUNEAU, AK            ',
   '5KE'    :    'KETCHIKAN, AK',
   'KET'    :    'KETCHIKAN, AK         ',
   'MOS'    :    'MOSES POINT INTERMEDIATE, AK',
   'NIK'    :    'NIKISKI, AK           ',
   'NOM'    :    'NOM, AK               ',
   'PKC'    :    'POKER CREEK, AK       ',
   'ORI'    :    'PORT LIONS SPB, AK',
   'SKA'    :    'SKAGWAY, AK           ',
   'SNP'    :    'ST. PAUL ISLAND, AK',
   'TKI'    :    'TOKEEN, AK',
   'WRA'    :    'WRANGELL, AK          ',
   'HSV'    :    'MADISON COUNTY - HUNTSVILLE, AL',
   'MOB'    :    'MOBILE, AL            ',
   'LIA'    :    'LITTLE ROCK, AR (BPS)',
   'ROG'    :    'ROGERS ARPT, AR',
   'DOU'    :    'DOUGLAS, AZ           ',
   'LUK'    :    'LUKEVILLE, AZ         ',
   'MAP'    :    'MARIPOSA AZ           ',
   'NAC'    :    'NACO, AZ              ',
   'NOG'    :    'NOGALES, AZ           ',
   'PHO'    :    'PHOENIX, AZ           ',
   'POR'    :    'PORTAL, AZ',
   'SLU'    :    'SAN LUIS, AZ          ',
   'SAS'    :    'SASABE, AZ            ',
   'TUC'    :    'TUCSON, AZ            ',
   'YUI'    :    'YUMA, AZ              ' ,
   'AND'    :    'ANDRADE, CA           ',
   'BUR'    :    'BURBANK, CA',
   'CAL'    :    'CALEXICO, CA          ',
   'CAO'    :    'CAMPO, CA             ' ,
   'FRE'    :    'FRESNO, CA            ',
   'ICP'    :    'IMPERIAL COUNTY, CA   ',
   'LNB'    :    'LONG BEACH, CA         ',
   'LOS'    :    'LOS ANGELES, CA       ',
   'BFL'    :    'MEADOWS FIELD - BAKERSFIELD, CA',
   'OAK'    :    'OAKLAND, CA ' ,
   'ONT'    :    'ONTARIO, CA',
   'OTM'    :    'OTAY MESA, CA          ',
   'BLT'    :    'PACIFIC, HWY. STATION, CA ',
   'PSP'    :    'PALM SPRINGS, CA',
   'SAC'    :    'SACRAMENTO, CA        ',
   'SLS'    :    'SALINAS, CA (BPS)',
   'SDP'    :    'SAN DIEGO, CA',
   'SFR'    :    'SAN FRANCISCO, CA     ',
   'SNJ'    :    'SAN JOSE, CA          ',
   'SLO'    :    'SAN LUIS OBISPO, CA   ',
   'SLI'    :    'SAN LUIS OBISPO, CA (BPS)',
   'SPC'    :    'SAN PEDRO, CA         ',
   'SYS'    :    'SAN YSIDRO, CA        ',
   'SAA'    :    'SANTA ANA, CA         ',
   'STO'    :    'STOCKTON, CA (BPS)',
   'TEC'    :    'TECATE, CA            ',
   'TRV'    :    'TRAVIS-AFB, CA        ',
   'APA'    :    'ARAPAHOE COUNTY, CO',
   'ASE'    :    'ASPEN, CO #ARPT',
   'COS'    :    'COLORADO SPRINGS, CO',
   'DEN'    :    'DENVER, CO            ',
   'DRO'    :    'LA PLATA - DURANGO, CO',
   'BDL'    :    'BRADLEY INTERNATIONAL, CT',
   'BGC'    :    'BRIDGEPORT, CT        ',
   'GRT'    :    'GROTON, CT            ',
   'HAR'    :    'HARTFORD, CT          ',
   'NWH'    :    'NEW HAVEN, CT         ',
   'NWL'    :    'NEW LONDON, CT        ',
   'TST'    :    'NEWINGTON DATA CENTER TEST, CT',
   'WAS'    :    'WASHINGTON DC         ',
   'DOV'    :    'DOVER AFB, DE',
   'DVD'    :    'DOVER-AFB, DE         ',
   'WLL'    :    'WILMINGTON, DE        ',
   'BOC'    :    'BOCAGRANDE, FL        ',
   'SRQ'    :    'BRADENTON - SARASOTA, FL',
   'CAN'    :    'CAPE CANAVERAL, FL    ',
   'DAB'    :    'DAYTONA BEACH INTERNATIONAL, FL',
   'FRN'    :    'FERNANDINA, FL        ',
   'FTL'    :    'FORT LAUDERDALE, FL   ',
   'FMY'    :    'FORT MYERS, FL        ',
   'FPF'    :    'FORT PIERCE, FL       ',
   'HUR'    :    'HURLBURT FIELD, FL',
   'GNV'    :    'J R ALISON MUNI - GAINESVILLE, FL',
   'JAC'    :    'JACKSONVILLE, FL      ',
   'KEY'    :    'KEY WEST, FL          ',
   'LEE'    :    'LEESBURG MUNICIPAL AIRPORT, FL',
   'MLB'    :    'MELBOURNE, FL',
   'MIA'    :    'MIAMI, FL             ',
   'APF'    :    'NAPLES, FL #ARPT',
   'OPF'    :    'OPA LOCKA, FL',
   'ORL'    :    'ORLANDO, FL           ',
   'PAN'    :    'PANAMA CITY, FL       ',
   'PEN'    :    'PENSACOLA, FL         ',
   'PCF'    :    'PORT CANAVERAL, FL    ',
   'PEV'    :    'PORT EVERGLADES, FL   ',
   'PSJ'    :    'PORT ST JOE, FL       ',
   'SFB'    :    'SANFORD, FL           ',
   'SGJ'    :    'ST AUGUSTINE ARPT, FL',
   'SAU'    :    'ST AUGUSTINE, FL      ',
   'FPR'    :    'ST LUCIE COUNTY, FL',
   'SPE'    :    'ST PETERSBURG, FL     ',
   'TAM'    :    'TAMPA, FL             ',
   'WPB'    :    'WEST PALM BEACH, FL   ',
   'ATL'    :    'ATLANTA, GA           ',
   'BRU'    :    'BRUNSWICK, GA         ',
   'AGS'    :    'BUSH FIELD - AUGUSTA, GA',
   'SAV'    :    'SAVANNAH, GA          ',
   'AGA'    :    'AGANA, GU             ',
   'HHW'    :    'HONOLULU, HI          ',
   'OGG'    :    'KAHULUI - MAUI, HI',
   'KOA'    :    'KEAHOLE-KONA, HI      ',
   'LIH'    :    'LIHUE, HI             ',
   'CID'    :    'CEDAR RAPIDS/IOWA CITY, IA',
   'DSM'    :    'DES MOINES, IA',
   'BOI'    :    'AIR TERM. (GOWEN FLD) BOISE, ID',
   'EPI'    :    'EASTPORT, ID          ',
   'IDA'    :    'FANNING FIELD - IDAHO FALLS, ID',
   'PTL'    :    'PORTHILL, ID          ',
   'SPI'    :    'CAPITAL - SPRINGFIELD, IL',
   'CHI'    :    'CHICAGO, IL           ',
   'DPA'    :    'DUPAGE COUNTY, IL',
   'PIA'    :    'GREATER PEORIA, IL',
   'RFD'    :    'GREATER ROCKFORD, IL',
   'UGN'    :    'MEMORIAL - WAUKEGAN, IL',
   'GAR'    :    'GARY, IN              ',
   'HMM'    :    'HAMMOND, IN           ',
   'INP'    :    'INDIANAPOLIS, IN      ',
   'MRL'    :    'MERRILLVILLE, IN      ',
   'SBN'    :    'SOUTH BEND, IN',
   'ICT'    :    'MID-CONTINENT - WITCHITA, KS',
   'LEX'    :    'BLUE GRASS - LEXINGTON, KY',
   'LOU'    :    'LOUISVILLE, KY        ',
   'BTN'    :    'BATON ROUGE, LA       ',
   'LKC'    :    'LAKE CHARLES, LA      ',
   'LAK'    :    'LAKE CHARLES, LA (BPS)',
   'MLU'    :    'MONROE, LA',
   'MGC'    :    'MORGAN CITY, LA       ',
   'NOL'    :    'NEW ORLEANS, LA       ',
   'BOS'    :    'BOSTON, MA            ',
   'GLO'    :    'GLOUCESTER, MA        ',
   'BED'    :    'HANSCOM FIELD - BEDFORD, MA',
   'LYN'    :    'LYNDEN, WA            ',
   'ADW'    :    'ANDREWS AFB, MD',
   'BAL'    :    'BALTIMORE, MD         ',
   'MKG'    :    'MUSKEGON, MD',
   'PAX'    :    'PATUXENT RIVER, MD    ',
   'BGM'    :    'BANGOR, ME            ',
   'BOO'    :    'BOOTHBAY HARBOR, ME   ',
   'BWM'    :    'BRIDGEWATER, ME       ',
   'BCK'    :    'BUCKPORT, ME          ',
   'CLS'    :    'CALAIS, ME   ',
   'CRB'    :    'CARIBOU, ME           ',
   'COB'    :    'COBURN GORE, ME       ',
   'EST'    :    'EASTCOURT, ME         ',
   'EPT'    :    'EASTPORT MUNICIPAL, ME',
   'EPM'    :    'EASTPORT, ME          ',
   'FOR'    :    'FOREST CITY, ME       ',
   'FTF'    :    'FORT FAIRFIELD, ME    ',
   'FTK'    :    'FORT KENT, ME         ',
   'HML'    :    'HAMIIN, ME            ',
   'HTM'    :    'HOULTON, ME           ',
   'JKM'    :    'JACKMAN, ME           ',
   'KAL'    :    'KALISPEL, MT          ',
   'LIM'    :    'LIMESTONE, ME         ',
   'LUB'    :    'LUBEC, ME             ',
   'MAD'    :    'MADAWASKA, ME         ',
   'POM'    :    'PORTLAND, ME          ',
   'RGM'    :    'RANGELEY, ME (BPS)',
   'SBR'    :    'SOUTH BREWER, ME      ',
   'SRL'    :    'ST AURELIE, ME        ',
   'SPA'    :    'ST PAMPILE, ME        ',
   'VNB'    :    'VAN BUREN, ME         ',
   'VCB'    :    'VANCEBORO, ME         ',
   'AGN'    :    'ALGONAC, MI           ',
   'ALP'    :    'ALPENA, MI            ',
   'BCY'    :    'BAY CITY, MI          ',
   'DET'    :    'DETROIT, MI           ',
   'GRP'    :    'GRAND RAPIDS, MI',
   'GRO'    :    'GROSSE ISLE, MI       ',
   'ISL'    :    'ISLE ROYALE, MI       ',
   'MRC'    :    'MARINE CITY, MI       ',
   'MRY'    :    'MARYSVILLE, MI        ',
   'PTK'    :    'OAKLAND COUNTY - PONTIAC, MI',
   'PHU'    :    'PORT HURON, MI        ',
   'RBT'    :    'ROBERTS LANDING, MI   ',
   'SAG'    :    'SAGINAW, MI           ',
   'SSM'    :    'SAULT STE. MARIE, MI  ',
   'SCL'    :    'ST CLAIR, MI          ',
   'YIP'    :    'WILLOW RUN - YPSILANTI, MI',
   'BAU'    :    'BAUDETTE, MN          ',
   'CAR'    :    'CARIBOU MUNICIPAL AIRPORT, MN',
   'GTF'    :    'Collapsed into INT, MN',
   'INL'    :    'Collapsed into INT, MN',
   'CRA'    :    'CRANE LAKE, MN        ',
   'MIC'    :    'CRYSTAL MUNICIPAL AIRPORT, MN',
   'DUL'    :    'DULUTH, MN            ',
   'ELY'    :    'ELY, MN               ',
   'GPM'    :    'GRAND PORTAGE, MN     ',
   'SVC'    :    'GRANT COUNTY - SILVER CITY, MN',
   'INT'    :    'INT''L FALLS, MN      ',
   'LAN'    :    'LANCASTER, MN         ',
   'MSP'    :    'MINN./ST PAUL, MN     ',
   'LIN'    :    'NORTHERN SVC CENTER, MN   ',
   'NOY'    :    'NOYES, MN             ',
   'PIN'    :    'PINE CREEK, MN        ',
   '48Y'    :    'PINECREEK BORDER ARPT, MN',
   'RAN'    :    'RAINER, MN            ',
   'RST'    :    'ROCHESTER, MN',
   'ROS'    :    'ROSEAU, MN            ',
   'SPM'    :    'ST PAUL, MN           ',
   'WSB'    :    'WARROAD INTL, SPB, MN',
   'WAR'    :    'WARROAD, MN           ',
   'KAN'    :    'KANSAS CITY, MO       ',
   'SGF'    :    'SPRINGFIELD-BRANSON, MO',
   'STL'    :    'ST LOUIS, MO          ',
   'WHI'    :    'WHITETAIL, MT         ',
   'WHM'    :    'WILD HORSE, MT        ',
   'GPT'    :    'BILOXI REGIONAL, MS',
   'GTR'    :    'GOLDEN TRIANGLE LOWNDES CNTY, MS',
   'GUL'    :    'GULFPORT, MS          ',
   'PAS'    :    'PASCAGOULA, MS        ',
   'JAN'    :    'THOMPSON FIELD - JACKSON, MS',
   'BIL'    :    'BILLINGS, MT          ',
   'BTM'    :    'BUTTE, MT             ',
   'CHF'    :    'CHIEF MT, MT          ',
   'CTB'    :    'CUT BANK MUNICIPAL, MT',
   'CUT'    :    'CUT BANK, MT          ',
   'DLB'    :    'DEL BONITA, MT        ',
   'EUR'    :    'EUREKA, MT (BPS)',
   'BZN'    :    'GALLATIN FIELD - BOZEMAN, MT',
   'FCA'    :    'GLACIER NATIONAL PARK, MT',
   'GGW'    :    'GLASGOW, MT           ',
   'GRE'    :    'GREAT FALLS, MT       ',
   'HVR'    :    'HAVRE, MT             ',
   'HEL'    :    'HELENA, MT            ',
   'LWT'    :    'LEWISTON, MT          ',
   'MGM'    :    'MORGAN, MT            ',
   'OPH'    :    'OPHEIM, MT            ',
   'PIE'    :    'PIEGAN, MT            ',
   'RAY'    :    'RAYMOND, MT           ',
   'ROO'    :    'ROOSVILLE, MT         ',
   'SCO'    :    'SCOBEY, MT            ',
   'SWE'    :    'SWEETGTASS, MT        ',
   'TRL'    :    'TRIAL CREEK, MT       ',
   'TUR'    :    'TURNER, MT            ',
   'WCM'    :    'WILLOW CREEK, MT      ',
   'CLT'    :    'CHARLOTTE, NC         ',
   'FAY'    :    'FAYETTEVILLE, NC',
   'MRH'    :    'MOREHEAD CITY, NC     ',
   'FOP'    :    'MORRIS FIELDS AAF, NC',
   'GSO'    :    'PIEDMONT TRIAD INTL AIRPORT, NC',
   'RDU'    :    'RALEIGH/DURHAM, NC    ',
   'SSC'    :    'SHAW AFB - SUMTER, NC',
   'WIL'    :    'WILMINGTON, NC        ',
   'AMB'    :    'AMBROSE, ND           ',
   'ANT'    :    'ANTLER, ND            ',
   'CRY'    :    'CARBURY, ND           ',
   'DNS'    :    'DUNSEITH, ND          ',
   'FAR'    :    'FARGO, ND             ',
   'FRT'    :    'FORTUNA, ND           ',
   'GRF'    :    'GRAND FORKS, ND       ',
   'HNN'    :    'HANNAH, ND            ',
   'HNS'    :    'HANSBORO, ND          ',
   'MAI'    :    'MAIDA, ND             ',
   'MND'    :    'MINOT, ND             ',
   'NEC'    :    'NECHE, ND             ',
   'NOO'    :    'NOONAN, ND            ',
   'NRG'    :    'NORTHGATE, ND         ',
   'PEM'    :    'PEMBINA, ND           ',
   'SAR'    :    'SARLES, ND            ',
   'SHR'    :    'SHERWOOD, ND          ',
   'SJO'    :    'ST JOHN, ND           ',
   'WAL'    :    'WALHALLA, ND          ',
   'WHO'    :    'WESTHOPE, ND          ',
   'WND'    :    'WILLISTON, ND         ',
   'OMA'    :    'OMAHA, NE             ',
   'LEB'    :    'LEBANON, NH           ',
   'MHT'    :    'MANCHESTER, NH',
   'PNH'    :    'PITTSBURG, NH         ',
   'PSM'    :    'PORTSMOUTH, NH        ',
   'BYO'    :    'BAYONNE, NJ           ',
   'CNJ'    :    'CAMDEN, NJ            ',
   'HOB'    :    'HOBOKEN, NJ           ',
   'JER'    :    'JERSEY CITY, NJ       ',
   'WRI'    :    'MC GUIRE AFB - WRIGHTSOWN, NJ',
   'MMU'    :    'MORRISTOWN, NJ',
   'NEW'    :    'NEWARK/TETERBORO, NJ  ',
   'PER'    :    'PERTH AMBOY, NJ       ',
   'ACY'    :    'POMONA FIELD - ATLANTIC CITY, NJ',
   'ALA'    :    'ALAMAGORDO, NM (BPS)',
   'ABQ'    :    'ALBUQUERQUE, NM       ',
   'ANP'    :    'ANTELOPE WELLS, NM    ',
   'CRL'    :    'CARLSBAD, NM          ',
   'COL'    :    'COLUMBUS, NM          ',
   'CDD'    :    'CRANE LAKE - ST. LOUIS CNTY, NM',
   'DNM'    :    'DEMING, NM (BPS)',
   'LAS'    :    'LAS CRUCES, NM        ',
   'LOB'    :    'LORDSBURG, NM (BPS)',
   'RUI'    :    'RUIDOSO, NM',
   'STR'    :    'SANTA TERESA, NM      ',
   'RNO'    :    'CANNON INTL - RENO/TAHOE, NV',
   'FLX'    :    'FALLON MUNICIPAL AIRPORT, NV',
   'LVG'    :    'LAS VEGAS, NV         ',
   'REN'    :    'RENO, NV              ',
   'ALB'    :    'ALBANY, NY            ',
   'AXB'    :    'ALEXANDRIA BAY, NY    ',
   'BUF'    :    'BUFFALO, NY           ',
   'CNH'    :    'CANNON CORNERS, NY',
   'CAP'    :    'CAPE VINCENT, NY      ',
   'CHM'    :    'CHAMPLAIN, NY         ',
   'CHT'    :    'CHATEAUGAY, NY        ',
   'CLA'    :    'CLAYTON, NY           ',
   'FTC'    :    'FORT COVINGTON, NY    ',
   'LAG'    :    'LA GUARDIA, NY        ',
   'LEW'    :    'LEWISTON, NY          ',
   'MAS'    :    'MASSENA, NY           ',
   'MAG'    :    'MCGUIRE AFB, NY       ',
   'MOO'    :    'MOORES, NY            ',
   'MRR'    :    'MORRISTOWN, NY        ',
   'NYC'    :    'NEW YORK, NY          ',
   'NIA'    :    'NIAGARA FALLS, NY     ',
   'OGD'    :    'OGDENSBURG, NY        ',
   'OSW'    :    'OSWEGO, NY            ',
   'ELM'    :    'REGIONAL ARPT - HORSEHEAD, NY',
   'ROC'    :    'ROCHESTER, NY         ',
   'ROU'    :    'ROUSES POINT, NY      ',
   'SWF'    :    'STEWART - ORANGE CNTY, NY',
   'SYR'    :    'SYRACUSE, NY          ',
   'THO'    :    'THOUSAND ISLAND BRIDGE, NY',
   'TRO'    :    'TROUT RIVER, NY       ',
   'WAT'    :    'WATERTOWN, NY         ',
   'HPN'    :    'WESTCHESTER - WHITE PLAINS, NY',
   'WRB'    :    'WHIRLPOOL BRIDGE, NY',
   'YOU'    :    'YOUNGSTOWN, NY        ',
   'AKR'    :    'AKRON, OH             ',
   'ATB'    :    'ASHTABULA, OH         ',
   'CIN'    :    'CINCINNATI, OH        ',
   'CLE'    :    'CLEVELAND, OH         ',
   'CLM'    :    'COLUMBUS, OH          ',
   'LOR'    :    'LORAIN, OH            ',
   'MBO'    :    'MARBLE HEADS, OH      ',
   'SDY'    :    'SANDUSKY, OH          ',
   'TOL'    :    'TOLEDO, OH            ',
   'OKC'    :    'OKLAHOMA CITY, OK     ',
   'TUL'    :    'TULSA, OK',
   'AST'    :    'ASTORIA, OR           ',
   'COO'    :    'COOS BAY, OR          ',
   'HIO'    :    'HILLSBORO, OR',
   'MED'    :    'MEDFORD, OR           ',
   'NPT'    :    'NEWPORT, OR           ',
   'POO'    :    'PORTLAND, OR          ',
   'PUT'    :    'PUT-IN-BAY, OH        ',
   'RDM'    :    'ROBERTS FIELDS - REDMOND, OR',
   'ERI'    :    'ERIE, PA              ',
   'MDT'    :    'HARRISBURG, PA',
   'HSB'    :    'HARRISONBURG, PA      ',
   'PHI'    :    'PHILADELPHIA, PA      ',
   'PIT'    :    'PITTSBURG, PA         ',
   'AGU'    :    'AGUADILLA, PR         ',
   'BQN'    :    'BORINQUEN - AGUADILLO, PR',
   'JCP'    :    'CULEBRA - BENJAMIN RIVERA, PR',
   'ENS'    :    'ENSENADA, PR          ',
   'FAJ'    :    'FAJARDO, PR           ',
   'HUM'    :    'HUMACAO, PR           ',
   'JOB'    :    'JOBOS, PR             ',
   'MAY'    :    'MAYAGUEZ, PR          ',
   'PON'    :    'PONCE, PR             ',
   'PSE'    :    'PONCE-MERCEDITA, PR',
   'SAJ'    :    'SAN JUAN, PR          ',
   'VQS'    :    'VIEQUES-ARPT, PR',
   'PRO'    :    'PROVIDENCE, RI        ',
   'PVD'    :    'THEODORE FRANCIS - WARWICK, RI',
   'CHL'    :    'CHARLESTON, SC        ',
   'CAE'    :    'COLUMBIA, SC #ARPT',
   'GEO'    :    'GEORGETOWN, SC        ',
   'GSP'    :    'GREENVILLE, SC',
   'GRR'    :    'GREER, SC',
   'MYR'    :    'MYRTLE BEACH, SC',
   'SPF'    :    'BLACK HILLS, SPEARFISH, SD',
   'HON'    :    'HOWES REGIONAL ARPT - HURON, SD',
   'SAI'    :    'SAIPAN, SPN           ',
   'TYS'    :    'MC GHEE TYSON - ALCOA, TN',
   'MEM'    :    'MEMPHIS, TN           ',
   'NSV'    :    'NASHVILLE, TN         ',
   'TRI'    :    'TRI CITY ARPT, TN',
   'ADS'    :    'ADDISON AIRPORT- ADDISON, TX',
   'ADT'    :    'AMISTAD DAM, TX       ',
   'ANZ'    :    'ANZALDUAS, TX',
   'AUS'    :    'AUSTIN, TX            ',
   'BEA'    :    'BEAUMONT, TX          ',
   'BBP'    :    'BIG BEND PARK, TX (BPS)',
   'SCC'    :    'BP SPEC COORD. CTR, TX',
   'BTC'    :    'BP TACTICAL UNIT, TX  ' ,
   'BOA'    :    'BRIDGE OF AMERICAS, TX',
   'BRO'    :    'BROWNSVILLE, TX       ',
   'CRP'    :    'CORPUS CHRISTI, TX    ',
   'DAL'    :    'DALLAS, TX            ',
   'DLR'    :    'DEL RIO, TX           ',
   'DNA'    :    'DONNA, TX',
   'EGP'    :    'EAGLE PASS, TX        ',
   'ELP'    :    'EL PASO, TX           ',
   'FAB'    :    'FABENS, TX            ',
   'FAL'    :    'FALCON HEIGHTS, TX    ',
   'FTH'    :    'FORT HANCOCK, TX      ',
   'AFW'    :    'FORT WORTH ALLIANCE, TX',
   'FPT'    :    'FREEPORT, TX          ',
   'GAL'    :    'GALVESTON, TX         ',
   'HLG'    :    'HARLINGEN, TX         ',
   'HID'    :    'HIDALGO, TX           ',
   'HOU'    :    'HOUSTON, TX           ',
   'SGR'    :    'HULL FIELD, SUGAR LAND ARPT, TX',
   'LLB'    :    'JUAREZ-LINCOLN BRIDGE, TX',
   'LCB'    :    'LAREDO COLUMBIA BRIDGE, TX',
   'LRN'    :    'LAREDO NORTH, TX      ',
   'LAR'    :    'LAREDO, TX            ',
   'LSE'    :    'LOS EBANOS, TX        ',
   'IND'    :    'LOS INDIOS, TX',
   'LOI'    :    'LOS INDIOS, TX        ',
   'MRS'    :    'MARFA, TX (BPS)',
   'MCA'    :    'MCALLEN, TX           ',
   'MAF'    :    'ODESSA REGIONAL, TX',
   'PDN'    :    'PASO DEL NORTE,TX     ',
   'PBB'    :    'PEACE BRIDGE, NY      ',
   'PHR'    :    'PHARR, TX             ',
   'PAR'    :    'PORT ARTHUR, TX       ',
   'ISB'    :    'PORT ISABEL, TX       ',
   'POE'    :    'PORT OF EL PASO, TX   ',
   'PRE'    :    'PRESIDIO, TX          ',
   'PGR'    :    'PROGRESO, TX          ',
   'RIO'    :    'RIO GRANDE CITY, TX   ',
   'ROM'    :    'ROMA, TX              ',
   'SNA'    :    'SAN ANTONIO, TX       ',
   'SNN'    :    'SANDERSON, TX         ',
   'VIB'    :    'VETERAN INTL BRIDGE, TX',
   'YSL'    :    'YSLETA, TX            ',
   'CHA'    :    'CHARLOTTE AMALIE, VI  ',
   'CHR'    :    'CHRISTIANSTED, VI     ',
   'CRU'    :    'CRUZ BAY, ST JOHN, VI ',
   'FRK'    :    'FREDERIKSTED, VI      ',
   'STT'    :    'ST THOMAS, VI         ',
   'LGU'    :    'CACHE AIRPORT - LOGAN, UT',
   'SLC'    :    'SALT LAKE CITY, UT    ',
   'CHO'    :    'ALBEMARLE CHARLOTTESVILLE, VA',
   'DAA'    :    'DAVISON AAF - FAIRFAX CNTY, VA',
   'HOP'    :    'HOPEWELL, VA          ',
   'HEF'    :    'MANASSAS, VA #ARPT',
   'NWN'    :    'NEWPORT, VA           ',
   'NOR'    :    'NORFOLK, VA           ',
   'RCM'    :    'RICHMOND, VA          ',
   'ABS'    :    'ALBURG SPRINGS, VT    ',
   'ABG'    :    'ALBURG, VT            ',
   'BEB'    :    'BEEBE PLAIN, VT       ',
   'BEE'    :    'BEECHER FALLS, VT     ',
   'BRG'    :    'BURLINGTON, VT        ',
   'CNA'    :    'CANAAN, VT            ',
   'DER'    :    'DERBY LINE, VT (I-91) ',
   'DLV'    :    'DERBY LINE, VT (RT. 5)',
   'ERC'    :    'EAST RICHFORD, VT     ',
   'HIG'    :    'HIGHGATE SPRINGS, VT  ',
   'MOR'    :    'MORSES LINE, VT       ',
   'NPV'    :    'NEWPORT, VT           ',
   'NRT'    :    'NORTH TROY, VT        ',
   'NRN'    :    'NORTON, VT            ',
   'PIV'    :    'PINNACLE ROAD, VT     ',
   'RIF'    :    'RICHFORT, VT          ',
   'STA'    :    'ST ALBANS, VT         ',
   'SWB'    :    'SWANTON, VT (BP - SECTOR HQ)',
   'WBE'    :    'WEST BERKSHIRE, VT    ',
   'ABE'    :    'ABERDEEN, WA          ',
   'ANA'    :    'ANACORTES, WA         ',
   'BEL'    :    'BELLINGHAM, WA        ',
   'BLI'    :    'BELLINGHAM, WASHINGTON #INTL',
   'BLA'    :    'BLAINE, WA            ',
   'BWA'    :    'BOUNDARY, WA          ',
   'CUR'    :    'CURLEW, WA (BPS)',
   'DVL'    :    'DANVILLE, WA          ',
   'EVE'    :    'EVERETT, WA           ',
   'FER'    :    'FERRY, WA             ',
   'FRI'    :    'FRIDAY HARBOR, WA     ',
   'FWA'    :    'FRONTIER, WA          ',
   'KLM'    :    'KALAMA, WA            ',
   'LAU'    :    'LAURIER, WA           ',
   'LON'    :    'LONGVIEW, WA          ',
   'MET'    :    'METALINE FALLS, WA    ',
   'MWH'    :    'MOSES LAKE GRANT COUNTY ARPT, WA',
   'NEA'    :    'NEAH BAY, WA          ',
   'NIG'    :    'NIGHTHAWK, WA         ',
   'OLY'    :    'OLYMPIA, WA           ',
   'ORO'    :    'OROVILLE, WA          ',
   'PWB'    :    'PASCO, WA             ',
   'PIR'    :    'POINT ROBERTS, WA     ',
   'PNG'    :    'PORT ANGELES, WA      ',
   'PTO'    :    'PORT TOWNSEND, WA     ',
   'SEA'    :    'SEATTLE, WA           ',
   'SPO'    :    'SPOKANE, WA           ',
   'SUM'    :    'SUMAS, WA             ',
   'TAC'    :    'TACOMA, WA            ',
   'PSC'    :    'TRI-CITIES - PASCO, WA',
   'VAN'    :    'VANCOUVER, WA         ',
   'AGM'    :    'ALGOMA, WI            ',
   'BAY'    :    'BAYFIELD, WI          ',
   'GRB'    :    'GREEN BAY, WI         ',
   'MNW'    :    'MANITOWOC, WI         ',
   'MIL'    :    'MILWAUKEE, WI         ',
   'MSN'    :    'TRUAX FIELD - DANE COUNTY, WI',
   'CHS'    :    'CHARLESTON, WV        ',
   'CLK'    :    'CLARKSBURG, WV        ',
   'BLF'    :    'MERCER COUNTY, WV',
   'CSP'    :    'CASPER, WY            ',
   'XXX'    :    'NOT REPORTED/UNKNOWN  ' ,
   '888'    :    'UNIDENTIFED AIR / SEAPORT',
   'UNK'    :    'UNKNOWN POE           ',
   'CLG'    :    'CALGARY, CANADA       ',
   'EDA'    :    'EDMONTON, CANADA      ',
   'YHC'    :    'HAKAI PASS, CANADA',
   'HAL'    :    'Halifax, NS, Canada   ',
   'MON'    :    'MONTREAL, CANADA      ',
   'OTT'    :    'OTTAWA, CANADA        ',
   'YXE'    :    'SASKATOON, CANADA',
   'TOR'    :    'TORONTO, CANADA       ',
   'VCV'    :    'VANCOUVER, CANADA     ',
   'VIC'    :    'VICTORIA, CANADA      ',
   'WIN'    :    'WINNIPEG, CANADA      ',
   'AMS'    :    'AMSTERDAM-SCHIPHOL, NETHERLANDS',
   'ARB'    :    'ARUBA, NETH ANTILLES  ',
   'BAN'    :    'BANKOK, THAILAND      ',
   'BEI'    :    'BEICA #ARPT, ETHIOPIA',
   'PEK'    :    'BEIJING CAPITAL INTL, PRC',
   'BDA'    :    'KINDLEY FIELD, BERMUDA',
   'BOG'    :    'BOGOTA, EL DORADO #ARPT, COLOMBIA',
   'EZE'    :    'BUENOS AIRES, MINISTRO PIST, ARGENTINA',
   'CUN'    :    'CANCUN, MEXICO',
   'CRQ'    :    'CARAVELAS, BA #ARPT, BRAZIL',
   'MVD'    :    'CARRASCO, URUGUAY',
   'DUB'    :    'DUBLIN, IRELAND       ',
   'FOU'    :    'FOUGAMOU #ARPT, GABON',
   'FBA'    :    'FREEPORT, BAHAMAS      ',
   'MTY'    :    'GEN M. ESCOBEDO, Monterrey, MX',
   'HMO'    :    'GEN PESQUEIRA GARCIA, MX',
   'GCM'    :    'GRAND CAYMAN, CAYMAN ISLAND',
   'GDL'    :    'GUADALAJARA, MIGUEL HIDAL, MX',
   'HAM'    :    'HAMILTON, BERMUDA     ',
   'ICN'    :    'INCHON, SEOUL KOREA',
   'IWA'    :    'INVALID - IWAKUNI, JAPAN',
   'CND'    :    'KOGALNICEANU, ROMANIA',
   'LAH'    :    'LABUHA ARPT, INDONESIA',
   'DUR'    :    'LOUIS BOTHA, SOUTH AFRICA',
   'MAL'    :    'MANGOLE ARPT, INDONESIA',
   'MDE'    :    'MEDELLIN, COLOMBIA',
   'MEX'    :    'JUAREZ INTL, MEXICO CITY, MX',
   'LHR'    :    'MIDDLESEX, ENGLAND',
   'NBO'    :    'NAIROBI, KENYA        ',
   'NAS'    :    'NASSAU, BAHAMAS       ',
   'NCA'    :    'NORTH CAICOS, TURK & CAIMAN',
   'PTY'    :    'OMAR TORRIJOS, PANAMA',
   'SPV'    :    'PAPUA, NEW GUINEA',
   'UIO'    :    'QUITO (MARISCAL SUCR), ECUADOR',
   'RIT'    :    'ROME, ITALY           ',
   'SNO'    :    'SAKON NAKHON #ARPT, THAILAND',
   'SLP'    :    'SAN LUIS POTOSI #ARPT, MEXICO',
   'SAN'    :    'SAN SALVADOR, EL SALVADOR',
   'SRO'    :    'SANTANA RAMOS #ARPT, COLOMBIA',
   'GRU'    :    'GUARULHOS INTL, SAO PAULO, BRAZIL',
   'SHA'    :    'SHANNON, IRELAND      ',
   'HIL'    :    'SHILLAVO, ETHIOPIA',
   'TOK'    :    'TOROKINA #ARPT, PAPUA, NEW GUINEA',
   'VER'    :    'VERACRUZ, MEXICO',
   'LGW'    :    'WEST SUSSEX, ENGLAND  ',
   'ZZZ'    :    'MEXICO Land (Banco de Mexico) ',
   'CHN'    :    'No PORT Code (CHN)',
   'CNC'    :    'CANNON CORNERS, NY',
   'MAA'    :    'Abu Dhabi',
   'AG0'    :    'MAGNOLIA, AR',
   'BHM'    :    'BAR HARBOR, ME',
   'BHX'    :    'BIRMINGHAM, AL',
   'CAK'    :    'AKRON, OH',
   'FOK'    :    'SUFFOLK COUNTY, NY',
   'LND'    :    'LANDER, WY',
   'MAR'    :    'MARFA, TX',
   'MLI'    :    'MOLINE, IL',
   'RIV'    :    'RIVERSIDE, CA',
   'RME'    :    'ROME, NY',
   'VNY'    :    'VAN NUYS, CA',
   'YUM'    :    'YUMA, AZ',
   'FRG'    :    'Collapsed (FOK) 06/15',
   'HRL'    :    'Collapsed (HLG) 06/15',
   'ISP'    :    'Collapsed (FOK) 06/15',
   'JSJ'    :    'Collapsed (SAJ) 06/15',
   'BUS'    :    'Collapsed (BUF) 06/15',
   'IAG'    :    'Collapsed (NIA) 06/15',
   'PHN'    :    'Collapsed (PHU) 06/15',
   'STN'    :    'Collapsed (STR) 06/15',
   'VMB'    :    'Collapsed (VNB) 06/15',
   'T01'    :    'Collapsed (SEA) 06/15',
   'PHF'    :    'No PORT Code (PHF)',
   'DRV'    :    'No PORT Code (DRV)',
   'FTB'    :    'No PORT Code (FTB)',
   'GAC'    :    'No PORT Code (GAC)',
   'GMT'    :    'No PORT Code (GMT)',
   'JFA'    :    'No PORT Code (JFA)',
   'JMZ'    :    'No PORT Code (JMZ)',
   'NC8'    :    'No PORT Code (NC8)',
   'NYL'    :    'No PORT Code (NYL)',
   'OAI'    :    'No PORT Code (OAI)',
   'PCW'    :    'No PORT Code (PCW)',
   'WA5'    :    'No PORT Code (WAS)',
   'WTR'    :    'No PORT Code (WTR)',
   'X96'    :    'No PORT Code (X96)',
   'XNA'    :    'No PORT Code (XNA)',
   'YGF'    :    'No PORT Code (YGF)',
   '5T6'    :    'No PORT Code (5T6)',
   '060'    :    'No PORT Code (60)',
   'SP0'    :    'No PORT Code (SP0)',
   'W55'    :    'No PORT Code (W55)',
   'X44'    :    'No PORT Code (X44)',
   'AUH'    :    'No PORT Code (AUH)',
   'RYY'    :    'No PORT Code (RYY)',
   'SUS'    :    'No PORT Code (SUS)',
   '74S'    :    'No PORT Code (74S)',
   'ATW'    :    'No PORT Code (ATW)',
   'CPX'    :    'No PORT Code (CPX)',
   'MTH'    :    'No PORT Code (MTH)',
   'PFN'    :    'No PORT Code (PFN)',
   'SCH'    :    'No PORT Code (SCH)',
   'ASI'    :    'No PORT Code (ASI)',
   'BKF'    :    'No PORT Code (BKF)',
   'DAY'    :    'No PORT Code (DAY)',
   'Y62'    :    'No PORT Code (Y62)',
   'AG'        :    'No PORT Code (AG)',
   'BCM'    :    'No PORT Code (BCM)',
   'DEC'    :    'No PORT Code (DEC)',
   'PLB'    :    'No PORT Code (PLB)',
   'CXO'    :    'No PORT Code (CXO)',
   'JBQ'    :    'No PORT Code (JBQ)',
   'JIG'    :    'No PORT Code (JIG)',
   'OGS'    :    'No PORT Code (OGS)',
   'TIW'    :    'No PORT Code (TIW)',
   'OTS'    :    'No PORT Code (OTS)',
   'AMT'    :    'No PORT Code (AMT)',
   'EGE'    :    'No PORT Code (EGE)',
   'GPI'    :    'No PORT Code (GPI)',
   'NGL'    :    'No PORT Code (NGL)',
   'OLM'    :    'No PORT Code (OLM)',
   '.GA'    :    'No PORT Code (.GA)',
   'CLX'    :    'No PORT Code (CLX)',
   'CP '    :    'No PORT Code (CP)',
   'FSC'    :    'No PORT Code (FSC)',
   'NK'     :    'No PORT Code (NK)',
   'ADU'    :    'No PORT Code (ADU)',
   'AKT'    :    'No PORT Code (AKT)',
   'LIT'    :    'No PORT Code (LIT)',
   'A2A'    :    'No PORT Code (A2A)',
   'OSN'    :    'No PORT Code (OSN)'
}
df_ports = spark.createDataFrame(list(map(list, ports_codes.items())),
                                         ["port_code", "port_name"])
df_ports.limit(5).toPandas()
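The port names in `ports_codes` carry trailing padding and combine the city and the state (or country) in one string. Below is a minimal plain-Python sketch of tidying them before they reach Spark. It assumes that the text after the last comma is the state or country. The function name `clean_port_codes` and the `port_city`/`port_state` column names are illustrative and do not come from the original notebook.

```python
def clean_port_codes(codes):
    """Return (port_code, port_city, port_state) tuples from the I94 port mapping.

    Assumes names look like 'CITY, ST' with optional trailing padding; names
    without a comma (e.g. 'No PORT Code (CHN)') get a state of None.
    Keys are left untouched so they still match the immigration data.
    """
    rows = []
    for code, name in codes.items():
        # rpartition splits on the LAST comma, so 'CRUZ BAY, ST JOHN, VI' keeps its city intact
        city, sep, state = name.strip().rpartition(",")
        if sep:
            rows.append((code, city.strip(), state.strip()))
        else:
            # no comma: rpartition puts the whole name in the last element
            rows.append((code, state.strip(), None))
    return rows


sample = {
    "YUI": "YUMA, AZ              ",
    "CRU": "CRUZ BAY, ST JOHN, VI ",
    "CHN": "No PORT Code (CHN)",
}
for row in clean_port_codes(sample):
    print(row)
# ('YUI', 'YUMA', 'AZ')
# ('CRU', 'CRUZ BAY, ST JOHN', 'VI')
# ('CHN', 'No PORT Code (CHN)', None)
```

The resulting list could then be passed to `spark.createDataFrame(rows, ["port_code", "port_city", "port_state"])`. Foreign entries such as `'Halifax, NS, Canada'` end up with the country in `port_state`, and entries without a comma get `None`. Downstream joins therefore should not assume that every port maps to a US state.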

| | port_code | port_name |
|---|---|---|
| 0 | ALC | ALCAN, AK |
| 1 | ANC | ANCHORAGE, AK |
| 2 | BAR | BAKER AAF - BAKER ISLAND, AK |
| 3 | DAC | DALTONS CACHE, AK |
| 4 | PIZ | DEW STATION PT LAY DEW, AK |


#### Read - Airport codes csv file

In [24]:
a_df = spark.read.options(delimiter=",").csv("airport-codes_csv.csv",header=True)
a_df.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [47]:
a_df.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [49]:
a_df.createOrReplaceTempView("AirportTable")
aSQL = spark.sql("select count(*) AS TotalRecords from AirportTable")
aSQL.show()

+------------+
|TotalRecords|
+------------+
|       55075|
+------------+



In [50]:
aSQL = spark.sql("select count(*) as NotNullIataCodes from AirportTable WHERE iata_code IS NOT NULL")
aSQL.show()

+----------------+
|NotNullIataCodes|
+----------------+
|            9189|
+----------------+



Get count of records by type

In [51]:
aSQL = spark.sql("select type, count(*) as TypeCount from AirportTable WHERE iata_code IS NOT NULL GROUP BY type ")
aSQL.show()

+--------------+---------+
|          type|TypeCount|
+--------------+---------+
| large_airport|      602|
| seaplane_base|      143|
|      heliport|       68|
|        closed|      279|
|medium_airport|     3859|
| small_airport|     4238|
+--------------+---------+



Check for duplicates

In [52]:
aSQL = spark.sql("select iata_code,type,count(*) from AirportTable \
                 WHERE type <>  'closed' and iata_code <> '0' \
                 group by iata_code,type having count(*) > 1 ")
aSQL.show()

+---------+--------------+--------+
|iata_code|          type|count(1)|
+---------+--------------+--------+
|      LMC| small_airport|       2|
|      RZS| small_airport|       2|
|      MXR| small_airport|       2|
|      YTY|medium_airport|       2|
|      ZRH| large_airport|       2|
|      RMD| small_airport|       2|
|      REQ| small_airport|       2|
|      HLA|medium_airport|       2|
|      NWT| small_airport|       2|
|      KCZ|medium_airport|       2|
|      PCO| small_airport|       2|
|      DZI| small_airport|       2|
|      DDU| small_airport|       2|
|      LPE| small_airport|       2|
|      KMM| small_airport|       2|
|      ULG| small_airport|       2|
|      GVA| large_airport|       2|
|      JNB| large_airport|       2|
|      IST| large_airport|       2|
|      IZA|medium_airport|       2|
+---------+--------------+--------+
only showing top 20 rows



The airports csv file has a total of 55,075 records, but only 9,189 of them have the IATA code populated. The IATA code is the column that can be mapped to the immigration data. Among the records with an IATA code, 279 are closed airports, and these can be filtered out. As the duplicate check above shows, a number of airports appear more than once for the same IATA code and type, so these duplicates need to be removed as well. Finally, latitude and longitude are combined in a single string column, `coordinates`, stored as "longitude, latitude", and need to be split into separate columns.
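The cleanup rules above can be sketched in plain Python to make the filtering order explicit. The sketch assumes that each row is a dict keyed by the CSV column names. The function `clean_airports` and the sample rows are made up for illustration. One difference from the Spark code is worth noting: Spark's `dropDuplicates` keeps an arbitrary row for each key, whereas this sketch keeps the first row it sees so that the result is deterministic.

```python
def clean_airports(rows):
    """Keep airports that can be joined to the immigration data.

    Drops rows without a usable IATA code (None or '0'), drops closed
    airports, and keeps only the first row seen for each
    (iata_code, type) pair.
    """
    seen = set()
    cleaned = []
    for row in rows:
        iata = row.get("iata_code")
        if iata is None or iata == "0" or row.get("type") == "closed":
            continue
        key = (iata, row.get("type"))
        if key in seen:
            continue  # duplicate (iata_code, type) pair
        seen.add(key)
        cleaned.append(row)
    return cleaned


# Hypothetical rows; only the columns the rules look at are included.
sample = [
    {"ident": "A1", "type": "large_airport", "iata_code": "AAA"},
    {"ident": "A2", "type": "large_airport", "iata_code": "AAA"},  # duplicate pair
    {"ident": "A3", "type": "small_airport", "iata_code": None},   # no IATA code
    {"ident": "A4", "type": "closed", "iata_code": "BBB"},         # closed airport
    {"ident": "A5", "type": "heliport", "iata_code": "0"},         # placeholder code
    {"ident": "A6", "type": "small_airport", "iata_code": "AAA"},  # same code, other type
]
print([r["ident"] for r in clean_airports(sample)])  # ['A1', 'A6']
```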

#### Clean up - Airports csv 

In [25]:
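def split_column(value, split_character="-", extract_index=0, output_type="str"):
    """
    Split `value` (the column supplied) on `split_character` and return the piece at
    `extract_index`, cast to str, float or int as named by `output_type`.
    Returns None when `value` is null.
    """
    if value is None:
        return None
    # Look the cast up in a fixed table instead of calling eval() on a string
    cast = {"str": str, "float": float, "int": int}[output_type]
    splits = [cast(val) for val in value.split(split_character)]
    return splits[extract_index]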
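The sample rows shown earlier suggest that `coordinates` stores the longitude first. For example, the Bensalem, PA heliport starts with -74.93. In that case, index 0 of the split is the longitude and index 1 is the latitude. The plain-Python sketch below makes that order explicit. The helper `parse_coordinates` is hypothetical and is not part of the notebook, although it could be wrapped in a `udf` in the same way as `split_column`.

```python
def parse_coordinates(coordinates):
    """Split a 'longitude, latitude' string into a (latitude, longitude) tuple of floats.

    Returns (None, None) for a missing value.
    """
    if coordinates is None:
        return (None, None)
    lon_str, lat_str = coordinates.split(",")
    # float() tolerates the leading space after the comma
    return (float(lat_str), float(lon_str))


print(parse_coordinates("135.716995, -14.2717"))  # (-14.2717, 135.716995)
```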

In [26]:
#user defined functions
split_to_string = udf(split_column, StringType())
split_to_float = udf(split_column,FloatType())

In [27]:
# `coordinates` is stored as "longitude, latitude": index 1 is the latitude, index 0 the longitude
a_df = a_df.withColumn("latitude", split_to_float(a_df.coordinates, lit(","), lit(1), lit("float"))) \
.withColumn("longitude", split_to_float(a_df.coordinates, lit(","), lit(0), lit("float"))) \
.withColumn("region", split_to_string(a_df.iso_region, lit("-"), lit(1), lit("str")))

In [28]:
a_df = a_df.dropDuplicates(subset=["iata_code","type"])

In [29]:
a_df.createOrReplaceTempView("AirportTable")
aSQL = spark.sql("select distinct  \
                         ident AS airport_id, \
                         type AS airport_type, \
                         name AS airport_name, \
                         elevation_ft, \
                         continent, \
                         iso_country, \
                         region, \
                         municipality, \
                         gps_code, \
                         iata_code, \
                         local_code, \
                         latitude, \
                         longitude \
                 from AirportTable \
                 WHERE iata_code is not null and iata_code <> '0'  \
                 and type <> 'closed'  \
                ")
aSQL.limit(5).toPandas()

| | airport_id | airport_type | airport_name | elevation_ft | continent | iso_country | region | municipality | gps_code | iata_code | local_code | latitude | longitude |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | YNUM | small_airport | Numbulwar Airport | 31.0 | OC | AU | NT | | YNUM | NUB | | -14.2717 | 135.716995 |
| 1 | ZHLY | small_airport | Luoyang Airport | 840.0 | AS | CN | 41 | Luoyang | ZHLY | LYA | | 34.7411 | 112.388 |
| 2 | MH-LML | small_airport | Lae Island Airport | | OC | MH | LAE | Lae Island | | LML | | 8.92111 | 166.264999 |
| 3 | GCTS | large_airport | Tenerife South Airport | 209.0 | EU | ES | CN | Tenerife Island | GCTS | TFS | | 28.0445 | -16.5725 |
| 4 | SYKS | small_airport | Karasabai Airport | 731.0 | SA | GY | UT | Karasabai | SYKS | KRG | | 4.03333 | -59.533298 |


In [37]:
df_g = aSQL.groupby('iata_code','airport_type').count()
# Confirm that no (iata_code, airport_type) pair occurs more than once
df_g.filter(col("count") > 1).select("count").show()

+-----+
|count|
+-----+
+-----+



#### Read - Global Temperatures 

In [27]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temp_df = spark.read.options(delimiter=",").csv(fname,header=True)
temp_df.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [28]:
temp_df.createOrReplaceTempView("CityTempTable")
# Count the rows in the temperature dataset (show() prints the result and returns None)
spark.sql('''SELECT count(*) FROM CityTempTable''').show()

+--------+
|count(1)|
+--------+
| 8599212|
+--------+



In [30]:
tSQL = spark.sql('''
                     select date(dt) DateRec ,
                            date_format(date(dt),'MMM') Month,
                            float(AverageTemperature) AvgTemp,
                            City, 
                            Country
                     from CityTempTable
                   
                     
''')
#tSQL.printSchema()
# Earliest date recorded for each country
result = tSQL.groupby("Country").agg({'DateRec': 'min'})
result.toPandas()

| | Country | min(DateRec) |
|---|---|---|
| 0 | Chad | 1856-01-01 |
| 1 | Russia | 1743-11-01 |
| 2 | Paraguay | 1832-01-01 |
| 3 | Yemen | 1864-01-01 |
| 4 | Senegal | 1849-01-01 |
| 5 | Sweden | 1743-11-01 |
| 6 | Guyana | 1824-01-01 |
| 7 | Burma | 1796-01-01 |
| 8 | Philippines | 1825-01-01 |
| 9 | Eritrea | 1864-01-01 |


In [31]:
# Get the earliest date recorded for cities in the United States
tSQL = spark.sql('''
                     select date(dt) DateRec ,
                            date_format(date(dt),'MMM') Month,
                            float(AverageTemperature) AvgTemp,
                            City, 
                            Country
                     from CityTempTable
                     WHERE Country = 'United States'          
''')
#tSQL.printSchema()
result = tSQL.agg({'DateRec': 'min'})
result.toPandas()

| | min(DateRec) |
|---|---|
| 0 | 1743-11-01 |


In [32]:
# Keep only readings from 2000-01-01 onwards and only for cities in the United States,
# since temperatures for other countries are not needed.
# City can then be joined to the demographics table.
temp_df.createOrReplaceTempView("CityTempTable")
tSQL = spark.sql('''
                     select date(dt) DateRec ,
                            date_format(date(dt),'MMM') Month,
                            float(AverageTemperature) AvgTemp,
                            City, 
                            Country
                     from CityTempTable where to_date(dt,'yyyy-MM-dd') >= '2000-01-01' 
                     and Country = 'United States' 
                  ''')

# Average the temperature (in degrees Celsius) for each month across the years,
# pivoting on Month so that each city gets a single row with twelve monthly averages
t_pivot_df = tSQL.groupBy("City","Country").pivot("Month").avg("AvgTemp")

#join the two datasets to create a single combined final dataset and drop duplicates
f_df = tSQL.join(t_pivot_df,["City","Country"]).drop("DateRec","AvgTemp","Month").dropDuplicates()

# Create the final view with renamed columns; this is what gets written to parquet
f_df.createOrReplaceTempView("FinalCityTempTable")
fSQL = spark.sql('''
                  select City,  
                         Country, 
                         Jan AvgTempJan,
                         Feb AvgTempFeb,
                         Mar AvgTempMar,
                         Apr AvgTempApr,
                         May AvgTempMay,
                         Jun AvgTempJun,
                         Jul AvgTempJul,
                         Aug AvgTempAug,
                         Sep AvgTempSep,
                         Oct AvgTempOct,
                         Nov AvgTempNov,
                         Dec AvgTempDec
                 from FinalCityTempTable
''')
fSQL.limit(5).toPandas()

| | City | Country | AvgTempJan | AvgTempFeb | AvgTempMar | AvgTempApr | AvgTempMay | AvgTempJun | AvgTempJul | AvgTempAug | AvgTempSep | AvgTempOct | AvgTempNov | AvgTempDec |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Allentown | United States | -2.2595 | -0.886429 | 4.095786 | 10.1175 | 15.698071 | 20.634857 | 23.144786 | 22.391571 | 18.34 | 11.544 | 6.458846 | 0.357846 |
| 1 | Pueblo | United States | -0.245714 | 1.001786 | 6.420214 | 10.885214 | 16.350857 | 22.149 | 24.956714 | 23.557 | 18.876214 | 11.512154 | 5.105077 | -0.627692 |
| 2 | Seattle | United States | 1.986714 | 2.786143 | 4.269214 | 6.445857 | 9.989214 | 12.945143 | 16.251572 | 16.121785 | 13.448571 | 8.252462 | 4.083769 | 1.492308 |
| 3 | Garden Grove | United States | 14.233929 | 13.531 | 14.312571 | 14.741429 | 16.715214 | 17.955929 | 19.937214 | 20.690857 | 20.8135 | 18.533461 | 16.491154 | 13.814308 |
| 4 | Huntington Beach | United States | 14.233929 | 13.531 | 14.312571 | 14.741429 | 16.715214 | 17.955929 | 19.937214 | 20.690857 | 20.8135 | 18.533461 | 16.491154 | 13.814308 |
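The pivot step turns one row per city and date into one row per city with twelve monthly averages. A plain-Python sketch of the same reshaping follows, using a fixed month list; the function name and the sample readings are made up for illustration. In PySpark, GroupedData.pivot also accepts an explicit list of values, e.g. tSQL.groupBy("City", "Country").pivot("Month", MONTHS).avg("AvgTemp"), which fixes the column order and spares Spark the extra pass it otherwise runs to discover the distinct pivot values.

```python
from collections import defaultdict
from statistics import mean

MONTHS = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]


def pivot_monthly_avg(readings):
    """Pivot (city, country, month_abbr, avg_temp) tuples into one row per city.

    Returns {(city, country): {"AvgTempJan": ..., ..., "AvgTempDec": ...}}.
    Missing temperatures (None) are skipped, as Spark's avg() skips nulls,
    and a month with no readings is None.
    """
    readings = list(readings)
    buckets = defaultdict(list)
    for city, country, month, temp in readings:
        if temp is not None:
            buckets[(city, country, month)].append(temp)

    result = {}
    # dict.fromkeys keeps the first-seen order of (city, country) pairs
    for city, country in dict.fromkeys((c, k) for c, k, _, _ in readings):
        row = {}
        for m in MONTHS:  # fixed column order, like pivot("Month", MONTHS)
            values = buckets.get((city, country, m))
            row[f"AvgTemp{m}"] = mean(values) if values else None
        result[(city, country)] = row
    return result


# Made-up readings for illustration
sample = [
    ("Seattle", "United States", "Jan", 1.5),
    ("Seattle", "United States", "Jan", 2.5),
    ("Seattle", "United States", "Jul", None),
    ("Seattle", "United States", "Jul", 16.0),
]
pivoted = pivot_monthly_avg(sample)
print(pivoted[("Seattle", "United States")]["AvgTempJan"])  # 2.0
```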


#### Read Immigration data and build dates table 

In [33]:
# Read the immigration data
df = spark.read.parquet("sas_data")

# SAS stores dates as the number of days since 1960-01-01; convert them to ISO date strings.
# Test for None explicitly so that a value of 0 (1960-01-01) is not treated as missing.
get_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(days=float(x))).isoformat() if x is not None else None)

df = df.withColumn("arrival_date", get_date(df.arrdate))
df = df.withColumn("departure_date", get_date(df.depdate))

# Cast the ISO strings to DateType
df = df.withColumn("arrival_date", to_date(df.arrival_date, 'yyyy-MM-dd'))
df = df.withColumn("departure_date", to_date(df.departure_date, 'yyyy-MM-dd'))

# Create a dates DataFrame with the unique arrival and departure dates on or after 2016-01-01
df.createOrReplaceTempView("DatesData")
dtSQL = spark.sql('''
                    select distinct date
                    from 
                    (
                        select arrival_date date from DatesData WHERE arrival_date >= to_date('2016-01-01')
                        UNION
                        select departure_date date from DatesData WHERE departure_date is not null and departure_date >= to_date('2016-01-01')
                    ) as Drv         
                ''')

# Add columns to enrich the dates DataFrame; this is what gets written to parquet
dtSQL = dtSQL.select("date", \
                     date_format("date", 'yyyyMMdd').alias("date_id"), \
                     year("date").alias("year"), \
                     month("date").alias("month"), \
                     dayofmonth("date").alias("day"), \
                     weekofyear("date").alias("week"), \
                     date_format("date",'MMMM').alias("month_name"), \
                     dayofweek("date").alias("day_of_week"),\
                     date_format("date",'E').alias("day_of_week_name")
                    )
dtSQL.limit(5).toPandas()

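| | date | date_id | year | month | day | week | month_name | day_of_week | day_of_week_name |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 2016-03-01 | 20160301 | 2016 | 3 | 1 | 9 | March | 3 | Tue |
| 1 | 2016-04-25 | 20160425 | 2016 | 4 | 25 | 17 | April | 2 | Mon |
| 2 | 2016-05-03 | 20160503 | 2016 | 5 | 3 | 18 | May | 3 | Tue |
| 3 | 2016-08-15 | 20160815 | 2016 | 8 | 15 | 33 | August | 2 | Mon |
| 4 | 2016-08-31 | 20160831 | 2016 | 8 | 31 | 35 | August | 4 | Wed |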
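Two pieces of logic in this cell are easy to check in isolation: the SAS date conversion and the derived columns of the dates table. The plain-Python sketch below reproduces both so the expected values can be verified without Spark; the function names are hypothetical. It tests `days is None` rather than truthiness, so a SAS value of 0 (1960-01-01) is not mistaken for a missing value, and it follows Spark's conventions: dayofweek numbers days from 1 = Sunday to 7 = Saturday, and weekofyear uses ISO-8601 weeks. Inside Spark, the Python UDF could also be replaced by a built-in expression such as expr("date_add(to_date('1960-01-01'), cast(arrdate as int))"), which avoids Python UDF serialization overhead; that is a suggestion, not what this project uses.

```python
import datetime as dt
from typing import Optional

SAS_EPOCH = dt.date(1960, 1, 1)
MONTH_NAMES = ["January", "February", "March", "April", "May", "June", "July",
               "August", "September", "October", "November", "December"]
DAY_ABBR = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]  # indexed by date.weekday()


def sas_to_iso(days: Optional[float]) -> Optional[str]:
    """Convert a SAS date (days since 1960-01-01) to 'yyyy-MM-dd'; None stays None."""
    if days is None:
        return None
    return (SAS_EPOCH + dt.timedelta(days=float(days))).isoformat()


def date_dim_row(d: dt.date) -> dict:
    """Build the dates-table columns for one date, following Spark's conventions."""
    return {
        "date": d,
        "date_id": d.strftime("%Y%m%d"),
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "week": d.isocalendar()[1],              # ISO-8601 week, like weekofyear()
        "month_name": MONTH_NAMES[d.month - 1],
        "day_of_week": d.isoweekday() % 7 + 1,   # 1 = Sunday ... 7 = Saturday, like dayofweek()
        "day_of_week_name": DAY_ABBR[d.weekday()],
    }


print(sas_to_iso(20454.0))  # 2016-01-01
print(date_dim_row(dt.date(2016, 3, 1)))
```

The second print reproduces the first row of the table above (week 9, day_of_week 3, Tue).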


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
I used a star schema for the data model. My intention is to keep queries as simple as possible by avoiding heavy normalization and limiting the number of joins they require. An ER diagram of this data model can be found [here]().  

At the center of the data model is a fact table focused on immigration (fact_immigration), most of whose data comes from the immigration dataset. The dimension tables provide additional information to support the immigration data (dim_date, dim_country, dim_state, dim_city, dim_visa, dim_travel_mode, dim_ports). 
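To illustrate why a star schema keeps queries simple, the sketch below builds a tiny in-memory version of fact_immigration with two of its dimensions in SQLite (Python's built-in sqlite3) and answers an analytical question with one join per dimension. The column names (cicid, arrival_date_id, visa_id, visa_category) and all rows are hypothetical; the actual columns are defined in the project's data dictionary.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dim_date (date_id TEXT PRIMARY KEY, year INTEGER, month INTEGER);
    CREATE TABLE dim_visa (visa_id INTEGER PRIMARY KEY, visa_category TEXT);
    CREATE TABLE fact_immigration (
        cicid INTEGER PRIMARY KEY,
        arrival_date_id TEXT REFERENCES dim_date (date_id),
        visa_id INTEGER REFERENCES dim_visa (visa_id)
    );
    INSERT INTO dim_date VALUES ('20160401', 2016, 4), ('20160501', 2016, 5);
    INSERT INTO dim_visa VALUES (1, 'Business'), (2, 'Pleasure'), (3, 'Student');
    INSERT INTO fact_immigration VALUES
        (101, '20160401', 2), (102, '20160401', 3), (103, '20160501', 2);
    """
)

# Arrivals per month and visa category: each dimension is a single join away from the fact table
query = """
    SELECT d.year, d.month, v.visa_category, COUNT(*) AS arrivals
    FROM fact_immigration f
    JOIN dim_date d ON f.arrival_date_id = d.date_id
    JOIN dim_visa v ON f.visa_id = v.visa_id
    GROUP BY d.year, d.month, v.visa_category
    ORDER BY d.year, d.month, v.visa_category
"""
rows = conn.execute(query).fetchall()
for row in rows:
    print(row)
```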

#### 3.2 Mapping Out Data Pipelines
1. etl.py - This Python script transforms the data files (immigration, airports, ports, demographics) into Parquet files stored in an S3 bucket.  
2. The data model is hosted on AWS Redshift, which I chose for its efficiency with large volumes of data. 
The files in S3 are first loaded into staging tables and then into the dimension and fact tables.  
3. I chose Apache Airflow to run this data pipeline because it is easy to use and has ready-made operators for integrating with S3 and Redshift. Its graphical UI makes it easy to monitor and manage workflows, check the status of tasks, and schedule them.


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.  

Prerequisite: a Redshift cluster created in the Oregon (us-west-2) region, the same region where the S3 files are stored. Modify the cluster's accessibility settings so that it is accessible from the internet. 

Once this is done, execute the following steps.

1. To create the required staging, dimension and fact tables in Redshift, run the capstone_tablecreation_dag in the Airflow web UI. This DAG drops and recreates all the tables required by the data model.

2. Next, run the capstone_dag in the Airflow UI. This DAG populates the staging tables from S3, transforms the data, and loads it into the dimension and fact tables. It also runs the data quality checks.  

The data pipeline in Airflow consists of the operators below.

StageToRedshiftOperator  
This operator copies a Parquet file from S3 directly into a Redshift staging table. All the staging tables below are loaded in parallel.

staging_visa  
staging_travel_mode  
staging_country  
staging_ports  
staging_climate  
staging_city  
staging_state  
staging_date  
staging_immigration    
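Loading Parquet from S3 into Redshift is normally done with Redshift's COPY command and its FORMAT AS PARQUET option. The helper below only builds that statement; it is a sketch under assumptions, not the project's StageToRedshiftOperator, and the function name, bucket path and IAM role ARN are hypothetical placeholders. In an Airflow operator, the resulting string would be executed through a Redshift connection (for example a PostgresHook).

```python
def build_parquet_copy_sql(table: str, s3_path: str, iam_role_arn: str) -> str:
    """Return a Redshift COPY statement that loads the Parquet files under s3_path into table.

    Per the Redshift COPY documentation, columns in Parquet files are matched to the
    table's columns by position, so the staging table's column order must match the files.
    """
    if not s3_path.startswith("s3://"):
        raise ValueError(f"expected an s3:// path, got {s3_path!r}")
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "FORMAT AS PARQUET;"
    )


# Hypothetical bucket and role, for illustration only
sql = build_parquet_copy_sql(
    "staging_visa",
    "s3://example-capstone-bucket/visa/",
    "arn:aws:iam::123456789012:role/exampleRedshiftS3Role",
)
print(sql)
```

Because each staging table gets its own COPY task with no dependencies between them, Airflow can run these loads in parallel.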

LoadDimensionOperator  
This operator queries the staging tables and populates the dimension tables. It takes an optional parameter, columns, that lets the user specify which fields to populate instead of assuming that every field is filled. A helper file contains all the queries this operator references. The following tables are populated with it:  

dim_visa  
dim_travel_mode  
dim_country  
dim_ports  
dim_state  
dim_city  
dim_date  
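The optional columns parameter described above amounts to choosing between INSERT INTO table SELECT ... and INSERT INTO table (col1, col2, ...) SELECT .... A minimal sketch of that choice follows; the function name, the truncate flag (a common delete-then-insert pattern for dimension loads) and the example query and column names are assumptions, not taken from the project's helper file.

```python
from typing import Optional, Sequence


def build_dimension_insert(
    table: str,
    select_sql: str,
    columns: Optional[Sequence[str]] = None,
    truncate: bool = False,
) -> list:
    """Return the SQL statements that load one dimension table from a SELECT query.

    With columns=None (or empty) every target column is assumed to be populated, in table
    order; otherwise only the listed columns are filled and the rest take their defaults.
    """
    statements = []
    if truncate:
        statements.append(f"TRUNCATE TABLE {table};")
    column_list = f" ({', '.join(columns)})" if columns else ""
    statements.append(f"INSERT INTO {table}{column_list}\n{select_sql.strip()};")
    return statements


# Hypothetical query: populate two columns of dim_visa from staging_visa
stmts = build_dimension_insert(
    "dim_visa",
    "SELECT DISTINCT visa_code, visa_category FROM staging_visa",
    columns=["visa_code", "visa_category"],
    truncate=True,
)
for s in stmts:
    print(s)
```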

LoadFactOperator  
This operator populates the fact table fact_immigration. A helper file contains all the queries this operator references.   


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

DataQualityOperator  
This operator takes a series of queries to run against the newly populated tables to confirm that the data pipeline ran as expected. 
It also accepts an expected value and a comparison operator (=, <>, >, <, >=, <=), which gives the user flexibility when writing data quality checks. The checks performed include:

Checking that a table has at least a certain number of records  
Ensuring that dimension keys are not null   
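The expected-value-plus-comparison design can be sketched with Python's operator module: each check pairs a SQL query that returns a single value with a comparison and an expected value. The version below runs against an in-memory SQLite database so it is self-contained; the check dictionary format, the function name and the dim_state columns are hypothetical, and in the pipeline the queries would run on Redshift instead.

```python
import operator
import sqlite3

# Map the comparison strings the operator accepts to Python functions
COMPARISONS = {
    "=": operator.eq,
    "<>": operator.ne,
    ">": operator.gt,
    "<": operator.lt,
    ">=": operator.ge,
    "<=": operator.le,
}


def run_quality_checks(conn, checks):
    """Run each check and return a list of failure messages (empty if all checks pass)."""
    failures = []
    for check in checks:
        compare = COMPARISONS[check["comparison"]]  # KeyError for an unsupported comparison
        actual = conn.execute(check["sql"]).fetchone()[0]
        if not compare(actual, check["expected"]):
            failures.append(
                f"{check['sql']!r}: got {actual}, expected {check['comparison']} {check['expected']}"
            )
    return failures


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dim_state (state_code TEXT, state_name TEXT);
    INSERT INTO dim_state VALUES ('CA', 'California'), ('WA', 'Washington'), (NULL, 'Unknown');
    """
)

checks = [
    # The table has at least a minimum number of records
    {"sql": "SELECT COUNT(*) FROM dim_state", "comparison": ">=", "expected": 2},
    # The dimension key is never null
    {"sql": "SELECT COUNT(*) FROM dim_state WHERE state_code IS NULL", "comparison": "=", "expected": 0},
]
failures = run_quality_checks(conn, checks)
print(failures)  # one failure: the (NULL, 'Unknown') row breaks the null-key check
```

In an Airflow operator, a non-empty failure list would typically be turned into a raised exception so that the task fails.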

#### 4.3 Data dictionary 
The data dictionary for the final data model can be found [here](documents/DataDictionary.md).

### Step 5: Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
I used Spark to transform the data into Parquet files stored in S3, because Spark is a powerful analytics engine for big data.   
S3 provides highly scalable, secure, low-latency cloud storage, and its simple web-service interface makes it easy to store and retrieve data from anywhere on the web.  
Redshift is a cloud data warehouse hosted on Amazon Web Services, with a flexible architecture that can be scaled up or down to meet changing storage needs. Costs can be kept relatively low, and it is easy to use.   
Airflow automatically schedules, executes, and monitors data workflows, so I used it to run the data pipeline.  

* Propose how often the data should be updated and why.
The I94 immigration data is updated monthly, so running the data processing and ETL once a month is sufficient.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.    
 In this project, the Spark and Airflow processes run in the Udacity workspace. If the data increased by 100x, I would run them in a more powerful AWS environment, such as Amazon Elastic MapReduce (EMR) for Spark and Amazon Managed Workflows for Apache Airflow (MWAA) for Airflow.  
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.  
 Schedule the DAG to run early every morning (for example at 5am) and set an SLA on its tasks, so that Airflow raises an alert if a run has not finished in time for the 7am dashboard refresh.  
 * The database needed to be accessed by 100+ people.  
 Amazon Redshift data sharing lets a cluster securely share live, read-only data with other Redshift clusters, within and across AWS accounts, and with AWS analytics services through the data lake. Users with the right permissions can query the live data from any consumer cluster, without the complexity and delays of copying or moving it. If the database had to be accessed by 100+ people, these users could be spread across consumer clusters instead of all querying the cluster that runs the ETL loads.
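For the scenario where a dashboard must be refreshed by 7am, the answer boils down to a cron schedule plus an SLA. The plain-Python sketch below checks that arithmetic; the 05:00 start time and 2-hour SLA are assumed values, not project settings. In an Airflow 2 DAG, SCHEDULE_CRON would be passed as the DAG's schedule_interval (schedule from Airflow 2.4) and SLA as the sla task argument, typically through default_args. Airflow measures a task's SLA relative to the start of the DAG run, and a miss only records the event and sends an alert or callback; it does not stop the task.

```python
import datetime as dt

# Assumed settings, for illustration only
SCHEDULE_CRON = "0 5 * * *"        # every day at 05:00 (minute hour day-of-month month day-of-week)
SLA = dt.timedelta(hours=2)        # expected completion within 2 hours of the run's start
DASHBOARD_DEADLINE = dt.time(7, 0)


def cron_daily_start(cron: str) -> dt.time:
    """Return the start time of a simple daily cron expression of the form 'M H * * *'."""
    minute, hour, day_of_month, month, day_of_week = cron.split()
    if (day_of_month, month, day_of_week) != ("*", "*", "*"):
        raise ValueError("this sketch only handles simple daily schedules")
    return dt.time(int(hour), int(minute))


def sla_deadline(run_day: dt.date) -> dt.datetime:
    """Latest time the run starting on run_day can finish without missing its SLA."""
    start = dt.datetime.combine(run_day, cron_daily_start(SCHEDULE_CRON))
    return start + SLA


deadline = sla_deadline(dt.date(2016, 4, 1))
print(deadline)                               # 2016-04-01 07:00:00
print(deadline.time() <= DASHBOARD_DEADLINE)  # True

# Roughly how this maps onto an Airflow 2 DAG file (not executed here):
#   default_args = {"sla": SLA}
#   DAG("capstone_dag", schedule_interval=SCHEDULE_CRON, default_args=default_args, ...)
# For the monthly I94 load, the "@monthly" preset could be used as the schedule instead.
```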
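Setting up a Redshift data share for the 100+ users scenario takes a few SQL statements on the producer and consumer clusters. The helper below only assembles them so the sequence is visible; the share name, schema, database, group and namespace IDs are hypothetical placeholders. At the time of writing, AWS documents data sharing as available for RA3 node types and Redshift Serverless, so a cluster on other node types would need to be migrated first.

```python
def datashare_statements(share, schema, producer_namespace, consumer_namespace,
                         consumer_db, reader_group):
    """Return producer-side and consumer-side SQL for a read-only Redshift data share."""
    producer = [
        f"CREATE DATASHARE {share};",
        f"ALTER DATASHARE {share} ADD SCHEMA {schema};",
        f"ALTER DATASHARE {share} ADD ALL TABLES IN SCHEMA {schema};",
        f"GRANT USAGE ON DATASHARE {share} TO NAMESPACE '{consumer_namespace}';",
    ]
    consumer = [
        # Expose the share as a read-only database on the consumer cluster
        f"CREATE DATABASE {consumer_db} FROM DATASHARE {share} OF NAMESPACE '{producer_namespace}';",
        # Let a group of analysts query it
        f"GRANT USAGE ON DATABASE {consumer_db} TO GROUP {reader_group};",
    ]
    return {"producer": producer, "consumer": consumer}


# Hypothetical names; a cluster's real namespace ID is returned by SELECT current_namespace;
statements = datashare_statements(
    share="immigration_share",
    schema="public",
    producer_namespace="producer-namespace-guid",
    consumer_namespace="consumer-namespace-guid",
    consumer_db="immigration_db",
    reader_group="analysts",
)
for side, sqls in statements.items():
    print(f"-- run on the {side} cluster")
    for sql in sqls:
        print(sql)
```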