# US Immigrant Data Analytics and Analysis
### Data Engineering Capstone Project

#### Project Summary

The US National Tourism and Trade Office wants a data solution to store and analyze the historical immigration data they receive daily for further use by them and related organizations. 

They keep record of millions of data every month and want to perform effective and efficient analysis and find insights with their data. The data that they have recorded is very raw and uncleaned so, it contains unnecessary fields, values and data with mismatched types.  Since they also want to share the data with other groups of people who aren’t dealing with it day-to-day, they want the end data to have descriptive and easy to understand column, entity names and values. In addition to the immigration data, there will be numerous data from different sources in different formats which will be used together with it to provide meaningful and valuable insights.

Due to the unpredictability of the size of the data, they want the solution to work best no matter the size of the data. They also want to store the data in a place with reliable security where it is highly accessible anywhere in the country no matter what time it is.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [21]:
import os
import configparser
import pandas as pd
import datetime as dt
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, isnan, when, count, desc
from pyspark.sql.functions import to_date
import  pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, \
FloatType, LongType, DoubleType

config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ['AWS_ACCESS_KEY_ID'] = config['AWS_CREDENTIALS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS_CREDENTIALS']['AWS_SECRET_ACCESS_KEY']

S3_INPUT_BUCKET = "s3a://udacity-capstone-thura/"
S3_OUTPUT_BUCKET = "s3a://udacity-capstone-thura-output/"
US_CITIES_DEMOGRAPHICS = "us-cities-demographics.csv"
MISSING_US_DEMOGRAPHICS = "missing-us-demographics.csv"
IMMIGRATION_DATA = "sas_data"
AIRPORT_CODES = "airport-codes.json"
LABELS_PATH = "I94_SAS_Labels_Descriptions.SAS"
KEY_LABELS = ["i94cntyl", "i94prtl", "i94model", "i94addrl"]


substring_udf = udf(lambda x:x[3:],StringType())
convert_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(float(x))).isoformat() if x else None)


### Step 1: Scope the Project and Gather Data

#### Scope 
With the use of Amazon S3 and Pyspark, the project will create a cloud-based centralized data lake that users from different backgrounds will use and perform analyses on. There are 3 main data sets that are going to be used, namely:
<ul>
<li>US I94 Immigration Data</li>
<li>U.S. City Demographic Data</li>
<li>Airport Code Table</li>
</ul>


First, the raw data files will be hosted on Amazon S3. Pyspark will be used to read those data which have different formats <code>.csv</code>, <code>.parquet</code>, <code>.json</code>, etc. and load them into separate dataframes. In addition to that, the pre-defined label and description data will be read and loaded locally into spark dataframes. The reason the labels aren't hosted on S3 is that they rarely change and it is best to keep them separate from the main datasets.

Next, the data will be explored a bit to ensure correct data had been imported and just to make sure the main columns are included in general. After that, the loaded dataframes will be cleaned by performing type casts, only extracting columns that are needed, renaming columns, filtering, dropping duplicates, etc. by leveraging the built-in functions Python and Pyspark provide. The cleaned data will then be extracted to several dataframes as per the data model that is defined. Finally, each dataframe which are now in the final stage will be written to Amazon S3 as parquet files.

#### Describe and Gather Data 
There are 3 main datasets that will be used:
<li><b>US I94 Immigration Data</b> -  This data comes from the US National Tourism and Trade Office. As the name suggests, it includes US immigration data from a particular time period. Data which is crucial for immigration such as cicid, port of entry, immigration mode, admission number, arrival and departure date, etc. are included in this dataset.</li> 
<li><b>U.S. City Demographic Data</b> -  This data comes from OpenSoft. It includes demographics of all US cities. The data include population, city name, state, state codes, etc. The data will not be used as is. It will first be grouped and aggregated based on state and state code and then the resulting data will be used. Upon preliminary inspection of the data, there are three missing states so, the missing data is collected and stored in a separate file. The missing data references data from The U.S. Census Bureau, the same source as the original data.  </li>
<li><b>Airport Code Table</b> - This dataset comes from DataHub. It contains airport codes from all around the world. For this use case, the airport data from US which are not closed will be used. Airport name, local code, type, country, region, coordinates, etc. are some example of the data that is included. </li> 

In [22]:

def mapper(contents, key):
    """
        Description: Accepts file contents as an object and extract information from it
                     based on the key passed
        Parameters:
                     contents - file contents as an object
                     key - key used to extract data from the file object
        Returns: Extracted data as a dictionary
    """
    temp = contents[contents.index(key):]
    temp = temp[:temp.index(';')].split('\n')
    temp = [i.replace("'", "") for i in temp]
    data_dict = [i.split('=') for i in temp[1:]]
    data_dict = dict([i[0].strip(), i[1].strip()] for i in data_dict if len(i) == 2)
    return data_dict

# Open the labels and descriptions file and load 
with open(LABELS_PATH) as file:
    contents = file.read()
    contents = contents.replace('\t', '')
    
# Create/Get spark session
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()



In [23]:
# Read labels and descriptions data
visa_categories = {'1':'Business', '2': 'Pleasure', '3' : 'Student'}
countries = mapper(contents, KEY_LABELS[0])
ports = mapper(contents, KEY_LABELS[1])
immigration_modes = mapper(contents, KEY_LABELS[2])
i94_addresses = mapper(contents, KEY_LABELS[3])

# Create Spark Dataframes for Labels and Descriptions data

# Load Visa Categories Data
visa_categories_pddf = pd.DataFrame(list(visa_categories.items()), columns=['category_id', 'category_name'])
visa_category_df = spark.createDataFrame(visa_categories_pddf) \
                    .withColumn("category_id",col("category_id").cast(IntegerType()))
visa_category_df.printSchema()

# Load Countries Data
countries_pddf = pd.DataFrame(list(countries.items()), columns=['country_code', 'country_name'])
countries_df = spark.createDataFrame(countries_pddf)\
                            .withColumn("country_code",col("country_code").cast(IntegerType()))
