# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, row_number, regexp_replace, upper, trim, split, desc
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, hour, weekofyear, date_format, monotonically_increasing_id
from pyspark.sql.types import TimestampType as TS, LongType as Long, StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date

In [2]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

## Immigration Data

In [2]:
# Read in the data here
path = 'immigration_data_sample.csv'
df = pd.read_csv(path, encoding="ISO-8859-1")

In [3]:
df.shape

(1000, 29)

In [4]:
df.describe()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,arrdate,i94mode,depdate,i94bir,i94visa,count,dtadfile,entdepu,biryear,insnum,admnum
count,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,951.0,1000.0,1000.0,1000.0,1000.0,0.0,1000.0,35.0,1000.0
mean,1542097.0,3040461.0,2016.0,4.0,302.928,298.262,20559.68,1.078,20575.037855,42.382,1.859,1.0,20160420.0,,1973.618,3826.857143,69372370000.0
std,915287.9,1799818.0,0.0,0.0,206.485285,202.12039,8.995027,0.485955,24.211234,17.903424,0.386353,0.0,49.51657,,17.903424,221.742583,23381340000.0
min,10925.0,13208.0,2016.0,4.0,103.0,103.0,20545.0,1.0,20547.0,1.0,1.0,1.0,20160400.0,,1923.0,3468.0,0.0
25%,721442.2,1412170.0,2016.0,4.0,135.0,131.0,20552.0,1.0,20561.0,30.75,2.0,1.0,20160410.0,,1961.0,3668.0,55993010000.0
50%,1494568.0,2941176.0,2016.0,4.0,213.0,213.0,20560.0,1.0,20570.0,42.0,2.0,1.0,20160420.0,,1974.0,3887.0,59314770000.0
75%,2360901.0,4694151.0,2016.0,4.0,438.0,438.0,20567.25,1.0,20580.0,55.0,2.0,1.0,20160420.0,,1985.25,3943.0,93436230000.0
max,3095749.0,6061994.0,2016.0,4.0,746.0,696.0,20574.0,9.0,20715.0,93.0,3.0,1.0,20160800.0,,2015.0,4686.0,95021510000.0


In [5]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 29 columns):
Unnamed: 0    1000 non-null int64
cicid         1000 non-null float64
i94yr         1000 non-null float64
i94mon        1000 non-null float64
i94cit        1000 non-null float64
i94res        1000 non-null float64
i94port       1000 non-null object
arrdate       1000 non-null float64
i94mode       1000 non-null float64
i94addr       941 non-null object
depdate       951 non-null float64
i94bir        1000 non-null float64
i94visa       1000 non-null float64
count         1000 non-null float64
dtadfile      1000 non-null int64
visapost      382 non-null object
occup         4 non-null object
entdepa       1000 non-null object
entdepd       954 non-null object
entdepu       0 non-null float64
matflag       954 non-null object
biryear       1000 non-null float64
dtaddto       1000 non-null object
gender        859 non-null object
insnum        35 non-null float64
airline       967 non

In [6]:
df[df.columns[df.isna().any()]].isna().sum()

i94addr       59
depdate       49
visapost     618
occup        996
entdepd       46
entdepu     1000
matflag       46
gender       141
insnum       965
airline       33
fltno          8
dtype: int64

In [7]:
df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


## Airport Codes

In [15]:
path_codes = 'airport-codes_csv.csv'
df_codes = pd.read_csv(path_codes, encoding="ISO-8859-1")

In [16]:
df_codes.shape

(55075, 12)

In [17]:
df_codes.describe()

Unnamed: 0,elevation_ft
count,48069.0
mean,1240.789677
std,1602.363459
min,-1266.0
25%,205.0
50%,718.0
75%,1497.0
max,22000.0


In [18]:
df_codes.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


In [19]:
df_codes.local_code.unique()

array(['00A', '00AA', '00AK', ..., 'FAWT', 'ZEN', 'ZNC'], dtype=object)

## US Cities Demographics

In [3]:
path_city = 'us-cities-demographics.csv'
df_city = pd.read_csv(path_city, delimiter = ';', encoding="ISO-8859-1")

In [4]:
df_city.shape

(2891, 12)

In [5]:
df_city.describe()

