# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import pyspark
# import image module
from IPython.display import Image
import matplotlib.pyplot as plt
import ipywidgets as widgets
import IPython.display as display

In [2]:
# Those libraries are going to be used for the ETL pipeline construction
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format,dayofweek
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, LongType,TimestampType

In [3]:
from pyspark.sql.functions import monotonically_increasing_id

### Step 1: Scope the Project and Gather Data

#### 1.1- Scope 

In this section I am going to answer the next questions:
- Explain what you plan to do in the project in more detail.
- What data do you use? 
- What is your end solution look like?
- What tools did you use?
- Etc.

The first step in this project consists on importing the four datasets, exploring each dataset via giving a general overview about each one such as the features (columns) number, the observations (rows) number and via identifying data quality issues, like missing values, duplicate data, etc. As a second step, I am going to define my the conceptual data model and I am going to give reasons behind my choice. Then, I am going to create the data pipeline with the conceptual data model I have chosen using an ETL process based on some sql queries. 

I decided to use the four datasets given by udacity to complete this project. Besides, the main dataset will include data on immigration to the United States, and supplementary datasets will include data on airport codes, U.S. city demographics, and temperature data.

All the project steps are going to be accomplished using the Jupyter notebook.

Every step in this project is well documented, starting by defining the main goals, then identifying which queries will be used and explaining how would Spark or Airflow be incorporated. The most important point is to demonstrate the reasong behind each choice. 

#### 1.2- Describe and Gather Data 

In this section, I am going to answer the next questions:
- Describe the used data sets.
- Where did it come from?
- What type of information is included? 

As I mentioned before, to complete this project, I decided to use the four datasets given by udacity. The four datasets are the next:

- Immigration to US Data :This data comes from the US National Tourism and Trade Office. 
- Airport codes: This is a simple dataframe of airport codes and corresponding cities
- U.S. city demographics : This is a simple table of airport codes and corresponding cities.
- Temperature data: This dataset came from Kaggle.

##### a.I94 Immigration Data

This data comes from the US National Tourism and Trade Office. It gives statistics about international visitors to the United States coming from different regions in all the world. This data is used by the US National Tourism and Trade Office to extract insights about Tourism movements to the country so they can improve services, detect anomlaies and increase output etc.

In [4]:
df_i94_migration=pd.read_csv('immigration_data_sample.csv')
del df_i94_migration['Unnamed: 0']
df_i94_migration.head(3)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT


In [5]:
df_i94_migration.admnum

0      5.658267e+10
1      9.436200e+10
2      5.578047e+10
3      9.478970e+10
4      4.232257e+10
5      7.368526e+08
6      7.863122e+08
7      5.547449e+10
8      5.941342e+10
9      5.544979e+10
10     5.574381e+10
11     5.933662e+10
12     5.617586e+10
13     9.461277e+10
14     5.583339e+10
15     4.416258e+10
16     9.285800e+10
17     9.338490e+10
18     9.443560e+10
19     9.353685e+10
20     5.909986e+10
21     9.325274e+10
22     5.630712e+10
23     9.365477e+10
24     5.621703e+10
25     5.553053e+10
26     5.648670e+10
27     9.469486e+10
28     5.599644e+10
29     9.361788e+10
           ...     
970    5.579231e+10
971    5.929832e+10
972    9.291274e+10
973    5.924649e+10
974    5.652541e+10
975    5.583605e+10
976    5.933928e+10
977    5.927716e+10
978    5.645324e+10
979    5.569490e+10
980    9.500985e+10
981    5.630799e+10
982    9.337128e+10
983    9.301863e+10
984    5.629125e+10
985    9.485471e+10
986    5.947464e+10
987    5.550622e+10
988    9.303277e+10


##### b.Airport Code data:

This dataset contains the list of all airport codes. Some of the columns contain attributes identifying airport locations, other codes (IATA, local if exist) that are relevant to identification of an airport.

The airport codes could be one from the next two:

- IATA airport code: a three-letter code which is used in passenger reservation, ticketing and baggage-handling systems.
- ICAO airport code which is a four letter code used by ATC systems and for airports that do not have an IATA airport code. 

In [6]:
df_airport_code=pd.read_csv('airport-codes_csv.csv')
df_airport_code .head(3)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"


##### c.U.S. city demographics data

This dataset comes from OpenSoft. It contains information about the demographics of all US cities and census-designated places with a population greater or equal to 65,000. This data comes from the US Census Bureau's 2015 American Community Survey.

In [7]:
df_us_city_demo=pd.read_csv('us-cities-demographics.csv', sep=';')
df_us_city_demo.head(3)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759


##### d. World Temperatures data

This dataset is provided by Kaggle. It contains temperature data about monthly average temperature value for many countries worldwide.

In [8]:
df_World_Temperature=pd.read_csv('/data2/GlobalLandTemperaturesByCity.csv')
df_World_Temperature.head(3)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E


In [9]:
list_countries=[c for c in df_World_Temperature.Country.unique()]
#print('list of countries listed in this dataframe is the next', list_countries)

I noticed that there is no temperature data for the United States in 2016

## Read data with Pyspark

In [10]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [11]:
#write to parquet
#df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")

In [12]:
df_spark.head(1)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1')]

### Step 2: Explore and Assess the Data
#### 2.1- Explore the Data 
- I have used pandas for Exploratory data analysis : EDA : rows number, column number, general overview etc
- I decided to replace missing value with median if the column is numeric otherwise I am going to remove that raw
- I deleted duplicate values

##### a.I94 Immigration Data

The dataset contains 28 features such as: year, month, port,  arrival date, arrival mode, address, visatype, occupation, airline, gender, departure date, country etc.

I am going to select only the relevant features or columns