countries_df.printSchema()

# Load Immigration Modes Data
modes_pddf = pd.DataFrame(list(immigration_modes.items()), columns=['mode_id', 'description'])
modes_df = spark.createDataFrame(modes_pddf)\
                            .withColumn("mode_id",col("mode_id").cast(IntegerType()))
modes_df.printSchema()


root
 |-- category_id: integer (nullable = true)
 |-- category_name: string (nullable = true)

root
 |-- country_code: integer (nullable = true)
 |-- country_name: string (nullable = true)

root
 |-- mode_id: integer (nullable = true)
 |-- description: string (nullable = true)



In [24]:
# Read Airport Data
airports_df = spark.read.json(S3_INPUT_BUCKET + AIRPORT_CODES)

# See a sample data and the schema
airports_df.show(3)
airports_df.printSchema()

+---------+--------------------+------------+--------+---------+-----+-----------+----------+----------+------------+--------------------+-------------+
|continent|         coordinates|elevation_ft|gps_code|iata_code|ident|iso_country|iso_region|local_code|municipality|                name|         type|
+---------+--------------------+------------+--------+---------+-----+-----------+----------+----------+------------+--------------------+-------------+
|       NA|-74.9336013793945...|          11|     00A|     null|  00A|         US|     US-PA|       00A|    Bensalem|   Total Rf Heliport|     heliport|
|       NA|-101.473911, 38.7...|        3435|    00AA|     null| 00AA|         US|     US-KS|      00AA|       Leoti|Aero B Ranch Airport|small_airport|
|       NA|-151.695999146, 5...|         450|    00AK|     null| 00AK|         US|     US-AK|      00AK|Anchor Point|        Lowell Field|small_airport|
+---------+--------------------+------------+--------+---------+-----+-----------+

In [25]:
# Read Cities Data
cities_df = spark.read.csv(S3_INPUT_BUCKET + US_CITIES_DEMOGRAPHICS, sep=";", header=True)
missing_df = spark.read.csv(S3_INPUT_BUCKET + MISSING_US_DEMOGRAPHICS, header=True)

# Select data with column renames and type casts

cities_df = cities_df.selectExpr(
    'City city',
    'State state',
    '`State Code` state_code',
    'cast(`Male Population` as long) male_population',
    'cast(`Female Population` as long) female_population',
    'cast(`Total Population` as long) total_population',
    'cast(`Number of Veterans` as long) no_of_veterans',
    'cast(`Foreign-born` as long) foreign_born'
)

missing_df = missing_df.selectExpr(
    'state',
    'state_code',
    'cast(`male_population` as long)',
    'cast(`female_population` as long)',
    'cast(`total_population` as long)',
    'cast(`no_of_veterans` as long)',
    'cast(`foreign_born` as long)'
)

# See a sample data and schema
cities_df.show(3)
cities_df.printSchema()


+-------------+-------------+----------+---------------+-----------------+----------------+--------------+------------+
|         city|        state|state_code|male_population|female_population|total_population|no_of_veterans|foreign_born|
+-------------+-------------+----------+---------------+-----------------+----------------+--------------+------------+
|Silver Spring|     Maryland|        MD|          40601|            41862|           82463|          1562|       30908|
|       Quincy|Massachusetts|        MA|          44129|            49500|           93629|          4147|       32935|
|       Hoover|      Alabama|        AL|          38040|            46799|           84839|          4819|        8229|
+-------------+-------------+----------+---------------+-----------------+----------------+--------------+------------+
only showing top 3 rows

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- male_popul

In [26]:
# Load immigration data
immigration_df = spark.read.parquet(S3_INPUT_BUCKET + IMMIGRATION_DATA)

# Modify the loaded data to have appropriate names and data types
immigration_df = immigration_df.selectExpr(
    'cicid',
    'cast(i94yr as int) i94yr',
    'cast(i94mon as int) i94mon',
    'cast(i94cit as int) i94cit',
    'cast(i94res as int) i94res',
    'i94port',
    'arrdate',
    'cast(i94mode as int) i94mode',
    'i94addr',
    'depdate',
    'cast(i94bir as int) respondent_age',
    'cast(i94visa as int) i94visa',
    'count',
    'dtadfile',
    'visapost visa_post',
    'occup',
    'entdepa arrival_flag',
    'entdepd departure_flag',
    'entdepu update_flag',
    'matflag match_flag',
    'cast(biryear as int) birth_year',
    'dtaddto',
    'gender',
    'cast(insnum as long) ins_num',
    'airline',
    'admnum adm_num',
    'fltno flight_no',
    'visatype visa_type')


# See sample data
immigration_df.show(3)
immigration_df.printSchema()



+---------+-----+------+------+------+-------+-------+-------+-------+-------+--------------+-------+-----+--------+---------+-----+------------+--------------+-----------+----------+----------+--------+------+-------+-------+--------------+---------+---------+
|    cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|respondent_age|i94visa|count|dtadfile|visa_post|occup|arrival_flag|departure_flag|update_flag|match_flag|birth_year| dtaddto|gender|ins_num|airline|       adm_num|flight_no|visa_type|
+---------+-----+------+------+------+-------+-------+-------+-------+-------+--------------+-------+-----+--------+---------+-----+------------+--------------+-----------+----------+----------+--------+------+-------+-------+--------------+---------+---------+
|5748517.0| 2016|     4|   245|   438|    LOS|20574.0|      1|     CA|20582.0|            40|      1|  1.0|20160430|      SYD| null|           G|             O|       null|         M|      1976|10292016|     F|   n

### Step 2: Explore and Assess the Data
#### Explore the Data 
Through exploring the data I have found the followings for each dataset:

##### Immigration Data

