In [None]:
#import all necessary files for capstone project
import pandas as pd
import boto3
from io import StringIO
import configparser
import glob
import os
import json
import numpy as np
import psycopg2
import datetime
from sqlalchemy import create_engine
from sql_queries import *
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, rand
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import expr,from_unixtime,row_number,dayofweek,year,month,dayofmonth,hour,date_format,desc,col,dense_rank,rank,weekofyear,monotonically_increasing_id
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, TimestampType
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

In [None]:
# Load dwh.cfg, which holds the AWS access and secret keys used to read from and write to S3
config = configparser.ConfigParser()
config.read('dwh.cfg')

# Publish the [AWS] KEY / SECRET options through the standard AWS environment variables
for env_var, option_name in (('AWS_ACCESS_KEY_ID', 'KEY'),
                             ('AWS_SECRET_ACCESS_KEY', 'SECRET')):
    os.environ[env_var] = config.get('AWS', option_name)

In [None]:
# Element-wise len(): applied to an array of strings it returns an array of their lengths.
# Used further down to find the longest value per column when sizing table columns.
measurer = np.vectorize(pyfunc=len)

In [None]:
# Create the S3 resource handle, authenticated with the keys exported to the environment
s3 = boto3.resource(
    's3',
    region_name='us-west-2',
    aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
    aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
)

In [None]:
# Build (or reuse) the Spark session; the spark-sas7bdat package is requested so that
# SAS7BDAT files can be loaded through the 'com.github.saurfang.sas.spark' format.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)



In [None]:
### Read the April 2016 I94 immigration data (SAS7BDAT format)
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
# arrdate is a SAS date, i.e. a count of days since 1960-01-01 -- not epoch milliseconds.
# Dividing it by 1000 and feeding it to from_unixtime (the previous approach) placed every
# arrival a few seconds after 1970-01-01. Add the day count to the SAS epoch instead.
# The result is cast to a timestamp so ARRIVAL_DATE keeps the column type it had before.
df_spark = df_spark.withColumn(
    'ARRIVAL_DATE',
    expr("date_add(to_date('1960-01-01'), cast(arrdate as int))").cast(TimestampType()),
)

In [None]:
# Rename and cast the immigration columns to the names/types of the target table.
# Each entry is a Spark SQL expression of the form "<expression> [as] <TARGET_COLUMN>".
immigration_columns = [
    "cast(cicid as integer)  IMMIGRANT_ID",
    "cast(i94yr as int) ARRIVAL_YEAR",
    "cast(i94mon as int) ARRIVAL_MONTH",
    "cast(i94cit as int) COUNTY_CITIZEN",
    "cast(i94res as int)COUNTY_RESIDENCE",
    "cast(i94port as varchar(10)) PORT_ID",
    "cast(ARRIVAL_DATE as date) ARRIVAL_DATE",
    "cast(i94mode as int) MODE_ID",
    "i94addr as STATE_ID",
    # NOTE(review): AGE is measured against today's date, so it changes with the run date
    # rather than reflecting the age at arrival -- confirm this is intended.
    "cast(year(current_date())-biryear as int) AGE",
    "cast(i94visa as int) VISA_ID",
    "matflag as MATCH_FLAG",
    "cast(biryear as int) BIRTH_YEAR",
    "gender as GENDER",
    "airline as AIRLINE",
    "cast(admnum as bigint) ADMISSION_NUMBER",
    "fltno as FLIGHT_NO",
    "visatype as VISA_TYPE",
]
df_spark1 = df_spark.selectExpr(*immigration_columns)

In [None]:
# Quality check: print the column names and types of the renamed/cast immigration dataframe
df_spark1.printSchema()

In [None]:
# Collapse the immigration dataframe to a single partition so Spark writes one CSV part file
df_spark1 = df_spark1.repartition(1)

