# Project Title

## Data Engineering Capstone Project

Project Summary

### The project follows these steps:

* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# import pandas as pd
import re
# import psycopg2  # For postgres
from collections import defaultdict
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf  # For pyspark
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [3]:
#Installs
#sc.install_pypi_package("pandas")
#sc.install_pypi_package("boto3")
#sc.install_pypi_package("psycopg2")
# sc.install_pypi_package("numpy==1.17.3")
#sc.install_pypi_package("smart_open")

In [None]:
sc.list_packages()  # sc is predefined only in AWS EMR notebooks

Package                    Version  
-------------------------- ---------
beautifulsoup4             4.9.3    
boto                       2.49.0   
click                      7.1.2    
jmespath                   0.10.0   
joblib                     1.0.1    
lxml                       4.6.2    
mysqlclient                1.4.2    
nltk                       3.5      
nose                       1.3.4    
numpy                      1.16.5   
pip                        9.0.1    
py-dateutil                2.2      
python37-sagemaker-pyspark 1.4.1    
pytz                       2021.1   
PyYAML                     5.4.1    
regex                      2021.3.17
setuptools                 28.8.0   
six                        1.13.0   
tqdm                       4.59.0   
wheel                      0.29.0   
windmill                   1.6

### Step 1: Scope the Project and Gather Data (Explore)

#### Scope 

The aim of this project is to enrich the US I94 immigration data with demographic and temperature data, to gain a wider understanding of immigration patterns. We create dimension tables and one fact table. The I94 data is matched on destination city (the i94port code) and joined with the temperature data, which produces the fact table. Ultimately, we can query the database to check whether temperature affects immigrants' choice of destination city.

#### Describe and Gather Data 

The I94 data, originally in SAS7BDAT format (a binary database storage format), is taken from [here](https://travel.trade.gov/research/reports/i94/historical/2016.html). The notebook reads a Parquet copy of it from `Capstone/sas_data/`.

World Temperature Data: This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

U.S. City Demographic Data: This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

Airport Code Table: This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).

In [4]:
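import os

# Avoid hard-coding credentials: read them from the environment (placeholders as fallback)
awsAccessKeyId = os.environ.get('AWS_ACCESS_KEY_ID', 'xxxxxx')
awsSecretAccessKey = os.environ.get('AWS_SECRET_ACCESS_KEY', 'xxxxxxx')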

In [5]:
hadoop_conf = spark._jsc.hadoopConfiguration()
# Legacy s3n connector (deprecated, removed in Hadoop 3)
hadoop_conf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey)
hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
# s3a connector: it reads its own credential keys, not the fs.s3n.* ones
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.access.key", awsAccessKeyId)
hadoop_conf.set("fs.s3a.secret.key", awsSecretAccessKey)
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.endpoint", "s3.us-east-1.amazonaws.com")

In [6]:
# The third StructField argument (True) marks the field as nullable
demographics_schema = StructType([
    StructField('City', StringType(), True),
    StructField('State', StringType(), True),
    StructField('Median Age', DoubleType(), True),
    StructField('Male Population', IntegerType(), True),
    StructField('Female Population', IntegerType(), True),
    StructField('Total Population', IntegerType(), True),
    StructField('Number of Veterans', IntegerType(), True),
    StructField('Foreign-born', IntegerType(), True),
    StructField('Average Household Size', DoubleType(), True),
    StructField('State Code', StringType(), True),
    StructField('Race', StringType(), True),
    StructField('Count', IntegerType(), True)
])
#s3://capstonend/us-cities-demographics.csv
df_demographics = spark.read.csv("Capstone/us-cities-demographics.csv",
                                 sep=";",
                                 schema=demographics_schema,
                                 header=True)

In [7]:
# The third StructField argument (True) marks the field as nullable
temperature_schema = StructType([
    StructField('dt', DateType(), True),
    StructField('AverageTemperature', DoubleType(), True),
    StructField('AverageTemperatureUncertainty', DoubleType(), True),
    StructField('City', StringType(), True),
    StructField('Country', StringType(), True),
    StructField('Latitude', StringType(), True),
    StructField('Longitude', StringType(), True)
])
#Temperature
#s3://capstonend/GlobalLandTemperaturesByCity.csv
temperature = 'Capstone/GlobalLandTemperaturesByCity.csv'
df_temperature = spark.read.csv(temperature,
                                schema=temperature_schema,
                                header=True)

In [8]:
# The third StructField argument (True) marks the field as nullable
airport_schema = StructType([
    StructField('ident', StringType(), True),
    StructField('type', StringType(), True),
    StructField('name', StringType(), True),
    StructField('elevation_ft', IntegerType(), True),
    StructField('continent', StringType(), True),
    StructField('iso_country', StringType(), True),
    StructField('iso_region', StringType(), True),
    StructField('municipality', StringType(), True),
    StructField('gps_code', StringType(), True),
    StructField('iata_code', StringType(), True),
    StructField('local_code', StringType(), True),
    StructField('coordinates', StringType(), True)
])
#Airport Codes
#s3://capstonend/airport-codes_csv.csv
df_airport_codes = spark.read.csv("Capstone/airport-codes_csv.csv",
                                  schema=airport_schema,
                                  header=True)

In [9]:
#s3://capstonend/sas_data/
immigration = 'Capstone/sas_data/'
df_immigration = spark.read.parquet(immigration)

In [10]:
df_demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [11]:
df_demographics.show(2)

+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|         City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|              Race|Count|
+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|Hispanic or Latino|25924|
|       Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|             White|58723|
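+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
only showing top 2 rows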

In [12]:
df_temperature.printSchema()

root
 |-- dt: date (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [13]:
df_temperature.show(2)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 2 rows



In [14]:
df_airport_codes.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [15]:
df_airport_codes.show(2)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
only showing top 2 rows



In [16]:
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [17]:
df_immigration.show(2)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

## Cleaning (Transform)

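*Temperature data* - We drop all rows where AverageTemperature is null or NaN, drop duplicate locations (rows with the same City and Country), and add the i94port code of each location. The same step also drops rows with missing values from the demographics and airport code tables.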
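The temperature cleaning rule can be sketched in plain Python (no Spark). The sketch assumes each record is a dict keyed like the CSV header, with `None` standing in for a Spark null. Missing and NaN readings are both dropped, and the first valid reading per (City, Country) is kept. Spark's `dropDuplicates` does not guarantee which row survives, so in the notebook the retained temperature is effectively one arbitrary monthly reading per city. The sample rows are illustrative, loosely modeled on the output shown above.

```python
import math
from typing import Dict, List, Optional


def clean_temperatures(rows: List[Dict]) -> List[Dict]:
    """Drop rows whose AverageTemperature is missing or NaN, then keep one row per (City, Country)."""
    seen = set()
    cleaned = []
    for row in rows:
        temp: Optional[float] = row.get("AverageTemperature")
        # None models a Spark null; math.isnan catches float('nan')
        if temp is None or math.isnan(temp):
            continue
        key = (row["City"], row["Country"])
        if key in seen:
            continue  # duplicate location: keep only the first valid reading
        seen.add(key)
        cleaned.append(row)
    return cleaned


sample = [
    {"dt": "1743-11-01", "AverageTemperature": 6.068, "City": "Århus", "Country": "Denmark"},
    {"dt": "1743-12-01", "AverageTemperature": None, "City": "Århus", "Country": "Denmark"},
    {"dt": "1827-12-01", "AverageTemperature": float("nan"), "City": "Seattle", "Country": "United States"},
    {"dt": "1828-01-01", "AverageTemperature": -1.977, "City": "Seattle", "Country": "United States"},
    {"dt": "1828-02-01", "AverageTemperature": 0.5, "City": "Seattle", "Country": "United States"},
]
print([(r["City"], r["AverageTemperature"]) for r in clean_temperatures(sample)])
# [('Århus', 6.068), ('Seattle', -1.977)]
```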

In [18]:
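def droping_nan_dup():
    '''
    Drops rows with missing values from demographics and airport codes,
    drops null/NaN temperatures, and keeps one row per (City, Country)
    in the temperature data
    '''
    global df_demographics
    global df_temperature
    global df_airport_codes
    df_demographics = df_demographics.dropna()
    # Test for null and NaN explicitly instead of comparing a double column to the string 'NaN'
    df_temperature = df_temperature.filter(
        col('AverageTemperature').isNotNull() & ~isnan(col('AverageTemperature')))
    # dropDuplicates keeps an arbitrary row per location
    df_temperature = df_temperature.dropDuplicates(['City', 'Country'])
    df_airport_codes = df_airport_codes.dropna()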

In [19]:
droping_nan_dup()

*I94 immigration data* - We drop all rows whose destination city code (i94port) is not a valid value (XXX, 99, NaN, etc.), as described in I94_SAS_Labels_Description.SAS.

In [20]:
# Creating dict of i94port codes: {code: [location]}
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94port_valid = {}
with open('Capstone/i94port_valid.txt') as fh:
    for data in fh:
        match = re_obj.search(data)
        if match:  # skip lines that are not 'CODE' = 'LOCATION' pairs
            i94port_valid[match[1]] = [match[2]]
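The same parsing can be sketched in plain Python with a tighter pattern, `'([^']*)'\s*=\s*'([^']*)'`, that cannot run past a closing quote. The line shape `'CODE' = 'CITY, ST   '` is an assumption modeled on I94_SAS_Labels_Description.SAS, and `parse_i94port_lines` is a hypothetical helper. Unlike the notebook cell, this sketch also strips the padding spaces from each location. Lines that do not match, such as blanks or comments, are skipped, and so is any location containing an apostrophe.

```python
import io
import re
from typing import Dict, Iterable, List

# Assumed line shape, modeled on I94_SAS_Labels_Description.SAS:  'CODE' = 'CITY, ST   '
PORT_LINE = re.compile(r"'([^']*)'\s*=\s*'([^']*)'")


def parse_i94port_lines(lines: Iterable[str]) -> Dict[str, List[str]]:
    """Map each port code to a one-element list holding its location (same shape as i94port_valid)."""
    ports: Dict[str, List[str]] = {}
    for line in lines:
        match = PORT_LINE.search(line)
        if match is None:
            continue  # blank or comment line: skip instead of failing on None
        code, location = match.group(1).strip(), match.group(2).strip()
        ports[code] = [location]
    return ports


sample = io.StringIO(
    "'ALC'\t=\t'ALCAN, AK             '\n"
    "\n"
    "'SEA'\t=\t'SEATTLE, WA           '\n"
)
print(parse_i94port_lines(sample))
# {'ALC': ['ALCAN, AK'], 'SEA': ['SEATTLE, WA']}
```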

In [21]:
# Clean I94 immigration data
def clean_i94_data(df):
    '''
    Input: Spark dataframe of I94 immigration data
    Output: Spark dataframe of I94 immigration data with a valid i94port
    '''
    # Keep only entries whose i94port is a key of the valid-port dictionary
    return df.filter(df.i94port.isin(list(i94port_valid.keys())))

In [22]:
df_immigration = clean_i94_data(df_immigration)

In [23]:
df_immigration.show()

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [24]:
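@udf(returnType=StringType())
def get_i94port(city):
    '''
    Input: City name
    Output: First i94port code whose location contains the city name
            (case-insensitive substring match), or None if there is none
    '''
    if not city:
        return None
    for key in i94port_valid:
        if city.lower() in i94port_valid[key][0].lower():
            return key
    return None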
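The lookup returns the *first* dictionary entry whose location contains the city name. Because of that, a short name can be captured by a longer one that happens to come earlier. The plain-Python sketch below shows this with a hypothetical three-entry dictionary; the `YOR` code is made up for illustration. With `NEW YORK, NY` listed first, the city `York` resolves to `NYC`.

```python
from typing import Dict, List, Optional


def get_i94port_plain(city: Optional[str], ports: Dict[str, List[str]]) -> Optional[str]:
    """Return the first port code whose location contains `city` (case-insensitive), else None."""
    if not city:
        return None
    needle = city.lower()
    for code, (location,) in ports.items():  # each value is a one-element list
        if needle in location.lower():
            return code
    return None


# Hypothetical excerpt of the i94port dictionary (insertion order matters)
ports = {
    "NYC": ["NEW YORK, NY"],
    "SEA": ["SEATTLE, WA"],
    "YOR": ["YORK, PA"],
}
print(get_i94port_plain("Seattle", ports))  # SEA
print(get_i94port_plain("York", ports))     # NYC, not YOR: substring of "NEW YORK, NY"
print(get_i94port_plain("Boise", ports))    # None
```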

In [25]:
# Add the i94port code based on the city name; this key is used in the joins
df_temperature = df_temperature.withColumn("i94port",get_i94port(df_temperature.City))

In [26]:
# Remove data points with no i94port code (the UDF returns null for unmatched cities)
df_temperature = df_temperature.filter(df_temperature.i94port.isNotNull())
df_temperature.show()

+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|     City|             Country|Latitude|Longitude|i94port|
+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|1852-07-01|             15.488|                        1.395|    Perth|           Australia|  31.35S|  114.97E|    PER|
|1828-01-01|             -1.977|                        2.551|  Seattle|       United States|  47.42N|  121.97W|    SEA|
|1743-11-01|              2.767|                        1.905| Hamilton|              Canada|  42.59N|   80.73W|    HAM|
|1849-01-01|  7.399999999999999|                        2.699|  Ontario|       United States|  34.56N|  116.76W|    ONT|
|1821-11-01|              2.322|                        2.375|  Spokane|       United States|  47.42N|  117.24W|    SPO|
|1843-01-01| 18.874000000000002|

In [27]:
def filter_temperature():
    '''
    Keeps only temperature records from the United States
    '''
    global df_temperature
    print('Before Removal =', df_temperature.count())
    df_temperature = df_temperature.filter(
        df_temperature.Country == 'United States')
    print('After Removal =', df_temperature.count())

In [28]:
filter_temperature()

Before Removal = 207
After Removal = 112


In [29]:
df_temperature.show()

+----------+------------------+-----------------------------+----------------+-------------+--------+---------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|            City|      Country|Latitude|Longitude|i94port|
+----------+------------------+-----------------------------+----------------+-------------+--------+---------+-------+
|1828-01-01|            -1.977|                        2.551|         Seattle|United States|  47.42N|  121.97W|    SEA|
|1849-01-01| 7.399999999999999|                        2.699|         Ontario|United States|  34.56N|  116.76W|    ONT|
|1821-11-01|             2.322|                        2.375|         Spokane|United States|  47.42N|  117.24W|    SPO|
|1835-01-01|             9.833|                        2.182|         Nogales|United States|  31.35N|  111.20W|    NOG|
|1743-11-01| 8.129999999999999|                        2.245|         Atlanta|United States|  34.56N|   83.68W|    ATL|
|1743-11-01|             3.264|         

## Define the Data Model (Load)

Fact Table - contains the I94 immigration data joined with the city temperature data on i94port. Columns (source column in parentheses):

* year (i94yr): year
* month (i94mon): numeric month
* city (i94cit): 3-digit code of the immigrant's country of origin
* i94port: 3-character code of the destination city
* arrival_date (arrdate): arrival date, as a SAS numeric date
* departure_date (depdate): departure date, as a SAS numeric date
* reason (i94visa): reason for immigration
* temperature (AverageTemperature): average temperature of the destination city
* latitude, longitude (Latitude, Longitude): location of the destination city

Dimension Tables - df_immigration, df_temperature

df_immigration:

* i94yr: year
* i94mon: numeric month
* i94cit: 3-digit code of the immigrant's country of origin
* i94port: 3-character code of the destination city
* arrdate: arrival date
* i94mode: 1-digit travel mode code
* depdate: departure date
* i94visa: reason for immigration

df_temperature:

* i94port: code of the destination city (mapped from the cleaned immigration data)
* AverageTemperature: average temperature
* City: city name
* Country: country name
* Latitude: latitude
* Longitude: longitude
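arrdate and depdate are SAS numeric dates, which count days since 1960-01-01. This is consistent with the sample row shown earlier, where arrdate 20574.0 goes with dtadfile 20160430. The following is a minimal plain-Python sketch of the conversion. `sas_to_date` is a hypothetical helper; in the notebook it could be wrapped in a `udf` returning `DateType()`, though that variant is not shown here.

```python
import math
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # SAS numeric dates count days from this day


def sas_to_date(sas_days: Optional[float]) -> Optional[date]:
    """Convert a SAS numeric date (e.g. arrdate = 20574.0) to a datetime.date; None/NaN -> None."""
    if sas_days is None or math.isnan(sas_days):
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20574.0))  # 2016-04-30
print(sas_to_date(20582.0))  # 2016-05-08
```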

In [30]:
def load_immigration():
    '''
    Extracts and saves the immigration table
    '''
    global df_immigration
    # Immigration, extract and save
    immigration_table = df_immigration.select([
        "i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate",
        "i94visa"
    ])
    immigration_table.write.mode("append").partitionBy("i94port").parquet(
        "immigration.parquet")

In [31]:
load_immigration()

In [32]:
def load_temperature():
    '''
    Extracts and saves the temperature table
    '''
    global df_temperature
    # Temperature
    temp_table = df_temperature.select([
        "AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"
    ])
    temp_table.write.mode("append").partitionBy("i94port").parquet(
        "temperature.parquet")

In [33]:
load_temperature()

In [34]:
def load_demographics():
    '''
    Extracts and saves the demographics table
    '''
    global df_demographics
    # Replace spaces in column names: Spark's Parquet writer rejects them
    renames = {
        'State Code': 'State_Code',
        'Male Population': 'Male_Population',
        'Female Population': 'Female_Population',
        'Total Population': 'Total_Population',
        'Number of Veterans': 'Number_of_Veterans',
        'Average Household Size': 'Average_Household_Size',
        'Median Age': 'Median_Age',
    }
    for old_name, new_name in renames.items():
        df_demographics = df_demographics.withColumnRenamed(old_name, new_name)
    df_demographics.write.mode("append").parquet("demographics.parquet")
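The renames are needed because Spark's Parquet writer rejects column names that contain spaces. A hyphenated name such as `Foreign-born` is accepted by Parquet, but it has to be back-quoted in Spark SQL. Instead of listing the mapping by hand, a hypothetical helper `parquet_safe_names` could derive it by replacing runs of whitespace or hyphens with underscores. The Spark loop at the end is shown as a comment only and was not run.

```python
import re
from typing import Dict, Iterable


def parquet_safe_names(columns: Iterable[str]) -> Dict[str, str]:
    """Map each column name to a copy with runs of whitespace or hyphens replaced by '_'."""
    return {name: re.sub(r"[\s-]+", "_", name.strip()) for name in columns}


print(parquet_safe_names(["City", "Median Age", "Foreign-born", "State Code"]))
# {'City': 'City', 'Median Age': 'Median_Age', 'Foreign-born': 'Foreign_born', 'State Code': 'State_Code'}

# In the notebook, the mapping could drive the renames (Spark code, not executed here):
# for old, new in parquet_safe_names(df_demographics.columns).items():
#     df_demographics = df_demographics.withColumnRenamed(old, new)
```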

In [35]:
load_demographics()

In [36]:
def load_fact_table():
    '''
    Performs joins and extracts required columns for fact_table
    '''
    global df_immigration
    global df_temperature
    # For fact table, create views to use sql for easy join
    df_immigration.createOrReplaceTempView("immigration_view")
    df_temperature.createOrReplaceTempView("temp_view")

    # Create the fact table by joining the immigration and temperature views
    global fact_table
    fact_table = spark.sql('''
    SELECT immigration_view.i94yr as year,
           immigration_view.i94mon as month,
           immigration_view.i94cit as city,
           immigration_view.i94port as i94port,
           immigration_view.arrdate as arrival_date,
           immigration_view.depdate as departure_date,
           immigration_view.i94visa as reason,
           temp_view.AverageTemperature as temperature,
           temp_view.Latitude as latitude,
           temp_view.Longitude as longitude
    FROM immigration_view
    JOIN temp_view ON (immigration_view.i94port = temp_view.i94port)
    ''')
    fact_table.write.mode("append").partitionBy("i94port").parquet(
        "fact_table.parquet")

In [37]:
load_fact_table()

## Quality Checks

Check the number of rows

In [38]:
def quality_checks_rows(table, name_):
    '''
    Input: table, and name of the table
    Output: Print statement to check if the table passed the quality checks
    '''
    c = table.count()
    if c > 0:
        print(f'No. of rows in {name_} is {c}')
    else:
        print(f'Data quality check failed for {name_}')

In [39]:
quality_checks_rows(df_immigration,'df_immigration')

No. of rows in df_immigration is 3088544


In [40]:
quality_checks_rows(df_temperature,'df_temperature')

No. of rows in df_temperature is 112


In [41]:
quality_checks_rows(df_demographics,'df_demographics')

No. of rows in df_demographics is 2875


In [42]:
quality_checks_rows(fact_table,'fact_table')

No. of rows in fact_table is 2568134


In [79]:
# Rename so the column does not clash with the 'count' column created by groupBy().count()
df_demographics = df_demographics.withColumnRenamed('count', 'counts')

In [83]:
# Rename so the column does not clash with the 'count' column created by groupBy().count()
df_immigration = df_immigration.withColumnRenamed('count', 'counts')

In [75]:
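import pyspark.sql.functions as f

def quality_checks_dups(df, name_):
    '''
    Input: Spark dataframe, and name of the table
    Output: Print statement reporting whether the table contains fully duplicated rows
    '''
    # Sum the sizes of all groups of identical rows; sum() over no groups returns None
    c = df.groupBy(df.columns)\
        .count()\
        .where(f.col('count') > 1)\
        .select(f.sum('count')).collect()[0][0]
    if c and c > 1:
        print(f'Data quality check failed: {c} rows in {name_} are duplicates')
    else:
        print(f'Data quality check passed for {name_}')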
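The duplicate check can be mirrored in plain Python with `collections.Counter`, where rows are tuples. One difference: Python's `sum` over an empty generator returns 0, whereas the Spark query returns null when there are no duplicate groups, which is why the notebook guards with `if c and ...`. The hypothetical `assert_no_duplicates` variant raises an exception instead of printing. That is a design choice that would make a scheduled pipeline run fail visibly; it is not what the notebook does.

```python
from collections import Counter
from typing import Hashable, Iterable, Tuple


def duplicate_row_count(rows: Iterable[Tuple[Hashable, ...]]) -> int:
    """Total size of all groups of identical rows that occur more than once.

    Plain-Python analogue of
    df.groupBy(df.columns).count().where(col('count') > 1).select(sum('count')).
    """
    return sum(n for n in Counter(rows).values() if n > 1)


def assert_no_duplicates(rows: Iterable[Tuple[Hashable, ...]], name: str) -> None:
    """Raise instead of printing, so a scheduled run fails when duplicates appear."""
    dups = duplicate_row_count(rows)
    if dups:
        raise ValueError(f"Data quality check failed for {name}: {dups} duplicate rows")


rows = [("SEA", -1.977), ("ONT", 7.4), ("SEA", -1.977)]
print(duplicate_row_count(rows))  # 2
assert_no_duplicates([("SEA", -1.977), ("ONT", 7.4)], "temperature")  # passes silently
```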

In [76]:
quality_checks_dups(df_temperature,'temperature')

Data quality check passed for temperature


In [80]:
quality_checks_dups(df_demographics,'demographics')

Data quality check passed for demographics


In [84]:
quality_checks_dups(df_immigration,'immigration')

Data quality check passed for immigration


## Validating Loaded Data

In [43]:
temperature = './temperature.parquet'
df_temperature = spark.read.parquet(temperature)

In [44]:
df_temperature.show()

+------------------+----------------+-------------+--------+---------+-------+
|AverageTemperature|            City|      Country|Latitude|Longitude|i94port|
+------------------+----------------+-------------+--------+---------+-------+
|7.0200000000000005|Colorado Springs|United States|  39.38N|  104.05W|    COS|
|             3.653|  Salt Lake City|United States|  40.99N|  112.90W|    SLC|
|            19.761| Fort Lauderdale|United States|  26.52N|   80.60W|    FTL|
|              9.89|  Corpus Christi|United States|  28.13N|   97.27W|    CRP|
| 8.091999999999999|   San Francisco|United States|  37.78N|  122.03W|    SFR|
|             22.32|   Oklahoma City|United States|  36.17N|   97.46W|    OKC|
| 8.818999999999999|     Los Angeles|United States|  34.56N|  118.70W|    LOS|
|             17.55|    Jacksonville|United States|  29.74N|   81.23W|    JAC|
|             -3.42|     Albuquerque|United States|  34.56N|  107.03W|    ABQ|
| 6.072000000000001|    Philadelphia|United States| 

In [45]:
df_demographics = df_demographics.withColumnRenamed('Foreign-born','Foreign_born') # Renaming Column

In [46]:
df_demographics.show()

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|            City|         State|Median_Age|Male_Population|Female_Population|Total_Population|Number_of_Veterans|Foreign_born|Average_Household_Size|State_Code|                Race| Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino| 25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White| 58723|
|          Hoover|       Alabama|      38.5|      

In [47]:
df_demographics.createOrReplaceTempView("demo_view")
df_temperature.createOrReplaceTempView("temp_view")

In [48]:
spark.sql('''
    SELECT d.Foreign_born as previous_immigrants,t.City,t.AverageTemperature, t.i94port
    FROM demo_view d JOIN temp_view t 
    ON t.City = d.City
    ORDER BY previous_immigrants DESC
    ''').show()

+-------------------+-----------+------------------+-------+
|previous_immigrants|       City|AverageTemperature|i94port|
+-------------------+-----------+------------------+-------+
|            3212500|   New York|             3.264|    NYC|
|            3212500|   New York|             3.264|    NYC|
|            3212500|   New York|             3.264|    NYC|
|            3212500|   New York|             3.264|    NYC|
|            3212500|   New York|             3.264|    NYC|
|            1485425|Los Angeles| 8.818999999999999|    LOS|
|            1485425|Los Angeles| 8.818999999999999|    LOS|
|            1485425|Los Angeles| 8.818999999999999|    LOS|
|            1485425|Los Angeles| 8.818999999999999|    LOS|
|            1485425|Los Angeles| 8.818999999999999|    LOS|
|             696210|    Houston| 8.046000000000001|    HOU|
|             696210|    Houston| 8.046000000000001|    HOU|
|             696210|    Houston| 8.046000000000001|    HOU|
|             696210|   

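The cities with the most foreign-born residents in this sample (New York, Los Angeles, Houston) do not have the highest average temperatures, which suggests that factors other than temperature drive where immigrants settle. Each city appears several times because the demographics table has one row per race for each city.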
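The repeated rows can be reproduced with hypothetical miniature tables. An inner join on City emits one output row for every demographics row of that city, and there is one such row per race. Keeping only the city-level columns and de-duplicating them, which `SELECT DISTINCT` would do in the Spark SQL query, collapses the repeats. The sketch below uses a Python set for the same effect.

```python
from typing import Dict, List

# Hypothetical miniature tables: demographics has one row per (City, Race)
demographics = [
    {"City": "Houston", "Race": "White", "Foreign_born": 696210},
    {"City": "Houston", "Race": "Asian", "Foreign_born": 696210},
    {"City": "Houston", "Race": "Hispanic or Latino", "Foreign_born": 696210},
]
temperature = [{"City": "Houston", "AverageTemperature": 8.046, "i94port": "HOU"}]


def join_on_city(left: List[Dict], right: List[Dict]) -> List[Dict]:
    """Inner join on City: every matching (left, right) pair becomes one merged row."""
    return [{**lrow, **rrow} for lrow in left for rrow in right if lrow["City"] == rrow["City"]]


joined = join_on_city(demographics, temperature)
print(len(joined))  # 3: one output row per race row

# Keeping only city-level columns and de-duplicating leaves one row per city
distinct = {(r["Foreign_born"], r["City"], r["AverageTemperature"], r["i94port"]) for r in joined}
print(len(distinct))  # 1
```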

In this project, we used Spark because it reads multiple file formats (Parquet, CSV, text) and scales to large amounts of data. Spark SQL was used to build the fact table.

Since the source files are updated monthly, an Apache Airflow pipeline could be scheduled to rerun the ETL each month if needed.