The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [137]:
# import python libaries
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, date_add
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import numpy as np
import pandas as pd

### Step 1: Scope the Project and Gather Data

#### Scope 
This project will focus on understanding the trend of immigrant to United States by region or cities. We will implement both data warehouse/data lake to enhance its process almost in real-time basis. We combined different files as below. 

#### Describe and Gather Data 
*I94 Immigration Data: This data comes from the US National Tourism and Trade Office. 

*World Temperature Data: This dataset came from Kaggle. You can read more about it here.

*U.S. City Demographic Data: This data comes from OpenSoft. You can read more about it here.

*Airport Code Table: This is a simple table of airport codes and corresponding cities. It comes from here.

### Step 2: Explore and Assess the Data
#### We will narrow down each dataset checking their shape and columns.  Also we will clean / normalize data as much as possible    


## I94 Immigration Data

In [139]:
# Immigration data sample
df_immig_sample = pd.read_csv('immigration_data_sample.csv')


In [3]:
df_immig_sample.columns


Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [4]:
pd.set_option('display.max_columns', 50)
df_immig_sample.head(10)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT
5,721257,1481650.0,2016.0,4.0,577.0,577.0,ATL,20552.0,1.0,GA,20606.0,51.0,2.0,1.0,20160408,,,T,N,,M,1965.0,10072016,M,,DL,736852600.0,910,B2
6,1072780,2197173.0,2016.0,4.0,245.0,245.0,SFR,20556.0,1.0,CA,20635.0,48.0,2.0,1.0,20160412,,,T,O,,M,1968.0,10112016,F,,CX,786312200.0,870,B2
7,112205,232708.0,2016.0,4.0,113.0,135.0,NYC,20546.0,1.0,NY,20554.0,33.0,2.0,1.0,20160402,,,G,O,,M,1983.0,6302016,F,,BA,55474490000.0,00117,WT
8,2577162,5227851.0,2016.0,4.0,131.0,131.0,CHI,20572.0,1.0,IL,20575.0,39.0,2.0,1.0,20160428,,,O,O,,M,1977.0,7262016,,,LX,59413420000.0,00008,WT
9,10930,13213.0,2016.0,4.0,116.0,116.0,LOS,20545.0,1.0,CA,20553.0,35.0,2.0,1.0,20160401,,,O,O,,M,1981.0,6292016,,,AA,55449790000.0,00109,WT


In [6]:
# Country code data
df_countryCodes = pd.read_csv('countries.csv')


In [7]:
df_countryCodes.shape


(289, 2)

In [8]:
df_countryCodes.head()


Unnamed: 0,code,country
0,582,MEXICO
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [10]:
# Airport code data
i94portCodes = pd.read_csv('i94portCodes.csv')


In [11]:
i94portCodes.shape


(660, 3)

In [12]:
i94portCodes.head()


Unnamed: 0,code,location,state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK


In [13]:
# Demographic data
df_demographics = pd.read_csv('us-cities-demographics.csv', sep=';')


In [14]:
df_demographics.shape


(2891, 12)

In [15]:
df_demographics.head()


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [16]:
df_demographics.columns


Index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code', 'Race', 'Count'],
      dtype='object')

In [17]:
# Airport data
df_airports = pd.read_csv('airport-codes_csv.csv')


In [18]:
df_airports.columns


Index(['ident', 'type', 'name', 'elevation_ft', 'continent', 'iso_country',
       'iso_region', 'municipality', 'gps_code', 'iata_code', 'local_code',
       'coordinates'],
      dtype='object')

In [19]:
df_airports.head()


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [20]:
# Global Temperature data by Cities and countries
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = pd.read_csv(fname)

In [21]:
df_temperature.shape


(8599212, 7)

In [22]:
df_temperature.head()


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [23]:
# Loading a spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
df_immigration = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [24]:
df_immigration.count()


3096313

In [25]:
# checking the schema
df_immigration.printSchema()


root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [28]:
df_temperature.shape


(8599212, 7)

In [29]:
df_temperature.head()


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [30]:
df_temperature['Country'].nunique()


159

In [31]:
# Loading only United States temperature
df_temperature = df_temperature[df_temperature['Country']=='United States']


In [32]:
# Convert datatime
df_temperature['convertedDate'] = pd.to_datetime(df_temperature.dt)


In [33]:
# Remove old data (before 1950 year)
df_temperature=df_temperature[df_temperature['convertedDate']>"1950-01-01"].copy()


In [34]:
df_temperature.shape


(196348, 8)

In [35]:
df_temperature['convertedDate'].max()


Timestamp('2013-09-01 00:00:00')

In [36]:
# checking null
df_temperature.isnull().sum()


dt                               0
AverageTemperature               1
AverageTemperatureUncertainty    1
City                             0
Country                          0
Latitude                         0
Longitude                        0
convertedDate                    0
dtype: int64

In [37]:
df_temperature[df_temperature.AverageTemperature.isnull()]


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,convertedDate
287781,2013-09-01,,,Anchorage,United States,61.88N,151.13W,2013-09-01


