## Import files to create model

In [1]:
import configparser
import datetime as dt
from datetime import datetime
import os
import glob
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType
from pyspark.sql.functions import udf, col, lit, year, month, upper, to_date
from pyspark.sql.functions import monotonically_increasing_id
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date

## Load configuration data

In [2]:
config = configparser.ConfigParser()
config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

## Declare input and output data

In [3]:
# Declare input and output data
input_data = ""
output_data = "Capstone_Project/"

## Creating the session

In [4]:
#creating the session
spark = SparkSession \
        .builder \
        .config("spark.jars.repositories", "https://repos.spark-packages.org/")\
        .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")\
        .enableHiveSupport().getOrCreate()

## Define Function to get all filepaths from repository

In [5]:
#Function to get all filepaths from repository
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.sas7bdat'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files

## Get filepath to immigration data files

In [6]:
# get filepath to immigration data files
filepath = input_data + '../../data/18-83510-I94-Data-2016/'
all_files = get_files(filepath)

# get total number of files found
num_files = len(all_files)
print('{} files found in {}'.format(num_files, filepath))

12 files found in ../../data/18-83510-I94-Data-2016/


## Create immigration-data schema

In [7]:
#create immigration-data schema
immigrationSchema = R([
    Fld("cicid",Int()),
    Fld("i94yr",Int()),
    Fld("i94mon",Int()),
    Fld("i94cit",Int()),
    Fld("i94res",Int()),
    Fld("i94port",Str()),
    Fld("arrdate",Dbl()),
    Fld("i94mode",Int()),
    Fld("i94addr",Str()),
    Fld("depdate",Dbl()),
    Fld("i94bir",Int()),
    Fld("i94visa",Int()),
    Fld("count",Int()),
    Fld("dtadfile",Str()),
    Fld("visapost",Str()),
    Fld("occup",Str()),
    Fld("entdepa",Str()),
    Fld("entdepd",Str()),
    Fld("entdepu",Str()),
    Fld("matflag",Str()),
    Fld("biryear",Int()),
    Fld("dtaddto",Str()),
    Fld("gender",Str()),
    Fld("insnum",Str()),
    Fld("airline",Str()),
    Fld("admnum",DecimalType(18, 0)),
    Fld("fltno",Str()),
    Fld("visatype",Str())
])

## Create df_immigration datafame with expected schema

In [8]:
# Create an empty RDD
emp_RDD = spark.sparkContext.emptyRDD()

# Create an empty RDD with expected schema
df_immigration = spark.createDataFrame(data = emp_RDD,schema = immigrationSchema)

## Insert data from Immigration data into df_immigration datafame

In [9]:
# iterate over files and process
for i, datafile in enumerate(all_files, 1):
        # read files in immigration data files
        df_temp = spark.read.format('com.github.saurfang.sas.spark').load(datafile)
        df_temp = df_temp\
                            .withColumn('cicid', col('cicid').cast(IntegerType()))\
                            .withColumn('i94yr', col('i94yr').cast(IntegerType()))\
                            .withColumn('i94mon', col('i94mon').cast(IntegerType()))\
                            .withColumn('i94cit', col('i94cit').cast(IntegerType()))\
                            .withColumn('i94res', col('i94res').cast(IntegerType()))\
                            .withColumn('i94mode', col('i94mode').cast(IntegerType()))\
                            .withColumn('i94visa', col('i94visa').cast(IntegerType()))\
                            .withColumn('count', col('count').cast(IntegerType()))\
                            .withColumn('biryear', col('biryear').cast(IntegerType()))\
                            .withColumn('i94bir', col('i94bir').cast(IntegerType()))\
                            .withColumn('admnum', col('admnum').cast(DecimalType(18, 0)))
        
        if datafile == all_files[4]:
            df_temp = df_temp\
                                .drop('validres','delete_days','delete_mexl','delete_dup','delete_visa','delete_recdup')
        df_immigration = df_immigration.union(df_temp)              
        print('{}/{} files processed.'.format(i, num_files))

1/12 files processed.
2/12 files processed.
3/12 files processed.
4/12 files processed.
5/12 files processed.
6/12 files processed.
7/12 files processed.
8/12 files processed.
9/12 files processed.
10/12 files processed.
11/12 files processed.
12/12 files processed.


## View df_immigration datafame

