# Data Engineering Capstone Project
### Data Lake with Spark

#### Project Summary
Data Lake project with Spark for Imigration analystic team.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import configparser
import os
from pyspark.sql.functions import when
from pyspark.sql.functions import col, split, udf, date_add, monotonically_increasing_id
from pyspark.sql.types import DoubleType, IntegerType
import datetime
from pyspark.sql import SparkSession

### Step 1: Scope the Project and Gather Data

#### Scope 
Immigration analytics team for the U.S. goverment wants to investigate the immigrant's age, total number by state of stay, country of immigrant citizen, and etc.. Thay also want to track immigtants who arrival and departure records don't match, and want to know state where they stay after they arrived and the demographic of the state.  To allow the analytics team to find insight in them, Data enginners will load data, process the data into analytics tables using Spark, and load them back into S3. We want to analyze each month, so we use only data of 04/2016 in this project.
#### Describe and Gather Data  
1. I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. This is where the data comes from. 
2. U.S. City Demographic Data: This data comes from OpenSoft.
3. Airport Code Table: This is a simple table of airport codes and corresponding cities. It comes from DataHub.
4. Country code data: This is simple table of I-94 immigrant's nationality. This is from a data dictionary of I94 Immigration Data.
5. State code data: This is simple table of US state code. This is from a data dictionary of I94 Immigration Data.
6. City code data: This is simple table of US city code. This is from a data dictionary of I94 Immigration Data.


In [2]:
# Read in the data here
## Get the i94 immigration data
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_i94 = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")


In [3]:
## Get the city demographic data
df_demog = pd.read_csv("us-cities-demographics.csv", sep=";")

## Get Airport code data
df_airport = pd.read_csv("airport-codes_csv.csv")

## Get Country Code data
df_country = pd.read_csv("country-code.csv", sep=";")

## Get State Code data
df_state = pd.read_csv("state-code.csv", sep=";")

## Get City Code data
df_city = pd.read_csv("city-code.csv", sep=";")


In [4]:
df_i94.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [5]:
df_demog.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [6]:
df_airport.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [7]:
df_country.head()

Unnamed: 0,code,country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [38]:
df_state.head()

Unnamed: 0,code,name
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


In [39]:
df_city.head()

Unnamed: 0,code,city,state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKERAAF-BAKERISLAND,AK
3,DAC,DALTONSCACHE,AK
4,PIZ,DEWSTATIONPTLAYDEW,AK


In [None]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ["AWS_ACCESS_KEY_ID"]=config['AWS']["AWS_ACCESS_KEY_ID"]
os.environ["AWS_SECRET_ACCESS_KEY"]=config['AWS']["AWS_SECRET_ACCESS_KEY"]

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11").\
config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.8.5")\
.enableHiveSupport().getOrCreate()
df_i94_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11").\
config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.8.5")\
.enableHiveSupport().getOrCreate()

In [3]:
df_demog_spark = spark.read.csv("us-cities-demographics.csv", sep=";", inferSchema=True, header=True)
df_airport_spark = spark.read.csv("airport-codes_csv.csv", inferSchema=True, header=True)
df_country_spark = spark.read.csv("country-code.csv", sep=";", inferSchema=True, header=True)
df_state_spark = spark.read.csv("state-code.csv", sep=";", inferSchema=True, header=True)
df_city_spark = spark.read.csv("city-code.csv", sep=";", inferSchema=True, header=True)

In [4]:
#write to parquet
#df_i94_spark.write.parquet("sas_data")
df_i94_spark=spark.read.parquet("sas_data")


### Step 2: Explore and Assess the Data
#### Explore the Data & Cleaning Steps
Identify data quality issues, like missing values, duplicate data, etc.
Regarding i-94 Immigrants data, "i94port" should be only city in US. So if the state of port is not in US, the date should not be include in this project. Because i-94 immigrants arrive in US. 

<ul><strong>i94 Immigration data</strong>
    <li><strong>cicid:</strong> Records with duplicate data and missing value should be deleted.</li>
    <li><strong>port:</strong> Port should be only in us.</li>
    <li><strong>i94addr:</strong> All missing values should be "99". </li>
    <li><strong>matflag:</strong> All missing values should be "X". </li>
    <li><strong>gender:</strong> All missing values should be "X". </li>
</ul>    
<ul><strong>city demographic data</strong>
    <li><strong>city&state:</strong> Records with missing value should be deleted.</li>
    <li><strong>city&state&race:</strong> Records with duplicate data should be deleted.</li>  
</ul>    
<ul><strong>airport code data</strong>
    <li><strong>ident:</strong> Records with duplicate data and missing value should be deleted.</li>
</ul>  
<ul><strong>country code data</strong>
    <li><strong>code:</strong> Records with duplicate data and missing value should be deleted.</li>