In [38]:
df_temperature.shape


(196348, 8)

In [39]:
# checking duplicate
df_temperature[['City','convertedDate']].drop_duplicates().shape


(189472, 2)

In [40]:
df_temperature[df_temperature[['City','convertedDate']].duplicated()].head()


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,convertedDate
405836,1950-02-01,1.655,0.057,Arlington,United States,39.38N,76.99W,1950-02-01
405837,1950-03-01,3.871,0.232,Arlington,United States,39.38N,76.99W,1950-03-01
405838,1950-04-01,9.678,0.191,Arlington,United States,39.38N,76.99W,1950-04-01
405839,1950-05-01,16.786,0.234,Arlington,United States,39.38N,76.99W,1950-05-01
405840,1950-06-01,21.548,0.222,Arlington,United States,39.38N,76.99W,1950-06-01


In [142]:
df_temperature[(df_temperature['City'] == 'Arlington') & (df_temperature.dt == '1950-02-01')]


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,date


In [42]:
df_airports.shape


(55075, 12)

In [43]:
df_airports.head()


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [44]:
# groupby aiport by countries
df_airports.groupby('iso_country')['iso_country'].count()


iso_country
AD        2
AE       57
AF       64
AG        3
AI        1
AL       13
AM       13
AO      104
AQ       27
AR      848
AS        4
AT      145
AU     1963
AW        1
AZ       35
BA       15
BB        6
BD       16
BE      146
BF       51
BG      134
BH        4
BI        7
BJ       10
BL        1
BM        3
BN        2
BO      197
BQ        3
BR     4334
      ...  
TM       21
TN       15
TO        6
TR      124
TT        3
TV        3
TW       65
TZ      207
UA      191
UG       38
UM        6
US    22757
UY       54
UZ      176
VA        1
VC        6
VE      592
VG        3
VI        9
VN       50
VU       32
WF        2
WS        4
XK        6
YE       25
YT        1
ZA      489
ZM      103
ZW      138
ZZ        7
Name: iso_country, Length: 243, dtype: int64

In [45]:
df_airports[df_airports['iso_country'].isna()].shape


(247, 12)

In [46]:
df_airports[df_airports['iso_country'].isna()].groupby('continent')['continent'].count()


continent
AF    247
Name: continent, dtype: int64

In [47]:
df_airports = df_airports[df_airports['iso_country'].fillna('').str.upper().str.contains('US')].copy()

In [48]:
# groupby airport by type
df_airports.groupby('type')['type'].count()


type
balloonport          18
closed             1326
heliport           6265
large_airport       170
medium_airport      692
seaplane_base       566
small_airport     13720
Name: type, dtype: int64

In [49]:
# remove other type airports
excludedValues = ['closed', 'heliport', 'seaplane_base', 'balloonport']
df_airports = df_airports[~df_airports['type'].str.strip().isin(excludedValues)].copy()

In [50]:
df_airports.isnull().sum()


ident               0
type                0
name                0
elevation_ft       63
continent       14582
iso_country         0
iso_region          0
municipality       50
gps_code          399
iata_code       12717
local_code        199
coordinates         0
dtype: int64

In [51]:
df_airports[df_airports.municipality.isna()].head()


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
7653,6XA4,small_airport,Zadow Airstrip,,,US,US-TX,,6XA4,,,"-95.954353809, 29.991738550900003"
7887,74xa,small_airport,Gun Barrel City Airpark,385.0,,US,US-TX,,74XA,,,"-96.1456650496, 32.3551499558"
8082,79ID,small_airport,Kooskia (Clear Creek Int) Airport,1800.0,,US,US-ID,,79ID,,,"-115.869691372, 46.0488642914"
8114,79WT,small_airport,Ellensburg (Rotor Ranch) Airport,1962.0,,US,US-WA,,79WT,,,"-120.589778423, 47.091426059499994"
9055,8FA4,small_airport,Samsula / Coe Field,40.0,,US,US-FL,,8FA4,,,"-81.1328315735, 29.0102045831"


In [52]:
df_airports = df_airports[~df_airports['municipality'].isna()].copy()


In [53]:
df_airports.municipality = df_airports.municipality.str.upper()


In [54]:
# group by airports by region
df_airports.groupby('iso_region')['iso_region'].count()


iso_region
US-AK      586
US-AL      197
US-AR      291
US-AZ      214
US-CA      551
US-CO      288
US-CT       56
US-DC        2
US-DE       36
US-FL      522
US-GA      365
US-HI       35
US-IA      232
US-ID      238
US-IL      579
US-IN      486
US-KS      372
US-KY      164
US-LA      281
US-MA       79
US-MD      157
US-ME      122
US-MI      379
US-MN      361
US-MO      411
US-MS      211
US-MT      255
US-NC      349
US-ND      297
US-NE      259
US-NH       54
US-NJ      116
US-NM      149
US-NV      113
US-NY      402
US-OH      492
US-OK      372
US-OR      357
US-PA      486
US-RI       10
US-SC      173
US-SD      162
US-TN      228
US-TX     1546
US-U-A       3
US-UT      103
US-VA      311
US-VT       66
US-WA      379
US-WI      457
US-WV       83
US-WY       95
Name: iso_region, dtype: int64

