# Data Engineering Capstone Project

In this jupyter notebook file, we will check a sample of the data sources and go through the entire ETL process before writing the main etl file in `etl.py`

## 1. Imports Libraries

In [97]:
# Do all imports and installs here
import pandas as pd

## 2. Data Quality checks and Data Cleaning
Here we check for inconsistencies on each data source. If any, we will fix them and save in a separate file

#### 2.1. Airport Code Table

In [105]:
# Read in the data here
airport = pd.read_csv('airport-codes_csv.csv')

In [106]:
# check first rows
airport.head(2)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"


In [107]:
# check data types
airport.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


In [108]:
# transform data types
airport.loc[airport['elevation_ft'].isna(), 'elevation_ft'] = 0 
airport['elevation_ft'] = airport['elevation_ft'].astype(int)

In [109]:
# check distribution elevation_ft
pd.DataFrame(airport['elevation_ft'].describe()).T

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
elevation_ft,55075.0,1082.950867,1553.023286,-1266.0,70.0,572.0,1300.0,22000.0


In [110]:
# distribution - continent (NaN values can be cleaned according to column country)
airport['continent'].value_counts(dropna=False)

NaN    27719
EU      7840
SA      7709
AS      5350
AF      3362
OC      3067
AN        28
Name: continent, dtype: int64

In [111]:
# distribution - country (there are nan values - country can be found by geolocation)
airport['iso_country'].unique()