<ul>
<li>There are pver 3 million records in this dataset.</li>
<li>There aren’t any rows where the cicid is null or duplicated.</li>   
<li>The main i94 data fields don’t have any null values.</li>   
<li>Most of the rows don’t have occupation, INS number and update_flag data. Approximately 96.3 percent of the rows don't have INS number, 99 percent of the rows don't have occupation and 99.8 percent of the rows don't have update_flag.</li>
<li>Birth year is between 1902 and 2019 across the entire dataset. No adnormal data is seen here.</li>
<li>Looking through the gender column, it can be estimated that males immigrated more than females. It should be noted that there are over four hundred thousand rows where gender is null. Since we are more concerned about the main i94 fields, this isn't much of a concern but this should be taken into consideration of all analyses where gender info is included.</li>
<li>The original date format for both arrival and departure dates are sas date format and they are converted to yyyy-MM-dd format for further processing and analysis.</li>
<li>Arrival date is between 2016–04-01 and 2016–04-30 which implies the data is reflective of the immigration period that occurred in April 2016.</li>
<li>Even though the data revolves around the timeframe of April 2016, there are 3 departure dates which are either too far off in the past or the future. Minimum and maximum departure date is between 2001-07-20 and 2084-05-16. This is 3 rows out of 3 million so it wouldn't affect the accuracy of the data much.
</li>
<li>dtadfile and dtaddto columns have varying date formats so they will also be converted to yyyy-MM-dd format for consistence.</li>
<li>In the mode of transport, on top of the cases where it is not reported (which falls in to the category id of 9), there is a count of 239 null fields. Air is the most used mode of immigration. </li>
<li>There are some rows where airline (airline) and flight number (fltno) are null and it is acceptable because there are immigrations where the mode of transport is not by air.</li>
<li>Most people migrated using the WT visa type.</li>
</ul>


##### Airport Code Data

<li>Since we are only interested in the airports that are in the US, all the airport information that are outside the US is filtered out.</li>
<li>In addition to that, the airports which are closed and those where there is no local_code will also be filtered out.</li>
<li>The iso_country column will be dropped cause we are only going to be looking that the US airports.</li>
<li>The iso_region column will also be dropped after the region codes are stripped and made into another column.</li>
<li>The columns gps_code and iata_code has a lot of missing values so they will be dropped.</li>
<li>The elevation feet is within an acceptable range.</li>
<li>The type of airport that has the most count is “small”.</li>
<li>Texas (TX) state has the most airports with the count being 2017.</li>


##### US Cities Demographics Data
<li>For this data solution, instead of cities, states data are needed so they will first be aggregated and grouped by state name and code.</li>
<li>There isn't data for 3 US states: Vermont, West Virginia and Wyoming so the missing data are collected from the same data source as the main dataset and kept in a separate file to be processed together later.</li>
<li>Female population is higher than male population.</li>
<li>The state of California has the most population with Texas being the second.</li>
<li>Before aggregation, there is one row where both number of veterans and foreign-born columns are null.</li>
<li>The “Race” and “Count” columns are not used cause they don’t provide information that is relevant to this particular use.</li>

#### Cleaning Steps
Steps necessary to clean the data differ a bit on an individual dataset.
##### Cleaning Immigration Data
<li>Since there are a bunch of data with different types, it is not favourable to let spark to infer data schema. </li>
<li>First, after loading the data, <code>selectExpr</code> function will be used to extract the data with desired data types. Along with that, some columns will be given descriptive names to make them easily comprehensible.</li>
<li>sas dates will be type casted to a more readable date format.</li>
<li>Duplicates will be dropped if there is any.</li>
<li>Unnecessary columns will be droppped.</li>

##### Cleaning Airport Code Data
<li>After loading the data, airports which are not in the US will be filtered out along with the ones that are closed.</li>
<li>Rows where the local_code is empty will also be filtered out.</li>
<li>iso_region column will be stripped to get region code for each row and the original column will later be dropped.</li>
<li>Appropriate type castings will take place. For instance, the elevation_ft column will be casted to integer type.</li>
<li>Some columns will be renamed to be more readable.</li>
<li>Duplicates will be dropped if there is any.</li>
<li>Unnecessary columns will be droppped.</li>


##### Cleaning US Cities Demographics Data
<li>After loading the data, <code>selectExpr</code> function will be used to extract the data with desired data types. Along with that, some columns will be given descriptive names to make them easily comprehensible.</li>
<li>Since there are missing values as stated above, they will be loaded from a separate file and be added to the main dataframe later on after it has been cleaned. </li>
<li>States data is needed instead of the individual cites so, the loaded dataframe will be grouped by state name and code.</li>
<li>After the main dataframe had been shaped, the missing data that had been loaded will be appended to it.</li>
<li>Duplicates will be dropped if there is any.</li>

In [27]:
# Type castings, data modifying, column renaming, etc.

# Immigration Data
immigration_df = immigration_df.withColumn('arrival_date', convert_date(immigration_df.arrdate).cast('date'))
immigration_df = immigration_df.withColumn('departure_date', convert_date(immigration_df.depdate).cast('date'))
immigration_df = immigration_df.withColumn('date_adm_to', F.to_date(F.unix_timestamp(immigration_df.dtaddto, 'MMddyyyy').cast('timestamp')))
immigration_df = immigration_df.withColumn('date_added', F.to_date(F.unix_timestamp(immigration_df.dtadfile, 'yyyyMMdd').cast('timestamp')))
immigration_df.printSchema()

# Cities Data
# What I really want is the states data so, the data need to be aggregated
cities_df = cities_df.groupBy("state", "state_code") \
            .sum("male_population","female_population", "total_population", "no_of_veterans", "foreign_born")

states_df = cities_df.withColumnRenamed("sum(male_population)","male_population") \
            .withColumnRenamed("sum(female_population)","female_population") \
            .withColumnRenamed("sum(total_population)","total_population") \
            .withColumnRenamed("sum(no_of_veterans)","no_of_veterans") \
            .withColumnRenamed("sum(foreign_born)","foreign_born") \

