# Capstone Project - Immigration Data Pipeline

## Table of contents
0. Imports and environment configurations
1. Data sources
2. Data model to implement
3. Connect to S3
4. Extract, Transform and Load to S3
5. Create data dictionaries
6. Quality check data
7. Load data to S3

## 0. Imports and environment configurations

### 0.1 Imports

First we import packages for data manipulation

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

pd.set_option("display.max_columns", None)

Next we import packages for cloud configuration

In [2]:
import boto3
import json
import psycopg2
import configparser
from botocore.exceptions import ClientError

Finally we update the environment to be able to write data to S3 storage in Amazon Web Services (AWS)

### 0.2 Setting up the environment

In [3]:
config = configparser.ConfigParser()
config.read_file(open('data_model/dwh.cfg'))

KEY = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET')
REGION = config.get('AWS', 'REGION')

import os
os.environ["AWS_ACCESS_KEY_ID"] = KEY
os.environ["AWS_SECRET_ACCESS_KEY"] = SECRET

### 0.3 Setting up a spark connection
The spark connection has to read data from a specific directory, "saurfang..." and also write data to S3 directories, hence the multiple configs.

In [4]:
spark = SparkSession.builder \
  .config("spark.jars.repositories", "https://repos.spark-packages.org/") \
  .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.2") \
  .enableHiveSupport() \
  .getOrCreate()

The cloud infrastructure will be set up later.

## 1. Data Sources
In this project, we build an etl pipeline that extracts data residing in SAS and CSV files, transforms that data, and loads it into a snowflake schema designed for flexible querying. 

Ultimately, we are building a data model that will give the client easy access to query their data. The client here may be anyone with an interest in immigration data, and the end purpose is to allow the client's analytics team to query the data ad hoc for simple analyses, or to integrate the data in a bigger flow to support automated analyses.

We have four data sets consisting of a main, denormalised immigration data table, and supporting tables:
1. Immigration data (sas7bdat)
2. Temperate data (CSV)
3. Demographics data (CSV)
4. Airport data (CSV)

In addition, for dataset (1) we have a description file to translate code into meaningful text. This file is parched to relate code values to the meaningful text, and for a single code, departure codes, a Google search has been used. This information will expand the data model with one more source of data.

### 1.1 Immigration data
This data set holds data relating to immigration to the US. The focal field is the ID (cicid) for each immigration case to the US. In addition, the table holds information on the person related to the case, how and where this person arrived to the US, which date the person arrived respectively was granted admission, and if this person left again.

This table will be split into four tables consisting of a fact table and three dimension tables describing the person dimension, the arrival dimension and the visa dimension. Togerher with a description file, we will further extend each of these tables with translation of various abbrevations (codes) into human readable values.

#### 1.1.0 Extracting the data from the source

In [5]:
immi_df = spark \
    .read \
    .format('com.github.saurfang.sas.spark') \
    .load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [6]:
immi_df.createOrReplaceTempView('immi_raw')

#### 1.1.1 Describing the data
We see several field names that are far from interpretable. We also see data types (numericals in particular, but also dates) that are inappropriate from a business perspective.

Overall, we can group data into facts, arrival, personal, and visa related data. This observation will be the foundation of the data model, see section 4.

In [7]:
immi_df.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


### 1.2 Temperature data
The temperature data provides information on average temperature measurements in cities around the world from 1743 and onwards. Two fields combine to make a foreign key, Latitude and Longitude.

#### 1.2.0 Extracting the data from the source

In [7]:
temp_df = spark.read \
  .option('header', 'true') \
  .option('inferSchema', 'true') \
  .csv('../../data2/GlobalLandTemperaturesByCity.csv')

In [8]:
temp_df.createOrReplaceTempView('temp_raw')

#### 1.2.1 Describing the data
The temperature data provides information on average temperature measurements in cities around the world from November 1st, 1743 and onwards. 

The data set will be split according to two fact types, (1) being the data and temperature, and (2) being the city and it's location in latitude and longitude. The split allows for a normalisation such that city information isn't repeated for every single temperate observation.

In [10]:
temp_df.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


### 1.3 Demographics data
This table holds information on city populations with respect to age, male/female population, veteran and foreign-born population, and race statistics. The race statistics are registered in a long-format, such that all other statistics are repeated for each race that the city has measurements for. 

#### 1.3.0 Extracting the data from the source

In [36]:
demo_df = spark.read.option("delimiter", ";").csv('other_data/us-cities-demographics.csv', inferSchema='true', header='true')

In [37]:
demo_df.createOrReplaceTempView('demo_raw')

#### 1.3.1 Describing the data

As mentioned above, the table structure calls for a normalised which requires at least two tables. Here, the split goes into three tables:
1. A table not holding race statistics
2. A table holding race statistics, race being abbreviated into codes
3. A table holding abbreviated race codes and the race names associated with each code
This separation allows for normalisation of the table.

The table connects to immigration data through the state codes.

In [13]:
demo_df.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


### 1.4 Airport data
This data set holds information airports regarding their size, location and elevation etc.

The data set connects to the temperature data set through latitude and longitude values (although generally we don't have measurements for airport locations), and additionally we can connect to the immigration data set through IATA_ARPT_CD on visapost. This can only be done under the assumption that the visapost, which is a code for the department that has issued the visa to an immigrant, is done at an airport, and that the code in fact is an IATA_ARPT_CD.

#### 1.4.0 Extracting the data from the source

In [11]:
arpts_df = spark.read.option("header","true").option('inferSchema', 'true').csv('other_data/airport-codes_csv.csv')

In [12]:
arpts_df.createOrReplaceTempView('arpts_raw')

#### 1.4.1 Describing the data
There are no obvious ways to improve this data, although it can be argued that some fields are redundant. Without more business knowledge it is difficult to know which ones, and hence I keep the data.

Generally, ident seems to be similar to gps_code, iata_code and local_code, but there are exceptions everywhere between these columns.

In [16]:
arpts_df.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


### 1.5 Additional Code tables
An additional set of tables can be extracted from a text file. Udacity mentors helped in developing the code. These tables mainly provide translation of abbreviations or code into human readable information. No further comment will be provided.

In [10]:
with open('./sas_data/I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')

In [11]:
def code_mapper(file, idx):
    '''
    Description
    -----------
    This function takes a SAS file and a unique word as an 
    input. The unique word must match a segment of the SAS file 
    that needs to relate codes to values.
    
    Parameters
    ----------
    file : str
        A .SAS file translated to a string.
    
    idx : str
        A string representing a section in the file from which 
        to extract.
    
    Returns
    -------
    dic : dictionary
        A dictionary containing the codes as keys and their 
        translation as values.
    '''
    # First, extract everything after the key word
    f_content_2 = f_content[f_content.index(idx):]
    
    # Second, remove anything after the key word table
    f_content_2 = f_content_2[:f_content_2.index(';')].split('\n')
    
    # Third, remove empty quotation marks
    f_content_2 = [i.replace("'","") for i in f_content_2]
    
    # Fourth, get list pairs of key and values
    dic = [i.split('=') for i in f_content_2[1:]]
    
    # Fifth, turn keys and values into dictionary
    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)
    return dic

The extraction of the data will also be the transformation, so this will take place in section 2. Data Model.

## 2. Data Model

The data model covers taking the sourced data and normalising it by structuring the data into a star schema. 

As was covered in section 1 the data is loaded from several sources:
1. Immigration data
2. Temperature data
3. Demographics data
4. Airports data
5. Text file with documentation

There will be a heavy focus on field naming conventions to facilitate analysis. Thus we have some standards:
- CD: Fields holding abbreviations such as AL end in CD to indicate they are a code for some meaning. E.g. AL is a state code for the state name Alabama.
- DT: Dates are the format 'YYYY-MM-DD' such as '2999-12-31'
- ID: These are integer fields always and hold an identification number such as IMM_ID being the identification number for an immigration case.
- NM: These fields are descriptive names that don't need interpretation, such as Alabama being a state name.
- YR: An integer-field describing the year.
- NO: Numerical fields much like ID but where the number might not be sufficiently unique to mark an ID
- AMT: Numerical field which holds amounts typically in a decimal form such as a bank balance amt.

In general, the field names are abbreviated to three or four-letter words ending in one of the four suffixes above unless it is clear what the name holds such as 'AGE' where it seems excessive to say 'AGE_YR'.

**Immigration data**

The main tables are a snowflake set of tables with an immigration fact table in the center surrounded by immigration dimension tables. All immigration data have code fields ("\_CD") that refer to some abbreviation or ID that can be decoded. To decode these, a set of CODE-tables have been set up that link to the immigration tables.

**Supplementary data**

In addition to the immigration tables we have tables that don't directly link to the Immigration data set. These data sets are temperature, demographics and airports data. Each have been normalised to the most reasonable degree. See the graph underneath to get an idea of the entities' relationships.

A difficult decision was whether or not to extract code fields to separate tables or translate them directly in the table. For this data model it was decided that the code-fields should reside in separate tables, which could eventually all be combined to one large code-table holding a field for domain, a field for code value and a field for the corresponding meaning of the code value.

**Notes on relationships**

Note that although there are tables to decrypt most code-fields, these tables don't necessarily pair well due to a lack of data. E.g. temperature data and airport data both have longitudes and latitudes and should thus be able to pair. In practice, since temperatures aren't generally measured at airports the relationships are impaired.

![Image of Immigration Data Model](data_model/imm_data.jpg)

## 3. Connect to S3

The aim is to store the data to S3 in parquet files. This requires a few steps as outlined below.

### 3.1 AWS IAM user

We need an IAM user with a key and a secret to create an S3 bucket programmatically.

1. Create a new `IAM user` in AWS
2. Provide `AdministratorAccess` from `Attach existing policies directly` tab
3. Copy `Access key` and `Secret`
4. Update `dwh.cfg` file with new access key and secret.

Note on (4) that the dwh.cfg file has already been loaded. The Notebook needs to be rerun if the key-secret pair needs to change.

### 3.2 Programmatic S3 bucket

First the config file needs to be parsed and read and then the clients will be created.

1. Create S3 client
2. Create S3 bucket

#### 3.2.1 Create S3 client

In [15]:
s3 = boto3.resource(
    's3',
    region_name=REGION,
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET
)

#### 3.2.2 Create S3 bucket

In [20]:
S3_BUCKET = config.get('S3', 'S3_BUCKET')
try:
    s3.create_bucket(
        Bucket='wgram-capstone', 
        CreateBucketConfiguration={
            'LocationConstraint': REGION
        }
    )
except Exception as e:
    print("If bucket already exists then okay.")
    print(e)

If bucket already exists then okay.
An error occurred (BucketAlreadyOwnedByYou) when calling the CreateBucket operation: Your previous request to create the named bucket succeeded and you already own it.


#### 3.2.3 If we need to delete the bucket

Uncomment the below to delete the S3 bucket

In [23]:
bucket = s3.Bucket('wgram-capstone')
bucket.objects.all().delete()
bucket.delete()