In [55]:
# checking airport region length
df_airports['len'] = df_airports["iso_region"].apply(len)
# removew airports name
df_airports = df_airports[df_airports['len']==5].copy()
# extract airport states
df_airports['state'] = df_airports['iso_region'].str.strip().str.split("-", n = 1, expand = True)[1]

In [56]:
df_demographics.shape


(2891, 12)

In [57]:
# upper city names
df_demographics.City = df_demographics.City.str.upper().str.strip()


In [58]:
df_demographics.isnull().sum()


City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

In [59]:
df_demographics.City = df_demographics.City.str.strip().str.upper()


In [60]:
df_demographics[df_demographics[['City','Race']].duplicated()].head()


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
177,WILMINGTON,Delaware,36.4,32680.0,39277.0,71957,3063.0,3336.0,2.45,DE,Asian,1193
210,LAKEWOOD,California,39.9,41523.0,40069.0,81592,4094.0,18274.0,3.13,CA,Hispanic or Latino,24987
238,GLENDALE,California,42.1,98181.0,102844.0,201025,4448.0,111510.0,2.69,CA,White,146718
300,SPRINGFIELD,Massachusetts,31.8,74744.0,79592.0,154336,5723.0,16226.0,2.81,MA,Asian,5606
549,BLOOMINGTON,Indiana,23.5,40588.0,43227.0,83815,2368.0,10033.0,2.33,IN,Asian,9801


In [62]:
# checking duplicate data
df_demographics[df_demographics[['City', 'State','Race']].duplicated()]


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count


In [63]:
df_immigration.show(5)


+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

### Step 3: Define the Data Model
#### Aggregated data point

In [64]:
# create immigration view
df_immigration.createOrReplaceTempView("immig_table")


In [65]:
df_immigration.count()


3096313

In [66]:
# Immigration table unique
spark.sql("""
SELECT COUNT (DISTINCT cicid)
FROM immig_table
""").show()

+---------------------+
|count(DISTINCT cicid)|
+---------------------+
|              3096313|
+---------------------+



In [67]:
spark.sql("""
SELECT LENGTH (i94port) AS len
FROM immig_table
GROUP BY len
""").show()

+---+
|len|
+---+
|  3|
+---+



In [68]:
df_immigration = spark.sql("SELECT *, date_add(to_date('1960-01-01'), arrdate) AS arrival_date FROM immig_table")
df_immigration.createOrReplaceTempView("immig_table")

In [69]:
# Visa type
spark.sql("""SELECT *, CASE 
                        WHEN i94visa = 1.0 THEN 'Business' 
                        WHEN i94visa = 2.0 THEN 'Pleasure'
                        WHEN i94visa = 3.0 THEN 'Student'
                        ELSE 'N/A' END AS visa_type 
                        
                FROM immig_table""").createOrReplaceTempView("immig_table")

In [70]:
# add entry port
spark.sql("""SELECT *, CASE 
                        WHEN depdate >= 1.0 THEN date_add(to_date('1960-01-01'), depdate)
                        WHEN depdate IS NULL THEN NULL
                        ELSE 'N/A' END AS departure_date 
                        
                FROM immig_table""").createOrReplaceTempView("immig_table")

In [71]:
spark.sql("SELECT count(*) FROM immig_table WHERE departure_date = 'N/A'").show()


+--------+
|count(1)|
+--------+
|       0|
+--------+



In [72]:
spark.sql("""
SELECT COUNT(*)
FROM immig_table
WHERE departure_date <= arrival_date
""").show()

+--------+
|count(1)|
+--------+
|     375|
+--------+



In [73]:
spark.sql("""
SELECT arrival_date, departure_date
FROM immig_table
WHERE departure_date <= arrival_date
""").show(10)

+------------+--------------+
|arrival_date|departure_date|
+------------+--------------+
|  2016-04-01|    2016-03-31|
|  2016-04-02|    2016-03-19|
|  2016-04-02|    2016-01-26|
|  2016-04-02|    2016-04-01|
|  2016-04-02|    2016-01-31|
|  2016-04-02|    2016-04-01|
|  2016-04-03|    2016-04-02|
|  2016-04-04|    2016-03-12|
|  2016-04-05|    2016-04-04|
|  2016-04-05|    2016-04-04|
+------------+--------------+
only showing top 10 rows



In [74]:
spark.sql("""
SELECT *
FROM immig_table
WHERE departure_date >= arrival_date
""").createOrReplaceTempView("immig_table")

In [75]:
#check distinct departure dates
spark.sql("SELECT COUNT (DISTINCT departure_date) FROM immig_table ").show()
#check distinct arrival dates
spark.sql("SELECT COUNT (DISTINCT arrival_date) FROM immig_table ").show()
#check the common values between the two sets
spark.sql("""   SELECT COUNT(DISTINCT departure_date) 
                FROM immig_table 
                WHERE departure_date IN (
                    SELECT DISTINCT arrival_date FROM immig_table
                ) 
                """).show()

