# Project Title
### Data Engineering Capstone Project

#### Project Summary
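This project uses Spark to combine I94 immigration records with U.S. city demographics, world temperature data, and airport codes, and models the result as a star schema that describes immigration through US airports.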

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession


In [2]:
import datetime
from pprint import pprint
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, udf, year, month, avg, round, dayofweek, weekofyear, isnull
from pyspark.sql.types import StringType, IntegerType


### Step 1: Scope the Project and Gather Data

#### Scope 
The goal of this project is to use multiple data sources to analyze immigration to the USA.
The project uses Spark to build a data warehouse in Parquet file format that reflects immigration data at US airports. It uses a star schema with a fact table and dimension tables.


#### Describe and Gather Data 
I94 Immigration Data: This data comes from the US National Tourism and Trade Office.

World Temperature Data: This dataset came from Kaggle

U.S. City Demographic Data: This data comes from OpenSoft

Airport Code Table: This is a simple table of airport codes and corresponding cities

#### Load the Data into Pandas in Chunks and Explore the Data

In [3]:
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1",chunksize= 1000000)

In [4]:
df2 = next(df)

In [5]:
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1",chunksize= 50000)
sum([len(i) for i in df])

3096313
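
The row count above never holds the whole file in memory: each chunk is measured and then discarded. Below is a minimal standard-library sketch of the same pattern. The `iter_chunks` and `count_rows` helpers are hypothetical names introduced for illustration; `iter_chunks` stands in for the iterator that `pd.read_sas(..., chunksize=...)` returns.

```python
from itertools import islice


def iter_chunks(rows, chunksize):
    """Yield successive lists of at most `chunksize` items (hypothetical helper)."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, chunksize))
        if not chunk:
            return
        yield chunk


def count_rows(rows, chunksize):
    # Only one chunk is alive at a time, so memory use is bounded by `chunksize`.
    return sum(len(chunk) for chunk in iter_chunks(rows, chunksize))


total = count_rows(range(10), chunksize=4)  # chunks of 4, 4 and 2 rows
print(total)
```

A smaller `chunksize` lowers peak memory at the cost of more iterations, which is presumably why the count cell uses 50,000 rows per chunk while the exploration cell loads a single chunk of 1,000,000.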

In [5]:
pd.set_option('display.max_columns',df2.shape[1] +1)

In [6]:
df2.head(10)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
5,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MI,20555.0,57.0,1.0,1.0,20160401.0,,,O,O,,M,1959.0,09302016,,,AZ,92471040000.0,602.0,B1
6,19.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,63.0,2.0,1.0,20160401.0,,,O,K,,M,1953.0,09302016,,,AZ,92471400000.0,602.0,B2
7,20.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,57.0,2.0,1.0,20160401.0,,,O,K,,M,1959.0,09302016,,,AZ,92471610000.0,602.0,B2
8,21.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20553.0,46.0,2.0,1.0,20160401.0,,,O,O,,M,1970.0,09302016,,,AZ,92470800000.0,602.0,B2
9,22.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20562.0,48.0,1.0,1.0,20160401.0,,,O,O,,M,1968.0,09302016,,,AZ,92478490000.0,608.0,B1


In [7]:
df2.isna().sum().sort_values(ascending = False)

insnum      1000000
entdepu      999887
occup        996654
visapost     613499
gender       143987
entdepd       35744
depdate       35744
matflag       35744
i94addr       34317
dtaddto          42
airline          15
fltno            14
i94mode          14
entdepa          13
dtadfile          1
arrdate           0
i94cit            0
i94res            0
i94port           0
i94mon            0
i94yr             0
visatype          0
i94bir            0
i94visa           0
count             0
biryear           0
admnum            0
cicid             0
dtype: int64

In [8]:
df2['visatype'].value_counts()

WT     452640
B2     330937
WB     111398
B1      74183
F1      18975
E2       7355
E1       1387
I        1313
F2       1024
M1        531
CP        130
I1        104
M2         17
SBP         3
CPL         3
Name: visatype, dtype: int64

In [9]:
df2.describe()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,arrdate,i94mode,depdate,i94bir,i94visa,count,biryear,admnum
count,1000000.0,1000000.0,1000000.0,1000000.0,1000000.0,1000000.0,999986.0,964256.0,1000000.0,1000000.0,1000000.0,1000000.0,1000000.0
mean,1025475.0,2016.0,4.0,295.055807,294.12131,20549.822574,1.005696,20564.744405,41.279221,1.824807,1.0,1974.720779,69270690000.0
std,601009.2,0.0,0.0,210.825147,209.727308,3.117242,0.080992,22.652658,17.172368,0.430807,0.0,17.172368,21983420000.0
min,6.0,2016.0,4.0,101.0,101.0,20545.0,1.0,20226.0,0.0,1.0,1.0,1916.0,4406585.0
25%,498040.8,2016.0,4.0,135.0,130.0,20547.0,1.0,20554.0,30.0,2.0,1.0,1962.0,55644240000.0
50%,1018124.0,2016.0,4.0,209.0,209.0,20550.0,1.0,20559.0,41.0,2.0,1.0,1975.0,55920030000.0
75%,1565416.0,2016.0,4.0,464.0,464.0,20553.0,1.0,20565.0,54.0,2.0,1.0,1986.0,92772870000.0
max,2041218.0,2016.0,4.0,999.0,749.0,20573.0,9.0,20716.0,100.0,3.0,1.0,2016.0,93307470000.0


In [9]:
df2.shape

(1000000, 28)

#### Load the Data into Spark and Explore the Data

In [6]:


spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
I94_df =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [7]:
US_City_df=spark.read.csv('us-cities-demographics.csv',header=True,sep=';')


In [8]:
airport_codes_df=spark.read.csv('airport-codes_csv.csv',header=True,)


In [9]:
path = "../../data2/GlobalLandTemperaturesByCity.csv"

temperature_df = spark.read.csv(path,header=True,sep=',')

In [16]:
I94_df.printSchema()
I94_df.limit(10).toPandas()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
5,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MI,20555.0,...,,M,1959.0,09302016,,,AZ,92471040000.0,602.0,B1
6,19.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1953.0,09302016,,,AZ,92471400000.0,602.0,B2
7,20.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1959.0,09302016,,,AZ,92471610000.0,602.0,B2
8,21.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20553.0,...,,M,1970.0,09302016,,,AZ,92470800000.0,602.0,B2
9,22.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20562.0,...,,M,1968.0,09302016,,,AZ,92478490000.0,608.0,B1


In [16]:
I94_df.count()

3096313

In [15]:
c = I94_df.select('airline').drop_duplicates()
c.toPandas()

Unnamed: 0,airline
0,LT
1,MM
2,DZ
3,CI
4,447
5,YFA
6,TC
7,AZ
8,FI
9,R0E


In [16]:
c = I94_df.select('I94BIR').drop_duplicates()
c.toPandas()

Unnamed: 0,I94BIR
0,70.0
1,67.0
2,8.0
3,0.0
4,69.0
5,7.0
6,108.0
7,88.0
8,49.0
9,101.0


In [19]:
US_City_df.count()

2891

In [17]:
US_City_df.printSchema()
US_City_df.limit(10).toPandas()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229,62432,118661,6634,7517,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712,41971,80683,4815,8355,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629,56860,108489,3800,37038,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762,43270,85032,5783,3269,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751,58077,109828,5204,16315,2.65,NC,Asian,11060


In [21]:
c = US_City_df.select('City').drop_duplicates()
c.toPandas()

Unnamed: 0,City
0,Saint George
1,Worcester
2,Tyler
3,Springfield
4,Caguas
5,Charleston
6,Pasco
7,Corona
8,Tempe
9,North Las Vegas


In [18]:
c = US_City_df.select('State Code').drop_duplicates()
c.toPandas()

Unnamed: 0,State Code
0,AZ
1,SC
2,LA
3,MN
4,NJ
5,DC
6,OR
7,VA
8,RI
9,KY


In [24]:
rase = US_City_df.select('Race').drop_duplicates()
rase.toPandas()

Unnamed: 0,Race
0,Black or African-American
1,Hispanic or Latino
2,White
3,Asian
4,American Indian and Alaska Native


In [20]:
US_City_df.count()

2891

In [21]:
airport_codes_df.printSchema()
airport_codes_df.limit(10).toPandas()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"


In [27]:
a = airport_codes_df.select('continent').drop_duplicates()
a.show()

+---------+
|continent|
+---------+
|       NA|
|       SA|
|       AS|
|       AN|
|       OC|
|       EU|
|       AF|
+---------+



In [22]:
a = airport_codes_df.filter('iso_country = "US"').select('iso_region').drop_duplicates()
a.toPandas()

Unnamed: 0,iso_region
0,US-TN
1,US-OK
2,US-VT
3,US-SD
4,US-WA
5,US-IN
6,US-AL
7,US-NY
8,US-MS
9,US-MT


In [30]:
c= airport_codes_df.select('iso_country').drop_duplicates()
c.toPandas()

Unnamed: 0,iso_country
0,DZ
1,LT
2,MM
3,CI
4,TC
5,AZ
6,FI
7,SC
8,PM
9,UA


In [23]:
airport_codes_df

DataFrame[ident: string, type: string, name: string, elevation_ft: string, continent: string, iso_country: string, iso_region: string, municipality: string, gps_code: string, iata_code: string, local_code: string, coordinates: string]

In [24]:
temperature_df.printSchema()
temperature_df.limit(10).toPandas()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E
5,1744-04-01,5.787999999999999,3.624,Århus,Denmark,57.05N,10.33E
6,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
7,1744-06-01,14.050999999999998,1.347,Århus,Denmark,57.05N,10.33E
8,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E


In [25]:
temperature_df.count()

8599212

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

First, we deal with missing values by dropping every row that lacks any of the columns we need.

In [34]:
# Row count before removing rows with missing data
I94_df.count()

3096313

In [26]:
cleaned_i94_df = I94_df.dropna(how="any", subset=["ARRDATE",'I94PORT','i94addr','I94BIR','I94VISA',
                                                  "gender" , 'visatype'])


In [36]:
# Row count after removing rows with missing data
cleaned_i94_df.count()

2551014
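
With `how="any"` and a `subset`, a row is dropped as soon as one of the listed columns is null, while nulls in other columns (for example `insnum` or `occup`) are ignored. The following is a plain-Python sketch of that rule, not Spark's implementation; `dropna_any` is a hypothetical helper and the rows are toy values, not records from the I94 file.

```python
def dropna_any(rows, subset):
    """Keep only rows (dicts) where every column in `subset` is non-null."""
    return [row for row in rows if all(row.get(col) is not None for col in subset)]


# Toy rows for illustration only.
rows = [
    {"gender": "M", "i94addr": "NY", "occup": None},   # kept: occup is not in the subset
    {"gender": None, "i94addr": "MA", "occup": None},  # dropped: gender is missing
    {"gender": "F", "i94addr": None, "occup": "STU"},  # dropped: i94addr is missing
]

kept = dropna_any(rows, subset=["gender", "i94addr"])
print(len(kept))
```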

In [37]:
US_City_df.count()

2891

In [27]:
cleaned_US_City_df = US_City_df.dropna(how="any", subset=["City",'State','Median Age','Male Population',
                                                          'Female Population',"Total Population" , 'State Code','Race' , 'Count'])

In [43]:
cleaned_US_City_df.count()

2888

In [44]:
temperature_df.count()

8599212

In [28]:
cleaned_temperature_df = temperature_df.dropna(how="any", subset=['dt','AverageTemperature','City','Country'])

In [46]:
cleaned_temperature_df.count()

8235082

In [47]:
airport_codes_df.count()

55075

In [29]:
cleaned_airport_codes_df = airport_codes_df.dropna(how="any", subset=['ident','iso_country','iso_region',
                                                                      'local_code'])

In [49]:
cleaned_airport_codes_df.count()

28686

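Next, we correct some columns: we keep only arrivals whose state code appears in the demographics data, and we convert the SAS arrival date to an ISO date.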

In [30]:
state_code = cleaned_US_City_df.toPandas()["State Code"].unique()
print(len(state_code))
print(state_code)

49
['MD' 'MA' 'AL' 'CA' 'NJ' 'IL' 'AZ' 'MO' 'NC' 'PA' 'KS' 'FL' 'TX' 'VA' 'NV'
 'CO' 'MI' 'CT' 'MN' 'UT' 'AR' 'TN' 'OK' 'WA' 'NY' 'GA' 'NE' 'KY' 'SC' 'LA'
 'NM' 'IA' 'RI' 'PR' 'DC' 'WI' 'OR' 'NH' 'ND' 'DE' 'OH' 'ID' 'IN' 'AK' 'MS'
 'HI' 'SD' 'ME' 'MT']


In [31]:
@udf(StringType())
def validate_state(n):  
    if n in state_code:
        return n
    return 'other'

In [32]:
cleaned_i94_df = cleaned_i94_df.withColumn("i94addr", validate_state(cleaned_i94_df.i94addr))

In [54]:
cleaned_i94_df.count()

2551014

In [33]:
cleaned_i94_df = cleaned_i94_df.filter('i94addr != "other"')


In [56]:
cleaned_i94_df.count()

2435765
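
The validate-then-filter step above comes down to a membership test. Here is a minimal sketch in plain Python, assuming a hypothetical two-argument `validate_state` that takes the set of valid codes as a parameter (the UDF in the notebook reads `state_code` from the enclosing scope instead):

```python
def validate_state(code, valid_codes):
    """Return `code` if it is a known state code, else the marker 'other'."""
    return code if code in valid_codes else "other"


# A few of the 49 codes printed above; a frozenset gives constant-time lookups.
valid_codes = frozenset({"MD", "MA", "AL", "CA", "NJ"})

print(validate_state("MA", valid_codes))
print(validate_state("XX", valid_codes))
```

Rows marked `other` are then removed by the filter, which accounts for the drop from 2551014 to 2435765 rows.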

In [34]:
@udf(StringType())
def convert_datetime(x):
    return (datetime.datetime(1960, 1, 1).date() + datetime.timedelta(x)).isoformat()
   


In [35]:
cleaned_i94_df = cleaned_i94_df.withColumn("arrdate", convert_datetime(cleaned_i94_df.arrdate))
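
SAS stores dates as a number of days since 1960-01-01, which is why `arrdate` looks like `20545.0` in the raw data. The sketch below shows the same conversion outside Spark; `sas_date_to_iso` and `SAS_EPOCH` are hypothetical names, and the `None` guard is an addition that the UDF above does not have.

```python
import datetime

SAS_EPOCH = datetime.date(1960, 1, 1)


def sas_date_to_iso(days):
    """Convert a SAS date (days since 1960-01-01) to an ISO 8601 string."""
    if days is None:
        return None  # pass missing values through instead of raising
    return (SAS_EPOCH + datetime.timedelta(days=int(days))).isoformat()


print(sas_date_to_iso(20545.0))  # 2016-04-01
```

The result agrees with the `dtadfile` value `20160401` that appears next to `arrdate` 20545 in the sample rows.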

Rename the columns and keep only the ones the model needs:

In [36]:
cleaned_i94_df = cleaned_i94_df.selectExpr('cicid as ID' , 'i94addr as state_code' , 'arrdate as Date' ,'i94port as city_code',
                                'i94bir as age' , 'i94visa as visa_code' , 'visatype','count').drop_duplicates()


In [61]:
cleaned_US_City_df.count()

2888

In [37]:
cleaned_US_City_df = cleaned_US_City_df.select('City','State'
                                  ,'Median Age','Male Population','Female Population',
                                'Total Population' ,
                                                   'State Code' , 'Race' , 'Count').drop_duplicates()


In [38]:
cleaned_US_City_df = cleaned_US_City_df.withColumnRenamed('Total Population','Total_Population')\
       .withColumnRenamed("Male Population", "Male_Population")\
        .withColumnRenamed("Median Age", "Median_Age") \
        .withColumnRenamed("State Code", "State_Code") \
        .withColumnRenamed("City", "City_Name") \
        .withColumnRenamed("State", "State_Name") \
        .withColumnRenamed("Count", "count_of_Race_in_city").drop_duplicates()




In [216]:
cleaned_US_City_df.count()

2888

In [39]:
cleaned_temperature_df = cleaned_temperature_df.filter('Country = "United States"')

In [127]:
cleaned_temperature_df.count()

661524

In [40]:
cleaned_temperature_df = cleaned_temperature_df.select('AverageTemperature','City').drop_duplicates()

In [133]:
cleaned_temperature_df.count()

626244

In [41]:
cleaned_airport_codes_df =  cleaned_airport_codes_df.filter('iso_country = "US"')

In [137]:
cleaned_airport_codes_df.count()

21236

In [42]:
cleaned_airport_codes_df = cleaned_airport_codes_df.selectExpr('ident as id','type','name','iso_country',
                                                                      'local_code','coordinates').drop_duplicates()

In [44]:
cleaned_airport_codes_df.count()

21236

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
We use a star schema as our data model because star schemas are designed for ease of use and minimize the number of table joins a query needs.
The schema:

Dimension Tables:

    city_df
        'City_Name'
        'State_Name'
        'Median_Age'
        'Male_Population'
        'Female Population'
        'Total_Population'
        'State_Code'
        'Race'
        'count_of_Race_in_city'

    immigrant_df
        'id'
        'age'
        'gender'
        'visa_type'
        'visa_code'

    city_temp_df
        'AverageTemperature'
        'City'

    ariport_df
        'id'
        'type'
        'name'
        'iso_country'
        'local_code'
        'coordinates'
    
Fact Table

    immigration_df:
        id
        state_code
        city_code
        Date
        count
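
To illustrate how a star schema keeps queries short, here is a minimal sketch that uses the standard-library `sqlite3` module rather than Spark. The rows are made-up toy values and the table names are shortened for illustration; only the column names come from the model above. Because `city_df` holds one row per city and race, the sketch aggregates it to state level before joining so that fact rows are not multiplied.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE immigration (id INTEGER, state_code TEXT, city_code TEXT, date TEXT, "count" INTEGER);
CREATE TABLE city (city_name TEXT, state_code TEXT, race TEXT, count_of_race_in_city INTEGER);
-- Toy rows for illustration only
INSERT INTO immigration VALUES
    (1, 'NY', 'NYC', '2016-04-01', 1),
    (2, 'NY', 'NYC', '2016-04-01', 1),
    (3, 'MA', 'NYC', '2016-04-01', 1);
INSERT INTO city VALUES
    ('New York', 'NY', 'White', 100),
    ('New York', 'NY', 'Asian', 50),
    ('Boston',   'MA', 'White', 30);
""")

# One join between the aggregated fact table and the aggregated dimension.
query = """
SELECT f.state_code, f.arrivals, d.residents
FROM (SELECT state_code, SUM("count") AS arrivals
      FROM immigration GROUP BY state_code) AS f
JOIN (SELECT state_code, SUM(count_of_race_in_city) AS residents
      FROM city GROUP BY state_code) AS d
  ON f.state_code = d.state_code
ORDER BY f.state_code
"""
result = conn.execute(query).fetchall()
print(result)
```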




### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [10]:
def immigration_df_ETL():
    cleaned_i94_df = I94_df.dropna(how="any", subset=["ARRDATE",'I94PORT','i94addr','I94BIR','I94VISA',
                                                  "gender" , 'visatype'])
    state_code = US_City_df.toPandas()["State Code"].unique()
    @udf(StringType())
    def validate_state(n):  
        if n in state_code:
            return n
        return 'other'

    cleaned_i94_df = cleaned_i94_df.withColumn("i94addr", validate_state(cleaned_i94_df.i94addr))
    cleaned_i94_df = cleaned_i94_df.filter('i94addr != "other"')

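    @udf(StringType())
    def convert_datetime(x):
        return (datetime.datetime(1960, 1, 1).date() + datetime.timedelta(x)).isoformat()

    # Convert the SAS numeric arrival date (days since 1960-01-01) to an ISO date string
    cleaned_i94_df = cleaned_i94_df.withColumn("arrdate", convert_datetime(cleaned_i94_df.arrdate))
    cleaned_i94_df = cleaned_i94_df.selectExpr('cicid as ID' , 'i94addr as state_code' , 'arrdate as Date' ,'i94port as city_code',
                                'i94bir as age' , 'i94visa as visa_code' , 'visatype','count').drop_duplicates()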
    
    immigration_df = cleaned_i94_df.select('ID','state_code','city_code','Date','count')
    return immigration_df
    
    

In [11]:
def immigrant_df_ETL():
    cleaned_i94_df = I94_df.dropna(how="any", subset=["ARRDATE",'I94PORT','i94addr','I94BIR','I94VISA',
                                                  "gender" , 'visatype'])
    state_code = US_City_df.toPandas()["State Code"].unique()
    @udf(StringType())
    def validate_state(n):  
        if n in state_code:
            return n
        return 'other'

    cleaned_i94_df = cleaned_i94_df.withColumn("i94addr", validate_state(cleaned_i94_df.i94addr))
    cleaned_i94_df = cleaned_i94_df.filter('i94addr != "other"')

    # Keep the immigrant attributes listed in the data dictionary (4.3)
    # instead of returning the fact table columns again
    immigrant_df = cleaned_i94_df.selectExpr('cicid as id' , 'i94bir as age' , 'gender',
                                'visatype as visa_type' , 'i94visa as visa_code').drop_duplicates()
    return immigrant_df
    

In [12]:
def city_df_ETL():
    cleaned_US_City_df = US_City_df.dropna(how="any", subset=["City",'State','Median Age','Male Population',
                                                          'Female Population',"Total Population" , 'State Code','Race' , 'Count'])

    cleaned_US_City_df = cleaned_US_City_df.select('City','State'
                                  ,'Median Age','Male Population','Female Population',
                                'Total Population' ,
                                                   'State Code' , 'Race' , 'Count').drop_duplicates()

    city_df = cleaned_US_City_df.withColumnRenamed('Total Population','Total_Population')\
       .withColumnRenamed("Male Population", "Male_Population")\
        .withColumnRenamed("Median Age", "Median_Age") \
        .withColumnRenamed("State Code", "State_Code") \
        .withColumnRenamed("City", "City_Name") \
        .withColumnRenamed("State", "State_Name") \
        .withColumnRenamed("Count", "count_of_Race_in_city").drop_duplicates()
    return city_df
    
    
    

In [13]:
def city_temp_df_ETL():
    cleaned_temperature_df = temperature_df.dropna(how="any", subset=['dt','AverageTemperature','City','Country'])
    city_temp_df = cleaned_temperature_df.filter('Country = "United States"')
    # Select from the filtered frame so the "United States" filter is not discarded
    city_temp_df = city_temp_df.select('AverageTemperature','City').drop_duplicates()
    return city_temp_df
    

In [14]:
def ariport_df_ETL():
    cleaned_airport_codes_df = airport_codes_df.dropna(how="any", subset=['ident','iso_country','iso_region',
                                                                      'local_code'])
    
    ariport_df =  cleaned_airport_codes_df.filter('iso_country = "US"')

    
    ariport_df = ariport_df.selectExpr('ident as id','type','name','iso_country',
                                                                      'local_code','coordinates').drop_duplicates()

    return ariport_df
    

In [17]:
city_df = city_df_ETL()
immigrant_df = immigrant_df_ETL()
city_temp_df =  city_temp_df_ETL()
ariport_df= ariport_df_ETL()
immigration_df=immigration_df_ETL()

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [108]:
from pyspark.sql import DataFrame
allDataFrames = [k for (k, v) in globals().items() if isinstance(v, DataFrame)]
allDataFrames

['I94_df',
 'US_City_df',
 'airport_codes_df',
 'temperature_df',
 'c',
 'a',
 '_23',
 'cleaned_i94_df',
 'cleaned_US_City_df',
 'cleaned_temperature_df',
 'cleaned_airport_codes_df',
 'city_df',
 'immigrant_df',
 'city_temp_df',
 'ariport_df',
 'immigration_df']

In [198]:

def cheak_table(*args) :
    
    for table in args:

        print(isinstance(table,DataFrame))
        
cheak_table(immigrant_df,city_temp_df,city_df,ariport_df,immigration_df)        

True
True
True
True
True


In [199]:
def cheak_if_the_table_is_not_null(*args):
    for table in args:
        if table.count() > 0:
            print(True)
        else:
             print(False)
            
 
    

In [200]:
cheak_if_the_table_is_not_null(immigrant_df,city_temp_df,city_df,ariport_df,immigration_df)        

True
True
True
True
True


In [201]:
def check_for_Exclude_duplicate(*args):
    for table in args:
        if table.count() > table.dropDuplicates().count():
            print(False)
        else:
             print(True)
            

In [202]:
check_for_Exclude_duplicate(immigrant_df,city_temp_df,city_df,ariport_df,immigration_df)        

True
True
True
True
True
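
The checks above print `True` or `False`, which is easy to overlook in an automated run. One possible variation, sketched here under assumptions, raises an error instead. `run_quality_checks` and `FakeTable` are hypothetical names; the function only relies on `count()` and `dropDuplicates()`, the two DataFrame methods already used above, so the stand-in class lets the sketch run without Spark.

```python
def run_quality_checks(tables):
    """Raise ValueError for any empty table or any table with duplicate rows.

    `tables` maps a name to an object exposing count() and dropDuplicates().
    """
    for name, table in tables.items():
        total = table.count()
        if total == 0:
            raise ValueError(f"{name}: table is empty")
        distinct = table.dropDuplicates().count()
        if distinct != total:
            raise ValueError(f"{name}: {total - distinct} duplicate rows")
    return True


class FakeTable:
    """Tiny stand-in for a DataFrame so the sketch runs without Spark."""

    def __init__(self, rows):
        self.rows = list(rows)

    def count(self):
        return len(self.rows)

    def dropDuplicates(self):
        # dict.fromkeys keeps the first occurrence of each row, in order.
        return FakeTable(dict.fromkeys(self.rows))


print(run_quality_checks({"city_df": FakeTable([("NY", 1), ("MA", 2)])}))
```

With real tables the call would look like `run_quality_checks({"city_df": city_df, "immigration_df": immigration_df})`.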


#### 4.3 Data dictionary 
Dimension Tables

immigrant_df:

    id: id of immigrant
    age: age of immigrant
    gender: gender of immigrant
    visa_type: immigrant's visa type
    visa_code : Visa codes collapsed into three categories:
           1 = Business
           2 = Pleasure
           3 = Student
    

city_df:

    City_Name: name of the city
    State_Name: name of the state
    Median_Age: median age in the city
    Male_Population: male population of the city
    Female Population: female population of the city
    Total_Population: total population of the city
    State_Code: state code of the city
    Race: race category, one of:
           Black or African-American
           Hispanic or Latino
           White
           Asian
           American Indian and Alaska Native
    count_of_Race_in_city: number of residents of that race in the city


city_temp_df:

    City: city name
    AverageTemperature: average temperature in the city

ariport_df:

    id: airport ID
    type: type of airport
    name: name of the airport
    iso_country: ISO country code of the airport
    local_code: local code of the airport
    coordinates: coordinates of the airport
    
Fact Table

immigration_df:

    id: id of the immigration record (cicid)
    state_code: state code of the arrival address
    city_code: port code of the arrival city
    date: date of arrival
    count: count of the immigrant's entries into the US
    


### Step 5: Complete Project Write Up
* Technologies: because the dataset is large, I chose PySpark to process it, and first used pandas to read the data in chunks for exploration.
* Update cycles:
    1. Reporting
    2. New data to be loaded into the system
    3. Scheduled through Apache Airflow
* How I would approach the problem differently under the following scenarios:
    * The data was increased by 100x: we can add more nodes to the Spark cluster and upgrade the power of each node.
    * The data populates a dashboard that must be updated daily by 7am: we can use Apache Airflow to schedule and automate the data pipeline.
    * The database needed to be accessed by 100+ people: we can use Amazon Redshift to handle the user access, together with IAM roles. An IAM role is an IAM entity that defines a set of permissions for making AWS service requests.