###### Getting general idea about the data

In [13]:
df_i94_migration.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 28 columns):
cicid       1000 non-null float64
i94yr       1000 non-null float64
i94mon      1000 non-null float64
i94cit      1000 non-null float64
i94res      1000 non-null float64
i94port     1000 non-null object
arrdate     1000 non-null float64
i94mode     1000 non-null float64
i94addr     941 non-null object
depdate     951 non-null float64
i94bir      1000 non-null float64
i94visa     1000 non-null float64
count       1000 non-null float64
dtadfile    1000 non-null int64
visapost    382 non-null object
occup       4 non-null object
entdepa     1000 non-null object
entdepd     954 non-null object
entdepu     0 non-null float64
matflag     954 non-null object
biryear     1000 non-null float64
dtaddto     1000 non-null object
gender      859 non-null object
insnum      35 non-null float64
airline     967 non-null object
admnum      1000 non-null float64
fltno       992 non-null object
visat

In [14]:
df_i94_migration.describe().transpose()

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
cicid,1000.0,3040461.0,1799818.0,13208.0,1412170.0,2941176.0,4694151.0,6061994.0
i94yr,1000.0,2016.0,0.0,2016.0,2016.0,2016.0,2016.0,2016.0
i94mon,1000.0,4.0,0.0,4.0,4.0,4.0,4.0,4.0
i94cit,1000.0,302.928,206.4853,103.0,135.0,213.0,438.0,746.0
i94res,1000.0,298.262,202.1204,103.0,131.0,213.0,438.0,696.0
arrdate,1000.0,20559.68,8.995027,20545.0,20552.0,20560.0,20567.25,20574.0
i94mode,1000.0,1.078,0.4859549,1.0,1.0,1.0,1.0,9.0
depdate,951.0,20575.04,24.21123,20547.0,20561.0,20570.0,20580.0,20715.0
i94bir,1000.0,42.382,17.90342,1.0,30.75,42.0,55.0,93.0
i94visa,1000.0,1.859,0.3863525,1.0,2.0,2.0,2.0,3.0


###### Dataset features

In [15]:
df_i94_migration.columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

###### Dimensions

In [16]:
df_i94_migration.shape

(1000, 28)

###### Data types

In [17]:
df_i94_migration.dtypes

cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port      object
arrdate     float64
i94mode     float64
i94addr      object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile      int64
visapost     object
occup        object
entdepa      object
entdepd      object
entdepu     float64
matflag      object
biryear     float64
dtaddto      object
gender       object
insnum      float64
airline      object
admnum      float64
fltno        object
visatype     object
dtype: object

##### b.Airport Code data: 

The dataset has 12 features such as : identification code, name of airport, continent, iso country, municipality, GPS code, local code, iso region etc.

The same here, only relevant features are going to be considered.

###### Getting general idea about the data

In [18]:
df_airport_code.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


In [19]:
df_airport_code.describe().transpose()

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
elevation_ft,48069.0,1240.789677,1602.363459,-1266.0,205.0,718.0,1497.0,22000.0


###### Dataset features

In [20]:
df_airport_code.columns

Index(['ident', 'type', 'name', 'elevation_ft', 'continent', 'iso_country',
       'iso_region', 'municipality', 'gps_code', 'iata_code', 'local_code',
       'coordinates'],
      dtype='object')

###### Dimensions

In [21]:
df_airport_code.shape

(55075, 12)

###### Data types

In [22]:
df_airport_code.dtypes

ident            object
type             object
name             object
elevation_ft    float64
continent        object
iso_country      object
iso_region       object
municipality     object
gps_code         object
iata_code        object
local_code       object
coordinates      object
dtype: object

###### NaN 

In [23]:
df_airport_code.isnull().sum().sort_values(ascending=False)

iata_code       45886
continent       27719
local_code      26389
gps_code        14045
elevation_ft     7006
municipality     5676
iso_country       247
coordinates         0
iso_region          0
name                0
type                0
ident               0
dtype: int64

##### c.U.S. city demographics data

This dataset contains 12 features or columns such as : City, State, Median Age, Total Population, Male Population, Female Population, Race etc.

###### Getting general idea about the data

In [24]:
df_us_city_demo.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


In [25]:
df_us_city_demo.describe().transpose()

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
Median Age,2891.0,35.494881,4.401617,22.9,32.8,35.3,38.0,70.5
Male Population,2888.0,97328.426247,216299.936929,29281.0,39289.0,52341.0,86641.75,4081698.0
Female Population,2888.0,101769.630886,231564.572571,27348.0,41227.0,53809.0,89604.0,4468707.0
Total Population,2891.0,198966.779315,447555.929634,63215.0,80429.0,106782.0,175232.0,8550405.0
Number of Veterans,2878.0,9367.832523,13211.219924,416.0,3739.0,5397.0,9368.0,156961.0
Foreign-born,2878.0,40653.59868,155749.103665,861.0,9224.0,18822.0,33971.75,3212500.0
Average Household Size,2875.0,2.742543,0.433291,2.0,2.43,2.65,2.95,4.98
Count,2891.0,48963.774473,144385.588565,98.0,3435.0,13780.0,54447.0,3835726.0


###### Dataset features

In [26]:
df_us_city_demo.columns

Index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code', 'Race', 'Count'],
      dtype='object')

###### Data Dimensions 

In [27]:
df_us_city_demo.shape

(2891, 12)

###### Data types

In [28]:
df_us_city_demo.dtypes

City                       object
State                      object
Median Age                float64
Male Population           float64
Female Population         float64
Total Population            int64
Number of Veterans        float64
Foreign-born              float64
Average Household Size    float64
State Code                 object
Race                       object
Count                       int64
dtype: object

