# Project Title: Immigration Stats by City 
### Data Engineering Capstone Project

#### Project Summary

* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import pandas as pd

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

In [2]:
# Read in city data here

# https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/
# This dataset contains information about the demographics of all US cities and census-designated 
#places with a population greater or equal to 65,000.

city_file = '/home/workspace/us-cities-demographics.csv'

In [3]:
# Distributed processing 
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [4]:
import pyspark.sql.types as typ
# Schema of city demographic data

schema = [
    typ.StructField("City", typ.StringType(), True),
    typ.StructField("State", typ.StringType(), True),
    typ.StructField("Median Age", typ.FloatType(), True),
    typ.StructField("Male Population", typ.FloatType(), True),
    typ.StructField("Female Population", typ.FloatType(), True),
    typ.StructField("Total Population", typ.FloatType(), True),
    typ.StructField("Number of Veterans", typ.FloatType(), True),
    typ.StructField("Foreign-born", typ.StringType(), True),
    typ.StructField("Average Household Size", typ.FloatType(), True),
    typ.StructField("State Code", typ.StringType(), True),
    typ.StructField("Race", typ.StringType(), True),
    typ.StructField("Count", typ.FloatType(), True)]

customSchema = typ.StructType(schema)

In [5]:
df_city = spark.read.format("csv").option("header", "true").option("delimiter", ";").schema(customSchema).load(city_file)