{'ResponseMetadata': {'RequestId': 'V7XXYHG78QXPWDM7',
  'HostId': '0qA90Gwx0Lp70CGH8sikjfmeVGRCOzqne7QKl1711IX6cFRT47yUxDvURVA8Pl4MYCZXYCDuRJUcSj0nQE27ng==',
  'HTTPStatusCode': 204,
  'HTTPHeaders': {'x-amz-id-2': '0qA90Gwx0Lp70CGH8sikjfmeVGRCOzqne7QKl1711IX6cFRT47yUxDvURVA8Pl4MYCZXYCDuRJUcSj0nQE27ng==',
   'x-amz-request-id': 'V7XXYHG78QXPWDM7',
   'date': 'Sun, 12 Jun 2022 21:09:11 GMT',
   'server': 'AmazonS3'},
  'RetryAttempts': 0}}

#### 3.3 Store files in S3
Pandas dataframes write very easily to S3. In order to ensure the format, a set of table specifications written in SQL Data Definition Language (DDL) have been scripted.

Because Spark has it's own table specification syntax, a function has been defined to convert SQL DDL to Spark schemas. These are supplied when writing the tables to S3 to ensure the data types. Writing to S3 could be done without the schemas also.

#### 3.3.1 Define the Schema
Below is the function that converts SQL DDL to Spark syntax

In [24]:
def ddl_to_spark_schema(ddl):
    '''
    Description
    -----------
    This function takes a SQL DDL script and transforms it
    providing a text string that Spark can interpret as a schema.
    
    Parameters
    ----------
    ddl : str
        A string holding a SQL DDL.
    
    Returns
    -------
    spark_schema : str
        A string that can be interpreted as data schema by spark.
    '''
    
    ddl_name = ddl[ddl.find('\n'):ddl.find('(')] \
      .replace('\n', '') \
      .split()[0]
    
    ddl_field_type_list = [txt.strip() for txt in ddl[ddl.find('(')+1:-1] \
      .replace('\n', '') \
      .replace('PRIMARY KEY', '') \
      .replace(', ', '') \
      .split()
    ]
    
    # Pair field names and data types and join each pair separated by a comma
    spark_schema = ', '.join(
        [i+' '+j for i, j in zip(ddl_field_type_list[::2], ddl_field_type_list[1::2])]
    )
    
    return {ddl_name : spark_schema}

#### 3.3.2 Read the DDL's
The DDL's are imported and then turned into a dictionary.

In [27]:
from create_tables import create_data_model_tables

In [28]:
s = [ddl_to_spark_schema(ddl) for ddl in create_data_model_tables]
schema_dict = {k: v for d in s for k, v in d.items()}
schema_dict

