In [None]:
import re
import os
import boto3
import configparser

import numpy as np
import pandas as pd
from sqlalchemy import create_engine

import matplotlib.pyplot as plt
from IPython.core.interactiveshell import InteractiveShell
# Display the value of every top-level expression in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"

In [None]:
# Load the capstone AWS credentials from the local awskey.cfg file.
# The context manager closes the file handle once parsing is done.
config = configparser.ConfigParser()
with open('awskey.cfg') as cfg_file:
    config.read_file(cfg_file)
CAPSTONE_AWS_ACCESS_KEY_ID = config.get('capstone', 'AWS_ACCESS_KEY_ID')
CAPSTONE_AWS_SECRET_ACCESS_KEY = config.get('capstone', 'AWS_SECRET_ACCESS_KEY')

In [None]:
# Shared boto3 keyword arguments for both S3 handles below.
s3_creds = dict(
    region_name="us-west-2",
    aws_access_key_id=CAPSTONE_AWS_ACCESS_KEY_ID,
    aws_secret_access_key=CAPSTONE_AWS_SECRET_ACCESS_KEY,
)

# Low-level client (object downloads) and high-level resource (bucket listing).
client = boto3.client('s3', **s3_creds)
resource = boto3.resource('s3', **s3_creds)
bucket = resource.Bucket('us-immigration')

### Step 1: Scope the Project and Gather Data

#### Scope 
The purpose of this project is to provide a deep dive into US immigration, primarily focusing on the types of visas being issued and the profiles of the people associated with them. The scope of this project is limited to the data sources listed below, with data aggregated across numerous features such as visatype, gender, port_of_entry, nationality and month.