In [6]:
df_city.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: float (nullable = true)
 |-- Male Population: float (nullable = true)
 |-- Female Population: float (nullable = true)
 |-- Total Population: float (nullable = true)
 |-- Number of Veterans: float (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: float (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: float (nullable = true)



In [7]:
df_city.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-------+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|  Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-------+
|   Silver Spring|     Maryland|      33.8|        40601.0|          41862.0|         82463.0|            1562.0|       30908|                   2.6|        MD|  Hispanic or Latino|25924.0|
|          Quincy|Massachusetts|      41.0|        44129.0|          49500.0|         93629.0|            4147.0|       32935|                  2.39|        MA|               White|58723.0|
|          Hoover|      Alabama|      38.5|       

In [8]:
df_city.select('City','State','Median Age','Male Population','Female Population','Total Population','Number of Veterans').show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+
|   Silver Spring|     Maryland|      33.8|        40601.0|          41862.0|         82463.0|            1562.0|
|          Quincy|Massachusetts|      41.0|        44129.0|          49500.0|         93629.0|            4147.0|
|          Hoover|      Alabama|      38.5|        38040.0|          46799.0|         84839.0|            4819.0|
|Rancho Cucamonga|   California|      34.5|        88127.0|          87105.0|        175232.0|            5821.0|
|          Newark|   New Jersey|      34.6|       138040.0|         143873.0|        281913.0|            5829.0|
+----------------+-------------+----------+---------------+-----------------+-----------

In [9]:
df_city.select('Foreign-born','Average Household Size','State Code','Race', 'Count').show(5)

+------------+----------------------+----------+--------------------+-------+
|Foreign-born|Average Household Size|State Code|                Race|  Count|
+------------+----------------------+----------+--------------------+-------+
|       30908|                   2.6|        MD|  Hispanic or Latino|25924.0|
|       32935|                  2.39|        MA|               White|58723.0|
|        8229|                  2.58|        AL|               Asian| 4759.0|
|       33878|                  3.18|        CA|Black or African-...|24437.0|
|       86253|                  2.73|        NJ|               White|76402.0|
+------------+----------------------+----------+--------------------+-------+
only showing top 5 rows



In [10]:
# Read in immigration data 
#I94 Immigration Data: This data comes from the US National Tourism and Trade Office.
#https://travel.trade.gov/research/reports/i94/historical/2016.html

immigration_file = '/home/workspace/immigration_data_sample.csv'

df2 = pd.read_csv(immigration_file)
df2.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [11]:
# Load second immigration dataset 

df_immigration = spark.read.format("csv").option("header", "true").option("delimiter", ",").load(immigration_file)

In [12]:
df_immigration.show(5)

+-------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|    _c0|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|
+-------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|2027561|4084316.0|2016.0|   4.0| 209.0| 209.0|    HHW|20566.0|    1.0|     HI|20573.0|  61.0|    2.0|  1.0|20160422|    null| null|      G|      O|   null|      M| 1955.0|07202016|     F|  null|     JL|56582674633.0|00782|      WT|
|2171295|4422636.0|2016.0|   4.0| 582.0| 582.0|    MCA|20567.0|    1

In [13]:
df_immigration.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- cicid: string (nullable = true)
 |-- i94yr: string (nullable = true)
 |-- i94mon: string (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- count: string (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: string (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = tru

In [14]:
# assumption: each record represents a person. sum of count reflects actual immigration numbers

df_immigration.select("i94mode", "visatype", 'gender', 'i94addr', 'count', 'i94port', 'biryear').show(5)

+-------+--------+------+-------+-----+-------+-------+
|i94mode|visatype|gender|i94addr|count|i94port|biryear|
+-------+--------+------+-------+-----+-------+-------+
|    1.0|      WT|     F|     HI|  1.0|    HHW| 1955.0|
|    1.0|      B2|     M|     TX|  1.0|    MCA| 1990.0|
|    1.0|      WT|     M|     FL|  1.0|    OGG| 1940.0|
|    1.0|      B2|     M|     CA|  1.0|    LOS| 1991.0|
|    3.0|      WT|     F|     NY|  1.0|    CHM| 1997.0|
+-------+--------+------+-------+-----+-------+-------+
only showing top 5 rows



### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [16]:
# Performing cleaning tasks here

# count of null values 
df_city.rdd.map(lambda row: (row['City'], sum([i == None for i in row ]))).take(5)

[('Silver Spring', 0),
 ('Quincy', 0),
 ('Hoover', 0),
 ('Rancho Cucamonga', 0),
 ('Newark', 0)]

In [17]:
state = df_city.select('State Code')
state.take(5)

[Row(State Code='MD'),
 Row(State Code='MA'),
 Row(State Code='AL'),
 Row(State Code='CA'),
 Row(State Code='NJ')]

In [18]:
# City Data 

# rename columms of distributed dataFrame // curent naming convention is not supported

df_better = df_city.withColumn("Abbr", df_city['State Code'])

df_better1 = df_better.withColumn("total_pop", df_city['Total Population'])

df_better2 = df_better1.withColumn("male_pop", df_city['Male Population'])

df_better3 = df_better2.withColumn("female_pop", df_city['Female Population'])

df_better4 = df_better3.withColumn("foreign_pop", df_city['Foreign-born'])


In [19]:
df_better4.take(5)

[Row(City='Silver Spring', State='Maryland', Median Age=33.79999923706055, Male Population=40601.0, Female Population=41862.0, Total Population=82463.0, Number of Veterans=1562.0, Foreign-born='30908', Average Household Size=2.5999999046325684, State Code='MD', Race='Hispanic or Latino', Count=25924.0, Abbr='MD', total_pop=82463.0, male_pop=40601.0, female_pop=41862.0, foreign_pop='30908'),
 Row(City='Quincy', State='Massachusetts', Median Age=41.0, Male Population=44129.0, Female Population=49500.0, Total Population=93629.0, Number of Veterans=4147.0, Foreign-born='32935', Average Household Size=2.390000104904175, State Code='MA', Race='White', Count=58723.0, Abbr='MA', total_pop=93629.0, male_pop=44129.0, female_pop=49500.0, foreign_pop='32935'),
 Row(City='Hoover', State='Alabama', Median Age=38.5, Male Population=38040.0, Female Population=46799.0, Total Population=84839.0, Number of Veterans=4819.0, Foreign-born='8229', Average Household Size=2.5799999237060547, State Code='AL', R

In [20]:
#desciptive statistics of city data

df_city.describe(['median age', 'Average Household Size', 'Male Population','Female Population','Total Population']).show()

+-------+-----------------+----------------------+------------------+------------------+------------------+
|summary|       median age|Average Household Size|   Male Population| Female Population|  Total Population|
+-------+-----------------+----------------------+------------------+------------------+------------------+
|  count|             2891|                  2875|              2888|              2888|              2891|
|   mean|35.49488059248078|     2.742542615890503| 97328.42624653739|101769.63088642659|198966.77931511588|
| stddev|4.401616735414509|   0.43329108651402737|216299.93692873296|231564.57257148277| 447555.9296335903|
|    min|             22.9|                   2.0|           29281.0|           27348.0|           63215.0|
|    max|             70.5|                  4.98|         4081698.0|         4468707.0|         8550405.0|
+-------+-----------------+----------------------+------------------+------------------+------------------+



In [21]:
#Immigration data
# nulls visible in the data set
# drop rows that do not contain i94addr value 'state'

df_immigration_clean = df_immigration.dropna(how = 'any', subset = 'i94addr')

df_immigration_clean.show(5)

+-------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|    _c0|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|
+-------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|2027561|4084316.0|2016.0|   4.0| 209.0| 209.0|    HHW|20566.0|    1.0|     HI|20573.0|  61.0|    2.0|  1.0|20160422|    null| null|      G|      O|   null|      M| 1955.0|07202016|     F|  null|     JL|56582674633.0|00782|      WT|
|2171295|4422636.0|2016.0|   4.0| 582.0| 582.0|    MCA|20567.0|    1

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

In [22]:
# The conceptual data model here leverages the city demographic dataset and the immigration dateset.
#    Due to the size of the dataset the goals is to efficiently clean the data, to creates views of a
#    subset of the relevant data and then to join the resulting views in order to generate a report 
#    that provides relevant information that could not have been achieved with either dataset alone, or while 
#    analyzing the full data sets

In [23]:
### 3.2 Mapping Out Data Pipelines

# Subviews will be created and joined on a common feature

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [24]:
# Transform datasets

# create temporary view of the city data 

df_better4.createOrReplaceTempView('cityView')

In [25]:
df_sub_city = spark.sql("""SELECT State, abbr, sum(male_pop) as sum_male_pop, sum(female_pop) as sum_female_pop, 
          sum(Foreign_pop) as sum_foreign_pop
          FROM cityView 
          group by State, abbr order by State""")

In [26]:
df_sub_city.createOrReplaceTempView("df_sub_city_view")

In [27]:
# final staging of city table

# abbr field will be used as the join featurr
# As a result the respective population metrics have been aggreagted by year

spark.sql("SELECT * FROM df_sub_city_view").show()

+--------------------+----+------------+--------------+---------------+
|               State|abbr|sum_male_pop|sum_female_pop|sum_foreign_pop|
+--------------------+----+------------+--------------+---------------+
|             Alabama|  AL|   2448200.0|     2715106.0|       252541.0|
|              Alaska|  AK|    764725.0|      728750.0|       166290.0|
|             Arizona|  AZ| 1.1137275E7|   1.1360435E7|      3411565.0|
|            Arkansas|  AR|   1400724.0|     1482165.0|       307753.0|
|          California|  CA| 6.1055672E7|   6.2388681E7|    3.7059662E7|
|            Colorado|  CO|   7273095.0|     7405250.0|      1688155.0|
|         Connecticut|  CT|   2123435.0|     2231661.0|      1114250.0|
|            Delaware|  DE|    163400.0|      196385.0|        16680.0|
|District of Columbia|  DC|   1598525.0|     1762615.0|       475585.0|
|             Florida|  FL| 1.5461937E7|   1.6626425E7|      7845566.0|
|             Georgia|  GA|   4101605.0|     4453555.0|       73

In [28]:
df_immigration_clean.createOrReplaceTempView('immigrationView')

# Key information for select immigration columns provided here:  

"""I94BIR - Age of Respondent in Years */
AIRLINE - Airline used to arrive in U.S. */
VISATYPE Class of admission legally admitting the non-immigrant to temporarily stay in U.S. */
I94MODE - There are missing values as well as not reported (9) */
/* I94VISA - Visa codes collapsed into three categories:
   1 = Business
   2 = Pleasure
   3 = Student   
   /* I94MODE - There are missing values as well as not reported (9) */
value i94model
	1 = 'Air'
	2 = 'Sea'
	3 = 'Land'
	9 = 'Not reported'"""

df_sub_immigration = spark.sql("""Select gender, i94addr as state, i94bir arrival_age, i94mode as arrival_mode, count 
                                FROM immigrationView""")

In [29]:
#final staging immigration table 
df_sub_immigration.createOrReplaceTempView("df_sub_im")

In [30]:
df_sub_im1 = spark.sql("""Select gender, state, avg(arrival_age) as avg_arrival_age, arrival_mode, count(count) as im_count 
FROM df_sub_im group by state, gender, arrival_mode""")

df_sub_im1.createOrReplaceTempView("df_sub_im_sub")

In [31]:
# Final staging of immigration table 

spark.sql("""SELECT * FROM df_sub_im_sub 
          Where gender is not Null 
          order by avg_arrival_age DESC""").show()

+------+-----+------------------+------------+--------+
|gender|state|   avg_arrival_age|arrival_mode|im_count|
+------+-----+------------------+------------+--------+
|     F|   NH|              74.0|         1.0|       1|
|     F|   MO|              71.5|         1.0|       2|
|     F|   MN|              71.0|         1.0|       1|
|     M|   OR|              66.0|         1.0|       1|
|     M|   HI|              65.0|         2.0|       1|
|     F|   CO|              62.5|         1.0|       2|
|     F|   CT|              62.0|         1.0|       4|
|     F|   VA|              61.5|         1.0|       6|
|     F|   NE|              61.0|         3.0|       1|
|     F|   OK|              59.0|         1.0|       1|
|     F|   MD|             58.25|         1.0|       4|
|     F|   OR|              58.0|         3.0|       1|
|     M|   WA|57.166666666666664|         1.0|       6|
|     F|   CA|              57.0|         3.0|       1|
|     M|   DC|              56.0|         3.0|  

In [32]:
# final staging tables summed and joined by state 

df_join = spark.sql("""SELECT gender, avg_arrival_age, arrival_mode, df_sub_city_view.state, sum_male_pop, sum_female_pop, sum_foreign_pop, im_count 
             FROM df_sub_im_sub
             JOIN df_sub_city_view
             on df_sub_im_sub.state = df_sub_city_view.abbr
          """)

df_join.createOrReplaceTempView("final_table")

df_join.show()

+------+------------------+------------+--------------------+------------+--------------+---------------+--------+
|gender|   avg_arrival_age|arrival_mode|               state|sum_male_pop|sum_female_pop|sum_foreign_pop|im_count|
+------+------------------+------------+--------------------+------------+--------------+---------------+--------+
|     F|              36.0|         1.0|             Alabama|   2448200.0|     2715106.0|       252541.0|       2|
|     M|              53.0|         1.0|             Alabama|   2448200.0|     2715106.0|       252541.0|       1|
|  null|              53.0|         1.0|             Alabama|   2448200.0|     2715106.0|       252541.0|       2|
|     M|             41.25|         1.0|             Arizona| 1.1137275E7|   1.1360435E7|      3411565.0|       4|
|  null|              43.0|         1.0|             Arizona| 1.1137275E7|   1.1360435E7|      3411565.0|       1|
|     M|              29.0|         1.0|            Arkansas|   1400724.0|     1

In [51]:
#final Consolidated report of ongoing immigration compared to existing foreign populations by state

spark.sql("""SELECT gender, state, arrival_mode, sum_foreign_pop, sum(im_count) as count_
          FROM final_table 
          WHERE (gender is not null and sum_foreign_pop is not null)
          GROUP BY gender, state, arrival_mode, sum_foreign_pop 
          Order by sum_foreign_pop Desc, count_ Desc
           """).show()

+------+-------------+------------+---------------+------+
|gender|        state|arrival_mode|sum_foreign_pop|count_|
+------+-------------+------------+---------------+------+
|     M|   California|         1.0|    3.7059662E7|    83|
|     F|   California|         1.0|    3.7059662E7|    57|
|     M|   California|         3.0|    3.7059662E7|     3|
|     F|   California|         3.0|    3.7059662E7|     1|
|     F|     New York|         1.0|    1.7186873E7|    84|
|     M|     New York|         1.0|    1.7186873E7|    58|
|     M|     New York|         3.0|    1.7186873E7|     1|
|     F|     New York|         9.0|    1.7186873E7|     1|
|     F|     New York|         3.0|    1.7186873E7|     1|
|     M|        Texas|         1.0|    1.4498054E7|    21|
|     F|        Texas|         1.0|    1.4498054E7|     9|
|     M|        Texas|         3.0|    1.4498054E7|     3|
|     M|      Florida|         1.0|      7845566.0|    84|
|     F|      Florida|         1.0|      7845566.0|    7

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [33]:
# immigration report by state, gender, mode of transporation with populations summed by state
# Check to see if there are is any data in the report
check = spark.sql(" SELECT count(state) FROM final_table").toPandas()

# First table loaded, determines if there are any new records loaded
check2 = df_city.count()


In [34]:
if check2 > 1:
    print('Initial table has been loaded...')
    if check['count(state)'][0] > 0:
        print('There are records in final report...checks passed') 
else: 
    print("There has been an error")

Initial table has been loaded...
There are records in final report...checks passed


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

In [35]:
#Data Dictionary 

# Primary Fields:

# Gender 
# State 
# Arrival Mode - 1 = 'Air'; 2 = 'Sea'; 3 = 'Land'; 9 = 'Not reported'
# Sum_foreign_pop - The total foreign popululation for a given city 
# Count_- Immigration counts by 'Group By'