+------------------------------+
|count(DISTINCT departure_date)|
+------------------------------+
|                           174|
+------------------------------+

+----------------------------+
|count(DISTINCT arrival_date)|
+----------------------------+
|                          30|
+----------------------------+

+------------------------------+
|count(DISTINCT departure_date)|
+------------------------------+
|                            29|
+------------------------------+



In [76]:
spark.sql("""
SELECT i94mode, count(*)
FROM immig_table
GROUP BY i94mode
""").show()

+-------+--------+
|i94mode|count(1)|
+-------+--------+
|   null|     238|
|    1.0| 2871184|
|    3.0|   61572|
|    2.0|   17970|
|    9.0|    2517|
+-------+--------+



In [77]:
spark.sql("""
SELECT COUNT(*)
FROM immig_table
WHERE i94bir IS NULL
""").show()

+--------+
|count(1)|
+--------+
|      46|
+--------+



In [78]:
spark.sql("SELECT COUNT(biryear) FROM immig_table WHERE biryear IS NULL").show()


+--------------+
|count(biryear)|
+--------------+
|             0|
+--------------+



In [79]:
spark.sql("SELECT MAX(biryear), MIN(biryear) FROM immig_table WHERE biryear IS NOT NULL").show()


+------------+------------+
|max(biryear)|min(biryear)|
+------------+------------+
|      2016.0|      1916.0|
+------------+------------+



In [80]:
#Number of travellers who are older than 80
spark.sql("""
SELECT COUNT(*)
FROM immig_table 
WHERE biryear IS NOT NULL
AND biryear <= 1936
""").show()

# frequency of travellers by birth year
spark.sql("""
SELECT biryear, COUNT(*)
FROM immig_table 
WHERE biryear IS NOT NULL
AND biryear <= 1936
GROUP BY biryear
ORDER BY biryear ASC
""").show()

+--------+
|count(1)|
+--------+
|   24694|
+--------+

+-------+--------+
|biryear|count(1)|
+-------+--------+
| 1916.0|       8|
| 1917.0|      16|
| 1918.0|      21|
| 1919.0|      36|
| 1920.0|      34|
| 1921.0|      69|
| 1922.0|      89|
| 1923.0|     155|
| 1924.0|     209|
| 1925.0|     274|
| 1926.0|     414|
| 1927.0|     569|
| 1928.0|     792|
| 1929.0|    1073|
| 1930.0|    1442|
| 1931.0|    1794|
| 1932.0|    2239|
| 1933.0|    2688|
| 1934.0|    3442|
| 1935.0|    4194|
+-------+--------+
only showing top 20 rows



In [81]:
spark.sql("SELECT (2016-biryear)-i94bir AS difference, count(*) FROM immig_table WHERE i94bir IS NOT NULL GROUP BY difference").show()

+----------+--------+
|difference|count(1)|
+----------+--------+
|       0.0| 2953435|
+----------+--------+



In [82]:
spark.sql("""
SELECT gender, count(*) 
FROM immig_table
GROUP BY gender
""").show()

+------+--------+
|gender|count(1)|
+------+--------+
|     F| 1228646|
|  null|  407456|
|     M| 1316305|
|     U|     238|
|     X|     836|
+------+--------+



In [83]:
spark.sql("""SELECT * FROM immig_table WHERE gender IN ('F', 'M')""").createOrReplaceTempView("immig_table")

In [84]:
#citizenship countries
spark.sql("""
SELECT count(*) 
FROM immig_table
WHERE i94cit IS NULL
""").show()

#residence countries
spark.sql("""
SELECT count(*) 
FROM immig_table
WHERE i94res IS NULL
""").show()

#reported address
spark.sql("""
SELECT count(*) 
FROM immig_table
WHERE i94addr IS NULL
""").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+

+--------+
|count(1)|
+--------+
|       0|
+--------+

+--------+
|count(1)|
+--------+
|  114019|
+--------+



In [85]:
spark.sql("""
SELECT COUNT(*)
FROM immig_table
WHERE visatype IS NULL
""").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [86]:
spark.sql("""
SELECT visa_type, visatype, count(*)
FROM immig_table
GROUP BY visa_type, visatype
ORDER BY visa_type, visatype
""").show()

+---------+--------+--------+
|visa_type|visatype|count(1)|
+---------+--------+--------+
| Business|      B1|  186610|
| Business|      E1|    3182|
| Business|      E2|   16227|
| Business|     GMB|     132|
| Business|       I|    2962|
| Business|      I1|     214|
| Business|      WB|  185857|
| Pleasure|      B2|  967988|
| Pleasure|      CP|   11785|
| Pleasure|     CPL|       8|
| Pleasure|     GMT|   79454|
| Pleasure|     SBP|       2|
| Pleasure|      WT| 1060229|
|  Student|      F1|   27789|
|  Student|      F2|    1774|
|  Student|      M1|     708|
|  Student|      M2|      30|
+---------+--------+--------+