# Write a pipe-separated CSV with a header row to the local IMM_FILES/ directory,
# replacing whatever a previous run left there.
# nullValue=None is passed through as a writer option; presumably it is what makes
# empty strings come out as nulls (quality check) -- TODO confirm.
immigration_writer = (
    df_spark1.write.format('csv')
    .option('header', True)
    .mode('overwrite')
    .option('sep', '|')
)
immigration_writer.save("IMM_FILES/", nullValue=None)

# S3 client used for the file upload
s3_client = boto3.client('s3')

# Upload every CSV part file found locally to the bucket under the key IMMIGRATION.CSV
# (each upload targets the same key, so a later file would replace an earlier one)
csv_files = glob.glob("IMM_FILES/*.csv")
for part_file in csv_files:
    s3_client.upload_file(part_file, "capstone-kobap", 'IMMIGRATION.CSV')

In [None]:
# In-memory text buffer that receives the CSV before it is uploaded to S3
csv_buffer = StringIO()
# Read the airport codes file and rename the columns that map onto the target table
df_airport = pd.read_csv('airport-codes_csv.csv').rename(
    columns={"type": "AIRPORT_TYPE", "name": "AIRPORT_NAME", "iso_country": "COUNTRY"}
)
# STATE_ID is iso_region with its first three characters removed
# (presumably turning values like "US-CA" into "CA" -- verify against the data)
df_airport['STATE_ID'] = df_airport['iso_region'].str[3:]
# header takes a bool (or a list of column aliases); the string 'True' used previously
# only produced a header row because a non-empty string is truthy
df_airport.to_csv(csv_buffer, sep='|', header=True, index=False)
# Upload the pipe-separated CSV to the bucket as airport.CSV
s3.Object('capstone-kobap', 'airport.CSV').put(Body=csv_buffer.getvalue())


In [None]:
# Show the airport dataframe's columns, dtypes and non-null counts
df_airport.info()

In [None]:
# Count the non-null values in each airport column
df_airport.count()

In [None]:
# Summary statistics for every airport column, numeric and non-numeric alike
df_airport.describe(include="all")

In [None]:
# Longest string representation found in each airport column, used to size the table columns.
# Every cell is converted to str first, so missing values are measured by their string form.
airport_cell_lengths = measurer(df_airport.values.astype(str))
res1 = airport_cell_lengths.max(axis=0)
res1

In [None]:
# Read I94_SAS_Labels_Descriptions.SAS and keep a cleaned copy of every line in ha_list.
# Cleaning deletes newlines, tabs, semicolons and single quotes; line positions are
# preserved because later cells slice ha_list by index.
# The previous version also opened 'newfile.txt' for writing without ever writing to it,
# which only created/truncated an empty file on every run; that handle is gone.
_label_noise = str.maketrans('', '', "\n\t;'")
with open('I94_SAS_Labels_Descriptions.SAS') as reader:
    ha_list = [line.translate(_label_noise) for line in reader]

In [None]:
# In-memory text buffer that receives the CSV before it is uploaded to S3
csv_buffer = StringIO()
# Cleaned label lines 9..297 hold the country entries in "code = description" form.
# Split on the first '=' only, so a description containing '=' cannot spill into a third column.
Country_df = pd.Series(ha_list[9:298]).str.split("=", n=1, expand=True)
Country_df.columns = ['COUNTRY_ID', 'COUNTRY_DESC']
# Trim both halves: the raw lines pad the code and the description with spaces, and the
# untrimmed values were previously written to the CSV as-is
Country_df = Country_df.apply(lambda column: column.str.strip())
# Write the pipe-separated CSV (header must be a bool, not the string 'True') and upload it
Country_df.to_csv(csv_buffer, sep='|', header=True, index=False)
s3.Object('capstone-kobap', 'COUNTRY.CSV').put(Body=csv_buffer.getvalue())

In [None]:
# Show the country dataframe's columns, dtypes and non-null counts
Country_df.info()