#### Data Description & Sources 
- I94 Immigration Data: This data comes from the US National Tourism and Trade Office found [here](https://travel.trade.gov/research/reports/i94/historical/2016.html). Each report contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries).
- World Temperature Data: This dataset came from Kaggle found [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).
- U.S. City Demographic Data: This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. The dataset comes from OpenDataSoft, found [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
- Airport Code Table: This is a simple table of airport codes and corresponding cities. The airport codes may refer to either the IATA airport code, a three-letter code used in passenger reservation, ticketing and baggage-handling systems, or the ICAO airport code, a four-letter code used by ATC systems and for airports that do not have an IATA airport code (from Wikipedia). It comes from [here](https://datahub.io/core/airport-codes#data).

In [None]:
# Collect the key of every object in the bucket whose key contains ".csv".
bucket = resource.Bucket('us-immigration')
csv_files = []
for obj in bucket.objects.all():
    if '.csv' in obj.key:
        csv_files.append(obj.key)

print(f"{len(csv_files)} csv files")

In [None]:
# Show the list of CSV object keys gathered from the bucket.
csv_files

In [None]:
# Download each CSV object and parse it into a DataFrame, keyed by the object key
# with ".csv" removed.
data = {}
for key in csv_files:
    body = client.get_object(Bucket='us-immigration', Key=key)['Body']
    data[key.replace('.csv', '')] = pd.read_csv(body)

In [None]:
# Preview every loaded dataset: its name followed by its first rows.
# ast_node_interactivity = "all" only displays top-level expressions, so bare
# expressions inside a `for` body are not shown; display() is called explicitly.
from IPython.display import display

for k, v in data.items():
    display(k)
    display(v.head())

In [None]:
# Count sample immigration records per (gender, visatype). The default inner merge
# keeps only visa types that have a matching class_of_admission in visa_codes.
immigration_with_visa = data['immigration_data_sample'].merge(
    data['visa_codes'], left_on='visatype', right_on='class_of_admission'
)
immigration_with_visa.groupby(['gender', 'visatype'])[['cicid']].count()

#### Airport Data
-------------------------------------------

In [None]:
# First rows of the airport codes table.
data['airport_codes'].head()

In [None]:
# Split the "coordinates" string of the first rows into two numeric columns.
# The datahub airport-codes dataset stores this field as "longitude, latitude"
# (e.g. "-74.93..., 40.07..."), so the first component is the longitude.
# NOTE(review): confirm the stored CSV keeps that order.
coords = data['airport_codes']['coordinates'].head().str.split(',', n=1).tolist()
pd.DataFrame(coords, columns=['Longitude', 'Latitude']).apply(lambda s: pd.to_numeric(s.str.strip()))

#### Cities Data
-------------------------------------------

In [None]:
# (rows, columns) of the city demographics table.
data['us_cities_demographics'].shape

In [None]:
# First rows of the city demographics table.
data['us_cities_demographics'].head()

#### Weather Data

In [None]:
# (rows, columns) of the global land temperatures table.
data['GlobalLandTemperaturesByCity'].shape

In [None]:
# First rows of the global land temperatures table.
data['GlobalLandTemperaturesByCity'].head()

#### Immigration Data
-------------------------------------------

In [None]:
# List the raw I-94 SAS files available in the local data directory (IPython shell escape).
!ls ../../data/18-83510-I94-Data-2016

In [None]:
# Convert each monthly I-94 SAS file into a local snappy-compressed parquet file
# using the spark-sas7bdat reader: https://github.com/saurfang/spark-sas7bdat
from pyspark.sql import SparkSession

# Both packages are requested here because spark.jars.packages only takes effect
# when the session is first created: later SparkSession.builder...getOrCreate()
# calls in this kernel return this same session, so the hadoop-aws jar they ask
# for (used for the s3a:// writes) has to be loaded now.
spark = (SparkSession.builder
         .config("spark.jars.packages",
                 "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0")
         .enableHiveSupport()
         .getOrCreate())

I94_SAS_DIR = "../../data/18-83510-I94-Data-2016"
# Month abbreviations as they appear in the file names, in alphabetical order.
I94_SAS_MONTHS = ['apr', 'aug', 'dec', 'feb', 'jan', 'jul', 'jun', 'mar', 'may', 'nov', 'oct', 'sep']

for month in I94_SAS_MONTHS:
    df_spark = spark.read.format("com.github.saurfang.sas.spark").load(
        f"{I94_SAS_DIR}/i94_{month}16_sub.sas7bdat", forceLowercaseNames=True, inferLong=True)
    df_spark.write.parquet(f"i94_{month}16/imm_data.parquet", mode='overwrite', compression='snappy')

In [None]:
# Re-read each monthly local parquet file with an explicit schema, cast the
# whole-number double columns to integer types, and upload the result to the
# us-immigration S3 bucket.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,StringType,DoubleType,IntegerType,TimestampType,DateType
from pyspark.sql.functions import udf, col
import pyspark.sql.functions as funcToInt

immigrationSchema = StructType([
        StructField('cicid', DoubleType()),
        StructField('i94yr', DoubleType()),
        StructField('i94mon', DoubleType()),
        StructField('i94cit', DoubleType()),
        StructField('i94res', DoubleType()),
        StructField('i94port', StringType()),
        StructField('arrdate', DoubleType()),
        StructField('i94mode', DoubleType()),
        StructField('i94addr', StringType()),
        StructField('depdate', DoubleType()),
        StructField('i94bir', DoubleType()),
        StructField('i94visa', DoubleType()),
        StructField('count', DoubleType()),
        StructField('dtadfile', StringType()),
        StructField('visapost', StringType()),
        StructField('occup', StringType()),
        StructField('entdepa', StringType()),
        StructField('entdepd', StringType()),
        StructField('entdepu', StringType()),
        StructField('matflag', StringType()),
        StructField('biryear', DoubleType()),
        StructField('dtaddto', StringType()),
        StructField('gender', StringType()),
        StructField('insnum', StringType()),
        StructField('airline', StringType()),
        StructField('admnum', DoubleType()),
        StructField('fltno', StringType()),
        StructField('visatype', StringType())
    ])

# NOTE: getOrCreate() returns an already-running session if one exists, in which
# case the spark.jars.packages setting below does not take effect.
spark = SparkSession\
        .builder\
        .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
        .enableHiveSupport()\
        .getOrCreate()

sc = spark.sparkContext
hadoop_confg = sc._jsc.hadoopConfiguration()
# NOTE(review): this routes the s3a:// scheme to the legacy s3n NativeS3FileSystem
# rather than org.apache.hadoop.fs.s3a.S3AFileSystem — confirm this is intended.
hadoop_confg.set("fs.s3a.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoop_confg.set("fs.s3a.awsAccessKeyId", CAPSTONE_AWS_ACCESS_KEY_ID)
hadoop_confg.set("fs.s3a.awsSecretAccessKey", CAPSTONE_AWS_SECRET_ACCESS_KEY)

# Month abbreviations as they appear in the local parquet directory names.
I94_MONTHS = ['apr', 'aug', 'dec', 'feb', 'jan', 'jul', 'jun', 'mar', 'may', 'nov', 'oct', 'sep']

# Double columns holding whole numbers that are cast to 32-bit 'integer'.
INT_COLUMNS = ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'arrdate', 'i94mode',
               'depdate', 'i94bir', 'i94visa', 'count', 'biryear']
# admnum is cast to 64-bit 'long' instead. I-94 admission numbers are typically
# ~11 digits, above the 32-bit maximum of 2,147,483,647, and Spark's default
# (non-ANSI) cast to 'integer' does not preserve out-of-range values.
# NOTE(review): confirm against the data and the downstream Redshift column type.
LONG_COLUMNS = ['admnum']


def _cast_whole_number_columns(frame):
    """Return `frame` with INT_COLUMNS cast to 'integer' and LONG_COLUMNS to 'long'.

    Each value is rounded to one decimal place before the cast (unchanged from the
    per-month cells); the cast then discards any remaining fraction.
    """
    casts = [(name, 'integer') for name in INT_COLUMNS] + [(name, 'long') for name in LONG_COLUMNS]
    for name, spark_type in casts:
        frame = frame.withColumn(name, funcToInt.round(frame[name], 1).cast(spark_type))
    return frame


for month in I94_MONTHS:
    df_spark = spark.read.schema(immigrationSchema).parquet(f"i94_{month}16/imm_data.parquet")
    df_sparkFinal = _cast_whole_number_columns(df_spark)
    df_sparkFinal.write.parquet(
        f"s3a://us-immigration/i94_parquet_data/i94_{month}16immigrationData.parquet",
        mode='overwrite', compression='snappy')


In [None]:
# Connect Spark to S3 and load every monthly immigration parquet dataset under
# s3a://us-immigration/i94_parquet_data/ into a single DataFrame.
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType, DoubleType,
                               IntegerType, TimestampType, DateType)
from pyspark.sql.functions import udf, col
import pyspark.sql.functions as funcToInt

spark = (SparkSession.builder
         .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
         .enableHiveSupport()
         .getOrCreate())

sc = spark.sparkContext
hadoop_confg = sc._jsc.hadoopConfiguration()
for conf_key, conf_value in (
    ("fs.s3a.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem"),
    ("fs.s3a.awsAccessKeyId", CAPSTONE_AWS_ACCESS_KEY_ID),
    ("fs.s3a.awsSecretAccessKey", CAPSTONE_AWS_SECRET_ACCESS_KEY),
):
    hadoop_confg.set(conf_key, conf_value)

df = spark.read.parquet("s3a://us-immigration/i94_parquet_data/*.parquet")

In [None]:
# Inspect the four date-related columns: arrdate/depdate are SAS numeric dates,
# dtadfile/dtaddto are character date fields.
df.select('arrdate','depdate','dtadfile','dtaddto').show()

In [None]:
# Parse an MMDDYYYY character date (the layout of "01232017") into a datetime.
# '%Y%m%d' would read "0123" as the year and "20" as the month, so '%m%d%Y' is used.
import datetime
s = "01232017"
s_datetime = datetime.datetime.strptime(s, '%m%d%Y')
print(s_datetime)

- i94yr: 4 digit year
- i94mon: numeric month
- i94cit & i94res: valid & invalid codes for processing
- i94port: valid & invalid codes for processing
- arrdate: arrival date in USA. SAS date numeric field.
- i94mode: 1 = 'Air', 2 = 'Sea', 3 = 'Land', 9 = 'Not Reported'
- i94addr: U.S. state of the intended address (first intended address only)
- i94bir: age of respondent in years
- i94visa: visa code, 1 = 'Business', 2 = 'Pleasure', 3 = 'Student'
- count: summary statistics
- dtadfile: character date field
- visapost: department of state where visa was issued
- occup: occupation performed in the U.S.
- entdepa: arrival flag - admitted or paroled into the U.S.
- entdepu: update flag - apprehended or overstayed, adjusted to perm residence
- matflag: match flag - match of arrival and departure records
- biryear: 4 digit year of birth
- dtaddto: character date field - date to which admitted to U.S.
- gender
- insnum: INS number
- airline: airline used to arrive in U.S.
- admnum: admission number
- fltno: flight number of airline used to arrive in U.S.
- visatype: class of admission legally admitting the non-immigrant to temporarily stay in U.S.

In [None]:
# Per (gender, visatype): number of non-null `count` values, collected to the driver.
df.groupBy('gender', 'visatype').agg({'count': 'count'}).collect()

### Step 2: Preprocessing Data
Note: preprocessing was performed before storing the CSV files in S3 buckets, e.g. expanding columns and capitalizing/lowercasing text.
#### Explore Data 
- Identify missing values
- Identify duplicate values

In [None]:
# For every dataset that has missing values, bar-plot the share of nulls in each
# affected column.
for k, v in data.items():
    null_count = v.isnull().sum()
    if null_count.sum() <= 0:
        continue
    null_share = null_count[null_count > 0] / v.shape[0]
    ax = null_share.plot(kind='bar', title=f"{k} Null %")
    plt.show()

In [None]:
# Share of airports of each type that have no IATA code. Averaging the null mask
# per type gives an explicit 0 for types with no missing codes, where dividing two
# value_counts Series produced NaN for those types.
airports = data['airport_codes']
iata_null_share = airports['iata_code'].isnull().groupby(airports['type']).mean()
ax = iata_null_share.plot(kind='bar', title="Airport Type Null %")
ax.set(xlabel='airport type', ylabel='share of rows with null iata_code');

The above provides further insight into the null values in the airport data. Specifically, most of the null values in the airport data stem from missing `iata_code` values. Upon further investigation, it appears that the airport types less relevant to immigration, namely `balloon ports, closed, heliports, seaplanes and small airports`, do not have corresponding `iata_code` information.

#### Cleaning Steps
- Either drop rows or fill missing data with median values where appropriate
- Expand coordinates to Latitude & Longitude columns
- Expand locations to City & State columns
e.g. the data provided for `port_of_entry_codes` was originally `code` and `location`. These have subsequently been expanded out to `city` and `state_or_country` as shown below:

In [None]:
# The original two-column form of the port-of-entry data: code and free-text location.
data['port_of_entry_codes'][['code', 'location']].head()

In [None]:
# Illustrate expanding "location" into city and state for the first rows.
# rsplit with n=1 splits only on the last comma, so every row yields at most two
# fields. A plain split raises ValueError for a location containing two commas.
# Surrounding whitespace is stripped from both fields.
locations = data['port_of_entry_codes']['location'].head().str.rsplit(',', n=1).tolist()
pd.DataFrame(locations, columns=['city', 'state']).apply(lambda s: s.str.strip())

### Step 3: Data Model
<img src="./images/schema.png"/>

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Creating the data model involves various steps, which can be made significantly easier through the use of Airflow. The process of extracting files from S3 buckets, transforming the data and then writing CSV and PARQUET files to Redshift is accomplished through various tasks highlighted below in the ETL Dag graph. These steps include:
- Extracting data from SAS Documents and writing as CSV files to S3 immigration bucket
- Extracting remaining CSV and PARQUET files from S3 immigration bucket
- Writing CSV and PARQUET files from S3 to Redshift
- Performing data quality checks on the newly created tables
<img src="./images/dag_graph.png"/>

#### 4.2 Data Quality Checks
Data quality checks include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness

#### Check Tables

In [None]:
# Read the Redshift connection settings from awskey.cfg and build a SQLAlchemy engine.
from urllib.parse import quote

config = configparser.ConfigParser()
with open('awskey.cfg') as cfg_file:
    config.read_file(cfg_file)
CAPSTONE_REDSHIFT_USER = config.get('redshift', 'CAPSTONE_REDSHIFT_USER')
CAPSTONE_REDSHIFT_PASSWORD = config.get('redshift', 'CAPSTONE_REDSHIFT_PASSWORD')
CAPSTONE_HOST = config.get('redshift', 'CAPSTONE_HOST')
CAPSTONE_PORT = config.get('redshift', 'CAPSTONE_PORT')
CAPSTONE_DB = config.get('redshift', 'CAPSTONE_DB')

from sqlalchemy import create_engine
# "postgresql://" is the dialect name accepted by every SQLAlchemy release; the
# "postgres://" alias was removed in SQLAlchemy 1.4. User and password are
# percent-encoded so characters such as "@", ":" or "/" cannot break the URL.
eng = create_engine(
    f'postgresql://{quote(CAPSTONE_REDSHIFT_USER, safe="")}:{quote(CAPSTONE_REDSHIFT_PASSWORD, safe="")}'
    f'@{CAPSTONE_HOST}:{CAPSTONE_PORT}/{CAPSTONE_DB}')

In [None]:
# List the base tables in the 'dev' catalog and keep the last 12 rows returned.
# NOTE(review): the query has no ORDER BY, so which rows `.tail(12)` keeps depends
# on the order Redshift returns them in — confirm it reliably selects the project tables.
qry = eng.execute("""
SELECT TABLE_NAME
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE = 'BASE TABLE' AND TABLE_CATALOG='dev'
""")
# Column names come from the result metadata, so the frame keeps its
# `table_name` column even when no rows match.
table_columns = list(qry.keys())
tables = pd.DataFrame([tuple(row) for row in qry], columns=table_columns).tail(12)
tables

In [None]:
# Preview up to five rows of the i94mode table. Column names are taken from the
# result metadata so the frame has the right columns even if the table is empty.
qry = eng.execute("""select * from i94mode limit 5;""")
i94mode_columns = list(qry.keys())
pd.DataFrame([tuple(row) for row in qry], columns=i94mode_columns)

In [None]:
# Table names from `tables`, excluding its first row.
tables['table_name'][1:]

#### Check Distinct Rows

In [None]:
def get_col_from_table(table, col=0):
    """Return the name of the column at position `col` in `table`.

    Column names are read from the result metadata of a one-row SELECT, so this
    also works for empty tables (a DataFrame built from zero rows has no columns).

    Args:
        table: table name, interpolated directly into the SQL; must be a trusted
            identifier (here it comes from INFORMATION_SCHEMA).
        col: positional index of the column; negative values count from the end.

    Returns:
        The column name.

    Raises:
        IndexError: if `col` is out of range for the table's columns.
    """
    qry = eng.execute(f"select * from {table} limit 1;")
    try:
        return list(qry.keys())[col]
    finally:
        # Release the cursor; only the metadata was needed.
        qry.close()

# For each table except the first entry of `tables`, count the distinct values
# of its first column.
for table in tables['table_name'][1:]:
    column = get_col_from_table(table)
    distinct_count = eng.execute(
        f"SELECT COUNT(DISTINCT {column}) FROM {table};"
    ).fetchone()[0]
    print(f"{table}:\n\t{distinct_count} distinct rows for column {column}\n")

#### Basic Study of Visa Counts by Gender

In [None]:
# Count immigration records per (gender, visatype), keeping only visa types that
# appear in visa_codes (inner JOIN on class_of_admission).
qry = eng.execute("""
SELECT 
    im.gender, 
    im.visatype,
    count(im.cicid) 
FROM immigration as im 
    JOIN visa_codes 
    ON im.visatype = visa_codes.class_of_admission 
GROUP BY im.gender, im.visatype
""")

# Column names come from the result metadata, so the frame keeps the
# gender/visatype/count columns even when the query returns no rows.
qry_columns = list(qry.keys())
qry_df = pd.DataFrame([tuple(row) for row in qry], columns=qry_columns)

In [None]:
# Pivot the per-(visatype, gender) counts into one column per gender and plot them.
# Columns are selected by their gender label, so each legend entry stays attached
# to its own counts (a positional rename after selecting M then F swapped the labels).
df = qry_df.set_index(['visatype', 'gender'])['count'].unstack()[['M', 'F']]
ax = df.plot(kind='bar', title='Visa counts by gender')
ax.set(xlabel='visatype', ylabel='count');

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.