In [87]:
spark.sql("""
SELECT occup, COUNT(*) AS n
FROM immig_table
GROUP BY occup
ORDER BY n DESC, occup
""").show()

+-----+-------+
|occup|      n|
+-----+-------+
| null|2538838|
|  STU|   3275|
|  OTH|    508|
|  NRR|    299|
|  MKT|    262|
|  EXA|    175|
|  ULS|    142|
|  ADM|    119|
|  GLS|    119|
|  TIE|    108|
|  MVC|     58|
|  ENO|     55|
|  CEO|     53|
|  TIP|     49|
|  LLJ|     45|
|  RET|     44|
|  CMP|     43|
|  PHS|     42|
|  UNP|     33|
|  HMK|     30|
+-----+-------+
only showing top 20 rows



In [88]:
df_immigration = spark.sql("""SELECT * FROM immig_table""")


In [89]:
df_demographics_spark = spark.read.format("csv").option("header", "true").option("delimiter", ";").load('us-cities-demographics.csv')

In [90]:
df_demographics_spark.printSchema()


root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [91]:
df_demographics.dtypes


City                       object
State                      object
Median Age                float64
Male Population           float64
Female Population         float64
Total Population            int64
Number of Veterans        float64
Foreign-born              float64
Average Household Size    float64
State Code                 object
Race                       object
Count                       int64
dtype: object

In [92]:
spark.createDataFrame(df_demographics).printSchema()


