# Insights about US Immigration 2016
### Data Engineering Capstone Project

In [10]:
import os
import configparser

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

import pandas as pd
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, when, isnull, udf, col, dayofmonth, year, month, monotonically_increasing_id

# AWS credentials to access and use S3

config = configparser.ConfigParser()
config.read('s3.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']


In [11]:
'''
Creates SparkSessions
Reads and loads datasets to be used for data modeling
'''

spark = SparkSession \
            .builder \
            .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
            .getOrCreate()

# 2 different data formats
immig_df=spark.read.load('./sas_data')
demo_df = spark.read.json('us-cities-demographics.json')

In [12]:
def col_cleaner(df):  
    
    '''
    Eliminates columns that have more than 50% of cells population as empty, duplicate records and empty records
    
    To be used for cleaning data for dimension tables and fact table; 'time', 'person', 'arrival_event'

    '''
    
    df_count = df.count()

    # Eliminate columns that have less than 50% population
    
    col_pop = df.select([(count(when(isnull(c), c)) / df_count).alias(c) for c in df.columns])
    #col_pop.show(n=1,vertical=True, truncate=False)
    
    col_to_elimin = [key for (key, value) in col_pop.head().asDict().items() if value > 0.5]
    #print(col_to_elimin)

    if col_to_elimin:
        cleaned_df = df.drop(*col_to_elimin)
        cleaned_df = cleaned_df.dropDuplicates()
        cleaned_df = cleaned_df.dropna(how='all')
  
        return cleaned_df
    else:
        cleaned_df = df.dropDuplicates()
        cleaned_df = cleaned_df.dropna(how='all')

        return cleaned_df

In [13]:
cleaned_immig_data = col_cleaner(immig_df)
cleaned_demo_data = col_cleaner(demo_df)

In [39]:
len(cleaned_immig_data.columns)

24

In [44]:
print(type(len(immig_df.columns)))

<class 'int'>


In [6]:
'''
Creates dimension table for time
'''

get_timestamp = udf(lambda x : (datetime.date(1960, 1, 1) + datetime.timedelta(days=x)).isoformat() if x else none)

formatted_arrdate = cleaned_immig_data.withColumn('timestamp', get_timestamp(cleaned_immig_data.arrdate))
#df.show(3)

dimen_time_table = formatted_arrdate.select(\
    col("timestamp").alias('day_of_arrival'),
    year("timestamp").alias('year'),
    month('timestamp').alias('month'),
    dayofmonth('timestamp').alias('day'))

dimen_time_table.show(1)


+--------------+----+-----+---+
|day_of_arrival|year|month|day|
+--------------+----+-----+---+
|    2016-04-01|2016|    4|  1|
+--------------+----+-----+---+
only showing top 1 row



In [7]:
'''
Creates dimension table for US States
'''

dimen_state_table = cleaned_demo_data.withColumn('id', monotonically_increasing_id())