Unnamed: 0,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,Count
count,2891.0,2888.0,2888.0,2891.0,2878.0,2878.0,2875.0,2891.0
mean,35.494881,97328.43,101769.6,198966.8,9367.832523,40653.6,2.742543,48963.77
std,4.401617,216299.9,231564.6,447555.9,13211.219924,155749.1,0.433291,144385.6
min,22.9,29281.0,27348.0,63215.0,416.0,861.0,2.0,98.0
25%,32.8,39289.0,41227.0,80429.0,3739.0,9224.0,2.43,3435.0
50%,35.3,52341.0,53809.0,106782.0,5397.0,18822.0,2.65,13780.0
75%,38.0,86641.75,89604.0,175232.0,9368.0,33971.75,2.95,54447.0
max,70.5,4081698.0,4468707.0,8550405.0,156961.0,3212500.0,4.98,3835726.0


In [6]:
df_city.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


In [7]:
df_city.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [18]:
df_city.State.unique()

array(['Maryland', 'Massachusetts', 'Alabama', 'California', 'New Jersey',
       'Illinois', 'Arizona', 'Missouri', 'North Carolina', 'Pennsylvania',
       'Kansas', 'Florida', 'Texas', 'Virginia', 'Nevada', 'Colorado',
       'Michigan', 'Connecticut', 'Minnesota', 'Utah', 'Arkansas',
       'Tennessee', 'Oklahoma', 'Washington', 'New York', 'Georgia',
       'Nebraska', 'Kentucky', 'South Carolina', 'Louisiana', 'New Mexico',
       'Iowa', 'Rhode Island', 'Puerto Rico', 'District of Columbia',
       'Wisconsin', 'Oregon', 'New Hampshire', 'North Dakota', 'Delaware',
       'Ohio', 'Idaho', 'Indiana', 'Alaska', 'Mississippi', 'Hawaii',
       'South Dakota', 'Maine', 'Montana'], dtype=object)

In [13]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [15]:
#write to parquet
#df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")

In [16]:
df_spark

DataFrame[cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, arrdate: double, i94mode: double, i94addr: string, depdate: double, i94bir: double, i94visa: double, count: double, dtadfile: string, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, biryear: double, dtaddto: string, gender: string, insnum: string, airline: string, admnum: double, fltno: string, visatype: string]

In [27]:
df['i94addr'].unique()

array(['HI', 'TX', 'FL', 'CA', 'NY', 'GA', 'IL', nan, 'MA', 'NV', 'PA',
       'GU', 'NC', 'NJ', 'VT', 'WA', 'NE', 'VA', 'MP', 'IN', 'MO', 'MI',
       'OR', 'MN', 'UN', 'ID', 'AZ', 'KY', 'SC', 'MS', 'MD', 'TN', 'OH',
       'CT', 'KS', 'DC', 'IA', 'LA', 'VQ', 'PR', 'CO', 'AL', 'SW', 'NM',
       'UT', 'OK', 'NH', 'TE', 'ME', 'AR', 'RI', 'WI'], dtype=object)

In [42]:
import numpy as np

In [46]:
df.i94addr.unique()

array(['HI', 'TX', 'FL', 'CA', 'NY', 'GA', 'IL', nan, 'MA', 'NV', 'PA',
       'GU', 'NC', 'NJ', 'VT', 'WA', 'NE', 'VA', 'MP', 'IN', 'MO', 'MI',
       'OR', 'MN', 'UN', 'ID', 'AZ', 'KY', 'SC', 'MS', 'MD', 'TN', 'OH',
       'CT', 'KS', 'DC', 'IA', 'LA', 'VQ', 'PR', 'CO', 'AL', 'SW', 'NM',
       'UT', 'OK', 'NH', 'TE', 'ME', 'AR', 'RI', 'WI'], dtype=object)

In [51]:
df_city['State Code'].unique()

array(['MD', 'MA', 'AL', 'CA', 'NJ', 'IL', 'AZ', 'MO', 'NC', 'PA', 'KS',
       'FL', 'TX', 'VA', 'NV', 'CO', 'MI', 'CT', 'MN', 'UT', 'AR', 'TN',
       'OK', 'WA', 'NY', 'GA', 'NE', 'KY', 'SC', 'LA', 'NM', 'IA', 'RI',
       'PR', 'DC', 'WI', 'OR', 'NH', 'ND', 'DE', 'OH', 'ID', 'IN', 'AK',
       'MS', 'HI', 'SD', 'ME', 'MT'], dtype=object)

In [2]:
path_sas = 'country_code.csv'

In [3]:
df_country = pd.read_csv(path_sas, delimiter = '|')

In [5]:
df_country.shape

(289, 1)

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

### GET THE DIMENSIONS