In [None]:
# In-memory text buffer that receives the CSV before it is uploaded to S3
csv_buffer = StringIO()
# Cleaned label lines 302..961 hold the port entries in "code = description" form.
# Split on the first '=' only, so a description containing '=' cannot spill into a third column.
Port_df = pd.Series(ha_list[302:962]).str.split("=", n=1, expand=True)
Port_df.columns = ['Port_id', 'Port_Desc']
# Trim both halves: untrimmed, Port_id keeps the line's leading indentation and Port_Desc its
# trailing padding, so the ids would not compare equal to the port codes in the fact data
Port_df = Port_df.apply(lambda column: column.str.strip())
# Write the pipe-separated CSV (header must be a bool, not the string 'True') and upload it
Port_df.to_csv(csv_buffer, sep='|', header=True, index=False)
s3.Object('capstone-kobap', 'PORT.CSV').put(Body=csv_buffer.getvalue())

In [None]:
# In-memory text buffer that receives the CSV before it is uploaded to S3
csv_buffer = StringIO()
# Cleaned label lines 972..975 hold the travel-mode entries in "code = description" form.
# Split on the first '=' only, so a description containing '=' cannot spill into a third column.
model_df = pd.Series(ha_list[972:976]).str.split("=", n=1, expand=True)
model_df.columns = ['Model_Code', 'Model_Desc']
# Trim each half separately: stripping only the whole line (as before) leaves the spaces
# around '=' attached to the code and to the description
model_df = model_df.apply(lambda column: column.str.strip())
# Write the pipe-separated CSV (header must be a bool, not the string 'True') and upload it
model_df.to_csv(csv_buffer, sep='|', header=True, index=False)
s3.Object('capstone-kobap', 'Model.CSV').put(Body=csv_buffer.getvalue())

In [None]:
# In-memory text buffer that receives the CSV before it is uploaded to S3
csv_buffer = StringIO()
# Cleaned label lines 981..1035 hold the state/address entries in "code = description" form.
# Split on the first '=' only, so a description containing '=' cannot spill into a third column.
addrl_df = pd.Series(ha_list[981:1036]).str.split("=", n=1, expand=True)
addrl_df.columns = ['STATE_ID', 'STATE']
# Trim each half separately so no whitespace around '=' survives in either column
addrl_df = addrl_df.apply(lambda column: column.str.strip())
# Write the pipe-separated CSV (header must be a bool, not the string 'True') and upload it
addrl_df.to_csv(csv_buffer, sep='|', header=True, index=False)
s3.Object('capstone-kobap', 'STATE.CSV').put(Body=csv_buffer.getvalue())

In [None]:
# In-memory text buffer that receives the CSV before it is uploaded to S3
csv_buffer = StringIO()
# Cleaned label lines 1046..1048 hold the visa entries in "code = description" form.
# Split on the first '=' only, so a description containing '=' cannot spill into a third column.
VISA_df = pd.Series(ha_list[1046:1049]).str.split("=", n=1, expand=True)
VISA_df.columns = ['VISA_Code', 'VISA_Desc']
# Trim each half separately: stripping only the whole line (as before) leaves the spaces
# around '=' attached to the code and to the description
VISA_df = VISA_df.apply(lambda column: column.str.strip())
# Write the pipe-separated CSV (header must be a bool, not the string 'True') and upload it
VISA_df.to_csv(csv_buffer, sep='|', header=True, index=False)
s3.Object('capstone-kobap', 'VISA.CSV').put(Body=csv_buffer.getvalue())