dimen_state_table = dimen_state_table.select(['id','City', 'State', 'State Code', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size'])

dimen_state_table = dimen_state_table.withColumnRenamed('City', 'city') \
                    .withColumnRenamed('State','state') \
                    .withColumnRenamed('State Code', 'state_code') \
                    .withColumnRenamed('Median Age', 'median_age') \
                    .withColumnRenamed('Male Population', 'male_population') \
                    .withColumnRenamed('Female Population', 'female_population') \
                    .withColumnRenamed('Total Population', 'total_population') \
                    .withColumnRenamed('Number of Veterans', 'number_of_veterans') \
                    .withColumnRenamed('Foreign-born','number_of_foreign_born') \
                    .withColumnRenamed('Average Household Size','avg_household_size')

dimen_state_table.show(1)

+---+-------+----------+----------+----------+---------------+-----------------+----------------+------------------+----------------------+------------------+
| id|   city|     state|state_code|median_age|male_population|female_population|total_population|number_of_veterans|number_of_foreign_born|avg_household_size|
+---+-------+----------+----------+----------+---------------+-----------------+----------------+------------------+----------------------+------------------+
|  0|Lynwood|California|        CA|      29.4|          35634|            36371|           72005|               776|                 28061|              4.43|
+---+-------+----------+----------+----------+---------------+-----------------+----------------+------------------+----------------------+------------------+
only showing top 1 row



In [8]:
'''
Creates dimension table for persons arriving in US
'''

dimen_person_table = cleaned_immig_data.select(['cicid', 'i94bir', 'admnum', 'i94cit', 'i94res', 'i94mode', 'i94mon', 'visatype'])

dimen_person_table = dimen_person_table.withColumn('id', monotonically_increasing_id())

dimen_person_table = dimen_person_table.select(['id', 'cicid', 'i94bir', 'admnum', 'i94cit', 'i94res', 'i94mode', 'i94mon', 'visatype'])

dimen_person_table.show(1)

+---+-----+------+---------------+------+------+-------+------+--------+
| id|cicid|i94bir|         admnum|i94cit|i94res|i94mode|i94mon|visatype|
+---+-----+------+---------------+------+------+-------+------+--------+
|  0|474.0|  25.0|5.5410441233E10| 103.0| 103.0|    2.0|   4.0|      WT|
+---+-----+------+---------------+------+------+-------+------+--------+
only showing top 1 row



In [9]:
'''
Creates fact table for arrival events
'''

fact_table = cleaned_immig_data.select(['cicid', 'arrdate', 'depdate', 'i94mode', 'i94addr','i94mon']) # need to add column with duration of visit/residence, in days

### Returning the following table with added column caused 'Java Runtime error'
#get_timestamp = udf(lambda x : (datetime.date(1960, 1, 1) + datetime.timedelta(days=x)).isoformat() if x else none)
#formatted_arrdate = fact_table.withColumn('timestamp', get_timestamp(fact_table.arrdate))

fact_table.show(1)

+-----+-------+-------+-------+-------+------+
|cicid|arrdate|depdate|i94mode|i94addr|i94mon|
+-----+-------+-------+-------+-------+------+
|474.0|20545.0|20547.0|    2.0|   null|   4.0|
+-----+-------+-------+-------+-------+------+
only showing top 1 row



In [None]:
'''
Cleans data for dimension tables and fact table; 'time', 'person', 'arrival_event'
Creates dimension tables and fact table and uploads to S3, partitioned by respective attributes
'''

### Create F/D tables 

# Writing time table to S3
#dimen_time_table.write.partitionBy('year','month').parquet('s3a://capstone23/time', "overwrite")

# Writing persons table to S3
dimen_person_table.write.partitionBy('i94mon').parquet('s3a://capstone23/person', "overwrite")
#spark.read.parquet('s3a://capstone23/person/_temporary/0/task_20210629020203_0009_m_000000/part-00000-4dc1173c-33b1-4534-a2aa-bc3d291a54ce-c000.snappy.parquet').show()

# Writing fact table to S3
###fact_arrival_event_table.write.partitionBy('i94mon').parquet('s3a://capstone23/arrival_event', "overwrite")

In [None]:
dimen_state_table.write.partitionBy('state').parquet('s3a://capstone23/state', "overwrite")

In [None]:
'''
Data quality checks

1. Checking if written files in S3 are populated

'''

In [12]:
df = pd.read_csv ('us-cities-demographics.csv',sep=';')
jason = df.to_json ('./us-cities-demographics.json', orient='records')

In [15]:
spark.read.json('us-cities-demographics.json').show(2, vertical=True)

-RECORD 0------------------------------------
 Average Household Size | 2.6                
 City                   | Silver Spring      
 Count                  | 25924              
 Female Population      | 41862.0            
 Foreign-born           | 30908.0            
 Male Population        | 40601.0            
 Median Age             | 33.8               
 Number of Veterans     | 1562.0             
 Race                   | Hispanic or Latino 
 State                  | Maryland           
 State Code             | MD                 
 Total Population       | 82463              
-RECORD 1------------------------------------
 Average Household Size | 2.39               
 City                   | Quincy             
 Count                  | 58723              
 Female Population      | 49500.0            
 Foreign-born           | 32935.0            
 Male Population        | 44129.0            
 Median Age             | 41.0               
 Number of Veterans     | 4147.0  

## Data Quality Checks

First check compares number of columns between processed and raw datasets.
Second check compares number of records between processed and raw datasets.

In [48]:
def diff_columns(raw_df, processed_df):
    
    '''
    This check compares number of columns between processed and raw datasets
    '''
    
    raw_df_columns = len(raw_df.columns)
    processed_df_columns = len(processed_df.columns)
    
    if(len(raw_df.columns) == len(processed_df.columns)):
        print(f"Processed and raw datasets contain the same number of columns: {raw_df_columns}")
    else:
        print(f"There's a difference in number of columns between processed and raw datasets.\n This is due to columns having more than 50% of population as null.\n Processed dataset has {processed_df_columns} columns.\n Raw dataset has {raw_df_columns} columns.")
        
diff_columns(immig_df, cleaned_immig_data)
diff_columns(demo_df, cleaned_demo_data)

There's a difference in number of columns between processed and raw datasets.
 This is due to columns having more than 50% of population as null.
 Processed dataset has 24 columns.
 Raw dataset has 28 columns.
Processed and raw datasets contain the same number of columns: 12


In [49]:
def diff_records(raw_df, processed_df):
    
    '''
    This check compares number of records between processed and raw datasets
    '''
    
    processed_df_count = processed_df.count()
    raw_df_count = raw_df.count()
    
    if (raw_df_count == processed_df_count):
        print('Checking between processed and raw data, there were no duplicates and empty rows in raw data')
    else:
        missing_records = raw_df_count - processed_df_count
        print(f'Checking between processed and raw data, there were {missing_records} duplicates and empty rows in raw data')
        
diff_records(immig_df, cleaned_immig_data)
diff_records(demo_df, cleaned_demo_data)

Checking between processed and raw data, there were no duplicates and empty rows in raw data
Checking between processed and raw data, there were no duplicates and empty rows in raw data