In [55]:
def cleaning_data(x):
    temp = x
    temp = temp.str.strip()
    temp = temp.str.upper()
    temp = temp.str.replace("'", "")
    temp = temp.str.replace(",|-|\.", "")
    temp = temp.str.replace(".*UNKNOWN.*|.*NO PORT.*|INVALID.*|COLLAPSED.*|NO COUNTRY.*", "TO BE DETERMINATED", regex = True)
    return temp

### Country Code

In [232]:
path = 'sources/country_code.csv'
df_country = pd.read_csv(path, delimiter = '=', engine='python')

In [233]:
df_country.columns = df_country.columns.str.strip()
df_country.columns = df_country.columns.str.replace("'", "")

In [234]:
df_country = df_country.apply(lambda x:  cleaning_data(x) if x.dtypes == 'object' else x)

In [235]:
df_country.shape

(289, 2)

In [236]:
df_country = df_country.dropna()
df_country = df_country.drop_duplicates()

In [237]:
df_country.shape

(289, 2)

In [238]:
df_country.head()

Unnamed: 0,Code,Country
0,582,MEXICO AIR SEA AND NOT REPORTED (I94 NO LAND A...
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [239]:
to_ins = spark.createDataFrame(df_country)

In [240]:
to_ins.write.mode('overwrite').parquet('dimensions/dim_countries')

### Port Code

In [154]:
path = 'stages/port_code.csv'
df_port = pd.read_csv(path, delimiter = '=', engine = 'python')

In [155]:
df_port.shape

(660, 2)

In [156]:
df_port.columns = df_port.columns.str.strip()
df_port.columns = df_port.columns.str.replace("'", "")

In [157]:
df_port.CITY = df_port.CITY.str.replace(',', '|')

In [158]:
df_port = df_port.apply(lambda x: cleaning_data(x) if x.dtypes == 'object' else x)

In [159]:
#chunk = df_port[~df_port.PORT.isin(df_codes.local_code.unique())].copy()
df_port.shape

(660, 2)

In [160]:
chunk = df_port.copy()

In [161]:
temp = chunk.CITY.str.split('|', expand = True).copy()

In [162]:
temp = temp.apply(lambda x: x.str.strip())

In [163]:
temp.columns = ['CITY', 'STATE', 'OTHER']

In [164]:
temp.fillna("TO BE DETERMINATED", inplace = True)

In [165]:
temp2 = temp[(temp.STATE.str.len() > 2) & \
     (temp.STATE.str.contains("\(", regex = True) == False) & \
     (temp.STATE != 'TO BE DETERMINATED')].copy()

In [166]:
temp3 = temp2[(temp2.OTHER != 'TO BE DETERMINATED') & (temp2.OTHER.str.len() > 2)].copy()

In [167]:
temp2['CITY'] = temp2['CITY'] + ' ' + temp2['STATE']

In [168]:
temp2['STATE'] = temp2['OTHER']

In [169]:
temp3['CITY'] = temp3['CITY'] + ' ' + temp3['STATE'] + ' ' + temp3['OTHER']

In [170]:
temp3['STATE'] = 'TO BE DETERMINATED'

In [171]:
temp2.loc[temp3.index, :] = temp3

In [172]:
temp2['OTHER'] = 'TO BE DETERMINATED'

In [173]:
temp.loc[temp2.index, :] = temp2

In [174]:
chunk.loc[temp.index, 'CITY'] = temp['CITY']

In [175]:
chunk['STATE'] = temp['STATE']

In [176]:
chunk.isna().sum()

PORT     0
CITY     0
STATE    0
dtype: int64

In [177]:
chunk.columns = ['local_code', 'municipality', 'state']

In [178]:
chunk.head()

Unnamed: 0,local_code,municipality,state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK


In [203]:
dim_airports = spark.read.parquet("dimensions/dim_airports")

In [180]:
dim_airports.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- state: string (nullable = true)
 |-- local_code: string (nullable = true)



In [181]:
chunk['ident'] = 'TO BE DETERMINATED'
chunk['type'] = 'TO BE DETERMINATED'
chunk['name'] = 'TO BE DETERMINATED'
chunk['elevation_ft'] = -1
chunk['iso_country'] = 'TO BE DETERMINATED'
chunk['gps_code'] = 'TO BE DETERMINATED'
chunk['iata_code'] = 'TO BE DETERMINATED'
chunk['latitude'] = -1.0
chunk['longitude'] = -1.0

In [182]:
chunk = chunk[dim_airports.columns]

In [202]:
to_ins = spark.createDataFrame(chunk)

In [184]:
to_ins.columns

['ident',
 'type',
 'name',
 'elevation_ft',
 'iso_country',
 'municipality',
 'gps_code',
 'iata_code',
 'latitude',
 'longitude',
 'state',
 'local_code']

In [204]:
to_ins = to_ins.join(dim_airports, to_ins.local_code == dim_airports.local_code, "left_outer") \
        .where(dim_airports.local_code.isNull()) \
        .select(to_ins.ident, to_ins.type, to_ins.name, to_ins.elevation_ft, to_ins.iso_country, \
               to_ins.municipality, to_ins.gps_code, to_ins.iata_code, to_ins.latitude, \
               to_ins.longitude, to_ins.state, to_ins.local_code)

In [205]:
to_ins.count()

355

In [208]:
to_ins.write.mode('append').partitionBy(['state', 'local_code']).parquet('dimensions/dim_airports')

In [187]:
dim_airports.count()

21591

### Mode Stage

In [211]:
path = 'sources/mode_code.csv'
df_code = pd.read_csv(path, delimiter = '=', engine = 'python')

In [212]:
df_code.columns = df_code.columns.str.strip()

In [213]:
df_code.columns = df_code.columns.str.replace("'", "")
df_code.columns = df_code.columns.str.lower()

In [214]:
df_code = df_code.apply(lambda x:  cleaning_data(x) if x.dtypes == 'object' else x)

In [215]:
df_code

Unnamed: 0,code,mode
0,1,AIR
1,2,SEA
2,3,LAND
3,9,NOT REPORTED


In [216]:
to_ins = spark.createDataFrame(df_code)

In [217]:
to_ins.write.mode('overwrite').parquet('dimensions/dim_modes')

### City Code

In [107]:
path = 'sources/city_code.csv'
df_city = pd.read_csv(path, delimiter = '=', engine = 'python')

In [108]:
df_city.columns = df_city.columns.str.strip()
df_city.columns = df_city.columns.str.lower()
df_city.columns = df_city.columns.str.replace("'", "")

In [109]:
df_city = df_city.apply(lambda x: cleaning_data(x) if x.dtypes == 'object' else x)

In [110]:
df_city.head()

Unnamed: 0,code,city
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


In [111]:
df_city.columns = ['state_code', 'city']

In [112]:
dim_city = spark.read.parquet("dimensions/dim_cities")

In [113]:
dim_city.limit(2).toPandas()

Unnamed: 0,city,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,state_code,race,count,state
0,AUGUSTARICHMOND COUNTY CONSOLIDATED GOVERNMENT,33.7,94662,101917,196579,19085,7915,2.67,GA,AMERICAN INDIAN AND ALASKA NATIVE,1667,GEORGIA
1,LOUISVILLE/JEFFERSON COUNTY METRO GOVERNMENT,37.5,298451,316938,615389,39364,37875,2.45,KY,AMERICAN INDIAN AND ALASKA NATIVE,4585,KENTUCKY


In [114]:
df_city['state'] = 'TO BE DETERMINATED'
df_city['median_age'] = -1.0
df_city['male_population'] = -1
df_city['female_population'] = -1
df_city['total_population'] = -1
df_city['number_of_veterans'] = -1
df_city['foreign_born'] = -1
df_city['average_household_size'] = -1.0
df_city['race'] = 'TO BE DETERMINATED'
df_city['count'] = -1

In [115]:
df_city = df_city[dim_city.columns]

In [116]:
dim_city.printSchema()

root
 |-- city: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- number_of_veterans: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- state_code: string (nullable = true)
 |-- race: string (nullable = true)
 |-- count: integer (nullable = true)
 |-- state: string (nullable = true)



In [117]:
to_ins = spark.createDataFrame(df_city)

In [118]:
to_ins = to_ins.join(dim_city, to_ins.state_code == dim_city.state_code, "left_outer") \
        .where(dim_city.state_code.isNull()) \
        .select(to_ins.city, to_ins.state, to_ins.median_age, to_ins.male_population, to_ins.female_population, \
               to_ins.total_population, to_ins.number_of_veterans, to_ins.foreign_born, to_ins.average_household_size, \
               to_ins.state_code, to_ins.race, to_ins['count'])

In [119]:
to_ins =  to_ins.filter(to_ins.state_code.isNotNull()) \
                           .dropDuplicates(subset=['state_code', 'race', 'city'])

In [122]:
to_ins.write.mode('append').partitionBy('state').parquet('dimensions/dim_cities')

### Visa Stage

In [219]:
path = 'sources/visa_code.csv'
df_visa = pd.read_csv(path, delimiter = '=', engine = 'python')

In [220]:
df_visa.columns = df_visa.columns.str.strip()
df_visa.columns = df_visa.columns.str.replace("'", "")

In [221]:
df_visa = df_visa.apply(lambda x:  cleaning_data(x) if x.dtypes == 'object' else x)

In [222]:
df_visa

Unnamed: 0,Code,Visa
0,1,BUSINESS
1,2,PLEASURE
2,3,STUDENT


In [223]:
to_ins = spark.createDataFrame(df_visa)

In [224]:
to_ins.write.mode('overwrite').parquet('dimensions/dim_visa')

### Get stages to dimensions

In [62]:
path = 'airport-codes_csv.csv'

In [63]:
sp = pd.read_csv(path, engine = "python", encoding="ISO-8859-1")

In [64]:
df = sp[sp['iso_country'] == 'US'].copy()

In [65]:
df = df[~df['local_code'].isna()]

In [66]:
temp = df['coordinates'].str.split(',', expand = True).copy()

In [67]:
temp.columns = ['latitude', 'longitude']

In [68]:
df.drop(labels = 'coordinates', axis = 1, inplace = True)

In [69]:
df = df.join(temp)

In [79]:
df.columns

Index(['ident', 'type', 'name', 'elevation_ft', 'continent', 'iso_country',
       'iso_region', 'municipality', 'gps_code', 'iata_code', 'local_code',
       'latitude', 'longitude'],
      dtype='object')

In [81]:
df['city_code'] = df['iso_region'].str.split('-', expand = True)[1].str.strip()

In [82]:
df.drop(labels = 'iso_region', axis = 1, inplace = True)

In [84]:
df = df.apply(lambda x:  cleaning_data(x) if x.dtypes == 'object' else x)

In [86]:
df.isna().sum()

ident               0
type                0
name                0
elevation_ft      151
continent       21235
iso_country         0
municipality       21
gps_code          314
iata_code       19267
local_code          0
latitude            0
longitude           0
city_code           0
dtype: int64

In [88]:
df.drop(labels = 'continent', axis = 1, inplace = True)

In [95]:
df.fillna('TO BE DETERMINATED', inplace = True)

In [97]:
df.isna().sum()

ident           0
type            0
name            0
elevation_ft    0
iso_country     0
municipality    0
gps_code        0
iata_code       0
local_code      0
latitude        0
longitude       0
city_code       0
dtype: int64

In [98]:
df.head()

Unnamed: 0,ident,type,name,elevation_ft,iso_country,municipality,gps_code,iata_code,local_code,latitude,longitude,city_code
0,00A,HELIPORT,TOTAL RF HELIPORT,11,US,BENSALEM,00A,TO BE DETERMINATED,00A,7493360137939453,4007080078125,PA
1,00AA,SMALL_AIRPORT,AERO B RANCH AIRPORT,3435,US,LEOTI,00AA,TO BE DETERMINATED,00AA,101473911,38704022,KS
2,00AK,SMALL_AIRPORT,LOWELL FIELD,450,US,ANCHOR POINT,00AK,TO BE DETERMINATED,00AK,151695999146,5994919968,AK
3,00AL,SMALL_AIRPORT,EPPS AIRPARK,820,US,HARVEST,00AL,TO BE DETERMINATED,00AL,8677030181884766,3486479949951172,AL
5,00AS,SMALL_AIRPORT,FULTON AIRPORT,1100,US,ALEX,00AS,TO BE DETERMINATED,00AS,978180194,349428028,OK


### Cities Demographics

In [75]:
path = 'us-cities-demographics.csv'

In [76]:
citySchema = R([
    Fld("city", Str()),
    Fld("state", Str()),
    Fld("median_age", Dbl()),
    Fld("male_population", Int()),
    Fld("female_population", Int()),
    Fld("total_population", Int()),
    Fld("number_of_veterans", Int()),
    Fld("foreign_born", Int()),
    Fld("average_household_size", Dbl()),
    Fld("state_code", Str()),
    Fld("race", Str()),
    Fld("count", Int())
])

In [77]:
df = spark.read.option("delimiter", ";").csv(path, header = True, schema = citySchema).distinct()

In [78]:
df.count()

2891

In [79]:
df.limit(5).toPandas()

Unnamed: 0,city,state,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,state_code,race,count
0,San Diego,California,34.5,693826,701081,1394907,92489,373842,2.73,CA,White,949388
1,Highlands Ranch,Colorado,39.6,49186,53281,102467,4840,8827,2.72,CO,Asian,5650
2,Lakeland,Florida,38.1,47840,56570,104410,7390,11592,2.56,FL,Asian,3348
3,Rockford,Illinois,36.3,71076,78270,149346,8894,18323,2.52,IL,Black or African-American,33122
4,Davenport,Iowa,35.2,50123,52454,102577,7090,4065,2.39,IA,White,88145


In [101]:
df_clean = df

In [102]:
for field in df_clean.schema:
    if field.dataType == Str():
        df_clean = df_clean.withColumn(field.name, upper(col(field.name)))
        df_clean = df_clean.withColumn(field.name, trim(col(field.name)))
        df_clean = df_clean.withColumn(field.name, regexp_replace(col(field.name), "'", ""))
        df_clean = df_clean.withColumn(field.name, regexp_replace(col(field.name), ",|-|\.", ""))
        df_clean = df_clean.withColumn(field.name, regexp_replace(col(field.name), ".*UNKNOWN.*|.*NO PORT.*|INVALID.*|COLLAPSED.*|NO COUNTRY.*", "TO BE DETERMINATED"))
        df_clean = df_clean.fillna("TO BE DETERMINATED", subset=[field.name])

In [103]:
df_clean.limit(5).toPandas()

Unnamed: 0,city,state,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,state_code,race,count
0,SAN DIEGO,CALIFORNIA,34.5,693826,701081,1394907,92489,373842,2.73,CA,WHITE,949388
1,HIGHLANDS RANCH,COLORADO,39.6,49186,53281,102467,4840,8827,2.72,CO,ASIAN,5650
2,LAKELAND,FLORIDA,38.1,47840,56570,104410,7390,11592,2.56,FL,ASIAN,3348
3,ROCKFORD,ILLINOIS,36.3,71076,78270,149346,8894,18323,2.52,IL,BLACK OR AFRICANAMERICAN,33122
4,DAVENPORT,IOWA,35.2,50123,52454,102577,7090,4065,2.39,IA,WHITE,88145


In [104]:
df_clean =  df_clean.filter(df_clean.state_code.isNotNull()) \
                           .dropDuplicates(subset=['state_code', 'race', 'city'])

In [105]:
df_clean.count()

2891

In [106]:
df_clean.write.mode('overwrite').partitionBy('state').parquet('dimensions/dim_cities')

## Airports Dim

In [189]:
path = 'stages/airport-codes_csv.csv'

In [190]:
airportSchema = R([
    Fld("ident", Str()),
    Fld("type", Str()),
    Fld("name", Str()),
    Fld("elevation_ft", Int()),
    Fld("continent", Str()),
    Fld("iso_country", Str()),
    Fld("iso_region", Str()),
    Fld("municipality", Str()),
    Fld("gps_code", Str()),
    Fld("iata_code", Str()),
    Fld("local_code", Str()),
    Fld("coordinates", Str())
])

In [191]:
df = spark.read.option("delimiter", ",").csv(path, header = True, schema = airportSchema).distinct()

In [192]:
df.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,01CL,small_airport,Swansboro Country Airport,2594,,US,US-CA,Placerville,01CL,,01CL,"-120.73400115966797, 38.79990005493164"
1,03VA,closed,Whipoorwill Springs Airport,250,,US,US-VA,Nokesville,03VA,,03VA,"-77.57969665527344, 38.66460037231445"
2,05VA,small_airport,Providence Airport,445,,US,US-VA,Farmville,05VA,,05VA,"-78.4186019897461, 37.22420120239258"
3,0AK8,closed,Pollux Heliport,515,,US,US-AK,North Pole,,,,"-147.497501, 64.895835"
4,0AR2,small_airport,Mission Field-Marotti Memorial Airport,217,,US,US-AR,Crawfordsville,0AR2,,0AR2,"-90.35009765625, 35.26679992675781"


In [193]:
df = df.withColumn('state', split(col('iso_region'),'-')[1]) \
       .withColumn('latitude', split(col('coordinates'), ',')[0].cast(Dbl())) \
       .withColumn('longitude', split(col('coordinates'), ',')[1].cast(Dbl()))

In [194]:
df = df.filter("UPPER(iso_country) == 'US'") \
       .drop('iso_region') \
       .drop('coordinates') \
       .drop('continent')

In [195]:
df.count()

22757

In [196]:
df = df.filter("local_code is not null")

In [197]:
df.count()

21236

In [198]:
df_clean = df

In [199]:
for field in df_clean.schema:
    if field.dataType == Str():
        df_clean = df_clean.withColumn(field.name, upper(col(field.name)))
        df_clean = df_clean.withColumn(field.name, trim(col(field.name)))
        df_clean = df_clean.withColumn(field.name, regexp_replace(col(field.name), "'", ""))
        df_clean = df_clean.withColumn(field.name, regexp_replace(col(field.name), ",|-|\.", ""))
        df_clean = df_clean.withColumn(field.name, regexp_replace(col(field.name), ".*UNKNOWN.*|.*NO PORT.*|INVALID.*|COLLAPSED.*|NO COUNTRY.*", "TO BE DETERMINATED"))
        df_clean = df_clean.fillna("TO BE DETERMINATED", subset=[field.name])

In [200]:
df_clean.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,iso_country,municipality,gps_code,iata_code,local_code,state,latitude,longitude
0,01CL,SMALL_AIRPORT,SWANSBORO COUNTRY AIRPORT,2594,US,PLACERVILLE,01CL,TO BE DETERMINATED,01CL,CA,-120.734001,38.7999
1,03VA,CLOSED,WHIPOORWILL SPRINGS AIRPORT,250,US,NOKESVILLE,03VA,TO BE DETERMINATED,03VA,VA,-77.579697,38.6646
2,05VA,SMALL_AIRPORT,PROVIDENCE AIRPORT,445,US,FARMVILLE,05VA,TO BE DETERMINATED,05VA,VA,-78.418602,37.224201
3,0AR2,SMALL_AIRPORT,MISSION FIELDMAROTTI MEMORIAL AIRPORT,217,US,CRAWFORDSVILLE,0AR2,TO BE DETERMINATED,0AR2,AR,-90.350098,35.2668
4,0IS9,HELIPORT,BERNARDIN HELIPORT,830,US,WEST BROOKLYN,0IS9,TO BE DETERMINATED,0IS9,IL,-89.202904,41.708401


In [201]:
df_clean.write.mode('overwrite').partitionBy(['state', 'local_code']).parquet('dimensions/dim_airports')

### Fact Immigration

In [11]:
dim_country = spark.read.parquet('dimensions/dim_countries')
dim_visa    = spark.read.parquet('dimensions/dim_visa')
dim_mode    = spark.read.parquet('dimensions/dim_modes')
dim_airport = spark.read.parquet('dimensions/dim_airports')
dim_city    = spark.read.parquet('dimensions/dim_cities')

In [15]:
dim_city = dim_city.dropDuplicates(subset=['state_code'])

In [16]:
#df = spark.read.option("delimiter", ",").csv(path, header = True).distinct()
df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [17]:
import datetime as dt

In [18]:
format_dt_sas = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat()if x else dt.datetime(1960,1,1).date().isoformat())