In [None]:
# In-memory text buffer that receives the CSV before it is uploaded to S3
csv_buffer = StringIO()
# Read the semicolon-separated demographics file, rename the columns that map onto the
# target table, and drop the one column the model does not use
DEMOGRAPHICS_df = (
    pd.read_csv('us-cities-demographics.csv', delimiter=';')
    .rename(columns={
        "Median Age": "MEDIAN_AGE",
        "Male Population": "MALE_POPULATION",
        "Female Population": "FEMALE_POPULATION",
        "Total Population": "TOTAL_POPULATION",
        "Foreign-born": "FOREIGN_BORN",
        "Average Household Size": "AVE_HOUSEHOLD",
        "State Code": "STATE_ID",
        "Count": "POP_COUNT",
    })
    .drop(['Number of Veterans'], axis=1)
)
# header takes a bool (or a list of column aliases); the string 'True' used previously
# only produced a header row because a non-empty string is truthy
DEMOGRAPHICS_df.to_csv(csv_buffer, sep='|', header=True, index=False)
# Upload the pipe-separated CSV to the bucket as DEMOGRAPHICS.CSV
s3.Object('capstone-kobap', 'DEMOGRAPHICS.CSV').put(Body=csv_buffer.getvalue())

In [None]:
#--------------------------All data has been buffered to the S3 bucket------------

In [None]:
#--------------------------Preparing to move the data to Redshift------------

In [None]:
# Load the data-warehouse parameters from dwh.cfg
config = configparser.ConfigParser()
# Open the file in a with-block so the handle is closed once it has been parsed
# (it was previously opened inline and never closed)
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

# AWS credentials
KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

# Cluster hardware
DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

# Cluster identity and database login
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

# Name of the IAM role attached to the cluster
DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

In [None]:
# IAM and Redshift clients share the same region and credentials
aws_client_kwargs = dict(
    region_name='us-west-2',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)
iam = boto3.client('iam', **aws_client_kwargs)
redshift = boto3.client('redshift', **aws_client_kwargs)

In [None]:
# Step 1.1: create the IAM role that Redshift assumes to call AWS services on our behalf.
# This has to happen BEFORE the policy is attached: attaching first fails on a fresh
# account because the role does not exist yet.
# Only "role already exists" is tolerated; any other failure (credentials, permissions,
# malformed policy) is raised instead of being printed and ignored.
try:
    print('1.1 Creating a new IAM Role')
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description="allow redshift to call aws services",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
                            'Effect': 'Allow',
                            'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'}
        ),
    )
except iam.exceptions.EntityAlreadyExistsException as e:
    print(e)

# Step 1.2: grant the role read-only access to S3; the cell displays the HTTP status code
iam.attach_role_policy(
    RoleName=DWH_IAM_ROLE_NAME,
    PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess",
)['ResponseMetadata']['HTTPStatusCode']
    

In [None]:
# Step 1.3: look the role up by name and keep its ARN; the cell displays the ARN
role_description = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)
roleArn = role_description['Role']['Arn']
roleArn

In [None]:
# Create the Redshift cluster unless one with this identifier already exists.
# Only "cluster already exists" is tolerated; any other failure is raised rather than
# printed, so it does not resurface later as a confusing lookup error.
try:
    response = redshift.create_cluster(
        # hardware
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        # identifiers & credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,

        # listen on the configured port: the database connection is opened against
        # DWH_PORT, so the cluster must not silently fall back to its default port
        Port=int(DWH_PORT),

        # role that allows the cluster to read from S3
        IamRoles=[roleArn],
    )
except redshift.exceptions.ClusterAlreadyExistsFault as e:
    print(e)


In [None]:
# Summarise the Redshift cluster properties of interest
def prettyRedshiftProps(props):
    """Return selected cluster properties as a two-column (Key, Value) DataFrame.

    Used to check the cluster status and to read off the values needed for the
    database connection.

    Parameters
    ----------
    props : dict
        A cluster description; only the keys listed in ``keysToShow`` are kept,
        in the order they appear in ``props``.

    Returns
    -------
    pandas.DataFrame
        One row per retained property, with columns ``Key`` and ``Value``.

    Notes
    -----
    As a side effect the pandas display option ``display.max_colwidth`` is set to
    "unlimited" so long values are not truncated when the frame is rendered.
    """
    # None is the supported spelling of "no limit"; the old sentinel -1 is deprecated and
    # rejected by current pandas. Releases that predate None support raise ValueError for
    # it, in which case the legacy sentinel is used.
    try:
        pd.set_option('display.max_colwidth', None)
    except ValueError:
        pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k, v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