{'IMM_FCT': 'IMM_ID INTEGER, ADM_CD CHAR(1), ADM_MTD_CD INTEGER, ADM_NO INTEGER, ADM_DT DATE, DEP_CD CHAR(1), DEP_DT DATE, ADM_UPD_CD CHAR(1)',
 'IMM_DIM_PSN': 'IMM_ID INTEGER, INS_ID INTEGER, BRTH_YR INTEGER, AGE INTEGER, OCC_CD CHAR(3), GNDR_CD CHAR(1), CTY_OF_RSDN_ID INTEGER, CTY_OF_CTZ_ID INTEGER, ADR_RGN_CD CHAR(2)',
 'IMM_DIM_VISA': 'IMM_ID INTEGER, VISA_TP_CD CHAR(2), VISA_RSN_CD INTEGER, VISA_ISSU_DEP_CD CHAR(3)',
 'IMM_DIM_ARPT': 'IMM_ID INTEGER, ARPT_CD CHAR(3), ARLN_CD CHAR(2), FLGT_CD CHAR(5), ARR_DT DATE',
 'TEMP_FCT': 'TEMP_CITY_ID INTEGER, TEMP_MSR_DT DATE, TEMP_AVG_VAL DECIMAL(5,3), TEMP_SD_VAL DECIMAL(5,3)',
 'TEMP_DIM_CITY': 'TEMP_CITY_ID INTEGER, CITY_NM VARCHAR(255), CTY_NM VARCHAR(255), LAT_VAL DECIMAL(4,2), LONG_VAL DECIMAL(5,2)',
 'ARPT_FCT': 'ARPT_ID CHAR(4), ARPT_TP VARCHAR(20), ARPT_NM VARCHAR(255), ELEV_FT DECIMAL(9,2), CONT_CD CHAR(2), ISO_CTY_CD CHAR(2), ISO_RGN_CD CHAR(7), MUNI_NM VARCHAR(255), GPS_CD CHAR(4), IATA_ARPT_CD CHAR(3), LCL_ARPT_CD CHAR(3), LAT

Any output must be written to the following path:

In [29]:
output_data = 's3a://wgram-data-lake-songs/'

#### 3.3.3 Define the write to S3 function

In [30]:
def write_to_s3(df, df_name, partitionBy=[], schema_dict=schema_dict, output_dir='s3a://wgram-data-lake-songs/'):
    '''
    Description
    -----------
    This function takes a pandas dataframe and writes it 
    to S3 storage.
    
    Parameters
    ----------
    df : DataFrame
        A Pandas DataFrame
    df_name : str
        The name of the dataframe
    schema_dic : dict
        A dictionary of schemas
    output_dir : str
        The S3 bucket to write to
    
    Returns
    -------
    None
    '''
    try:
        schema = schema_dict[df_name.upper()]
    except KeyError as e:
        print("schema dictionary does not hold a schema with name " + df_name.upper())
        print(f"Error: {e}")
        return(None)
    
    if isinstance(df, pd.DataFrame):
        try:
            df = spark.createDataFrame(df, schema=schema)
        except TypeError as e:
            print("Something went wrong")
            print(f"Error: {e}")
            return(None)
    
    if len(partitionBy) == 0:
        df \
            .write \
            .mode('overwrite') \
            .parquet(output_dir + df_name + '.parquet')
    else:
        df \
            .repartition(*partitionBy) \
            .write \
            .mode('overwrite') \
            .partitionBy(*partitionBy) \
            .parquet(output_dir + df_name + '.parquet')
    
    print("File " + output_dir + df_name + '.parquet' + ' was saved.')
    
    return(None)

## 4. Table creation

The tables will be created in the following order:

0. Code data
1. Airport data
2. Demographics data
3. Temperature data
4. Immigration (i94) data

In each step we will model the source data into their respective tables including necessary transformations and allocation of a key field. Then we will write the file to parquet which makes for easy storage in S3.

### 4.0 Code data

#### 4.0.1 ARPT_CD

In [12]:
dic = code_mapper(f_content, 'i94prtl')

cd_arpt = pd.DataFrame \
    .from_dict(dic, orient='index') \
    .reset_index() \
    .rename(columns = {'index':'ARPT_CD', 0:'ARPT_NM'})

In [13]:
cd_arpt['ARPT_STT_CD'] = cd_arpt.ARPT_NM.str.split(pat=',', n = -1, expand=True)[1].str[1:3]
cd_arpt.ARPT_NM = cd_arpt.ARPT_NM.str.split(pat=',', n = -1, expand=True)[0]

In [14]:
cd_arpt.head(5)

Unnamed: 0,ARPT_CD,ARPT_NM,ARPT_STT_CD
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK


write_to_s3(cd_arpt, 'cd_arpt')

#### 4.0.2 CTY_CD

In [17]:
dic = code_mapper(f_content, 'i94cntyl')

cd_cty = pd.DataFrame \
    .from_dict(dic, orient='index') \
    .reset_index() \
    .rename(columns = {'index':'CTY_CD', 0:'CTY_NM'})

In [18]:
cd_cty.head(5)

Unnamed: 0,CTY_CD,CTY_NM
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [227]:
write_to_s3(cd_cty, 'cd_cty')

File s3a://wgram-data-lake-songs/cd_cty.parquet was saved.


#### 4.0.3 ADM_MTD_CD

In [19]:
dic = code_mapper(f_content, 'i94model')

cd_adm_mtd = pd.DataFrame \
    .from_dict(dic, orient='index') \
    .reset_index() \
    .rename(columns = {'index':'ADM_MTD_CD', 0:'ADM_MTD_NM'})

In [20]:
cd_adm_mtd.head(5)

Unnamed: 0,ADM_MTD_CD,ADM_MTD_NM
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


In [117]:
write_to_s3(cd_adm_mtd, 'cd_adm_mtd')

Something went wrong
Error: field ADM_MTD_CD: IntegerType can not accept object '1' in type <class 'str'>


#### 4.0.4 RGN_CD

In [21]:
dic = code_mapper(f_content, 'i94addrl')

cd_rgn = pd.DataFrame \
    .from_dict(dic, orient='index') \
    .reset_index() \
    .rename(columns = {'index':'RGN_CD', 0:'RGN_NM'})

In [22]:
cd_rgn.head(5)

Unnamed: 0,RGN_CD,RGN_NM
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


write_to_s3(cd_rgn, 'cd_rgn')

#### 4.0.5 VISA_RSN_CD

In [23]:
dic = code_mapper(f_content, 'I94VISA')

cd_visa_rsn = pd.DataFrame \
    .from_dict(dic, orient='index') \
    .reset_index() \
    .rename(columns = {'index':'VISA_RSN_CD', 0:'VISA_RSN_NM'})

In [24]:
cd_visa_rsn.head(5)

Unnamed: 0,VISA_RSN_CD,VISA_RSN_NM
0,1,Business
1,2,Pleasure
2,3,Student


In [122]:
write_to_s3(cd_visa_rsn, 'cd_visa_rsn')

schema dictionary does not hold a schema with name CD_VISA_RSN
Error: 'CD_VISA_RSN'


### 4.0.6 Additional Code Tables
From Udacity forums we were provided with inputs to CD_ADM_ST and from Google we could hypothesise on values for CD_DEP_ST. We could not get a clear answer regarding the latter on the forums unfortunately.

##### 4.0.6.1 CD_ADM_ST

In [25]:
enta_flags = {
    'G':'Admitted into US', 
    'O':'Paroled into US', 
    'R':'Departed',
    'K':'Lost I94 or is deceased',
    'N':'Apprehended',
    'T':'Overstayed',
    'Z':'Adjusted to per residence'
}

cd_adm_st = pd.DataFrame \
    .from_dict(enta_flags, orient='index') \
    .reset_index() \
    .rename(columns = {'index':'ADM_ST_CD', 0:'ADM_ST_NM'})

In [26]:
cd_adm_st.head(5)

Unnamed: 0,ADM_ST_CD,ADM_ST_NM
0,G,Admitted into US
1,O,Paroled into US
2,R,Departed
3,K,Lost I94 or is deceased
4,N,Apprehended


In [125]:
write_to_s3(cd_adm_st, 'cd_adm_st')

File s3a://wgram-data-lake-songs/cd_adm_st.parquet was saved.


##### 4.0.6.2 CD_DEP_ST

The departure flag seems farfetched, but I kept returning to the below site, which seemed to have something interpretable. I asked about it on the forum but to no avail. Hence the mappings should be accepted only cautiously if at all.

https://www.dshs.wa.gov/sites/default/files/ESA/eaz-manual/USCIS%20I-94%20Table.pdf

Another source could be the following, but again - there is a lack of verification:
https://www.dhs.gov/immigration-statistics/lawful-permanent-residents/ImmigrantCOA

And other sources which uses the same codes as the first link:
https://www.dhs.gov/immigration-statistics/special-reports/legal-immigration
https://www.dickinson-wright.com/practice-areas/immigration-services?tab=0
https://travel.state.gov/content/dam/visas/Statistics/Immigrant-Statistics/MonthlyIVIssuances/Immigrant%20Visa%20Symbols.pdf

I want to reconciliate with the following Dashboard:
https://www.trade.gov/data-visualization/adisi-94-visitor-arrivals-monitor-cor

In [27]:
entd_flags = {
    'D': 'Alien crewman.',
    'I': 'Representative of foreign information media, spouse and children.',
    'J': 'Exchange visitor or spose or child thereof.',
    'K': "Relating to a marriage contract or as an immediate relative \
of a U.S. citizen (I-130).",
    'L': 'Intercompany transferee',
    'M': 'Student pursuing a full course of study at an established \
vocational or other recognized nonacademic institution.',
    'N': 'Parent of an alien classified SK3 or SN3 or Child of \
N-8 or of an alien classified SK1, SK2, SK4, SN1, SN2, or SN4.',
    'O': 'Temporary worker with extraordinary ability/achievement in \
the sciences, arts, education, business, or athletics.',
    'Q': 'Temporary worker in an international cultural exchange program',
    'R': 'Temporary worker to perform work in religious occupations.',
    'V': 'Nonimmigrant spouse of lawful permanent \
residents relating to the LIFE Act, as of December 28, 2000.',
    'W': 'Temporary visitor for under the Visa Waiver Program.'
}

cd_dep = pd.DataFrame \
    .from_dict(entd_flags, orient='index') \
    .reset_index() \
    .rename(columns = {'index':'DEP_CD', 0:'DEP_CD_NM'})

In [28]:
cd_dep.head(5)

Unnamed: 0,DEP_CD,DEP_CD_NM
0,D,Alien crewman.
1,I,"Representative of foreign information media, s..."
2,J,Exchange visitor or spose or child thereof.
3,K,Relating to a marriage contract or as an immed...
4,L,Intercompany transferee


write_to_s3(cd_dep, 'cd_dep')

### 4.0.7 Codes dataframe dictionary

df_dict_cd = {
    'CD_ARPT': cd_arpt,
    'CD_CTY': cd_cty,
    'CD_ADM_MTD': cd_adm_mtd,
    'CD_RGN': cd_rgn,
    'CD_VISA_RSN': cd_visa_rsn,
    'CD_ADM_ST': cd_adm_st,
    'CD_DEP': cd_dep
}

### 4.1 Airport data

In [27]:
arpt_fct = spark.sql("""
    SELECT DISTINCT 
        CAST(ard.ident                        AS CHAR(4))      AS ARPT_ID      ,
        CAST(INITCAP(REPLACE(type, '_', ' ')) AS VARCHAR(20))  AS ARPT_TP      ,
        CAST(name                             AS VARCHAR(255)) AS ARPT_NM      ,
        CAST(elevation_ft                     AS DECIMAL(9,2)) AS ELEV_FT      ,
        CAST(continent                        AS CHAR(2))      AS CONT_CD      ,
        CAST(iso_country                      AS CHAR(2))      AS ISO_CTY_CD   ,
        CAST(iso_region                       AS CHAR(7))      AS ISO_RGN_CD   ,
        CAST(municipality                     AS VARCHAR(255)) AS MUNI_NM      ,
        CAST(gps_code                         AS CHAR(4))      AS GPS_CD       ,
        CAST(iata_code                        AS CHAR(3))      AS IATA_ARPT_CD ,
        CAST(local_code                       AS CHAR(3))      AS LCL_ARPT_CD  ,
        CAST(trim(split(coordinates, ',')[1]) AS DECIMAL(4,2)) AS LAT_VAL      ,
        CAST(trim(split(coordinates, ',')[0]) AS DECIMAL(5,2)) AS LONG_VAL
    FROM arpts_raw AS ard
""")

In [28]:
arpt_fct.limit(5).toPandas()

Unnamed: 0,ARPT_ID,ARPT_TP,ARPT_NM,ELEV_FT,CONT_CD,ISO_CTY_CD,ISO_RGN_CD,MUNI_NM,GPS_CD,IATA_ARPT_CD,LCL_ARPT_CD,LAT_VAL,LONG_VAL
0,00TS,Small Airport,Alpine Range Airport,670.0,,US,US-TX,Everman,00TS,,00TS,32.61,-97.24
1,03VA,Closed,Whipoorwill Springs Airport,250.0,,US,US-VA,Nokesville,03VA,,03VA,38.66,-77.58
2,04NJ,Small Airport,Emmanuel Airport,155.0,,US,US-NJ,Elmer,04NJ,,04NJ,39.6,-75.23
3,07Y,Small Airport,Hill City-Quadna Mountain Airport,1289.0,,US,US-MN,Hill City,07Y,,07Y,46.96,-93.6
4,0AK9,Small Airport,Falcon Lake Strip,110.0,,US,US-AK,Point Mackenzie,0AK9,,0AK9,61.33,-150.06


In [179]:
write_to_s3(arpt_fct, 'arpt_fct')

File s3a://wgram-data-lake-songs/arpt_fct.parquet was saved.


#### 4.1.2 Airport dataframe dictionary

In [181]:
df_dict_arpt = {'ARPT_FCT': arpt_fct}

### 4.2 Demographics data
This data set will be split into three data sets in order to normalise the data.

1. DEMO_FCT that holds general population facts for each city
2. DEMO_RACE_FCT that holds population facts for each relevant race in each city
3. CD_RACE_TP that holds codes translating race types from code to their translations

#### 4.2.1 DEMO_FCT

In [29]:
demo_fct = spark.sql("""
    SELECT 
        ROW_NUMBER() OVER (PARTITION BY '' ORDER BY '') AS DEMO_CITY_ID,
        tmp.*
    FROM (
    SELECT DISTINCT 
        CAST(`State Code`             AS CHAR(2)      ) AS RGN_CD,
        CAST(State                    AS VARCHAR(255) ) AS RGN_NM,
        CAST(City                     AS VARCHAR(255) ) AS CITY_NM,
        CAST(`Median Age`             AS DECIMAL(4,1) ) AS MED_AGE,
        CAST(`Total Population`       AS INTEGER      ) AS POP_TOT_AMT,
        CAST(`Male Population`        AS INTEGER      ) AS POP_MLE_AMT,
        CAST(`Female Population`      AS INTEGER      ) AS POP_FMLE_AMT,
        CAST(`Number of Veterans`     AS INTEGER      ) AS POP_VET_AMT,
        CAST(`Foreign-born`           AS INTEGER      ) AS POP_FGN_AMT,
        CAST(`Average Household Size` AS DECIMAL(4,2) ) AS AVG_HH_SZ
    FROM demo_raw AS drd
    ) AS tmp
""")

In [30]:
demo_fct.createOrReplaceTempView('demo_fct_df')

In [50]:
demo_fct.limit(5).toPandas()

Unnamed: 0,DEMO_CITY_ID,RGN_CD,RGN_NM,CITY_NM,MED_AGE,POP_TOT_AMT,POP_MLE_AMT,POP_FMLE_AMT,POP_VET_AMT,POP_FGN_AMT,AVG_HH_SZ
0,1,MO,Missouri,Independence,38.6,117255,54348,62907,10220,4911,2.33
1,2,CO,Colorado,Colorado Springs,34.8,456562,225544,231018,49291,35320,2.48
2,3,MA,Massachusetts,Cambridge,31.5,110402,55421,54981,2495,27757,2.06
3,4,LA,Louisiana,Metairie,41.6,146458,69515,76943,7187,19871,2.39
4,5,CA,California,Santa Clarita,38.1,182367,90192,92175,8537,40666,3.02


#### 4.2.2 DEMO_FCT_RACE

In [31]:
demo_fct_race = spark.sql("""
    SELECT
        ddd.DEMO_CITY_ID, 
        drd.RACE_TP_CD,
        drd.POP_RACE_AMT
    FROM (
    SELECT DISTINCT 
        CAST(`State Code`           AS CHAR(2))      AS RGN_CD,
        CAST(City AS VARCHAR(255))  AS CITY_NM,
        CASE Race
            WHEN 'American Indian and Alaska Native' THEN 'AIAN'
            WHEN 'Hispanic or Latino'                THEN 'HILA'
            WHEN 'Black or African-American'         THEN 'BLAA'
            WHEN 'Asian'                             THEN 'ASAN'
            WHEN 'White'                             THEN 'WHTE'
            ELSE 'UNKN'
        END AS RACE_TP_CD,
        CAST(Count AS INTEGER) AS POP_RACE_AMT
    FROM demo_raw AS drd
    ) AS drd
    INNER JOIN demo_fct_df AS ddd
        ON  1=1
        AND drd.RGN_CD = ddd.RGN_CD
        AND drd.CITY_NM = ddd.CITY_NM
""")

In [34]:
demo_fct_race.limit(5).toPandas()

Unnamed: 0,DEMO_CITY_ID,RACE_TP_CD,POP_RACE_AMT
0,309,WHTE,52445
1,447,BLAA,138242
2,73,BLAA,9221
3,208,BLAA,5294
4,363,AIAN,845


#### 4.2.3 Demographics race codes

In [32]:
cd_race_tp = spark.sql("""
    SELECT DISTINCT 
        CASE Race
            WHEN 'American Indian and Alaska Native' THEN 'AIAN'
            WHEN 'Hispanic or Latino'                THEN 'HILA'
            WHEN 'Black or African-American'         THEN 'BLAA'
            WHEN 'Asian'                             THEN 'ASAN'
            WHEN 'White'                             THEN 'WHTE'
            ELSE 'UNKN'
        END AS RACE_TP_CD,
        CAST(Race AS VARCHAR(255)) AS RACE_TP
    FROM demo_raw AS drd
""")

In [54]:
cd_race_tp.limit(5).toPandas()

Unnamed: 0,RACE_TP_CD,RACE_TP
0,ASAN,Asian
1,AIAN,American Indian and Alaska Native
2,HILA,Hispanic or Latino
3,BLAA,Black or African-American
4,WHTE,White


#### 4.2.4 Save to S3

In [244]:
write_to_s3(demo_fct, 'demo_fct', partitionBy=['RGN_CD'])

File s3a://wgram-data-lake-songs/demo_fct.parquet was saved.


In [245]:
write_to_s3(demo_fct_race, 'demo_fct_race', partitionBy=['RACE_TP_CD'])

File s3a://wgram-data-lake-songs/demo_fct_race.parquet was saved.


In [256]:
write_to_s3(cd_race_tp, 'cd_race_tp')

File s3a://wgram-data-lake-songs/cd_race_tp.parquet was saved.


#### 4.2.4 Demographics dataframe dictionary

In [44]:
df_dict_demo = {
    'DEMO_FCT': demo_fct.toPandas(),
    'DEMO_FCT_RACE': demo_fct_race.toPandas(),
    'CD_RACE_TP': cd_race_tp.toPandas()
}

### 4.3 Temperature data
This data set will be split into two tables:
1. TEMP_DIM_CITY holding information about each city so that the fact table only holds a city identity key
2. CITY_FCT holding the facts for each city key for each measurement date

#### 4.3.1 Temperature city dimension table

In [30]:
temp_dim_city = spark.sql("""
    SELECT
        ROW_NUMBER() OVER (PARTITION BY '' ORDER BY '') AS TEMP_CITY_ID,
        CITY_NM,
        CTY_NM,
        CAST(LAT_SIGN || LAT_VAL AS DECIMAL(4,2)) LAT_VAL,
        CAST(LONG_SIGN || LONG_VAL AS DECIMAL(5,2)) LONG_VAL
    FROM (
    SELECT DISTINCT 
        CAST(City as VARCHAR(255)) AS CITY_NM,
        CAST(Country as VARCHAR(255)) AS CTY_NM,
        MAX(CASE WHEN SUBSTRING(latitude, CHARACTER_LENGTH(latitude), 1) = 'N' THEN '+' ELSE '-' END) AS LAT_SIGN,
        MAX(TRIM(SUBSTRING(latitude, 1, CHARACTER_LENGTH(latitude) - 1))) as LAT_VAL,
        MAX(CASE WHEN SUBSTRING(longitude, CHARACTER_LENGTH(longitude), 1) = 'N' THEN '+' ELSE '-' END) AS LONG_SIGN,
        MAX(TRIM(SUBSTRING(longitude, 1, CHARACTER_LENGTH(longitude) - 1))) as LONG_VAL
    FROM temp_raw AS drd
    GROUP BY
        City,
        Country
    ) AS TEMP
""")

In [31]:
temp_dim_city.createOrReplaceTempView('temp_dim_city_df')

In [56]:
temp_dim_city.limit(5).toPandas()

Unnamed: 0,TEMP_CITY_ID,CITY_NM,CTY_NM,LAT_VAL,LONG_VAL
0,1,Ontario,United States,34.56,-116.76
1,2,Fuzhou,China,26.52,-120.0
2,3,Maanshan,China,31.35,-118.74
3,4,Windhoek,Namibia,-23.31,-16.6
4,5,Ueda,Japan,36.17,-139.23


In [258]:
write_to_s3(temp_dim_city, 'temp_dim_city', partitionBy = ['CTY_NM'])

File s3a://wgram-data-lake-songs/temp_dim_city.parquet was saved.


#### 4.3.2 Temperature fact table

In [32]:
temp_fct = spark.sql("""
    SELECT
        dtcd.TEMP_CITY_ID,
        trd.MSR_DT,
        trd.TEMP_AVG_VAL,
        trd.TEMP_SD_VAL
    FROM (
    SELECT DISTINCT 
        CAST(dt                            AS DATE)         AS MSR_DT,
        CAST(AverageTemperature            AS DECIMAL(5,3)) AS TEMP_AVG_VAL,
        CAST(AverageTemperatureUncertainty AS DECIMAL(5,3)) AS TEMP_SD_VAL,
        CAST(City                          AS VARCHAR(255)) AS CITY_NM,
        CAST(Country                       AS VARCHAR(255)) AS CTY_NM
    FROM temp_raw AS trd
    ) AS trd
    INNER JOIN temp_dim_city_df AS dtcd
        ON  1=1
        AND trd.CITY_NM = dtcd.CITY_NM
        AND trd.CTY_NM = dtcd.CTY_NM
""")

In [59]:
temp_fct.limit(5).toPandas()

Unnamed: 0,TEMP_CITY_ID,MSR_DT,TEMP_AVG_VAL,TEMP_SD_VAL
0,1537,1750-03-01,2.773,2.227
1,1537,1772-04-01,9.009,2.7
2,1537,1797-03-01,2.155,1.986
3,1537,1814-06-01,19.209,1.843
4,1537,1820-01-01,-5.747,8.125


In [None]:
write_to_s3(temp_fct, 'temp_fct', partitionBy=['TEMP_CITY_ID', 'MSR_DT'])

#### 4.3.3 Temperature dataframe dictionary

In [None]:
df_dict_temp = {
    'TEMP_FCT': temp_fct.toPandas(),
    'TEMP_DIM_CITY': temp_dim_city.toPandas()
}

### 4.4 Immigration data
This data set comprises the main data set for the data model. We will split this data set into four tables:

1. IMM_FCT - this provides all the facts for each immigration case
2. IMM_DIM_PSN - this explains the immigrant data points for each immigration case
3. IMM_DIM_ARPT - this explains all airport and flight information regarding each immigration case
4. IMM_DIM_VISA - this explains the immigrant's visa situation in each immigration case.

In [51]:
imm_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [52]:
imm_df.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


#### 4.4.1 Fact

Although most codes will be explained in a separate code-table, we have a specific field that needs to be adjusted slightly.

- **dtaddto**

Date until end of stay which is typically an eight character date ('MMddyyyy') has a value, 'D/S', which means duration of stay. To ensure it doesn't become a NULL-value, we translate this value to '12312999' saying that the end date is open-ended.

In [36]:
imm_fct = spark.sql("""
    SELECT DISTINCT 
        CAST(rd.cicid AS INTEGER) AS IMM_ID,
        
        CAST(entdepa  AS CHAR(1)) AS ADM_CD,
        CAST(i94mode  AS INTEGER) AS ADM_MTD_CD,
        CAST(admnum   AS INTEGER)  AS ADM_NO,
        CASE WHEN COALESCE(dtaddto, 'None') = 'None' THEN Date('1900-01-01') when (dtaddto) = 'D/S' then Date('2999-12-31') ELSE to_date(`dtaddto`, 'MMddyyyy') END ADM_TO_DT,
        CAST(entdepd  AS CHAR(1)) AS DEP_CD,
        CASE WHEN COALESCE(depdate, 'None') = 'None' THEN Date'1900-01-01' ELSE date_add('1960-01-01', `depdate`) END AS DEP_DT,
        CAST(entdepu  AS CHAR(1)) AS ADM_UPD_CD
        
    FROM immi_raw AS rd
""")

In [61]:
imm_fct.limit(5).toPandas()

Unnamed: 0,IMM_ID,ADM_CD,ADM_MTD_CD,ADM_NO,ADM_TO_DT,DEP_CD,DEP_DT,ADM_UPD_CD
0,93,G,1,2147483647,2016-06-29,O,2016-04-09,
1,166,G,1,2147483647,2016-06-29,R,2016-04-09,
2,414,G,1,2147483647,2016-06-29,O,2016-04-22,
3,550,G,1,2147483647,2016-06-29,O,2016-04-30,
4,584,G,1,2147483647,2016-06-29,O,2016-04-08,


In [None]:
write_to_s3(imm_fct, 'imm_fct', partitionBy=['ADM_MTD_CD', 'ADM_CD'])

#### 4.4.2 DIM PSN table

In [37]:
imm_dim_psn = spark.sql("""
    SELECT DISTINCT 
        CAST(rd.cicid AS INTEGER)                                                            AS IMM_ID ,
        CASE WHEN COALESCE(insnum, 'None') = 'None' THEN -1 ELSE CAST(insnum AS INTEGER) END AS INS_ID  ,
        CAST(biryear AS INTEGER)                                                             AS BRTH_YR ,
        CAST(i94bir AS INTEGER)                                                              AS AGE     ,
        CASE WHEN COALESCE(occup, 'None') = 'None' THEN 'XXX' ELSE occup END                 AS OCC_CD  ,
        CASE WHEN COALESCE(gender, 'None') = 'None' THEN 'U' ELSE gender END                 AS GNDR_CD ,
        CAST(rd.i94cit AS INTEGER)                                                           AS CTY_OF_CTZ_ID ,
        CAST(rd.i94res AS INTEGER)                                                           AS CTY_OF_RSDN_ID ,
        CASE WHEN COALESCE(i94addr, 'None') = 'None' THEN 'XX' ELSE i94addr END              AS ADR_RGN_CD
    FROM immi_raw AS rd
""")

In [63]:
imm_dim_psn.limit(5).toPandas()

Unnamed: 0,IMM_ID,INS_ID,BRTH_YR,AGE,OCC_CD,GNDR_CD,CTY_OF_CTZ_ID,CTY_OF_RSDN_ID,ADR_RGN_CD
0,127,-1,1991,25,XXX,F,103,103,NJ
1,137,-1,2000,16,XXX,F,103,103,NY
2,460,-1,2013,3,XXX,U,103,103,FL
3,527,-1,1985,31,XXX,M,103,103,NV
4,1250,-1,1988,28,XXX,M,104,104,NY


write_to_s3(imm_dim_psn, 'imm_dim_psn', partitionBy=['ADR_RGN_CD', 'BRTH_YR'])

#### 4.4.3 DIM ARPT TBL

In [38]:
imm_dim_arpt = spark.sql("""
    SELECT DISTINCT 
        CAST(cicid AS INTEGER)       AS IMM_ID,
        CAST(i94port AS CHAR(3))     AS ARPT_CD,
        CAST(airline AS CHAR(2))     AS ARLN_CD,
        CAST(fltno AS CHAR(5))       AS FLGT_CD,
        CASE WHEN COALESCE(arrdate, 'None') = 'None' THEN Date'1900-01-01' ELSE date_add('1960-01-01', `arrdate`) END AS ARR_DT
    FROM immi_raw AS rd
""")

In [65]:
imm_dim_arpt.createOrReplaceTempView('arpt_dim_df')

In [66]:
imm_dim_arpt.limit(5).toPandas()

Unnamed: 0,IMM_ID,ARPT_CD,ARLN_CD,FLGT_CD,ARR_DT
0,30,ATL,OS,89,2016-04-01
1,69,ATL,DL,131,2016-04-01
2,246,NYC,AA,65,2016-04-01
3,417,MIA,OS,97,2016-04-01
4,686,MIA,OS,97,2016-04-01


#### 4.4.4 DIM VISA TBL

In [39]:
imm_dim_visa = spark.sql("""
    SELECT DISTINCT
        CAST(rd.cicid AS INTEGER) AS IMM_ID,
        CAST(visatype AS CHAR(2)) AS VISA_TP_CD,
        CAST(i94visa AS INTEGER)  AS VISA_RSN_CD,
        CAST(visapost AS CHAR(3)) AS VISA_ISSU_DEP_CD
    FROM immi_raw AS rd        
""")

In [68]:
imm_dim_visa.createOrReplaceTempView('visa_dim_df')

In [70]:
imm_dim_visa.limit(5).toPandas()

Unnamed: 0,IMM_ID,VISA_TP_CD,VISA_RSN_CD,VISA_ISSU_DEP_CD
0,265,WT,2,
1,702,WT,2,
2,733,WT,2,
3,765,WT,2,
4,888,WT,2,


#### 4.4.5 Immigration dataframe dictionary

In [None]:
df_dict_imm = {
    'IMM_FCT': imm_fct.toPandas(),
    'IMM_DIM_PSN': imm_dim_psn.toPandas(),
    'IMM_DIM_ARPT': imm_dim_arpt.toPandas(),
    'IMM_DIM_VISA': imm_dim_visa.toPandas()
}

## 5. Creating the data dictionaries

Documentation of all objects in this project will be done through data dictionaries. The format will be the following:
1. Table name
2. Data type
3. Field size
4. Field description
5. Example

The dictionaries will be created for each table then be combined for one large dictionary. The order of creation will be as follows:
0. Code tables built from text files and Google searches
1. Airport data
2. Demograhics data
3. Temperature data
4. Immigration data

### 5.0 Data from text files and Google
Here the sas description file tables are being documented together with departure codes found by way of Google.

#### 5.0.1 Airport codes

In [40]:
arpt_cd_dict = {
    'Table': 'CD_ARPT',
    'Field Name': [
        'ARPT_CD',
        'ARPT_NM',
        'ARPT_STT_CD'
    ],
    'Data Type': [
        'CHAR(3)',
        'VARCHAR(100)',
        'CHAR(2)'
    ],
    'Field size for display': [
        '3',
        '38',
        '2'
    ],
    'Description': [
        'Airport CD consisting of 3 letters',
        'Airport name (typically the name of the city). Can be "Collapsed" or No Port Code or otherwise unkown.',
        '2-letter code for the state the airport is in'
    ],
    'Example': [
        'ANC',
        'ANCHORAGE',
        'AK'
    ]
}

In [41]:
arpt_cd_dict_df = pd.DataFrame.from_dict(arpt_cd_dict)

#### 5.0.2 Region codes (states and other territories)

In [42]:
rgn_cd_dict = {
    'Table': 'CD_RGN',
    'Field Name': [
        'RGN_CD',
        'RGN_NM'
    ],
    'Data Type': [
        'CHAR(2)',
        'VARCHAR(100)'
    ],
    'Field size for display': [
        '2',
        '17'
    ],
    'Description': [
        'Code for US state or region',
        'Name of US State or region'
    ],
    'Example': [
        'AK',
        'ALASKA'
    ]
}

In [43]:
rgn_cd_dict_df = pd.DataFrame.from_dict(rgn_cd_dict)

#### 5.0.3 Country codes

In [44]:
cty_cd_dict = {
    'Table': 'CD_CTY',
    'Field Name': [
        'CTY_CD',
        'CTY_NM'
    ],
    'Data Type': [
        'INTEGER',
        'VARCHAR(100)'
    ],
    'Field size for display': [
        '3',
        '57'
    ],
    'Description': [
        'Country code consisting of up to 3 integers',
        'Country name'
    ],
    'Example': [
        '236',
        'AFGHANISTAN'
    ]
}

In [45]:
cty_cd_dict_df = pd.DataFrame.from_dict(cty_cd_dict)

#### 5.0.4 Admission method

In [46]:
adm_mtd_cd_dict = {
    'Table': 'CD_ADM_MTD',
    'Field Name': [
        'ADM_MTD_CD',
        'ADM_MTD_NM'
    ],
    'Data Type': [
        'INTEGER',
        'VARCHAR(100)'
    ],
    'Field size for display': [
        '3',
        '12'
    ],
    'Description': [
        'Method code taking on 1, 2, 3, or 9',
        'Description of whether immigrant came by Air, Sea, Land or Not reported'
    ],
    'Example': [
        '3',
        'Land'
    ]
}

In [47]:
adm_mtd_cd_dict_df = pd.DataFrame.from_dict(adm_mtd_cd_dict)

#### 5.0.5 Visa codes

In [48]:
visa_cd_dict = {
    'Table': 'CD_VISA',
    'Field Name': [
        'VISA_CD',
        'VISA_NM'
    ],
    'Data Type': [
        'INTEGER',
        'VARCHAR(100)'
    ],
    'Field size for display': [
        '1',
        '8'
    ],
    'Description': [
        'Code for reason for applying for a visa',
        'Description for reason for applying for a visa'
    ],
    'Example': [
        '1',
        'Business'
    ]
}

In [49]:
visa_cd_dict_df = pd.DataFrame.from_dict(visa_cd_dict)

#### 5.0.6 Admission status

In [50]:
adm_st_cd_dict = {
    'Table': 'CD_ADM_ST',
    'Field Name': [
        'ADM_ST_CD',
        'ADM_ST_NM'
    ],
    'Data Type': [
        'CHAR(1)',
        'VARCHAR(100)'
    ],
    'Field size for display': [
        '1',
        '23'
    ],
    'Description': [
        'Code for admission status',
        'Description for admission status'
    ],
    'Example': [
        'G',
        'Admitted into US'
    ]
}

In [51]:
adm_st_cd_dict_df = pd.DataFrame.from_dict(adm_st_cd_dict)

#### 5.0.7 Departure codes

In [52]:
dep_st_cd_dict = {
    'Table': 'CD_DEP',
    'Field Name': [
        'DEP_CD',
        'DEP_NM'
    ],
    'Data Type': [
        'CHAR(1)',
        'VARCHAR(150)'
    ],
    'Field size for display': [
        '1',
        '115'
    ],
    'Description': [
        'Code for departure reason',
        'Description for departure reason - the letter codes match documentation, but the description seems odd.'
    ],
    'Example': [
        'D',
        'Alien crewman'
    ]
}

In [53]:
dep_st_cd_dict_df = pd.DataFrame.from_dict(dep_st_cd_dict)

#### 5.0.8 Combine all

In [54]:
cd_dict_df = arpt_cd_dict_df \
  .append(rgn_cd_dict_df) \
  .append(cty_cd_dict_df) \
  .append(adm_mtd_cd_dict_df) \
  .append(visa_cd_dict_df) \
  .append(adm_st_cd_dict_df) \
  .append(dep_st_cd_dict_df)

### 5.1 Airport tables

#### 5.1.1 Airport dim

In [55]:
arpt_dict = {
    'Table': 'ARPT_FCT',
    'Field Name': [
        'ARPT_ID',     
        'ARPT_TP',     
        'ARPT_NM',     
        'ELEV_FT',     
        'CONT_CD',     
        'ISO_CTY_CD',  
        'ISO_RGN_CD',  
        'MUNI_NM',     
        'GPS_CD',      
        'IATA_ARPT_CD',
        'LCL_ARPT_CD', 
        'LAT_VAL',     
        'LONG_VAL' 
    ],
    'Data Type': [
        'CHAR(4)',
        'VARCHAR(20)',
        'VARCHAR(255)',
        'DECIMAL(9,2)',
        'CHAR(2)',
        'CHAR(2)',
        'CHAR(7)',
        'VARCHAR(255)',
        'CHAR(4)',
        'CHAR(3)',
        'CHAR(3)',
        'DECIMAL(4,2)',
        'DECIMAL(5,2)'
    ],
    'Field size for display': [
        '4',
        '20',
        '255',
        '9',
        '2',
        '2',
        '7',
        '255',
        '4',
        '3',
        '3',
        '4',
        '5'
    ],
    'Description': [
        'Airport ID consisting of letters and numbers',
        'Type of airport, including "Closed" in case it is so.',
        'Name of airport',
        'Elevation of airport above sea level in feet',
        '2-letter code of continent, e.g. NA for North America',
        'ISO Country code, 2 characters',
        '4-letter code of region in format "country - State" although state may also be just a region',
        'Name of municipality',
        'GPS location code - see also ARPT_ID',
        'IATA airport code, similar to ARPT_ID and GPS_CD',
        'Local airport code, similar to ARPT_ID, GPS_CD, IATA_ARPT_CD',
        'Latitude',
        'Longitude'
    ],
    'Example': [
        '00TS',
        'Small Airport',
        'Alpine Range Airport',
        '670.00',
        'NA',
        'US',
        'US-TX',
        'Everman',
        '00TS',
        '00TS',
        '00TS',
        '32.61',
        '-97.24'
    ]
}

In [56]:
arpt_dict_df = pd.DataFrame.from_dict(arpt_dict)

### 5.2 Demographics data

#### 5.2.1 Demographics main

In [57]:
demo_fct_dict = {
    'Table': 'DEMO_FCT',
    'Field Name': [
        'DEMO_CITY_ID',
        'RGN_CD',
        'RGN_NM',     
        'CITY_NM',    
        'MED_AGE',    
        'POP_TOT_AMT',
        'POP_MLE_AMT',
        'POP_FMLE_AMT',
        'POP_VET_AMT',
        'POP_FGN_AMT',
        'AVG_HH_SZ'
    ],
    'Data Type': [
        'INTEGER',
        'CHAR(2)',
        'VARCHAR(255)',
        'VARCHAR(255)', 
        'DECIMAL(4,1)',
        'INTEGER',
        'INTEGER',     
        'INTEGER',     
        'INTEGER',     
        'INTEGER',     
        'DECIMAL(4,2)' 
    ],
    'Field size for display': [
        '4',
        '2',
        '255',
        '255', 
        '4',
        '8',
        '8',     
        '8',     
        '8',     
        '8',     
        '4'
    ],
    'Description': [
        'Identification (serial) number of city',
        '2-letter code for region - typically a US state',
        'Name for region - typically a US state',
        'Name of city',
        'Median age in city',
        'Total population in city',
        'Total male population in city',
        'Total female population in city',
        'Total veteran population in city',
        'Total foreign-born population in city',
        'Average size of household in city'
    ],
    'Example': [
        '1',
        'MO',
        'Missouri',
        'Independence',
        '38.6',
        '117255',
        '54348',
        '62907',
        '10220',
        '4911',
        '2.33'
    ]
}

In [58]:
demo_fct_dict_df = pd.DataFrame.from_dict(demo_fct_dict)

#### 5.2.2 Race population

In [59]:
demo_fct_race_dict = {
    'Table': 'DEMO_FCT_RACE',
    'Field Name': [
        'DEMO_CITY_ID',
        'RACE_TP_CD',
        'POP_RACE_AMT'
    ],
    'Data Type': [
        'INTEGER',
        'CHAR(4)',
        'INTEGER'
    ],
    'Field size for display': [
        '4',
        '4',
        '8'
    ],
    'Description': [
        'City ID, same as for DEMO_FCT table',
        '4-letter code for race',
        'Total population of specific race for specific city'
    ],
    'Example': [
        '447',
        'BLAA',
        '138242'
    ]
}

In [60]:
demo_fct_race_dict_df = pd.DataFrame.from_dict(demo_fct_race_dict)

#### 5.2.3 Race codes

In [61]:
race_tp_cd_dict = {
    'Table': 'CD_RACE_TP',
    'Field Name': [
        'RACE_TP_CD',
        'POP_RACE_AMT'
    ],
    'Data Type': [
        'CHAR(4)',
        'VARCHAR(50)'
    ],
    'Field size for display': [
        '4',
        '255'
    ],
    'Description': [
        '4-letter code for race',
        'Race relating to the race code'
    ],
    'Example': [
        'AIAN',
        'American Indian and Alaska Native'
    ]
}

In [62]:
race_tp_cd_dict_df = pd.DataFrame.from_dict(race_tp_cd_dict)

#### 5.2.4 Combine all

In [63]:
demo_dict_df = demo_fct_dict_df \
  .append(demo_fct_race_dict_df) \
  .append(race_tp_cd_dict_df)

### 5.3 Temperature data

#### 5.3.1 TEMP_CITY

In [64]:
temp_city_dict = {
    'Table': 'TEMP_DIM_CITY',
    'Field Name': [
        'TEMP_CITY_ID',
        'CITY_NM',
        'CTY_NM',
        'LAT_VAL',
        'LONG_VAL'
    ],
    'Data Type': [
        'INTEGER',
        "VARCHAR(255)",
        'VARCHAR(255)',
        'DECIMAL(4,2)',
        'DECIMAL(5,3)'
    ],
    'Field size for display': [
        '4',
        '255',
        '255',
        '4',
        '5'
    ],
    'Description': [
        'Id for city in temperature data set (serial id)',
        'Name of city',
        'Name of country',
        'Latitude',
        'Longitude'
    ],
    'Example': [
        '1',
        'Ontario',
        'United States',
        '34.56',
        '-116.76'
    ]
}

In [65]:
temp_city_dict_df = pd.DataFrame.from_dict(temp_city_dict)

#### 5.3.2 TEMP FCT

In [66]:
temp_fct_dict = {
    'Table': 'TEMP_FCT',
    'Field Name': [
        'TEMP_CITY_ID',
        'MSR_DT',
        'TEMP_AVG_VAL',
        'TEMP_SD_VAL'
    ],
    'Data Type': [
        'INTEGER',
        "DATE 'YYYY-MM-DD'",
        'DECIMAL(5,3)',
        'DECIMAL(5,3)'
    ],
    'Field size for display': [
        '4',
        '10',
        '5',
        '5'
    ],
    'Description': [
        'Id for city in temperature data set (serial id)',
        'Date of temperature measuremenet',
        'Average temperature for date for city',
        'Average temperature uncertainty (standard deviation) for city'
    ],
    'Example': [
        '1537',
        '1750-03-01',
        '2.773',
        '2.227'
    ]
}

In [67]:
temp_fct_dict_df = pd.DataFrame.from_dict(temp_fct_dict)

In [68]:
temp_dict_df = temp_city_dict_df \
  .append(temp_fct_dict_df)

### 5.4 Immigration data

#### 5.4.1 Fct

In [69]:
imm_fct_dict = {
    'Table': 'IMM_FCT',
    'Field Name': [
        'IMM_ID',
        'ADM_CD',
        'ADM_MTD_CD',     
        'ADM_NO',    
        'ADM_TO_DT',    
        'DEP_CD',
        'DEP_DT',
        'ADM_UPD_CD'
    ],
    'Data Type': [
        'INTEGER',
        'CHAR(1)',
        'INTEGER',
        'INTEGER',  
        "DATE 'YYYY-MM-DD'",
        'CHAR(1)',     
        "DATE 'YYYY-MM-DD'",
        'CHAR(1)'
    ],
    'Field size for display': [
        '4',
        '1',
        '1',
        '10', 
        '10',
        '1',
        '10',
        '1'
    ],
    'Description': [
        'Immigration case ID',
        '1-letter code explaining the cause of admission into the US',
        '1-letter code explaining the entry into the US',
        'An admission number which is technical only',
        'End date until when the admitted person can stay in the US. Defaults to 1900-01-01 when there is no date, and 2999-12-31 when the visa is granted for duration of stay',
        'Reason for departure',
        'Date of departure',
        'Code for admission updates'
    ],
    'Example': [
        '93',
        'G',
        '1',
        '2147483647',
        '2016-06-29',
        'O',
        '2016-04-09',
        'None'
    ]
}

In [70]:
imm_fct_dict_df = pd.DataFrame.from_dict(imm_fct_dict)

#### 5.4.2 Person dimension

In [71]:
imm_psn_dict = {
    'Table': 'IMM_DIM_PSN',
    'Field Name': [
        'IMM_ID',
        'INS_ID',
        'BRTH_YR',     
        'AGE',    
        'OCC_CD',    
        'GNDR_CD',
        'CTY_OF_RSDN_ID',
        'CTY_OF_CTZ_ID',
        'ADR_RGN_CD'
    ],
    'Data Type': [
        'INTEGER',
        'INTEGER',
        'INTEGER',
        'INTEGER',  
        "CHAR(3)",
        'CHAR(1)',     
        "INTEGER",
        'INTEGER', 
        "CHAR(2)"
    ],
    'Field size for display': [
        '4',
        '6',
        '4',
        '3', 
        '3',
        '1',
        '3',
        '3',
        '2'
    ],
    'Description': [
        'Immigration case ID',
        'Insurance ID, takes on -1 in the event it is undefined',
        'Year of birth',
        'Age in years',
        'Three letter code for occupation - not decoded in this data model',
        'One-letter code for (M)ale, (F)emale or (U)nknown',
        'ID of country of prior citizenship',
        'ID of country of prior residence',
        'State code of immigrants new address',
        
    ],
    'Example': [
        '127',
        '545507',
        '1991',
        '25',
        'XXX',
        'F',
        '103',
        '103',
        'NJ'
    ]
}

In [72]:
imm_psn_dict_df = pd.DataFrame.from_dict(imm_psn_dict)

#### 5.4.3 Airport dimension

In [73]:
imm_arpt_dict = {
    'Table': 'IMM_DIM_ARPT',
    'Field Name': [
        'IMM_ID',
        'ARPT_CD',
        'ARLN_CD',     
        'FLGT_CD',    
        'ARR_DT'
    ],
    'Data Type': [
        'INTEGER',  
        "CHAR(3)",
        'CHAR(2)',     
        "CHAR(5)",
        "DATE 'YYYY-MM-DD'"
    ],
    'Field size for display': [
        '4',
        '3',
        '2',
        '5', 
        '10'
    ],
    'Description': [
        'Immigration case ID',
        '3-letter code for airport city of arrival flight to the US',
        '2-letter code for airline of arrival flight to the US',
        '5 character code for flight number of arrival flight to the US',
        'Date of arrival in the US, not admission'
    ],
    'Example': [
        '30',
        'ATL',
        'OS',
        '00089',
        '2016-04-01'
    ]
}

In [74]:
imm_arpt_dict_df = pd.DataFrame.from_dict(imm_arpt_dict)

#### 5.4.4 Visa dimension

In [75]:
imm_visa_dict = {
    'Table': 'IMM_DIM_VISA',
    'Field Name': [
        'IMM_ID',
        'VISA_TP_CD',
        'VISA_RSN_CD',     
        'VISA_ISSU_DEP_CD'
    ],
    'Data Type': [
        'INTEGER',  
        "CHAR(2)",
        'INTEGER',     
        "CHAR(3)"
    ],
    'Field size for display': [
        '4',
        '2',
        '1',
        '3'
    ],
    'Description': [
        'Immigration case ID',
        '2-letter code for VISA obtained - not decoded',
        '1 digit code for VISA application reason - decoded in separate table',
        '3-letter code for issuing city of VISA'
    ],
    'Example': [
        '265',
        'WT',
        '2',
        'None'
    ]
}

In [76]:
imm_visa_dict_df = pd.DataFrame.from_dict(imm_visa_dict)

#### 5.4.5 Combine all

In [77]:
imm_dict_df = imm_fct_dict_df \
  .append(imm_psn_dict_df) \
  .append(imm_arpt_dict_df) \
  .append(imm_visa_dict_df)

### 5.5 Combine all
Through the above work and the below consolidation all data fields are documented and can be visualised however it pleases the user, be it on a wiki page or exported to an excel-sheet etc.

In [78]:
all_dict_df = cd_dict_df \
  .append(arpt_dict_df, ignore_index = True) \
  .append(demo_dict_df, ignore_index = True) \
  .append(temp_dict_df, ignore_index = True) \
  .append(imm_dict_df, ignore_index = True)

In [79]:
pd.set_option('display.max_rows', len(all_dict_df))
all_dict_df.sort_values(by=['Table'])

Unnamed: 0,Table,Field Name,Data Type,Field size for display,Description,Example
20,ARPT_FCT,ISO_CTY_CD,CHAR(2),2,"ISO Country code, 2 characters",US
17,ARPT_FCT,ARPT_NM,VARCHAR(255),255,Name of airport,Alpine Range Airport
18,ARPT_FCT,ELEV_FT,"DECIMAL(9,2)",9,Elevation of airport above sea level in feet,670.00
19,ARPT_FCT,CONT_CD,CHAR(2),2,"2-letter code of continent, e.g. NA for North ...",
21,ARPT_FCT,ISO_RGN_CD,CHAR(7),7,"4-letter code of region in format ""country - S...",US-TX
15,ARPT_FCT,ARPT_ID,CHAR(4),4,Airport ID consisting of letters and numbers,00TS
16,ARPT_FCT,ARPT_TP,VARCHAR(20),20,"Type of airport, including ""Closed"" in case it...",Small Airport
23,ARPT_FCT,GPS_CD,CHAR(4),4,GPS location code - see also ARPT_ID,00TS
24,ARPT_FCT,IATA_ARPT_CD,CHAR(3),3,"IATA airport code, similar to ARPT_ID and GPS_CD",00TS
25,ARPT_FCT,LCL_ARPT_CD,CHAR(3),3,"Local airport code, similar to ARPT_ID, GPS_CD...",00TS


## 6. Data quality checks

We should assess two sets of quality checks: 
1. The quality of each data in separation
2. The quality of data after join

Since we have set join keys it is natural to see if they perform at all and how well they perform. If data is lost there is either a data integrity issue or the key is inappropriate.

As an example of a key that is not inappropriate yet holds a data integrity issue we can name latitude and longitude fields between the temperature data set and the airport data set. Because temperatures aren't generally measured at airports we can't expect the join to perform well. Where temperatures are taken at airports the join should perform well.

Another key which is expected to have data integrity issues is the ADM_CD which holds reasons for admission. Because we do not have a full data description of source data we do not have a description for each value. Here we should enter the missing code descriptions into the DIM_ADM_CD even if there is no description.

An example of an inappropriate key set is the ADM_CD field in DIM_ADM_CD to be joint on DEP_CD in FCT. This was suggested in the Q&A Platform but wouldn't work since they keys are non-overlapping.

**The check**

The checks can be any degree of sophisticated. We do a Proof of Concept here just by checking if each table on it's own has any rows and if each paired table returns any rows. Any amount of rows passes. In some instances an inner join should not provide any loss of records, e.g. the aforementioned ADM_CD, if written correctly, should not let a table lose any records when joined upon.

**The order**

The checks will be created for each table then be combined for one large dictionary. The order of creation will be as follows:
0. Code tables built from text files and Google searches
1. Airport data
2. Demograhics data
3. Temperature data
4. Immigration data

**The test function**
We have a test function, *count_rows*, that counts the rows for multiple conditions:
1. The amount of rows in a simple query
2. The amount of rows when a conditional field is required not to be null
3. The amount of rows when inner joining another table

The results can be:
1. Passed, which means there are positive rows for the simple query and no loss of rows for all others
2. Warning, which means that there is a loss or duplication of rows on queries that are not the simple query relative to the simple query
3. Fail, which means the query returns no rows.

In [9]:
def count_rows(spark_df, cond_fld=None, join_dict=None):
    '''
    Description
    -----------
    This function takes a spark dataframe and performs
    queries on this to count the records. If a condition
    field is inputted, it will count records with no nulls
    on that field. If a join dictionary is provided it will
    count records outputted for each table after inner joining
    with the spark dataframe.
    
    Parameters
    ----------
    spark_df : str
        The name of a Spark Data Frame from the data model.
    
    cond_fld : str
        A string representing a field in the spark dataframe
        that is required to not be null.
    
    join_dict : dictionary
        A dictionary where all keys are tables to join on the
        spark dataframe, and which requires the values to be
        dictionaries consisting of a "left_join_fld" and a 
        "right_join_fld" with the former being the join field
        from the spark dataframe and the latter being the join
        field from the join table.
        
    
    Returns
    -------
    dic : dictionary
        A dictionary containing the tests as keys and their 
        row counts as values.
    '''
    base_qry = f'SELECT * FROM {spark_df}'
    return_dict = {}
    
    # Make base query
    print('Querying base table: ' + spark_df)
    base_row_count = spark.sql(base_qry).count()
    return_dict['base_row_count'] = base_row_count
    
    if cond_fld is not None:
        cond_qry = base_qry + f' WHERE {cond_fld} IS NOT NULL'
    
        # Make conditional query
        print('Querying conditional table: ' + spark_df)
        cond_row_count = spark.sql(cond_qry).count()
        return_dict['cond_row_count'] = cond_row_count
    
    if join_dict is not None:    
        for k, v in join_dict.items():
            print(f'Get join conditions for table {k}: ')

            try:
                jt = k
                ljf = v['left_join_fld']
                rjf = v['right_join_fld']
            except KeyError as e:
                print('Key is missing.')
                print(e)

            print('Querying join statement')
            join_qry = f'SELECT * FROM {spark_df} AS mt \
              INNER JOIN {jt} AS jt \
              ON mt.{ljf} = jt.{rjf}'

            join_row_count = spark.sql(join_qry).count()
            return_dict[k] = join_row_count
    
    for k, v in return_dict.items():
        if v == 0:
            print('Failed: Test ' + k + ' has no records.')
        elif v < base_row_count:
            print('Warning: Test ' + k + ' has passed but records have been lost.' )
        elif v > base_row_count:
            print('Warning: Test ' + k + ' has passed but records have been duplicated.')
        else:
            print('Success: Test ' + k + ' has passed with no lost records.')
            
        print('Query ' + k + ' has ' + str(v) + ' rows.')
    
    print('Tests done.')
    return(return_dict)

### 6.1 Immigration

We will test the following:
1. Fct
  1. Join on ADM_CD
  2. Join on ADM_MTD_CD
  3. Join on DEP_CD
  4. Join on ADM_UPD_CD
2. Person dimension
  1. Join on CTY_OF_CTZ_ID
  2. Join on CTY_OF_RSDN_ID
  3. Join on RGN_CD
3. Airport dimension
  1. Join on ARPT_CD
4. Visa dimension
  1. Join on VISA_RSN_CD
  2. Join on VISA_ISSU_DEP_CD

#### 6.1.1 FCT

In [81]:
imm_fct.createOrReplaceTempView('IMM_FCT')

In [82]:
spark.createDataFrame(cd_adm_st).createOrReplaceTempView('cd_adm_st')
spark.createDataFrame(cd_adm_mtd).createOrReplaceTempView('cd_adm_mtd')
spark.createDataFrame(cd_dep).createOrReplaceTempView('cd_dep')

In [83]:
join_dict = {
    'CD_ADM_ST' : {'left_join_fld' : 'ADM_CD', 'right_join_fld' : 'ADM_ST_CD'},
    'CD_ADM_MTD': {'left_join_fld' : 'ADM_MTD_CD', 'right_join_fld' : 'ADM_MTD_CD'},
    'CD_DEP' : {'left_join_fld' : 'DEP_CD', 'right_join_fld' : 'DEP_CD'}
}

In [106]:
ret_dict = count_rows('IMM_FCT', cond_fld='IMM_ID', join_dict=join_dict)

Querying base table: imm_fct_df
Querying conditional table: imm_fct_df
Get join conditions for table CD_ADM_ST: 
Querying join statement
Get join conditions for table CD_ADM_MTD: 
Querying join statement
Get join conditions for table CD_DEP: 
Querying join statement
Success: Test base_row_count has passed with no lost records.
Query base_row_count has 3096313 rows.
Success: Test cond_row_count has passed with no lost records.
Query cond_row_count has 3096313 rows.
Query CD_ADM_ST has 2955975 rows.
Query CD_ADM_MTD has 3096074 rows.
Query CD_DEP has 2957884 rows.


#### 6.1.2 PSN_DIM

In [84]:
imm_dim_psn.createOrReplaceTempView('IMM_DIM_PSN')

In [85]:
spark.createDataFrame(cd_cty).createOrReplaceTempView('CD_CTY')
spark.createDataFrame(cd_rgn).createOrReplaceTempView('CD_RGN')

In [86]:
join_dict = {
    'IMM_FCT' : {'left_join_fld' : 'IMM_ID', 'right_join_fld' : 'IMM_ID'},
    'CD_CTY': {'left_join_fld' : 'CTY_OF_RSDN_ID', 'right_join_fld' : 'CTY_CD'},
    'CD_RGN' : {'left_join_fld' : 'ADR_RGN_CD', 'right_join_fld' : 'RGN_CD'}
}

In [120]:
ret_dict = count_rows('IMM_DIM_PSN', cond_fld='IMM_ID', join_dict=join_dict)

Querying base table: IMM_DIM_PSN
Querying conditional table: IMM_DIM_PSN
Get join conditions for table IMM_FCT: 
Querying join statement
Get join conditions for table CD_CTY: 
Querying join statement
Get join conditions for table CD_RGN: 
Querying join statement
Success: Test base_row_count has passed with no lost records.
Query base_row_count has 3096313 rows.
Success: Test cond_row_count has passed with no lost records.
Query cond_row_count has 3096313 rows.
Success: Test IMM_FCT has passed with no lost records.
Query IMM_FCT has 3096313 rows.
Success: Test CD_CTY has passed with no lost records.
Query CD_CTY has 3096313 rows.
Query CD_RGN has 2917199 rows.


#### 6.1.3 ARPT_DIM

In [87]:
imm_dim_arpt.createOrReplaceTempView('IMM_DIM_ARPT')

In [88]:
spark.createDataFrame(cd_arpt).createOrReplaceTempView('CD_ARPT')

In [89]:
join_dict = {
    'IMM_FCT' : {'left_join_fld' : 'IMM_ID', 'right_join_fld' : 'IMM_ID'},
    'CD_ARPT': {'left_join_fld' : 'ARPT_CD', 'right_join_fld' : 'ARPT_CD'}
}

In [127]:
arpt_ret_dict = count_rows('IMM_DIM_ARPT', cond_fld='IMM_ID', join_dict=join_dict)

Querying base table: IMM_DIM_ARPT
Querying conditional table: IMM_DIM_ARPT
Get join conditions for table IMM_FCT: 
Querying join statement
Get join conditions for table CD_ARPT: 
Querying join statement
Success: Test base_row_count has passed with no lost records.
Query base_row_count has 3096313 rows.
Success: Test cond_row_count has passed with no lost records.
Query cond_row_count has 3096313 rows.
Success: Test IMM_FCT has passed with no lost records.
Query IMM_FCT has 3096313 rows.
Success: Test CD_ARPT has passed with no lost records.
Query CD_ARPT has 3096313 rows.


#### 6.1.4 VISA_DIM

In [90]:
imm_dim_visa.createOrReplaceTempView('IMM_DIM_VISA')

In [91]:
spark.createDataFrame(cd_visa_rsn).createOrReplaceTempView('CD_VISA_RSN')

In [92]:
join_dict = {
    'IMM_FCT' : {'left_join_fld' : 'IMM_ID', 'right_join_fld' : 'IMM_ID'},
    'CD_VISA_RSN': {'left_join_fld' : 'VISA_RSN_CD', 'right_join_fld' : 'VISA_RSN_CD'}
}

In [131]:
visa_ret_dict = count_rows('IMM_DIM_VISA', cond_fld='IMM_ID', join_dict=join_dict)

Querying base table: IMM_DIM_VISA
Querying conditional table: IMM_DIM_VISA
Get join conditions for table IMM_FCT: 
Querying join statement
Get join conditions for table CD_VISA_RSN: 
Querying join statement
Success: Test base_row_count has passed with no lost records.
Query base_row_count has 3096313 rows.
Success: Test cond_row_count has passed with no lost records.
Query cond_row_count has 3096313 rows.
Success: Test IMM_FCT has passed with no lost records.
Query IMM_FCT has 3096313 rows.
Success: Test CD_VISA_RSN has passed with no lost records.
Query CD_VISA_RSN has 3096313 rows.


### 6.2 Demographics

#### 6.2.1 DEMO_FCT

In [96]:
demo_fct.createOrReplaceTempView('DEMO_FCT')
demo_fct_race.createOrReplaceTempView('DEMO_FCT_RACE')
cd_race_tp.createOrReplaceTempView('CD_RACE_TP')

In [97]:
join_dict = {
    'CD_RGN' : {'left_join_fld' : 'RGN_CD', 'right_join_fld' : 'RGN_CD'},
    'DEMO_FCT_RACE': {'left_join_fld' : 'DEMO_CITY_ID', 'right_join_fld' : 'DEMO_CITY_ID'}
}

In [98]:
demo_fct_ret_dict = count_rows('DEMO_FCT', cond_fld='DEMO_CITY_ID', join_dict=join_dict)

Querying base table: DEMO_FCT
Querying conditional table: DEMO_FCT
Get join conditions for table CD_RGN: 
Querying join statement
Get join conditions for table DEMO_FCT_RACE: 
Querying join statement
Success: Test base_row_count has passed with no lost records.
Query base_row_count has 596 rows.
Success: Test cond_row_count has passed with no lost records.
Query cond_row_count has 596 rows.
Success: Test CD_RGN has passed with no lost records.
Query CD_RGN has 596 rows.
Query DEMO_FCT_RACE has 2891 rows.


#### 6.2.2 DEMO_FCT_RACE

In [99]:
join_dict = {
    'CD_RACE_TP': {'left_join_fld' : 'RACE_TP_CD', 'right_join_fld' : 'RACE_TP_CD'}
}

In [100]:
demo_fct_race_ret_dict = count_rows('DEMO_FCT_RACE', cond_fld='RACE_TP_CD', join_dict=join_dict)

Querying base table: DEMO_FCT_RACE
Querying conditional table: DEMO_FCT_RACE
Get join conditions for table CD_RACE_TP: 
Querying join statement
Success: Test base_row_count has passed with no lost records.
Query base_row_count has 2891 rows.
Success: Test cond_row_count has passed with no lost records.
Query cond_row_count has 2891 rows.
Success: Test CD_RACE_TP has passed with no lost records.
Query CD_RACE_TP has 2891 rows.


### 6.3 Airports

In [102]:
arpt_fct.createOrReplaceTempView('ARPT_FCT')

In [103]:
join_dict = {
    'IMM_DIM_VISA' : {'left_join_fld' : 'IATA_ARPT_CD', 'right_join_fld' : 'VISA_ISSU_DEP_CD'}
}

In [104]:
arpt_fct_ret_dict = count_rows('ARPT_FCT', cond_fld='ARPT_ID', join_dict=join_dict)

Querying base table: ARPT_FCT
Querying conditional table: ARPT_FCT
Get join conditions for table IMM_DIM_VISA: 
Querying join statement
Success: Test base_row_count has passed with no lost records.
Query base_row_count has 55075 rows.
Success: Test cond_row_count has passed with no lost records.
Query cond_row_count has 55075 rows.
Query IMM_DIM_VISA has 939955 rows.
Tests done.


### 6.4 Temperatures

In [33]:
temp_fct.createOrReplaceTempView('TEMP_FCT')
temp_dim_city.createOrReplaceTempView('TEMP_DIM_CITY')

In [34]:
join_dict = {
    'TEMP_DIM_CITY' : {'left_join_fld' : 'TEMP_CITY_ID', 'right_join_fld' : 'TEMP_CITY_ID'}
}

In [35]:
temp_fct_ret_dict = count_rows('TEMP_FCT', cond_fld='TEMP_CITY_ID', join_dict=join_dict)

Querying base table: TEMP_FCT
Querying conditional table: TEMP_FCT
Get join conditions for table TEMP_DIM_CITY: 
Querying join statement
Success: Test base_row_count has passed with no lost records.
Query base_row_count has 8598145 rows.
Success: Test cond_row_count has passed with no lost records.
Query cond_row_count has 8598145 rows.
Success: Test TEMP_DIM_CITY has passed with no lost records.
Query TEMP_DIM_CITY has 8598145 rows.
Tests done.


## 7. Redshift

In the case the user wants to establish a redshift connection, the below steps can be followed.

Following steps need to be taken:
1. IAM client to 
  1. Create an IAM role
  2. Attach IAM role to IAM user
2. Create Redshift client
  1. Create Redshift cluster
3. Create ec2 client
  1. Establish VPC connection to redshift


### 7.1 The IAM Client

#### 7.1.1 Creating the IAM client

In [16]:
iam = boto3.client(
    'iam',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name=REGION
)

#### 7.1.2 Create IAM role
This assumes an IAM user has already been created in AWS.

In [17]:
DWH_IAM_ROLE_NAME = config.get('DWH', 'DWH_IAM_ROLE_NAME')

try:
    print("Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description='Allows Redshift clusters to call AWS services on your behalf.',
        AssumeRolePolicyDocument=json.dumps(
            {
                'Statement': [{
                    'Action': 'sts:AssumeRole',
                    'Effect': 'Allow',
                    'Principal': {'Service': 'redshift.amazonaws.com'}
                }],
                'Version': '2012-10-17'
            }
        )
    )
except iam.exceptions.EntityAlreadyExistsException as e: 
    print("Error: The IAM role is already created.")
    print(e)

Creating a new IAM Role
Error: The IAM role is already created.
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhadmin already exists.


#### 7.1.3 Attach IAM role policy

In [18]:
print("1.2 Attaching Policy")
iam.attach_role_policy(
    RoleName=DWH_IAM_ROLE_NAME,
    PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
)['ResponseMetadata']['HTTPStatusCode']

1.2 Attaching Policy


200

We want to establish a connection to a Redshift cluster where we can create the tables and populate them with data. We will set up a Redshift cluster programmatically, but first we need to make sure the basics are handled.

### 7.2 The Redshift cluster

#### 7.2.1 Create Redshift client

In [7]:
redshift = boto3.client(
    'redshift',
    region_name='us-west-2',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET
)

#### 7.2.2 Create redshift cluster

In [8]:
# HW
DWH_CLUSTER_TYPE       = config.get('DWH', 'DWH_CLUSTER_TYPE')
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")

# Identifiers & Credentials
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")

roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

try:
    redshift.create_cluster(
        # HW
        ClusterType        = DWH_CLUSTER_TYPE,
        NodeType           = DWH_NODE_TYPE,
        NumberOfNodes      = int(DWH_NUM_NODES),

        # Identifiers & Credentials
        DBName             = DWH_DB,
        ClusterIdentifier  = DWH_CLUSTER_IDENTIFIER,
        MasterUsername     = DWH_DB_USER,
        MasterUserPassword = DWH_DB_PASSWORD,

        #Roles (for s3 access)
        IamRoles           = [roleArn]
    )
except redshift.exceptions.ClusterAlreadyExistsFault as e: 
    print("Error: The cluster is already created")
    print(e)

Error: The cluster is already created
An error occurred (ClusterAlreadyExists) when calling the CreateCluster operation: Cluster already exists


#### 7.2.3 Describe cluster
In this description we note that we can see when the cluster is available. Until status says available we should only proceed where meaningful.

In [9]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

In [10]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.chwnsn8m2dbb.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-73f0f80b
7,NumberOfNodes,4


### 7.2.4 Describe cluster endpoint and IAM role arn

In [11]:
try:
    DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
    DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
    print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
    print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)