In [19]:
df = df.withColumn("arrdate", format_dt_sas(df.arrdate)) \
       .withColumn("depdate", format_dt_sas(df.depdate))

In [20]:
for field in df.dtypes:
    if field[1] == 'string':
        df = df.withColumn(field[0], upper(col(field[0])))
        df = df.withColumn(field[0], trim(col(field[0])))
        df = df.withColumn(field[0], regexp_replace(col(field[0]), "'", ""))
        df = df.withColumn(field[0], regexp_replace(col(field[0]), ",|-|\.", ""))
        df = df.withColumn(field[0], regexp_replace(col(field[0]), ".*UNKNOWN.*|.*NO PORT.*|INVALID.*|COLLAPSED.*|NO COUNTRY.*", "TO BE DETERMINATED"))
        df = df.fillna("TO BE DETERMINATED", subset=[field[0]])

In [21]:
df = df.fillna(-1)

In [141]:
df.limit(2).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20160429,-1.0,TO BE DETERMINATED,19600101,...,U,TO BE DETERMINATED,1979.0,10282016,TO BE DETERMINATED,TO BE DETERMINATED,TO BE DETERMINATED,1897628000.0,TO BE DETERMINATED,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20160407,1.0,AL,19600101,...,Y,TO BE DETERMINATED,1991.0,D/S,M,TO BE DETERMINATED,TO BE DETERMINATED,3736796000.0,00296,F1