root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: double (nullable = true)
 |-- Female Population: double (nullable = true)
 |-- Total Population: long (nullable = true)
 |-- Number of Veterans: double (nullable = true)
 |-- Foreign-born: double (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: long (nullable = true)



In [93]:
df_countryCodes = pd.read_csv('countries.csv')
df_i94portCodes = pd.read_csv('i94portCodes.csv')

# load the various csv files into pandas dataframes
df_demographics = pd.read_csv('us-cities-demographics.csv', sep=';')
df_temperature = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')

# load the SAS data
df_immigration=spark.read.parquet("sas_data")

In [94]:
spark_df_countryCodes = spark.createDataFrame(df_countryCodes)
spark_df_countryCodes .createOrReplaceTempView("countryCodes")

In [95]:
df_i94portCodes = df_i94portCodes[~df_i94portCodes.state.isna()].copy()


In [96]:
nonUSstates = ['CANADA', 'Canada', 'NETHERLANDS', 'NETH ANTILLES', 'THAILAND', 'ETHIOPIA', 'PRC', 'BERMUDA', 'COLOMBIA', 'ARGENTINA', 'MEXICO', 
               'BRAZIL', 'URUGUAY', 'IRELAND', 'GABON', 'BAHAMAS', 'MX', 'CAYMAN ISLAND', 'SEOUL KOREA', 'JAPAN', 'ROMANIA', 'INDONESIA',
               'SOUTH AFRICA', 'ENGLAND', 'KENYA', 'TURK & CAIMAN', 'PANAMA', 'NEW GUINEA', 'ECUADOR', 'ITALY', 'EL SALVADOR']

In [97]:
df_i94portCodes = df_i94portCodes[~df_i94portCodes.state.isin(nonUSstates)].copy()


In [98]:
spark_df_i94portCodes = spark.createDataFrame(df_i94portCodes)
spark_df_i94portCodes .createOrReplaceTempView("i94portCodes")

In [99]:
df_immigration.createOrReplaceTempView("immig_table")


In [100]:
# Remove all entries into the united states that weren't via air travel
spark.sql("""
SELECT *
FROM immig_table
WHERE i94mode = 1
""").createOrReplaceTempView("immig_table")

In [101]:
spark.sql("""SELECT * FROM immig_table WHERE gender IN ('F', 'M')""").createOrReplaceTempView("immig_table")

In [102]:
# convert the arrival dates into a useable value
spark.sql("SELECT *, date_add(to_date('1960-01-01'), arrdate) AS arrival_date FROM immig_table").createOrReplaceTempView("immig_table")

In [103]:
# convert the departure dates into a useable value
spark.sql("""SELECT *, CASE 
                        WHEN depdate >= 1.0 THEN date_add(to_date('1960-01-01'), depdate)
                        WHEN depdate IS NULL THEN NULL
                        ELSE 'N/A' END AS departure_date 
                        
                FROM immig_table""").createOrReplaceTempView("immig_table")

In [104]:
# we use an inner join to drop invalid codes
#country of citizenship
spark.sql("""
SELECT im.*, cc.country AS citizenship_country
FROM immig_table im
INNER JOIN countryCodes cc
ON im.i94cit = cc.code
""").createOrReplaceTempView("immig_table")

In [105]:
#country of residence
spark.sql("""
SELECT im.*, cc.country AS residence_country
FROM immig_table im
INNER JOIN countryCodes cc
ON im.i94res = cc.code
""").createOrReplaceTempView("immig_table")

In [106]:
# Add visa character string aggregation
spark.sql("""SELECT *, CASE 
                        WHEN i94visa = 1.0 THEN 'Business' 
                        WHEN i94visa = 2.0 THEN 'Pleasure'
                        WHEN i94visa = 3.0 THEN 'Student'
                        ELSE 'N/A' END AS visa_type 
                        
                FROM immig_table""").createOrReplaceTempView("immig_table")

In [107]:
# Add entry_port names and entry port states to the view
spark.sql("""
SELECT im.*, pc.location AS entry_port, pc.state AS entry_port_state
FROM immig_table im 
INNER JOIN i94portCodes pc
ON im.i94port = pc.code
""").createOrReplaceTempView("immig_table")

In [108]:
# Compute the age of each individual and add it to the view
spark.sql("""
SELECT *, (2016-biryear) AS age 
FROM immig_table
""").createOrReplaceTempView("immig_table")

### Step 4: Run ETL to Model the Data
#### Run the ETL model and create table

In [109]:
# Insert the immigration fact data into a spark dataframe
fact_immigration = spark.sql("""
                        SELECT 
                            cicid, 
                            citizenship_country,
                            residence_country,
                            TRIM(UPPER (entry_port)) AS city,
                            TRIM(UPPER (entry_port_state)) AS state,
                            arrival_date,
                            departure_date,
                            age,
                            visa_type,
                            visatype AS detailed_visa_type

                        FROM immig_table
""")

In [110]:
# extract all distinct dates from arrival and departure dates to create dimension table
dim_time = spark.sql("""
SELECT DISTINCT arrival_date AS date
FROM immig_table
UNION
SELECT DISTINCT departure_date AS date
FROM immig_table
WHERE departure_date IS NOT NULL
""")
dim_time.createOrReplaceTempView("dim_time_table")

In [111]:
# extract year, month, day, weekofyear, dayofweek and weekofyear from the date and insert all the values in the dim_time table;
dim_time = spark.sql("""
SELECT date, YEAR(date) AS year, MONTH(date) AS month, DAY(date) AS day, WEEKOFYEAR(date) AS week, DAYOFWEEK(date) as weekday, DAYOFYEAR(date) year_day
FROM dim_time_table
ORDER BY date ASC
""")

In [112]:
# Keep only data for the United States
df_temperature = df_temperature[df_temperature['Country']=='United States'].copy()

# Convert the date to datetime objects
df_temperature['date'] = pd.to_datetime(df_temperature.dt)

# Remove all dates prior to 1950
df_temperature=df_temperature[df_temperature['date']>"1950-01-01"].copy()

In [113]:

# convert the city names to upper case
df_temperature.City = df_temperature.City.str.strip().str.upper()

In [114]:
# convert the dataframes from pandas to spark
spark_df_temperature = spark.createDataFrame(df_temperature)
spark_df_temperature .createOrReplaceTempView("temperature")

In [115]:
dim_temperature = spark.sql("""
SELECT
    DISTINCT date, city,
    AVG(AverageTemperature) OVER (PARTITION BY date, City) AS average_temperature, 
    AVG(AverageTemperatureUncertainty)  OVER (PARTITION BY date, City) AS average_termperature_uncertainty
    
FROM temperature
""")

In [116]:
df_demographics.City = df_demographics.City.str.strip().str.upper()
df_demographics['State Code'] = df_demographics['State Code'].str.strip().str.upper()
df_demographics.Race = df_demographics.Race.str.strip().str.upper()

In [117]:

# convert the dataframes from pandas to spark
spark_df_demographics = spark.createDataFrame(df_demographics)
spark_df_demographics.createOrReplaceTempView("demographics")

In [118]:
# insert data into the demographics dim table
dim_demographics = spark.sql("""
                                SELECT  City, 
                                        State, 
                                        `Median Age` AS median_age, 
                                        `Male Population` AS male_population, 
                                        `Female Population` AS female_population, 
                                        `Total Population` AS total_population, 
                                        `Foreign-born` AS foreign_born, 
                                        `Average Household Size` AS average_household_size, 
                                        `State Code` AS state_code, 
                                        Race, 
                                        Count
                                FROM demographics
""")

In [119]:
#The airport dataset contains a lot of nulls. We'll load the csv directly into a spark dataframe to avoid having to deal with converting pandas NaN into nulls
spark_df_airports = spark.read.format("csv").option("header", "true").load('airport-codes_csv.csv')
spark_df_airports.createOrReplaceTempView("airports")

In [120]:
#equivalent to the following pandas code:
# df_airports = df_airports[df_airports['iso_country'].fillna('').str.upper().str.contains('US')].copy()
spark.sql("""
SELECT *
FROM airports
WHERE iso_country IS NOT NULL
AND UPPER(TRIM(iso_country)) LIKE 'US'
""").createOrReplaceTempView("airports")

In [121]:
#equivalent to the following pandas code:
# excludedValues = ['closed', 'heliport', 'seaplane_base', 'balloonport']
# df_airports = df_airports[~df_airports['type'].str.strip().isin(excludedValues)].copy()
# df_airports = df_airports[~df_airports['municipality'].isna()].copy()
# df_airports = df_airports[~df_airports['municipality'].isna()].copy()
# df_airports['len'] = df_airports["iso_region"].apply(len)
# df_airports = df_airports[df_airports['len']==5].copy()

spark.sql("""
SELECT *
FROM airports
WHERE LOWER(TRIM(type)) NOT IN ('closed', 'heliport', 'seaplane_base', 'balloonport')
AND municipality IS NOT NULL
AND LENGTH(iso_region) = 5
""").createOrReplaceTempView("airports")

In [122]:
dim_airports = spark.sql("""
SELECT TRIM(ident) AS ident, type, name, elevation_ft, SUBSTR(iso_region, 4) AS state, TRIM(UPPER(municipality)) AS municipality, iata_code
FROM airports
""")

In [123]:
# Saving the data in parquet format
dim_demographics.write.parquet("dim_demographics")
dim_time.write.parquet("dim_time")
dim_airports.write.parquet("dim_airports")
dim_temperature.write.parquet("dim_temperature")
fact_immigration.write.parquet("fact_immigration")

In [124]:
#Let's check some things in our data
dim_demographics.createOrReplaceTempView("dim_demographics")
dim_time.createOrReplaceTempView("dim_time")
dim_airports.createOrReplaceTempView("dim_airports")
dim_temperature.createOrReplaceTempView("dim_temperature")
fact_immigration.createOrReplaceTempView("fact_immigration")

In [125]:
# we define the following function to check for null values
def nullValueCheck(spark_ctxt, tables_to_check):
    """
    This function performs null value checks on specific columns of given tables received as parameters and raises a ValueError exception when null values are encountered.
    It receives the following parameters:
    spark_ctxt: spark context where the data quality check is to be performed
    tables_to_check: A dictionary containing (table, columns) pairs specifying for each table, which column is to be checked for null values.   
    """  
    for table in tables_to_check:
        print(f"Performing data quality check on table {table}...")
        for column in tables_to_check[table]:
            returnedVal = spark_ctxt.sql(f"""SELECT COUNT(*) as nbr FROM {table} WHERE {column} IS NULL""")
            if returnedVal.head()[0] > 0:
                raise ValueError(f"Data quality check failed! Found NULL values in {column} column!")
        print(f"Table {table} passed.")

In [126]:
#dictionary of tables and columns to be checked
tables_to_check = { 'fact_immigration' : ['cicid'], 'dim_time':['date'], 'dim_demographics': ['City','state_code'], 'dim_airports':['ident'], 'dim_temperature':['date','City']}

#We call our function on the spark context
nullValueCheck(spark, tables_to_check)

Performing data quality check on table fact_immigration...
Table fact_immigration passed.
Performing data quality check on table dim_time...
Table dim_time passed.
Performing data quality check on table dim_demographics...
Table dim_demographics passed.
Performing data quality check on table dim_airports...
Table dim_airports passed.
Performing data quality check on table dim_temperature...
Table dim_temperature passed.


### Step 5: Complete Project Write Up
#### Wrap it up

In [127]:
#time dimension verification

#check the number of rows in our time table : 192 expected
spark.sql("""
SELECT COUNT(*) - 192
FROM dim_time
""").show()

# make sure each row has a distinct date key : 192 expected
spark.sql("""
SELECT COUNT(DISTINCT date) - 192
FROM dim_time
""").show()

# we could also subtract the result of one query from the other


# and make sure all dates from the fact table are included in the time dimension (NULL is the expected result)
spark.sql("""
SELECT DISTINCT date
FROM dim_time

MINUS

(SELECT DISTINCT arrival_date AS date
FROM immig_table
UNION
SELECT DISTINCT departure_date AS date
FROM immig_table
WHERE departure_date IS NOT NULL)

""").show()

+--------------------------------+
|(count(1) - CAST(192 AS BIGINT))|
+--------------------------------+
|                               0|
+--------------------------------+

+--------------------------------------------+
|(count(DISTINCT date) - CAST(192 AS BIGINT))|
+--------------------------------------------+
|                                           0|
+--------------------------------------------+

+----+
|date|
+----+
+----+



In [128]:
#immigration verification

# The number of primary key from the staging table (2165257 expected)
spark.sql("""
SELECT count(distinct cicid) - 2165257
FROM immig_table
""").show()

#should match the primary key count from the fact table (2165257 expected)
spark.sql("""
SELECT count(distinct cicid) - 2165257
FROM fact_immigration
""").show()

#and should match the row count from the fact table since it is also the primary key (2165257 expected)
spark.sql("""
SELECT count(*) - 2165257
FROM fact_immigration
""").show()

+-------------------------------------------------+
|(count(DISTINCT cicid) - CAST(2165257 AS BIGINT))|
+-------------------------------------------------+
|                                                0|
+-------------------------------------------------+

+-------------------------------------------------+
|(count(DISTINCT cicid) - CAST(2165257 AS BIGINT))|
+-------------------------------------------------+
|                                                0|
+-------------------------------------------------+

+------------------------------------+
|(count(1) - CAST(2165257 AS BIGINT))|
+------------------------------------+
|                                   0|
+------------------------------------+



In [129]:
# Let's check the demographics dimension table (2891 expected) 
spark.sql("""
SELECT count(*) - 2891
FROM dim_demographics
""").show()

spark.sql("""
SELECT COUNT(DISTINCT city, state, race) - 2891
FROM dim_demographics
""").show()

+---------------------------------+
|(count(1) - CAST(2891 AS BIGINT))|
+---------------------------------+
|                                0|
+---------------------------------+

+----------------------------------------------------------+
|(count(DISTINCT city, state, race) - CAST(2891 AS BIGINT))|
+----------------------------------------------------------+
|                                                         0|
+----------------------------------------------------------+



In [130]:
# Let's check the primary key for airports (expected 14529)
spark.sql("""
SELECT count(*) - 14529
FROM dim_airports
""").show()

spark.sql("""
SELECT COUNT(DISTINCT ident) - 14529
FROM dim_airports
""").show()

+----------------------------------+
|(count(1) - CAST(14529 AS BIGINT))|
+----------------------------------+
|                                 0|
+----------------------------------+

+-----------------------------------------------+
|(count(DISTINCT ident) - CAST(14529 AS BIGINT))|
+-----------------------------------------------+
|                                              0|
+-----------------------------------------------+



In [131]:
#finally, city + date is our primary key for the temperature (expected 189472)

spark.sql("""
SELECT count(*) - 189472
FROM dim_temperature
""").show()

spark.sql("""
SELECT COUNT(DISTINCT date, city) - 189472
FROM dim_temperature
""").show()

+-----------------------------------+
|(count(1) - CAST(189472 AS BIGINT))|
+-----------------------------------+
|                                  0|
+-----------------------------------+

+-----------------------------------------------------+
|(count(DISTINCT date, city) - CAST(189472 AS BIGINT))|
+-----------------------------------------------------+
|                                                    0|
+-----------------------------------------------------+



In [132]:
# First, we join airport and immigration
fact_immigration.show(2)
dim_airports.show(2)

+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|    cicid|citizenship_country|residence_country|  city|state|arrival_date|departure_date| age|visa_type|detailed_visa_type|
+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|4041803.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07|49.0| Business|                B1|
|4041804.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|          null|38.0| Business|                B1|
+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
only showing top 2 rows

+-----+-------------+--------------------+------------+-----+------------+---------+
|ident|         type|                name|elevation_ft|state|municipality|iata_code|
+-----+-------------+--------------------+------------+

In [133]:
#here are the distinct combinations of city and state in our fact table
spark.sql("""
SELECT COUNT(DISTINCT city, state)
FROM fact_immigration
""").show()

# and the combinations of city and state that are common to both
spark.sql("""
SELECT COUNT(*)
FROM
(
SELECT DISTINCT city, state
FROM fact_immigration
) fi
INNER JOIN 
(
SELECT DISTINCT municipality, state
FROM dim_airports 
) da
ON fi.city = da.municipality
AND fi.state = da.state
""").show(2)

+---------------------------+
|count(DISTINCT city, state)|
+---------------------------+
|                        151|
+---------------------------+

+--------+
|count(1)|
+--------+
|     102|
+--------+



In [134]:
fact_immigration.show(2)
dim_demographics.show(2)

+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|    cicid|citizenship_country|residence_country|  city|state|arrival_date|departure_date| age|visa_type|detailed_visa_type|
+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|4041803.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07|49.0| Business|                B1|
|4041804.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|          null|38.0| Business|                B1|
+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
only showing top 2 rows

+-------------+-------------+----------+---------------+-----------------+----------------+------------+----------------------+----------+------------------+-----+
|         City|        State|median_age|male_population|femal

In [135]:
#here are the distinct combinations of city and state in our fact table
spark.sql("""
SELECT COUNT(DISTINCT city, state)
FROM fact_immigration
""").show()

# and the combinations of city and state that are common to both the fact table and the demographics table
spark.sql("""
SELECT COUNT(*)
FROM
(
SELECT DISTINCT city, state
FROM fact_immigration
) fi
INNER JOIN 
(
SELECT DISTINCT City, state_code
FROM dim_demographics 
) da
ON fi.city = da.City
AND fi.state = da.state_code
""").show(2)

+---------------------------+
|count(DISTINCT city, state)|
+---------------------------+
|                        151|
+---------------------------+

+--------+
|count(1)|
+--------+
|      69|
+--------+



In [136]:
# We use a count to see how many rows we would keep using this strategy
spark.sql("""
SELECT COUNT(*)
FROM fact_immigration
WHERE CONCAT(city, state) IN (
    SELECT CONCAT(fi.city, fi.state)
    FROM
    (
        SELECT DISTINCT city, state
        FROM fact_immigration
    ) fi
    INNER JOIN 
    (
        SELECT DISTINCT municipality, state
        FROM dim_airports 
    ) da
    ON fi.city = da.municipality
    AND fi.state = da.state
)
""").show(2)

+--------+
|count(1)|
+--------+
| 1983869|
+--------+