except KeyError as e:
    print("Check that cluster has been created (is available) - can't locate Endpoint if not.")
    print(e)

DWH_ENDPOINT ::  dwhcluster.chwnsn8m2dbb.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::355814333192:role/dwhadmin


#### 7.2.5 Open an incoming TCP port to access the cluster endpoint
Reading and writing to Redshift requires an open TCP port.

##### 7.2.5.1 Create EC2 client

In [12]:
ec2 = boto3.resource(
    'ec2',
    region_name="us-west-2",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET
)

In [13]:
VPC_ID = myClusterProps['VpcId']
DWH_PORT = config.get('DWH', 'DWH_PORT')

try:
    vpc = ec2.Vpc(id=VPC_ID)
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        #GroupName=defaultSg.group_name,
        GroupName='default',
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)
    print("If error is that port already exists then Okay.")

ec2.SecurityGroup(id='sg-002a328934a579da2')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists
If error is that port already exists then Okay.


In [14]:
%load_ext sql

In [15]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
try:
    conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
    cur = conn.cursor()
    print(conn)
except psycopg2.Error as e:
    print("Error: Could not make connection to the Postgres database")
    print(e)

<connection object at 0x7fc6ab3313d8; dsn: 'user=dwhuser password=xxx dbname=dwh host=dwhcluster.chwnsn8m2dbb.us-west-2.redshift.amazonaws.com port=5439', closed: 0>


