# Immigration Data Lake
### Data Engineering Capstone Project

#### Project Summary
This project gathers data on immigration to the US and analyzes it from several angles. It aims to answer questions such as:
- Which state is the most popular among student immigrants?
- Does a state's temperature in a given month affect the number of immigrants arriving there?
- How does the number of immigrants arriving in a state relate to that state's foreign-born population?

And many more analytical questions.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    count,
    countDistinct,
    monotonically_increasing_id,
    to_date,
    udf,
    when,
)
from pyspark.sql.types import IntegerType, StringType, TimestampType

Starting Spark application

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1668996888511_0002,pyspark,idle,Link,Link,✔

SparkSession available as 'spark'.

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

In [2]:
# Read in the data here
input_data = "s3a://immigration-raw-data/"
immigration_df = spark.read.parquet(input_data + 'immigration/')

In [3]:
labels_data = spark.read.text(input_data + 'I94_SAS_Labels_Descriptions.SAS')

In [4]:
demograhics_df = spark.read.option("header", True).option("delimiter", ";").option("inferSchema", True).csv(input_data + "us-cities-demographics.csv")

In [5]:
temperature_df = spark.read.option("header", True).option("inferSchema", True).csv(input_data + "GlobalLandTemperaturesByCity.csv")

In [6]:
airport_df = spark.read.option("header",True).csv(input_data + "airport-codes_csv.csv")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

##### Immigration data cleaning

In [7]:
# Performing cleaning tasks here
immigration_df.take(1)

[Row(cicid=5680949.0, i94yr=2016.0, i94cit=117.0, i94res=117.0, i94port='NYC', arrdate=20659.0, i94mode=1.0, i94addr='NY', depdate=None, i94bir=30.0, i94visa=3.0, count=1.0, dtadfile='20160724', visapost='NPL', occup=None, entdepa='G', entdepd=None, entdepu=None, matflag=None, biryear=1986.0, dtaddto='D/S', gender='F', insnum=None, airline='IG', admnum=2947450085.0, fltno='3940', visatype='F1', i94mon=7.0)]

In [8]:
immigration_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = t

In [9]:
immigration_df.limit(5).show()

+---------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+------+
|    cicid| i94yr|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|i94mon|
+---------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+------+
|2358368.0|2016.0| 135.0| 135.0|    NEW|20739.0|    1.0|     FL|20761.0|  17.0|    2.0|  1.0|20161012|    null| null|      G|      O|   null|      M| 1999.0|01092017|     F|  null|     UA|6.6156102033E10|00922|      WT|  10.0|
|2358369.0|2016.0| 254.0| 276.0|    SAI|20739.0|    1.0|     MP|20743.0|   5.0|    2.0|  1.0

In [10]:
immigration_df.describe("i94yr").show()

+-------+--------------------+
|summary|               i94yr|
+-------+--------------------+
|  count|            40790529|
|   mean|              2016.0|
| stddev|4.57374931523714E-14|
|    min|              2016.0|
|    max|              2016.0|
+-------+--------------------+

In [11]:
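# Convert every double column to an integer column.
# NOTE: IntegerType is a 32-bit type, so values above 2,147,483,647 do not fit.
# admnum is affected: the raw value 2947450085.0 later appears as -1347517211.
# A 64-bit LongType would avoid this wrap-around.
@udf(IntegerType())
def convertToInteger(x):
    if x is not None:
        return int(x)
    else:
        return x

for col_name, col_type in immigration_df.dtypes:
    if col_type == 'double':
        immigration_df = immigration_df.withColumn(col_name, convertToInteger(col_name))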
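
One thing worth checking in the conversion above: `IntegerType` is a 32-bit signed type, while the sample row shows `admnum=2947450085.0`, which is above the 32-bit maximum of 2,147,483,647. The sketch below reproduces the wrap-around in plain Python and then shows one possible alternative: casting to `long` with Spark's built-in `cast` instead of a Python UDF. The helper names `wrap_int32` and `cast_doubles_to_long` are illustrative and not part of the original notebook.

```python
def wrap_int32(value):
    """Reinterpret an integer as a signed 32-bit value (two's complement)."""
    return (int(value) + 2**31) % 2**32 - 2**31


# The raw admnum from the sample row no longer fits in 32 bits.
print(wrap_int32(2947450085.0))  # -1347517211


def cast_doubles_to_long(df):
    """Cast every double column of a Spark DataFrame to a 64-bit long.

    Assumption: df is a pyspark.sql.DataFrame. The import is local so the
    arithmetic check above runs even where Spark is not installed.
    """
    from pyspark.sql.functions import col

    for name, dtype in df.dtypes:
        if dtype == "double":
            df = df.withColumn(name, col(name).cast("long"))
    return df
```

The printed value matches the negative `admnum` that shows up in the decoded rows later in this notebook, which suggests the stored admission numbers are no longer the original ones.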

In [12]:
immigration_df.limit(5).show()

+-------+-----+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+----------+-----+--------+------+
|  cicid|i94yr|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|    admnum|fltno|visatype|i94mon|
+-------+-----+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+----------+-----+--------+------+
|2452107| 2016|   135|   135|    NYC|  20708|      1|     NY|  20713|    57|      1|    1|20160911|    null| null|      G|      O|   null|      M|   1959|12092016|     M|  null|     VS|1474026993|   45|      WB|     9|
|2452108| 2016|   135|   135|    ATL|  20708|      1|     GA|  20784|    42|      2|    1|20160911|    null| null|      O|  

In [13]:
immigration_df.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: integer (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: integer (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: integer (nullable = true)
 |-- fltno: string (

In [14]:
s = '\n'.join(d['value'] for d in labels_data.collect())
lines = s.splitlines()

In [15]:
def decodingImmegrationData():
    # Parse the lookup tables out of I94_SAS_Labels_Descriptions.SAS.
    # The slice bounds below are the line positions of each table in that file.
    country_codes = {}
    port_codes = {}
    mode_codes = {}
    address_codes = {}
    visa_codes = {}
    for line in lines[9:298]:
        code, country = line.partition("=")[::2]
        country = country.replace(';','')
        country = country.replace('\'','')
        country = country.strip()
        country_codes[int(code)] = country

    for line in lines[ 302:962]:
        code, portcountry = line.partition("=")[::2]
        code = code.replace('\'','')
        code = code.strip()
        portcountry = portcountry.replace('\'','')
        portcountry = portcountry.strip()
        if ',' in portcountry:
            port,country = portcountry.rsplit(',', 1)
            port_codes[code] = [port.strip(),country.strip()]
        else:
            port_codes[code] = [None,None]

    for line in lines[972:976]:
        code, mode = line.partition("=")[::2]
        mode = mode.replace(';','')
        mode = mode.replace('\'','')
        mode = mode.strip()
        code = int(code)
        mode_codes[code] = mode

    for line in lines[ 981:1036 ]:
        code, address = line.partition("=")[::2]
        code = code.replace('\'','')
        code = code.strip()
        address = address.replace(';','')
        address = address.replace('\'','')
        address = address.strip()
        address_codes [code] = address


    for line in lines[1046:1049]:
        code , visa_type = line.partition("=")[::2]
        visa_type = visa_type.strip()
        code = int(code)
        visa_codes[code] = visa_type
            
    return country_codes,port_codes,mode_codes,address_codes,visa_codes
country_codes,port_codes,mode_codes,address_codes,visa_codes = decodingImmegrationData()
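
The function above locates each lookup table by hard-coded line positions (for example `lines[9:298]`), which depends on the exact layout of the labels file. A possible alternative, sketched below, recognizes any `code = 'label'` line with a regular expression. The sample lines are illustrative and only assume the `code = 'label'` shape implied by the parsing code above; `parse_label_line` and `split_port` are hypothetical helper names.

```python
import re

# Matches lines such as "   117 =  'ITALY'" or "   'NYC' = 'NEW YORK, NY'".
LABEL_RE = re.compile(r"^\s*'?([^'=]+?)'?\s*=\s*'?([^';]*?)'?\s*;?\s*$")


def parse_label_line(line):
    """Return (code, label) for a label line, or None if it does not match."""
    match = LABEL_RE.match(line)
    if match is None:
        return None
    return match.group(1).strip(), match.group(2).strip()


def split_port(label):
    """Split 'CITY, ST' on the last comma; labels without a comma give (None, None)."""
    if "," not in label:
        return None, None
    city, state = label.rsplit(",", 1)
    return city.strip(), state.strip()


# Illustrative lines; the codes and labels match values decoded in this notebook.
sample_lines = [
    "/* illustrative header line, ignored */",
    "   117 =  'ITALY'",
    "   'NYC'\t=\t'NEW YORK, NY           '",
    "   1 = 'Air'",
    "   3 = Student",
]
parsed = [p for p in map(parse_label_line, sample_lines) if p is not None]
print(parsed)
print(split_port("NEW YORK, NY"))
```

Codes come back as strings here, so numeric tables would still need an `int(code)` conversion as in the function above.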


In [16]:
@udf(StringType())
def extractCountry(x):
    # Map a numeric country code to its name; unknown codes become null.
    if x is not None and x in country_codes:
        return country_codes[x]
    else:
        return None

@udf(StringType())
def extractCountryPort(x):
    # Despite the name, this returns the port city (element 0 of [city, state]).
    if x is not None and x in port_codes:
        return port_codes[x][0]
    else:
        return None

@udf(StringType())
def extractStatePort(x):
    # Returns the state part of the port entry (element 1 of [city, state]).
    if x is not None and x in port_codes:
        return port_codes[x][1]
    else:
        return None

@udf(StringType())
def extractTravelMode(x):
    # Unknown codes are passed through unchanged.
    if x is not None and x in mode_codes:
        return mode_codes[x]
    else:
        return x

@udf(StringType())
def extractAddress(x):
    # Unknown codes are passed through unchanged.
    if x is not None and x in address_codes:
        return address_codes[x]
    else:
        return x

@udf(StringType())
def extractVisa(x):
    # Unknown codes are passed through unchanged.
    if x is not None and x in visa_codes:
        return visa_codes[x]
    else:
        return x



In [17]:
immigration_df = immigration_df.withColumn("i94cit", extractCountry("i94cit"))
immigration_df = immigration_df.withColumn("i94res", extractCountry("i94res"))
immigration_df = immigration_df.withColumn("airportcountry", extractCountryPort("i94port"))
immigration_df = immigration_df.withColumn("airportstate", extractStatePort("i94port"))
immigration_df = immigration_df.withColumn("i94mode", extractTravelMode("i94mode"))
immigration_df = immigration_df.withColumn("i94addr", extractAddress("i94addr"))
immigration_df = immigration_df.withColumn("i94visa", extractVisa("i94visa"))
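
The lookups above run as Python UDFs, so every row is handed to a Python worker. For small dictionaries such as `mode_codes` or `visa_codes`, one alternative is to turn the dictionary into a tiny DataFrame and use a broadcast join. This is only a sketch under stated assumptions: `mapping_rows` and `decode_with_join` are hypothetical helpers, an active `SparkSession` is passed in, and `label_col` does not already exist in the DataFrame.

```python
def mapping_rows(mapping):
    """Turn a {code: label} dict into sorted (code, label) rows."""
    return sorted(mapping.items())


def decode_with_join(spark, df, mapping, code_col, label_col):
    """Add label_col to df by joining on code_col against a small lookup table.

    Assumptions: spark is an active SparkSession, df is a Spark DataFrame,
    and code_col has the same type as the keys of mapping. Codes missing
    from mapping get a null label because this is a left join.
    """
    from pyspark.sql.functions import broadcast  # local import: Spark is only needed here

    lookup = spark.createDataFrame(mapping_rows(mapping), [code_col, label_col])
    return df.join(broadcast(lookup), on=code_col, how="left")


# Visa codes as decoded in this notebook's sample rows.
print(mapping_rows({3: "Student", 2: "Pleasure"}))  # [(2, 'Pleasure'), (3, 'Student')]
```

Unlike the UDFs, this keeps the original code column and adds the label next to it, which can make later validation easier.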

In [18]:
immigration_df.count()

40790529

In [19]:
immigration_df = immigration_df.withColumn('dtadfile', to_date(col('dtadfile'), format='yyyyMMdd'))
immigration_df = immigration_df.withColumn('dtadddto', to_date(col('dtaddto'), format='MMddyyyy'))
immigration_df.take(2)

[Row(cicid=5680949, i94yr=2016, i94cit='ITALY', i94res='ITALY', i94port='NYC', arrdate=20659, i94mode='Air', i94addr='NEW YORK', depdate=None, i94bir=30, i94visa='Student', count=1, dtadfile=datetime.date(2016, 7, 24), visapost='NPL', occup=None, entdepa='G', entdepd=None, entdepu=None, matflag=None, biryear=1986, dtaddto='D/S', gender='F', insnum=None, airline='IG', admnum=-1347517211, fltno='3940', visatype='F1', i94mon=7, airportcountry='NEW YORK', airportstate='NY', dtadddto=None), Row(cicid=5680950, i94yr=2016, i94cit='CHINA, PRC', i94res='CHINA, PRC', i94port='DET', arrdate=20659, i94mode='Air', i94addr='ILLINOIS', depdate=20679, i94bir=46, i94visa='Pleasure', count=1, dtadfile=datetime.date(2016, 8, 13), visapost=None, occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1970, dtaddto='01232017', gender='M', insnum='78652', airline='DL', admnum=-1347516211, fltno='188', visatype='B2', i94mon=7, airportcountry='DETROIT', airportstate='MI', dtadddto=datetime.da
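
`to_date` returns null when a string does not match the given pattern, which is why the first row above has `dtaddto='D/S'` but `dtadddto=None`. The plain-Python sketch below mirrors that behavior with `datetime.strptime`, using equivalents of the two patterns from the cell above (`yyyyMMdd` and `MMddyyyy`). `parse_or_none` is a hypothetical helper used only for illustration; Spark's own parser may differ in edge cases.

```python
from datetime import datetime


def parse_or_none(text, pattern):
    """Parse text with a strptime pattern; return None if it does not match."""
    if text is None:
        return None
    try:
        return datetime.strptime(text, pattern).date()
    except ValueError:
        return None


print(parse_or_none("20160724", "%Y%m%d"))  # 2016-07-24
print(parse_or_none("01232017", "%m%d%Y"))  # 2017-01-23
print(parse_or_none("D/S", "%m%d%Y"))       # None
```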

In [20]:
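import datetime

# arrdate and depdate are SAS dates: the number of days since 1960-01-01.
@udf(TimestampType())
def convertToTimestamp(x):
    try:
        return datetime.timedelta(days=int(x)) + datetime.datetime(1960, 1, 1)
    except (TypeError, ValueError, OverflowError):
        # Missing or malformed values fall back to the SAS epoch itself.
        return datetime.datetime(1960, 1, 1)


immigration_df = immigration_df.withColumn('arrdate', convertToTimestamp('arrdate'))
immigration_df = immigration_df.withColumn('depdate', convertToTimestamp('depdate'))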
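
`arrdate` and `depdate` are stored as day counts from 1960-01-01, so the conversion is a single date offset. Note that the UDF above maps missing values to 1960-01-01 itself, which later queries could mistake for a real departure date. The sketch below shows a variant that keeps missing values as `None`. `sas_days_to_date` is a hypothetical name, and whether nulls should be preserved is a design choice left to the pipeline.

```python
import datetime

SAS_EPOCH = datetime.date(1960, 1, 1)


def sas_days_to_date(days):
    """Convert a SAS day count (days since 1960-01-01) to a date, keeping None."""
    if days is None:
        return None
    return SAS_EPOCH + datetime.timedelta(days=int(days))


print(sas_days_to_date(20659))  # 2016-07-24, matching arrdate in the sample row
print(sas_days_to_date(None))   # None
```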

In [21]:
immigration_df.show(5)

+-------+-----+--------------+--------------+-------+-------------------+-------+----------+-------------------+------+--------+-----+----------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-----------+-----+--------+------+--------------+------------+----------+
|  cicid|i94yr|        i94cit|        i94res|i94port|            arrdate|i94mode|   i94addr|            depdate|i94bir| i94visa|count|  dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|     admnum|fltno|visatype|i94mon|airportcountry|airportstate|  dtadddto|
+-------+-----+--------------+--------------+-------+-------------------+-------+----------+-------------------+------+--------+-----+----------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-----------+-----+--------+------+--------------+------------+----------+
|5680949| 2016|         ITALY|         ITALY|    NYC|2016-07-24 00:00:00|    Air|  NE

In [22]:
@udf(StringType())
def lowerString(x):
    if x is not None:
        return x.lower()
    else:
        return x
    

In [23]:
immigration_df = immigration_df.withColumn("airportcountry", lowerString("airportcountry"))

In [24]:
immigration_df.limit(5).show()

+-------+-----+--------------+--------------+-------+-------------------+-------+----------+-------------------+------+--------+-----+----------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-----------+-----+--------+------+--------------+------------+----------+
|  cicid|i94yr|        i94cit|        i94res|i94port|            arrdate|i94mode|   i94addr|            depdate|i94bir| i94visa|count|  dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|     admnum|fltno|visatype|i94mon|airportcountry|airportstate|  dtadddto|
+-------+-----+--------------+--------------+-------+-------------------+-------+----------+-------------------+------+--------+-----+----------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-----------+-----+--------+------+--------------+------------+----------+
|5680949| 2016|         ITALY|         ITALY|    NYC|2016-07-24 00:00:00|    Air|  NE

In [25]:
immigration_df.count()

40790529

In [26]:
# immigration_df.select([count(when(col(colname).isNull(),1)).alias('n'+colname) for colname in immigration_df.columns]).head()

In [27]:
immigration_df = immigration_df.dropna(how = "any", subset = ["airportcountry","airportstate"])

In [28]:
immigration_df.count()

39548799

In [29]:
immigration_df.createOrReplaceTempView("immigration_staging")

##### US Demographics data cleaning

In [30]:
demograhics_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)

In [31]:
demograhics_df.count()

2891

In [32]:
demograhics_df.select(countDistinct("City","State","Race")).show()


+---------------------------------+
|count(DISTINCT City, State, Race)|
+---------------------------------+
|                             2891|
+---------------------------------+

In [33]:
demograhics_df.filter(demograhics_df.City == "Los Angeles").filter(demograhics_df.State == "California").show()

+-----------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-------+
|       City|     State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|  Count|
+-----------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-------+
|Los Angeles|California|      35.0|        1958998|          2012898|         3971896|             85417|     1485425|                  2.86|        CA|               White|2177650|
|Los Angeles|California|      35.0|        1958998|          2012898|         3971896|             85417|     1485425|                  2.86|        CA|               Asian| 512999|
|Los Angeles|California|      35.0|        1958998|          2012898|         3971896|    

In [34]:

demograhics_df.select([count(when(col(colname).isNull(),1)).alias('nulls_'+colname) for colname in demograhics_df.columns]).head()

Row(nulls_City=0, nulls_State=0, nulls_Median Age=0, nulls_Male Population=3, nulls_Female Population=3, nulls_Total Population=0, nulls_Number of Veterans=13, nulls_Foreign-born=13, nulls_Average Household Size=16, nulls_State Code=0, nulls_Race=0, nulls_Count=0)

In [35]:
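# Rows with nulls are kept: the null counts above are small and limited to a few numeric columns.
# (City, State, Race) is already unique (2891 distinct keys for 2891 rows), so drop_duplicates is only a safeguard.
demograhics_df = demograhics_df.drop_duplicates(['City','State','Race'])
demograhics_df.count()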

2891

In [36]:
demograhics_df = demograhics_df.withColumn("City", lowerString("City"))

In [37]:
# Replace spaces and hyphens so the column names can be used in SQL without quoting.
for c in demograhics_df.columns:
    demograhics_df = demograhics_df.withColumnRenamed(c, c.replace(' ', '_').replace('-', '_'))

In [38]:
demograhics_df.show(5)

+-------------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|         City|     State|Median_Age|Male_Population|Female_Population|Total_Population|Number_of_Veterans|Foreign_born|Average_Household_Size|State_Code|                Race| Count|
+-------------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|      alafaya|   Florida|      33.5|          39504|            45760|           85264|              4176|       15842|                  2.94|        FL|               White| 63666|
| baldwin park|California|      35.8|          38747|            38309|           77056|               780|       34322|                  4.13|        CA|Black or African-...|  1560|
|      houston|     Texas|      32.6|        1149686|          1148942|         22986

In [39]:
demograhics_df.createOrReplaceTempView("demographics")

##### Temperature data cleaning

In [40]:
temperature_df.count()

8599212

In [41]:
temperature_df.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

In [42]:
temperature_df = temperature_df.filter(temperature_df.Country == "United States")

In [43]:
temperature_df.count()

687289

In [44]:
# temperature_df is already restricted to the United States, so only the city filter is needed.
temperature_df.filter(temperature_df.City == "Los Angeles").head()

Row(dt=datetime.datetime(1849, 1, 1, 0, 0), AverageTemperature=8.818999999999999, AverageTemperatureUncertainty=2.5580000000000003, City='Los Angeles', Country='United States', Latitude='34.56N', Longitude='118.70W')

In [45]:
temperature_df.select([count(when(col(colname).isNull(),1)).alias('nulls_'+colname) for colname in temperature_df.columns]).head()

Row(nulls_dt=0, nulls_AverageTemperature=25765, nulls_AverageTemperatureUncertainty=25765, nulls_City=0, nulls_Country=0, nulls_Latitude=0, nulls_Longitude=0)

In [46]:
temperature_df = temperature_df.na.drop(how="any")
temperature_df = temperature_df.drop_duplicates(['dt','City'])
temperature_df.count()

639649

In [47]:
temperature_df = temperature_df.withColumn("City", lowerString("City"))

In [48]:
temperature_df.limit(5).show()

+-------------------+------------------+-----------------------------+------------+-------------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|        City|      Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+------------+-------------+--------+---------+
|1745-04-01 00:00:00|             8.209|                        1.227| jersey city|United States|  40.99N|   74.56W|
|1745-04-01 00:00:00| 7.167000000000001|                        1.069|   new haven|United States|  40.99N|   72.43W|
|1750-07-01 00:00:00|             22.69|                         1.28|     buffalo|United States|  42.59N|   78.55W|
|1750-07-01 00:00:00|            24.161|           1.8330000000000002|     madison|United States|  42.59N|   89.45W|
|1750-07-01 00:00:00|            27.141|                        1.328|newport news|United States|  37.78N|   77.29W|
+-------------------+------------------+------------------------

In [49]:
temperature_df.createOrReplaceTempView("temperature_staging")

##### Airport data cleaning

In [50]:
airport_df.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

In [51]:
airport_df.count()

55075

In [52]:
# countDistinct is already imported in the first cell.
airport_df.select(countDistinct("municipality")).show()

+----------------------------+
|count(DISTINCT municipality)|
+----------------------------+
|                       27133|
+----------------------------+

In [53]:
airport_df.select(countDistinct("iso_region","municipality")).show()

+----------------------------------------+
|count(DISTINCT iso_region, municipality)|
+----------------------------------------+
|                                   31543|
+----------------------------------------+

In [54]:
ca_airports = airport_df.filter(airport_df.iso_region == "US-CA")
ca_airports.filter(ca_airports.municipality == "Los Angeles").head()

Row(ident='01CN', type='heliport', name="Los Angeles County Sheriff's Department Heliport", elevation_ft='300', continent='NA', iso_country='US', iso_region='US-CA', municipality='Los Angeles', gps_code='01CN', iata_code=None, local_code='01CN', coordinates='-118.15399932861328, 34.03779983520508')

In [55]:
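@udf(StringType())
def extractState(x):
    # iso_region looks like "US-CA"; keep only the part after the country prefix.
    if x is None or '-' not in x:
        return None
    state = x.split('-')
    return state[1]

airport_df = airport_df.withColumn("iso_region", extractState("iso_region"))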
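
After this step `iso_region` holds only the part after the hyphen, so two airports in different countries can end up with the same `(iso_region, municipality)` pair. The sketch below shows the split in plain Python and keeps the country prefix next to the region so that such pairs stay distinct. `split_region` is a hypothetical helper, and the second sample value uses a made-up country prefix purely for illustration.

```python
def split_region(iso_region):
    """Split 'US-CA' into ('US', 'CA'); return (None, None) for unusable input."""
    if iso_region is None or "-" not in iso_region:
        return None, None
    country, region = iso_region.split("-", 1)
    return country, region


# "ZZ-CA" is a made-up value standing in for a non-US region with the same suffix.
rows = [("US-CA", "los angeles"), ("ZZ-CA", "los angeles")]

suffix_only = {(split_region(r)[1], city) for r, city in rows}
with_country = {(*split_region(r), city) for r, city in rows}

print(len(suffix_only))   # 1: the two rows collapse into one key
print(len(with_country))  # 2: keeping the country keeps them apart
```

In the notebook, one way to get the same effect would be to filter on `iso_country == "US"` before de-duplicating on `iso_region` and `municipality`.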

In [56]:
airport_df = airport_df.withColumn("municipality", lowerString("municipality"))

In [57]:
airport_df.limit(5).show()

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|        PA|    bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|        KS|       leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|        AK|anchor point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [58]:
airport_df.select([count(when(col(colname).isNull(),1)).alias('nulls_'+colname) for colname in airport_df.columns]).head()

Row(nulls_ident=0, nulls_type=0, nulls_name=0, nulls_elevation_ft=7006, nulls_continent=0, nulls_iso_country=0, nulls_iso_region=0, nulls_municipality=5676, nulls_gps_code=14045, nulls_iata_code=45886, nulls_local_code=26389, nulls_coordinates=0)

In [59]:
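# Drop rows without a municipality, then keep one row per (region, municipality) pair.
# iso_region no longer carries its country prefix at this point, so pairs from
# different countries can collide here; the US filter is applied in the next cell.
airport_df = airport_df.na.drop(subset=["municipality"])
airport_df = airport_df.drop_duplicates(["iso_region", "municipality"])
airport_df.count()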

31462

In [60]:
airport_df = airport_df.filter(airport_df.iso_country == "US")
airport_df.count()

11940

In [61]:
airport_df.createOrReplaceTempView("airport_staging")

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model
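
As a minimal sketch of the build order: every query in Step 4 joins against the `city` view, so `city` has to exist before the other tables. The dependency map below is read off the SQL in this notebook; the `build_order` helper name is hypothetical.

```python
from graphlib import TopologicalSorter

# Table -> tables it joins against, read off the Step 4 queries.
# Staging views (demographics, airport_staging, ...) are inputs and not listed.
DEPENDENCIES = {
    "city": set(),
    "city_races": {"city"},
    "airport": {"city"},
    "immigration": {"city"},
    "temperature": {"city"},
}


def build_order(dependencies):
    """Return the tables in an order where each comes after its dependencies."""
    return list(TopologicalSorter(dependencies).static_order())


order = build_order(DEPENDENCIES)
print(order[0])  # city
```

The four dependent tables have no ordering constraint among themselves, so they could be built in any order once `city` is registered.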

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [62]:
# One row per city: DISTINCT collapses the per-race rows of the demographics view.
city_table = spark.sql("""
                            SELECT  DISTINCT d.City AS city,
                                    d.State AS state,
                                    d.State_Code AS state_code,
                                    d.Median_Age AS median_age,
                                    d.Male_Population AS male_population,
                                    d.Female_Population AS female_population,
                                    d.Total_Population AS total_population,
                                    d.Number_of_Veterans AS number_of_veterans,
                                    d.Foreign_born AS foreign_born,
                                    d.Average_Household_Size AS average_household_size
                            FROM demographics d
""")

# Surrogate key: unique and increasing, but not consecutive across partitions.
city_table = city_table.withColumn("city_id", monotonically_increasing_id())
city_table.createOrReplaceTempView('city')
city_table.show(5)

+------------+----------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+-----------+
|        city|     state|state_code|median_age|male_population|female_population|total_population|number_of_veterans|foreign_born|average_household_size|    city_id|
+------------+----------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+-----------+
|  sacramento|California|        CA|      33.7|         237724|           252991|          490715|             19698|      112579|                  2.73|          0|
|     killeen|     Texas|        TX|      29.2|          69442|            71367|          140809|             24281|       15769|                  2.72|          1|
|    avondale|   Arizona|        AZ|      29.1|          38712|            41971|           80683|              4815|        8355|                  3.18| 8589934592|
|   

In [63]:
# One row per (city, race) pair, keyed to the city table.
city_races_table = spark.sql("""
                            SELECT  monotonically_increasing_id() AS city_races_id,
                                    c.city_id AS city_id,
                                    d.Race AS race,
                                    d.Count AS race_count
                            FROM demographics d
                            JOIN city c
                            ON d.City = c.city
                            AND d.State = c.state
                            AND d.State_Code = c.state_code
                            
                            
""")
city_races_table.createOrReplaceTempView('city_races')

city_races_table.show(5)

+-------------+-------------+--------------------+----------+
|city_races_id|      city_id|                race|race_count|
+-------------+-------------+--------------------+----------+
|            0|1709396983808|               White|     63666|
|            1| 412316860416|Black or African-...|      1560|
|            2|  25769803777|               Asian|    173854|
|            3|1279900254208|               White|     91201|
|            4| 206158430208|               Asian|     17854|
+-------------+-------------+--------------------+----------+
only showing top 5 rows
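
The `city_id` values in the outputs above (for example 8589934592 and 25769803777) are large because of how `monotonically_increasing_id` builds them: per the Spark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The plain-Python sketch below decodes that layout; `decode_spark_id` is a hypothetical helper, not part of PySpark.

```python
RECORD_BITS = 33  # lower 33 bits hold the row number within the partition


def decode_spark_id(value):
    """Split an ID into (partition_id, row_in_partition)."""
    partition_id = value >> RECORD_BITS
    row_in_partition = value & ((1 << RECORD_BITS) - 1)
    return partition_id, row_in_partition


# IDs taken from the outputs shown in this notebook.
for city_id in (0, 1, 8589934592, 25769803777):
    print(city_id, decode_spark_id(city_id))
# 0 (0, 0)
# 1 (0, 1)
# 8589934592 (1, 0)
# 25769803777 (3, 1)
```

The IDs are therefore unique but neither consecutive nor stable across runs that partition the data differently.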

In [64]:
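# Inner join: only airports whose (municipality, state code) pair matches
# a row in the city table are kept.
airport_table = spark.sql("""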
                            SELECT  DISTINCT a.ident AS airport_id,
                                    a.name AS name,
                                    a.type AS type,
                                    c.city_id AS city_id
                            FROM airport_staging a
                            JOIN city c
                            ON a.municipality = c.city
                            AND a.iso_region = c.state_code
                            
""")
airport_table.createOrReplaceTempView('airport')

airport_table.show(5)

+----------+--------------------+--------+-------------+
|airport_id|                name|    type|      city_id|
+----------+--------------------+--------+-------------+
|      7LL1|Vista Medical Cen...|heliport|1116691496962|
|      04TE|Veterans Affairs ...|heliport|  25769803776|
|      31OR|   Heli-Jet Heliport|heliport| 249108103169|
|       0T7|   Kittyhawk Airport|  closed|1417339207680|
|      PN20|   Car Tech Heliport|heliport|1623497637892|
+----------+--------------------+--------+-------------+
only showing top 5 rows

In [65]:
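# Note on names: i94cit holds a country (e.g. "UNITED KINGDOM" in the output below)
# despite the origin_city alias, and the staging column airportcountry is matched
# against city names.
immigration_table = spark.sql("""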
                            SELECT  DISTINCT i.cicid AS immigrant_id,
                                    i.i94yr AS immigration_year,
                                    i.i94mon AS immigration_month,
                                    i.i94cit AS origin_city,
                                    c.city_id AS arrival_city_id,
                                    i.arrdate AS arrival_date,
                                    i.i94mode AS travel_mode,
                                    i.i94addr AS address,
                                    i.depdate AS departial_date,
                                    i.i94bir AS age,
                                    i.i94visa AS visa_class,
                                    i.occup AS occupation,
                                    i.biryear AS birth_year,
                                    i.dtaddto AS addmission_date,
                                    i.gender AS gender,
                                    i.airline AS airline,
                                    i.fltno AS flight_number,
                                    i.visatype AS visa_type 
                            FROM immigration_staging i
                            JOIN city c
                            ON i.airportcountry = c.city
                            AND i.airportstate = c.state_code
                            
""")
immigration_table = immigration_table.withColumn("immigration_id", monotonically_increasing_id())

immigration_table.createOrReplaceTempView('immigration')

immigration_table.show(5)

+------------+----------------+-----------------+--------------------+---------------+-------------------+-----------+--------+-------------------+---+----------+----------+----------+---------------+------+-------+-------------+---------+--------------+
|immigrant_id|immigration_year|immigration_month|         origin_city|arrival_city_id|       arrival_date|travel_mode| address|     departial_date|age|visa_class|occupation|birth_year|addmission_date|gender|airline|flight_number|visa_type|immigration_id|
+------------+----------------+-----------------+--------------------+---------------+-------------------+-----------+--------+-------------------+---+----------+----------+----------+---------------+------+-------+-------------+---------+--------------+
|     5656337|            2016|               12|      UNITED KINGDOM|  1357209665540|2016-12-25 00:00:00|        Air|  NEVADA|2017-01-01 00:00:00| 52|  Pleasure|      null|      1964|       03242017|     F|     VS|           43|      

In [67]:
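# The join uses the city name only (no state condition), so a name shared by
# cities in several states matches every one of them.
temperature_table = spark.sql("""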
                            SELECT  DISTINCT t.dt AS temperature_date ,
                                    c.city_id AS city_id ,
                                    t.AverageTemperature AS average_temperature,
                                    t.AverageTemperatureUncertainty AS average_temperature_uncertainty 
                            
                            FROM temperature_staging t
                            JOIN city c
                            ON t.City = c.city                            
""")
temperature_table = temperature_table.withColumn("temperature_id", monotonically_increasing_id())

temperature_table.createOrReplaceTempView('temperature')

temperature_table.show(5)

+-------------------+-------------+-------------------+-------------------------------+--------------+
|   temperature_date|      city_id|average_temperature|average_temperature_uncertainty|temperature_id|
+-------------------+-------------+-------------------+-------------------------------+--------------+
|1910-06-01 00:00:00| 627065225216| 26.316999999999997|                          0.355|             0|
|1955-02-01 00:00:00| 704374636548|              8.728|                          0.243|             1|
|1898-07-01 00:00:00|1125281431555|             19.823|                          0.479|             2|
|2010-04-01 00:00:00| 790273982464|             16.701|                           0.25|             3|
|1854-01-01 00:00:00|1099511627778| 1.5419999999999998|                          2.147|             4|
+-------------------+-------------+-------------------+-------------------------------+--------------+
only showing top 5 rows
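
Because the temperature query joins on the city name alone, it may be worth checking how many city names occur in more than one state. The sketch below is plain Python over `(city, state_code)` pairs; `ambiguous_city_names` is a hypothetical helper and the sample pairs are illustrative only, not taken from the dataset.

```python
from collections import defaultdict


def ambiguous_city_names(city_state_pairs):
    """Return {city: sorted state codes} for names used in more than one state."""
    states_by_city = defaultdict(set)
    for city, state_code in city_state_pairs:
        states_by_city[city].add(state_code)
    return {
        city: sorted(states)
        for city, states in states_by_city.items()
        if len(states) > 1
    }


# Illustrative pairs only, not taken from the dataset.
sample = [("springfield", "IL"), ("springfield", "MO"), ("sacramento", "CA")]
print(ambiguous_city_names(sample))  # {'springfield': ['IL', 'MO']}
```

In the notebook, the pairs could come from `city_table.select("city", "state_code").collect()`, since the returned `Row` objects unpack like tuples.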

In [66]:
# Write the tables to S3 as Parquet
output_data = 's3a://immigration-analytics/'


In [68]:
city_table.write.mode("overwrite").parquet(output_data + "city.parquet")

In [69]:
city_races_table.write.mode("overwrite").parquet(output_data + "city_races.parquet")

In [70]:
airport_table.write.mode("overwrite").parquet(output_data + "airport.parquet")

In [71]:
immigration_table.write.mode("overwrite").parquet(output_data + "immigration.parquet")

In [72]:
temperature_table.write.mode("overwrite").parquet(output_data + "temperature.parquet")
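
The five write cells share one pattern, so they could also be expressed as a single loop. This is a minimal sketch, assuming each value in `tables` behaves like a PySpark DataFrame (`df.write.mode(...).parquet(...)`); `write_tables` is a hypothetical helper name.

```python
def write_tables(tables, output_root):
    """Write each DataFrame to <output_root><name>.parquet, replacing old output."""
    written = []
    for name, df in tables.items():
        path = f"{output_root}{name}.parquet"
        df.write.mode("overwrite").parquet(path)
        written.append(path)
    return written


# In the notebook this could be called as:
# write_tables(
#     {"city": city_table, "city_races": city_races_table, "airport": airport_table,
#      "immigration": immigration_table, "temperature": temperature_table},
#     output_data,
# )
```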

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks
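
A source/count check can be sketched as a ratio between the rows in a staging view and the rows that reach the final table. The example uses the airport counts reported in this notebook (11940 staged US airports, 478 rows in `airport_table`); the helper names and the `min_ratio` threshold are assumptions for illustration.

```python
def retention_ratio(source_rows, loaded_rows):
    """Fraction of source rows that survived into the loaded table."""
    if source_rows <= 0:
        raise ValueError("source_rows must be positive")
    return loaded_rows / source_rows


def check_completeness(name, source_rows, loaded_rows, min_ratio):
    """Raise ValueError when fewer than min_ratio of the source rows were loaded."""
    ratio = retention_ratio(source_rows, loaded_rows)
    if ratio < min_ratio:
        raise ValueError(f"{name}: only {ratio:.1%} of source rows loaded")
    return ratio


# Counts reported in this notebook: airport_staging vs. airport_table.
ratio = retention_ratio(11940, 478)
print(f"{ratio:.1%}")  # 4.0%
```

A low ratio is not necessarily an error here, since the inner join on `city` keeps only airports in cities present in the demographics data, but tracking it makes an unexpected drop visible.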

#### Ensure that the created tables are not empty

In [73]:
city_table.count()

596

In [74]:
city_races_table.count()

2891

In [75]:
airport_table.count()

478

In [76]:
immigration_table.count()

31716637

In [77]:
temperature_table.count()

675386
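
The counts above are only inspected by eye. A minimal sketch of turning them into a check that fails loudly is shown below; `assert_non_empty` is a hypothetical helper and the numbers are the ones printed by the cells above.

```python
def assert_non_empty(row_counts):
    """Raise ValueError naming every table whose row count is zero."""
    empty = [name for name, rows in row_counts.items() if rows == 0]
    if empty:
        raise ValueError(f"Empty tables: {', '.join(sorted(empty))}")
    return True


# Row counts taken from the cell outputs above.
row_counts = {
    "city": 596,
    "city_races": 2891,
    "airport": 478,
    "immigration": 31716637,
    "temperature": 675386,
}
assert_non_empty(row_counts)
```

In the notebook, the dictionary could be built with `{name: df.count() for name, df in tables.items()}` for a hypothetical `tables` mapping of table names to DataFrames.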

#### Ensure that tables have unique keys

In [78]:
def DataQuality(actual_result, expected_result):
    """Print both values and raise ValueError if they differ."""
    print(f"Expected Result is {expected_result}.")
    if actual_result != expected_result:
        print(f"Data quality test failed with result {actual_result}.")
        raise ValueError("Data quality check failed.")
    else:
        print(f"Data quality test passed with result {actual_result}.")

In [79]:
unique_city = city_table.select(countDistinct("city_id"))
actual_result = int(unique_city.collect()[0][0])
expected_result = int(city_table.count())
DataQuality(actual_result,expected_result)

Expected Result is 596.
Data quality test passed with result 596.

In [80]:
unique_airport = airport_table.select(countDistinct("airport_id"))
actual_result = int(unique_airport.collect()[0][0])
expected_result = int(airport_table.count())
DataQuality(actual_result,expected_result)

Expected Result is 478.
Data quality test passed with result 478.

In [81]:
unique_city_races = city_races_table.select(countDistinct("city_races_id"))
actual_result = int(unique_city_races.collect()[0][0])
expected_result = int(city_races_table.count())
DataQuality(actual_result,expected_result)

Expected Result is 2891.
Data quality test passed with result 2891.

In [82]:
unique_temperature = temperature_table.select(countDistinct("temperature_id"))
actual_result = int(unique_temperature.collect()[0][0])
expected_result = int(temperature_table.count())
DataQuality(actual_result,expected_result)

Expected Result is 675386.
Data quality test passed with result 675386.

In [83]:
unique_immigration = immigration_table.select(countDistinct("immigration_id"))
actual_result = int(unique_immigration.collect()[0][0])
expected_result = int(immigration_table.count())
DataQuality(actual_result,expected_result)

Expected Result is 31716637.
Data quality test passed with result 31716637.
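
The five uniqueness cells repeat the same steps, so they could be folded into one helper. This is a sketch under the assumption that `df` behaves like a PySpark DataFrame (`count()`, `select()`, `distinct()`); `check_unique_key` and `failing_keys` are hypothetical names. Unlike `countDistinct`, `distinct().count()` counts a null key as one value, which makes no difference for the non-nullable generated IDs.

```python
def check_unique_key(df, key_column):
    """Return True when every row of df has a different value in key_column."""
    total = df.count()
    distinct = df.select(key_column).distinct().count()
    return total == distinct


def failing_keys(tables):
    """tables maps name -> (df, key_column); return names whose key is not unique."""
    return [
        name
        for name, (df, key_column) in tables.items()
        if not check_unique_key(df, key_column)
    ]


# In the notebook this could be called as:
# failing_keys({
#     "city": (city_table, "city_id"),
#     "airport": (airport_table, "airport_id"),
#     "city_races": (city_races_table, "city_races_id"),
#     "temperature": (temperature_table, "temperature_id"),
#     "immigration": (immigration_table, "immigration_id"),
# })
```

An empty list means every key passed; a non-empty list names the tables to inspect.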

#### Check data types

In [85]:
city_table.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- number_of_veterans: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- city_id: long (nullable = false)

In [86]:
city_races_table.printSchema()

root
 |-- city_races_id: long (nullable = false)
 |-- city_id: long (nullable = false)
 |-- race: string (nullable = true)
 |-- race_count: integer (nullable = true)

In [87]:
airport_table.printSchema()

root
 |-- airport_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)
 |-- city_id: long (nullable = false)

In [88]:
temperature_table.printSchema()

root
 |-- temperature_date: timestamp (nullable = true)
 |-- city_id: long (nullable = false)
 |-- average_temperature: double (nullable = true)
 |-- average_temperature_uncertainty: double (nullable = true)
 |-- temperature_id: long (nullable = false)

In [89]:
immigration_table.printSchema()

root
 |-- immigrant_id: integer (nullable = true)
 |-- immigration_year: integer (nullable = true)
 |-- immigration_month: integer (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- arrival_city_id: long (nullable = false)
 |-- arrival_date: timestamp (nullable = true)
 |-- travel_mode: string (nullable = true)
 |-- address: string (nullable = true)
 |-- departial_date: timestamp (nullable = true)
 |-- age: integer (nullable = true)
 |-- visa_class: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- addmission_date: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- flight_number: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- immigration_id: long (nullable = false)
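
Printing the schemas shows the types but does not fail if one changes. A minimal sketch of an automated comparison is given below; it works on the `(column, type)` pairs that `DataFrame.dtypes` returns, where `long` is reported as `bigint` and `integer` as `int`. `schema_mismatches` is a hypothetical helper, and the expected types follow the `city_races` schema printed above.

```python
def schema_mismatches(actual_dtypes, expected_types):
    """Compare (column, type) pairs, as returned by DataFrame.dtypes, to expectations.

    Returns {column: (expected, actual)}; actual is None for a missing column.
    """
    actual = dict(actual_dtypes)
    return {
        column: (expected, actual.get(column))
        for column, expected in expected_types.items()
        if actual.get(column) != expected
    }


# Expected types for city_races, following the printSchema() output above.
expected_city_races = {
    "city_races_id": "bigint",
    "city_id": "bigint",
    "race": "string",
    "race_count": "int",
}

# Stand-in for city_races_table.dtypes, written out by hand for this sketch.
example_dtypes = [
    ("city_races_id", "bigint"),
    ("city_id", "bigint"),
    ("race", "string"),
    ("race_count", "int"),
]
print(schema_mismatches(example_dtypes, expected_city_races))  # {}
```

In the notebook, `schema_mismatches(city_races_table.dtypes, expected_city_races)` would return an empty dictionary when the schema matches.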

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
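
One way to keep the data dictionary next to the code is to hold it as plain data and render it to markdown. The sketch below does this for the `airport` table only; the source columns are read off the `airport_table` query in Step 4, and `render_data_dictionary` is a hypothetical helper.

```python
def render_data_dictionary(table_name, fields):
    """fields: list of (column, type, description); returns markdown bullet lines."""
    lines = [f"**{table_name}**"]
    for column, dtype, description in fields:
        lines.append(f"* `{column}` ({dtype}): {description}")
    return "\n".join(lines)


# Columns and types follow the airport_table query and its printed schema.
airport_fields = [
    ("airport_id", "string", "`ident` from airport_staging"),
    ("name", "string", "`name` from airport_staging"),
    ("type", "string", "`type` from airport_staging"),
    ("city_id", "long", "`city_id` of the matching row in the city table"),
]
print(render_data_dictionary("airport", airport_fields))
```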

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * The database needed to be accessed by 100+ people.