# Immigration Data Pipeline
### Data Engineering Capstone Project

#### Project Summary
Perform ETL on multiple datasets covering immigration, airports, and U.S. state demographics. The result will be a set of clean analytics tables containing only data for trips to the 50 U.S. states. The final tables should be intuitive and easy to use for anyone interested in analyzing immigration logistics and demographic information.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import pandas as pd
import configparser
from datetime import datetime
import os

## Scope the Project and Gather Data

This project takes U.S. immigration-related data from various sources and creates clean tables to aid analytics and visualization. The main source is the i94 immigration data supplied by the U.S. National Tourism and Trade Office, in which each record describes a visit to the United States by a foreign citizen. To supplement the immigration data, three additional data sources are used: U.S. city demographic data from OpenSoft, the Airport Codes Table, and i94 country codes. Together these data sets let the end user investigate immigration patterns and traffic based on airport usage and the demographics of the U.S. states visited.

The tools used in this project are Python, Spark, and Amazon S3. Python and Spark are well suited to working with large datasets and to ETL (extract, transform, load) tasks such as this one. Amazon S3 is a reliable cloud-based storage service for large volumes of data, which makes it a good home for this dataset.

## Data Exploration

In [2]:
# Read in the data here
fname = 'immigration_data_sample.csv'
df = pd.read_csv(fname)

In [3]:
df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
    config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
    .enableHiveSupport().getOrCreate()
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [5]:
#write to parquet
#df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")

In [6]:
df_spark.take(5)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1'),
 Row(cicid=5748518.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='NV', depdate=20591.0, i94bir=32.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1984.0, dtaddto='10292016', gender='F', insnum=None, airline='VA', admnum=94955622830.0, fltno='00007', visatype='B1'),
 Row(cicid=5748519.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='WA', depdate=20582.0, i94bir=29.

In [7]:
from pyspark.sql.functions import isnan, count, when, col, desc, udf, sort_array, asc, avg

In [8]:
num_rows = df_spark.count()
for c in df_spark.columns:
    df_spark.select([(100*count(when(col(c).isNull() | isnan(col(c)), c))/num_rows).alias(c)]).show()

+-----+
|cicid|
+-----+
|  0.0|
+-----+

+-----+
|i94yr|
+-----+
|  0.0|
+-----+

+------+
|i94mon|
+------+
|   0.0|
+------+

+------+
|i94cit|
+------+
|   0.0|
+------+

+------+
|i94res|
+------+
|   0.0|
+------+

+-------+
|i94port|
+-------+
|    0.0|
+-------+

+-------+
|arrdate|
+-------+
|    0.0|
+-------+

+--------------------+
|             i94mode|
+--------------------+
|0.007718857880324115|
+--------------------+

+-----------------+
|          i94addr|
+-----------------+
|4.928183940060324|
+-----------------+

+---------------+
|        depdate|
+---------------+
|4.6008591508675|
+---------------+

+--------------------+
|              i94bir|
+--------------------+
|0.025901774142342845|
+--------------------+

+-------+
|i94visa|
+-------+
|    0.0|
+-------+

+-----+
|count|
+-----+
|  0.0|
+-----+

+--------------------+
|            dtadfile|
+--------------------+
|3.229647648671178E-5|
+--------------------+

+-----------------+
|         visapost|
+-----

#### Handling missing values
Since the main purpose of this data set is to let the end user analyze immigration logistics and demographic patterns, I had to decide how to treat the missing values discovered above.

One of the supporting data sets contains U.S. state demographics, and i94addr is the field that codes the U.S. state visited. Since only about 5% of the rows are missing i94addr, I am going to drop all rows missing that field. I will also drop any rows missing "i94bir", which represents the age of the traveler, because I want the end user to have access to age-group demographics when conducting analysis.

I have decided to include visatype in my tables going forward since the data is complete and it can be used to analyze visits by visa type, which could lead to insights and help with logistics planning. It is disappointing that 13% of the gender field is missing, but I am still going to include this field in the final table. The end user will just need to exercise caution, knowing that it is missing for a fairly large portion of the rows.

Finally, the departure date is missing for around 5% of the data. I am going to leave the length of stay missing for rows without a depdate. I do not have enough information to make an assumption, and I would rather the end user know there is missing data than make a false assumption that leads them astray.

For the other data sets, I will be building the i94 country code file by hand in Excel, so it will not contain missing values. The airport codes file has many missing values for iata_code. Since this code is the key that ties the airport table to my fact table, I will drop all rows that do not have an iata_code.
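The rules above can be sketched in plain Python on a few hand-made records. This is only an illustration of the policy, not the pipeline code: the sample rows and the helper name apply_missing_value_rules are made up for the example, and the real pipeline applies the same logic with Spark.

```python
# Hypothetical sample records; field names follow the i94 columns discussed above.
sample_rows = [
    {"cicid": 1, "i94addr": "CA", "i94bir": 40.0, "gender": "F", "arrdate": 20574.0, "depdate": 20582.0},
    {"cicid": 2, "i94addr": None, "i94bir": 32.0, "gender": "M", "arrdate": 20574.0, "depdate": 20591.0},
    {"cicid": 3, "i94addr": "NY", "i94bir": None, "gender": "F", "arrdate": 20550.0, "depdate": 20560.0},
    {"cicid": 4, "i94addr": "TX", "i94bir": 25.0, "gender": None, "arrdate": 20567.0, "depdate": None},
]

def apply_missing_value_rules(rows):
    """Drop rows missing state or age; keep rows missing gender or departure date."""
    cleaned = []
    for row in rows:
        # Rule 1: state visited and traveler age are required.
        if row["i94addr"] is None or row["i94bir"] is None:
            continue
        # Rule 2: length of stay stays missing when there is no departure date.
        if row["depdate"] is None:
            visit_length_days = None
        else:
            visit_length_days = row["depdate"] - row["arrdate"]
        cleaned.append({**row, "visit_length_days": visit_length_days})
    return cleaned

cleaned_rows = apply_missing_value_rules(sample_rows)
for row in cleaned_rows:
    print(row["cicid"], row["gender"], row["visit_length_days"])
```

Records 2 and 3 are dropped, while record 4 is kept with both its gender and its length of stay left empty.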



In [9]:
### look at data in i94addr
df_spark.select('i94addr').groupBy('i94addr').agg({'i94addr':'count'}).sort(desc('count(i94addr)')).show(25)

+-------+--------------+
|i94addr|count(i94addr)|
+-------+--------------+
|     FL|        621701|
|     NY|        553677|
|     CA|        470386|
|     HI|        168764|
|     TX|        134321|
|     NV|        114609|
|     GU|         94107|
|     IL|         82126|
|     NJ|         76531|
|     MA|         70486|
|     WA|         55792|
|     GA|         44663|
|     MI|         32101|
|     VA|         31399|
|     PA|         30293|
|     DC|         28228|
|     NE|         26574|
|     MD|         25360|
|     NC|         23375|
|     LA|         22655|
|     AZ|         20218|
|     OH|         18089|
|     CO|         15874|
|     CT|         13991|
|     OR|         12574|
+-------+--------------+
only showing top 25 rows



In [10]:
### look at data in i94addr
df_spark.select('i94addr').groupBy('i94addr').agg({'i94addr':'count'}).sort('count(i94addr)').show(25)

+-------+--------------+
|i94addr|count(i94addr)|
+-------+--------------+
|   null|             0|
|     71|             1|
|     52|             1|
|     HR|             1|
|     73|             1|
|     RU|             1|
|     XN|             1|
|     CG|             1|
|     RF|             1|
|     UR|             1|
|     UL|             1|
|     ZN|             1|
|     EV|             1|
|     KF|             1|
|     YH|             1|
|     S6|             1|
|     RO|             1|
|     VL|             1|
|     RA|             1|
|     EX|             1|
|     JC|             1|
|     PD|             1|
|     FC|             1|
|     TC|             1|
|     85|             1|
+-------+--------------+
only showing top 25 rows



#### Handling invalid codes
This project focuses solely on visits to U.S. states. Some i94addr values do not match any U.S. state code, so I will use a list of state codes to filter the data.

In [12]:
### check to see how much data will be lost
states_list = pd.read_csv("US_state_list.csv")
state_codes = list(states_list.Code)

num_rows = df_spark.count()
print("Perc of data remaining: ")
100 * df_spark.where(col("i94addr").isin(state_codes)).count() / num_rows

Perc of data remaining: 


90.8609691591257
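As a small illustration of the check above, the share of rows that survive a state-code filter can be computed as follows. The state list here is a hypothetical three-code subset and the visited values are made up; the notebook reads the full list from US_state_list.csv.

```python
# Hypothetical subset of valid codes; the notebook loads the real list from US_state_list.csv.
valid_state_codes = {"FL", "NY", "CA"}

# Made-up i94addr values, including invalid codes and a missing value.
visited = ["FL", "NY", "71", "CA", None, "XN", "FL", "NY"]

def percent_remaining(values, valid_codes):
    """Return the percentage of values that appear in valid_codes."""
    if not values:
        return 0.0
    kept = sum(1 for value in values if value in valid_codes)
    return 100 * kept / len(values)

print(percent_remaining(visited, valid_state_codes))
```

Missing values count as lost rows here, which matches the behavior of an isin filter.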

In [13]:
### look at data in i94port
df_spark.select('i94port').groupBy('i94port').agg({'i94port':'count'}).sort(desc('count(i94port)')).show(25)

+-------+--------------+
|i94port|count(i94port)|
+-------+--------------+
|    NYC|        485916|
|    MIA|        343941|
|    LOS|        310163|
|    SFR|        152586|
|    ORL|        149195|
|    HHW|        142720|
|    NEW|        136122|
|    CHI|        130564|
|    HOU|        101481|
|    FTL|         95977|
|    ATL|         92579|
|    LVG|         89280|
|    AGA|         80919|
|    WAS|         74835|
|    DAL|         71809|
|    BOS|         57354|
|    SEA|         47719|
|    PHO|         38890|
|    DET|         37832|
|    TAM|         25632|
|    PHI|         24973|
|    DUB|         24371|
|    SAI|         23628|
|    TOR|         20886|
|    DEN|         18260|
+-------+--------------+
only showing top 25 rows



In [14]:
### look at data in i94port
df_spark.select('i94port').groupBy('i94port').agg({'i94port':'count'}).sort('count(i94port)').show(25)

+-------+--------------+
|i94port|count(i94port)|
+-------+--------------+
|    COO|             1|
|    PHF|             1|
|    HNN|             1|
|    NC8|             1|
|    PCF|             1|
|    CNC|             1|
|    VNB|             1|
|    RIO|             1|
|    CPX|             1|
|    REN|             1|
|    YIP|             1|
|    BWM|             1|
|    LWT|             1|
|    ANA|             1|
|    SCH|             1|
|    MAI|             1|
|    ERC|             1|
|    NIG|             1|
|    MND|             1|
|    NEC|             2|
|    CRP|             2|
|    DVL|             2|
|    RYY|             2|
|    PSM|             2|
|    MTH|             2|
+-------+--------------+
only showing top 25 rows



Nothing looks out of place in the i94port codes: they are all 3-character codes. Some may not have a matching iata_code in the airports table, but the initial analysis looks fine. There is one value, "XXX", which is the code for unknown or unreported ports. The check below shows how often it occurs.

In [15]:
print("Percentage of data coded as XXX which means unknown: ")
100 * df_spark.select('i94port').filter(df_spark.i94port == 'XXX').count() / num_rows

Percentage of data coded as XXX which means unknown: 


0.11374819018619887

Only about 0.11% of rows are coded "XXX", so these rows will be filtered out of the final table.
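The two checks on i94port (code shape and the share of "XXX") could be expressed as below. The port values are a made-up sample, and the 3-character alphanumeric pattern is an assumption based on the codes shown in the output above.

```python
import re
from collections import Counter

# Made-up sample of port codes.
ports = ["NYC", "MIA", "XXX", "NC8", "LOS", "NYC", "XXX", "HHW"]

# Assumed shape: exactly three uppercase letters or digits.
PORT_PATTERN = re.compile(r"[A-Z0-9]{3}")

# Codes that do not match the assumed shape.
malformed = [port for port in ports if not PORT_PATTERN.fullmatch(port)]

# Share of rows coded as unknown.
counts = Counter(ports)
percent_unknown = 100 * counts["XXX"] / len(ports)

print("malformed codes:", malformed)
print("percent unknown:", percent_unknown)
```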

## Data Model and Data Dictionary

A star schema will be used to make the data friendly to end users. The idea behind this schema is a single fact table surrounded by supporting dimension tables. This schema requires more storage than a traditional OLTP relational database but comes with the benefit of easier usage by analysts. Since storage on S3 is relatively cheap and extremely reliable, the space tradeoff is an easy decision to make. The final deliverable will be a Python ETL script that can be run on demand to create the analytic tables, which will be saved as Parquet files (a file type that works well with Spark) in an S3 bucket. See below for the schema.

The i94 data includes a variety of information about each visit to the United States by a foreign citizen. The data extracted in this project will include the following:
- immigration_record_id
- country_of_citizenship_i94_code = country of citizenship for traveler
- country_of_residence_i94_code = country of residence for traveler
- state_visited_code = U.S. state visited
- airport_iata_code = airport used
- airline_used 
- visatype
- month_of_arrival = month of arrival in U.S.
- year_of_arrival = year of arrival in U.S.
- traveler_age
- traveler_gender
- visit_length_days

The supplemental data set, U.S. city demographics, contains population information for U.S. cities, with each row representing an ethnic group in that city. For this project, the data will only be extracted at the state level, without regard to ethnic group. The following data will be used for each state:
- state_code = 2-letter state code
- name = name of state
- total_male_population
- total_female_population
- total_foreign_born_residents

The Airport Code Table includes a lot of data about airports around the world, such as country, region, and type. The data will be limited to airports located in the United States and will include:
- iata_code = code used to uniquely identify airport
- name = name of airport
- type = type of airport (small, heliport, etc)
- state = state where airport is located
- municipality = municipality in which airport is located
- longitude
- latitude

There will also be a small file containing i94 country codes that will be used to determine the country of residence and citizenship of the travelers. The table will contain the following:
- i94 code
- name of location

### Schema
![immigration_schema](immigration_schema_erd.jpg)
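To show how an analyst might use the star schema, here is a minimal sketch that loads a few made-up rows into an in-memory SQLite database and joins the fact table to the state dimension. SQLite stands in for Spark SQL purely for illustration; the table names immigration_fact and state_dim and all values are assumptions, while the column names follow the data dictionary above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# One dimension table and a trimmed-down fact table (hypothetical names).
conn.executescript("""
CREATE TABLE state_dim (
    state_code TEXT PRIMARY KEY,
    name TEXT,
    total_foreign_born_residents INTEGER
);
CREATE TABLE immigration_fact (
    immigration_record_id INTEGER PRIMARY KEY,
    state_visited_code TEXT REFERENCES state_dim(state_code),
    visatype TEXT,
    visit_length_days INTEGER
);
""")

# Made-up rows for illustration only.
conn.executemany(
    "INSERT INTO state_dim VALUES (?, ?, ?)",
    [("CA", "California", 1000), ("NY", "New York", 800)],
)
conn.executemany(
    "INSERT INTO immigration_fact VALUES (?, ?, ?, ?)",
    [(1, "CA", "B2", 8), (2, "CA", "WT", None), (3, "NY", "B1", 17)],
)

# Visits and average stay per state: the fact table joins to the dimension on the state code.
query = """
SELECT s.name, COUNT(*) AS visits, AVG(f.visit_length_days) AS avg_stay
FROM immigration_fact AS f
JOIN state_dim AS s ON s.state_code = f.state_visited_code
GROUP BY s.name
ORDER BY s.name
"""
results = conn.execute(query).fetchall()
for row in results:
    print(row)
conn.close()
```

AVG skips the missing length of stay, so a record without a depdate still counts as a visit without distorting the average.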

## Pipelines to Model the Data

#### Read in AWS Credentials to allow access to output final Parquet files into S3 bucket.

In [111]:
import configparser
import os

config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']
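For reference, a dl.cfg compatible with the config.get calls in this notebook might look like the sketch below. The DATA keys are the ones the notebook reads; the AWS section, its key names, and every value are placeholders assumed for illustration.

```python
import configparser
import os

# Hypothetical dl.cfg contents; every value is a placeholder.
example_cfg = """
[AWS]
AWS_ACCESS_KEY_ID = your-access-key-id
AWS_SECRET_ACCESS_KEY = your-secret-access-key

[DATA]
i94_DATA = path/to/i94_data.sas7bdat
IMMIGRATION_OUTPUT_DATA = path/to/output/immigration_data
AIRPORT_INPUT_DATA = path/to/airport-codes.csv
AIRPORT_OUTPUT_DATA = path/to/output/airports.csv
US_DEMO_INPUT_DATA = path/to/us-cities-demographics.csv
US_DEMO_OUTPUT_DATA = path/to/output/state_demographics.csv
"""

config = configparser.ConfigParser()
config.read_string(example_cfg)  # the notebook uses config.read('dl.cfg') instead

# Expose the credentials through the environment variables that AWS SDKs look for.
os.environ["AWS_ACCESS_KEY_ID"] = config.get("AWS", "AWS_ACCESS_KEY_ID")
os.environ["AWS_SECRET_ACCESS_KEY"] = config.get("AWS", "AWS_SECRET_ACCESS_KEY")

print(config.get("DATA", "i94_DATA"))
```

Keeping credentials in a config file that is excluded from version control avoids hard-coding them in the notebook.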

#### Create Spark Session

In [17]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
    config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
    .enableHiveSupport().getOrCreate()

#### Read in and clean i94 Immigration Data

This data is stored in SAS format (sas7bdat). Store the path to the i94 data in a file named dl.cfg, under the key i94_DATA in the DATA section. The data can live in S3 or in local storage, depending on the number of files you are interested in ingesting. Because the data is in SAS format, the ingestion step uses PySpark's spark.read.format reader together with the spark-sas7bdat package configured when the Spark session was created.

In [54]:
i94_file_path = config.get('DATA', 'i94_DATA')
immigration_data_full = spark.read.format('com.github.saurfang.sas.spark').load(i94_file_path)

***Select only the columns used in the schema described above. Columns have different names in the i94 data so one part of the pipeline will be renaming to match the schema.***

In [55]:
immigration_data = immigration_data_full.select(col('cicid').alias('immigration_record_id'),
                                            col('i94cit').alias('country_of_citizenship_i94_code'),
                                            col('i94res').alias('country_of_residence_i94_code'),
                                            col('i94addr').alias('state_visited_code'),
                                            col('i94port').alias('airport_iata_code'),
                                            col('airline').alias('airline_used'),
                                            col('visatype'),
                                            col('i94mon').alias('month_of_arrival'),
                                            col('i94yr').alias('year_of_arrival'),
                                            col('i94bir').alias('traveler_age'),
                                            col('gender').alias('traveler_gender'),
                                            col('arrdate'),
                                            col('depdate'))

In [56]:
### read in states list to filter state_visited down to only US states
states_list = pd.read_csv('US_state_list.csv')
state_codes = list(states_list.Code)

immigration_data = immigration_data.where(col('state_visited_code').isin(state_codes))

In [57]:
### filter out any data with an airport_iata_code = "XXX"
immigration_data = immigration_data.filter(immigration_data.airport_iata_code != 'XXX')

In [58]:
### drop missing values in traveler age
immigration_data = immigration_data.where(col('traveler_age').isNotNull())

In [59]:
### calculate length of stay
immigration_data = immigration_data.withColumn('visit_length_days', col('depdate')-col('arrdate'))
### drop arrdate and depdate columns
immigration_data = immigration_data.drop('arrdate','depdate')

[Row(immigration_record_id=7.0, country_of_citizenship_i94_code=254.0, country_of_residence_i94_code=276.0, state_visited_code='AL', airport_iata_code='ATL', airline_used=None, visatype='F1', month_of_arrival=4.0, year_of_arrival=2016.0, traveler_age=25.0, traveler_gender='M', visit_length_days=None),
 Row(immigration_record_id=15.0, country_of_citizenship_i94_code=101.0, country_of_residence_i94_code=101.0, state_visited_code='MI', airport_iata_code='WAS', airline_used='OS', visatype='B2', month_of_arrival=4.0, year_of_arrival=2016.0, traveler_age=55.0, traveler_gender='M', visit_length_days=146.0),
 Row(immigration_record_id=16.0, country_of_citizenship_i94_code=101.0, country_of_residence_i94_code=101.0, state_visited_code='MA', airport_iata_code='NYC', airline_used='AA', visatype='B2', month_of_arrival=4.0, year_of_arrival=2016.0, traveler_age=28.0, traveler_gender=None, visit_length_days=22.0),
 Row(immigration_record_id=17.0, country_of_citizenship_i94_code=101.0, country_of_resi
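The arrdate and depdate values are SAS date numbers, that is, day counts from the SAS epoch of 1 January 1960, which is why subtracting them gives a length of stay in days. If calendar dates are ever needed, a conversion along these lines should work; the helper name sas_date_to_date is hypothetical and is not part of the pipeline.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1 January 1960.
SAS_EPOCH = date(1960, 1, 1)

def sas_date_to_date(sas_days):
    """Convert a SAS date number to a datetime.date, passing missing values through."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))

# Values taken from the first row of the df_spark.take(5) output above.
arrival = sas_date_to_date(20574.0)
departure = sas_date_to_date(20582.0)

print(arrival, departure, (departure - arrival).days)  # 2016-04-30 2016-05-08 8
```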

In [60]:
### write immigration data to parquet file, partition by year and month
immigration_data.write.partitionBy('year_of_arrival','month_of_arrival').mode('overwrite').parquet('immigration_data')

#### Read in and clean airport codes

In [112]:
airports_input_file_path = config.get('DATA', 'AIRPORT_INPUT_DATA')
airports = pd.read_csv(airports_input_file_path)

In [69]:
### keep only rows with 3-character iata codes and in the US
three_digit_code = lambda x: isinstance(x, str) and len(x) == 3
valid_codes = airports.iata_code.apply(three_digit_code)
airports_clean = airports[valid_codes]
airports_clean = airports_clean[airports_clean.iso_country == 'US']

In [71]:
### extract columns
columns_to_select = ['type','name','iso_region','municipality','iata_code','coordinates']
airports_clean = airports_clean[columns_to_select]

In [76]:
### create new "state" column by extracting code from iso_region
extract_state_from_region = lambda x: x.split('-')[1]
airports_clean['state'] = airports_clean.iso_region.apply(extract_state_from_region)
### drop iso_region
airports_clean = airports_clean.drop('iso_region',axis=1)

In [80]:
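### create latitude and longitude columns from coordinates
### NOTE: this assumes coordinates are stored as "latitude, longitude"; verify the order
### against the file, since some versions of this dataset may list longitude first
extract_lat_from_coords = lambda x: float(x.split(',')[0].strip())
extract_long_from_coords = lambda x: float(x.split(',')[1].strip())
airports_clean['latitude'] = airports_clean.coordinates.apply(extract_lat_from_coords)
airports_clean['longitude'] = airports_clean.coordinates.apply(extract_long_from_coords)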
### drop coordinates data
airports_clean = airports_clean.drop('coordinates',axis=1)
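The airport cleaning steps above (keep U.S. rows with a 3-character iata_code, pull the state out of iso_region, split coordinates) can also be written as one small function, which makes them easy to unit test. This is a sketch on a made-up row, and it follows the notebook in reading the first coordinate as latitude; check that order against the actual file before relying on it.

```python
def clean_airport_row(row):
    """Return a cleaned airport dict, or None if the row should be dropped."""
    iata_code = row.get("iata_code")
    # Drop rows with a missing or malformed IATA code.
    if not isinstance(iata_code, str) or len(iata_code) != 3:
        return None
    # Keep only airports located in the United States.
    if row.get("iso_country") != "US":
        return None
    first, second = (float(part) for part in row["coordinates"].split(","))
    return {
        "iata_code": iata_code,
        "name": row["name"],
        "type": row["type"],
        "state": row["iso_region"].split("-")[1],  # "US-TX" -> "TX"
        "municipality": row["municipality"],
        "latitude": first,    # assumption: the first value is the latitude
        "longitude": second,  # assumption: the second value is the longitude
    }

# Made-up row for illustration.
sample = {
    "iata_code": "ABC", "name": "Example Field", "type": "small_airport",
    "iso_country": "US", "iso_region": "US-TX", "municipality": "Exampleville",
    "coordinates": "32.5, -99.7",
}

cleaned = clean_airport_row(sample)
dropped = clean_airport_row({**sample, "iata_code": None})
print(cleaned)
print(dropped)
```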

In [113]:
### save clean file
airport_output_file_path = config.get('DATA', 'AIRPORT_OUTPUT_DATA')
airports_clean.to_csv(airport_output_file_path, index=False)

#### Read in and aggregate US demographics data

In [114]:
state_demo_input_file_path = config.get('DATA', 'US_DEMO_INPUT_DATA')
state_demo = pd.read_csv(state_demo_input_file_path, sep=';')

In [99]:
### extract only necessary columns
columns_to_select = ['City','State','Male Population','Female Population','Foreign-born','State Code']
state_demo = state_demo[columns_to_select]

In [100]:
### aggregate by city by state by taking minimum of population numbers since the rows are 
### broken out by ethnic group but the population numbers are for the whole city
### which causes them to get replicated
state_demo_grouped = state_demo.groupby(['City','State','State Code']).agg({'Male Population':'min',
                                                                            'Female Population':'min',
                                                                            'Foreign-born':'min'})
state_demo_grouped.reset_index(inplace=True)

Unnamed: 0,City,State,State Code,Male Population,Female Population,Foreign-born
0,Abilene,Texas,TX,65212.0,60664.0,8129.0
1,Akron,Ohio,OH,96886.0,100667.0,10024.0
2,Alafaya,Florida,FL,39504.0,45760.0,15842.0
3,Alameda,California,CA,37747.0,40867.0,18841.0
4,Albany,Georgia,GA,31695.0,39414.0,861.0


In [103]:
### now aggregate by state since that is the level at which we are interested
state_demo_grouped = state_demo_grouped.groupby(['State','State Code']).agg({'Male Population':'sum',
                                                                            'Female Population':'sum',
                                                                            'Foreign-born':'sum'})
state_demo_grouped.reset_index(inplace=True)
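The two-stage aggregation can be easier to follow on a toy example. The sketch below reproduces the logic in plain Python on made-up numbers: each city appears once per ethnic group with the same city-wide totals, so the first pass keeps one value per city (the minimum, as in the notebook) and the second pass sums the cities within each state.

```python
from collections import defaultdict

# Made-up rows: (city, state_code, male_population), one row per ethnic group.
rows = [
    ("Alpha", "TX", 100), ("Alpha", "TX", 100), ("Alpha", "TX", 100),
    ("Beta", "TX", 50), ("Beta", "TX", 50),
    ("Gamma", "OH", 70),
]

# Stage 1: collapse the repeated rows to one value per (city, state).
per_city = {}
for city, state_code, male_population in rows:
    key = (city, state_code)
    per_city[key] = min(per_city.get(key, male_population), male_population)

# Stage 2: sum the city totals within each state.
per_state = defaultdict(int)
for (city, state_code), male_population in per_city.items():
    per_state[state_code] += male_population

# Summing the raw rows directly would count each city once per ethnic group.
naive_total_tx = sum(pop for _, state, pop in rows if state == "TX")

print(dict(per_state), naive_total_tx)
```

Note that the resulting totals cover only the cities present in the source file, not the entire population of each state.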

In [106]:
### rename columns to match schema
state_demo_final = state_demo_grouped.rename(columns={'State':'name',
                                                      'State Code':'state_code',
                                                      'Male Population':'total_male_population',
                                                      'Female Population':'total_female_population',
                                                      'Foreign-born':'total_foreign_born_residents'})

In [115]:
### export data
state_demo_output_file_path = config.get('DATA', 'US_DEMO_OUTPUT_DATA')
state_demo_final.to_csv(state_demo_output_file_path, index=False)

#### Data Quality Checks

There are two main data quality checks I will be performing. The first and most obvious is to make sure that there is actually data in the analytic table files. The second is to ensure that certain columns do not contain any missing values.

##### Record Count Checks

In [133]:
### check immigrations data to make sure that data has been loaded
immigration_data_file_path = config.get('DATA', 'IMMIGRATION_OUTPUT_DATA')
immigration_data = spark.read.parquet(immigration_data_file_path)

num_records = immigration_data.count()
if num_records == 0:
    raise ValueError("Something went wrong! Immigration data has zero records.")
else:
    print("Success! Immigration data has", num_records, "records!")

Success! Immigration data has 2811325 records!


In [134]:
### check airport codes
airport_output_file_path = config.get('DATA', 'AIRPORT_OUTPUT_DATA')
airport_codes = pd.read_csv(airport_output_file_path)

num_records = airport_codes.shape[0]
if num_records == 0:
    raise ValueError("Something went wrong! Airport codes data has zero records.")
else:
    print("Success! Airport codes data has", num_records, "records!")

Success! Airport codes data has 2019 records!


In [136]:
### check US demographics data
state_demo_output_file_path = config.get('DATA', 'US_DEMO_OUTPUT_DATA')
state_demo = pd.read_csv(state_demo_output_file_path)

num_records = state_demo.shape[0]
if num_records == 0:
    raise ValueError("Something went wrong! US demographics data has zero records.")
else:
    print("Success! US demographics data has", num_records, "records!")

Success! US demographics data has 49 records!


##### Null Value Checks

For the immigration data there are a few columns in which we do not want to have missing values: immigration_record_id, country_of_citizenship_i94_code, country_of_residence_i94_code,
state_visited_code, traveler_age, airport_iata_code.

For the airport and US demographics data, there should be no missing values.

In [146]:
### check immigrations data for missing values
columns_to_check = ['immigration_record_id','country_of_citizenship_i94_code',
                    'country_of_residence_i94_code','state_visited_code',
                    'traveler_age','airport_iata_code']
for column in columns_to_check:
    num_missing_values = immigration_data.select(count(when(col(column).isNull() | isnan(col(column)), column))).collect()[0][0]
    if num_missing_values > 0:
        raise ValueError(f'Something went wrong! {column} has {num_missing_values} missing values')
    else:
        print('Success!', column, 'has 0 missing values!')

Success! immigration_record_id has 0 missing values!
Success! country_of_citizenship_i94_code has 0 missing values!
Success! country_of_residence_i94_code has 0 missing values!
Success! state_visited_code has 0 missing values!
Success! traveler_age has 0 missing values!
Success! airport_iata_code has 0 missing values!
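Since the same two checks are repeated for every table, they could be factored into small helpers. The sketch below works on plain lists of dicts so it runs without Spark; the function names are hypothetical, and in the pipeline the same idea would wrap the Spark and pandas calls shown above.

```python
import math

def is_missing(value):
    """Treat None and float NaN as missing."""
    return value is None or (isinstance(value, float) and math.isnan(value))

def check_has_records(records, table_name):
    """Raise if the table is empty; otherwise return the record count."""
    if len(records) == 0:
        raise ValueError(f"Something went wrong! {table_name} has zero records.")
    return len(records)

def check_no_missing(records, columns, table_name):
    """Raise if any of the given columns contains a missing value."""
    for column in columns:
        num_missing = sum(1 for record in records if is_missing(record.get(column)))
        if num_missing > 0:
            raise ValueError(
                f"Something went wrong! {table_name}.{column} has {num_missing} missing values."
            )

# Made-up records for illustration.
good = [{"state_code": "TX", "name": "Texas"}, {"state_code": "OH", "name": "Ohio"}]
check_has_records(good, "state_demo")
check_no_missing(good, ["state_code", "name"], "state_demo")

bad = good + [{"state_code": float("nan"), "name": "Unknown"}]
try:
    check_no_missing(bad, ["state_code"], "state_demo")
except ValueError as error:
    print(error)
```

Raising an exception rather than printing a warning means a scheduled run of the ETL script stops as soon as a check fails.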


#### Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

The tools used in this project are Python, Spark, and Amazon S3. Python and Spark are well suited to working with large datasets and to ETL (extract, transform, load) tasks such as this one. Amazon S3 is a reliable cloud-based storage service for large volumes of data, which makes it a good home for this dataset.

The right update frequency depends heavily on the analytics use case. However, since the source files are organized by month, it makes the most sense to update the data at the end of every month. The immigration data requires a monetary investment, so cost may also play a large role in how often the end user wants to update the data.

### Future considerations
 * The data was increased by 100x.
 
     If the data were increased by 100x, alterations would definitely need to be made. Rather than reading and writing the Parquet files from local storage, a cloud-based storage solution such as Amazon S3 would be necessary unless the end user already had access to capable on-premises infrastructure. It would also be essential to offload the data processing to a cloud-based or distributed environment. Right now Spark is being used locally, but the workload would have to be distributed across a cluster at that magnitude. Amazon offers services such as EMR and EC2 to handle this type of workload.
    
    
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 
     Since this is monthly data, I don't see the business case for a dashboard that updates daily. However, should the need arise, you would need a pipeline to extract the data from the relevant sources. Rather than using static CSV and SAS files that have been downloaded manually, you would need to automate the data retrieval, for example through API endpoints, FTP sites, or web scraping scripts. To orchestrate these automated processes, it would be good to use a tool such as Airflow, Luigi, or another DAG-based pipeline scheduler. This would allow monitoring of task statuses and enforcement of task dependencies, such as making sure data retrieval has completed before the processing steps start. It would also allow tasks to retry and to notify you on failure.
     
     
 * The database needed to be accessed by 100+ people.
 
     Depending on the budget, hosting the data in the cloud would be the best solution for scaling up to hundreds of users. With the data stored in an S3 bucket or a similar cloud service, access can be granted through IAM roles.
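For the daily-dashboard scenario above, the value of a scheduler comes from dependency ordering and retries. The toy runner below illustrates those two ideas with the standard library only (graphlib requires Python 3.9 or later). It is not Airflow or Luigi code, and the task names and the simulated failure are invented for the example.

```python
from graphlib import TopologicalSorter

attempts = {}

def flaky_download():
    """Simulated retrieval step that fails once before succeeding."""
    attempts["download"] = attempts.get("download", 0) + 1
    if attempts["download"] < 2:
        raise ConnectionError("simulated transient failure")

def noop():
    """Placeholder for a real processing step."""

# Hypothetical task names mapped to the callables that implement them.
tasks = {"download": flaky_download, "clean": noop, "load": noop, "quality_checks": noop}

# Each task maps to the set of tasks that must finish first.
dependencies = {
    "download": set(),
    "clean": {"download"},
    "load": {"clean"},
    "quality_checks": {"load"},
}

def run_pipeline(tasks, dependencies, max_retries=3):
    """Run tasks in dependency order, retrying each one up to max_retries times."""
    completed = []
    for name in TopologicalSorter(dependencies).static_order():
        for attempt in range(1, max_retries + 1):
            try:
                tasks[name]()
                completed.append(name)
                break
            except Exception as error:
                print(f"{name} failed on attempt {attempt}: {error}")
        else:
            # Reached only when every attempt failed.
            raise RuntimeError(f"{name} failed after {max_retries} attempts")
    return completed

order = run_pipeline(tasks, dependencies)
print(order)
```

A real scheduler adds what this sketch leaves out: time-based triggers, alerting, and a record of each run.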