# Add missing data to the dataframe
states_df = states_df.unionByName(missing_df)

states_df.printSchema()

# Airport Data
# Filtering Airport Data to have airports that are in the US and are not closed
airports_df = airports_df.filter((airports_df.iso_country  == "US") \
                      & (airports_df.type  != "closed") \
                      & (col('local_code').isNotNull()))
                     

# Modifying airport data
airports_df = airports_df.withColumn('region', substring_udf('iso_region'))
airports_df = airports_df.withColumnRenamed("ident","id")
airports_df = airports_df.withColumn("elevation_ft", col("elevation_ft").cast('integer'))

airports_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- respondent_age: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visa_post: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- arrival_flag: string (nullable = true)
 |-- departure_flag: string (nullable = true)
 |-- update_flag: string (nullable = true)
 |-- match_flag: string (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- ins_num: long (nullable = true)
 |-- airline: string (nullable = 

In [28]:
# Exploring Immigration Data

# Get entry count based on gender
gender_count = immigration_df.groupby(['gender']).count()
gender_count.show()

# Get entry count based on visatype
visa_count = immigration_df.groupby(['visa_type']).count()
visa_count.show()

# Get entry count based on occupation 
occup_count = immigration_df.groupby(['occup']).count()
occup_count.show()

# Get minimum and maximum birth year 
immigration_df.agg(F.min(col('birth_year')), F.max(col('birth_year'))).show()

# Get minimum and maximum departure date
immigration_df.agg(F.min(col('departure_date')), F.max(col('departure_date'))).show()

# Get minimum and maximum arrival date
immigration_df.agg(F.min(col('arrival_date')), F.max(col('arrival_date'))).show()

# Check if there are rows with duplicated cicid
immigration_df.groupBy("cicid").count().where("count > 1").drop("count").show()

# Null value count across columns
immigration_df.select([count(when(col(c).isNull(), c)).alias(c) for c in immigration_df.columns]).show()

+------+-------+
|gender|  count|
+------+-------+
|     F|1302743|
|  null| 414269|
|     M|1377224|
|     U|    467|
|     X|   1610|
+------+-------+

+---------+-------+
|visa_type|  count|
+---------+-------+
|       F2|   2984|
|      GMB|    150|
|       B2|1117897|
|       F1|  39016|
|      CPL|     10|
|       I1|    234|
|       WB| 282983|
|       M1|   1317|
|       B1| 212410|
|       WT|1309059|
|       M2|     49|
|       CP|  14758|
|      GMT|  89133|
|       E1|   3743|
|        I|   3176|
|       E2|  19383|
|      SBP|     11|
+---------+-------+

+-----+-----+
|occup|count|
+-----+-----+
|  PHA|    5|
|  REL|    3|
|  ENT|    3|
|  ACH|   29|
|  101|    1|
|  EMM|    4|
|  ULS|  175|
|  GEN|    3|
|  MTH|    1|
|  DVM|    2|
|  SVC|    3|
|  ECH|   11|
|  EXA|  196|
|  ENV|    1|
|  ENP|    2|
|  PRF|   11|
|  VOC|   10|
|  TCH|   26|
|  MSC|    3|
|  AST|    2|
+-----+-----+
only showing top 20 rows

+---------------+---------------+
|min(birth_year)|max(birth_ye

In [29]:
# Exploring Airport Data

# Get entry count based on state
count_by_state = airports_df.groupby(['region']).count()
count_by_state.sort(col("count").desc()).show()

# Get entry count based on airport type
count_by_airport_type = airports_df.groupby(['type']).count()
count_by_airport_type.show()

# Get minimum and maximum elevation feet 
airports_df.agg(F.min(col('elevation_ft')), F.max(col('elevation_ft'))).show()

# Null value count across columns
airports_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in airports_df.columns]).show()

+------+-----+
|region|count|
+------+-----+
|    TX| 2017|
|    CA|  985|
|    FL|  884|
|    PA|  851|
|    IL|  813|
|    AK|  782|
|    OH|  713|
|    IN|  631|
|    NY|  581|
|    WI|  572|
|    WA|  554|
|    MO|  545|
|    LA|  533|
|    MN|  509|
|    MI|  495|
|    GA|  488|
|    CO|  475|
|    VA|  472|
|    OR|  469|
|    OK|  468|
+------+-----+
only showing top 20 rows

+--------------+-----+
|          type|count|
+--------------+-----+
| large_airport|  170|
|   balloonport|   17|
| seaplane_base|  562|
|      heliport| 6161|
|medium_airport|  685|
| small_airport|13364|
+--------------+-----+

+-----------------+-----------------+
|min(elevation_ft)|max(elevation_ft)|
+-----------------+-----------------+
|             -210|            29977|
+-----------------+-----------------+

+---------+-----------+------------+--------+---------+---+-----------+----------+----------+------------+----+----+------+
|continent|coordinates|elevation_ft|gps_code|iata_code| id|iso_count

In [30]:
# Exploring Cities Data
states_df.select("state", "male_population").sort(col("male_population").desc()).show(5)

states_df.select("state", "female_population").sort(col("female_population").desc()).show(5)

states_df.select("state", "total_population").sort(col("total_population").desc()).show(5)

states_df.select("state", "foreign_born").sort(col("foreign_born").desc()).show(5)

states_df.select("state", "no_of_veterans").sort(col("no_of_veterans").desc()).show(5)

# Get total sum of male and female population 
states_df.agg(F.sum(col('male_population')), F.sum(col('female_population'))).show()

# Null value count across columns
states_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in states_df.columns]).show()

+----------+---------------+
|     state|male_population|
+----------+---------------+
|California|       61055672|
|     Texas|       34862194|
|  New York|       23422799|
|   Florida|       15461937|
|   Arizona|       11137275|
+----------+---------------+
only showing top 5 rows

