# Project Title
### Data Engineering Capstone Project

#### Project Summary
- This project aims to collect and find data sources to use for analytics tables, app back-end, and source-of-truth databases.
- The data will be checked for quality issues and necessary cleaning steps will be noted. The next steps will involve creating a data model and pipelines, while ensuring that the data quality checks and integrity constraints are in place.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, substring, udf, sum
from pyspark.sql.types import *

In [2]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark
spark=create_spark_session()

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, I aim to analyze and model four different datasets to prepare them for end use cases such as analytics tables, app back-end, and source-of-truth databases. I will identify and gather data from at least two sources, which include immigration data to the United States, airport codes, demographic data of U.S. cities, and temperature data. I will explore and assess the data to identify any data quality issues, document necessary cleaning steps, and define a data model. The data model will be pipelined with the data and ETL will be run to create the data pipelines and the data model. Additionally, I will ensure data quality checks, integrity constraints, and perform unit tests for the scripts to ensure that they are functioning correctly.

#### Describe and Gather Data 
The four datasets used in this project are:

***Immigration data to the United States (df_Immigration_sample)***

***Temperature data (df_temp)***

***Demographic data of U.S. cities (df_demographics)***

***Airport codes (df_airport_codes)***

The immigration data is a sample dataset that includes information on immigrants to the United States, including their country of origin, mode of transportation, and arrival date. The temperature data contains information on the temperatures of various cities around the world. The demographic data of U.S. cities includes information on the racial and ethnic makeup of cities in the United States. The airport codes dataset includes information on airport codes and their corresponding cities and countries. All datasets were obtained from publicly available sources.

In [3]:
# Read in the data here :
df_Immigration_sample = pd.read_csv('./immigration_data_sample.csv')
df_temp = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')
df_demographics = pd.read_csv("./us-cities-demographics.csv", delimiter=";")
df_airport_codes = pd.read_csv("./airport-codes_csv.csv")

#### Printing the head of the first dataset df_i94:

In [4]:
df_Immigration_sample.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [5]:
df_Immigration_sample.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 29 columns):
Unnamed: 0    1000 non-null int64
cicid         1000 non-null float64
i94yr         1000 non-null float64
i94mon        1000 non-null float64
i94cit        1000 non-null float64
i94res        1000 non-null float64
i94port       1000 non-null object
arrdate       1000 non-null float64
i94mode       1000 non-null float64
i94addr       941 non-null object
depdate       951 non-null float64
i94bir        1000 non-null float64
i94visa       1000 non-null float64
count         1000 non-null float64
dtadfile      1000 non-null int64
visapost      382 non-null object
occup         4 non-null object
entdepa       1000 non-null object
entdepd       954 non-null object
entdepu       0 non-null float64
matflag       954 non-null object
biryear       1000 non-null float64
dtaddto       1000 non-null object
gender        859 non-null object
insnum        35 non-null float64
airline       967 non

#### Describing the first dataset df_i94:

In [6]:
df_Immigration_sample.describe()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,arrdate,i94mode,depdate,i94bir,i94visa,count,dtadfile,entdepu,biryear,insnum,admnum
count,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,951.0,1000.0,1000.0,1000.0,1000.0,0.0,1000.0,35.0,1000.0
mean,1542097.0,3040461.0,2016.0,4.0,302.928,298.262,20559.68,1.078,20575.037855,42.382,1.859,1.0,20160420.0,,1973.618,3826.857143,69372370000.0
std,915287.9,1799818.0,0.0,0.0,206.485285,202.12039,8.995027,0.485955,24.211234,17.903424,0.386353,0.0,49.51657,,17.903424,221.742583,23381340000.0
min,10925.0,13208.0,2016.0,4.0,103.0,103.0,20545.0,1.0,20547.0,1.0,1.0,1.0,20160400.0,,1923.0,3468.0,0.0
25%,721442.2,1412170.0,2016.0,4.0,135.0,131.0,20552.0,1.0,20561.0,30.75,2.0,1.0,20160410.0,,1961.0,3668.0,55993010000.0
50%,1494568.0,2941176.0,2016.0,4.0,213.0,213.0,20560.0,1.0,20570.0,42.0,2.0,1.0,20160420.0,,1974.0,3887.0,59314770000.0
75%,2360901.0,4694151.0,2016.0,4.0,438.0,438.0,20567.25,1.0,20580.0,55.0,2.0,1.0,20160420.0,,1985.25,3943.0,93436230000.0
max,3095749.0,6061994.0,2016.0,4.0,746.0,696.0,20574.0,9.0,20715.0,93.0,3.0,1.0,20160800.0,,2015.0,4686.0,95021510000.0


In [7]:
full_df_immigration = pd.read_sas('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat', 'sas7bdat', encoding="ISO-8859-1")

In [8]:
full_df_immigration.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [9]:
full_df_immigration.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3096313 entries, 0 to 3096312
Data columns (total 28 columns):
cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port     object
arrdate     float64
i94mode     float64
i94addr     object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile    object
visapost    object
occup       object
entdepa     object
entdepd     object
entdepu     object
matflag     object
biryear     float64
dtaddto     object
gender      object
insnum      object
airline     object
admnum      float64
fltno       object
visatype    object
dtypes: float64(13), object(15)
memory usage: 661.4+ MB


In [10]:
full_df_immigration.describe()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,arrdate,i94mode,depdate,i94bir,i94visa,count,biryear,admnum
count,3096313.0,3096313.0,3096313.0,3096313.0,3096313.0,3096313.0,3096074.0,2953856.0,3095511.0,3096313.0,3096313.0,3095511.0,3096313.0
mean,3078652.0,2016.0,4.0,304.9069,303.2838,20559.85,1.07369,20573.95,41.76761,1.845393,1.0,1974.232,70828850000.0
std,1763278.0,0.0,0.0,210.0269,208.5832,8.777339,0.5158963,29.35697,17.42026,0.398391,0.0,17.42026,22154420000.0
min,6.0,2016.0,4.0,101.0,101.0,20545.0,1.0,15176.0,-3.0,1.0,1.0,1902.0,0.0
25%,1577790.0,2016.0,4.0,135.0,131.0,20552.0,1.0,20561.0,30.0,2.0,1.0,1962.0,56035230000.0
50%,3103507.0,2016.0,4.0,213.0,213.0,20560.0,1.0,20570.0,41.0,2.0,1.0,1975.0,59360940000.0
75%,4654341.0,2016.0,4.0,512.0,504.0,20567.0,1.0,20579.0,54.0,2.0,1.0,1986.0,93509870000.0
max,6102785.0,2016.0,4.0,999.0,760.0,20574.0,9.0,45427.0,114.0,3.0,1.0,2019.0,99915570000.0