In [10]:
# view data in 1st file in immigration date files
pd.set_option('max_colwidth', 200)
df_immigration.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6,2016,4,692,692,XXX,20573.0,,,,...,U,,1979,10282016,,,,1897628485,,B2
1,7,2016,4,254,276,ATL,20551.0,1.0,AL,,...,Y,,1991,D/S,M,,,3736796330,296.0,F1
2,15,2016,4,101,101,WAS,20545.0,1.0,MI,20691.0,...,,M,1961,09302016,M,,OS,666643185,93.0,B2
3,16,2016,4,101,101,NYC,20545.0,1.0,MA,20567.0,...,,M,1988,09302016,,,AA,92468461330,199.0,B2
4,17,2016,4,101,101,NYC,20545.0,1.0,MA,20567.0,...,,M,2012,09302016,,,AA,92468463130,199.0,B2


## Create Fact Table: Immigrations

In [10]:
# Create Fact Table: Immigrations
Immigrations = df_immigration.select('cicid', 'i94yr', 'i94mon','i94addr', 'i94port', 'i94mode','i94visa', 'arrdate', 'depdate' ,'matflag')\
                    .distinct()\
                    .withColumn("immigration_id", monotonically_increasing_id())
    

# Rename Columns in Fact Table: Immigrations

In [11]:
# Rename Columns in Fact Table
new_column_name = ['cic_id', 'year', 'month', 'state_code','port_code', 'mode_code','visa_code',\
                   'arrival_date','departure_date','match_flag','immigration_id']
Immigrations = Immigrations.toDF(*new_column_name)

## View fact table: Immigrations

In [13]:
# view fact table: Immigrations
Immigrations.limit(5).toPandas()

Unnamed: 0,cic_id,year,month,state_code,port_code,mode_code,visa_code,arrival_date,departure_date,match_flag,immigration_id
0,219,2016,4,NY,NYC,1,1,20545.0,20549.0,M,0
1,228,2016,4,NY,NYC,1,2,20545.0,20550.0,M,1
2,304,2016,4,NY,NYC,1,1,20545.0,20553.0,M,2
3,326,2016,4,CA,LOS,1,2,20545.0,20547.0,M,3
4,698,2016,4,IL,NYC,1,2,20545.0,20548.0,M,4


## Define function to convert to date format

In [12]:
def Convert_SAS_to_date(date):
    if date is not None:
        return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')
Convert_SAS_to_date_udf = udf(Convert_SAS_to_date, DateType())

## Convert both arrival and departure date to date format

In [13]:
# convert both arrival and departure date to date format
Immigrations = Immigrations.withColumn('arrival_date',Convert_SAS_to_date_udf(Immigrations['arrival_date']))
Immigrations = Immigrations.withColumn('departure_date',Convert_SAS_to_date_udf(Immigrations['departure_date']))

## View fact table after adjustments

In [16]:
# view fact table after adjustments
Immigrations.limit(5).toPandas()

Unnamed: 0,cic_id,year,month,state_code,port_code,mode_code,visa_code,arrival_date,departure_date,match_flag,immigration_id
0,219,2016,4,NY,NYC,1,1,2016-04-01,2016-04-05,M,0
1,228,2016,4,NY,NYC,1,2,2016-04-01,2016-04-06,M,1
2,304,2016,4,NY,NYC,1,1,2016-04-01,2016-04-09,M,2
3,326,2016,4,CA,LOS,1,2,2016-04-01,2016-04-03,M,3
4,698,2016,4,IL,NYC,1,2,2016-04-01,2016-04-04,M,4


## Create 1st Dimension Table: Immigrants

In [13]:
#Create 1st Dimension Table: Immigrants
Immigrants = df_immigration.select('cicid', 'i94cit', 'i94res', 'i94bir', 'gender', 'insnum')\
                     .distinct()\
                     .withColumn("immigrants_id", monotonically_increasing_id())

## Rename Columns for 1st Dimension Table: Immigrants

In [19]:
# Rename Columns for 1st Dimension Table: Immigrants
new_column_name = ['cic_id','citizen_country', 'residence_country','age','gender','ins_num','immigrants_id']
Immigrants = Immigrants.toDF(*new_column_name)
Immigrants.limit(5).toPandas()

Unnamed: 0,cic_id,citizen_country,residence_country,age,gender,ins_num,immigrants_id
0,34,101,101,48,M,,0
1,449,103,103,37,,,1
2,481,103,103,34,,,2
3,654,103,103,31,F,,3
4,894,104,104,17,F,,4


## Create 2nd Dimension Table: Airports