##### NaN 

In [29]:
df_us_city_demo.isnull().sum().sort_values(ascending=False)

Average Household Size    16
Foreign-born              13
Number of Veterans        13
Female Population          3
Male Population            3
Count                      0
Race                       0
State Code                 0
Total Population           0
Median Age                 0
State                      0
City                       0
dtype: int64

##### d. World Temperatures data

###### Getting general idea about the data

In [30]:
df_World_Temperature.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB


In [31]:
df_World_Temperature.describe().transpose()

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
AverageTemperature,8235082.0,16.727433,10.353442,-42.704,10.299,18.831,25.21,39.651
AverageTemperatureUncertainty,8235082.0,1.028575,1.129733,0.034,0.337,0.591,1.349,15.396


###### Dataset features

In [32]:
df_World_Temperature.columns

Index(['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City',
       'Country', 'Latitude', 'Longitude'],
      dtype='object')

###### Data Dimensions

In [33]:
df_World_Temperature.shape

(8599212, 7)

###### Data types

In [34]:
df_World_Temperature.dtypes

dt                                object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                              object
Country                           object
Latitude                          object
Longitude                         object
dtype: object

###### NaN

In [35]:
df_World_Temperature.isnull().sum().sort_values(ascending=False)

AverageTemperatureUncertainty    364130
AverageTemperature               364130
Longitude                             0
Latitude                              0
Country                               0
City                                  0
dt                                    0
dtype: int64

#### Cleaning Steps
The performed cleaning steps consists of the next tasks:
- Detecing and removing missing values
- Detecing and Removing duplicates
- Renaming some columns to new ones

#### a.Cleaning I94 Immigration Data
###### - Removing Missing values

In [36]:
df_i94_migration.isnull().sum().sort_values(ascending=False)

entdepu     1000
occup        996
insnum       965
visapost     618
gender       141
i94addr       59
depdate       49
matflag       46
entdepd       46
airline       33
fltno          8
i94cit         0
i94mon         0
i94res         0
i94yr          0
i94port        0
arrdate        0
i94mode        0
visatype       0
dtadfile       0
i94bir         0
i94visa        0
count          0
entdepa        0
biryear        0
dtaddto        0
admnum         0
cicid          0
dtype: int64

In [37]:
df_i94_migration=df_i94_migration.dropna(subset=['airline', 'gender','visapost'])

In [38]:
df_i94_migration.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
1,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
3,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
13,4916639.0,2016.0,4.0,260.0,260.0,LOS,20570.0,1.0,CA,20581.0,...,,M,1954.0,10252016,F,,EK,94612770000.0,00215,B2
16,1056530.0,2016.0,4.0,512.0,512.0,NAS,20550.0,1.0,GA,20552.0,...,,M,1995.0,10052016,F,,DL,92858000000.0,00554,B2
18,4668286.0,2016.0,4.0,746.0,158.0,SEA,20568.0,1.0,NV,20571.0,...,,M,1970.0,10232016,M,,DL,94435600000.0,00143,B2


##### Removing duplicates

In [39]:
check_duplicate=[c for c in df_i94_migration.duplicated().unique()]
check_duplicate# there is no duplicates

[False]

##### changing the columns names

In [40]:
df_i94_migration.columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

In [41]:
df_i94_migration.rename(columns={'i94yr': 'immig_year', 'i94mon': 'immig_month', 'i94cit':'city_code',
                                'i94res':'country_code', 'i94port': 'departure_port', 'arrdate': 'arrival_date',
                                'i94mode':'immig_mode','i94addr':'arrival_port', 'depdate':'departure_date',
                                'i94bir':'age','i94visa':'visa_type', 'occup':'occupation',
                                 'entdepa':'arrival_flag', 'entdepd':'departure_flag', 'matflag':'match_flag',
                                'entdepu':'update_flag','biryear':'birth_year','dtaddto':'date_to_stay','insnum':'INSnum',
                                'admnum':'admission_number','fltno':'flight_num','visatype':'admission_class'}, inplace=True)

In [42]:
df_i94_migration.columns

Index(['cicid', 'immig_year', 'immig_month', 'city_code', 'country_code',
       'departure_port', 'arrival_date', 'immig_mode', 'arrival_port',
       'departure_date', 'age', 'visa_type', 'count', 'dtadfile', 'visapost',
       'occupation', 'arrival_flag', 'departure_flag', 'update_flag',
       'match_flag', 'birth_year', 'date_to_stay', 'gender', 'INSnum',
       'airline', 'admission_number', 'flight_num', 'admission_class'],
      dtype='object')

###### Converting columns type

In [43]:
df_i94_migration['city_code']=[int(c) for c in df_i94_migration['city_code']]
df_i94_migration['country_code']=[int(c) for c in df_i94_migration['country_code']]
df_i94_migration['age']=[int(c) for c in df_i94_migration['age']]
df_i94_migration['birth_year']=[int(c) for c in df_i94_migration['birth_year']]
df_i94_migration['admission_number']=[int(c) for c in df_i94_migration['admission_number']]
df_i94_migration['arrival_date']=[int(c) for c in df_i94_migration['arrival_date']]
df_i94_migration['departure_date']=[int(c) if (pd.isna(c)==False) else c for c in df_i94_migration['departure_date']]
df_i94_migration['immig_mode']=[int(c) for c in df_i94_migration['immig_mode']]
df_i94_migration['visa_type']=[int(c) for c in df_i94_migration['visa_type']]

#### b.Cleaning airport data
##### Removing missing values

In [44]:
df_airport_code.isnull().sum().sort_values(ascending=False)

iata_code       45886
continent       27719
local_code      26389
gps_code        14045
elevation_ft     7006
municipality     5676
iso_country       247
coordinates         0
iso_region          0
name                0
type                0
ident               0
dtype: int64

