# US Immigration and Temperature
### Data Engineering Capstone Project

#### Project Summary
This capstone project combines what I have learned throughout the programme. The goal is to build an ETL pipeline that creates a database for analysis and insight, answering questions such as: how are immigrants distributed by age or nationality? Do immigrants prefer warmer places?

#### The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd, re
import os
import glob

In [2]:
!pip install pyspark --upgrade
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, count, col, year, month, avg, isnull, round
from pyspark.sql.types import StringType, IntegerType

Collecting pyspark
Installing collected packages: pyspark
  Found existing installation: pyspark 2.4.3
    Can't uninstall 'pyspark'. No files were found to uninstall.
Successfully installed pyspark-3.1.2


In [3]:
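# Create the Spark session with the SAS reader package and Hive support.
# The configuration must be set on the first getOrCreate() call: a later call
# returns the existing session, so spark.jars.packages would not be applied.
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

# Load the I94 data previously saved as parquet in ./sas_data
df_spark = spark.read.parquet('./sas_data')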

### Step 1: Scope the Project and Gather Data

#### Scope 
Three data sources are combined into one fact table and three dimension tables.

#### Describe and Gather Data 
- I94 Immigration Data: comes from the US National Travel and Tourism Office and contains statistics on international visitor arrivals to the USA ([link](https://www.trade.gov/national-travel-and-tourism-office)).
- World Temperature Data: comes from Kaggle (Berkeley Earth) and contains monthly average land temperatures by city ([link](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)).
- U.S. City Demographic Data: comes from OpenDataSoft and contains demographic information about US cities, such as median age and gender distribution ([link](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)).

In [None]:
# Read the I94 immigration data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
imm_df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [5]:
imm_df.head()

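|  | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | ... | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | ... | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 1988.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 2012.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |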


In [6]:
# Read the temperature data here
temp_fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temp_df = spark.read.format("csv").option("delimiter", ",").option("header", "true").load(temp_fname)

In [7]:
temp_df.head()

Row(dt='1743-11-01', AverageTemperature='6.068', AverageTemperatureUncertainty='1.7369999999999999', City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E')

In [8]:
# Read the demographic data here
demog_fname = 'us-cities-demographics.csv'
demog_df = spark.read.format("csv").option("delimiter", ";").option("header", "true").load(demog_fname)

In [9]:
demog_df.head()

Row(City='Silver Spring', State='Maryland', Median Age='33.8', Male Population='40601', Female Population='41862', Total Population='82463', Number of Veterans='1562', Foreign-born='30908', Average Household Size='2.6', State Code='MD', Race='Hispanic or Latino', Count='25924')

In [10]:
#write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data

#### Cleaning Steps
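The cleaning steps are:
* I94 immigration data: keep the relevant columns, drop rows with a missing port (`i94port`) or gender, and rename the columns.
* Temperature data: keep only US cities, extract the year and month from `dt`, cast `AverageTemperature` to float, keep only the most recent year (2013), map each city to an i94 port code, and drop cities without a matching code.
* Demographic data: compute the percentages of males, females and foreign-born residents in each city's total population, and keep the relevant columns.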

#### 2.1 i94 Immigration data exploration, assessment and cleaning

In [11]:
imm_df.head(10)

|  | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | ... | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | ... | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 1988.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 2012.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |
| 5 | 18.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MI | 20555.0 | ... |  | M | 1959.0 | 09302016 |  |  | AZ | 92471040000.0 | 602.0 | B1 |
| 6 | 19.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | NJ | 20558.0 | ... |  | M | 1953.0 | 09302016 |  |  | AZ | 92471400000.0 | 602.0 | B2 |
| 7 | 20.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | NJ | 20558.0 | ... |  | M | 1959.0 | 09302016 |  |  | AZ | 92471610000.0 | 602.0 | B2 |
| 8 | 21.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | NY | 20553.0 | ... |  | M | 1970.0 | 09302016 |  |  | AZ | 92470800000.0 | 602.0 | B2 |
| 9 | 22.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | NY | 20562.0 | ... |  | M | 1968.0 | 09302016 |  |  | AZ | 92478490000.0 | 608.0 | B1 |


In [12]:
imm_df.tail(10)

|  | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 3096303 | 4471817.0 | 2016.0 | 4.0 | 745.0 | 745.0 | PHU | 20567.0 | 3.0 | IL | 20569.0 | ... |  | M | 1958.0 | 10222016 | F |  |  | 94296290000.0 | LAND | B2 |
| 3096304 | 4471819.0 | 2016.0 | 4.0 | 745.0 | 745.0 | PHU | 20567.0 | 3.0 | IL | 20569.0 | ... |  | M | 1969.0 | 10222016 | F |  |  | 94296430000.0 | LAND | B2 |
| 3096305 | 5011591.0 | 2016.0 | 4.0 | 745.0 | 745.0 | SKA | 20570.0 | 3.0 | US | 20573.0 | ... |  | M | 1987.0 | 10182016 | F |  |  | 93975540000.0 | 00490 | B1 |
| 3096306 | 4249464.0 | 2016.0 | 4.0 | 745.0 | 745.0 | SUM | 20566.0 | 3.0 | CA | 20571.0 | ... |  | M | 1978.0 | 10212016 | M |  |  | 94269090000.0 | LAND | B1 |
| 3096307 | 5416391.0 | 2016.0 | 4.0 | 745.0 | 745.0 | SUM | 20572.0 | 3.0 | MN | 20577.0 | ... |  | M | 1971.0 | 10262016 | M |  |  | 94714800000.0 | LAND | B1 |
| 3096308 | 625229.0 | 2016.0 | 4.0 | 745.0 | 745.0 | SYS | 20547.0 | 3.0 | CA |  | ... |  |  | 1980.0 | 5082016 |  |  |  | 78934560000.0 | 00066 | B2 |
| 3096309 | 1972204.0 | 2016.0 | 4.0 | 745.0 | 745.0 | SYS | 20554.0 | 3.0 | CA | 20555.0 | ... |  | M | 1980.0 | 9102016 | F |  |  | 90300540000.0 | 00066 | B2 |
| 3096310 | 4249448.0 | 2016.0 | 4.0 | 745.0 | 745.0 | TEC | 20566.0 | 3.0 | VA | 20588.0 | ... |  | M | 1993.0 | 9202016 | F |  |  | 91416720000.0 | 00651 | B2 |
| 3096311 | 5658953.0 | 2016.0 | 4.0 | 748.0 | 748.0 | NEW | 20573.0 | 3.0 | MN |  | ... |  |  | 1959.0 | 10282016 | M |  |  | 94887100000.0 | LAND | B2 |
| 3096312 | 3106671.0 | 2016.0 | 4.0 | 123.0 | 749.0 | NOG | 20561.0 | 3.0 | AZ | 20567.0 | ... |  | M | 1958.0 | 7102016 | M |  |  | 56056870000.0 | 00866 | WB |


In [13]:
imm_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3096313 entries, 0 to 3096312
Data columns (total 28 columns):
cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port     object
arrdate     float64
i94mode     float64
i94addr     object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile    object
visapost    object
occup       object
entdepa     object
entdepd     object
entdepu     object
matflag     object
biryear     float64
dtaddto     object
gender      object
insnum      object
airline     object
admnum      float64
fltno       object
visatype    object
dtypes: float64(13), object(15)
memory usage: 661.4+ MB


In [14]:
# clean data and keep relevant columns

imm_df2 = df_spark[['cicid','i94yr','i94mon','i94port','i94mode','arrdate','depdate','i94bir','biryear','gender', 'i94visa']]

In [15]:
imm_df2_sample = imm_df2.limit(6000).toPandas()

In [None]:
imm_df2_sample.gender.value_counts()

In [16]:
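# Convert SAS dates (days since 1960-01-01) to ISO date strings
from datetime import datetime, timedelta

@udf(StringType())
def convert_dataframe(sas_date):
    if sas_date is not None:
        return (datetime(1960, 1, 1).date() + timedelta(days=sas_date)).isoformat()
    return None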
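SAS stores dates as a number of days since 1 January 1960, which is what the UDF above converts. The same logic can be checked without a Spark session; the plain-Python sketch below uses a hypothetical `sas_to_iso` helper and assumes missing values arrive as `None` or NaN (NaN being how pandas typically represents empty numeric SAS fields).

```python
import math
from datetime import date, timedelta

# SAS date values count days from this epoch
SAS_EPOCH = date(1960, 1, 1)


def sas_to_iso(sas_days):
    """Convert a SAS date (days since 1960-01-01) to an ISO 'YYYY-MM-DD' string.

    Hypothetical helper: returns None for missing values (None or NaN).
    """
    if sas_days is None or (isinstance(sas_days, float) and math.isnan(sas_days)):
        return None
    return (SAS_EPOCH + timedelta(days=int(sas_days))).isoformat()


# arrdate values taken from the sample rows shown earlier
for arrdate in (20545.0, 20573.0, None):
    print(arrdate, "->", sas_to_iso(arrdate))
```

Under this mapping, `arrdate = 20545.0` corresponds to 2016-04-01, which is consistent with `i94mon = 4.0` in the same rows.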

In [17]:
# Remove rows with a missing port or gender
imm_df3 = imm_df2.dropna(how='any', subset=['i94port', 'gender'])
imm_df3.count()

2682044
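Before deciding which columns to filter on, it helps to count the missing values per column. In Spark, one common idiom is `imm_df2.select([count(when(isnull(c), c)).alias(c) for c in imm_df2.columns])`, which additionally needs `when` from `pyspark.sql.functions`. The plain-Python sketch below shows the same counting logic on a few hand-made records; the `missing_counts` helper and the sample records are hypothetical.

```python
import math


def missing_counts(rows, columns):
    """Count missing values (None, NaN or empty string) per column in a list of dict records."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            value = row.get(c)
            is_nan = isinstance(value, float) and math.isnan(value)
            if value is None or value == "" or is_nan:
                counts[c] += 1
    return counts


# Hypothetical records shaped like the i94 sample above
records = [
    {"i94port": "XXX", "gender": None},
    {"i94port": "ATL", "gender": "M"},
    {"i94port": "WAS", "gender": float("nan")},
]
print(missing_counts(records, ["i94port", "gender"]))
```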

In [18]:
# Create cleaned immigration table
imm_df4 = imm_df3.select(col("cicid").alias("id"),
                                             col("i94yr").alias("year"),
                                             col("i94mon").alias("month"),
                                             col("i94port").alias("city_code"),
                                             col("i94mode").alias("travel_code"),
                                             col("arrdate").alias("arrival_date"),
                                             col("depdate").alias("departure_date"),
                                             col("i94bir").alias("age"),
                                             col("biryear").alias("birth_year"),
                                             col("gender").alias("gender"),
                                             col("i94visa").alias("travel_reason"))

In [19]:
df_immigration_clean = imm_df4.limit(6000).toPandas()
df_immigration_clean.head()

Unnamed: 0,cicid,i94yr,i94mon,i94port,i94mode,arrdate,depdate,i94bir,biryear,gender,i94visa
0,5748517.0,2016.0,4.0,LOS,1.0,20574.0,20582.0,40.0,1976.0,F,1.0
1,5748518.0,2016.0,4.0,LOS,1.0,20574.0,20591.0,32.0,1984.0,F,1.0
2,5748519.0,2016.0,4.0,LOS,1.0,20574.0,20582.0,29.0,1987.0,M,1.0
3,5748520.0,2016.0,4.0,LOS,1.0,20574.0,20588.0,29.0,1987.0,F,1.0
4,5748521.0,2016.0,4.0,LOS,1.0,20574.0,20588.0,28.0,1988.0,M,1.0


#### 2.2 Temperature data exploration, assessment and cleaning

In [31]:
temp_df.count()

8599212

In [36]:
temp_df.head(10)

[Row(dt='1743-11-01', AverageTemperature='6.068', AverageTemperatureUncertainty='1.7369999999999999', City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E'),
 Row(dt='1743-12-01', AverageTemperature=None, AverageTemperatureUncertainty=None, City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E'),
 Row(dt='1744-01-01', AverageTemperature=None, AverageTemperatureUncertainty=None, City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E'),
 Row(dt='1744-02-01', AverageTemperature=None, AverageTemperatureUncertainty=None, City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E'),
 Row(dt='1744-03-01', AverageTemperature=None, AverageTemperatureUncertainty=None, City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E'),
 Row(dt='1744-04-01', AverageTemperature='5.7879999999999985', AverageTemperatureUncertainty='3.6239999999999997', City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E'),
 Row(dt='1744

In [33]:
# Keep only cities in the United States
temp_df1 = temp_df.filter(temp_df["Country"] == 'United States')
temp_df1.head()

Row(dt='1820-01-01', AverageTemperature='2.1010000000000004', AverageTemperatureUncertainty='3.217', City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W')

In [38]:
# Extract year and month from the date column
temp_df2 = temp_df1.withColumn("year", year(col("dt"))) \
                   .withColumn("month", month(col("dt")))

temp_df2.head(10)

[Row(dt='1820-01-01', AverageTemperature='2.1010000000000004', AverageTemperatureUncertainty='3.217', City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', year=1820, month=1),
 Row(dt='1820-02-01', AverageTemperature='6.926', AverageTemperatureUncertainty='2.853', City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', year=1820, month=2),
 Row(dt='1820-03-01', AverageTemperature='10.767', AverageTemperatureUncertainty='2.395', City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', year=1820, month=3),
 Row(dt='1820-04-01', AverageTemperature='17.988999999999994', AverageTemperatureUncertainty='2.202', City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', year=1820, month=4),
 Row(dt='1820-05-01', AverageTemperature='21.809', AverageTemperatureUncertainty='2.036', City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', year=1820, month=5),
 Row(dt='1820-06-

In [41]:
# clean data and keep relevant columns
temp_df3 = temp_df2[['City','Country','Latitude','Longitude','AverageTemperature','year','month']]
temp_df3.head(5)

[Row(City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', AverageTemperature='2.1010000000000004', year=1820, month=1),
 Row(City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', AverageTemperature='6.926', year=1820, month=2),
 Row(City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', AverageTemperature='10.767', year=1820, month=3),
 Row(City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', AverageTemperature='17.988999999999994', year=1820, month=4),
 Row(City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', AverageTemperature='21.809', year=1820, month=5)]

In [42]:
temp_df3

DataFrame[City: string, Country: string, Latitude: string, Longitude: string, AverageTemperature: string, year: int, month: int]

In [43]:
temp_df4 = temp_df3.withColumn('AverageTemperature', col('AverageTemperature').cast('float'))
temp_df4.head(5)

[Row(City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', AverageTemperature=2.1010000705718994, year=1820, month=1),
 Row(City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', AverageTemperature=6.926000118255615, year=1820, month=2),
 Row(City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', AverageTemperature=10.767000198364258, year=1820, month=3),
 Row(City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', AverageTemperature=17.98900032043457, year=1820, month=4),
 Row(City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', AverageTemperature=21.80900001525879, year=1820, month=5)]

In [44]:
# identify the most recent year of the temperature data because this is most relevant
max_yr = temp_df4.agg({"year":"max"}).collect()[0]
print(max_yr)

Row(max(year)=2013)


In [45]:
# Keep only the most recent year (2013 in this dataset)
temp_df5 = temp_df4.filter(temp_df4["year"] == max_yr[0])
temp_df5.head(10)

[Row(City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', AverageTemperature=6.320000171661377, year=2013, month=1),
 Row(City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', AverageTemperature=8.116000175476074, year=2013, month=2),
 Row(City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', AverageTemperature=12.503000259399414, year=2013, month=3),
 Row(City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', AverageTemperature=15.753000259399414, year=2013, month=4),
 Row(City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', AverageTemperature=22.545000076293945, year=2013, month=5),
 Row(City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', AverageTemperature=27.83099937438965, year=2013, month=6),
 Row(City='Abilene', Country='United States', Latitude='32.95N', Longitude='100.53W', AverageTemperature=27.42099952697754, yea

In [46]:
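# Map a city name to an i94 port code; valid_ports ({port_code: port_name})
# must be defined before this runs, otherwise the job fails with NameError.
@udf(StringType())
def city_abb(city):
    if city is None:
        return None
    for key in valid_ports:
        if city.lower() in valid_ports[key].lower():
            return key
    return None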
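The `city_abb` UDF needs a `valid_ports` dictionary mapping i94 port codes to port names. One possible source is the SAS labels file distributed with the I94 data (assumed here to be `I94_SAS_Labels_Descriptions.SAS`). The sketch below assumes its port list is a block starting at `$i94prtl` and ending at `;`, with entries like `'ATL' = 'ATLANTA, GA'`; the helper names and the sample text are hypothetical.

```python
import re

# Assumed entry format inside the port block:  'ATL' = 'ATLANTA, GA'
PORT_ENTRY = re.compile(r"'([A-Z0-9]{3})'\s*=\s*'([^']*)'")


def parse_port_labels(sas_text):
    """Build {port_code: port_name} from the block that starts at '$i94prtl' and ends at ';'."""
    start = sas_text.find("$i94prtl")
    if start == -1:
        return {}
    end = sas_text.find(";", start)
    block = sas_text[start:] if end == -1 else sas_text[start:end]
    return {code: name.strip() for code, name in PORT_ENTRY.findall(block)}


def match_city_code(city, ports):
    """Plain-Python mirror of city_abb(): first code whose port name contains the city."""
    if city is None:
        return None
    for code, name in ports.items():
        if city.lower() in name.lower():
            return code
    return None


# Hypothetical excerpt of the labels file
sample = """
value $i94prtl
    'ATL' = 'ATLANTA, GA          '
    'NYC' = 'NEW YORK, NY         '
    'LOS' = 'LOS ANGELES, CA      '
;
"""
valid_ports = parse_port_labels(sample)
print(valid_ports)
print(match_city_code("Atlanta", valid_ports), match_city_code("Abilene", valid_ports))
```

Substring matching is permissive: a short name such as "York" would also match "NEW YORK, NY", so the resulting `city_code` values are worth spot-checking.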

In [49]:
# Create City code
temp_df6 = temp_df5.withColumn("city_code", city_abb(temp_df5["City"]))
temp_df7 = temp_df6.dropna(how = 'any',subset=['city_code'])

In [52]:
temp_df7

DataFrame[City: string, Country: string, Latitude: string, Longitude: string, AverageTemperature: float, year: int, month: int, city_code: string]

In [53]:
# Clean temperature table (keeps average_temperature, which the data model needs)
temp_df8 = temp_df7.select(col("City").alias("city"),
                           col("city_code").alias("city_code"),
                           col("Country").alias("country"),
                           col("year").alias("year"),
                           col("month").alias("month"),
                           col("AverageTemperature").alias("average_temperature"),
                           col("Latitude").alias("latitude"),
                           col("Longitude").alias("longitude")
                          ).drop_duplicates()
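The temperature table keeps `latitude` and `longitude` as strings such as `'32.95N'` and `'100.53W'`. If numeric coordinates are needed later, one optional extra cleaning step (not part of the original pipeline) is to convert them to signed decimal degrees. The `parse_coordinate` helper below is hypothetical; in Spark it could be wrapped in a `udf` with a float return type.

```python
def parse_coordinate(value):
    """Convert a coordinate string such as '32.95N' or '100.53W' to signed decimal degrees."""
    if not value:
        return None
    number, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere in ("N", "E"):
        return number
    if hemisphere in ("S", "W"):
        return -number
    raise ValueError(f"unexpected hemisphere in {value!r}")


# Abilene coordinates from the rows shown above
print(parse_coordinate("32.95N"), parse_coordinate("100.53W"))
```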

In [54]:
df_temp_clean = temp_df8.limit(6000).toPandas()
df_temp_clean.head()

Py4JJavaError: An error occurred while calling o560.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 38.0 failed 1 times, most recent failure: Lost task 0.0 in stage 38.0 (TID 50, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-46-4a7cf7a849f6>", line 3, in city_abb
NameError: name 'valid_ports' is not defined



#### 2.3 Demographic data exploration, assessment and cleaning

In [None]:
demog_df.count()

In [None]:
demog_df.head()

In [None]:
demog_df1 = demog_df.withColumn("percentage_males", round((demog_df['Male Population'] / demog_df['Total Population']) * 100)) \
                    .withColumn("percentage_females", round((demog_df['Female Population'] / demog_df['Total Population']) * 100)) \
                    .withColumn("percentage_foreign_born", round((demog_df['Foreign-born'] / demog_df['Total Population']) * 100)) \
                    .withColumn("percentage_race", round((demog_df['Count'] / demog_df['Total Population']) * 100))
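Each percentage column follows the pattern round(part / total × 100). The plain-Python check below, using the Silver Spring row shown earlier, illustrates why the multiplication must happen inside `round`: rounding the fraction first can only give 0 or 100. The `percentage` helper is hypothetical; note that Python's built-in `round` rounds half to even, whereas Spark's `round` uses HALF_UP, so exact .5 ties could differ.

```python
def percentage(part, total):
    """Whole-number percentage of `total` represented by `part`."""
    return round(part / total * 100)


# Silver Spring, MD row shown earlier (race_count is the Hispanic or Latino count)
total = 82463
row = {"males": 40601, "females": 41862, "foreign_born": 30908, "race_count": 25924}
shares = {name: percentage(value, total) for name, value in row.items()}
print(shares)

# Rounding before multiplying collapses every share to 0 or 100
print(round(25924 / total) * 100)
```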

In [None]:
demog_df1.head(5)

In [None]:
# Create cleaned demographic table 
demog_df2 = demog_df1.select(col("City").alias("city"),
                                  col("Race").alias("race"),
                                  col("percentage_males").alias("percentage_males"),
                                  col("percentage_females").alias("percentage_females"),
                                  col("percentage_foreign_born").alias("percentage_foreign_born"))

In [None]:
df_demog_clean = demog_df2.limit(6000).toPandas()
df_demog_clean.head()

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The fact table joins the I94 immigration data with the city temperature data and the demographic data on the port code (`i94port`, renamed `city_code`). It contains:

i94_df
* id
* city_code


The first dimension table will contain events from the I94 immigration data. The columns below will be extracted from the immigration dataframe:

immigration_df
* id
* city_code
* year
* month
* arrival_date
* departure_date
* birth_year
* age
* gender
* travel_reason

The second dimension table will contain city temperature data. The columns below will be extracted from the temperature dataframe:

temperature_df
* city_code
* average_temperature
* city
* country
* latitude
* longitude

The third dimension table will contain demographic data. The columns below will be extracted from the demographic dataframe:

demographic_df
* city_code
* city
* race
* percentage_males
* percentage_females
* percentage_foreign_born


#### 3.2 Mapping Out Data Pipelines
The pipeline steps are:

* Clean the I94 data as described in Step 2 to create the Spark DataFrame df_immigration for each month
* Clean the temperature data as described in Step 2 to create the Spark DataFrame df_temp (already done)
* Create the immigration dimension table
* Create the temperature dimension table
* Create the demographic dimension table
* Create the fact table by joining the immigration, temperature and demographic tables on i94port (`city_code`), and write it to a parquet file partitioned by i94port
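The fact-table step is an inner join on the port code: an immigration record is kept only if its `city_code` appears in both the temperature and the demographic tables. The plain-Python sketch below shows those semantics and the grouping that `partitionBy` applies when writing parquet. The helper names and sample records are hypothetical; in Spark the same result would come from `DataFrame.join` on `city_code` followed by `write.partitionBy(...)`.

```python
def build_fact_rows(immigration, temperature, demographic):
    """Inner-join immigration records to both dimension tables on city_code."""
    temp_codes = {row["city_code"] for row in temperature}
    demog_codes = {row["city_code"] for row in demographic}
    return [
        {"id": row["id"], "city_code": row["city_code"]}
        for row in immigration
        if row["city_code"] in temp_codes and row["city_code"] in demog_codes
    ]


def partition_by(rows, column):
    """Group rows by the value of `column`, one group per output partition."""
    partitions = {}
    for row in rows:
        partitions.setdefault(row[column], []).append(row)
    return partitions


# Hypothetical sample: port 'XXX' has no temperature or demographic match
immigration = [{"id": 6, "city_code": "XXX"}, {"id": 7, "city_code": "ATL"}, {"id": 16, "city_code": "NYC"}]
temperature = [{"city_code": "ATL"}, {"city_code": "NYC"}]
demographic = [{"city_code": "ATL"}, {"city_code": "ATL"}, {"city_code": "NYC"}]

facts = build_fact_rows(immigration, temperature, demographic)
print(facts)
print(partition_by(facts, "city_code"))
```

Building sets of codes first mirrors joining against distinct `city_code` values, so the several race rows per city in the demographic table do not multiply the immigration records.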

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# First dimension table
immigration_df = imm_df3.select(col("cicid").alias("id"),
                                             col("i94port").alias("city_code"),
                                             col("i94yr").alias("year"),
                                             col("i94mon").alias("month"),
                                             col("arrdate").alias("arrival_date"),
                                             col("depdate").alias("departure_date"),
                                             col("i94bir").alias("age"),
                                             col("biryear").alias("birth_year"),
                                             col("gender").alias("gender"),
                                             col("i94visa").alias("travel_reason"))
immigration_df.head(5)

In [None]:
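# Second dimension table: mean temperature per port over the months kept in Step 2 (sketch)
temperature_df = temp_df7.groupBy("city_code", "City", "Country", "Latitude", "Longitude") \
                         .agg(avg("AverageTemperature")) \
                         .toDF("city_code", "city", "country", "latitude", "longitude", "average_temperature")

In [None]:
# Third dimension table: attach a port code with the same city_abb() mapping (sketch)
demographic_df = demog_df2.withColumn("city_code", city_abb(col("city"))) \
                          .dropna(how="any", subset=["city_code"])

In [None]:
# Fact table: immigration ids whose port appears in both dimension tables (sketch)
i94_df = immigration_df.join(temperature_df.select("city_code").distinct(), "city_code") \
                       .join(demographic_df.select("city_code").distinct(), "city_code") \
                       .select("id", "city_code")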

#### 4.2 Data Quality Checks
The checks below confirm that the pipeline ran as expected. Useful checks include:
 * Integrity constraints on the relational database (e.g., unique keys, data types)
 * Unit tests for the scripts to ensure they do the right thing
 * Source/count checks to ensure completeness

The check implemented here is a count check: each table must contain at least one record.
Run Quality Checks

In [None]:
# Perform quality checks here

def quality_check(df, description):
    '''
    Input: Spark DataFrame and a description of it

    Output: Print the outcome of the data quality check and return the record count
    '''

    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return result

# Perform data quality check
quality_check(immigration_df, "immigration table")
quality_check(temp_df, "temperature table")
quality_check(demog_df, "demographic table")
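The count check above covers completeness. An integrity check on the primary key (no missing and no duplicate `id` values in the immigration table) is another option from the list; in Spark it could compare `df.count()` with `df.select("id").distinct().count()` and count `df.filter(col("id").isNull())`. The plain-Python sketch below shows the logic on hand-made records; `check_unique_key` is a hypothetical helper.

```python
def check_unique_key(rows, key):
    """Return a list of problems: rows where `key` is missing or repeats an earlier value."""
    problems = []
    seen = set()
    for i, row in enumerate(rows):
        value = row.get(key)
        if value is None:
            problems.append(f"row {i}: missing {key}")
        elif value in seen:
            problems.append(f"row {i}: duplicate {key}={value!r}")
        else:
            seen.add(value)
    return problems


print(check_unique_key([{"id": 6}, {"id": 7}], "id"))
print(check_unique_key([{"id": 6}, {"id": 6}, {"id": None}], "id"))
```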

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

* Propose how often the data should be updated and why.

* Write a description of how you would approach the problem differently under the following scenarios:
     * The data was increased by 100x

     * The data populates a dashboard that must be updated on a daily basis by 7am every day

     * The database needed to be accessed by 100+ people
     