In [10]:
#Create 2nd Dimension Table: Airports
Airports = df_immigration.select('cicid','airline','fltno','admnum','visatype')\
                    .distinct()\
                    .withColumn("Airports_id", monotonically_increasing_id())

## Rename Columns 2nd Dimension Table: Airports

In [21]:
#Rename Columns 2nd Dimension Table: Airports
new_column_name = ['cic_id','airline','flight_number','admin_number','visa_type','Airports_id']
Airports = Airports.toDF(*new_column_name)
Airports.limit(5).toPandas()

Unnamed: 0,cic_id,airline,flight_number,admin_number,visa_type,Airports_id
0,70,DL,75,55445916233,WT,0
1,223,OS,87,55435222433,WT,1
2,591,WK,4,55455327733,WT,2
3,734,OS,97,55430184433,WT,3
4,741,NK,240,55453003033,WT,4


## Read demographic data file

In [23]:
# read demographic data file
df_demographic = spark.read.format('csv').load("us-cities-demographics.csv",delimiter=';',header=True)
df_demographic = df_demographic\
                    .withColumn('City',upper(df_demographic['city']))\
                    .withColumn('State',upper(df_demographic['State']))\
                    .withColumn('Male Population', col('Male Population').cast(IntegerType()))\
                    .withColumn('Female Population', col('Female Population').cast(IntegerType()))\
                    .withColumn('Total Population', col('Total Population').cast(IntegerType()))\
                    .withColumn('Number of Veterans', col('Number of Veterans').cast(IntegerType()))\
                    .withColumn('Foreign-born', col('Foreign-born').cast(IntegerType()))\
                    .withColumn('Median Age', col('Median Age').cast(DoubleType()))\
                    .withColumn('Average Household Size', col('Average Household Size').cast(DoubleType()))

## View demographic data file

In [24]:
# view demographic data file
df_demographic.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,SILVER SPRING,MARYLAND,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,QUINCY,MASSACHUSETTS,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,HOOVER,ALABAMA,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,RANCHO CUCAMONGA,CALIFORNIA,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,NEWARK,NEW JERSEY,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


## Create Dimension Table: Populations

In [25]:
#Create Dimension Table: Populations
Populations = df_demographic.select('City','State','State Code','Male Population','Female Population',\
                                        'Total Population','Number of Veterans','Foreign-born','Race')\
                    .distinct()\
                    .withColumn("population_id", monotonically_increasing_id())

## Rename Columns for Dimension Table: Populations

In [26]:
#Rename Columns for Dimension Table: Populations
new_column_name =['city','state','state_code','male_population','female_population','total_population'\
                      ,'num_of_veterans','foreign_born','race','population_id']
Populations = Populations.toDF(*new_column_name)
Populations.limit(5).toPandas()

Unnamed: 0,city,state,state_code,male_population,female_population,total_population,num_of_veterans,foreign_born,race,population_id
0,PARMA,OHIO,OH,38425,41518,79943,5028,6909,Asian,0
1,MADISON,WISCONSIN,WI,122596,126360,248956,9707,30090,American Indian and Alaska Native,1
2,SKOKIE,ILLINOIS,IL,31382,33437,64819,1066,27424,Hispanic or Latino,2
3,YUMA,ARIZONA,AZ,48298,45847,94145,7182,19326,Hispanic or Latino,3
4,NORTH RICHLAND HILLS,TEXAS,TX,35257,33948,69205,5175,7854,Hispanic or Latino,4


## Create 2nd Dimension Table: Population_Statistics

In [27]:
#Create 2nd Dimension Table: Population_Statistics
Population_Statistics = df_demographic.select('City','State','State Code','Median Age','Average Household Size')\
                    .distinct()\
                    .withColumn("pop_statistics_id", monotonically_increasing_id())

## Rename Columns for 2nd Dimension Table: Population_Statistics

In [28]:
#Rename Columns for 2nd Dimension Table: Population_Statistics
new_column_name = ['city','state','state_code','median_age','avg_household_size','pop_statistics_id']
Population_Statistics =  Population_Statistics.toDF(*new_column_name)
Population_Statistics.limit(5).toPandas()

Unnamed: 0,city,state,state_code,median_age,avg_household_size,pop_statistics_id
0,CLARKSVILLE,TENNESSEE,TN,29.7,2.64,0
1,RACINE,WISCONSIN,WI,33.7,2.56,1
2,TUSTIN,CALIFORNIA,CA,33.4,3.27,2
3,SAINT PAUL,MINNESOTA,MN,31.5,2.58,3
4,FULLERTON,CALIFORNIA,CA,34.5,2.97,4