In [45]:
df_airport_code=df_airport_code.dropna()

##### Removing duplicates

In [46]:
df_airport_code.duplicated()
duplic_lists=[c for c in df_airport_code.duplicated().unique()]
duplic_lists

[False]

In [47]:
df_airport_code=df_airport_code.drop_duplicates()
df_airport_code.reset_index(drop=True, inplace=True)
df_airport_code.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,03N,small_airport,Utirik Airport,4.0,OC,MH,MH-UTI,Utirik Island,K03N,UTK,03N,"169.852005, 11.222"
1,ADC,small_airport,Andakombe Airport,3600.0,OC,PG,PG-EHG,Andekombe,AYAN,ADC,ADK,"145.744722222, -7.13722222222"
2,AEK,small_airport,Aseki Airport,4106.0,OC,PG,PG-MPL,Aseki,AYAX,AEK,ASE,"146.19386673, -7.35080485552"
3,AIE,small_airport,Aiome Airport,350.0,OC,PG,PG-MPM,Aiome,AYAO,AIE,AIO,"144.7307003, -5.145699978"
4,AIH,small_airport,Aiambak Airport,90.0,OC,PG,PG-WPD,Aiambak,AYAK,AIH,AMB,"141.2675, -7.342777777779999"


##### changing the columns names

In [48]:
df_airport_code.columns

Index(['ident', 'type', 'name', 'elevation_ft', 'continent', 'iso_country',
       'iso_region', 'municipality', 'gps_code', 'iata_code', 'local_code',
       'coordinates'],
      dtype='object')

In [49]:
airport_col=['type', 'name', 'iso_country', 'iso_region', 'municipality', 'iata_code', 'local_code']
df_airport_code=df_airport_code[airport_col]
df_airport_code.rename(columns={'iso_country':'country','iso_region':'region','iata_code':'IATA_code'}, inplace=True)
df_airport_code.head(1)

Unnamed: 0,type,name,country,region,municipality,IATA_code,local_code
0,small_airport,Utirik Airport,MH,MH-UTI,Utirik Island,UTK,03N


#### d.Cleaning city demographics data

##### Removing missing values

In [50]:
df_us_city_demo.isnull().sum().sort_values(ascending=False)

Average Household Size    16
Foreign-born              13
Number of Veterans        13
Female Population          3
Male Population            3
Count                      0
Race                       0
State Code                 0
Total Population           0
Median Age                 0
State                      0
City                       0
dtype: int64

In [51]:
df_us_city_demo=df_us_city_demo.dropna()
df_us_city_demo.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


##### Removing duplicates

In [52]:
df_us_city_demo.duplicated()
duplic_lists=[c for c in df_us_city_demo.duplicated().unique()]
duplic_lists

[False]

##### Changing the columns names

In [53]:
df_us_city_demo.columns

Index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code', 'Race', 'Count'],
      dtype='object')

In [54]:
df_us_city_demo.head(2)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723


In [55]:
df_us_city_demo.rename(columns={'Male Population':'male_portion','Female Population':'female_portion','Total Population':'total_portion',
                               'Foreign-born':'foreign_born','Median Age':'median_age', 'Number of Veterans':'num_veterans',
                               'Average Household Size':'avg_houshold_size', 'City':'city','State':'state','Race':'race'}, inplace=True)

In [56]:
df_us_city_demo.columns

Index(['city', 'state', 'median_age', 'male_portion', 'female_portion',
       'total_portion', 'num_veterans', 'foreign_born', 'avg_houshold_size',
       'State Code', 'race', 'Count'],
      dtype='object')

In [57]:
df_us_city_demo['num_veterans']=[int(x) for x in df_us_city_demo.num_veterans]

#### d.Cleaning World Temperatures data
##### Removing missing values

In [58]:
df_World_Temperature.isnull().sum().sort_values(ascending=False)

AverageTemperatureUncertainty    364130
AverageTemperature               364130
Longitude                             0
Latitude                              0
Country                               0
City                                  0
dt                                    0
dtype: int64

In [59]:
df_World_Temperature=df_World_Temperature.dropna()
df_World_Temperature.reset_index(drop=True, inplace=True)
df_World_Temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E
2,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
3,1744-06-01,14.051,1.347,Århus,Denmark,57.05N,10.33E
4,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E


##### Removing duplicates

In [60]:
df_World_Temperature.duplicated()
duplic_lists=[c for c in df_World_Temperature.duplicated().unique()]
duplic_lists

[False]

##### Changing the columns names

In [61]:
df_World_Temperature.columns

Index(['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City',
       'Country', 'Latitude', 'Longitude'],
      dtype='object')

In [62]:
columns_temp=['dt','AverageTemperature','AverageTemperatureUncertainty', 'City', 'Country']
df_World_Temperature=df_World_Temperature[columns_temp]
df_World_Temperature['year']=[x.split('-')[0] for x in df_World_Temperature.dt]
# I will extract the month as the immigration I94 dataset has also the month
df_World_Temperature['month']=[x.split('-')[1] for x in df_World_Temperature.dt]
df_World_Temperature.rename(columns={'dt':'datetime','AverageTemperature':'Avg_temp','AverageTemperatureUncertainty':'Avg_temp_uncert',
                                 'City':'city', 'Country':'country'}, inplace=True)
df_World_Temperature.head(2)

Unnamed: 0,datetime,Avg_temp,Avg_temp_uncert,city,country,year,month
0,1743-11-01,6.068,1.737,Århus,Denmark,1743,11
1,1744-04-01,5.788,3.624,Århus,Denmark,1744,4


In [63]:
df_World_Temperature.columns

