# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project analyzes two types of data (CSV and Parquet files). It uploads them to an S3 bucket, loads them into Redshift
tables using pipelines, and queries the data on the Redshift cluster.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# All imports and installs
# (the legacy boto 2 imports were unused; only boto3 is needed for S3 uploads)
import os
import sys
import configparser
from datetime import datetime

import boto3
import psycopg2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, desc, asc, sum as Fsum
from pyspark.sql.types import IntegerType, StringType, DateType
%matplotlib inline

### Step 1: Scope the Project and Gather Data

#### Scope 
This project uses normalized tables: the tables do not have many columns and grow vertically (more rows rather than
more columns), so a dimensional model is not needed.

The data includes immigrant IDs, gender, visa type, address, citizenship, arrival date and travel mode.

The end solution aims to find the countries with the most immigrants, along with the immigrants' gender and citizenship.

Apache Spark, Apache Airflow, PostgreSQL, an S3 bucket and a Redshift cluster are used in this project.

#### Describe and Gather Data 
Five data sets are used in this project: four CSV files and one Parquet folder.

* CSV files: I94cntyl_country, I94mode_travel, I94visa_type, addr
* Parquet folder: sas_data

configparser is used here to read the AWS access key and secret key from capstone.cfg, along with the settings
needed to access the Redshift cluster.

In [2]:
config = configparser.ConfigParser()
config.read('capstone.cfg')
KEY         = config.get('AWS','KEY')
SECRET      = config.get('AWS','SECRET')
BUCKET_NAME = config.get('CLUSTER','BUCKET_NAME')
DB_NAME     = config.get('CLUSTER','DB_NAME')
DB_USER     = config.get('CLUSTER','DB_USER')
DB_PASSWORD = config.get('CLUSTER','DB_PASSWORD')
DB_PORT     = config.get('CLUSTER','DB_PORT')
DB_ENDPOINT = config.get('CLUSTER','DB_ENDPOINT')
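The cell above expects capstone.cfg to contain an AWS section and a CLUSTER section. The file itself is not part of the notebook, so the layout below is an assumption reconstructed from the config.get(...) calls: the section and key names come from the code, the port 5439 and database name dev match the connection string used later, and every other value is a placeholder. It is parsed from a string (into a separate sample_config object) so it can be tried without touching the real file.

```python
import configparser

# Hypothetical capstone.cfg layout: section/key names come from the
# config.get(...) calls above; all values except port and db name are placeholders.
SAMPLE_CFG = """
[AWS]
KEY = <your-access-key-id>
SECRET = <your-secret-access-key>

[CLUSTER]
BUCKET_NAME = <your-bucket-name>
DB_NAME = dev
DB_USER = <your-db-user>
DB_PASSWORD = <your-db-password>
DB_PORT = 5439
DB_ENDPOINT = <your-cluster-endpoint>
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)

# configparser returns strings; convert numeric settings explicitly when needed
db_port = sample_config.getint('CLUSTER', 'DB_PORT')
```

Because config.get always returns strings, DB_PORT in the cell above is the string '5439', which is fine for building a connection URL with format().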

Creating the Spark session and loading the sas_data Parquet folder

In [3]:
# Point Spark at the workspace's Java 8 and Spark 2.4.3 installation
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

# hadoop-aws provides the s3a:// filesystem; spark-sas7bdat can read SAS files
spark = SparkSession.builder\
        .config("spark.jars.repositories", "https://repos.spark-packages.org/")\
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11")\
        .enableHiveSupport().getOrCreate()
sc = spark.sparkContext

# Pass the AWS credentials read from capstone.cfg to the s3a connector
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", KEY)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", SECRET)

In [4]:
df_spark =spark.read.load('./sas_data')

Loading all the data sets used in this project for analysis

In [4]:
df_spark = spark.read.parquet('sas_data')
country = spark.read.csv('I94cntyl_country.csv', inferSchema=True)
visa = spark.read.csv('I94visa_type.csv', inferSchema=True)
travel = spark.read.csv('I94mode_travel.csv', inferSchema=True)
address = spark.read.csv('addr.csv', inferSchema=True)
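The label files are read without a header, so Spark names their columns _c0 and _c1, and each label keeps its surrounding single quotes (visible in the show() output below and in the query results at the end). The sketch below shows, in plain Python, how one such file could be turned into a code-to-label lookup with the quotes stripped. It assumes each row is comma-separated like 1,'Business' with no header; the function name load_label_file is hypothetical. Codes default to float because the matching columns in sas_data are doubles.

```python
import csv
import io

def load_label_file(fileobj, key_type=float):
    """Parse a two-column label file (code, quoted label) into a dict.

    Assumes comma-separated rows such as: 1,'Business'  (no header row).
    Surrounding single quotes and whitespace are stripped from each label.
    """
    lookup = {}
    for row in csv.reader(fileobj):
        if len(row) < 2:
            continue  # skip blank or malformed lines
        code, label = row[0].strip(), row[1].strip()
        lookup[key_type(code)] = label.strip("'").strip()
    return lookup

# Usage with an in-memory sample shaped like I94visa_type.csv (assumed contents)
sample = io.StringIO("1,'Business'\n2,'Pleasure'\n3,'Student'\n")
visa_lookup = load_label_file(sample)
```

For addr.csv, whose codes are state abbreviations such as AL, pass key_type=str.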

### Step 2: Explore and Assess the Data
#### Explore the Data 
The data has many unneeded columns, null values and columns with the wrong type.

#### Cleaning Steps
Document steps necessary to clean the data
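Here, unneeded columns, null values and duplicate rows are removed from sas_data, and the numeric codes in the visa and
travel files are cast to double so that they match the double-typed code columns in sas_data. arrdate, a SAS numeric
date (days since 1960-01-01), is converted to a date type with a UDF so that the year and month can be extracted from it.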
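The arrdate conversion is plain date arithmetic: a SAS date is a day count from 1960-01-01. A minimal stdlib sketch of the same calculation the UDF performs (the function name sas_to_date is hypothetical) is shown below. As a check, the sample row read back from S3 later has depdate 20551.0, which this maps to 2016-04-07, six days after its arrdate of 2016-04-01.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from 1960-01-01

def sas_to_date(sas_days):
    """Convert a SAS numeric date (days since 1960-01-01) to a datetime.date."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))

arrival = sas_to_date(20545.0)
departure = sas_to_date(20551.0)  # depdate of the sample row read back from S3

# Once converted, the year and month are plain attributes of the date
print(arrival, arrival.year, arrival.month)  # prints: 2016-04-01 2016 4
```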

In [5]:
country.show(1)
visa.show(4)
travel.show(1)
address.show(1)
df_spark.show(1)

+-----+--------------------+
|  _c0|                 _c1|
+-----+--------------------+
|582.0| 'MEXICO Air Sea ...|
+-----+--------------------+
only showing top 1 row

+---+----------+
|_c0|       _c1|
+---+----------+
|  1|'Business'|
|  2|'Pleasure'|
|  3| 'Student'|
+---+----------+

+---+-----+
|_c0|  _c1|
+---+-----+
|  1|'Air'|
+---+-----+
only showing top 1 row

+---+---------+
|_c0|      _c1|
+---+---------+
| AL|'ALABAMA'|
+---+---------+
only showing top 1 row

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+

In [6]:
country.printSchema()
visa.printSchema()
travel.printSchema()
address.printSchema()
df_spark.printSchema()
df_spark.count()

root
 |-- _c0: double (nullable = true)
 |-- _c1: string (nullable = true)

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = tru

3096313

In [7]:
visa = visa.withColumn("_c0",visa._c0.cast('double'))
travel = travel.withColumn("_c0",travel._c0.cast('double'))

In [8]:
# Keep only the columns used by the data model, then drop duplicate rows and rows with nulls
sas_data = (
    df_spark
    .drop('visapost', 'occup', 'entdepu', 'insnum', 'count', 'entdepd', 'matflag',
          'i94yr', 'i94mon', 'i94port', 'i94bir', 'dtadfile', 'entdepa', 'dtaddto',
          'airline', 'admnum', 'fltno', 'visatype')
    .drop_duplicates()
    .dropna()
)

In [9]:
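def convert_to_datetime(date):
    """
    Convert a SAS numeric date (days since 1960-01-01) to a date.

    :param date: number of days since 1960-01-01, or None
    :return: the corresponding date (displayed as yyyy-mm-dd), or None
    """
    if date is None:
        return None
    return (pd.Timestamp('1960-1-1') + pd.to_timedelta(date, unit='D')).date()

# Wrap the function as a Spark UDF that returns DateType
convert_to_datetime_udf = udf(convert_to_datetime, DateType())
sas_data = sas_data.withColumn('arrdate', convert_to_datetime_udf(col('arrdate')))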

In [10]:
country.printSchema()
visa.printSchema()
travel.printSchema()
address.printSchema()
sas_data.printSchema()
sas_data.count()

root
 |-- _c0: double (nullable = true)
 |-- _c1: string (nullable = true)

root
 |-- _c0: double (nullable = true)
 |-- _c1: string (nullable = true)

root
 |-- _c0: double (nullable = true)
 |-- _c1: string (nullable = true)

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)

root
 |-- cicid: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- biryear: double (nullable = true)
 |-- gender: string (nullable = true)



2431703

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
A normalized data model is used here because the data is not large enough to need a dimensional model such as a star schema.
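The final model consists of three tables (immigrants, address and date) that all carry citizen_id (the cicid of sas_data), so any two of them can be joined on that column. The sketch below uses in-memory SQLite purely as a stand-in for Redshift/Spark to illustrate the layout; the column names mirror the create statements in Step 4, while the rows are made up.

```python
import sqlite3

# In-memory SQLite as a stand-in for the warehouse (illustration only)
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
CREATE TABLE immigrants (citizen_id REAL, country_cit TEXT, birth_year REAL, gender TEXT);
CREATE TABLE address    (citizen_id REAL, address TEXT, visa_type TEXT, travel_mode TEXT);
CREATE TABLE "date"     (citizen_id REAL, arrival_date TEXT, arrival_day INTEGER,
                         arrival_month INTEGER, arrival_year INTEGER);
""")

# Made-up rows, only to exercise the join
cur.executemany("INSERT INTO immigrants VALUES (?, ?, ?, ?)", [
    (1.0, 'FRANCE', 1980.0, 'F'),
    (2.0, 'FRANCE', 1975.0, 'M'),
    (3.0, 'JAPAN', 1990.0, 'F'),
])
cur.executemany("INSERT INTO address VALUES (?, ?, ?, ?)", [
    (1.0, 'NEW YORK', 'Pleasure', 'Air'),
    (2.0, 'FLORIDA', 'Pleasure', 'Air'),
    (3.0, 'HAWAII', 'Pleasure', 'Sea'),
])

# Every table shares citizen_id, so tables are combined with a simple join on it
rows = cur.execute("""
    SELECT i.country_cit, a.visa_type, COUNT(*)
    FROM immigrants i
    JOIN address a ON a.citizen_id = i.citizen_id
    GROUP BY i.country_cit, a.visa_type
    ORDER BY 3 DESC, 1, 2
""").fetchall()
```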

#### 3.2 Mapping Out Data Pipelines
The data is uploaded to an S3 bucket; Airflow and a Redshift cluster are then used to create the tables and load the
data into them so that queries can be run on the cluster.
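The pipeline has a fixed ordering: each final table can only be loaded after the staging tables it reads from, and the quality checks run last. The Airflow DAG itself is not shown in this notebook, so this is not Airflow code; it is a sketch using Python's graphlib (3.9+) to show the dependencies such a DAG would need to encode. The task names are hypothetical and simply mirror the SQL variables defined in Step 4.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Each task maps to the set of tasks that must finish before it (hypothetical names)
dependencies = {
    "load_immigrants_table": {"load_staging_sas_data", "load_staging_country"},
    "load_address_table": {"load_staging_sas_data", "load_staging_address",
                           "load_staging_visa", "load_staging_travel"},
    "load_date_table": {"load_staging_sas_data"},
    "data_quality_checks": {"load_immigrants_table", "load_address_table",
                            "load_date_table"},
}

# One valid execution order that respects every dependency
order = list(TopologicalSorter(dependencies).static_order())
```

Writing the dependencies out this way makes it easy to see, for example, that the immigrants table needs the staging country table to be loaded first.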

### Uploading data to the S3 bucket
If the bucket is publicly accessible:

In [22]:
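# Write directly to S3 through the s3a connector; overwrite so the cell can be re-run.
# Note that Spark writes each "*.csv" path as a folder of part files.
sas_data.write.mode("overwrite").parquet(path="s3a://{}/sas_data/".format(BUCKET_NAME))
country.write.mode("overwrite").csv("s3a://{}/country.csv".format(BUCKET_NAME))
visa.write.mode("overwrite").csv("s3a://{}/visa.csv".format(BUCKET_NAME))
travel.write.mode("overwrite").csv("s3a://{}/travel.csv".format(BUCKET_NAME))
address.write.mode("overwrite").csv("s3a://{}/address.csv".format(BUCKET_NAME))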

### Uploading data to the S3 bucket

If the bucket isn't publicly accessible, write the files locally first and upload them with boto3:

In [None]:
sas_data.write.parquet("sas_data_modified")
country.write.csv("country_modified",header = True)
visa.write.csv("visa_modified",header = True)
travel.write.csv("travel_modified",header = True)
address.write.csv("address_modified",header = True)

In [26]:
import shutil
shutil.rmtree("sas_data_modified", ignore_errors=False, onerror=None)

In [59]:
import glob

session = boto3.Session(
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)
s3 = session.resource('s3')
bucket_name = BUCKET_NAME

# Spark names its output files part-<uuid>.csv, so look the file up instead of
# hard-coding the name (assumes each folder holds a single CSV part file)
uploads = {'country_modified': 'country.csv',
           'address_modified': 'address.csv',
           'travel_modified': 'travel.csv',
           'visa_modified': 'visa.csv'}
for folder, key in uploads.items():
    part_file = glob.glob(os.path.join(folder, 'part-*.csv'))[0]
    s3.meta.client.upload_file(Filename=part_file, Bucket=bucket_name, Key=key)

In [23]:
spark.read.parquet("s3a://{}/sas_data".format(BUCKET_NAME)).show(1)

+-----+------+------+----------+-------+-------+-------+-------+-------+------+
|cicid|i94cit|i94res|   arrdate|i94mode|i94addr|depdate|i94visa|biryear|gender|
+-----+------+------+----------+-------+-------+-------+-------+-------+------+
| 91.0| 103.0| 103.0|2016-04-01|    1.0|     IN|20551.0|    1.0| 1974.0|     F|
+-----+------+------+----------+-------+-------+-------+-------+-------+------+
only showing top 1 row



### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
The data model is built using Airflow.

### If not using the S3 bucket or Airflow
The following code creates and loads the tables directly in the Spark workspace.

In [11]:
country.createOrReplaceTempView("country")
visa.createOrReplaceTempView("visa")
travel.createOrReplaceTempView("travel")
sas_data.createOrReplaceTempView("sas_data")
address.createOrReplaceTempView("address")

In [18]:
Drop_table_staging_sas_data = '''Drop table if exists data.staging_sas_data'''
Drop_table_staging_country = '''Drop table if exists data.staging_country'''
Drop_table_staging_visa = '''Drop table if exists data.staging_visa'''
Drop_table_staging_travel = '''Drop table if exists data.staging_travel'''
Drop_table_staging_address = '''Drop table if exists data.staging_address'''
Drop_table_immigrants = '''Drop table if exists data.immigrants'''
Drop_table_address = '''Drop table if exists data.address'''
Drop_table_date = '''Drop table if exists data.date'''

create_schema = '''Create schema data '''

create_staging_sas_data = ''' create table if not exists data.staging_sas_data (
                                                         cicid float ,
                                                         i94cit float,
                                                         i94res float,
                                                         arrdate date,
                                                         i94mode float,
                                                         i94addr string,
                                                         depdate float,
                                                         i94visa float,
                                                         biryear float,
                                                         gender string ) '''
create_staging_country = ''' create table if not exists data.staging_country (
                                                         i94cntyl float ,
                                                         country string)'''
create_staging_visa = ''' create table if not exists data.staging_visa (
                                                         i94visa float ,
                                                         visa_value string)'''
create_staging_travel = '''Create table if not exists data.staging_travel (
                                                            i94mode float,
                                                            travel_mode string)'''
create_staging_address = ''' create table if not exists data.staging_address(
                                                                i94addr string,
                                                                place string) '''
create_immigrants_table ='''create table if not exists data.immigrants (
                                                       citizen_id float ,
                                                       country_cit string,
                                                       birth_year float,
                                                       gender string)'''
create_address_table = '''create table if not exists data.address (
                                                       citizen_id float ,
                                                       address string,
                                                       visa_type string ,
                                                       travel_mode string)'''
create_date_table = '''create table if not exists data.date (
                                                  citizen_id float ,
                                                  arrival_date date,
                                                  arrival_day int,
                                                  arrival_month int,
                                                  arrival_year int
                                                                    )'''

In [19]:
create_tables = [create_staging_sas_data,create_staging_country,create_staging_visa,create_staging_travel,create_staging_address,create_immigrants_table,create_address_table,create_date_table]
drop_tables = [Drop_table_staging_sas_data,Drop_table_staging_country,Drop_table_staging_visa,Drop_table_staging_travel,Drop_table_staging_address,Drop_table_immigrants,Drop_table_address,Drop_table_date]

In [20]:
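# Make sure the `data` schema exists before dropping or creating tables in it
spark.sql('create schema if not exists data')
for i in drop_tables:
    spark.sql(i)
for i in create_tables:
    spark.sql(i)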

In [23]:
load_staging_sas_data = (''' insert into data.staging_sas_data
                                        select   s.cicid ,
                                                 s.i94cit ,
                                                 s.i94res ,
                                                 s.arrdate ,
                                                 s.i94mode ,
                                                 s.i94addr ,
                                                 s.depdate ,
                                                 s.i94visa ,
                                                 s.biryear ,
                                                 s.gender
                                        From sas_data as s       ''') 
load_staging_country =('''insert into data.staging_country 
                                         select c._c0 ,
                                                c._c1 
                                          From  country as c ''')
load_staging_visa = (''' insert into data.staging_visa 
                                         select v._c0 ,
                                                v._c1
                                          From  visa as v   ''')
load_staging_travel = (''' insert into data.staging_travel 
                                         select t._c0 ,
                                                t._c1
                                          From  travel as t ''')
load_staging_address=('''insert into data.staging_address
                                          select a._c0 ,
                                                 a._c1
                                          From  address as a ''')
load_immigrants_table = ('''Insert into data.immigrants 
                                        select s.cicid,
                                               c.country,
                                               s.biryear,
                                               s.gender
                                        From  data.staging_sas_data s
                                        left join data.staging_country c
                                             ON c.i94cntyl = s.i94cit''')
load_address_table = ('''Insert into data.address
                                        select s.cicid,
                                               a.place,
                                               v.visa_value,
                                               t.travel_mode
                                        From data.staging_sas_data s
                                        Left Join data.staging_address a
                                             ON s.i94addr = a.i94addr
                                        Left Join data.staging_visa v
                                             ON v.i94visa = s.i94visa
                                        Left Join data.staging_travel t
                                             ON t.i94mode = s.i94mode
                      ''')
load_date_table = (''' Insert into data.date
                                      select cicid,
                                             arrdate,
                                             dayofmonth(arrdate) as day,
                                             month(arrdate) as month,
                                             year(arrdate) as year
                                      From   data.staging_sas_data      ''')
                                
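# Staging tables are loaded first; load_staging_country is needed for the immigrants join
load_tables = [load_staging_sas_data, load_staging_country, load_staging_visa, load_staging_travel, load_staging_address,
               load_immigrants_table, load_address_table, load_date_table]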
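The address-table load enriches each staging_sas_data row through left joins to the three lookup tables, so an immigrant whose code has no match is kept with a NULL label rather than dropped. The sketch below reproduces that join shape in in-memory SQLite, used only as a stand-in for Spark SQL, with made-up rows (ZZ and mode 9 are deliberately unknown codes).

```python
import sqlite3

# In-memory SQLite stand-in; table/column names mirror the staging tables above
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
CREATE TABLE staging_sas_data (cicid REAL, i94addr TEXT, i94visa REAL, i94mode REAL);
CREATE TABLE staging_address  (i94addr TEXT, place TEXT);
CREATE TABLE staging_visa     (i94visa REAL, visa_value TEXT);
CREATE TABLE staging_travel   (i94mode REAL, travel_mode TEXT);
INSERT INTO staging_sas_data VALUES (1, 'AL', 2, 1), (2, 'ZZ', 1, 9);
INSERT INTO staging_address  VALUES ('AL', 'ALABAMA');
INSERT INTO staging_visa     VALUES (1, 'Business'), (2, 'Pleasure');
INSERT INTO staging_travel   VALUES (1, 'Air');
""")

# Chained left joins: every immigrant row survives, unmatched codes become NULL
rows = cur.execute("""
    SELECT s.cicid, a.place, v.visa_value, t.travel_mode
    FROM staging_sas_data s
    LEFT JOIN staging_address a ON s.i94addr = a.i94addr
    LEFT JOIN staging_visa    v ON v.i94visa = s.i94visa
    LEFT JOIN staging_travel  t ON t.i94mode = s.i94mode
    ORDER BY s.cicid
""").fetchall()
```

A left join also repeats an immigrant row once per matching lookup row, so each lookup table should hold exactly one row per code.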

In [24]:
for i in load_tables :
    spark.sql(i)

In [25]:
spark.sql(''' select * from data.date ''').show(10)

+----------+------------+-----------+-------------+------------+
|citizen_id|arrival_date|arrival_day|arrival_month|arrival_year|
+----------+------------+-----------+-------------+------------+
|      41.0|  2016-04-01|          1|            4|        2016|
|      88.0|  2016-04-01|          1|            4|        2016|
|     337.0|  2016-04-01|          1|            4|        2016|
|     411.0|  2016-04-01|          1|            4|        2016|
|     412.0|  2016-04-01|          1|            4|        2016|
|     684.0|  2016-04-01|          1|            4|        2016|
|    1124.0|  2016-04-01|          1|            4|        2016|
|    1270.0|  2016-04-01|          1|            4|        2016|
|    1437.0|  2016-04-01|          1|            4|        2016|
|    1481.0|  2016-04-01|          1|            4|        2016|
+----------+------------+-----------+-------------+------------+
only showing top 10 rows



#### 4.2 Data Quality Checks
The data quality checks in Airflow verify that each table was loaded and contains rows.
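The Airflow check itself is not shown here; below is a minimal sketch of a row-count check of this kind. It works with any DB-API cursor (for example a psycopg2 cursor connected to Redshift), and SQLite is used in the usage part only so the sketch runs on its own. The function name check_tables_have_rows is hypothetical.

```python
import sqlite3

def check_tables_have_rows(cursor, tables):
    """Return {table: row_count}; raise ValueError if any table is empty.

    Table names are interpolated into the SQL, so they must come from
    trusted code (a fixed list), never from user input.
    """
    counts = {}
    for table in tables:
        cursor.execute(f"SELECT COUNT(*) FROM {table}")
        (count,) = cursor.fetchone()
        if count == 0:
            raise ValueError(f"Data quality check failed: {table} has no rows")
        counts[table] = count
    return counts

# Usage against an in-memory SQLite database (stand-in for Redshift)
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE immigrants (citizen_id REAL, gender TEXT)")
cur.execute("CREATE TABLE address (citizen_id REAL, address TEXT)")
cur.execute("INSERT INTO immigrants VALUES (1.0, 'F')")

counts = check_tables_have_rows(cur, ["immigrants"])
failure = None
try:
    check_tables_have_rows(cur, ["immigrants", "address"])
except ValueError as err:
    failure = str(err)
```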

### Running queries on the finished tables to see the results

In [24]:
%load_ext sql
# Build the connection URL from capstone.cfg (not printed, since it contains the password)
conn_string = "postgresql://{}:{}@{}:{}/{}".format(DB_USER, DB_PASSWORD, DB_ENDPOINT, DB_PORT, DB_NAME)
%sql $conn_string

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


'Connected: awsuser@dev'

The United Kingdom has the highest number of travellers, and most of them travel for pleasure.

In [25]:
%%sql
Select country_cit ,gender , count(country_cit),visa_type
From data.immigrants i
Join data.address d
ON d.citizen_id = i.citizen_id
group by 1,2,4
order by 3 desc
limit 10 ;

 * postgresql://awsuser:***@redshift-cluster-1.cvfmpkrnx51k.us-west-2.redshift.amazonaws.com:5439/dev
10 rows affected.


| country_cit | gender | count | visa_type |
|---|---|---|---|
| 'UNITED KINGDOM' | F | 1843792 | 'Pleasure' |
| 'UNITED KINGDOM' | M | 1777888 | 'Pleasure' |
| 'JAPAN' | F | 1166496 | 'Pleasure' |
| 'FRANCE' | F | 1106928 | 'Pleasure' |
| 'MEXICO Air Sea and Not Reported (I-94 no land arrivals)' | F | 1076704 | 'Pleasure' |
| 'FRANCE' | M | 1012688 | 'Pleasure' |
| 'JAPAN' | M | 986624 | 'Pleasure' |
| 'CHINA ; PRC' | F | 964976 | 'Pleasure' |
| 'MEXICO Air Sea and Not Reported (I-94 no land arrivals)' | M | 870432 | 'Pleasure' |
| 'BRAZIL' | F | 820992 | 'Pleasure' |


This shows that most travellers arrive by air, followed by land and then sea, and that men slightly outnumber women.
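The claim can be checked with a little arithmetic on the counts displayed in the result below. Only 10 of the 14 result rows are shown, so the totals in this sketch cover the displayed rows only.

```python
from collections import Counter

# Counts copied from the 10 displayed rows of the query result below
shown = {
    ("Air", "M"): 19639040, ("Air", "F"): 18409808,
    ("Land", "M"): 365312, ("Land", "F"): 325856,
    ("Sea", "F"): 65088, ("Sea", "M"): 61120,
    ("Not reported", "F"): 17440, ("Not reported", "M"): 14960,
    ("Air", "X"): 4128, ("Land", "U"): 3104,
}

by_gender = Counter()
by_mode = Counter()
for (mode, gender), n in shown.items():
    by_gender[gender] += n
    by_mode[mode] += n

# Share of men among travellers recorded as M or F (about 51.6%)
male_share = by_gender["M"] / (by_gender["M"] + by_gender["F"])
```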

In [26]:
%%sql
select a.travel_mode ,i.gender , count(i.gender)
from data.address a
join data.immigrants i
ON i.citizen_id = a.citizen_id
group by 1,2
order by 3 desc ;

 * postgresql://awsuser:***@redshift-cluster-1.cvfmpkrnx51k.us-west-2.redshift.amazonaws.com:5439/dev
14 rows affected.


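| travel_mode | gender | count |
|---|---|---|
| 'Air' | M | 19639040 |
| 'Air' | F | 18409808 |
| 'Land' | M | 365312 |
| 'Land' | F | 325856 |
| 'Sea' | F | 65088 |
| 'Sea' | M | 61120 |
| 'Not reported' | F | 17440 |
| 'Not reported' | M | 14960 |
| 'Air' | X | 4128 |
| 'Land' | U | 3104 |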


#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

Spark is used here as an easy and fast ETL tool that can handle the data however large it becomes. Using Airflow and
DAGs makes it easy to spot failures at each step of the pipeline and to control when the pipeline runs.
A normalized schema is also suitable for the current size of the data.

Updating this data once a month is appropriate, since the data is not very large and is not highly time-sensitive.

### For the 1st scenario
If the data grew vertically (more rows), the only change needed would be a larger cluster; if it grew horizontally
(more columns), a star schema approach would be better.

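### For the 2nd scenario
In the pipeline's DAG, set the start date and schedule interval so that the pipeline runs daily and finishes before 7 am.
A schedule of exactly 7 am would only start the run at 7 am, so the trigger time must leave enough time for the run to
complete; an Airflow SLA on the tasks can flag runs that finish late.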

### For the 3rd scenario
If the data needs to be accessed by more than 100 people, it would be stored in an S3 bucket with public access, using
ACLs so that access does not depend on individual AWS accounts.