# Project Title
### Data Engineering Capstone Project

#### Project Summary

In this project, I analyse the I94 immigration, U.S. city demographics, airport codes and world temperature datasets provided by Udacity, together with an airline dataset downloaded from openflights.org. To support a range of analytical queries, I build a star schema with one fact table and six dimension tables, shown below:



![Schema](Star_schema.JPG)

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
```python
# Do all imports and installs here
import pandas as pd
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, LongType as Lg, DateType as Date
```

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

In [2]:
```python
# Read in the data here
# The immigration data is in ../../data/18-83510-I94-Data-2016/ (one SAS file per month)
directory = '/data/18-83510-I94-Data-2016'
All_files = [os.path.join(directory, path) for path in os.listdir(directory)]
All_files
```

```
['/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat',
 '/data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat',
 '/data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat',
 '/data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat',
 '/data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat',
 '/data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat',
 '/data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat',
 '/data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat',
 '/data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat',
 '/data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat',
 '/data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat',
 '/data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat']
```

In [3]:
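```python
# Start a Spark session with the spark-sas7bdat package so Spark can read SAS files
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()
```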

In [4]:
```python
# Build relative paths (../../data/...) to the same files
loading_path = ['../..' + file for file in All_files]
loading_path
```

```
['../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat']
```

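`os.listdir` returns the twelve files in arbitrary order (April, September, November, ...), and the notebook only loads `loading_path[0]`. For the monthly loads proposed in Step 5, one option is to order the files by the month embedded in their names. A minimal sketch, assuming every file follows the `i94_<mon><yy>_sub.sas7bdat` pattern seen in the listing; `month_of` and `sort_by_month` are hypothetical helpers, not part of the original notebook:

```python
import os
import re

# Three-letter month abbreviations as they appear in names like i94_apr16_sub.sas7bdat
MONTHS = ['jan', 'feb', 'mar', 'apr', 'may', 'jun',
          'jul', 'aug', 'sep', 'oct', 'nov', 'dec']

# Assumed naming pattern: i94_<mon><two-digit year>_sub.sas7bdat
PATTERN = re.compile(r'i94_([a-z]{3})(\d{2})_sub\.sas7bdat$')


def month_of(path):
    """Return (year, month_number) parsed from an I-94 file path."""
    match = PATTERN.search(os.path.basename(path))
    if match is None:
        raise ValueError('unexpected file name: ' + path)
    mon, yy = match.groups()
    return 2000 + int(yy), MONTHS.index(mon) + 1


def sort_by_month(paths):
    """Order the monthly files chronologically so they can be loaded one month at a time."""
    return sorted(paths, key=month_of)


files = ['../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat',
         '../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat',
         '../../data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat']
for path in sort_by_month(files):
    print(month_of(path), os.path.basename(path))
```

With such a helper, the notebook could loop over `sort_by_month(loading_path)` and read and process one month per iteration instead of a single sample file.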
In [5]:
```python
# Load one month (the first file in the listing, April 2016) as a sample
sample_file = loading_path[0]
df_spark = spark.read.format('com.github.saurfang.sas.spark').load(sample_file)
```

In [6]:
```python
df_spark.head(3)
```

```
[Row(cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94res=692.0, i94port='XXX', arrdate=20573.0, i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, visatype='B2'),
 Row(cicid=7.0, i94yr=2016.0, i94mon=4.0, i94cit=254.0, i94res=276.0, i94port='ATL', arrdate=20551.0, i94mode=1.0, i94addr='AL', depdate=None, i94bir=25.0, i94visa=3.0, count=1.0, dtadfile='20130811', visapost='SEO', occup=None, entdepa='G', entdepd=None, entdepu='Y', matflag=None, biryear=1991.0, dtaddto='D/S', gender='M', insnum=None, airline=None, admnum=3736796330.0, fltno='00296', visatype='F1'),
 Row(cicid=15.0, i94yr=2016.0, i94mon=4.0, i94cit=101.0, i94res=101.0, i94port='WAS', arrdate=20545.0, i94mode=1.0, i94addr='MI', depdate=20691.0, i94bir=55.0, i94visa=2.0, count=1.0, dtadfile=
```

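`arrdate` and `depdate` are stored as plain numbers. In the I-94 SAS files these are SAS date values, i.e. the number of days since 1960-01-01; that reading is consistent with the fact-table rows shown later, where `arrdate` 20545.0 appears alongside `dtadfile` 20160401. A minimal stdlib sketch of the conversion, where `sas_to_date` is a hypothetical helper:

```python
from datetime import date, timedelta

# SAS stores dates as a count of days since 1 January 1960
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS date value (float or int, possibly None/NaN) to a datetime.date."""
    if sas_days is None or sas_days != sas_days:  # None or NaN means a missing date
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20573.0))  # 2016-04-29
print(sas_to_date(None))     # None
```

Inside Spark, the same conversion can stay vectorised rather than going through a Python UDF; one option is `df_spark.withColumn('arrival_date', expr("date_add('1960-01-01', cast(arrdate as int))"))`, with `expr` imported from `pyspark.sql.functions`.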
In [7]:
```python
print((df_spark.count(), len(df_spark.columns)))
```

```
(3096313, 28)
```


In [8]:

```python
df_spark.columns
```

```
['cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'i94addr',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'dtadfile',
 'visapost',
 'occup',
 'entdepa',
 'entdepd',
 'entdepu',
 'matflag',
 'biryear',
 'dtaddto',
 'gender',
 'insnum',
 'airline',
 'admnum',
 'fltno',
 'visatype']
```

In [9]:
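```python
df_spark.limit(5).toPandas()
```

|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | ... | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | ... | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 1988.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 2012.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |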


In [10]:
```python
# Read the World Temperature Data (GlobalLandTemperaturesByCity.csv)
df_temperature = pd.read_csv('GlobalLandTemperaturesByCity.csv')
df_temperature.head()
```

|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


In [11]:
```python
# Read the U.S. City Demographic Data (this data comes from OpenSoft); the file is semicolon-separated
df_US_demog = pd.read_csv('us-cities-demographics.csv', sep=';')
df_US_demog.head()
```

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


In [76]:
```python
# Read the Airport Code Table, used to map each airport to its city
df_airport = spark.read.csv('airport-codes_csv.csv', header=True)
df_airport.limit(5).toPandas()
```

|   | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11 |  | US | US-PA | Bensalem | 00A |  | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435 |  | US | US-KS | Leoti | 00AA |  | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450 |  | US | US-AK | Anchor Point | 00AK |  | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820 |  | US | US-AL | Harvest | 00AL |  | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237 |  | US | US-AR | Newport |  |  |  | -91.254898, 35.6087 |


In [13]:
```python
# Read the airline table (data from openflights.org); the airline's country hints at where the visitor came from
df_airlines = pd.read_csv('Airlines.csv')
df_airlines.head()
```

|   | Airline ID | Name | Alias | IATA | ICAO | Callsign | Country | Active |
|---|---|---|---|---|---|---|---|---|
| 0 | 1 | Private flight | \N | - |  |  |  | Y |
| 1 | 2 | 135 Airways | \N |  | GNL | GENERAL | United States | N |
| 2 | 3 | 1Time Airline | \N | 1T | RNX | NEXTIME | South Africa | Y |
| 3 | 4 | 2 Sqn No 1 Elementary Flying Training School | \N |  | WYT |  | United Kingdom | N |
| 4 | 5 | 213 Flight Unit | \N |  | TFU |  | Russia | N |


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [14]:
```python
# Performing cleaning tasks here
# Load the helper file of country codes used to clean i94cit and i94res
df_helper_countries = pd.read_csv('Countries.csv')
df_helper_countries.head()
```

|   | Country code | Country Name | Flag |
|---|---|---|---|
| 0 | 582 | MEXICO Air Sea | Valid |
| 1 | 236 | AFGHANISTAN | Valid |
| 2 | 101 | ALBANIA | Valid |
| 3 | 316 | ALGERIA | Valid |
| 4 | 102 | ANDORRA | Valid |


In [15]:
```python
# Keep only the country codes flagged as valid
df_filtered = df_helper_countries[df_helper_countries['Flag'] == 'Valid']['Country code']
```

In [16]:
```python
# A pandas Series converts directly to a plain Python list
valid_code = df_filtered.tolist()
```

In [17]:
```python
# Drop rows whose i94cit is not a valid country-of-citizenship code; shape before: (3096313, 28)
df_spark_1 = df_spark.filter(df_spark.i94cit.isin(valid_code))
```

In [18]:
```python
print((df_spark_1.count(), len(df_spark_1.columns)))
```

```
(2702245, 28)
```

In [19]:
```python
# Drop rows whose i94res is not a valid country-of-residence code; shape before: (2702245, 28)
df_spark_2 = df_spark_1.filter(df_spark_1.i94res.isin(valid_code))
```

In [20]:
```python
print((df_spark_2.count(), len(df_spark_2.columns)))
```

```
(2702245, 28)
```


In [21]:
```python
# Load the helper file of port codes used to clean i94port
df_helper_ports = pd.read_csv('ports.csv')
df_helper_ports.head()
```

|   | portofentry | Port Name | City | Flag |
|---|---|---|---|---|
| 0 | ALC | ALCAN | AK | Valid |
| 1 | ANC | ANCHORAGE | AK | Valid |
| 2 | BAR | BAKER AAF - BAKER ISLAND | AK | Valid |
| 3 | DAC | DALTONS CACHE | AK | Valid |
| 4 | PIZ | DEW STATION PT LAY DEW | AK | Valid |


In [22]:
```python
# Keep only the port codes flagged as valid
df_ports_filtered = df_helper_ports[df_helper_ports['Flag'] == 'Valid']['portofentry']
```

In [23]:
```python
valid_code_ports = df_ports_filtered.tolist()
```

In [24]:
```python
# Drop rows whose i94port is not a valid port-of-entry code; shape before: (2702245, 28)
df_spark_3 = df_spark_2.filter(df_spark_2.i94port.isin(valid_code_ports))
```

In [25]:
```python
print((df_spark_3.count(), len(df_spark_3.columns)))
```

```
(2695274, 28)
```

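The three filters above remove rows unevenly: the citizenship filter drops most of them, the residence filter drops none, and the port filter drops a few thousand. A short sketch that turns the printed counts into per-step attrition, which can be kept as a record of what cleaning cost; the counts are copied from the outputs above and `attrition` is a hypothetical helper:

```python
# Row counts printed after each cleaning step above (April 2016 sample file)
steps = [
    ('raw April 2016 file', 3096313),
    ('valid i94cit', 2702245),
    ('valid i94res', 2702245),
    ('valid i94port', 2695274),
]


def attrition(steps):
    """Yield (step name, rows dropped, percent of the previous step's rows dropped)."""
    for (_, before), (name, after) in zip(steps, steps[1:]):
        dropped = before - after
        yield name, dropped, 100.0 * dropped / before


for name, dropped, pct in attrition(steps):
    print('{:<14} dropped {:>7,} rows ({:.2f}%)'.format(name, dropped, pct))

total_dropped = steps[0][1] - steps[-1][1]
print('total dropped {:,} rows ({:.2f}%)'.format(total_dropped, 100.0 * total_dropped / steps[0][1]))
```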

In [26]:
```python
# For the temperature data, only the records of one year will be kept
df_temperature.dtypes
```

```
dt                                object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                              object
Country                           object
Latitude                          object
Longitude                         object
dtype: object
```

In [27]:
```python
df_temperature.tail()
```

|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 8599207 | 2013-05-01 | 11.464 | 0.236 | Zwolle | Netherlands | 52.24N | 5.26E |
| 8599208 | 2013-06-01 | 15.043 | 0.261 | Zwolle | Netherlands | 52.24N | 5.26E |
| 8599209 | 2013-07-01 | 18.775 | 0.193 | Zwolle | Netherlands | 52.24N | 5.26E |
| 8599210 | 2013-08-01 | 18.025 | 0.298 | Zwolle | Netherlands | 52.24N | 5.26E |
| 8599211 | 2013-09-01 |  |  | Zwolle | Netherlands | 52.24N | 5.26E |


In [28]:
```python
df_temperature.dt.unique()
```

```
array(['1743-11-01', '1743-12-01', '1744-01-01', ..., '2013-07-01',
       '2013-08-01', '2013-09-01'], dtype=object)
```

In [29]:
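```python
# 2012 appears to be the most recent year with all twelve months (the data stops at 2013-09-01)
# .copy() makes the result an independent frame, which avoids pandas' SettingWithCopyWarning
# on the in-place drop in the next cell
df_temperature = df_temperature[df_temperature['dt'].str.startswith('2012')].copy()
df_temperature
```

|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 3218 | 2012-01-01 | 1.978 | 0.319 | Århus | Denmark | 57.05N | 10.33E |
| 3219 | 2012-02-01 | -0.021 | 0.277 | Århus | Denmark | 57.05N | 10.33E |
| 3220 | 2012-03-01 | 6.050 | 0.433 | Århus | Denmark | 57.05N | 10.33E |
| 3221 | 2012-04-01 | 6.366 | 0.352 | Århus | Denmark | 57.05N | 10.33E |
| 3222 | 2012-05-01 | 12.814 | 0.303 | Århus | Denmark | 57.05N | 10.33E |
| 3223 | 2012-06-01 | 13.329 | 0.388 | Århus | Denmark | 57.05N | 10.33E |
| 3224 | 2012-07-01 | 16.746 | 0.329 | Århus | Denmark | 57.05N | 10.33E |
| 3225 | 2012-08-01 | 17.006 | 0.330 | Århus | Denmark | 57.05N | 10.33E |
| 3226 | 2012-09-01 | 13.204 | 0.201 | Århus | Denmark | 57.05N | 10.33E |
| 3227 | 2012-10-01 | 8.320 | 0.310 | Århus | Denmark | 57.05N | 10.33E |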

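The choice of 2012 rests on it being the latest year with readings for all twelve months, since the listing ends at 2013-09-01. A small sketch that makes the check explicit rather than eyeballed; `latest_complete_year` is a hypothetical helper, and it only checks that each month appears, not that its temperature value is non-null:

```python
from collections import defaultdict


def latest_complete_year(dates):
    """Return the most recent year whose 'YYYY-MM-DD' strings cover all 12 months, or None."""
    months_by_year = defaultdict(set)
    for d in dates:
        year, month = d[:4], d[5:7]
        months_by_year[year].add(month)
    complete = [year for year, months in months_by_year.items() if len(months) == 12]
    # Four-digit year strings sort the same way as the years themselves
    return max(complete) if complete else None


# Illustrative input: 2012 fully covered, 2013 only January to September
sample = ['2012-{:02d}-01'.format(m) for m in range(1, 13)] + \
         ['2013-{:02d}-01'.format(m) for m in range(1, 10)]
print(latest_complete_year(sample))  # 2012
```

Applied to the unfiltered frame, `latest_complete_year(df_temperature['dt'].unique())` would give the year to keep instead of hard-coding it.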

In [30]:
```python
# Drop the coordinates from the temperature table
df_temperature.drop(['Latitude', 'Longitude'], axis=1, inplace=True)
df_temperature.head()
```

|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country |
|---|---|---|---|---|---|
| 3218 | 2012-01-01 | 1.978 | 0.319 | Århus | Denmark |
| 3219 | 2012-02-01 | -0.021 | 0.277 | Århus | Denmark |
| 3220 | 2012-03-01 | 6.05 | 0.433 | Århus | Denmark |
| 3221 | 2012-04-01 | 6.366 | 0.352 | Århus | Denmark |
| 3222 | 2012-05-01 | 12.814 | 0.303 | Århus | Denmark |


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model



My data model is a star schema consisting of one fact table and six dimension tables, which gives the flexibility to run different analyses and answer questions such as:
* Which areas do tourists visit most in each month of the year?
* Which airlines are most popular, and are the flights direct or not?
* Which states do visitors go to, relative to their home country?
* Which cities with a high foreign-born population do female visitors aged 20 to 40 go to?
* How do business visitors relate to city demographics in terms of population?

Fact table logs_immigration, built from the immigration data (municipality comes from the airport codes table):
* cicid
* I94YR 
* I94MON 
* I94PORT 
* ARRDATE
* I94MODE 
* DEPDATE 
* COUNT 
* DTADFILE 
* AIRLINE 
* FLTNO 
* ADMNUM 
* municipality

Dimension tables:

1.Immigrant:
* cicid
* I94CIT 
* I94RES 
* I94ADDR 
* I94BIR 
* OCCUP 
* BIRYEAR 
* GENDER 
* INSNUM 

2.Visa:
* VISAPOST 
* DTADDTO 
* ADMNUM 
* VISATYPE 
* I94VISA 
* ENTDEPA 
* ENTDEPD 
* ENTDEPU 
* MATFLAG 

3.Airline:
* Airline ID
* Name
* Alias
* IATA
* ICAO
* Callsign
* Country
* Active

4.Temperature:
* AverageTemperature
* AverageTemperatureUncertainty
* City
* Country

5.Airport_Data:
* type
* name
* elevation_ft
* continent
* iso_country
* iso_region
* municipality
* gps_code
* iata_code
* local_code
* coordinates

6.Demographic:
* City
* State
* Median_Age
* Male_Population
* Female_Population
* Total_Population
* Number_of_Veterans
* Foreign_born
* Average_Household_Size
* State_Code
* Race
* Count

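To show how the star schema answers one of the questions listed above (female visitors aged 20 to 40 and the foreign-born share of the cities they go to), here is a toy illustration that uses Python's built-in sqlite3 as a stand-in for the Parquet tables. Table and column names follow the model, but the rows and city names are invented. Two caveats: the real demographic table has one row per city and race, so it would need deduplicating per city before this join, and airport municipality names may not match demographic city names exactly.

```python
import sqlite3

# In-memory stand-ins for three of the star-schema tables; all rows are invented
conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE logs_immigration (cicid REAL PRIMARY KEY, i94mon REAL, municipality TEXT);
CREATE TABLE immigrant (cicid REAL PRIMARY KEY, i94bir REAL, gender TEXT);
CREATE TABLE demographic (city TEXT PRIMARY KEY, foreign_born INTEGER, total_population INTEGER);

INSERT INTO logs_immigration VALUES (1, 4, 'City A'), (2, 4, 'City A'), (3, 4, 'City B'),
                                    (4, 4, 'City B'), (5, 4, 'City A');
INSERT INTO immigrant VALUES (1, 25, 'F'), (2, 45, 'F'), (3, 31, 'F'), (4, 29, 'M'), (5, 38, 'F');
INSERT INTO demographic VALUES ('City A', 60, 100), ('City B', 25, 100);
""")

# Fact table joined to two dimensions: immigrant on cicid, demographic on city name
query = """
SELECT f.municipality,
       COUNT(*) AS visitors,
       d.foreign_born * 1.0 / d.total_population AS foreign_born_share
FROM logs_immigration AS f
JOIN immigrant AS i ON i.cicid = f.cicid
JOIN demographic AS d ON d.city = f.municipality
WHERE i.gender = 'F' AND i.i94bir BETWEEN 20 AND 40
GROUP BY f.municipality, d.foreign_born, d.total_population
ORDER BY foreign_born_share DESC
"""
for row in conn.execute(query):
    print(row)
```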

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [78]:
```python
# Write code here

# Four dimension tables are already prepared: Airline, Temperature, Demographic, Airport_Data.
# Still to build: the logs_immigration fact table and two dimension tables, Immigrant and Visa.

# Immigrant dimension table
Immigrant_extracted_fields = ['cicid', 'i94cit', 'i94res', 'i94addr', 'i94bir', 'occup',
                              'biryear', 'gender', 'insnum', 'I94mon']
Immigrant = df_spark_3.selectExpr(Immigrant_extracted_fields).dropDuplicates()

# Visa dimension table
Visa_extracted_fields = ['visapost', 'dtaddto', 'admnum', 'visatype', 'i94visa', 'entdepa',
                         'entdepd', 'entdepu', 'matflag', 'I94mon']
Visa = df_spark_3.selectExpr(Visa_extracted_fields).dropDuplicates()

# logs_immigration fact table
logs_extracted_fields = ['cicid', 'i94yr', 'I94mon', 'i94port', 'arrdate', 'i94mode', 'depdate',
                         'count', 'dtadfile', 'airline', 'fltno', 'admnum']
logs_immigration = df_spark_3.selectExpr(logs_extracted_fields).dropDuplicates()

# Add municipality to logs_immigration with a left join on the airport IATA code.
# Caveat: i94port holds I-94 port-of-entry codes, not IATA codes, so some ports are matched
# to the wrong city (e.g. LOS becomes Lagos in the output below) or not matched at all.
logs_immigration_temp = logs_immigration.join(
    df_airport,
    logs_immigration.i94port == df_airport.iata_code,
    "left"
)

logs_extracted_fields_final = logs_extracted_fields + ['municipality']
logs_immigration_final = logs_immigration_temp.selectExpr(logs_extracted_fields_final).dropDuplicates()
```

In [80]:
```python
logs_immigration_final.limit(5).toPandas()
```

|   | cicid | i94yr | I94mon | i94port | arrdate | i94mode | depdate | count | dtadfile | airline | fltno | admnum | municipality |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 96.0 | 2016.0 | 4.0 | DAL | 20545.0 | 1.0 |  | 1.0 | 20160401 | AA | 262 | 55443460000.0 | Dallas |
| 1 | 209.0 | 2016.0 | 4.0 | NYC | 20545.0 | 1.0 | 20550.0 | 1.0 | 20160401 | AF | 6 | 55444010000.0 |  |
| 2 | 326.0 | 2016.0 | 4.0 | LOS | 20545.0 | 1.0 | 20547.0 | 1.0 | 20160401 | TN | 2 | 55415980000.0 | Lagos |
| 3 | 343.0 | 2016.0 | 4.0 | LOS | 20545.0 | 1.0 | 20566.0 | 1.0 | 20160401 | LH | 456 | 55435630000.0 | Lagos |
| 4 | 445.0 | 2016.0 | 4.0 | MIA | 20545.0 | 1.0 | 20555.0 | 1.0 | 20160401 | OS | 97 | 55426510000.0 | Miami |

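The join above matches `i94port` against worldwide IATA codes, which is why the I-94 port LOS comes back as Lagos. One possible mitigation, sketched here in plain Python so the logic is easy to test, is to keep only US airports that have an IATA code, with a single row per code, before joining. In Spark the equivalent filtering would be along the lines of `df_airport.filter((col('iso_country') == 'US') & col('iata_code').isNotNull()).dropDuplicates(['iata_code'])`. This still leaves I-94 codes that are not IATA codes unmatched, so the `ports.csv` helper may be a better source of port cities. `build_municipality_lookup` is a hypothetical helper and the sample rows are illustrative:

```python
def build_municipality_lookup(airports, country='US'):
    """Map IATA code -> municipality using only airports in `country`.

    `airports` is an iterable of dicts with 'iata_code', 'iso_country' and 'municipality'
    keys, mirroring columns of airport-codes_csv.csv. Rows without an IATA code are skipped,
    and the first row seen wins if a code appears twice, so every code maps to one city.
    """
    lookup = {}
    for row in airports:
        code = row.get('iata_code')
        if code and row.get('iso_country') == country and code not in lookup:
            lookup[code] = row.get('municipality')
    return lookup


airports = [
    {'iata_code': 'LOS', 'iso_country': 'NG', 'municipality': 'Lagos'},
    {'iata_code': 'MIA', 'iso_country': 'US', 'municipality': 'Miami'},
    {'iata_code': None, 'iso_country': 'US', 'municipality': 'Bensalem'},
]
lookup = build_municipality_lookup(airports)
print(lookup.get('MIA'))  # Miami
print(lookup.get('LOS'))  # None
```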

In [None]:
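```python
# Write the output to Parquet files; the tables built from the I-94 data are partitioned by month
output_data = '/output/'

Immigrant.write.partitionBy("i94mon").parquet(output_data + 'Immigrant/')
Visa.write.partitionBy("i94mon").parquet(output_data + 'Visa/')
logs_immigration_final.write.partitionBy("i94mon").parquet(output_data + 'logs_immigration/')

# df_US_demog, df_airlines and df_temperature are pandas DataFrames, which have no .write
# attribute; write them with pandas' to_parquet (requires pyarrow or fastparquet)
os.makedirs(output_data, exist_ok=True)
df_US_demog.to_parquet(output_data + 'US_demog.parquet')
df_airlines.to_parquet(output_data + 'airlines.parquet')
df_temperature.to_parquet(output_data + 'temperature.parquet')

# df_airport was read with Spark, so it uses the Spark writer
df_airport.write.parquet(output_data + 'airport/')
```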

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [82]:
```python
# Perform quality checks here

# Many quality checks are possible; the sample check below verifies that each
# table was populated, i.e. that it is not empty.

def row_count_check(df):
    """Return 'pass' if the Spark DataFrame has at least one row, otherwise 'fail'."""
    return 'pass' if df.count() > 0 else 'fail'


# Run the quality check on the three tables built from the immigration data
for name, table in [('Immigrant', Immigrant), ('Visa', Visa), ('logs_immigration', logs_immigration_final)]:
    print("Perform quality check on {} table, the result is {}.".format(name, row_count_check(table)))
```

```
Perform quality check on Immigrant table, the result is pass.
Perform quality check on Visa table, the result is pass.
Perform quality check on logs_immigration table, the result is pass.
```

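The row-count check catches an empty table but not a broken key. Since the fact table is meant to join to the Immigrant dimension on `cicid`, a second check worth running is that this key is non-null and, within a monthly load, expected to be unique. A plain-Python sketch of the rule, where `key_check` is a hypothetical helper; on a Spark DataFrame the same idea can be expressed by comparing `df.count()` with `df.select('cicid').distinct().count()` and counting `df.filter(col('cicid').isNull())`:

```python
def key_check(rows, key):
    """Check that `key` is non-null and unique across `rows` (an iterable of dicts).

    Returns 'pass' or a 'fail: ...' message describing the first problem found.
    """
    seen = set()
    for row in rows:
        value = row.get(key)
        if value is None:
            return 'fail: null {}'.format(key)
        if value in seen:
            return 'fail: duplicate {} {}'.format(key, value)
        seen.add(value)
    return 'pass'


# Illustrative rows shaped like the Immigrant dimension
print(key_check([{'cicid': 6.0}, {'cicid': 7.0}], 'cicid'))  # pass
print(key_check([{'cicid': 6.0}, {'cicid': 6.0}], 'cicid'))  # fail: duplicate cicid 6.0
print(key_check([{'cicid': None}], 'cicid'))                 # fail: null cicid
```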

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
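In this project, I used Spark for the I94 immigration data, which is read directly from SAS files (about 3.1 million rows for April 2016 alone) through the spark-sas7bdat package, and pandas for most of the smaller reference tables. Together they let me explore, clean and model the data in one notebook.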

* Propose how often the data should be updated and why.
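    * The schema supports monthly updates, because the whole pipeline is organised around monthly intervals: the immigration data arrives as one file per month, and the output tables are partitioned by month.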

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
     * I would set up a proper Spark cluster, for example 4 or 8 EC2 instances depending on the load.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
     * I would set up a data pipeline with an orchestrator such as Apache Airflow, because it enforces task dependencies and runs tasks reliably on a schedule. If the run starts at 1 am, it must finish in under 6 hours; otherwise I would add more compute to the Spark cluster.
 * The database needed to be accessed by 100+ people.
     * I would first find out which queries the users need to run. If those queries are predefined, I would remodel the output data to suit Cassandra's query-driven modeling, since Cassandra supports a high number of concurrent users while keeping query latency low.