# Project Title
### Data Engineering Capstone Project

#### Project Summary

The idea of this project is to build ETL pipeline that aquires the I94 immigration data and city temperature to answer questions regarding average temperature and tourist arrivals to certian cities.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [116]:
#imports
import configparser
import os
import pandas as pd
import re
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, year, month, dayofmonth, hour, weekofyear, date_format, upper

In [117]:
#config
config = configparser.ConfigParser()
config.read('project.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS','AWS_SECRET_ACCESS_KEY')

input_data= config.get('PATH', 'INPUT_DATA')
output_data= config.get('PATH', 'OUTPUT_DATA')

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, I will create ETL pipeline that aquires and aggregates the I94 immigration data and average temperature date by destination city for April, 2016. I will use provided data sources to build stage tables. Next step will be to create fact table and necessary dimension tables. To create this solution, I will use Pandas, PySpark and AWS S3 buckets.

#### Describe and Gather Data 

* **I94 immigration data** - comes from the US National Tourism and Trade Office ([NTTO](https://www.trade.gov/national-travel-and-tourism-office)), provided by Udacity. The data is provided in [sas7bdat  binary file format](https://cran.r-project.org/web/packages/sas7bdat/vignettes/sas7bdat.pdf). I will use the following attributes:
    - i94cit - code of visitor origin country
    - i94port - code of destination city in the US
    - i94mode - code of transportation mode
    - arrdate - arrival date to the US
    - depdate - departure date from the US
    - i94visa - code of visa type 
    - visatype - class of admission legally admitting the non-immigrant to temporarily stay in US
    
    
* **City temperature data** - dataset comes from [Kaggle](https://www.kaggle.com/sudalairajkumar/daily-temperature-of-major-cities). The dataset is in [csv](https://docs.fileformat.com/spreadsheet/csv/) format. I will use the following attributes:
    - date
    - city
    - state
    - average temperature

In [118]:
#create Spark session
spark = SparkSession.builder \
                    .config("spark.jars.repositories", "https://repos.spark-packages.org/") \
                    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
                    .enableHiveSupport() \
                    .getOrCreate()

In [119]:
#get I94 immigration data
df_spark_i94 = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_spark_i94.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [120]:
df_spark_temperature = spark.read.options(header='True', inferSchema='True').csv(os.path.join(input_data,'city_temperature.csv'))
df_spark_temperature.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- State: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- AvgTemperature: double (nullable = true)



### Step 2: Explore and Assess the Data
#### Explore the Data 

Data quality issues:

* **I94 immigration data**
    - invalid ports, some port codes are missing or does not belong to ports in the US
    - missing values for departure date - cca 4.6% of records have no values
    - missing values for arrival mode - cca 0.0077% of records have no values
    - missing values for match of arrival and departure records - cca 4.47 of records have no values
    
    
* **City temperature data**
    - worldwide data - only records for US ports for April 2016 are necessary
    - convert the columns City and State to uppercase to speed up joining with city mapping list where the names are in upper case
        
#### Cleaning Steps

* **I94 immigration data**
    - remove records with invalid ports
    - remove records without departure date
    - remove records without arrival mode
    - remove records without match of arrival and departure records
    
    
* **City temperature data**
    - filter the records to get data for US ports for April 2016
    - change column case for columns City and State

In [121]:
#create dictionary of valid ports
port_re_filter = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94_port_dict = {}

with open(os.path.join(input_data, 'map/I94-us_ports.txt')) as file:
    for line in file:
        groups = port_re_filter.search(line)
        if 'No PORT Code' not in groups[2] and 'Collapsed' not in groups[2]:
            i94_port_dict[groups[1]] = groups[2]

In [122]:
#clean I94 immigration data
df_spark_i94_clean = df_spark_i94.dropDuplicates() \
                                .filter(df_spark_i94.i94port.isin(list(i94_port_dict.keys()))) \
                                .na.drop(subset=["depdate"]) \
                                .na.drop(subset=["i94mode"]) \
                                .na.drop(subset=["matflag"])


In [123]:
df_spark_i94_clean.write.mode('overwrite').partitionBy('arrdate').parquet(os.path.join(output_data, "stage_i94_immigration"))

In [124]:
df_spark_temperature = df_spark_temperature.toDF(*[c.lower() for c in df_spark_temperature.columns])
df_spark_temperature_clean = df_spark_temperature.dropDuplicates() \
                                                .filter(df_spark_temperature.country == 'US') \
                                                .filter(df_spark_temperature.year == 2016) \
                                                .filter(df_spark_temperature.month == 4) \
                                                .withColumn('state', upper(df_spark_temperature.state)) \
                                                .withColumn('city', upper(df_spark_temperature.city)) \
 

In [125]:
df_spark_temperature_clean.write.mode('overwrite').parquet(os.path.join(output_data, "stage_city_temperatures"))

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

As a concept, I will use the following star schema:

<img src="project_model.png" alt="Conceptual model" title="Conceptual data model" />

* **Fact table**
    - fact_i94_visits - contains data from I94 immigration data joined with daily average temperature by the US port city and arrival data
    
    
* **Dimension tables**
    - dim_countries - contains list of countries, it gives an information about which country does visitor comes from
    - dim_travel_modes - contains list of travel modes, it gives an information about transformation mode used by visitor to come to the US
    - dim_us_visas - contains list of visas, it gives an information about the reason of visiting US
    - dim_date - contains date information
    - dim_us_ports - contains list of US ports, it gives an information about which city or state do visitors visit

#### 3.2 Mapping Out Data Pipelines

The ETL pipeline steps are described bellow:

<ol>
    <li>Load raw data to the Spark dataframes: <em>df_spark_i94</em> and <em>df_spark_temperature</em></li>
    <li>Clean each Spark dataframe and write each cleaned dataframe into parquet as staging tables: <em>stage_i94_immigration</em> and <em>stage_city_temperatures</em></li>
    <li>Create and write dimension tables into parquet: <em>dim_countries</em>, <em>dim_travel_modes</em>, <em>dim_us_visas</em> and <em>dim_us_ports</em></li>
    <li>Create and write fact table <em>fact_i94_visits</em> by joining staging tables <em>stage_i94_immigration</em> and <em>stage_city_temperatures</em></li>
    <li>Run quality checks</li>
</ol>   

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

Prepare dimension tables

In [126]:
#create and write dimension table for countries
df_countries = pd.read_csv(os.path.join(input_data, 'map/I94-country-codes.txt'), sep="=", header=None, names = ['code', 'name'])
df_countries['code'].apply(pd.to_numeric)
df_countries['name'] = df_countries['name'].str.replace("'","").replace(to_replace=['No Country.*', 'INVALID.*', 'Collapsed.*'], value = 'Other countries', regex = True).str.strip()

df_spark_countries = spark.createDataFrame(df_countries)
df_spark_countries.write.mode('overwrite').parquet(os.path.join(output_data, "dim_countries"))

In [127]:
#create and write dimension table for travel modes
df_travel_modes = pd.read_csv(os.path.join(input_data, 'map/I94-travel_modes.txt'), sep="=", header=None, names = ['code', 'name'])
df_travel_modes['code'].apply(pd.to_numeric)
df_travel_modes['name'] = df_travel_modes['name'].str.replace("'","").str.strip()


df_spark_travel_modes = spark.createDataFrame(df_travel_modes)
df_spark_travel_modes.write.mode('overwrite').parquet(os.path.join(output_data, "dim_travel_modes"))

In [128]:
#create and write dimension table for US visas
df_us_visas = pd.read_csv(os.path.join(input_data, 'map/I94-us_visas.txt'), sep="=", header=None, names = ['code', 'name'])
df_us_visas['code'].apply(pd.to_numeric)
df_us_visas['name'].str.strip()

df_spark_us_visas = spark.createDataFrame(df_us_visas)
df_spark_us_visas.write.mode('overwrite').parquet(os.path.join(output_data, "dim_us_visas"))

In [129]:
#create and write dimension table for US ports
df_us_ports = pd.read_csv(os.path.join(input_data, 'map/I94-us_ports.txt'), sep="=", header=None, names = ['code', 'city'])
df_us_ports['code'] = df_us_ports['code'].str.replace("'","").str.replace("\t","").str.strip()
df_us_ports['city'] = df_us_ports['city'].str.replace("'","").str.replace("\t","").replace(to_replace=['No PORT Code.*', 'Collapsed.*'], value = 'Other US ports', regex = True).str.strip()

us_ports_split_col = df_us_ports['city'].str.split(",", n=1, expand=True)

df_us_ports['city'] = us_ports_split_col[0].str.strip()

df_us_ports['state'] = us_ports_split_col[1]
df_us_ports['state'] = df_us_ports['state'].str.strip()

df_us_states = pd.read_csv(os.path.join(input_data, 'map/I94-us_states.txt'), sep="=", header=None, names = ['state_code', 'state_name'])
df_us_states['state_code'] = df_us_states['state_code'].str.replace("'","").str.replace("\t","").str.strip()
df_us_states['state_name'] = df_us_states['state_name'].str.replace("'","").str.replace("\t","").str.strip()

df_us_ports = df_us_ports.join(df_us_states.set_index('state_code'), on='state', how='inner', lsuffix='p', rsuffix='s')

df_spark_us_ports = spark.createDataFrame(df_us_ports)
df_spark_us_ports.write.mode('overwrite').parquet(os.path.join(output_data, "dim_us_ports"))


Build fact table

In [130]:
#read immigration data from parquet
df_fact_I94_visits = spark.read.parquet(os.path.join(output_data, 'stage_i94_immigration'))
df_fact_I94_visits.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+-------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|arrdate|
+---------+------+------+------+------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+-------+
|5659207.0|2016.0|   4.0| 103.0| 103.0|    NEW|    1.0|     NJ|20582.0|  55.0|    2.0|  1.0|20160430|    null| null|      G|      O|   null|      M| 1961.0|07282016|     F|  null|     OS|5.9529151033E10|00089|      WT|20574.0|
|5659318.0|2016.0|   4.0| 103.0| 103.0|    HOU|    1.0|     UT|20595.0|  51.0|    2.0|  1.0|

In [131]:
#read temperature data from parquet
df_dim_temperatures = spark.read.parquet(os.path.join(output_data, "stage_city_temperatures"))
df_dim_temperatures.show(5)

+-------------+-------+-----------+--------------+-----+---+----+--------------+
|       region|country|      state|          city|month|day|year|avgtemperature|
+-------------+-------+-----------+--------------+-----+---+----+--------------+
|North America|     US|   ARKANSAS|   LITTLE ROCK|    4|  3|2016|          59.8|
|North America|     US| CALIFORNIA|     SAN DIEGO|    4| 26|2016|          61.1|
|North America|     US|   COLORADO|GRAND JUNCTION|    4| 13|2016|          56.7|
|North America|     US|CONNECTICUT|    BRIDGEPORT|    4| 10|2016|          41.0|
|North America|     US|    FLORIDA|  JACKSONVILLE|    4| 29|2016|          75.6|
+-------------+-------+-----------+--------------+-----+---+----+--------------+
only showing top 5 rows



In [132]:
#read US ports data from parquet
df_dim_us_ports = spark.read.parquet(os.path.join(output_data, "dim_us_ports"))
df_dim_us_ports.show(5)

+----+--------------------+-----+----------+
|code|                city|state|state_name|
+----+--------------------+-----+----------+
| ALC|               ALCAN|   AK|    ALASKA|
| ANC|           ANCHORAGE|   AK|    ALASKA|
| BAR|BAKER AAF - BAKER...|   AK|    ALASKA|
| DAC|       DALTONS CACHE|   AK|    ALASKA|
| PIZ|DEW STATION PT LA...|   AK|    ALASKA|
+----+--------------------+-----+----------+
only showing top 5 rows



In [133]:
#create udf to convert SAS date type to ISO date type
get_date_from_sas = udf(lambda x: (datetime(1960, 1, 1).date() + timedelta(x)).isoformat() if x else None)

#create udf to get date from date parts
get_date = udf(lambda x, y, z: (datetime(x, y, z).date()).isoformat())

#create udf to get visitor's stay
get_stay = udf(lambda x, y: int(x - y))

In [134]:
#prepare df_fact_I94_visits table
df_fact_I94_visits = df_fact_I94_visits.withColumn('stay', get_stay(df_fact_I94_visits.depdate, df_fact_I94_visits.arrdate)) \
                                        .withColumn('arrdate', get_date_from_sas(df_fact_I94_visits.arrdate)) \
                                        .withColumn('depdate', get_date_from_sas(df_fact_I94_visits.depdate)) \
                                        
df_fact_I94_visits.show(5)

+---------+------+------+------+------+-------+-------+-------+----------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+----------+----+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|i94mode|i94addr|   depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|   arrdate|stay|
+---------+------+------+------+------+-------+-------+-------+----------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+----------+----+
|5659207.0|2016.0|   4.0| 103.0| 103.0|    NEW|    1.0|     NJ|2016-05-08|  55.0|    2.0|  1.0|20160430|    null| null|      G|      O|   null|      M| 1961.0|07282016|     F|  null|     OS|5.9529151033E10|00089|      WT|2016-04-30|   8|
|5659318.0|2016.0|   4.0| 103.0| 103.0|    HOU| 

In [135]:
#prepare df_dim_temperatures temperature table
df_dim_temperatures = df_dim_temperatures.withColumn('date', get_date(df_dim_temperatures.year, df_dim_temperatures.month, df_dim_temperatures.day))
df_dim_temperatures.show(5)

+-------------+-------+-----------+--------------+-----+---+----+--------------+----------+
|       region|country|      state|          city|month|day|year|avgtemperature|      date|
+-------------+-------+-----------+--------------+-----+---+----+--------------+----------+
|North America|     US|   ARKANSAS|   LITTLE ROCK|    4|  3|2016|          59.8|2016-04-03|
|North America|     US| CALIFORNIA|     SAN DIEGO|    4| 26|2016|          61.1|2016-04-26|
|North America|     US|   COLORADO|GRAND JUNCTION|    4| 13|2016|          56.7|2016-04-13|
|North America|     US|CONNECTICUT|    BRIDGEPORT|    4| 10|2016|          41.0|2016-04-10|
|North America|     US|    FLORIDA|  JACKSONVILLE|    4| 29|2016|          75.6|2016-04-29|
+-------------+-------+-----------+--------------+-----+---+----+--------------+----------+
only showing top 5 rows



In [136]:
#join df_fact_I94_visits dataframe with df_dim_us_ports dataframe
df_fact_I94_visits = df_fact_I94_visits.join(df_dim_us_ports, df_fact_I94_visits.i94port == df_dim_us_ports.code, how='inner').drop(df_dim_us_ports.code)

In [137]:
#join df_fact_I94_visits dataframe with df_dim_temperatures dataframe
df_fact_I94_visits = df_fact_I94_visits.join(df_dim_temperatures, [df_fact_I94_visits.arrdate == df_dim_temperatures.date, df_fact_I94_visits.city == df_dim_temperatures.city, df_fact_I94_visits.state_name == df_dim_temperatures.state] , how='inner').drop(df_dim_temperatures.city)

In [138]:
#select necessary columns and write fact table partitioned by arrival date
df_fact_I94_visits = df_fact_I94_visits.select('cicid', 'arrdate', 'depdate', 'stay', 'i94port', 'i94cit', 'i94mode', 'i94visa', 'visatype', 'avgtemperature')

df_fact_I94_visits.write.mode('overwrite').partitionBy('arrdate').parquet(os.path.join(output_data, "fact_i94_visits"))

In [158]:
#create and write dimension table for date
df_date = df_fact_I94_visits.select('arrdate').dropDuplicates()

df_date = df_date.withColumn('day', dayofmonth(df_date.arrdate)) \
                 .withColumn('weekday', date_format(df_date.arrdate, 'E')) \
                 .withColumn('week', weekofyear(df_date.arrdate)) \
                 .withColumn('month', month(df_date.arrdate)) \
                 .withColumn('year', year(df_date.arrdate))

df_date.write.mode('overwrite').parquet(os.path.join(output_data, "dim_date"))

#### 4.2 Data Quality Checks

To ensure the pipeline ran as expected, I need to perform a number of quality checks. I have prepared the following:
 * Integrity constraints on the relational database to check if:
     * certain column exists on certain table
     * certain column with certain data type exists on certain table
     * logical primary key is really primary key (unique key check) 
 * Unit tests for the scripts to ensure they are doing the right thing
     * check if there is any arrivals outside expected period (non-April 2016)
 * Source/Count checks to ensure completeness
     * check if there are rows in certain tables
 
Run Quality Checks

In [139]:
# Perform quality checks here
def check_column_exists(path, table, column):
    df = spark.read.parquet(os.path.join(path, table)).limit(1)
    if column in df.columns:
        print('Column "{}" DOES exists in table "{}"'.format(column, table))
    else:
        raise ValueError('Column "{}" DOES NOT exists in table "{}"'.format(column, table))

In [140]:
check_column_exists(output_data, "fact_i94_visits", "cicid")

Column "cicid" DOES exists in table "fact_i94_visits"


In [141]:
def check_column_type(path, table, column, expected_type):
    df = spark.read.parquet(os.path.join(path, table)).limit(1)
    
    column_exists = False
    for name, dtype in df.dtypes:
        if name == column:
            column_exists = True
            if dtype == expected_type:
                print('Column "{}.{}" HAS expected data type'.format(table, column)) 
                return
            else:
                raise ValueError('Column "{}.{}" DOES NOT HAVE expected data type'.format(column, table))

    if not column_exists: 
        raise ValueError('Column "{}" DOES NOT exists in table "{}"'.format(column, table))

In [142]:
check_column_type(output_data, "fact_i94_visits", "cicid", "double")

Column "fact_i94_visits.cicid" HAS expected data type


In [143]:
def run_unit_test_on_I94_arrival_date(path):
    df = spark.read.parquet(os.path.join(path, "fact_i94_visits"))
    
    rec_count = df.filter((year(df.arrdate) != 2016) | (month(df.arrdate) != 4)).count()
                          
    if rec_count == 0:
        print("Unit test has passed. Fact table contains only records for April 2016.")
    else:
        raise ValueError("Unit test has failed. Fact table contains records outside expected period.")

In [144]:
#run unit test to check arrival dates in fact table
run_unit_test_on_I94_arrival_date(output_data)

Unit test has passed. Fact table contains only records for April 2016.


In [145]:
def check_row_count(path, table):
    df = spark.read.parquet(os.path.join(path, table))
    
    rec_count = df.count()
    
    if rec_count == 0:
        raise ValueError('Quality check for table "{}" has failed. The table has no records.'.format(table))
    else:
        print("Succesfull quality check on table {}. The table has {} records.".format(table, rec_count))

In [146]:
#run quality check by checking the row count
check_row_count(output_data, "dim_us_ports")

Succesfull quality check on table dim_us_ports. The table has 498 records.


In [147]:
def check_unique_key(path, table, column_list):
    df = spark.read.parquet(os.path.join(path, table))
    
    rec_count = df.count()
    
    if df.count() > df.dropDuplicates(column_list).count():
        raise ValueError('Error checking unique key on table. Key has duplicates for {}'.format(table, column_list))
    else:
        print("Succesfull unique key check on table {}.".format(table))

In [148]:
#run quality check by checking the unique key
check_unique_key(output_data, "fact_i94_visits", ['cicid'])

Succesfull unique key check on table fact_i94_visits.


In [None]:
#end spark session
spark.stop()

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

Fact table

***fact_i94_visits*** - extracted from I94 immigration data for April, 2016; combined with city temperature data provided by Kaggle.
 * cicid - double - primary key
 * arrdate - date - date of arrival
 * depdate - date - date of departure
 * stay - int - number of days that visitor has stayed in the US
 * i94port - varchar - three-character code of destination port/city
 * i94cit - int - three-digit code of visitor's origin country
 * i94mode - int - one-digit code of transportation mode
 * i94visa - int - one-digit code of visa type
 * visatype - varchar - class of admission legally admitting the non-immigrant to temporarily stay in US
 
Dimension tables

***dim_countries*** - extracted from mapping text, which is provided by Udacity
 * code - int - primary key, three-digit code of visitor's origin country
 * name - varchar - name of the origin country
 
***dim_travel_modes*** - extracted from mapping text, which is provided by Udacity
 * code - int - primary key, one-digit code of transportation mode
 * name - varchar - name of the transportation mode
 
***dim_us_visas*** - extracted from mapping text, which is provided by Udacity
 * code - int - primary key, one-digit code of visa type
 * name - varchar - name of the visa type
 
***dim_us_date*** - extracted from fact table 
 * arrdate - date - primary key, date of arrival
 * day - int - day of arrival
 * weekday - varchar - weekday of arrival
 * week - int - week of arrival
 * month - int - month of arrival
 * year - int - year of arrival
 
***dim_us_ports*** - extracted from mapping text, which is provided by Udacity
 * code - varchar - primary key, three-character code of destination port/city
 * city - varchar - city of the arrival
 * state - varchar - state code of the arrival
 * state_name - varchar - name of the state of the arrival

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
    - In this project, to build the data model, ETL pipeline and data lake, I have used pandas library, Apache Spark and Amazon S3. I have used pandas for dealing with small datasets like reading text files with mapping data. For processing of large dataset like I94 immigration data (over three million records per month), I have used Apache Spark, because it handles large amounts of data coming from multiple different datasources and data formats with ease. And finally, to implement a data lake, I have used Amazon S3 to store fact and dimension tables in parquet format.
* Propose how often the data should be updated and why.
    - It depends how often the data is being provided by the transactional system or how often do the end users need the fresh data. My proposal will be to update data on monthly or weekly basis.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
    - In that case, a Spark cluster setup on AWS EMR with be a solution.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    - For this purpose, I would introduce a scheduler tool to manage certain tasks. Apache Airflow, with his DAGs, would be a very good solution. 
 * The database needed to be accessed by 100+ people.
    - For this purpose, I would propose to load data to Amazon Redshift since it is column-oriented storage, best suited for OLAP workloads and summing over log history, with differnt possible table design strategies. Another solution can be Azure SQl, Oracle Exadata or Terradata Aster.
     