In [11]:
full_df_immigration.to_csv('./Cleaned Data/full_immigration.csv', index=False)

#### Printing the head of the second dataset df_temp:

In [12]:
df_temp.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [13]:
df_temp.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB



#### Describing the second dataset df_temp:

In [14]:
df_temp.describe()

Unnamed: 0,AverageTemperature,AverageTemperatureUncertainty
count,8235082.0,8235082.0
mean,16.72743,1.028575
std,10.35344,1.129733
min,-42.704,0.034
25%,10.299,0.337
50%,18.831,0.591
75%,25.21,1.349
max,39.651,15.396


#### Printing the head of the third dataset df_demographics:

In [15]:
df_demographics.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [16]:
df_demographics.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


#### Describing the third dataset df_demographics:

In [17]:
df_demographics.describe()

Unnamed: 0,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,Count
count,2891.0,2888.0,2888.0,2891.0,2878.0,2878.0,2875.0,2891.0
mean,35.494881,97328.43,101769.6,198966.8,9367.832523,40653.6,2.742543,48963.77
std,4.401617,216299.9,231564.6,447555.9,13211.219924,155749.1,0.433291,144385.6
min,22.9,29281.0,27348.0,63215.0,416.0,861.0,2.0,98.0
25%,32.8,39289.0,41227.0,80429.0,3739.0,9224.0,2.43,3435.0
50%,35.3,52341.0,53809.0,106782.0,5397.0,18822.0,2.65,13780.0
75%,38.0,86641.75,89604.0,175232.0,9368.0,33971.75,2.95,54447.0
max,70.5,4081698.0,4468707.0,8550405.0,156961.0,3212500.0,4.98,3835726.0


#### Printing the head of the fourth dataset df_airport_codes:

In [18]:
df_airport_codes.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [19]:
df_airport_codes.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


#### Describing the fourth dataset df_airport_codes:

In [20]:
df_airport_codes.describe()

Unnamed: 0,elevation_ft
count,48069.0
mean,1240.789677
std,1602.363459
min,-1266.0
25%,205.0
50%,718.0
75%,1497.0
max,22000.0


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

### Cleaning The First Dataset Cleaning:

In [21]:
full_immigration_df = spark.read.format('csv').option('header', 'true').load('./Cleaned Data/full_immigration.csv')
full_immigration_df.createOrReplaceTempView("immigration_table")

In [22]:
full_immigration_df.printSchema()