## Read temperature data file

In [10]:
# read temperature data file
df_temperature = spark.read.format('csv').load("../../data2/*.csv",header=True)
df_temperature = df_temperature\
                        .withColumn('AverageTemperature', col('AverageTemperature').cast(DoubleType()))\
                        .withColumn('AverageTemperatureUncertainty', col('AverageTemperatureUncertainty').cast(DoubleType()))\
                        .withColumn('City',upper(df_temperature['City']))\
                        .withColumn('Country',upper(df_temperature['Country']))


## View temperature data file

In [11]:
# view temperature data file
df_temperature.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,ÅRHUS,DENMARK,57.05N,10.33E
1,1743-12-01,,,ÅRHUS,DENMARK,57.05N,10.33E
2,1744-01-01,,,ÅRHUS,DENMARK,57.05N,10.33E
3,1744-02-01,,,ÅRHUS,DENMARK,57.05N,10.33E
4,1744-03-01,,,ÅRHUS,DENMARK,57.05N,10.33E


## Add year and month to temperatur data

In [14]:
# Add year and month to temperature data
df_temperature = df_temperature.withColumn('dt', to_date(df_temperature['dt']))
df_temperature = df_temperature.withColumn('year', year(df_temperature['dt']))
df_temperature = df_temperature.withColumn('month', month(df_temperature['dt']))
df_temperature.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,year,month
0,1743-11-01,6.068,1.737,ÅRHUS,DENMARK,57.05N,10.33E,1743,11
1,1743-12-01,,,ÅRHUS,DENMARK,57.05N,10.33E,1743,12
2,1744-01-01,,,ÅRHUS,DENMARK,57.05N,10.33E,1744,1
3,1744-02-01,,,ÅRHUS,DENMARK,57.05N,10.33E,1744,2
4,1744-03-01,,,ÅRHUS,DENMARK,57.05N,10.33E,1744,3


## Create Dimension Table: Temperatures

In [15]:
#Create Dimension Table: Temperatures
Temperatures = df_temperature.select('dt','year','month','Country','City','Latitude','Longitude').distinct()

## Rename Columns for Dimension Table: Temperatures

In [16]:
#Rename Columns for Dimension Table: Temperatures
new_column_name =['date','year','month','country','city','latitude','longitude']
Temperatures = Temperatures.toDF(*new_column_name)
Temperatures.limit(5).toPandas()

Unnamed: 0,date,year,month,country,city,latitude,longitude
0,1842-07-01,1842,7,INDONESIA,BONTANG,0.80N,118.13E
1,1846-12-01,1846,12,INDONESIA,BONTANG,0.80N,118.13E
2,1847-10-01,1847,10,INDONESIA,BONTANG,0.80N,118.13E
3,1852-08-01,1852,8,CONGO (DEMOCRATIC REPUBLIC OF THE),BUTEMBO,0.80N,29.73E
4,1861-01-01,1861,1,INDONESIA,BONTANG,0.80N,118.13E


## Create Dimension Table Temperature_Statistics

In [18]:
#Create Dimension Table Temperature_Statistics
Temperature_Statistics = df_temperature.select('dt','year','month','Country','City'\
                                                       ,'AverageTemperature','AverageTemperatureUncertainty')

## Rename Columns for Table Temperature_Statistics

In [19]:
#Rename Columns for Table Temperature_Statistics
new_column_name =['date','year','month','country','city','avg_temp','avg_temp_uncertainty']
Temperature_Statistics = Temperature_Statistics.toDF(*new_column_name)
Temperature_Statistics.limit(5).toPandas()

Unnamed: 0,date,year,month,country,city,avg_temp,avg_temp_uncertainty
0,1743-11-01,1743,11,DENMARK,ÅRHUS,6.068,1.737
1,1743-12-01,1743,12,DENMARK,ÅRHUS,,
2,1744-01-01,1744,1,DENMARK,ÅRHUS,,
3,1744-02-01,1744,2,DENMARK,ÅRHUS,,
4,1744-03-01,1744,3,DENMARK,ÅRHUS,,


## Read library data file

In [37]:
# read library data file
with open("I94_SAS_Labels_Descriptions.SAS") as library:
    lines = library.readlines()

## Create Auxilary Tables

### Create Auxilary Table: Countries

In [38]:
# Create an empty RDD
emp_RDD = spark.sparkContext.emptyRDD()