In [22]:
dim_country.createOrReplaceTempView("country")
dim_visa.createOrReplaceTempView("visa")
dim_mode.createOrReplaceTempView("mode")
dim_airport.createOrReplaceTempView("airport")
dim_city.createOrReplaceTempView("city")
df.createOrReplaceTempView("fact")

In [None]:
fact = spark.sql("""
SELECT 
    f.i94yr as yr,
    f.i94mon as mnth,
    c.Code as cty_cntry,
    f.i94res as resdnc_cntry,
    a.local_code as prt,
    f.arrdate as arrvl_dt,
    m.code as arrvl_md,
    ci.state_code as us_stt,
    f.depdate as dprtr_dt,
    f.i94bir as rpndnt_age,
    v.Code as vs_typ_id,
    f.count as cnt,
    f.dtadfile as dt_fl,
    f.visapost as vs_issd_stt,
    f.occup as occptn,
    f.entdepa as arrvl_flg,
    f.entdepd as dprtr_flg,
    f.entdepu as updt_flg,
    f.matflag as arrvl_dprtr_flg,
    f.biryear as brth_yr,
    f.dtaddto as allwd_dt,
    f.gender as gndr,
    f.insnum as ins_nmbr,
    f.airline as arln,
    f.admnum as admssn_nmbr,
    f.fltno as flght_nmbr,
    f.visatype as vs_typ
FROM fact f inner join country c on f.i94cit = c.Code
     inner join airport a on f.i94port = a.local_code
     inner join mode m on f.i94mode = m.code
     inner join city ci on f.i94addr = ci.state_code
     inner join visa v on f.i94visa = v.Code
""")