</ul>  
<ul><strong>state code data AND city code data</strong>
    <li>There are non-US cities. The port or entry means first port where immigrants arrived in US. So do inner-join state code data and city code data, and then keep only US cities. </li> 
    <li><strong>code:</strong> Records with duplicate data and missing value should be deleted.</li>
</ul> 


In [5]:
# Performing cleaning tasks here
df_demog_spark = df_demog_spark.filter(df_demog_spark.City.isNotNull())\
                            .filter(df_demog_spark.State.isNotNull())\
                            .dropDuplicates(subset=['City', 'State', 'Race'])

df_airport_spark = df_airport_spark.filter(df_airport_spark.ident.isNotNull())\
                                .dropDuplicates(subset=['ident'])

df_country_spark = df_country_spark.filter(df_country_spark.countryid.isNotNull())\
                                .dropDuplicates(subset=['countryid']) 

df_state_spark = df_state_spark.filter(df_state_spark.stateid.isNotNull())\
                                .dropDuplicates(subset=['stateid']) 

df_citystate_spark = df_city_spark.join(df_state_spark, df_city_spark.state == df_state_spark.stateid)\
                             .select("cityid", "city", "stateid", "name")\
                             .filter(df_city_spark.cityid.isNotNull())\
                                .dropDuplicates(subset=['cityid']) 

fill_X = udf(lambda x: "X" if x is None else x) 
df_i94_spark = df_i94_spark.join(df_citystate_spark, df_i94_spark.i94port == df_citystate_spark.cityid)\
                        .filter(df_i94_spark.cicid.isNotNull())\
                        .dropDuplicates(subset=['cicid'])\
                        .withColumn("i94addr", when(df_i94_spark.i94addr=="", "99").otherwise(df_i94_spark.i94addr))\
                        .withColumn("matflag", fill_X(df_i94_spark.matflag))\
                        .withColumn("gender", fill_X(df_i94_spark.gender))
                        

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The below 5 models are defined. <font color="red"> Please see the file "Schema_model.xlsx" in the Workspace.</font>
<ol>
<b>*Fact model</b>
<li>immigrants table - </li>
<b>*Dimention models</b>
<li>demographics table</li>
<li>airports table</li>
<li>countries table</li>
<li>ports table</li>    
</ol>

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model
* Step 1: Insert data from df_citystate_spark to ports table
* Step 2: Insert data from df_countries_spark to countries table
* Step 3: Select only airports in US. Separate "coordinates" to "latitude" and "longitude" in df_airport_spark, and insert data to airports table with collect data type.
* Step 4: Create demographics table with population each races by city&state. Demographics table is created with collect data type.
* Step 5: Change SAS date numeric to timestamp, change transportation mode code to text, and visa type code to text, and then insert data from df_i94_spark to immigrants table with collect data type.


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [6]:
# Create cities table
ports_table = df_citystate_spark.selectExpr("cityid as portid", "city", "stateid", "name as state")

In [7]:
# Create countries tabke
countries_table = df_country_spark.selectExpr("countryid", "country")

In [8]:
# Create airports table
airports_table = df_airport_spark.filter(df_airport_spark.iso_country=="US")\
                                 .withColumn("stateid", split(df_airport_spark.iso_region, "-")[1])\
                                 .withColumn("latitude", split(df_airport_spark.coordinates, ",")[0]\
                                 .cast(DoubleType()))\
                                 .withColumn("longitude", split(df_airport_spark.coordinates, ",")[1]\
                                 .cast(DoubleType()))\
                                 .selectExpr("ident as airportid", "type", "name as airportname",\
                                   "elevation_ft", "iso_country", "stateid", "municipality", "latitude", "longitude")
                                

In [9]:
# Create demographics table

hispanic = df_demog_spark.select("City", "State", "Median Age", "Male Population", \
        "Female Population", "Total Population", "Foreign-born", "Average Household Size", \
        "State Code", "Count").where(df_demog_spark.Race=="Hispanic or Latino")\
         .withColumnRenamed("City", "city")\
         .withColumnRenamed("State", "state")\
         .withColumnRenamed("Median Age", "median_age")\
         .withColumnRenamed("Male Population", "male_population")\
         .withColumnRenamed("Female Population", "female_population")\
         .withColumnRenamed("Total Population", "total_population")\
         .withColumnRenamed("Foreign-born", "foreign_born")\
         .withColumnRenamed("Average Household Size", "ave_household")\
         .withColumnRenamed("State Code", "stateid")\
         .withColumnRenamed("Count", "hispanic_latino")