root
 |-- cicid: string (nullable = true)
 |-- i94yr: string (nullable = true)
 |-- i94mon: string (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- count: string (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: string (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: string (nullable = 

In [23]:
full_immigration_df.show(10)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1897628485.0| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| n

In [24]:
# check the total number of records of full_immigration_df
full_immigration_df.count()

3096313

In [25]:
# check for cicid if it unique or not and if unique I will make it a primary key:
from pyspark.sql.functions import countDistinct

full_immigration_df.select(countDistinct("cicid")).show()

+---------------------+
|count(DISTINCT cicid)|
+---------------------+
|              3096313|
+---------------------+



In [26]:
### Adding a new column 'arrival_date' to the immigration dataset
full_immigration_df = spark.sql("""
    SELECT *, date_add(to_date('1960-01-01'), arrdate) AS arrival_date 
    FROM immigration_table
""")


# Adding a new column 'departure_date' to the immigration dataset
full_immigration_df = spark.sql("""
    SELECT *, 
        CASE 
            WHEN depdate >= 1.0 THEN date_add(to_date('1960-01-01'), depdate)
            WHEN depdate IS NULL THEN NULL
            ELSE 'N/A' 
        END AS departure_date 
    FROM (
        SELECT *, date_add(to_date('1960-01-01'), arrdate) AS arrival_date 
        FROM immigration_table
    )
""")

### Displaying the first five rows of the updated full immigration dataset
full_immigration_df.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+------------+--------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|arrival_date|departure_date|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+------------+--------------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1897628485.0| null|      B2|  2016-04-29|          null|
|  7

In [27]:
# Filtering the immigration dataset to only include rows where 'departure_date_2' is greater than or equal to 'arrival_date'
full_immigration_df = full_immigration_df.filter(full_immigration_df.departure_date >= full_immigration_df.arrival_date)

# Displaying the first 5 rows of the filtered immigration dataset
full_immigration_df.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+------------+--------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|arrival_date|departure_date|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+------------+--------------+
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    null| null|      T|      O|   null|      M| 1961.0|09302016|     M|  null|     OS|  666643185.0|   93|      B2|  2016-04-01|    2016-08-25|
| 16

In [28]:
print(f"Number of rows: {full_immigration_df.count()}")

Number of rows: 2953481


In [29]:
# Counting the number of rows for each unique value in the 'i94mode' column
mode_counts = spark.sql("""
    SELECT i94mode, COUNT(*) AS count
    FROM immigration_table
    GROUP BY i94mode
""")

# Displaying the result
mode_counts.show()

+-------+-------+
|i94mode|  count|
+-------+-------+
|    1.0|2994505|
|   null|    239|
|    9.0|   8560|
|    2.0|  26349|
|    3.0|  66660|
+-------+-------+



In [30]:
# Defining a dictionary to map i94mode values to arrival modes
arrival_modes = {1.0: 'Air', 2.0: 'Sea', 3.0: 'Land'}

# Adding a new column 'arrival_mode' to the immigration dataset using a UDF
def map_arrival_mode(i94mode):
    return arrival_modes.get(i94mode, 'Not_reported')

map_arrival_mode_udf = udf(map_arrival_mode, StringType())

# Adding the 'arrival_mode' column to the 'full_immigration_df' DataFrame
full_immigration_df = full_immigration_df.withColumn('arrival_mode', map_arrival_mode_udf('i94mode'))

# Displaying the first row of the updated immigration dataset
full_immigration_df.show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+------------+--------------+------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|arrival_date|departure_date|arrival_mode|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+------------+--------------+------------+
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    null| null|      T|      O|   null|      M| 1961.0|09302016|     M|  null|     OS|  666643185.0|   93|   

In [31]:
print(f"Number of rows: {full_immigration_df.count()}")

Number of rows: 2953481


In [32]:
# Counting the number of rows for each unique value in the 'i94visa' column
visa_counts = spark.sql("""
    SELECT i94visa, COUNT(*) AS count
    FROM immigration_table
    GROUP BY i94visa
""")

# Displaying the result
visa_counts.show()

+-------+-------+
|i94visa|  count|
+-------+-------+
|    1.0| 522079|
|    2.0|2530868|
|    3.0|  43366|
+-------+-------+



In [33]:
# Defining a dictionary to map i94visa values to visa types
visa_types = {1.0: 'Business', 2.0: 'Pleasure', 3.0: 'Student'}

def map_visa_type(i94visa):
    return visa_types.get(i94visa, 'Not_reported')

map_visa_type_udf = udf(map_visa_type, StringType())

# Adding the 'visa_type' column to the 'immigration_table' DataFrame
full_immigration_df = full_immigration_df\
    .withColumn('visa_type', map_visa_type_udf('i94visa'))

# Displaying the first row of the updated immigration dataset
full_immigration_df.show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+------------+--------------+------------+------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|arrival_date|departure_date|arrival_mode|   visa_type|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+------------+--------------+------------+------------+
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    null| null|      T|      O|   null|      M| 1961.0|09302016|     M

In [34]:
print(f"Number of rows: {full_immigration_df.count()}")

Number of rows: 2953481


In [35]:
# Using the DataFrame API to group the immigration data by gender and count the number of records
gender_counts = spark.sql("""
    SELECT gender, COUNT(*) AS count
    FROM immigration_table
    GROUP BY gender
""")
# Displaying the gender counts
gender_counts.show()

+------+-------+
|gender|  count|
+------+-------+
|     F|1302743|
|  null| 414269|
|     M|1377224|
|     U|    467|
|     X|   1610|
+------+-------+



In [36]:
def clean_full_immigration_df_gender(full_immigration_df):
    # Filtering the immigration data by gender using the DataFrame API
    full_immigration_df = full_immigration_df \
        .filter(full_immigration_df.gender.isin(['M', 'F']))

    # Creating a temporary view for the filtered data
    full_immigration_df.createOrReplaceTempView('immigration_table')

    # return the cleaned data:
    return full_immigration_df
    
full_immigration_df = clean_full_immigration_df_gender(full_immigration_df)

# Displaying the first row of the filtered data
full_immigration_df.show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+------------+--------------+------------+------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|arrival_date|departure_date|arrival_mode|   visa_type|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+------------+--------------+------------+------------+
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    null| null|      T|      O|   null|      M| 1961.0|09302016|     M

In [37]:
print(f"Number of rows: {full_immigration_df.count()}")

Number of rows: 2544951


In [38]:
def clean_full_immigration_df(full_immigration_df):    
    # Selecting and renaming columns from the immigration data using the DataFrame API
    cols = ['cicid', 'year', 'month', 'immigrant_birth_country', 'immigrant_residence_country', 'adm_port',
            'arrival_date', 'departure_date', 'arrival_mode', 'arrival_state', 'age', 'visa_type', 'count',
            'birth_year', 'gender', 'airline', 'admission_num', 'flight_num']
    full_immigration_df = full_immigration_df.select(
        col('cicid'), col('i94yr').alias('year'), col('i94mon').alias('month'),
        col('i94cit').alias('immigrant_birth_country'), col('i94res').alias('immigrant_residence_country'),
        col('i94port').alias('adm_port'), col('arrival_date'), col('departure_date'), col('arrival_mode'),
        col('i94addr').alias('arrival_state'), col('i94bir').alias('age'), col('visa_type'), col('count'),
        col('biryear').alias('birth_year'), col('gender'), col('airline'), col('admnum').alias('admission_num'),
        col('fltno').alias('flight_num')
    ).toDF(*cols)
    # return the cleaned data:
    return full_immigration_df
    
full_immigration_df = clean_full_immigration_df(full_immigration_df)
# Displaying the selected columns from the immigration data
full_immigration_df.show(10)

+-----+------+-----+-----------------------+---------------------------+--------+------------+--------------+------------+-------------+----+------------+-----+----------+------+-------+-------------+----------+
|cicid|  year|month|immigrant_birth_country|immigrant_residence_country|adm_port|arrival_date|departure_date|arrival_mode|arrival_state| age|   visa_type|count|birth_year|gender|airline|admission_num|flight_num|
+-----+------+-----+-----------------------+---------------------------+--------+------------+--------------+------------+-------------+----+------------+-----+----------+------+-------+-------------+----------+
| 15.0|2016.0|  4.0|                  101.0|                      101.0|     WAS|  2016-04-01|    2016-08-25|Not_reported|           MI|55.0|Not_reported|  1.0|    1961.0|     M|     OS|  666643185.0|        93|
| 27.0|2016.0|  4.0|                  101.0|                      101.0|     BOS|  2016-04-01|    2016-04-05|Not_reported|           MA|58.0|Not_reporte

In [39]:
print(f"Number of rows: {full_immigration_df.count()}")

Number of rows: 2544951


In [40]:
df_temp.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [41]:
df_temp.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB


In [42]:
df_temp.isnull().sum()

dt                                    0
AverageTemperature               364130
AverageTemperatureUncertainty    364130
City                                  0
Country                               0
Latitude                              0
Longitude                             0
dtype: int64

In [43]:
df_temp.duplicated().sum()

0

In [44]:
def clean_df_temp(df_temp):
    # Remove rows with missing temperature values
    df_temp = df_temp.dropna()

    # filter the  for the United States to make it useful with our immigration data
    df_temp = df_temp[df_temp['Country']=='United States']

    # Convert dt to datetime
    df_temp.dt = pd.to_datetime(df_temp.dt)
    # return the cleaned data:
    return df_temp

df_temp = clean_df_temp(df_temp)
df_temp.head(10)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W
47560,1820-06-01,25.682,2.008,Abilene,United States,32.95N,100.53W
47561,1820-07-01,26.268,1.802,Abilene,United States,32.95N,100.53W
47562,1820-08-01,25.048,1.895,Abilene,United States,32.95N,100.53W
47563,1820-09-01,22.435,2.216,Abilene,United States,32.95N,100.53W
47564,1820-10-01,15.83,2.169,Abilene,United States,32.95N,100.53W


In [45]:
df_temp.to_csv('./Cleaned Data/df_temp.csv', index=False)

### Cleaning the Third dataset:

In [46]:
def clean_df_demographics(df_demographics):
    # Remove rows with missing values
    df_demographics = df_demographics.dropna().reset_index(drop=True)

    # Convert numeric columns to appropriate data types
    numeric_columns = ["Median Age", "Male Population", "Female Population", "Total Population", 
                       "Number of Veterans", "Foreign-born", "Average Household Size", "Count"]
    df_demographics[numeric_columns] = df_demographics[numeric_columns].apply(pd.to_numeric)

    # Convert categorical columns to appropriate data types
    categorical_columns = ["City", "State", "State Code", "Race"]
    df_demographics[categorical_columns] = df_demographics[categorical_columns].astype("category")
    # return the cleaned data:
    return df_demographics

df_demographics = clean_df_demographics(df_demographics)

# Display the cleaned DataFrame
df_demographics.head(10)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229.0,62432.0,118661,6634.0,7517.0,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712.0,41971.0,80683,4815.0,8355.0,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629.0,56860.0,108489,3800.0,37038.0,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762.0,43270.0,85032,5783.0,3269.0,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751.0,58077.0,109828,5204.0,16315.0,2.65,NC,Asian,11060


In [47]:
df_demographics.to_csv('./Cleaned Data/df_demographics.csv', index=False)

In [48]:
df_airport_codes = pd.read_csv("./airport-codes_csv.csv")

In [49]:
df_airport_codes.shape

(55075, 12)

In [50]:
df_airport_codes.isnull().sum()

ident               0
type                0
name                0
elevation_ft     7006
continent       27719
iso_country       247
iso_region          0
municipality     5676
gps_code        14045
iata_code       45886
local_code      26389
coordinates         0
dtype: int64

In [51]:
def clean_df_airport_codes(df_airport_codes):
    # Remove rows with missing values in the 'iata_code' column
    df_airport_codes = df_airport_codes.dropna(subset=['iata_code']).reset_index(drop=True)

    # Convert numeric columns to appropriate data types
    numeric_columns = ["elevation_ft"]
    df_airport_codes[numeric_columns] = df_airport_codes[numeric_columns].apply(pd.to_numeric)

    # Convert categorical columns to appropriate data types
    categorical_columns = ["ident", "type", "name", "continent", "iso_country", "iso_region", "municipality", "gps_code", "iata_code", "local_code"]
    df_airport_codes[categorical_columns] = df_airport_codes[categorical_columns].astype("category")

    # Split the 'coordinates' column into separate 'latitude' and 'longitude' columns
    df_airport_codes[["longitude", "latitude"]] = df_airport_codes["coordinates"].str.split(",", expand=True).apply(pd.to_numeric)

    # filter the  for the United States to make it useful with our immigration data
    df_airport_codes = df_airport_codes[df_airport_codes['iso_country'] == 'US']

    # Drop the original 'coordinates' column
    df_airport_codes = df_airport_codes.drop(columns=["coordinates"])
    # return the cleaned data:
    return df_airport_codes
    
df_airport_codes = clean_df_airport_codes(df_airport_codes)

# Display the cleaned DataFrame
print(df_airport_codes.shape)
df_airport_codes.head(10)

(2019, 13)


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,longitude,latitude
1,07FA,small_airport,Ocean Reef Club Airport,8.0,,US,US-FL,Key Largo,07FA,OCA,07FA,-80.274803,25.325399
2,0AK,small_airport,Pilot Station Airport,305.0,,US,US-AK,Pilot Station,,PQS,0AK,-162.899994,61.934601
3,0CO2,small_airport,Crested Butte Airpark,8980.0,,US,US-CO,Crested Butte,0CO2,CSE,0CO2,-106.928341,38.851918
4,0TE7,small_airport,LBJ Ranch Airport,1515.0,,US,US-TX,Johnson City,0TE7,JCY,0TE7,-98.622498,30.251801
5,13MA,small_airport,Metropolitan Airport,418.0,,US,US-MA,Palmer,13MA,PMX,13MA,-72.311401,42.223301
6,13Z,seaplane_base,Loring Seaplane Base,0.0,,US,US-AK,Loring,13Z,WLR,13Z,-131.636993,55.601299
7,16A,small_airport,Nunapitchuk Airport,12.0,,US,US-AK,Nunapitchuk,PPIT,NUP,16A,-162.440454,60.905591
8,16K,seaplane_base,Port Alice Seaplane Base,0.0,,US,US-AK,Port Alice,16K,PTC,16K,-133.597,55.803
9,19AK,small_airport,Icy Bay Airport,50.0,,US,US-AK,Icy Bay,19AK,ICY,19AK,-141.662003,59.969002
10,19P,seaplane_base,Port Protection Seaplane Base,0.0,,US,US-AK,Port Protection,19P,PPV,19P,-133.610001,56.3288


In [52]:
df_airport_codes.shape

(2019, 13)

In [53]:
df_airport_codes.isnull().sum()

ident              0
type               0
name               0
elevation_ft      34
continent       2019
iso_country        0
iso_region         0
municipality       6
gps_code          81
iata_code          0
local_code        50
longitude          0
latitude           0
dtype: int64

In [54]:
df_airport_codes = df_airport_codes.drop(columns=["continent"])
df_airport_codes.dropna(inplace=True)

In [55]:
df_airport_codes.head(10)

Unnamed: 0,ident,type,name,elevation_ft,iso_country,iso_region,municipality,gps_code,iata_code,local_code,longitude,latitude
1,07FA,small_airport,Ocean Reef Club Airport,8.0,US,US-FL,Key Largo,07FA,OCA,07FA,-80.274803,25.325399
3,0CO2,small_airport,Crested Butte Airpark,8980.0,US,US-CO,Crested Butte,0CO2,CSE,0CO2,-106.928341,38.851918
4,0TE7,small_airport,LBJ Ranch Airport,1515.0,US,US-TX,Johnson City,0TE7,JCY,0TE7,-98.622498,30.251801
5,13MA,small_airport,Metropolitan Airport,418.0,US,US-MA,Palmer,13MA,PMX,13MA,-72.311401,42.223301
6,13Z,seaplane_base,Loring Seaplane Base,0.0,US,US-AK,Loring,13Z,WLR,13Z,-131.636993,55.601299
7,16A,small_airport,Nunapitchuk Airport,12.0,US,US-AK,Nunapitchuk,PPIT,NUP,16A,-162.440454,60.905591
8,16K,seaplane_base,Port Alice Seaplane Base,0.0,US,US-AK,Port Alice,16K,PTC,16K,-133.597,55.803
9,19AK,small_airport,Icy Bay Airport,50.0,US,US-AK,Icy Bay,19AK,ICY,19AK,-141.662003,59.969002
10,19P,seaplane_base,Port Protection Seaplane Base,0.0,US,US-AK,Port Protection,19P,PPV,19P,-133.610001,56.3288
11,1KC,small_airport,Kalakaket Creek AS Airport,1598.0,US,US-AK,Kalakaket Creek,1KC,KKK,1KC,-156.820393,64.416626


In [56]:
df_airport_codes.to_csv('./Cleaned Data/df_airport_codes.csv', index=False)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

Entities:
- Immigration fact table with columns:
    - cicid (double)
    - year (double)
    - month (double)
    - immigrant_birth_country (double)
    - immigrant_residence_country (double)
    - adm_port (string)
    - arrival_date (double)
    - departure_date (double)
    - arrival_mode (double)
    - arrival_state (string)
    - age (double)
    - visa_type (double)
    - count (double)
    - birth_year (double)
    - gender (string)
    - airline (string)
    - admission_num (double)
    - flight_num (string)
    - visatype (string)
- Temperature dimension table with columns:
    - dt (date)
    - AverageTemperature (double)
    - City (string)
    - Country (string)
- Demographic dimension table with columns:
    - City (string)
    - State (string)
    - median_age (double)
    - male_population (integer)
    - female_population (integer)
    - total_population (integer)
    - num_veterans (integer)
    - num_foreign_born (integer)
    - avg_household_size (double)
    - state_code (string)
    - Race (string)
    - Count (integer)
- Airport dimension table with columns:
    - ident (string)
    - type (string)
    - name (string)
    - elevation_ft (double)
    - iso_country (string)
    - iso_region (string)
    - municipality (string)
    - gps_code (string)
    - iata_code (string)
    - local_code (string)
    - longitude (double)
    - latitude (double)

Relationships:
- The Immigration fact table is related to the Temperature dimension table and the Demographic dimension table through the City and State columns respectively.
- The Immigration fact table is related to the Airport dimension table through the adm_port column representing the airport code.

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

***1. Run the create_tables.py script to generate the necessary tables in the database.***

***2. Combine the data from the city and airports tables using a join operation.***

***3. Populate the tables with the relevant data.***

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [57]:
# Create Spark df of all the tables
file_paths = [
    './Cleaned Data/full_immigration.csv',
    './Cleaned Data/df_temp.csv',
    './Cleaned Data/df_demographics.csv',
    './Cleaned Data/df_airport_codes.csv',

]

dataframes = []

for file_path in file_paths:
    df = spark.read.format('csv').option('header', 'true').load(file_path)
    dataframes.append(df)

spark_i94, spark_temp, spark_airport, spark_demographics = dataframes

In [58]:
table_names = ["i94", "temperature", "airport", "demographics"]
for dataframe, name in zip(dataframes, table_names):
    dataframe.createOrReplaceTempView(name)
    dataframe.printSchema()

root
 |-- cicid: string (nullable = true)
 |-- i94yr: string (nullable = true)
 |-- i94mon: string (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- count: string (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: string (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: string (nullable = 

In [59]:
# Create a SparkSession
spark = SparkSession.builder.appName("immigration_data_pipeline").getOrCreate()

# Load the immigration data
immigration_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .load('./Cleaned Data/full_immigration.csv')

# Select relevant columns and rename them from immigration_df:
immigration_fact = immigration_df.select(
    col("cicid"),
    col("i94yr").alias("year"),
    col("i94mon").alias("month"),
    col("i94cit").alias("immigrant_birth_country"),
    col("i94res").alias("immigrant_residence_country"),
    col("i94port").alias("adm_port"),
    col("arrdate").alias("arrival_date"),
    col("depdate").alias("departure_date"),
    col("i94mode").alias("arrival_mode"),
    col("i94addr").alias("arrival_state"),
    col("i94bir").alias("age"),
    col("i94visa").alias("visa_type"),
    col("count"),
    col("biryear").alias("birth_year"),
    col("gender"),
    col("airline"),
    col("admnum").alias("admission_num"),
    col("fltno").alias("flight_num"),
    col("visatype")
)

# Load the temperature data
temperature_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .load('./Cleaned Data/df_temp.csv')

# Select relevant columns and filter for data in the United States from temperature_df:
temperature_dim = temperature_df.select(
    col("dt"),
    col("AverageTemperature"),
    col("City"),
    col("Country")
).filter(col("Country") == "United States")

temperature_dim = temperature_dim.dropDuplicates(["dt"])

# Load the demographic data
demographic_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .load('./Cleaned Data/df_demographics.csv')

# Select relevant columns and rename them form demographic_df:
demographic_dim = demographic_df.select(
    col("City"),
    col("State"),
    col("Median Age").alias("median_age"),
    col("Male Population").alias("male_population"),
    col("Female Population").alias("female_population"),
    col("Total Population").alias("total_population"),
    col("Number of Veterans").alias("num_veterans"),
    col("Foreign-born").alias("num_foreign_born"),
    col("Average Household Size").alias("avg_household_size"),
    col("State Code").alias("state_code"),
    col("Race"),
    col("Count")
)

demographic_dim = demographic_dim.dropDuplicates(["City", "State"])
demographic_dim = demographic_dim.withColumnRenamed("City", "arrival_city")
# Load the airport data
airport_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .load('./Cleaned Data/df_airport_codes.csv')

# Select relevant columns and rename them from airport_df:
airport_schema = StructType([
    StructField("ident", StringType(), True),
    StructField("type", StringType(), True),
    StructField("name", StringType(), True),
    StructField("elevation_ft", DoubleType(), True),
    StructField("iso_country", StringType(), True),
    StructField("iso_region", StringType(), True),
    StructField("municipality", StringType(), True),
    StructField("gps_code", StringType(), True),
    StructField("iata_code", StringType(), True),
    StructField("local_code", StringType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("latitude", DoubleType(), True)
])

airport_dim = airport_df.select(
    col("ident"),
    col("type"),
    col("name"),
    col("iso_country"),
    col("iso_region"),
    col("municipality"),
    col("gps_code"),
    col("iata_code"),
    col("local_code"),
    col("longitude"),
    col("latitude"),
    col("elevation_ft").cast(DoubleType()).alias("elevation_ft")
).filter(col("elevation_ft").isNotNull() & col("elevation_ft").cast("string").rlike("^\\d+\\.?\\d*$"))


In [60]:
immigration_fact.show(10)

+-----+------+-----+-----------------------+---------------------------+--------+------------+--------------+------------+-------------+----+---------+-----+----------+------+-------+--------------+----------+--------+
|cicid|  year|month|immigrant_birth_country|immigrant_residence_country|adm_port|arrival_date|departure_date|arrival_mode|arrival_state| age|visa_type|count|birth_year|gender|airline| admission_num|flight_num|visatype|
+-----+------+-----+-----------------------+---------------------------+--------+------------+--------------+------------+-------------+----+---------+-----+----------+------+-------+--------------+----------+--------+
|  6.0|2016.0|  4.0|                  692.0|                      692.0|     XXX|     20573.0|          null|        null|         null|37.0|      2.0|  1.0|    1979.0|  null|   null| 1.897628485E9|      null|      B2|
|  7.0|2016.0|  4.0|                  254.0|                      276.0|     ATL|     20551.0|          null|         1.0|  

In [61]:
temperature_dim.show(10)

+-------------------+------------------+-------+-------------+
|                 dt|AverageTemperature|   City|      Country|
+-------------------+------------------+-------+-------------+
|1750-03-01 00:00:00|              3.01|  Akron|United States|
|1753-03-01 00:00:00|4.2189999999999985|  Akron|United States|
|1798-04-01 00:00:00|             9.274|  Akron|United States|
|1828-12-01 00:00:00|             7.648|Abilene|United States|
|1854-05-01 00:00:00|            20.789|Abilene|United States|
|1870-01-01 00:00:00| 5.976000000000001|Abilene|United States|
|1879-10-01 00:00:00|            18.047|Abilene|United States|
|1893-11-01 00:00:00|              9.03|Abilene|United States|
|1896-04-01 00:00:00|            18.524|Abilene|United States|
|1902-09-01 00:00:00|21.930999999999997|Abilene|United States|
+-------------------+------------------+-------+-------------+
only showing top 10 rows



In [62]:
demographic_dim.show()

+--------------------+--------------+----------+---------------+-----------------+----------------+------------+----------------+------------------+----------+--------------------+------+
|        arrival_city|         State|median_age|male_population|female_population|total_population|num_veterans|num_foreign_born|avg_household_size|state_code|                Race| Count|
+--------------------+--------------+----------+---------------+-----------------+----------------+------------+----------------+------------------+----------+--------------------+------+
|          Cincinnati|          Ohio|      32.7|       143654.0|         154883.0|          298537|     13699.0|         16896.0|              2.08|        OH|               White|162245|
|         Kansas City|        Kansas|      33.4|        74606.0|          76655.0|          151261|      8139.0|         25507.0|              2.71|        KS|Black or African-...| 40177|
|           Lynchburg|      Virginia|      28.7|        3861

In [63]:
airport_dim.show(10)

+-----+-------------+--------------------+-----------+----------+---------------+--------+---------+----------+-------------------+------------------+------------+
|ident|         type|                name|iso_country|iso_region|   municipality|gps_code|iata_code|local_code|          longitude|          latitude|elevation_ft|
+-----+-------------+--------------------+-----------+----------+---------------+--------+---------+----------+-------------------+------------------+------------+
| 07FA|small_airport|Ocean Reef Club A...|         US|     US-FL|      Key Largo|    07FA|      OCA|      07FA|   -80.274803161621|25.325399398804002|         8.0|
| 0CO2|small_airport|Crested Butte Air...|         US|     US-CO|  Crested Butte|    0CO2|      CSE|      0CO2|-106.92834099999999|         38.851918|      8980.0|
| 0TE7|small_airport|   LBJ Ranch Airport|         US|     US-TX|   Johnson City|    0TE7|      JCY|      0TE7|     -98.6224975586|30.251800537100006|      1515.0|
| 13MA|small_air

In [68]:
"""
creating a query to group count immigrants by their cities and include the "num_foreign_born" field in this city. 
The results could be ordered by the immigrant counts to see if cities with the most immigrants are indeed cities 
with the largest foreign-born counts.
"""
# Join immigration_fact and demographic_dim on arrival_city
joined_df = immigration_fact.join(demographic_dim, immigration_fact.arrival_state == demographic_dim.state_code, "inner")

# Group by arrival_city and sum the immigration_fact.count and demographic_dim.num_foreign_born columns
result_df = joined_df.groupBy("arrival_city") \
    .agg(sum(immigration_fact["count"]).alias("num_immigrants"), sum("num_foreign_born").alias("num_foreign_born")) \
    .orderBy("num_immigrants", ascending=False)

# Show the top 10 cities by immigrant count and their corresponding num_foreign_born count
result_df.show(10)

+--------------+--------------+----------------+
|  arrival_city|num_immigrants|num_foreign_born|
+--------------+--------------+----------------+
|  Jacksonville|      645076.0|  5.333592615E10|
| Pompano Beach|      621701.0| 2.0350138833E10|
|   Tallahassee|      621701.0|  1.039484072E10|
|   Spring Hill|      621701.0|   4.907085993E9|
|Pembroke Pines|      621701.0|  3.867601921E10|
|         Largo|      621701.0|   6.521021789E9|
|         Davie|      621701.0| 1.6494970932E10|
|     Poinciana|      621701.0|   8.920787649E9|
|     Hollywood|      621701.0| 3.4291783758E10|
| Coral Springs|      621701.0| 2.3967816952E10|
+--------------+--------------+----------------+
only showing top 10 rows



In [69]:
result_df.show(100)

+----------------+--------------+----------------+
|    arrival_city|num_immigrants|num_foreign_born|
+----------------+--------------+----------------+
|    Jacksonville|      645076.0|  5.333592615E10|
|      Clearwater|      621701.0| 1.0995403886E10|
|         Deltona|      621701.0|   4.412833698E9|
|        Lakeland|      621701.0|   7.206757992E9|
|         Hialeah|      621701.0|1.05781181748E11|
|      Boca Raton|      621701.0| 1.3128460017E10|
|      Plantation|      621701.0| 1.7346701302E10|
|      Palm Coast|      621701.0|   5.647531884E9|
|Saint Petersburg|      621701.0| 1.7760132467E10|
| West Palm Beach|      621701.0| 1.9070678175E10|
|      Lauderhill|      621701.0| 1.5835346171E10|
|           Davie|      621701.0| 1.6494970932E10|
|           Largo|      621701.0|   6.521021789E9|
|   Coral Springs|      621701.0| 2.3967816952E10|
|         Kendall|      621701.0|  1.938463718E10|
|     Tallahassee|      621701.0|  1.039484072E10|
|           Tampa|      621701.

In [71]:
def write_to_parquet(df, partition_cols, path):
    df.write.mode('overwrite').partitionBy(*partition_cols).parquet(path)
    
# Define the path where the Parquet files will be saved
output_path = "./Transformed Data/"

# Write the immigration_fact data frame to Parquet
write_to_parquet(immigration_fact, ["year", "month"], output_path + "immigration_fact")

# Write the temperature_dim data frame to Parquet
write_to_parquet(temperature_dim, ["City"], output_path + "temperature_dim")

# Write the demographic_dim data frame to Parquet
write_to_parquet(demographic_dim, ["arrival_city", "State"], output_path + "demographic_dim")

# Write the airport_dim data frame to Parquet
write_to_parquet(airport_dim, ["iso_country"], output_path + "airport_dim")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

### immigration_fact Data Quality Check:

In [72]:
def immigration_fact_data_quality_check(immigration_fact):   
    # Check for missing values in the cicid column
    if immigration_fact.filter(col("cicid").isNull()).count() > 0:
        raise ValueError("immigration_fact data frame has missing values in the cicid column")

    # Check for duplicates in the cicid column
    if immigration_fact.count() != immigration_fact.select("cicid").distinct().count():
        raise ValueError("immigration_fact data frame has duplicates in the cicid column")

    # Check that the year and month columns contain valid values
    if immigration_fact.filter(col("year") < 2016).count() > 0:
        raise ValueError("immigration_fact data frame has invalid values in the year column")
    if immigration_fact.filter(col("year") > 2020).count() > 0:
        raise ValueError("immigration_fact data frame has invalid values in the year column")
    if immigration_fact.filter(col("month") < 1).count() > 0:
        raise ValueError("immigration_fact data frame has invalid values in the month column")
    if immigration_fact.filter(col("month") > 12).count() > 0:
        raise ValueError("immigration_fact data frame has invalid values in the month column")
    
immigration_fact_quality_check = immigration_fact_data_quality_check(immigration_fact) 
immigration_fact_quality_check

### temperature_dim Data Quality Check:

In [73]:
def temperature_dim_data_quality_check(temperature_dim):    
    # Check for missing values in the dt column
    if temperature_dim.filter(col("dt").isNull()).count() > 0:
        raise ValueError("temperature_dim data frame has missing values in the dt column")

    # Check for duplicates in the dt column
    if temperature_dim.count() != temperature_dim.select("dt").distinct().count():
        raise ValueError("temperature_dim data frame has duplicates in the dt column")

    # Check that the AverageTemperature column contains valid values
    if temperature_dim.filter(col("AverageTemperature") < -100).count() > 0:
        raise ValueError("temperature_dim data frame has invalid values in the AverageTemperature column")
    if temperature_dim.filter(col("AverageTemperature") > 100).count() > 0:
        raise ValueError("temperature_dim data frame has invalid values in the AverageTemperature column")
           
temperature_dim_quality_check = temperature_dim_data_quality_check(temperature_dim) 
temperature_dim_quality_check

### demographic_dim Data Quality Check:

In [74]:
def demographic_dim_data_quality_check(demographic_dim):
    # Check for missing values in the City and State columns
    if demographic_dim.filter(col("arrival_city").isNull() | col("State").isNull()).count() > 0:
        raise ValueError("demographic_dim data frame has missing values in the City or State column")

    # Check for duplicates in the combination of City and State columns
    if demographic_dim.count() != demographic_dim.select("arrival_city", "State").distinct().count():
        raise ValueError("demographic_dim data frame has duplicates in the combination of City and State columns")

    # Check that the median_age, male_population, female_population, total_population,
    # num_veterans, num_foreign_born, and avg_household_size columns contain valid values
    if demographic_dim.filter(col("median_age") < 0).count() > 0:
        raise ValueError("demographic_dim data frame has invalid values in the median_age column")
    if demographic_dim.filter(col("male_population") < 0).count() > 0:
        raise ValueError("demographic_dim data frame has invalid values in the male_population column")
    if demographic_dim.filter(col("female_population") < 0).count() > 0:
        raise ValueError("demographic_dim data frame has invalid values in the female_population column")
    if demographic_dim.filter(col("total_population") < 0).count() > 0:
        raise ValueError("demographic_dim data frame has invalid values in the total_population column")
    if demographic_dim.filter(col("num_veterans") < 0).count() > 0:
        raise ValueError("demographic_dim data frame has invalid values in the num_veterans column")
    if demographic_dim.filter(col("num_foreign_born") < 0).count() > 0:
        raise ValueError("demographic_dim data frame has invalid values in the num_foreign_born column")
    if demographic_dim.filter(col("avg_household_size") < 0).count() > 0:
        raise ValueError("demographic_dim data frame has invalid values in the avg_household_size column")       
        
demographic_dim_quality_check = demographic_dim_data_quality_check(demographic_dim) 
demographic_dim_quality_check

### airport_dim  Data Quality Check:

In [75]:
def airport_dim_data_quality_check(airport_dim):
    # Check for missing values in the ident column
    if airport_dim.filter(col("ident").isNull()).count() > 0:
        raise ValueError("airport_dim data frame has missing values in the ident column")

    # Check for duplicates in the ident column
    if airport_dim.count() != airport_dim.select("ident").distinct().count():
        raise ValueError("airport_dim data frame has duplicates in the ident column")

    # Check that the elevation_ft, longitude, and latitude columns contain valid values
    if airport_dim.filter(col("elevation_ft").isNull()).count() > 0:
        raise ValueError("airport_dim data frame has missing values in the elevation_ft column")
    if airport_dim.filter(col("longitude").isNull()).count() > 0:
        raise ValueError("airport_dim data frame has missing values in the longitude column")
    if airport_dim.filter(col("latitude").isNull()).count() > 0:
        raise ValueError("airport_dim data frame has missing values in the latitude column")
    if airport_dim.filter(col("elevation_ft") < 0).count() > 0:
        raise ValueError("airport_dim data frame has invalid values in the elevation_ft column")

airport_dim_quality_check = airport_dim_data_quality_check(airport_dim) 
airport_dim_quality_check

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

In [76]:
tables_dict = {
    "immigration": {
        "immigration_fact": {
            "cicid": "ID number for each record",
            "year": "4 digit year of arrival",
            "month": "Numeric month of arrival",
            "immigrant_birth_country": "3 digit code for immigrant's country of birth",
            "immigrant_residence_country": "3 digit code for immigrant's country of residence",
            "adm_port": "Port of admission into the US",
            "arrival_date": "Arrival date in the US in SAS format",
            "departure_date": "Departure date from the US in SAS format",
            "arrival_mode": "Mode of arrival into the US",
            "arrival_state": "US state of arrival",
            "age": "Age of immigrant",
            "visa_type": "Visa category for immigrant as double type",
            "count": "Number of immigrants with same characteristics",
            "birth_year": "Year of birth for immigrant",
            "gender": "Gender of immigrant",
            "airline": "Airline used to arrive in the US",
            "admission_num": "Admission number for immigrant",
            "flight_num": "Flight number of airline used to arrive in the US",
            "visatype": "Visa category for immigrant as string type"
        }
    },
    "temperature": {
        "temperature_dim": {
            "dt": "Date in format (YYYY-MM-DD) of temperature observation",
            "AverageTemperature": "Average temperature for the city on the given date",
            "City": "Name of the city where the observation was taken",
            "Country": "Country where the city is located"
        }
    },
    "demographics": {
        "demographic_dim": {
            "arrival_city": "Name of the city",
            "State": "US state where the city is located",
            "median_age": "Median age of the population in the city",
            "male_population": "Number of males in the population of the city",
            "female_population": "Number of females in the population of the city",
            "total_population": "Total number of people in the city",
            "num_veterans": "Number of veterans in the population of the city",
            "num_foreign_born": "Number of foreign-born individuals in the population of the city",
            "avg_household_size": "Average household size in the city",
            "state_code": "2 letter code for the US state where the city is located",
            "Race": "Ethnicity of the population",
            "Count": "Number of individuals in the population with the specified ethnicity"
        }
    },
    "airports": {
        "airport_dim": {
            "ident": "Unique identifier for the airport",
            "type": "Type of airport (small, medium, large)",
            "name": "Name of airport",
            "continent": "Continent where airport is located",
            "iso_country": "ISO code for the country where the airport is located",
            "iso_region": "ISO code for the region where the airport is located",
            "municipality": "Municipality where the airport is located",
            "gps_code": "GPS code for the airport",
            "iata_code": "IATA code for the airport",
            "local_code": "Local code for the airport",
             "Latitude and longitude of the airport"
            "longitude":"longitude of the airport",
            "latitude": "Latitude of the airport",
            "elevation_ft": "Airport altitude "
        }
    }
}

In [77]:
# Create an empty list to store the rows of the table
rows = []

# Loop through each table and its nested dictionaries
for table_name, table_dict in tables_dict.items():
    for nested_table_name, nested_table_dict in table_dict.items():
        # Loop through each column and its description in the nested dictionary
        for column_name, column_description in nested_table_dict.items():
            # Add a row to the list for each column and its description
            rows.append([table_name, nested_table_name + '.' + column_name, column_description])

# Create a DataFrame from the list of rows and add column names
database_tables = pd.DataFrame(rows, columns=['Table', 'Column', 'Description'])

# Display the DataFrame
database_tables

Unnamed: 0,Table,Column,Description
0,immigration,immigration_fact.cicid,ID number for each record
1,immigration,immigration_fact.year,4 digit year of arrival
2,immigration,immigration_fact.month,Numeric month of arrival
3,immigration,immigration_fact.immigrant_birth_country,3 digit code for immigrant's country of birth
4,immigration,immigration_fact.immigrant_residence_country,3 digit code for immigrant's country of residence
5,immigration,immigration_fact.adm_port,Port of admission into the US
6,immigration,immigration_fact.arrival_date,Arrival date in the US in SAS format
7,immigration,immigration_fact.departure_date,Departure date from the US in SAS format
8,immigration,immigration_fact.arrival_mode,Mode of arrival into the US
9,immigration,immigration_fact.arrival_state,US state of arrival


In [78]:
# Stop the SparkSession
spark.stop()

#### Step 5: Complete Project Write Up
 * Clearly state the rationale for the choice of tools and technologies for the project: 
* To handle the massive amount of immigration data, which consists of around 3 million rows, we opted to leverage the power of Apache Spark. For other relatively smaller datasets, we utilized Pandas to manage them.



 * Propose how often the data should be updated and why: 
* Given that the primary dataset (I94 immigration dataset) is updated on a monthly basis, we need to ensure that our solution is updated monthly as well.




 * Write a description of how you would approach the problem differently under the following scenarios:
* The data was increased by 100x: While we will continue to use Spark for handling the big data, we plan to scale up the number of nodes in our cluster. Additionally, we will leverage AWS to store our data in S3 and Redshift.
 
 
 
 * The data populates a dashboard that must be updated on a daily basis by 7am every day: 
* Our ETL pipeline will be managed in a DAG using Apache Airflow, which will guarantee that the pipeline runs on time and performs data quality checks.
 
 
 
 * The database needed to be accessed by 100+ people: 
* To ensure that everyone can easily access our analytics database, we will migrate it to Amazon Redshift.
 
 
 
 * What schema are you using in this project? Does it have any advantage compared to other schemas?: 
* The schema used in this project is a star schema, which comprises of a central fact table (Immigration fact table) connected to several dimension tables (Temperature, Demographic, and Airport dimension tables) through foreign keys.
* The advantage of using a star schema is that it simplifies queries and allows for faster performance, as data is denormalized and stored in a way that minimizes the number of joins required to retrieve data. This schema is also optimized for OLAP (Online Analytical Processing) queries, which involve complex queries that analyze large sets of data.


 * Why do you connect the entities the way you did in this project? Perhaps it is optimized for a particular query?:
* The entities are connected in this way to enable efficient and effective querying of the data, as the foreign keys allow for easy join operations between the fact table and dimension tables. This schema is optimized for queries that involve aggregating data across multiple dimensions, such as analyzing immigration patterns by temperature, demographic, or airport information.