Index(['datetime', 'Avg_temp', 'Avg_temp_uncert', 'city', 'country', 'year',
       'month'],
      dtype='object')

Selecting only the US Temperature data and the year 2016

In [64]:
df_World_Temperature=df_World_Temperature[(df_World_Temperature.country=='United States')]
#df_World_Temperature=df_World_Temperature[df_World_Temperature.year=='2016']
df_World_Temperature=df_World_Temperature.reset_index(drop=True)
df_World_Temperature.head(3)

Unnamed: 0,datetime,Avg_temp,Avg_temp_uncert,city,country,year,month
0,1820-01-01,2.101,3.217,Abilene,United States,1820,1
1,1820-02-01,6.926,2.853,Abilene,United States,1820,2
2,1820-03-01,10.767,2.395,Abilene,United States,1820,3


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

Data modeling is the procedure of creating a data model for the data to be stored in database or datawarehouse. It is named Conceptual model because it is based on conceptual data objects representation along with associations between various data objects and rules.

I have chosen to work with star schema due to its simplicity regarding the data mart schema style. In addition, this approach is the most adopted to develop data warehouses and dimensional data marts. The star schema consists of one or more fact tables referencing a number of dimension tables. 

A fact table consists of the measurements, metrics or facts of a business process. It is located at the center of a star schema surrounded by dimension tables and is more used and effective for handling simpler queries.

A dimension table contains dimensions of a fact. It is companion table to the fact table and it contains descriptive attributes to be used as query constraining. It is de-normalized table that is connected to the fact table and located at the edges of the star schema. Commonly used dimensions are people, products, place and time.

In my star schema, I have chosen one fact table with seven dimension tables. 

Here is my star schema :

In [65]:
# get the image
Image(url="star_schema.png", width=800, height=800)

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the star data model
    
- Our input data which corresponds to the four datasets are stored in S3 bucket under the next paths:
    - my_bucket/immigration_data_sample.csv
    - my_bucket/us-cities-demographics.csv
    - my_bucket/airport-codes_csv.csv
    - my_bucket/GlobalLandTemperaturesByCity.csv' 
- Read all the csv data from S3 and convert them into one fact table named 'Trip' and seven dimension tables named: 'Population Details', 'Demographic summary', 'Time', 'Airline Details', 'Airport', 'Immigrant Details', 'Temperature'  
    - Immigration_data is converted into the next tables: 'Immigrant Details'( dim table), 'Trip' (fact), 'Airline Details' (dim), 'Time' (dim)
    - Temperature_data is converted 'Temperature' dimension table
    - Aiport_data is converted into 'Airport' dimension table
    - Dempographic_data is converted into 'Demographic summary' and 'Population Details' dimension tables