In [None]:
fact.limit(10).toPandas()

In [29]:
dim_visa.printSchema()

root
 |-- Code: long (nullable = true)
 |-- Visa: string (nullable = true)



In [150]:
fact.limit(100).toPandas()

Unnamed: 0,yr,mnth,cty_cntry,resdnc_cntry,prt,arrvl_dt,arrvl_md,us_stt,dprtr_dt,rpndnt_age,...,updt_flg,arrvl_dprtr_flg,brth_yr,allwd_dt,gndr,ins_nmbr,arln,admssn_nmbr,flght_nmbr,vs_typ
0,2016.0,4.0,101,101.0,WAS,20160401,1,MI,20160825,55.0,...,TO BE DETERMINATED,M,1961.0,09302016,M,TO BE DETERMINATED,OS,6.666432e+08,93,B2
1,2016.0,4.0,101,101.0,NYC,20160401,1,MA,20160423,28.0,...,TO BE DETERMINATED,M,1988.0,09302016,TO BE DETERMINATED,TO BE DETERMINATED,AA,9.246846e+10,00199,B2
2,2016.0,4.0,101,101.0,NYC,20160401,1,MA,20160423,4.0,...,TO BE DETERMINATED,M,2012.0,09302016,TO BE DETERMINATED,TO BE DETERMINATED,AA,9.246846e+10,00199,B2
3,2016.0,4.0,101,101.0,NYC,20160401,1,MI,20160411,57.0,...,TO BE DETERMINATED,M,1959.0,09302016,TO BE DETERMINATED,TO BE DETERMINATED,AZ,9.247104e+10,00602,B1
4,2016.0,4.0,101,101.0,NYC,20160401,1,NJ,20160414,63.0,...,TO BE DETERMINATED,M,1953.0,09302016,TO BE DETERMINATED,TO BE DETERMINATED,AZ,9.247140e+10,00602,B2
5,2016.0,4.0,101,101.0,NYC,20160401,1,NJ,20160414,57.0,...,TO BE DETERMINATED,M,1959.0,09302016,TO BE DETERMINATED,TO BE DETERMINATED,AZ,9.247161e+10,00602,B2
6,2016.0,4.0,101,101.0,NYC,20160401,1,NY,20160409,46.0,...,TO BE DETERMINATED,M,1970.0,09302016,TO BE DETERMINATED,TO BE DETERMINATED,AZ,9.247080e+10,00602,B2
7,2016.0,4.0,101,101.0,NYC,20160401,1,NY,20160418,48.0,...,TO BE DETERMINATED,M,1968.0,09302016,TO BE DETERMINATED,TO BE DETERMINATED,AZ,9.247849e+10,00608,B1
8,2016.0,4.0,101,101.0,NYC,20160401,1,NY,20160805,52.0,...,TO BE DETERMINATED,M,1964.0,09302016,TO BE DETERMINATED,TO BE DETERMINATED,TK,9.250139e+10,00001,B2
9,2016.0,4.0,101,101.0,TOR,20160401,1,MO,20160410,33.0,...,TO BE DETERMINATED,M,1983.0,09302016,TO BE DETERMINATED,TO BE DETERMINATED,MQ,9.249091e+10,03348,B2


In [151]:
fact.write.mode("overwrite").partitionBy('yr', 'mnth', 'us_stt').parquet('facts/fact_immigration')

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.