white = df_demog_spark.selectExpr("City as city", "State as state", "Count as white").where(df_demog_spark.Race=="White")
asian = df_demog_spark.selectExpr("City as city", "State as state", "Count as asian").where(df_demog_spark.Race=="Asian")
black = df_demog_spark.selectExpr("City as city", "State as state", "Count as black_africanAmerican").where(df_demog_spark.Race=="Black or African-American")
native = df_demog_spark.selectExpr("City as city", "State as state", "Count as americanIndian_alaskaNative").where(df_demog_spark.Race=="American Indian and Alaska Native")

demographics_table = hispanic.join(white, (hispanic.city==white.city) & (hispanic.state==white.state))\
                             .join(asian, (hispanic.city==asian.city) & (hispanic.state==asian.state))\
                             .join(black, (hispanic.city==black.city) & (hispanic.state==black.state))\
                             .join(native, (hispanic.city==native.city) & (hispanic.state==native.state))\
                             .withColumn("demographicsid", monotonically_increasing_id())\
                             .select("demographicsid", hispanic.city, hispanic.state, "median_age", "male_population", "female_population", \
                                    "total_population", "foreign_born", "ave_household", "stateid", \
                                     "hispanic_latino", "white", "asian", "black_africanAmerican", "americanIndian_alaskaNative")\
                             .withColumn("median_age", hispanic.median_age.cast(IntegerType()))\
                             .withColumn("male_population", hispanic.male_population.cast(IntegerType()))\
                             .withColumn("female_population", hispanic.female_population.cast(IntegerType()))\
                             .withColumn("total_population", hispanic.total_population.cast(IntegerType()))\
                             .withColumn("foreign_born", hispanic.foreign_born.cast(IntegerType()))\
                             .withColumn("ave_household", hispanic.ave_household.cast(DoubleType()))\
                             .withColumn("hispanic_latino", hispanic.hispanic_latino.cast(IntegerType()))\
                             .withColumn("white", white.white.cast(IntegerType()))\
                             .withColumn("asian", asian.asian.cast(IntegerType()))\
                             .withColumn("black_africanAmerican", black.black_africanAmerican.cast(IntegerType()))\
                             .withColumn("americanIndian_alaskaNative", native.americanIndian_alaskaNative.cast(IntegerType()))\
                             

                   

In [10]:
# Create immigrants table
from pyspark.sql.functions import col, split, udf, date_add, current_date
get_date = udf(lambda x: datetime.datetime(1,1,1).strftime('%Y-%m-%d %H:%M:%S') if x is None else\
               (datetime.datetime(1960, 1, 1) + datetime.timedelta(seconds=int(x*24*60*60)))\
               .strftime('%Y-%m-%d %H:%M:%S'))

immigrants_table = df_i94_spark.withColumn("cicid", df_i94_spark.cicid.cast(IntegerType()))\
                               .withColumn("i94yr", df_i94_spark.i94yr.cast(IntegerType()))\
                               .withColumn("i94mon", df_i94_spark.i94mon.cast(IntegerType()))\
                               .withColumn("i94cit", df_i94_spark.i94cit.cast(IntegerType()))\
                               .withColumn("i94res", df_i94_spark.i94res.cast(IntegerType()))\
                               .withColumn("arrival_date", get_date(df_i94_spark.arrdate))\
                               .withColumn("mode", df_i94_spark.i94mode.cast(IntegerType()))\
                               .withColumn("mode_text", when(df_i94_spark.i94mode==1, "Air")\
                                           .when(df_i94_spark.i94mode==2, "Sea")\
                                           .when(df_i94_spark.i94mode==3, "Land")\
                                           .otherwise("Not Reported"))\
                               .withColumn("departure_date", get_date(df_i94_spark.depdate))\
                               .withColumn("i94bir", df_i94_spark.i94bir.cast(IntegerType()))\
                               .withColumn("visa", df_i94_spark.i94visa.cast(IntegerType()))\
                               .withColumn("visa_text", when(df_i94_spark.i94visa==1, "Business")\
                                           .when(df_i94_spark.i94visa==2, "Pleasure")\
                                           .when(df_i94_spark.i94visa==3, "Student")\
                                           .otherwise("Not Reported"))\
                               .withColumn("bityear", df_i94_spark.biryear.cast(IntegerType()))\
                               .withColumn("admnum", df_i94_spark.admnum.cast(IntegerType()))\
                               .selectExpr("cicid as immigrandid", "i94yr as year", "i94mon as month", \
                   "i94cit as country_citizen", "i94res as county_residence", "i94port as portid", "arrival_date", \
                   "mode", "mode_text", "i94addr as stateid", "departure_date", "i94bir as age", "visa", "visa_text", "matflag", \
                   "biryear", "gender", "airline", "admnum", "fltno", "visatype")\
                               


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [11]:
#Check DataType of demographics table
demographics_table.printSchema()