In [59]:
conn_string

'postgresql://dwhuser:Passw0rd@dwhcluster.chwnsn8m2dbb.us-west-2.redshift.amazonaws.com:5439/dwh'

In [56]:
%sql $conn_string

'Connected: dwhuser@dwh'

### 7.3 Cleanup

#### 7.3.1 Delete cluster

In [182]:
# redshift.delete_cluster(
#     ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
#     SkipFinalClusterSnapshot=True
# )

{'Cluster': {'ClusterIdentifier': 'dwhcluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'MasterUsername': 'dwhuser',
  'DBName': 'dwh',
  'Endpoint': {'Address': 'dwhcluster.chwnsn8m2dbb.us-west-2.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2022, 6, 4, 20, 36, 9, 103000, tzinfo=tzlocal()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-fc8e7ff7',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-73f0f80b',
  'AvailabilityZone': 'us-west-2d',
  'PreferredMaintenanceWindow': 'wed:13:30-wed:14:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'NumberOfNodes': 4,
  'PubliclyAccessible': True,
  'Encrypted': False,
  'Tags': [],
  'EnhancedVpcRouting': False,
  'IamR

#### 7.3.2 Detach IAM role from IAM user and delete the role

In [None]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)
#### CAREFUL!!

## 8. Conclusion
Loading tables to Redshift was out of scope, but we managed to load tables to S3 and our quality checks highlighted issues in some departments for joining the code tables.

Further steps would be to:
- Improve partition of the temperature data set in particular.
- Fill missing codes in code tables
- Create further quality checks
- Set up a data pipeline in e.g. Airflow where quality checks and further are included
- Move staging tables to S3 and do Data Model transformations on Redshift - again through Airflow
- Set up a Dashboard to illustrate all data

#### Skewness and partitioning as data volume increases
As the size of data sets increases further attention should be paid to data partitioning. We saw that it caused an issue for the temperature data set and it would also cause a problem for other data sets. Here we could do a `df.field.value_counts()` to look for skewness and consequently partition wisely to ensure an even distribution across workers.

#### Daily run of pipelines at 7am
Here we would use a scheduling tool such as Airflow to ensure reliable availability of data to all analysts. Here we can schedule loads of data and quality checks and beyond to not only ensure availability, but quality also.

#### Large user base
In the case that 100+ people will need to rely on the data model built above we could deploy the data model across multiple availability zones by deploying to multiple EC2 instances. Once the user base goes into the thousands we could simply store data in S3 buckets where users access the data using RESTful API's. This solution will be more cost-efficient relative to EC2.

#### Final comments on Redshift
For Redshift, we already have the table schemas in place and the connection made, so loading to here will be fairly straightforward. I ran out of credit on my free tier account with AWS and also with Udacity, so I stopped before having to pay too much in cloud usage. Had I had more credit and more time I could have continued with the Airflow and Redshift setup.

#### Final comments on write-to-S3
Since I exhausted my free tier with AWS (both through Udacity and through my personal access also) before having fixed some typos I had to comment out some write_to_S3 calls. This was particularly unfortunate for the temperature dataset, where I was still analysing data skewness to find the optimal partition when writing to S3. The call that used up my free tier was on the temperature dataset and had been running for 30+ minutes.