# A cluster that is still being created does not expose its endpoint yet, so the cells
# that read myClusterProps['Endpoint'] fail when the notebook is run top to bottom.
# Block until the cluster reports itself available before describing it.
redshift.get_waiter('cluster_available').wait(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)

# Fetch the description of our cluster and display its key properties
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

In [None]:
# Host name of the cluster endpoint, used as the database host
cluster_endpoint = myClusterProps['Endpoint']
DWH_ENDPOINT = cluster_endpoint['Address']
# ARN of the first IAM role attached to the cluster
attached_roles = myClusterProps['IamRoles']
DWH_ROLE_ARN = attached_roles[0]['IamRoleArn']

In [None]:
# Open the Redshift database connection and a cursor on it.
# The parameters are passed as keywords rather than formatted into a DSN string:
# a password (or any other value) containing spaces or quotes would corrupt a
# hand-built "key=value" string, whereas keyword values are quoted by psycopg2.
conn = psycopg2.connect(
    host=DWH_ENDPOINT,
    dbname=DWH_DB,
    user=DWH_DB_USER,
    password=DWH_DB_PASSWORD,
    port=DWH_PORT,
)
cur = conn.cursor()

In [None]:
# Create the capstone schema, then make it the active schema; commit after each statement
for schema_statement in (CREATE_SCHEMA, SET_SCHEMA):
    cur.execute(schema_statement)
    conn.commit()

In [None]:
# Drop the fact and dimension tables if they exist, committing after each statement
for drop_statement in drop_table_queries:
    cur.execute(drop_statement)
    conn.commit()

In [None]:
# Create the fact and dimension tables, committing after each statement
for create_statement in create_table_queries:
    cur.execute(create_statement)
    conn.commit()

In [None]:
# Load the data into the target tables, committing after each statement
for copy_statement in copy_table_queries:
    cur.execute(copy_statement)
    conn.commit()

In [None]:
# Run every count query and collect the first column of its first result row,
# in the same order as count_table_queries
count_list = []
for count_statement in count_table_queries:
    cur.execute(count_statement)
    first_row = cur.fetchall()[0]
    count_list.append(first_row[0])

In [None]:
# Quality check: print each source dataframe's row count next to the warehouse count.
# Each entry is (label, rows in the dataframe, position of that table in count_list);
# the positions follow the order of count_table_queries.
count_checks = (
    ('immigrants dataframe=', df_spark1.count(), 0),
    ('AIRPORT dataframe=', len(df_airport), 4),
    ('Country dataframe=', len(Country_df), 7),
    ('PORT dataframe=', len(Port_df), 2),
    ('MODE dataframe=', len(model_df), 6),
    ('DEMOGRAPHICS dataframe=', len(DEMOGRAPHICS_df), 1),
    ('STATE dataframe=', len(addrl_df), 5),
    ('VISA dataframe=', len(VISA_df), 3),
)
for label, source_rows, count_index in count_checks:
    print(label, source_rows, ' inserted records= ', count_list[count_index])


In [None]:
#Quality Checks#
#1- trim values
#2- replace "" with None
#3- check that the inserted record counts match the dataframe counts
#4- analyse the data (quality checks) to understand it well enough to implement the model

In [None]:
# Tear down: delete the Redshift cluster (no final snapshot), then detach the S3 read-only
# policy from the IAM role and delete the role
S3_READ_ONLY_POLICY_ARN = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
redshift.delete_cluster(
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
    SkipFinalClusterSnapshot=True,
)
iam.detach_role_policy(
    RoleName=DWH_IAM_ROLE_NAME,
    PolicyArn=S3_READ_ONLY_POLICY_ARN,
)
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)