array(['US', 'PR', 'MH', 'MP', 'GU', 'SO', 'AQ', 'GB', 'PG', 'AD', 'SD',
       'SA', 'AE', 'SS', 'ES', 'CN', 'AF', 'LK', 'SB', 'CO', 'AU', 'MG',
       'TD', 'AL', 'AM', 'MX', 'MZ', 'PW', 'NR', 'AO', 'AR', 'AS', 'AT',
       'ZZ', 'GA', 'AZ', 'BA', 'BB', 'BE', 'DE', 'BF', 'BG', 'GL', 'BH',
       'BI', 'IS', 'BJ', 'OM', 'XK', 'BM', 'KE', 'PH', 'BO', 'BR', 'BS',
       'CV', 'BW', 'FJ', 'BY', 'UA', 'LR', 'BZ', 'CA', 'CD', 'CF', 'CG',
       'MR', 'CH', 'CL', 'CM', 'MA', 'CR', 'CU', 'CY', 'CZ', 'SK', 'PA',
       'DZ', 'ID', 'GH', 'RU', 'CI', 'DK', 'NG', 'DO', 'NE', 'HR', 'TN',
       'TG', 'EC', 'EE', 'FI', 'EG', 'GG', 'JE', 'IM', 'FK', 'EH', 'NL',
       'IE', 'FO', 'LU', 'NO', 'PL', 'ER', 'MN', 'PT', 'SE', 'ET', 'LV',
       'LT', 'ZA', 'SZ', 'GQ', 'SH', 'MU', 'IO', 'ZM', 'FM', 'KM', 'YT',
       'RE', 'TF', 'ST', 'FR', 'SC', 'ZW', 'MW', 'LS', nan, 'ML', 'GM',
       'GE', 'GF', 'SL', 'GW', 'GN', 'SN', 'GR', 'GT', 'TZ', 'GY', 'SR',
       'DJ', 'HK', 'LY', 'HN', 'VN', 'KZ', 'RW', 'HT

In [112]:
# airport id is unique - no need to remove rows
aux1 = airport[airport['ident'].isnull() | ~airport[airport['ident'].notnull()].duplicated(subset='ident',keep='first')]
aux1.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    55075 non-null int64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: int64(1), object(11)
memory usage: 5.5+ MB


In [113]:
# distribution - local_code (fill local_code with ident column)
airport.loc[airport['local_code'].isna(), 'local_code'] = airport['ident']

In [137]:
# Many local codes with duplicates.
airport['local_code'].value_counts(ascending=False).head(5)

KR-0696    1
BLE        1
SWNS       1
UZ-0139    1
ORBD       1
Name: local_code, dtype: int64

In [115]:
# remove duplicate local_code entries
airport = airport.drop_duplicates(subset ="local_code", 
                     keep = "first") 

In [19]:
# size after removing duplicates
airport.shape

(52998, 12)

In [20]:
# # check if there's duplicates on local_code - no duplicates
airport['local_code'].value_counts(ascending=False).head(5)

In [116]:
airport.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 52998 entries, 0 to 55074
Data columns (total 12 columns):
ident           52998 non-null object
type            52998 non-null object
name            52998 non-null object
elevation_ft    52998 non-null int64
continent       26621 non-null object
iso_country     52751 non-null object
iso_region      52998 non-null object
municipality    47352 non-null object
gps_code        39840 non-null object
iata_code       8798 non-null object
local_code      52998 non-null object
coordinates     52998 non-null object
dtypes: int64(1), object(11)
memory usage: 5.3+ MB


In [117]:
airport.columns.tolist()

['ident',
 'type',
 'name',
 'elevation_ft',
 'continent',
 'iso_country',
 'iso_region',
 'municipality',
 'gps_code',
 'iata_code',
 'local_code',
 'coordinates']

In [118]:
# export cleaned data
airport.to_csv(r'/home/workspace/airport.csv', index = False)

#### 2.2. I94 Immigration Data (Fact table)

This is a sample of the immigration data. We will perform data quality checks with spark afterwards, and data cleaning during the etl process.

In [3]:
# read data sample
immigration_csv = pd.read_csv('immigration_data_sample.csv')
immigration_csv.sample(4)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
14,682005,1387607.0,2016.0,4.0,148.0,112.0,BOS,20552.0,1.0,MA,...,,M,1982.0,7062016,F,,AF,55833390000.0,338,WT
524,501686,1020076.0,2016.0,4.0,148.0,112.0,LVG,20550.0,1.0,NY,...,,M,1984.0,7042016,F,,LH,55713450000.0,410,WT
731,3004195,6034750.0,2016.0,4.0,252.0,209.0,AGA,20572.0,1.0,GU,...,,M,1978.0,6122016,M,3675.0,DL,57526140000.0,610,GMT
77,2185800,4469436.0,2016.0,4.0,694.0,694.0,MIA,20567.0,1.0,FL,...,,M,1953.0,10222016,F,,AV,94343710000.0,342,B2


In [16]:
# cicid is unique - no need to remove rows
aux1 = immigration_csv[immigration_csv['cicid'].isnull() | ~immigration_csv[immigration_csv['cicid'].notnull()].duplicated(subset='cicid',keep='first')]
aux1.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1000 entries, 0 to 999
Data columns (total 29 columns):
Unnamed: 0    1000 non-null int64
cicid         1000 non-null float64
i94yr         1000 non-null float64
i94mon        1000 non-null float64
i94cit        1000 non-null float64
i94res        1000 non-null float64
i94port       1000 non-null object
arrdate       1000 non-null float64
i94mode       1000 non-null float64
i94addr       941 non-null object
depdate       951 non-null float64
i94bir        1000 non-null float64
i94visa       1000 non-null float64
count         1000 non-null float64
dtadfile      1000 non-null int64
visapost      382 non-null object
occup         4 non-null object
entdepa       1000 non-null object
entdepd       954 non-null object
entdepu       0 non-null float64
matflag       954 non-null object
biryear       1000 non-null float64
dtaddto       1000 non-null object
gender        859 non-null object
insnum        35 non-null float64
airline       967 non

In [17]:
immigration_csv.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 29 columns):
Unnamed: 0    1000 non-null int64
cicid         1000 non-null float64
i94yr         1000 non-null float64
i94mon        1000 non-null float64
i94cit        1000 non-null float64
i94res        1000 non-null float64
i94port       1000 non-null object
arrdate       1000 non-null float64
i94mode       1000 non-null float64
i94addr       941 non-null object
depdate       951 non-null float64
i94bir        1000 non-null float64
i94visa       1000 non-null float64
count         1000 non-null float64
dtadfile      1000 non-null int64
visapost      382 non-null object
occup         4 non-null object
entdepa       1000 non-null object
entdepd       954 non-null object
entdepu       0 non-null float64
matflag       954 non-null object
biryear       1000 non-null float64
dtaddto       1000 non-null object
gender        859 non-null object
insnum        35 non-null float64
airline       967 non

In [18]:
# check statistics - arrival date
immigration_csv['arrdate'].describe()

count     1000.000000
mean     20559.680000
std          8.995027
min      20545.000000
25%      20552.000000
50%      20560.000000
75%      20567.250000
max      20574.000000
Name: arrdate, dtype: float64

In [19]:
# check statistics - port (only 3-digit codes)
immigration_csv['i94port'].sample(10)

888    FTL
442    DET
317    SFR
487    BOS
470    NEW
37     NYC
234    ORL
410    NEW
938    LOS
766    PSP
Name: i94port, dtype: object

In [20]:
# check statistics - arrival date
immigration_csv['i94visa'].unique()

array([ 2.,  1.,  3.])

In [21]:
# check statistics - arrival date
immigration_csv['visatype'].unique()

array(['WT', 'B2', 'CP', 'B1', 'GMT', 'WB', 'F1', 'E2', 'F2', 'M1'], dtype=object)

#### 2.3. World Temperature Data

In [85]:
# load data
PATH = '../../data2/'
world = pd.read_csv(f'{PATH}GlobalLandTemperaturesByCity.csv')
world.sample(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
4824170,1968-08-01,13.859,0.212,Milan,Italy,45.81N,8.08E
7237268,1929-06-01,15.89,0.455,Sumy,Ukraine,50.63N,34.23E
6766839,1957-02-01,22.116,0.276,Santos,Brazil,23.31S,46.31W
5786639,2002-12-01,-0.718,0.341,Peoria,United States,40.99N,89.47W
5255682,1834-06-01,16.604,3.028,Nijmegen,Netherlands,52.24N,5.26E


In [87]:
world.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB


In [88]:
# transform dt column to datetime
world['dt'] = pd.to_datetime(world['dt'])

In [89]:
# add dt columns year month day
world['year'] = world['dt'].dt.year
world['month'] = world['dt'].dt.month
world['day'] = world['dt'].dt.day

In [90]:
# check nan values in the most recent year. Since we have many nan values, let's try a year below
aux1 = world[world['year'] == 2013]
aux1.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 31590 entries, 3230 to 8599211
Data columns (total 10 columns):
dt                               31590 non-null datetime64[ns]
AverageTemperature               28520 non-null float64
AverageTemperatureUncertainty    28520 non-null float64
City                             31590 non-null object
Country                          31590 non-null object
Latitude                         31590 non-null object
Longitude                        31590 non-null object
year                             31590 non-null int64
month                            31590 non-null int64
day                              31590 non-null int64
dtypes: datetime64[ns](1), float64(2), int64(3), object(4)
memory usage: 2.7+ MB


In [91]:
# check nan values in 2012. No nan values found - we will use year 2012 for this project.
aux1 = world[world['year'] == 2012]
aux1.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 42120 entries, 3218 to 8599202
Data columns (total 10 columns):
dt                               42120 non-null datetime64[ns]
AverageTemperature               42120 non-null float64
AverageTemperatureUncertainty    42120 non-null float64
City                             42120 non-null object
Country                          42120 non-null object
Latitude                         42120 non-null object
Longitude                        42120 non-null object
year                             42120 non-null int64
month                            42120 non-null int64
day                              42120 non-null int64
dtypes: datetime64[ns](1), float64(2), int64(3), object(4)
memory usage: 3.5+ MB


In [92]:
# check duplicate cities - we will need to remove duplicates
aux1['City'].value_counts(ascending=False).head(5)

Rongcheng      36
Worcester      36
León           36
Springfield    36
Santiago       36
Name: City, dtype: int64

In [93]:
# remove duplicates
aux1 = world.drop_duplicates(subset ="City", 
                     keep = "first") 

In [94]:
# duplicates removed
aux1['City'].value_counts(ascending=False).head(5)

Osorno     1
Imphal     1
Aba        1
Sivas      1
Bikaner    1
Name: City, dtype: int64

In [95]:
aux1.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 3448 entries, 0 to 8595973
Data columns (total 10 columns):
dt                               3448 non-null datetime64[ns]
AverageTemperature               3448 non-null float64
AverageTemperatureUncertainty    3448 non-null float64
City                             3448 non-null object
Country                          3448 non-null object
Latitude                         3448 non-null object
Longitude                        3448 non-null object
year                             3448 non-null int64
month                            3448 non-null int64
day                              3448 non-null int64
dtypes: datetime64[ns](1), float64(2), int64(3), object(4)
memory usage: 296.3+ KB


In [96]:
# export data
aux1.to_csv(r'/home/workspace/temp.csv', index = False)

#### 2.4. US City Demographic Data

In [125]:
# read file
demo = pd.read_csv('us-cities-demographics.csv', sep = ';')
demo.sample(2)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
2859,Yuma,Arizona,33.4,48298.0,45847.0,94145,7182.0,19326.0,2.64,AZ,American Indian and Alaska Native,1228
2387,Lakewood,Colorado,37.7,76013.0,76576.0,152589,9988.0,14169.0,2.29,CO,American Indian and Alaska Native,2597


In [127]:
# There are many duplicates - we will remove them
demo[demo['City'] == 'Schenectady']

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
733,Schenectady,New York,36.7,32398.0,32901.0,65299,3388.0,10961.0,2.82,NY,Asian,5140
969,Schenectady,New York,36.7,32398.0,32901.0,65299,3388.0,10961.0,2.82,NY,White,40278
1279,Schenectady,New York,36.7,32398.0,32901.0,65299,3388.0,10961.0,2.82,NY,American Indian and Alaska Native,1059
1689,Schenectady,New York,36.7,32398.0,32901.0,65299,3388.0,10961.0,2.82,NY,Black or African-American,15640
1874,Schenectady,New York,36.7,32398.0,32901.0,65299,3388.0,10961.0,2.82,NY,Hispanic or Latino,5957


In [128]:
# remove duplicates
demo = demo.drop_duplicates(subset = ['City'])
demo[demo['City'] == 'Schenectady']

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
733,Schenectady,New York,36.7,32398.0,32901.0,65299,3388.0,10961.0,2.82,NY,Asian,5140


In [55]:
# size after removing duplicates
demo.drop_duplicates(subset = ['City']).shape

(567, 12)

In [129]:
# get column names
demo.columns.tolist()

['City',
 'State',
 'Median Age',
 'Male Population',
 'Female Population',
 'Total Population',
 'Number of Veterans',
 'Foreign-born',
 'Average Household Size',
 'State Code',
 'Race',
 'Count']

In [130]:
# rename all columns to snakecase type
demo.columns = ['city','state', 'median_age', 'male_population', 'female_population',
 'total_population', 'number_of_veterans', 'foreign_born', 'average_household_size', 'state_code', 'race', 'count']

In [131]:
demo.head()

Unnamed: 0,city,state,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,state_code,race,count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [132]:
demo.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 567 entries, 0 to 2726
Data columns (total 12 columns):
city                      567 non-null object
state                     567 non-null object
median_age                567 non-null float64
male_population           566 non-null float64
female_population         566 non-null float64
total_population          567 non-null int64
number_of_veterans        560 non-null float64
foreign_born              560 non-null float64
average_household_size    559 non-null float64
state_code                567 non-null object
race                      567 non-null object
count                     567 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 57.6+ KB


In [133]:
# change dtypes
demo.loc[demo['male_population'].isna(), 'male_population'] = 0
demo.loc[demo['female_population'].isna(), 'female_population'] = 0
demo.loc[demo['total_population'].isna(), 'total_population'] = 0
demo.loc[demo['foreign_born'].isna(), 'foreign_born'] = 0
demo.loc[demo['number_of_veterans'].isna(), 'number_of_veterans'] = 0

demo['male_population'] = demo['male_population'].astype(int)
demo['female_population'] = demo['female_population'].astype(int)
demo['total_population'] = demo['total_population'].astype(int)
demo['foreign_born'] = demo['foreign_born'].astype(int)
demo['number_of_veterans'] = demo['number_of_veterans'].astype(int)

In [134]:
# descriptive stats
demo.describe()

Unnamed: 0,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,count
count,567.0,567.0,567.0,567.0,567.0,567.0,559.0,567.0
mean,35.609524,95638.7,100054.9,195821.6,8996.287478,40305.97,2.754329,60908.78
std,4.535928,217851.0,233234.1,450944.2,12965.775494,157022.0,0.442436,210238.9
min,22.9,0.0,0.0,63215.0,0.0,0.0,2.0,203.0
25%,32.85,38722.0,40870.0,79334.5,3647.0,8780.5,2.44,3713.5
50%,35.3,50989.0,52704.0,104410.0,5189.0,18570.0,2.65,16041.0
75%,38.1,80866.0,85241.0,168574.5,8913.5,33326.0,2.965,62378.0
max,70.5,4081698.0,4468707.0,8550405.0,156961.0,3212500.0,4.98,3835726.0


In [136]:
# export data
demo.to_csv(r'/home/workspace/demo.csv', index = False)

# 3. ETL simulation

Here we use Spark to perform data quality checks on the entire immigration dataset, and perform an ETL simulation with other datasets. Some data cleaning is also performed here so that we avoid any issues during ETL in Redshift. 

### 3.1 Immigration data (entire dataset)

In [67]:
# write to parquet - all immigration data from year 2016
# df_spark.write.parquet("sas_data")

# define sql functions and datetime conversion function
from datetime import datetime, timedelta
from pyspark.sql.functions import udf, col
from pyspark.sql import types as T
# convert date types (sas)
def convert_datetime(x):
    try:
        start = datetime(1960, 1, 1)
        return start + timedelta(days=int(x))
    except:
        return None

# read SAS data with spark
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [68]:
# read data
immigration = spark.read.parquet("sas_data")

In [69]:
# print schema
immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [70]:
# check immigration file for further s3 load
immigration.createOrReplaceTempView("immigration_first_test")
spark.sql('''
          SELECT COUNT(*)
          FROM immigration_first_test 
          '''
          ).show()

+--------+
|count(1)|
+--------+
| 3096313|
+--------+



In [71]:
# change date format
udf_datetime_from_sas = udf(lambda x: convert_datetime(x), T.DateType())
immigration = immigration.withColumn("arrival_date", udf_datetime_from_sas("arrdate"))
immigration = immigration.withColumn("departure_date", udf_datetime_from_sas("depdate"))
immigration = immigration.select(col("cicid"), col("i94yr"), col("i94mon"), col("i94port"), \
                                col("i94mode"), col("i94addr"),\
                                col("i94bir"), col("i94visa"), col("biryear"),\
                                col("gender"), col("visatype"), col("arrival_date"),\
                                col("departure_date")).distinct()

In [7]:
# check immigration file for further s3 load
immigration.createOrReplaceTempView("immigration_cleaned")

In [11]:
# check count
spark.sql('''
          SELECT COUNT(*)
          FROM immigration_cleaned 
          '''
          ).show()

+--------+
|count(1)|
+--------+
| 3096313|
+--------+



In [8]:
# check data types
spark.sql('''
          SELECT arrival_date, departure_date
          FROM immigration_cleaned 
          LIMIT 1
          '''
          ).show()

+------------+--------------+
|arrival_date|departure_date|
+------------+--------------+
|  2016-04-01|    2016-04-10|
+------------+--------------+



In [9]:
# write immigration data to parquet file
immigration.write.parquet("/home/workspace/immigration")

### 3.2. Airport

In [90]:
# read file
airport = spark.read.format("csv").option("header","true").load("airport.csv")
# drop duplicates
airport = airport.dropDuplicates(['local_code'])
# Check size
airport.createOrReplaceTempView("airport_cleaned")
spark.sql('''
                    SELECT COUNT(*)
                    from airport_cleaned
''').show()

+--------+
|count(1)|
+--------+
|   52998|
+--------+



### 3.3. Temperature

In [85]:
# read file
temp = spark.read.format("csv").option("header","true").load("GlobalLandTemperaturesByCity.csv")
temp.createOrReplaceTempView("temp_cleaned")
# Select relevant columns and rows
temp = spark.sql('''
                    SELECT dt, 
                     EXTRACT(year FROM dt) as year,
                     EXTRACT(month FROM dt) as month,
                     AverageTemperature, 
                     LOWER(City) as city
                     FROM temp_cleaned 
                     WHERE EXTRACT(year FROM dt) = 2012
                   
''')
# drop duplicates
temp = temp.dropDuplicates(['City'])

In [86]:
# Check size
temp.createOrReplaceTempView("temp")
spark.sql('''
                    SELECT COUNT(*)
                    from temp
''').show()

+--------+
|count(1)|
+--------+
|    3448|
+--------+



### 3.4. Demographics

In [89]:
# read file
demo = spark.read.format("csv").option("header","true").load("demo.csv")
demo.createOrReplaceTempView("demo_cleaned")

# select relevant columns
demo = spark.sql('''
                    SELECT LOWER(city) as city,
                    median_age,
                    male_population as male_pop,
                    female_population as fem_pop,
                    total_population as total_pop,
                    foreign_born
                    FROM demo_cleaned
''')
# drop duplicates
demo = demo.dropDuplicates(['City'])
# clean demographics data and select columns of interest
demo.createOrReplaceTempView("demo")
# Check size
spark.sql('''
                    SELECT COUNT(*)
                    from demo
''').show()

+--------+
|count(1)|
+--------+
|     567|
+--------+



## 4. Joining tables
In this section, we perform data cleaning and joins to check how the final table would look like. If everything goes well, we are ready to ingest raw data in S3, perform ETL, and deliver the data to the DW in Redshift.

In [91]:
# create temporary view
airport.createOrReplaceTempView("airport1")
demo.createOrReplaceTempView("demo1")
temp.createOrReplaceTempView("temp1")
immigration.createOrReplaceTempView("immigration1")

In [None]:
# JOINS - This will be the query to produce the table to the analytics department.
spark.sql('''
        SELECT i.cicid, i.i94yr, i.i94mon, i.i94port, i.i94mode, i.i94addr, i.i94bir, i.i94visa,
        i.biryear, i.gender, i.visatype, 
        i.arrival_date, i.departure_date, 
        a.airport_type, a.airport_name, a.iso_region, a.local_code, 
        t.AverageTemperature, t.city, 
        d.median_age, d.male_pop, d.fem_pop, d.total_pop, d.foreign_born
        FROM immigration1 i
        LEFT JOIN airport1 a
        ON i.i94port = a.local_code
        LEFT JOIN temp1 t
        ON a.municipality = t.city
        AND i.i94mon = t.month
        LEFT JOIN demo1 d
        ON a.municipality = d.city
        LIMIT 5
''').toPandas()

In [96]:
# Check how many entries the final table has
spark.sql('''
        SELECT COUNT(*)
        FROM immigration1 i
        LEFT JOIN airport1 a
        ON i.i94port = a.local_code
        LEFT JOIN temp1 t
        ON a.municipality = t.city
        AND i.i94mon = t.month
        LEFT JOIN demo1 d
        ON a.municipality = d.city
''').show()

+--------+
|count(1)|
+--------+
| 3096313|
+--------+



Great! Seems like our joins have worked. We are ready to create our Redshift cluster; instructions are described in the `create_cluster.ipynb` at this repository.