# Udacity Data Engineer Nanodegree - Capstone Project

## Project summary - 
    In this project we build a data warehouse that integrates three datasets: the I94 immigration dataset, the global temperature dataset and the US demography dataset. With the final data model we aim to find out a visitor's visa information, which airline they arrived on, whether they departed on a particular date, and what the weather was like at the time of their visit. We can also find out which states immigrants visit most, the population of those states by sex, foreign-born residents and veterans, and in which month or year immigration was highest.
    The output of one such analysis, the demographic population of the states with the highest immigration, is shown in the image below - 
  <img src="sqlqueryoutput.PNG" width="600"/>
    
    Steps involved in the process- 
    Step 1:- Scope the Project and Gather Data
    Step 2:- Explore and Assess the Data
    Step 3:- Define the Data Model
    Step 4:- Run ETL to Model the Data
    Step 5:- Future design consideration

### Step 1: Scope the Project and Gather Data

#### Scope - 
    This project integrates the immigration data, global temperature data and US demographic data to set up a data warehouse with fact and dimension tables. We use AWS Redshift as the data warehouse and model the data as a star schema.

#### Describe and Gather Data 
##### Datasets involved - 
    1. I94 immigration dataset - This data comes from the US National Travel and Tourism Office (NTTO), which works with the U.S. Department of Homeland Security (DHS)/U.S. Customs and Border Protection (CBP) to release I-94 Visitor Arrivals Program data, a comprehensive count of all visitors (overseas visitors arriving by all travel modes, plus visitors from Mexico arriving by air and sea) entering the United States. The dataset can be found at the following link - https://www.trade.gov/travel-and-tourism-research
    Dataset Format - SAS7BDAT
    Number of records - 3096313

    2. Global temperature dataset - This dataset is available on Kaggle and is repackaged from a compilation put together by Berkeley Earth, which is affiliated with Lawrence Berkeley National Laboratory. The Berkeley Earth Surface Temperature Study combines 1.6 billion temperature reports from 16 pre-existing archives.
    Link to dataset - https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data
    Data format - CSV
    Number of records - 645675

    3. U.S. City Demographic Data - This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000.
    This data comes from the US Census Bureau's 2015 American Community Survey.
    Link to dataset - https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/
    Data format - CSV
    Number of records - 2891

    4. Label dataset - Finally, the file "I94_SAS_Labels_Descriptions.SAS" maps the codes used in the immigration data to their names, including the state, country, city and visa codes.


##### Tools used - 
    1. Python - Python is a high-level, general-purpose programming language. 
    2. AWS S3 - Amazon Simple Storage Service (Amazon S3) is an object storage service offering industry-leading scalability, data availability, security, and performance. We use it to store the datasets as Parquet files.
    3. AWS Redshift - Amazon Redshift is a data warehouse product that is part of the Amazon Web Services cloud-computing platform. It is built on technology from the massively parallel processing (MPP) data warehouse company ParAccel to handle large-scale data sets and database migrations. We use it to store the tables.
    4. AWS EMR - Amazon EMR is the industry-leading cloud big data solution for petabyte-scale data processing, interactive analytics, and machine learning using open-source frameworks such as Apache Spark, Apache Hive, and Presto.


    Note - Make sure you have populated the credentials in the config.cfg file in the aws folder, and that the tables have been created on the Redshift cluster by running the create_table.sql script.

In [1]:
import configparser
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType, StringType, DoubleType, IntegerType
from pyspark.sql.functions import udf, col, lit, year, month, upper, to_date
from pyspark.sql.functions import monotonically_increasing_id, min, max, avg, sum, count
import psycopg2
from datetime import datetime, timedelta

In [2]:
config = configparser.ConfigParser()
config.read_file(open('./aws/config.cfg'))

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
inputS3 = config['S3']['inputS3']
outputS3 = config['S3']['outputS3']
ARN=config['AWS']['ARN']
HOST = config['CLUSTER']['HOST']
DB = config['CLUSTER']['DB']
DBUSER = config['CLUSTER']['DBUSER']
DBPASSWORD = config['CLUSTER']['DBPASSWORD']
DBPORT = config['CLUSTER']['DBPORT']
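The cell above expects `./aws/config.cfg` to be an INI file with `AWS`, `S3` and `CLUSTER` sections. The real file is not shown here, so the sketch below uses placeholder values (every bucket name, ARN, host and password is hypothetical) and parses them with the same `configparser` module. Because the notebook builds paths as `outputS3 + table_name` and passes `outputS3` to the Redshift `COPY`, the sketch assumes the S3 prefixes end with `/` and use the `s3://` scheme.

```python
import configparser

# Hypothetical layout for ./aws/config.cfg; all values are placeholders.
SAMPLE_CONFIG = """
[AWS]
AWS_ACCESS_KEY_ID = your-access-key-id
AWS_SECRET_ACCESS_KEY = your-secret-access-key
ARN = arn:aws:iam::123456789012:role/your-redshift-s3-role

[S3]
inputS3 = s3://your-input-bucket/Data/
outputS3 = s3://your-output-bucket/

[CLUSTER]
HOST = your-cluster.example.redshift.amazonaws.com
DB = dev
DBUSER = awsuser
DBPASSWORD = your-password
DBPORT = 5439
"""

def load_settings(text):
    """Parse the config text and return the keys that the notebook reads."""
    config = configparser.ConfigParser()
    config.read_string(text)
    return {
        "inputS3": config["S3"]["inputS3"],
        "outputS3": config["S3"]["outputS3"],
        "ARN": config["AWS"]["ARN"],
        "HOST": config["CLUSTER"]["HOST"],
        "DB": config["CLUSTER"]["DB"],
        # getint converts the port string to an int.
        "DBPORT": config.getint("CLUSTER", "DBPORT"),
    }

settings = load_settings(SAMPLE_CONFIG)
print(settings["DB"], settings["DBPORT"])
```

`configparser` lowercases option names, so lookups such as `config['S3']['inputS3']` work regardless of the case used in the file.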

In [3]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()


In [4]:
label_data =spark.read.text(inputS3+'I94_SAS_Labels_Descriptions.SAS').toPandas()

In [6]:
immigration_data = spark.read.format('com.github.saurfang.sas.spark').load(inputS3 + 'i94_apr16_sub.sas7bdat')

In [7]:
demography_data=spark.read.options(delimiter=';').options(inferSchema=True).csv(inputS3 + 'us-cities-demographics.csv',header=True)

In [8]:
temperature_data = spark.read.options(inferSchema=True).csv(inputS3 + 'GlobalLandTemperaturesByState.csv', header=True)

### Step 2: Explore and Assess the Data
#### Explore the Data 


#### Dataset 1 - Immigration dataset

In [9]:
immigration_data.show(3)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null|1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| n

In [10]:
immigration_data.count()

3096313

Steps to be performed -
1. The arrival date and departure date format should be corrected
2. Drop records where airline or i94addr is null
3. The residence country, citizen country and visa codes are stored as doubles; convert them to strings

#### Dataset 2 - Label dataset

In [11]:
label_data.head(10)

| index | value |
|---|---|
| 0 | libname library 'Your file location' ; |
| 1 | proc format library=library ; |
| 2 | |
| 3 | /* I94YR - 4 digit year */ |
| 4 | |
| 5 | /* I94MON - Numeric month */ |
| 6 | |
| 7 | /* I94CIT & I94RES - This format shows all the... |
| 8 | value i94cntyl |
| 9 | 582 = 'MEXICO Air Sea, and Not Reported (I... |


Steps to be performed -
1. Extract the country, city, state and visa codes


#### Dataset 3 - Temperature dataset

In [12]:
temperature_data.show(3)

+-------------------+------------------+-----------------------------+-----+-------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|State|Country|
+-------------------+------------------+-----------------------------+-----+-------+
|1855-05-01 00:00:00|            25.544|                        1.171| Acre| Brazil|
|1855-06-01 00:00:00|            24.228|                        1.103| Acre| Brazil|
|1855-07-01 00:00:00|            24.371|                        1.044| Acre| Brazil|
+-------------------+------------------+-----------------------------+-----+-------+
only showing top 3 rows



In [13]:
temperature_data.count()

645675

Steps to be performed - 
1. Keep only rows where Country is United States
2. Convert the dt timestamp to a date
3. Drop rows where AverageTemperature is null
4. Replace full state names with state codes

#### Dataset 4 - Demography dataset

In [14]:
demography_data.show(3)

+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|         City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|              Race|Count|
+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|Hispanic or Latino|25924|
|       Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|             White|58723|
|       Hoover|      Alabama|      38.5|          38040|            46799|           

In [15]:
demography_data.count()

2891

Steps to be performed - 
1. Transform the City and State columns to upper case so they match the city_code and state_code tables


#### Cleaning Steps


1. Process the immigration data

In [16]:
def SAS_to_datetime(date):
    if date is None:
        return None
    return str((pd.Timestamp('1960-01-01') + pd.to_timedelta(date, unit='D')).date())

def double_to_str(num):
    if num is None:
        return None
    return str(int(num))
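SAS stores dates as the number of days since 1960-01-01. The sketch below shows that conversion with only the standard library, keeping missing values as `None` so they stay null once they reach Spark. The function names are hypothetical and chosen so they do not shadow the notebook's own helpers. A native Spark expression such as `expr("date_add(to_date('1960-01-01'), cast(arrdate as int))")` (with `expr` from `pyspark.sql.functions`) should give the same dates without the Python UDF round trip; that is a suggestion, not what this notebook runs.

```python
from datetime import date, timedelta

# SAS day 0 is 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(sas_days):
    """Convert a SAS day count (float or None) to a datetime.date, keeping nulls as None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))

def double_code_to_str(num):
    """Render a code stored as a double (e.g. 692.0) as '692', keeping nulls as None."""
    if num is None:
        return None
    return str(int(num))

print(sas_to_date(20573.0))
print(sas_to_date(None))
print(double_code_to_str(692.0))
```

For example, `arrdate` 20573.0 from the first immigration row converts to 2016-04-29, which falls inside the April 2016 file being processed.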

In [17]:
def clean_immigration_data(immigration_data):
    SAS_to_datetimeUDF = udf(SAS_to_datetime, StringType())
    double_to_strUDF = udf(double_to_str, StringType())
    immigration_data = immigration_data.withColumn('arrival_date',SAS_to_datetimeUDF(immigration_data['arrdate']))
    immigration_data = immigration_data.withColumn('departure_date',SAS_to_datetimeUDF(immigration_data['depdate']))
    immigration_data =immigration_data.withColumn('arrival_date', to_date(col('arrival_date'),'yyyy-MM-dd'))
    immigration_data =immigration_data.withColumn('departure_date', to_date(col('departure_date'),'yyyy-MM-dd'))
    immigration_data = immigration_data.drop('arrdate','depdate')
    immigration_data = immigration_data.filter('i94addr is not NULL')
    immigration_data = immigration_data.filter('airline is not NULL')
    immigration_data = immigration_data.withColumn('i94cit', double_to_strUDF(col('i94cit')))
    immigration_data = immigration_data.withColumn('i94res', double_to_strUDF(col('i94res')))
    immigration_data = immigration_data.withColumn('i94visa', double_to_strUDF(col('i94visa')))
    print("Immigration table after processing")
    immigration_data.show(3)
    return immigration_data


In [18]:
processed_immigration_data = clean_immigration_data(immigration_data)

Immigration table after processing
+-----+------+------+------+------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+------------+--------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|i94mode|i94addr|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|arrival_date|departure_date|
+-----+------+------+------+------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+------------+--------------+
| 15.0|2016.0|   4.0|   101|   101|    WAS|    1.0|     MI|  55.0|      2|  1.0|20160401|    null| null|      T|      O|   null|      M| 1961.0|09302016|     M|  null|     OS|  6.66643185E8|   93|      B2|  2016-04-01|    2016-08-25|
| 16.0|2016.0|   4.0|   101| 


2. Process the Label file

In [19]:
def create_country_codeDF(label_data):
    country_code = {}
    for countries in label_data.value[9:298]:
        pair = countries.split('=')
        code, country = pair[0].strip(), pair[1].strip().strip("'")
        country_code[code] = country
    return country_code
    

In [20]:
def create_city_codeDF(label_data):
    city_code = {}
    for cities in label_data.value[303:962]:
        pair = cities.split('=')
        code, city = pair[0].strip("\t").strip().strip("'"), pair[1].strip('\t').strip().strip("''")
        city_code[code] = city
    return city_code


In [21]:
def create_state_codeDF(label_data):
    state_code = {}
    for states in label_data.value[982:1036]:
        pair = states.split('=')
        code, state = pair[0].strip('\t').strip("'"), pair[1].strip().strip("'")
        state_code[code] = state
    return state_code


In [22]:
def create_visa_codeDF(label_data):
    visa_code = {}
    for line in label_data.value[1046:1049]:
        pair = line.split('=')
        code, visa = pair[0].strip('\t').strip("'"), pair[1].strip().strip("'")
        visa_code[code] = visa
    return visa_code

In [23]:
def clean_label_data(label_data):
    country_code = create_country_codeDF(label_data)
    city_code = create_city_codeDF(label_data)
    state_code = create_state_codeDF(label_data)
    visa_code = create_visa_codeDF(label_data)
    country_code = spark.createDataFrame(country_code.items(), ['code', 'country'])
    city_code = spark.createDataFrame(city_code.items(), ['code', 'city'])
    state_code= spark.createDataFrame(state_code.items(), ['code', 'state'])
    visa_code = spark.createDataFrame(visa_code.items(), ['code', 'visa'])
    return country_code, city_code,state_code, visa_code

In [24]:
country_code, city_code,state_code, visa_code=clean_label_data(label_data)
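Each of the label parsers above slices a hard-coded row range (for example `label_data.value[9:298]`) and then chains several `strip` calls to remove tabs, quotes and padding. A single regular expression can do the cleanup for any slice. The sketch below is one hypothetical way to do it; the sample lines are invented to mimic the formatting styles in the label file, not copied from it, and the row ranges still have to select the right section because any `code = label` line would match.

```python
import re

# One "code = label" pair per line; quotes on either side and a trailing ';' are optional.
PAIR_RE = re.compile(r"^\s*'?(?P<code>[^'=]+?)'?\s*=\s*'?(?P<label>.*?)\s*'?\s*;?\s*$")

def parse_label_pairs(lines):
    """Return {code: label} for every line that looks like a code/label pair; other lines are skipped."""
    pairs = {}
    for line in lines:
        match = PAIR_RE.match(line)
        if match:
            pairs[match.group("code").strip()] = match.group("label").strip()
    return pairs

# Invented sample lines: a section header, padded and tab-separated pairs, and an unquoted pair.
sample = [
    "value i94cntyl",
    "   999 =  'SAMPLE COUNTRY'",
    "\t'NYC'\t=\t'NEW YORK, NY        '",
    "\t'NY'='NEW YORK'",
    "   1 = Business",
]
print(parse_label_pairs(sample))
```

The resulting dictionary can be turned into a Spark DataFrame the same way `clean_label_data` does, e.g. `spark.createDataFrame(list(pairs.items()), ['code', 'city'])`.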

3. Process demography data

In [25]:
def clean_demography_data(demography_data):
    demography_data = demography_data.withColumn('City', upper(col('City')))
    demography_data = demography_data.withColumn('State', upper(col('State')))
    print("Demography table after processing -")
    demography_data.show(3)
    return demography_data

In [26]:
processed_demography_data = clean_demography_data(demography_data)

Demography table after processing -
+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|         City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|              Race|Count|
+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|SILVER SPRING|     MARYLAND|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|Hispanic or Latino|25924|
|       QUINCY|MASSACHUSETTS|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|             White|58723|
|       HOOVER|      ALABAMA|      38.5|         

4. Process temperature data

In [27]:
def clean_temperature_data(temperature_data, state_code):
    temperature_data = temperature_data.filter(col('Country') == 'United States')
    temperature_data = temperature_data.withColumn('dt', to_date('dt'))
    temperature_data = temperature_data.withColumn('State', upper(col('State')))
    temperature_data = temperature_data.na.drop(subset=['AverageTemperature'])
    temperature_data = temperature_data.join(state_code, state_code.state == temperature_data.State)
    temperature_data = temperature_data.select('dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'code', 'Country')
    print("temperature table after processing -")
    temperature_data.show(5)
    return temperature_data

In [28]:
dim_temperature = clean_temperature_data(temperature_data, state_code)

temperature table after processing -
+----------+------------------+-----------------------------+----+-------------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|code|      Country|
+----------+------------------+-----------------------------+----+-------------+
|1743-11-01|             4.597|                         1.99|  NJ|United States|
|1744-04-01|            10.675|                        2.406|  NJ|United States|
|1744-05-01|            16.271|                        1.912|  NJ|United States|
|1744-06-01|            21.755|                        1.947|  NJ|United States|
|1744-07-01|22.881999999999998|                        1.865|  NJ|United States|
+----------+------------------+-----------------------------+----+-------------+
only showing top 5 rows



### Step 3: Define the Data Model


#### 3.1 Conceptual Data Model
The data model and data dictionary are in the Excel workbook model.xlsx.
For this project we use a star schema with the following fact and dimension tables -
Fact table - fact_immigration
Dimension tables - dim_city_population, dim_state_population, country_code, state_code, city_code, visa_code, dim_temperature
We chose a star schema because it is simple to design and makes it easy to derive business insights from the model.


Description of our data model - 

fact_immigration is the fact table and holds the individual immigration records. Its city_code, state_code, citizen_country and visa_code columns reference the city_code, state_code, country_code and visa_code dimension tables. To connect the demography dataset, we first grouped it by state and aggregated the population figures into dim_state_population, which the fact table references through state_code. Linking the fact table directly to city-level demographics was not possible because the label dataset does not contain all the cities in the demography table, so dim_city_population references dim_state_population by state_code instead. The temperature dimension (dim_temperature) is at the state level, so it uses the composite primary key (dt, code), and the fact table references it by arrive_date and state_code.
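The dim_temperature output shows that `dt` is always the first day of a month, so an exact match on arrive_date would only succeed for arrivals on the 1st. One way to join is to truncate the arrival date to its month first; in Spark that would be `trunc(col('arrive_date'), 'month')`, and in Redshift `DATE_TRUNC('month', arrive_date)`. The sketch below shows the idea in plain Python with a dictionary standing in for dim_temperature; the two temperature values come from the sample output, while the arrival dates are hypothetical and chosen to hit those rows.

```python
from datetime import date

# Stand-in for dim_temperature keyed by its composite key (dt, code); values from the sample output.
sample_temperatures = {
    (date(1744, 4, 1), "NJ"): 10.675,
    (date(1744, 5, 1), "NJ"): 16.271,
}

def temperature_for_arrival(arrive_date, state_code, temperatures):
    """Return the monthly average temperature for an arrival, or None if there is no matching row."""
    # dt is the first day of the month, so truncate the arrival date before looking it up.
    month_start = arrive_date.replace(day=1)
    return temperatures.get((month_start, state_code))

print(temperature_for_arrival(date(1744, 4, 17), "NJ", sample_temperatures))
print(temperature_for_arrival(date(1744, 4, 17), "NY", sample_temperatures))
```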

<img src="data_model.png" width="600"/>


#### 3.2 Mapping Out Data Pipelines
1. Assume all data is stored in S3 bucket as below - 
    - s3://bucket/Data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
    - s3://bucket/Data/I94_SAS_Labels_Descriptions.SAS
    - s3://bucket/Data/GlobalLandTemperaturesByState.csv
    - s3://bucket/Data/us-cities-demographics.csv
2. Read the files from the s3 bucket into the EMR cluster
3. Perform data cleaning
4. Create dataframes with required columns
5. Write tables in parquet format to destination S3 bucket
6. Read parquet data and perform data quality checks
7. Load the data to redshift cluster

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model


#### Immigration dataset

##### Create immigration fact

In [29]:
def create_immi_fact_table(processed_immigration_data):
    fact_immigration = processed_immigration_data.select('cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr',\
                                 'arrival_date', 'departure_date', 'i94visa', 'visatype', 'i94cit','biryear', 'gender', 'airline', 'admnum').distinct()
    fact_immigration = fact_immigration.toDF('cic_id', 'year', 'month', 'city_code', 'state_code',\
                   'arrive_date', 'departure_date', 'visa_code', 'visatype', 'citizen_country','birthyear', 'gender', 'airline', 'admitnum')
    fact_immigration = fact_immigration.withColumn('country_immigrated_to', lit('United States'))     
    print("Created fact_immigration table as below -")   
    fact_immigration.show(5)            
    return fact_immigration

In [30]:
fact_immigration = create_immi_fact_table(processed_immigration_data)

Created fact_immigration table as below -
+------+------+-----+---------+----------+-----------+--------------+---------+--------+---------------+---------+------+-------+---------------+---------------------+
|cic_id|  year|month|city_code|state_code|arrive_date|departure_date|visa_code|visatype|citizen_country|birthyear|gender|airline|       admitnum|country_immigrated_to|
+------+------+-----+---------+----------+-----------+--------------+---------+--------+---------------+---------+------+-------+---------------+---------------------+
| 294.0|2016.0|  4.0|      NYC|        NY| 2016-04-01|    2016-04-02|        2|      WT|            103|   1991.0|  null|     DL|5.5450935033E10|        United States|
| 739.0|2016.0|  4.0|      FTL|        FL| 2016-04-01|    2016-04-02|        2|      WT|            103|   2007.0|     M|     NK|5.5453752933E10|        United States|
| 827.0|2016.0|  4.0|      BOS|        MA| 2016-04-01|    2016-04-10|        2|      WT|            104|   1989.0|    

In [31]:
fact_immigration.write.mode("overwrite").parquet(outputS3 + 'fact_immigration')

#### Label dataset

##### Create Country code dimension

In [32]:
country_code.write.mode("overwrite").parquet(outputS3+'country_code')

##### Create City code dimension

In [33]:
city_code.write.mode("overwrite").parquet(outputS3 +'city_code')

##### Create State code dimension

In [34]:
state_code.write.mode("overwrite").parquet(outputS3+ 'state_code')

##### Create visa code dimension

In [35]:
visa_code.write.mode("overwrite").parquet(outputS3+ 'visa_code')

#### Demography data

In [36]:
def create_dim_demog_population(processed_demography_data):
    dim_demog_population = processed_demography_data.select(['City', 'State Code','Male Population', 'Female Population',\
                              'Number of Veterans', 'Foreign-born', 'Total Population', 'Average Household Size',  'Median Age']).distinct()
    dim_demog_population = dim_demog_population.toDF('city', 'State_Code', 'male_population', 'female_population',\
                               'num_vetarans', 'foreign_born', 'total_population', 'AvgHouseholdSize', 'Median_Age')
    dim_city_population = dim_demog_population
    dim_state_population = dim_demog_population.groupBy('State_Code') \
                    .agg(\
                        sum('male_population').alias('male_population'),\
                        sum('female_population').alias('female_population'),\
                        sum('num_vetarans').alias('num_vetarans'),\
                        sum('foreign_born').alias('foreign_born'),\
                        sum('total_population').alias('total_population'),\
                        avg('AvgHouseholdSize').alias('AvgHouseholdSize'),\
                        avg('Median_Age').alias('Median_Age')
                        )
    print("Created dim_state_population as below - ")                               
    dim_state_population.show(5)
    print("Created dim_city_population as below - ")
    dim_city_population.show(5)
    return dim_state_population, dim_city_population

In [37]:
dim_state_population, dim_city_population = create_dim_demog_population(processed_demography_data)

Created dim_state_population as below - 
+----------+---------------+-----------------+------------+------------+----------------+------------------+-----------------+
|State_Code|male_population|female_population|num_vetarans|foreign_born|total_population|  AvgHouseholdSize|       Median_Age|
+----------+---------------+-----------------+------------+------------+----------------+------------------+-----------------+
|        AZ|        2227455|          2272087|      264505|      682313|         4499542|          2.774375|          35.0375|
|        SC|         260944|           272713|       33463|       27744|          533657|             2.472|34.17999999999999|
|        LA|         626998|           673597|       69771|       83419|         1300595|             2.465|           34.625|
|        MN|         702157|           720246|       64894|      215873|         1422403|2.5009090909090914|35.61818181818182|
|        NJ|         705736|           723172|       30195|      47702
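The demography CSV has one row per (city, race) pair, so the city-level columns repeat for every race. `create_dim_demog_population` drops the race columns and calls `distinct()` before the `groupBy`/`agg`, which keeps each city's population from being counted once per race row. The state-level `AvgHouseholdSize` and `Median_Age` are plain averages of the city values, not population-weighted. The sketch below reproduces both steps in plain Python; the SILVER SPRING and QUINCY values come from the sample output, and the second SILVER SPRING row is hypothetical.

```python
from collections import defaultdict

# Rows shaped like the upper-cased demography data; the second SILVER SPRING row is hypothetical.
demog_rows = [
    {"city": "SILVER SPRING", "state_code": "MD", "total_population": 82463, "avg_household_size": 2.6, "race": "Hispanic or Latino"},
    {"city": "SILVER SPRING", "state_code": "MD", "total_population": 82463, "avg_household_size": 2.6, "race": "White"},
    {"city": "QUINCY", "state_code": "MA", "total_population": 93629, "avg_household_size": 2.39, "race": "White"},
]

def city_level(rows):
    """Drop the race column and de-duplicate while keeping order, mirroring select(...).distinct()."""
    keys = ((r["city"], r["state_code"], r["total_population"], r["avg_household_size"]) for r in rows)
    return list(dict.fromkeys(keys))

def state_level(cities):
    """Sum population per state and take the unweighted mean of household size, like the groupBy/agg."""
    population = defaultdict(int)
    household_sizes = defaultdict(list)
    for _city, state, total, household_size in cities:
        population[state] += total
        household_sizes[state].append(household_size)
    return {
        state: {
            "total_population": population[state],
            "AvgHouseholdSize": sum(household_sizes[state]) / len(household_sizes[state]),
        }
        for state in population
    }

cities = city_level(demog_rows)
print(len(cities))
print(state_level(cities)["MD"])
```

Without the de-duplication step, Silver Spring's population would be added twice to the MD total.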

In [38]:
dim_city_population.write.mode("overwrite")\
                        .parquet(outputS3 + 'dim_city_population')

In [39]:
dim_state_population.write.mode("overwrite")\
                        .parquet(outputS3 + 'dim_state_population')

#### Temperature dataset

In [40]:
dim_temperature.show(5)

+----------+------------------+-----------------------------+----+-------------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|code|      Country|
+----------+------------------+-----------------------------+----+-------------+
|1743-11-01|             4.597|                         1.99|  NJ|United States|
|1744-04-01|            10.675|                        2.406|  NJ|United States|
|1744-05-01|            16.271|                        1.912|  NJ|United States|
|1744-06-01|            21.755|                        1.947|  NJ|United States|
|1744-07-01|22.881999999999998|                        1.865|  NJ|United States|
+----------+------------------+-----------------------------+----+-------------+
only showing top 5 rows



In [41]:
dim_temperature.write.mode("overwrite").parquet(outputS3 + 'dim_temperature')

#### 4.2 Data Quality Checks

Make sure the tables are not empty and that the key columns do not contain null values

In [42]:
table_DF = [
        {
            'table_name': 'fact_immigration',
            'columns': ['cic_id', 'arrive_date', 'city_code', 'state_code'],
            'expected_result': 0
        },
        {
            'table_name': 'dim_state_population',
            'columns': ['State_Code'],
            'expected_result': 0
        },
        {
            'table_name': 'dim_temperature',
            'columns': ['AverageTemperature','dt'],
            'expected_result': 0
        },
        {
            'table_name': 'country_code',
            'columns': ['country'],
            'expected_result': 0
        },
        {
            'table_name': 'city_code',
            'columns': ['city'],
            'expected_result': 0
        },
        {
            'table_name': 'state_code',
            'columns': ['state'],
            'expected_result': 0
        },
        {
            'table_name': 'visa_code',
            'columns': ['visa'],
            'expected_result': 0
        },
        {
            'table_name': 'dim_city_population',
            'columns': ['city', 'State_Code'],
            'expected_result': 0
        }
    ]


In [43]:
def perform_data_quality_check(spark, DF, outputS3):
    '''
    Processes an individual test case. Takes the Spark session object, a test case and the S3 location of the created tables as input.
    '''
    table_name = DF['table_name']
    columnlist = DF['columns']
    expected_result = DF['expected_result']
    query = ' is NULL OR '.join(columnlist) + ' is NULL'

    parquettable = spark.read.parquet(outputS3 + table_name)
    if parquettable.count() > 0:
        print(table_name + " table is not empty")
    else:
        print(table_name + " table is empty")

    parquettable_count = parquettable.filter(query).count()
    if parquettable_count == expected_result:
        print('Data quality check suceeded for {} table'.format(table_name))
    else:
        print('Data quality check failed for {} table'.format(table_name))
    

In [44]:

def data_quality_check(spark, outputS3, table_DF):
    '''
        Perform the data quality check on the data present at the output S3 bucket, takes the spark session
        object and the destination S3 path as arguments.
    '''
    for i in table_DF:
        perform_data_quality_check(spark, i, outputS3)
    print("Checks complete for s3 staged data")
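The checks above only print their results, so a failing table would not stop the later Redshift load. The sketch below shows a stricter variant that raises instead. It uses an in-memory list of dicts as a stand-in for the Parquet table (an assumption made so the example runs without Spark); with Spark, the same logic applies to `parquettable.count()` and `parquettable.filter(predicate).count()`. The helper names are hypothetical.

```python
def null_predicate(columns):
    """Build the same filter string as perform_data_quality_check: 'a is NULL OR b is NULL'."""
    return " OR ".join(f"{column} is NULL" for column in columns)

def check_table(table_name, rows, columns, expected_result=0):
    """Raise if the table is empty or its count of rows with nulls differs from expected_result."""
    if not rows:
        raise ValueError(f"{table_name} table is empty")
    # A row counts once if any of the checked columns is null, matching the OR predicate.
    null_count = sum(1 for row in rows if any(row.get(column) is None for column in columns))
    if null_count != expected_result:
        raise ValueError(
            f"{table_name}: {null_count} rows with nulls in {columns}, expected {expected_result}"
        )
    return null_count

print(null_predicate(["cic_id", "arrive_date"]))
print(check_table("visa_code", [{"code": "1", "visa": "Business"}], ["visa"]))
```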


In [45]:
data_quality_check(spark,outputS3, table_DF)

fact_immigration table is not empty
Data quality check suceeded for fact_immigration table
dim_state_population table is not empty
Data quality check suceeded for dim_state_population table
dim_temperature table is not empty
Data quality check suceeded for dim_temperature table
country_code table is not empty
Data quality check suceeded for country_code table
city_code table is not empty
Data quality check suceeded for city_code table
state_code table is not empty
Data quality check suceeded for state_code table
visa_code table is not empty
Data quality check suceeded for visa_code table
dim_city_population table is not empty
Data quality check suceeded for dim_city_population table
Checks complete for s3 staged data


In [47]:
def get_connection_to_redshift():
    conn = psycopg2.connect(host=HOST, dbname=DB, user=DBUSER, password=DBPASSWORD, port=DBPORT)
    cur = conn.cursor()
    return conn,cur

In [48]:
def insert_data_into_redshift(conn, cur):
    query_template ="COPY {} FROM '{}{}' IAM_ROLE '{}' FORMAT AS PARQUET;"
    table_names= ['fact_immigration', 'dim_city_population','dim_state_population',\
                        'dim_temperature', 'country_code', 'city_code','state_code', 'visa_code']
    for i in table_names:
        cur.execute(query_template.format(i,outputS3,i,ARN))
        conn.commit()
        print("Data loaded in table " + i)


In [49]:
conn, cur = get_connection_to_redshift()

In [50]:
insert_data_into_redshift(conn, cur)

Data loaded in table fact_immigration
Data loaded in table dim_city_population
Data loaded in table dim_state_population
Data loaded in table dim_temperature
Data loaded in table country_code
Data loaded in table city_code
Data loaded in table state_code
Data loaded in table visa_code
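`insert_data_into_redshift` commits after each table, so if one `COPY` fails the earlier tables stay loaded while the later ones do not. The sketch below separates building the `COPY ... FORMAT AS PARQUET` statements from running them and runs all of them in one transaction, rolling back on any error. It works with any DB-API connection such as the psycopg2 one used here; the bucket and role ARN in the example are placeholders. Redshift's COPY from Parquet maps columns by position, so the column order in create_table.sql must match the order of the DataFrame columns written to S3.

```python
def build_copy_statements(table_names, output_s3, iam_role_arn):
    """Return one Redshift COPY statement per table, using the same template as the notebook."""
    template = "COPY {table} FROM '{prefix}{table}' IAM_ROLE '{role}' FORMAT AS PARQUET;"
    return [template.format(table=t, prefix=output_s3, role=iam_role_arn) for t in table_names]

def load_all(conn, statements):
    """Run every COPY in a single transaction: commit only if all succeed, otherwise roll back."""
    cur = conn.cursor()
    try:
        for statement in statements:
            cur.execute(statement)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()

statements = build_copy_statements(
    ["visa_code"], "s3://your-output-bucket/", "arn:aws:iam::123456789012:role/your-redshift-s3-role"
)
print(statements[0])
```

With the notebook's variables this would be called as `load_all(conn, build_copy_statements(table_names, outputS3, ARN))`.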


Fetch the demographic statistics of the top 5 states with the most immigrant arrivals

In [51]:
query_top5states_immigration="""SELECT * from
                        (
                        SELECT 
                            immigration.state_code,
                            COUNT(cic_id) as visitor_count
                        FROM "dev"."public"."fact_immigration" as immigration
                        GROUP BY immigration.state_code
                        ORDER BY COUNT(cic_id) DESC
                        ) as demo, dim_state_population
                        WHERE 
                            dim_state_population.state_code = demo.state_code
                        ORDER BY demo.visitor_count DESC
                        LIMIT 5;"""

In [52]:
cur.execute(query_top5states_immigration)

In [53]:
rows = cur.fetchall()

In [54]:
for row in rows:
    print(row)

('FL', 187, 'FL', 3236773, 3487375, 388228, 1688931, 6796738, 2.77787234042553, 39.6666666666667)
('CA', 159, 'CA', 12278281, 12544179, 928270, 7448257, 24822460, 3.10094890510949, 36.1824817518248)
('NY', 157, 'NY', 4692055, 5123571, 204901, 3438081, 9815626, 2.76181818181818, 35.6636363636364)
('HI', 52, 'HI', 176807, 175959, 23213, 101312, 352766, 2.69, 41.4)
('TX', 39, 'TX', 7063571, 7236412, 693501, 2942164, 14299983, 2.86, 33.340350877193)
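The query counts arrivals per state, inner-joins the counts to dim_state_population, and keeps the first five rows by count. Only the outer `ORDER BY ... LIMIT 5` determines the result; the `ORDER BY` inside the subquery does not guarantee any ordering of the outer query. The sketch below expresses the same logic in plain Python. The arrival list is hypothetical; the two population totals are the FL and CA values from the output above, and the made-up code "ZZ" shows that states missing from the population table drop out, as they would in the inner join.

```python
from collections import Counter

def top_states(state_codes, total_population, n=5):
    """Rank states by arrival count, keep only states in the population table, return the top n."""
    ranked = [
        (state, visitors)
        for state, visitors in Counter(state_codes).most_common()
        if state in total_population  # behaves like the inner join
    ]
    return [(state, visitors, total_population[state]) for state, visitors in ranked[:n]]

arrivals = ["FL", "FL", "FL", "CA", "CA", "ZZ", "ZZ", "ZZ", "ZZ"]
population = {"FL": 6796738, "CA": 24822460}
print(top_states(arrivals, population, n=2))
```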


#### 4.3 Data dictionary 
The data model and data dictionary are in the Excel workbook model.xlsx.

### Step 5: Future design consideration
    An Airflow data pipeline can be set up to automatically read, process and load the data from S3 to Redshift; the pipeline can either be triggered manually or scheduled to run at a specific time.

#### Project Write Up
##### Clearly state the rationale for the choice of tools and technologies for the project.
    1. Python - Python is a high-level, general-purpose programming language. It has many useful packages, such as pandas and pyspark, which we use in this project to process the data.
    2. AWS S3 - Amazon Simple Storage Service (Amazon S3) is an object storage service offering industry-leading scalability, data availability, security, and performance. We use it to store the datasets as Parquet files.
    3. AWS Redshift - Amazon Redshift is a data warehouse product that is part of the Amazon Web Services cloud-computing platform. It is built on technology from the massively parallel processing (MPP) data warehouse company ParAccel to handle large-scale data sets and database migrations. We use it to store the tables.
    4. AWS EMR - Amazon Elastic MapReduce (EMR) is a web service for processing large amounts of data efficiently. It uses Hadoop processing combined with several AWS products for tasks such as web indexing, data mining, log file analysis, machine learning, scientific simulation, and data warehousing.

##### Propose how often the data should be updated and why.
    - The fact and dimension tables of the immigration dataset should be updated every month, since the immigration data used here comes as a monthly file (i94_apr16_sub.sas7bdat). The temperature dataset can be updated biweekly or monthly, while the demography data only needs to be updated once a year because new survey data takes time to collect.

##### Write a description of how you would approach the problem differently under the following scenarios:


##### The data was increased by 100x.
    - Redshift can handle this volume for warehousing, since the cluster can be resized by adding nodes. For processing, the EMR cluster can be scaled out with more worker nodes so that the Spark jobs run in parallel across them.

##### The data populates a dashboard that must be updated on a daily basis by 7am every day.
    - A pipeline tool such as Apache Airflow can be used to schedule the run so that it finishes before 7am every day.


##### The database needed to be accessed by 100+ people.
    - The default maximum number of connections handled by Redshift is 500, so Redshift meets this requirement; query concurrency for many users can additionally be managed with workload management (WLM) queues.