root
 |-- demographicsid: long (nullable = false)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: integer (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- ave_household: double (nullable = true)
 |-- stateid: string (nullable = true)
 |-- hispanic_latino: integer (nullable = true)
 |-- white: integer (nullable = true)
 |-- asian: integer (nullable = true)
 |-- black_africanAmerican: integer (nullable = true)
 |-- americanIndian_alaskaNative: integer (nullable = true)



In [12]:
#Check DataType of airports_table
airports_table.printSchema()

root
 |-- airportid: string (nullable = true)
 |-- type: string (nullable = true)
 |-- airportname: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- stateid: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [13]:
#Check DataType of countries_table
countries_table.printSchema()

root
 |-- countryid: integer (nullable = true)
 |-- country: string (nullable = true)



In [14]:
#Check DataType of ports_table
ports_table.printSchema()

root
 |-- portid: string (nullable = true)
 |-- city: string (nullable = true)
 |-- stateid: string (nullable = true)
 |-- state: string (nullable = true)



In [15]:
#Check DataType of immigrants table
immigrants_table.printSchema()

root
 |-- immigrandid: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- country_citizen: integer (nullable = true)
 |-- county_residence: integer (nullable = true)
 |-- portid: string (nullable = true)
 |-- arrival_date: string (nullable = true)
 |-- mode: integer (nullable = true)
 |-- mode_text: string (nullable = false)
 |-- stateid: string (nullable = true)
 |-- departure_date: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- visa: integer (nullable = true)
 |-- visa_text: string (nullable = false)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: integer (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [12]:
# Unit tests for the scripts to ensure they are doing the right thing
from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, avg
## Top 5 Numbe of Immigrants by state of stay
sql = immigrants_table.select("stateid")\
                      .groupBy('stateid') \
                      .agg({'stateid':'count'}) \
                      .withColumnRenamed('count(stateid)', 'statecount') \
                      .sort(desc('statecount')) \
                      .show(5)

+-------+----------+
|stateid|statecount|
+-------+----------+
|     FL|    593101|
|     NY|    529507|
|     CA|    451232|
|     HI|    165853|
|     TX|    129899|
+-------+----------+
only showing top 5 rows



In [37]:
#Foreign-born population in top 1 state of stay
sql = demographics_table.select("city", "stateid", "foreign_born").where(demographics_table.stateid=="FL")\
.sort(desc('foreign_born')).show(5)
                        

+--------------+-------+------------+
|          city|stateid|foreign_born|
+--------------+-------+------------+
|         Miami|     FL|      260789|
|  Jacksonville|     FL|       85650|
|Pembroke Pines|     FL|       62210|
|         Tampa|     FL|       58795|
|     Hollywood|     FL|       55158|
+--------------+-------+------------+
only showing top 5 rows



In [16]:
#immigtants who arrival and departure records don't match
immigrants_table.filter(immigrants_table.matflag=="X").show()

+-----------+----+-----+---------------+----------------+------+-------------------+----+---------+-------+----------------+---+----+---------+-------+-------+------+-------+----------+-----+--------+
|immigrandid|year|month|country_citizen|county_residence|portid|       arrival_date|mode|mode_text|stateid|  departure_date|age|visa|visa_text|matflag|biryear|gender|airline|    admnum|fltno|visatype|
+-----------+----+-----+---------------+----------------+------+-------------------+----+---------+-------+----------------+---+----+---------+-------+-------+------+-------+----------+-----+--------+
|      11757|2016|    4|            111|             111|   CHI|2016-04-01 00:00:00|   1|      Air|     IL|1-01-01 00:00:00| 63|   2| Pleasure|      X| 1953.0|     X|     AF|2147483647|00144|      WT|
|      12275|2016|    4|            114|             114|   SPM|2016-04-01 00:00:00|   1|      Air|     CA|1-01-01 00:00:00| 47|   1| Business|      X| 1969.0|     X|     DL|2147483647|00165|     

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. 
<p><strong>Please see file "Schema_model.xlsx" </strong></p>

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
The immigrants dataset is analystic data rather than structured data. So Data lake is better than Data Warehouse. Apache Spark process ETL pipeline for the data lake and HDSF is used for data strage in EMR. Because Apache Spark allow us in this project fast and in-memory data processing and data straged on HDFS is accessed by BI app or Advanced Analystic App.

* Propose how often the data should be updated and why.
 <p>As data set of only 04/2016 is used in this project, the immigrants data should be updated once a month. On the other hand, other data sets don't change frequently so they should be updated once a year.  </p>
 
 
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 <p>First of all, all data should be ingested from S3. Second, we can add nodes and increase process capacity in EMR. But If we want to save cost even if the processing speed is low, we can change them to spark+S3 (not HDFS).    </p>
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 <p>The data lake process should be scheduled by Airflow </p>
 * The database needed to be accessed by 100+ people.
 <p>We can add nodes and increase process capacity. And also 