+----------+-----------------+
|     state|female_population|
+----------+-----------------+
|California|         62388681|
|     Texas|         35691659|
|  New York|         25579256|
|   Florida|         16626425|
|  Illinois|         11570526|
+----------+-----------------+
only showing top 5 rows

+----------+----------------+
|     state|total_population|
+----------+----------------+
|California|       123444353|
|     Texas|        70553853|
|  New York|        49002055|
|   Florida|        32306132|
|  Illinois|        22514390|
+----------+----------------+
only showing top 5 rows

+----------+------------+
|     state|foreign_born|
+----------+------------+
|California|    37059662|
|  New York

In [31]:
# Dropping the duplicates in the three dataframes if there is any
immigration_df = immigration_df.dropDuplicates()
airports_df = airports_df.dropDuplicates()
states_df = states_df.dropDuplicates()
countries_df = countries_df.dropDuplicates()

# Dropping the unnecessary columns
immigration_df = immigration_df.drop("count", "arrdate", "depdate", "dtaddto", "dtadfile")
airports_df = airports_df.drop('iso_country', 'iso_region', 'gps_code', 'iata_code')


immigration_df.printSchema()
print(f"Immigration data row count: {immigration_df.count()}")

airports_df.printSchema()
print(f"\n Airport data row count: {airports_df.count()}")

states_df.printSchema()
print(f"States data row count: {states_df.count()}")

visa_category_df.printSchema()
print(f"Visa category data row count: {visa_category_df.count()}")

modes_df.printSchema()
print(f"\n Immigration mode data row count: {modes_df.count()}")

countries_df.printSchema()
print(f"Countries data row count: {countries_df.count()}")