- Load the created tables back to S3 bucket where our datalake is located under the next path:
    - my_bucket/output/

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [66]:
# Write code here
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get("aws",'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get("aws",'AWS_SECRET_ACCESS_KEY')

output_data = config.get("aws",'s3_output_data')

def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark


In [67]:
spark = create_spark_session()

Here we must build the next eight tables: immigrant_details, airline_details, trip, airport, time, temperature, population_details, demographic_summary.
#### immigrant_details table:

In [68]:
# define the immigrant_details schema
immigrant_details_schema = StructType([
        StructField("city_code", IntegerType()),
        StructField("country_code", IntegerType()),
        StructField("age", IntegerType()),
        StructField("gender", StringType()),
        StructField("occupation", StringType()),
        StructField("birth_year", IntegerType()),
        StructField("date_to_stay", StringType()),
        StructField("INSnum", StringType()),
        StructField("admission_number", LongType()),
        StructField("admission_class", StringType()),
    ])

In [69]:
df_immigrant_details=df_i94_migration[['city_code','country_code','age','gender','occupation','birth_year','date_to_stay','INSnum','admission_number','admission_class']]
df_immigrant_details.head(1)

Unnamed: 0,city_code,country_code,age,gender,occupation,birth_year,date_to_stay,INSnum,admission_number,admission_class
1,582,582,26,M,,1990,10222016,,94361995930,B2


In [70]:
immigrant_details_table = spark.createDataFrame(df_immigrant_details,schema=immigrant_details_schema)

In [71]:
immigrant_details_table=immigrant_details_table.withColumn("immigrant_id", monotonically_increasing_id())

In [72]:
immigrant_details_table.show(5)

+---------+------------+---+------+----------+----------+------------+------+----------------+---------------+------------+
|city_code|country_code|age|gender|occupation|birth_year|date_to_stay|INSnum|admission_number|admission_class|immigrant_id|
+---------+------------+---+------+----------+----------+------------+------+----------------+---------------+------------+
|      582|         582| 26|     M|       NaN|      1990|    10222016|   NaN|     94361995930|             B2|           0|
|      297|         297| 25|     M|       NaN|      1991|    10272016|   NaN|     94789696030|             B2|           1|
|      260|         260| 62|     F|       NaN|      1954|    10252016|   NaN|     94612771030|             B2|           2|
|      512|         512| 21|     F|       NaN|      1995|    10052016|   NaN|     92858001630|             B2|           3|
|      746|         158| 46|     M|       NaN|      1970|    10232016|   NaN|     94435600530|             B2|           4|
+-------

In [74]:
# write immigrant_details table to parquet files partitioned by country_code and birth_year
immigrant_details_table.write.partitionBy("country_code","birth_year").parquet(output_data+"/immigrant_details_table.parquet")

#### airline table

In [75]:
# define the airline schema
airline_schema = StructType([
        StructField("airline", StringType()),
        StructField("age", StringType()),  
    ])

In [76]:
airline_df=df_i94_migration[['airline','flight_num']]
airline_df.head(1)

Unnamed: 0,airline,flight_num
1,*GA,XBLNG


In [77]:
airline_table = spark.createDataFrame(airline_df,schema=airline_schema)

In [78]:
airline_table=airline_table.withColumn("airline_id", monotonically_increasing_id())

In [79]:
airline_table.show(5)

+-------+-----+----------+
|airline|  age|airline_id|
+-------+-----+----------+
|    *GA|XBLNG|         0|
|     QR|00739|         1|
|     EK|00215|         2|
|     DL|00554|         3|
|     DL|00143|         4|
+-------+-----+----------+
only showing top 5 rows



In [81]:
# write airline table to parquet files 
airline_table.write.parquet(output_data+"/airline_table.parquet")

##### Trip table:

In [82]:
# define the trip schema
trip_schema = StructType([
        StructField("arrival_date", IntegerType()),
        StructField("departure_date", DoubleType()),
        StructField("arrival_port", StringType()),
        StructField("departure_port", StringType()),
        StructField("arrival_flag", StringType()),
        StructField("departure_flag", StringType()),
        StructField("immig_mode", IntegerType()),
        StructField("visa_type", IntegerType()),
        StructField("visapost", StringType()),
    ])

In [83]:
df_trip=df_i94_migration[['arrival_date','departure_date','arrival_port','departure_port','arrival_flag','departure_flag','immig_mode','visa_type','visapost']]
df_trip.head(1)

Unnamed: 0,arrival_date,departure_date,arrival_port,departure_port,arrival_flag,departure_flag,immig_mode,visa_type,visapost
1,20567,20568.0,TX,MCA,G,R,1,2,MTR


In [84]:
trip_table = spark.createDataFrame(df_trip,schema=trip_schema)

In [85]:
trip_table=trip_table.withColumn("trip_id", monotonically_increasing_id())

In [86]:
trip_table.show(5)

+------------+--------------+------------+--------------+------------+--------------+----------+---------+--------+-------+
|arrival_date|departure_date|arrival_port|departure_port|arrival_flag|departure_flag|immig_mode|visa_type|visapost|trip_id|
+------------+--------------+------------+--------------+------------+--------------+----------+---------+--------+-------+
|       20567|       20568.0|          TX|           MCA|           G|             R|         1|        2|     MTR|      0|
|       20572|       20581.0|          CA|           LOS|           G|             O|         1|        2|     DOH|      1|
|       20570|       20581.0|          CA|           LOS|           G|             O|         1|        2|     MNL|      2|
|       20550|       20552.0|          GA|           NAS|           G|             O|         1|        2|     NSS|      3|
|       20568|       20571.0|          NV|           SEA|           G|             O|         1|        2|     MOS|      4|
+-------

In [87]:
#write trip table to parquet
trip_table.write.parquet(output_data+"/trip_table.parquet")

#### airport table:

In [88]:
# define the airport schema
airport_schema = StructType([
        StructField("type", StringType()),
        StructField("name", StringType()),
        StructField("country", StringType()),
        StructField("region", StringType()),
        StructField("local_code", StringType()),
        StructField("IATA_code", StringType()),
    ])

In [89]:
airport_df=df_airport_code[['type','name','country','region','local_code','IATA_code']]
airport_df.head(1)

Unnamed: 0,type,name,country,region,local_code,IATA_code
0,small_airport,Utirik Airport,MH,MH-UTI,03N,UTK


In [90]:
airport_table = spark.createDataFrame(airport_df,schema=airport_schema)

In [91]:
airport_table=airport_table.withColumn("airport_id", monotonically_increasing_id())

In [92]:
airport_table.show(5)

+-------------+-----------------+-------+------+----------+---------+----------+
|         type|             name|country|region|local_code|IATA_code|airport_id|
+-------------+-----------------+-------+------+----------+---------+----------+
|small_airport|   Utirik Airport|     MH|MH-UTI|       03N|      UTK|         0|
|small_airport|Andakombe Airport|     PG|PG-EHG|       ADK|      ADC|         1|
|small_airport|    Aseki Airport|     PG|PG-MPL|       ASE|      AEK|         2|
|small_airport|    Aiome Airport|     PG|PG-MPM|       AIO|      AIE|         3|
|small_airport|  Aiambak Airport|     PG|PG-WPD|       AMB|      AIH|         4|
+-------------+-----------------+-------+------+----------+---------+----------+
only showing top 5 rows



In [93]:
#write airport table to parquet
airport_table.write.parquet(output_data+"/airport_table.parquet")

#### temperature table

In [94]:
# define the airport schema
temperature_schema = StructType([
        StructField("datetime", StringType()),
        StructField("Avg_temp", StringType()),
        StructField("Avg_temp_uncert", StringType()),
        StructField("city", StringType()),
        StructField("country", StringType()),
        StructField("year", StringType()),
        StructField("month", StringType()),
    ])

In [95]:
temperature_df=df_World_Temperature[['datetime','Avg_temp','Avg_temp_uncert','city','country','year','month']]
temperature_df.head(1)

Unnamed: 0,datetime,Avg_temp,Avg_temp_uncert,city,country,year,month
0,1820-01-01,2.101,3.217,Abilene,United States,1820,1


In [96]:
temperature_table = spark.createDataFrame(temperature_df,schema=temperature_schema)

In [97]:
temperature_table=temperature_table.withColumn("temp_id", monotonically_increasing_id())

In [98]:
temperature_table.show(5)

+----------+------------------+---------------+-------+-------------+----+-----+-------+
|  datetime|          Avg_temp|Avg_temp_uncert|   city|      country|year|month|temp_id|
+----------+------------------+---------------+-------+-------------+----+-----+-------+
|1820-01-01|2.1010000000000004|          3.217|Abilene|United States|1820|   01|      0|
|1820-02-01|             6.926|          2.853|Abilene|United States|1820|   02|      1|
|1820-03-01|10.767000000000001|          2.395|Abilene|United States|1820|   03|      2|
|1820-04-01|17.988999999999994|          2.202|Abilene|United States|1820|   04|      3|
|1820-05-01|            21.809|          2.036|Abilene|United States|1820|   05|      4|
+----------+------------------+---------------+-------+-------------+----+-----+-------+
only showing top 5 rows



In [100]:
# write temperature table to parquet files partitioned by year and month
temperature_table.write.partitionBy("year","month").parquet(output_data+"/temperature_table.parquet")

#### time table

In [101]:
# define the time schema
time_schema = StructType([
        StructField("datetime", StringType()),
        StructField("year", StringType()),
        StructField("month", StringType()),
    ])

In [102]:
time_df=df_World_Temperature[['datetime','year','month']]
time_df.head(1)

Unnamed: 0,datetime,year,month
0,1820-01-01,1820,1


In [103]:
time_table = spark.createDataFrame(time_df,schema=time_schema)

In [104]:
time_table=time_table.withColumn("time_id", monotonically_increasing_id())

In [105]:
time_table.show(5)

+----------+----+-----+-------+
|  datetime|year|month|time_id|
+----------+----+-----+-------+
|1820-01-01|1820|   01|      0|
|1820-02-01|1820|   02|      1|
|1820-03-01|1820|   03|      2|
|1820-04-01|1820|   04|      3|
|1820-05-01|1820|   05|      4|
+----------+----+-----+-------+
only showing top 5 rows



In [107]:
# write the time table into s3 partitioned by year and month
time_table.write.partitionBy("year","month").parquet(output_data+"/time_table.parquet")

#### population_details table: 

In [108]:
# define the population details schema
population_details_schema = StructType([
        StructField("female_portion", DoubleType()),
        StructField("male_portion", DoubleType()),
        StructField("total_portion", IntegerType()),
        StructField("city", StringType()),
        StructField("state", StringType()),
        StructField("race", StringType()),
    ])

In [109]:
population_details_df=df_us_city_demo[['female_portion','male_portion','total_portion','city','state','race']]
population_details_df.head(1)

Unnamed: 0,female_portion,male_portion,total_portion,city,state,race
0,41862.0,40601.0,82463,Silver Spring,Maryland,Hispanic or Latino


In [110]:
population_details_table = spark.createDataFrame(population_details_df,schema=population_details_schema)

In [111]:
population_details_table=population_details_table.withColumn("population_id", monotonically_increasing_id())

In [112]:
population_details_table.show(5)

+--------------+------------+-------------+----------------+-------------+--------------------+-------------+
|female_portion|male_portion|total_portion|            city|        state|                race|population_id|
+--------------+------------+-------------+----------------+-------------+--------------------+-------------+
|       41862.0|     40601.0|        82463|   Silver Spring|     Maryland|  Hispanic or Latino|            0|
|       49500.0|     44129.0|        93629|          Quincy|Massachusetts|               White|            1|
|       46799.0|     38040.0|        84839|          Hoover|      Alabama|               Asian|            2|
|       87105.0|     88127.0|       175232|Rancho Cucamonga|   California|Black or African-...|            3|
|      143873.0|    138040.0|       281913|          Newark|   New Jersey|               White|            4|
+--------------+------------+-------------+----------------+-------------+--------------------+-------------+
only showi

In [113]:
#write the population details table partitioned by state and city
population_details_table.write.partitionBy("state","city").parquet(output_data+"/population_details_table.parquet")

#### demographic_summary

In [114]:
# define the demographic summary schema
demographic_summary_schema = StructType([
        StructField("city", StringType()),
        StructField("state", StringType()),
        StructField("median_age", DoubleType()),
        StructField("num_veterans", IntegerType()),
        StructField("foreign_born", StringType()),
        StructField("avg_houshold_size", DoubleType()),
    ])

In [115]:
demographic_summary_df=df_us_city_demo[['city','state','median_age','num_veterans','foreign_born','avg_houshold_size']]
demographic_summary_df.head(1)

Unnamed: 0,city,state,median_age,num_veterans,foreign_born,avg_houshold_size
0,Silver Spring,Maryland,33.8,1562,30908.0,2.6


In [116]:
demographic_summary_table = spark.createDataFrame(demographic_summary_df,schema=demographic_summary_schema)

In [117]:
demographic_summary_table=demographic_summary_table.withColumn("summary_id", monotonically_increasing_id())

In [118]:
demographic_summary_table.show(4)

+----------------+-------------+----------+------------+------------+-----------------+----------+
|            city|        state|median_age|num_veterans|foreign_born|avg_houshold_size|summary_id|
+----------------+-------------+----------+------------+------------+-----------------+----------+
|   Silver Spring|     Maryland|      33.8|        1562|     30908.0|              2.6|         0|
|          Quincy|Massachusetts|      41.0|        4147|     32935.0|             2.39|         1|
|          Hoover|      Alabama|      38.5|        4819|      8229.0|             2.58|         2|
|Rancho Cucamonga|   California|      34.5|        5821|     33878.0|             3.18|         3|
+----------------+-------------+----------+------------+------------+-----------------+----------+
only showing top 4 rows



In [120]:
# write demographic summary table partitioned by state and city
demographic_summary_table.write.partitionBy("state","city").parquet(output_data+"/demographic_summary_table.parquet")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

##### 4.2.1- Print all the schema to check the datatype and null vales

In [121]:
immigrant_details_table.printSchema()

root
 |-- city_code: integer (nullable = true)
 |-- country_code: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- date_to_stay: string (nullable = true)
 |-- INSnum: string (nullable = true)
 |-- admission_number: long (nullable = true)
 |-- admission_class: string (nullable = true)
 |-- immigrant_id: long (nullable = false)



In [122]:
trip_table.printSchema()

root
 |-- arrival_date: integer (nullable = true)
 |-- departure_date: double (nullable = true)
 |-- arrival_port: string (nullable = true)
 |-- departure_port: string (nullable = true)
 |-- arrival_flag: string (nullable = true)
 |-- departure_flag: string (nullable = true)
 |-- immig_mode: integer (nullable = true)
 |-- visa_type: integer (nullable = true)
 |-- visapost: string (nullable = true)
 |-- trip_id: long (nullable = false)



In [123]:
airline_table.printSchema()

root
 |-- airline: string (nullable = true)
 |-- age: string (nullable = true)
 |-- airline_id: long (nullable = false)



In [124]:
airport_table.printSchema()

root
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- IATA_code: string (nullable = true)
 |-- airport_id: long (nullable = false)



In [125]:
temperature_table.printSchema()

root
 |-- datetime: string (nullable = true)
 |-- Avg_temp: string (nullable = true)
 |-- Avg_temp_uncert: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- temp_id: long (nullable = false)



In [126]:
time_table.printSchema()

root
 |-- datetime: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- time_id: long (nullable = false)



In [127]:
population_details_table.printSchema()

root
 |-- female_portion: double (nullable = true)
 |-- male_portion: double (nullable = true)
 |-- total_portion: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- race: string (nullable = true)
 |-- population_id: long (nullable = false)



In [128]:
demographic_summary_table.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- num_veterans: integer (nullable = true)
 |-- foreign_born: string (nullable = true)
 |-- avg_houshold_size: double (nullable = true)
 |-- summary_id: long (nullable = false)



##### 4.2.2- Check if the tables are empty or not

In [136]:
print('immigrant_details table dimesions (rows,columns)',(immigrant_details_table.count(), len(immigrant_details_table.columns)))

immigrant_details table dimesions: (369, 11)


In [138]:
print('trip table dimesions (rows,columns):',(trip_table.count(), len(trip_table.columns)))

trip table dimesions : (rows,columns): (369, 10)


In [139]:
print('airline table dimesions (rows,columns):',(airline_table.count(), len(airline_table.columns)))

airline table dimesions: : (rows,columns) (369, 3)


In [140]:
print('airport table dimesions (rows,columns):',(airport_table.count(), len(airport_table.columns)))

airport table dimesions (rows,columns): (678, 7)


In [142]:
print('temperature table dimesions (rows,columns):',(temperature_table.count(), len(temperature_table.columns)))

temperature table dimesions (rows,columns): (661524, 8)


In [144]:
print('time table dimesions (rows,columns):',(time_table.count(), len(time_table.columns)))

time table dimesions (rows,columns): (661524, 4)


In [145]:
print('population_details dimesions (rows,columns):',(population_details_table.count(), len(population_details_table.columns)))

population_details dimesions (rows,columns): (2875, 7)


In [146]:
print('demographic_summary dimesions (rows,columns):',(demographic_summary_table.count(), len(demographic_summary_table.columns)))

demographic_summary dimesions (rows,columns): (2875, 7)


#### 4.3 Data dictionary 
Here is a brief description what means data dictionary:
- Describes a table’s columns based on common traits (i.e name, definition, data type) within another table.
- Data dictionaries allow readers to understand complex databases without having to investigate each column.
- It is like a summary of columns number and type in each table.
- It is most used if a data table is simply too large to view directly.

In [147]:
Image(url="my_dict.png", width=900, height=900)

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

##### 5.1- Tools and Technologies
Clearly state the rationale for the choice of tools: 

- The data is stored in S3 bucket : I have chosen this solution because it offers durability, availability, performance, security, and virtually unlimited scalability at very low costs.
- Pandas for Exploratory Data Analysis (EDA) : It is a Profiling Library that allows you to perform fast and effective data exploration. It allows you to become familiar with the given data via exploration, through statistics, data visualisations, and data summaries.
- PySpark : It used to process large datasets, building machine learning pipelines, and creating ETLs for a data platform.

##### 5.2- Data update

- Data must updated regulary to ensure that the information stored in it or derived from it is current. The process of updating the data is called the refresh process
- In our case we update our data by adding new records to our tables and deleting duplicates
- Data could be updated as follow:
    - Temperature and Demographic details and statistics (Tables) are updated each 12 months as those reports are only generated annualy.
    - Immigration data (Tables) could be updated monthly or each six months.

##### 5.3- Perspective

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.
 
1- The data was increased by 100x 

- If the data has been increased by 100x we could choose to migrate our local solution to the cloud using RDS or Redshift.
- Knowing that our data is going to be used by BI apps (OLAP) or that the data is really very large so RDS is not optimal solution
- The optimal solution here is using Redshift which, supports data warehouse and data lake approaches, enabling it to access and analyze very large amounts of data. 


2- The data populates a dashboard that must be updated on a daily basis by 7am every day.

- We can develop a data pipeline using Apache Airflow, a data pipeline is a set of consecutive or parallel tasks such as : collection, cleaning, analysis, visualization etc.
- Then, we schedule tasks of our pipeline to be run daily at 6:30
- The output reports or files of this pipeline are insert into S3 bucket
- Our dashboard could be devloped using AWS Quicksight where the dataset to be visualized could be updated daily at 7:00 am

The database needed to be accessed by 100+ people.

- If those users are only going to access data withou performing online analysis or processing so in this case RDS is also a good solution
- Otherwise, Redshift is our best solution