# Create an expected schema
columns = StructType([StructField('code',StringType(), True)\
                    ,StructField('country',StringType(), True)])
# Create an empty RDD with expected schema
Countries = spark.createDataFrame(data = emp_RDD,
                           schema = columns)

### Insert into Auxilary Table: Countries

In [39]:
# Insert into Auxilary Table: Countries
country_data = lines[9:298]
for data in country_data:
    temp = data.split('=')
    list = [temp[0].strip(), temp[1].strip().strip("'").strip(";")]
    columns =  ['code', 'country']
    newRow = spark.createDataFrame([list], columns)
    Countries = Countries.union(newRow)

### Convert code datatype to int

In [40]:
# Convert code datatype to int
Countries = Countries.withColumn('code', col('code').cast(IntegerType()))

### View Auxilary Table: Countries

In [41]:
# view Auxilary Table: Countries
Countries.limit(5).toPandas()

Unnamed: 0,code,country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no land arrivals)"
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


### Create Auxilary Table: States

In [44]:
# Create Auxilary Table: States
# Create an empty RDD
emp_RDD = spark.sparkContext.emptyRDD()

    # Create an expected schema
columns = StructType([StructField('state_code',StringType(), True)\
                    ,StructField('state',StringType(), True)])
    # Create an empty RDD with expected schema
States = spark.createDataFrame(data = emp_RDD,schema = columns)

### Insert into Auxilary Table: States

In [45]:
# Insert into Auxilary Table: States
country_data = lines[981:1036]
for data in country_data:
    temp = data.split('=')
    list = [temp[0].strip(), temp[1].strip().strip("'").strip(";")]
    columns =  ['state_code', 'state']
    newRow = spark.createDataFrame([list], columns)
    States = States.union(newRow)

### View Auxilary Table: States

In [46]:
# view Auxilary Table: States
States.limit(5).toPandas()

Unnamed: 0,state_code,state
0,'AL',ALABAMA
1,'AK',ALASKA
2,'AZ',ARIZONA
3,'AR',ARKANSAS
4,'CA',CALIFORNIA


### Create Auxilary Table: Ports

In [49]:
# Create Auxilary Table: Ports
# Create an empty RDD
emp_RDD = spark.sparkContext.emptyRDD()

# Create an expected schema
columns = StructType([StructField('port_code',StringType(), True)\
                    ,StructField('port_city',StringType(), True)])
# Create an empty RDD with expected schema
Ports = spark.createDataFrame(data = emp_RDD,schema = columns)

### Insert into Auxilary Table: Ports

In [50]:
# Insert into Auxilary Table: Ports
country_data = lines[302:961]
for data in country_data:
    temp = data.split('=')
    list = [temp[0].strip(), temp[1].strip().strip("'").split(',')[0]]
    columns =  ['port_code', 'port_city']
    newRow = spark.createDataFrame([list], columns)
    Ports = Ports.union(newRow)

### View Auxilary Table: States

In [51]:
# view Auxilary Table: States
Ports.limit(5).toPandas()

Unnamed: 0,port_code,port_city
0,'ALC',ALCAN
1,'ANC',ANCHORAGE
2,'BAR',BAKER AAF - BAKER ISLAND
3,'DAC',DALTONS CACHE
4,'PIZ',DEW STATION PT LAY DEW


### Create Auxilary Table: Visas

In [52]:
# Create Auxilary Table: Visas
# Create an empty RDD
emp_RDD = spark.sparkContext.emptyRDD()

# Create an expected schema
columns = StructType([StructField('visa_code',StringType(), True)\
                    ,StructField('visa',StringType(), True)])

# Create an empty RDD with expected schema
Visas = spark.createDataFrame(data = emp_RDD,schema = columns)

### Insert into Auxilary Table: Visas

In [53]:
# Insert into Auxilary Table: Visas
country_data = lines[1046:1049]
for data in country_data:
    temp = data.split('=')
    list = [temp[0].strip(), temp[1].strip().strip("'").strip(";")]
    columns =  ['visa_code', 'visa']
    newRow = spark.createDataFrame([list], columns)
    Visas = Visas.union(newRow)

### Convert visa_code datatype to int

In [54]:
# Convert visa_code datatype to int
Visas = Visas.withColumn('visa_code', col('visa_code').cast(IntegerType()))

### View Auxilary Table: Visas

In [55]:
# view Auxilary Table: Visas
Visas.limit(5).toPandas()

Unnamed: 0,visa_code,visa
0,1,Business
1,2,Pleasure
2,3,Student