root
 |-- cicid: double (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- respondent_age: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- visa_post: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- arrival_flag: string (nullable = true)
 |-- departure_flag: string (nullable = true)
 |-- update_flag: string (nullable = true)
 |-- match_flag: string (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- ins_num: long (nullable = true)
 |-- airline: string (nullable = true)
 |-- adm_num: double (nullable = true)
 |-- flight_no: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: date

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
![ETL](images/data_model.png "ETL 3") 

As shown in the above picture, the conceptual data model is structured using the star schema methodology with the Immigration being the centralized fact table with a few dimension tables surrounding it. The main immigration data is separated into two tables Immigration and Respondent_info just to make the fact table look cleaner and concise. The other dimension tables enriches and extends the fact table with more information. 

The star schema methodology is adopted here because of several beneficial reasons. Since the data needs to be shared with other organizations where there is going to be people from different backgrounds, it is important that the model is easy to understand. With star schema, everything revolves around the fact table and everything is connected through it which makes it easy to comprehend since the table relations aren't all over the place.

Beacuse of the fact that the data are divided into different smaller tables, users can be query specific parts of data where unnecessary data won't be brought up which gives fast performance. In addition to that, there is no need to do numerous complex joins which makes querying faster compared to other models where it is necessary to do four or five joins across tables to get the resulting data. For instance, if one wants to get insights about the top countries where the immigrants come from, only Countries and Immigration tables need to be joined to get the desired insight. The star schema methodology has been around for quite some time now so, many business intelligence tools support it.
#### 3.2 Mapping Out Data Pipelines

Since the data has been shaped and cleaned in the above stages, there isn't much left to do to get it into the chosen data model.
<li>New dataframes will be created from the cleaned data with appropriate names to fit the specified model.</li>
<li>Row counts will be stored in a dictionary to do data qualtiy checks later.</li>
<li>Each table will be written as separate parquet files to a designated Amazon S3 bucket.</li>

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model


In [32]:
source_row_counts = {}

# Define Visa Category Table
visa_category_table = visa_category_df.select(col("category_id").alias("id"), 
                                              col("category_name").alias("name"))

source_row_counts['visa_categories'] = visa_category_table.count()

print(f"visa_category_table row count: {source_row_counts['visa_categories']}")


# Define Immigration Mode Table
immigration_mode_table = modes_df.select(col("mode_id").alias("id"), 
                                         col("description"))

source_row_counts['immigration_modes'] = immigration_mode_table.count()

print(f"immigration_mode_table row count: {source_row_counts['immigration_modes']}")


# Define Countries Table
countries_table = countries_df.select(col("country_code").alias("code"), 
                                      col("country_name").alias("name"))

source_row_counts['countries'] = countries_table.count()

print(f"countries_table row count: {source_row_counts['countries']}")

# Define States Table
states_table = states_df.select(col("state").alias("name"), 
                                col("state_code").alias("code"), 
                                col("male_population"), 
                                col("female_population"), 
                                col("total_population"), 
                                col("no_of_veterans"), 
                                col("foreign_born"))

source_row_counts['states'] = states_table.count()

print(f"states_table row count: {source_row_counts['states']}")
 
# Define Airports Table
airports_table = airports_df.select(col("id"), 
                                    col("local_code"),
                                    col("name"), 
                                    col("type"), 
                                    col("region"), 
                                    col("continent"), 
                                    col("elevation_ft"), 
                                    col("coordinates"), 
                                    col("municipality"))

source_row_counts['airports'] = airports_table.count()

print(f"airports_table row count: {source_row_counts['airports']}")
 

# Define Respondent Info Table
respondent_info_table = immigration_df.select(col("cicid"), 
                                              col("adm_num"), 
                                              col("ins_num"),
                                              col("respondent_age").alias("age"), 
                                              col("gender"), 
                                              col("occup"), 
                                              col("birth_year"), 
                                              col("arrival_date"), 
                                              col("departure_date"), 
                                              col("visa_post"),
                                              col("visa_type"),
                                              col("arrival_flag"),
                                              col("departure_flag"), 
                                              col("update_flag"), 
                                              col("match_flag"), 
                                              col("date_adm_to")) 


source_row_counts['respondent_info'] = respondent_info_table.count()

print(f"respondent_info_table row count: {source_row_counts['respondent_info']}")

# Define Immigration Table
fact_immigration_table = immigration_df.select(col("cicid"), 
                                              col("i94yr"), 
                                              col("i94mon"),
                                              col("i94cit"), 
                                              col("i94res"), 
                                              col("i94port"), 
                                              col("i94mode"), 
                                              col("i94addr"), 
                                              col("i94visa"),
                                              col("airline"),
                                              col("flight_no"),
                                              col("date_added")) 

source_row_counts['immigration'] = fact_immigration_table.count()

print(f"fact_immigration_table row count: {source_row_counts['immigration']}")


visa_category_table row count: 3
immigration_mode_table row count: 4
countries_table row count: 289
states_table row count: 52
airports_table row count: 20959
respondent_info_table row count: 3096313
fact_immigration_table row count: 3096313


In [33]:
# Printing schemas to see if the tables have correct column names and types before writing to S3
visa_category_table.printSchema()
immigration_mode_table.printSchema()
countries_table.printSchema()
states_table.printSchema()
airports_table.printSchema()
respondent_info_table.printSchema()
fact_immigration_table.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- description: string (nullable = true)

root
 |-- code: integer (nullable = true)
 |-- name: string (nullable = true)

root
 |-- name: string (nullable = true)
 |-- code: string (nullable = true)
 |-- male_population: long (nullable = true)
 |-- female_population: long (nullable = true)
 |-- total_population: long (nullable = true)
 |-- no_of_veterans: long (nullable = true)
 |-- foreign_born: long (nullable = true)

root
 |-- id: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)
 |-- region: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- municipality: string (nullable = true)

root
 |-- cicid: double (nullable = true)
 |-- adm_num: double (nullable = true)
 |-- ins_num

In [34]:
# Write tables to S3 as parquet files

visa_category_table.write.mode("overwrite") \
                    .parquet(os.path.join(S3_OUTPUT_BUCKET, 'visa_categories'))
print("visa_categories dimension table has been written to the S3 bucket.")

immigration_mode_table.write.mode("overwrite") \
                    .parquet(os.path.join(S3_OUTPUT_BUCKET, 'immigration_modes'))
print("immigration_modes dimension table has been written to the S3 bucket.")

countries_table.coalesce(15).write.mode("overwrite") \
                    .parquet(os.path.join(S3_OUTPUT_BUCKET, 'countries'))
print("countries dimension table has been written to the S3 bucket.")

states_table.coalesce(15).write.mode("overwrite") \
                    .parquet(os.path.join(S3_OUTPUT_BUCKET, 'states'))
print("states dimension table has been written to the S3 bucket.")

airports_table.coalesce(15).write.mode("overwrite") \
                    .parquet(os.path.join(S3_OUTPUT_BUCKET, 'airports'))
print("airports dimension table has been written to the S3 bucket.")

respondent_info_table.coalesce(18).write.mode("overwrite") \
                    .parquet(os.path.join(S3_OUTPUT_BUCKET, 'respondent_info'))
print("respondent_info dimension table has been written to the S3 bucket.")

fact_immigration_table.coalesce(18).write.mode("overwrite") \
                    .parquet(os.path.join(S3_OUTPUT_BUCKET, 'immigration'))
print("immigration fact table has been written to the S3 bucket.")

visa_categories dimension table has been written to the S3 bucket.
immigration_modes dimension table has been written to the S3 bucket.
countries dimension table has been written to the S3 bucket.
states dimension table has been written to the S3 bucket.
airports dimension table has been written to the S3 bucket.
respondent_info dimension table has been written to the S3 bucket.
immigration fact table has been written to the S3 bucket.


In [35]:
# Read the parquet files that were written
dim_visa_categories = spark.read.parquet(S3_OUTPUT_BUCKET + "visa_categories/")

dim_immigration_modes = spark.read.parquet(S3_OUTPUT_BUCKET + "immigration_modes/")

dim_countries = spark.read.parquet(S3_OUTPUT_BUCKET + "countries/")

dim_states = spark.read.parquet(S3_OUTPUT_BUCKET + "states/")

dim_airports = spark.read.parquet(S3_OUTPUT_BUCKET + "airports/")

dim_respondent_info = spark.read.parquet(S3_OUTPUT_BUCKET + "respondent_info/")

fact_immigration = spark.read.parquet(S3_OUTPUT_BUCKET + "immigration/")

print("Parquet files have been read.")

Parquet files have been read.


#### 4.2 Data Quality Checks

There are a few data quality checks that are going to be performed here to ensure the pipeline ran as expected. These include:

<li>Check if each table has rows</li>
<li>Check if each table has the same row count as the source</li>
<li>Check there aren't null or duplicated unique keys</li>
<li>Run sample queries</li>
 


In [36]:
# Data Quality Checks

# Check if they have rows and if they have the same count as the source 
dim_visa_categories_count = dim_visa_categories.count()
print(f"dim_visa_categories table has {dim_visa_categories_count} rows. \
        \n\tHas rows: {dim_visa_categories_count > 0} \
        \n\tRow count same as the source: {source_row_counts['visa_categories'] == dim_visa_categories_count}\n")

dim_immigration_modes_count = dim_immigration_modes.count()
print(f"dim_immigration_modes table has {dim_immigration_modes_count} rows. \
        \n\tHas rows: {dim_immigration_modes_count > 0} \
        \n\tRow count same as the source: {source_row_counts['immigration_modes'] == dim_immigration_modes_count}\n")

dim_countries_count = dim_countries.count() 
print(f"dim_countries table has {dim_countries_count} rows. \
        \n\tHas rows: {dim_countries_count > 0} \
        \n\tRow count same as the source: {source_row_counts['countries'] == dim_countries_count}\n")

dim_states_count = dim_states.count()
print(f"dim_states table has {dim_states_count} rows. \
        \n\tHas rows: {dim_states_count > 0} \
        \n\tRow count same as the source: {source_row_counts['states'] == dim_states_count}\n")

dim_airports_count = dim_airports.count()
print(f"dim_airports table has {dim_airports_count} rows. \
        \n\tHas rows: {dim_airports_count > 0} \
        \n\tRow count same as the source: {source_row_counts['airports'] == dim_airports_count}\n")

dim_respondent_info_count = dim_respondent_info.count()
print(f"dim_respondent_info table has {dim_respondent_info_count} rows. \
        \n\tHas rows: {dim_respondent_info_count > 0} \
        \n\tRow count same as the source: {source_row_counts['respondent_info'] == dim_respondent_info_count}\n")

fact_immigration_count = fact_immigration.count()
print(f"fact_immigration table has {fact_immigration_count} rows. \
        \n\tHas rows: {fact_immigration_count > 0} \
        \n\tRow count same as the source: {source_row_counts['immigration'] == fact_immigration_count}\n")


dim_visa_categories table has 3 rows.         
	Has rows: True         
	Row count same as the source: True

dim_immigration_modes table has 4 rows.         
	Has rows: True         
	Row count same as the source: True

dim_countries table has 289 rows.         
	Has rows: True         
	Row count same as the source: True

dim_states table has 52 rows.         
	Has rows: True         
	Row count same as the source: True

dim_airports table has 20959 rows.         
	Has rows: True         
	Row count same as the source: True

dim_respondent_info table has 3096313 rows.         
	Has rows: True         
	Row count same as the source: True

fact_immigration table has 3096313 rows.         
	Has rows: True         
	Row count same as the source: True



In [37]:
vsac_id_dups = dim_visa_categories.groupBy("id").count().where("count > 1")
print(f"visa category id duplicate count: {vsac_id_dups.count()}")

imode_id_dups = dim_immigration_modes.groupBy("id").count().where("count > 1")
print(f"immigration mode id duplicate count: {imode_id_dups.count()}")

cnty_code_dups = dim_countries.groupBy("code").count().where("count > 1")
print(f"country code duplicate count: {cnty_code_dups.count()}")

sts_code_dups = dim_states.groupBy("code").count().where("count > 1")
print(f"state code duplicate count: {sts_code_dups.count()}")

ap_id_dups = dim_airports.groupBy("id").count().where("count > 1")
print(f"airport id duplicate count: {ap_id_dups.count()}")

res_cicid_dups = dim_respondent_info.groupBy("cicid").count().where("count > 1")
print(f"respondent cicid duplicate count: {res_cicid_dups.count()}")

fact_cicid_dups = fact_immigration.groupBy("cicid").count().where("count > 1")
print(f"immigration cicid duplicate count: {fact_cicid_dups.count()}")

visa category id duplicate count: 0
immigration mode id duplicate count: 0
country code duplicate count: 0
state code duplicate count: 0
airport id duplicate count: 0
respondent cicid duplicate count: 0
immigration cicid duplicate count: 0


In [38]:
# Check null values on primary key columns

vsac_id_nulls = dim_visa_categories.select([count(when(col(c).isNull(), c)).alias(c) for c in ['id']])
print(f"visa category id null count: {vsac_id_nulls.first().id}")

imode_id_nulls = dim_immigration_modes.select([count(when(col(c).isNull(), c)).alias(c) for c in ['id']])
print(f"immigration mode id null count: {imode_id_nulls.first().id}")

cnty_code_nulls = dim_countries.select([count(when(col(c).isNull(), c)).alias(c) for c in ['code']])
print(f"country code null count: {cnty_code_nulls.first().code}")

sts_code_nulls = dim_states.select([count(when(col(c).isNull(), c)).alias(c) for c in ['code']])
print(f"state code null count: {sts_code_nulls.first().code}")

ap_id_nulls = dim_airports.select([count(when(col(c).isNull(), c)).alias(c) for c in ['id']])
print(f"airport id null count: {ap_id_nulls.first().id}")

res_cicid_nulls = dim_respondent_info.select([count(when(col(c).isNull(), c)).alias(c) for c in ['cicid']])
print(f"respondent cicid null count: {res_cicid_nulls.first().cicid}")

fact_cicid_nulls = fact_immigration.select([count(when(col(c).isNull(), c)).alias(c) for c in ['cicid']])
print(f"immigration cicid null count: {fact_cicid_nulls.first().cicid}")

visa category id null count: 0
immigration mode id null count: 0
country code null count: 0
state code null count: 0
airport id null count: 0
respondent cicid null count: 0
immigration cicid null count: 0


In [None]:
# Sample Queries
fact_immigration.createOrReplaceTempView('immigration')
dim_countries.createOrReplaceTempView('countries')
dim_respondent_info.createOrReplaceTempView('respondent_info')

# Top countries from which immigrants visit the US
top_countries = spark.sql("""
        SELECT co.name,
               count(*)
        FROM immigration im
        JOIN countries co
        ON co.code = im.i94res
        GROUP BY 1
        ORDER BY 2 DESC
""")

top_countries.show(5)

# Count of immigrants whose age is over 60
over_60 = spark.sql("""
        SELECT count(*)
        FROM immigration im
        JOIN respondent_info ri
        ON ri.cicid = im.cicid
        WHERE ri.age > 60
""")

over_60.show()

# Immigration and Respondent Info Join
respondent_immigration = spark.sql("""
        SELECT im.cicid,
               im.i94yr,
               im.i94mon,
               im.i94cit,
               im.i94res,
               im.i94visa,
               im.i94addr,
               im.airline,
               im.flight_no,
               ri.adm_num,
               ri.age,
               ri.gender,
               ri.birth_year,
               ri.arrival_date,
               ri.departure_date
        FROM immigration im
        JOIN respondent_info ri
        ON ri.cicid = im.cicid
""")
respondent_immigration.show(5)

+--------------------+--------+
|                name|count(1)|
+--------------------+--------+
|      UNITED KINGDOM|  368421|
|               JAPAN|  249167|
|          CHINA, PRC|  185609|
|              FRANCE|  185339|
|MEXICO Air Sea, a...|  179603|
+--------------------+--------+
only showing top 5 rows

+--------+
|count(1)|
+--------+
|  487163|
+--------+



#### 4.3 Data dictionary 

##### Immigration Table


<li><code>cicid</code> - Unique identifier for the immigrants</li>
<li><code>i94yr</code> - 4 digit year</li>
<li><code>i94mon</code> - Numeric month</li>
<li><code>i94cit</code> - 3 digit code of the country of citizenship</li>
<li><code>i94res</code> - 3 digit code of the country of residence</li>
<li><code>i94port</code> - 3 character code of the destination city. The port of entry to the US</li>
<li><code>i94mode</code> - Mode of immigration</li>
<li><code>i94addr</code> - The final address of the migrants, where they currently live in the US</li>
<li><code>i94visa</code> - Reason for immigration</li>
<li><code>airline</code> - Airline used to arrive in the US</li>
<li><code>flight_no</code> - Flight number of the airline used to arrive in the US</li>
<li><code>date_added</code> - Date added to I-94 files.</li>


##### Respondent_Info Table

<li><code>cicid</code> - Unique identifier for the immigrants</li>
<li><code>adm_num</code> - Admission number</li>
<li><code>ins_num</code> - INS number</li>
<li><code>age</code> -  Age of the respondent</li>
<li><code>gender</code> - Non-immigrant sex</li>
<li><code>occup</code> - Occupation that will be performed in the US</li>
<li><code>birth_year</code> - 4 digit year of birth</li>
<li><code>arrival_date</code> - The arrival date in the US</li>
<li><code>departure_date</code> - The departure date from the US</li>
<li><code>visa_post</code> - The department of state where the visa was issued</li>
<li><code>visa_type</code> - Class of admission</li>
<li><code>arrival_flag</code> - Status flag indicating if the migrant has been admitted or paroled into the US</li>
<li><code>departure_flag</code> - Status flag indicating if the migrant has departed, lost I-94 or is deceased</li>
<li><code>update_flag</code> - Status flag indicating if the migrant has been apprehended, overstayed, adjusted to perm residence</li>
<li><code>match_flag</code> - Match of arrival and departure records</li>
<li><code>date_adm_to</code> - Date to which allowed to stay until in the US</li>

##### Airports Table

<li><code>id</code> - Unique identifier for the airport</li>
<li><code>local_code</code> - The local country code for the airport</li>
<li><code>name</code> - The official name of the airport</li>
<li><code>type</code> -  Type of the airport</li>
<li><code>region</code> - The region that the airport belongs to</li>
<li><code>continent</code> - The continent that the airport belongs to</li>
<li><code>elevation_ft</code> - The airport elevation in feet</li>
<li><code>coordinates</code> - The coordinates for the airport</li>
<li><code>municipality</code> - The primary municipality that the airport serves</li>

##### States Table

<li><code>name</code> - The name of the state</li>
<li><code>code</code> - 2 character code of a US state</li>
<li><code>male_population</code> - Male population count of the state</li>
<li><code>female_population</code> - Female population count of the state</li>
<li><code>total_population</code> - Total population count of the state</li>
<li><code>no_of_veterans</code> - Number of veterans in the state</li>
<li><code>foreign_born</code> - Number of foreign born people in the state</li>

##### Countries Table

<li><code>code</code> - 3 digit country code</li>
<li><code>name</code> - The name of the country</li>

##### ImmigrationModes Table

<li><code>id</code> - Immigration mode id</li>
<li><code>description</code> - A brief name or description of the immigration mode</li>

##### VisaCategories Table

<li><code>id</code> - Visa category id</li>
<li><code>name</code> - The name of the category</li>

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.
 
#### Tools and Technologies Used

<li><b>Amazon S3 </b> - An object storage service. Since we are dealing with sensitive data of the immigrants' personal information, we need a storage with reliable security and Amazon S3 provides exactly that. We cannot predict the size of the data since it might change rapidly according to the time of the year or the other travel situations. Not only it is inexpensive, it is highly scalable which is beneficial for the type of data we are dealing with. As stated in the project summary, the data will be shared across the different organizations and storing here helps cause of its 99.9% availability and its user friendly interface.
</li>
<li><b>Spark (Pyspark)</b> - An open-source unified analytics engine for large-scale data processing. Here we are using an interface of it in Python. Since we are dealing with millions of data, it is crucial to be able to process them efficiently. Spark is an advanced analytical tool which can process large volumes data much faster than other tools like Hadoop. It has many built-in functions for data analyzing so, it doesn't matter if one doesn't know SQL or one isn't that familiar with Python. </li>
<li><b>Pandas </b> - An easy to use open source data analysis and manipulation tool built with python. It has the ability to turn python dictionaries and arrays into dataframes and is mainly used here for that.

#### Schedule Proposal
 
<span>Since I-94 database is updated monthly, I propose that there should be a monthly schedule where the data gets updated at each month’s end. Other major datasets like Airports and Demographics data gets updated less often than the immigration data so this should be enough.</span>

#### Scenarios to consider

##### The data was increased by 100x.
    
* In this case, for processing, I would make good use of spark's parallism and increase worker nodes to work with. For data storage, I would write the data to Amazon Redshift since it is more flexible than S3.

##### The data populates a dashboard that must be updated on a daily basis by 7am every day.
    
* I would use Apache's Airflow and place a scheduler that would trigger the update task at 7am every day.

##### The database needed to be accessed by 100+ people.
* According to Amazon, S3 can handle 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix in a bucket which should be enough but as the users increase, it would be more structured approach would be to use Amazon Redshift as a storage.