### Data Engineering Capstone Project

#### Project Summary
In this project, I will apply what I've learned from the previous lessons to:

1. Design a conceptual star-schema database.
2. Build ETL pipelines as Python scripts.
3. Use AWS cloud services (S3, Redshift, EMR) and Spark to build the data model.
4. Use Airflow to automate and monitor pipelines.

The goal of this project is to become familiar with the full data engineering workflow, from the initial database schema design to building the data model in the cloud. First, a data warehouse is built from the 4 datasets described below. Then, the data is modeled in the cloud.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [3]:
# Import necessary libraries:
import os
import pandas as pd
from datetime import datetime
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', 1000)

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc...

The final result is a data warehouse in the cloud that analysts can use to answer questions through analytic tables and visualization dashboards. I used 4 datasets: (1) immigration, (2) airport, (3) temperature, and (4) demographics. Beyond this, external users could also get answers through a web API that queries the backend warehouse.

Tools used here include: S3, EMR, Redshift, Spark, Airflow.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

The main datasets are the U.S. City Demographic Data (from OpenDataSoft), the I94 immigration data (from the US National Tourism and Trade Office), the global temperature data (from Kaggle), and the airport codes table (`airport-codes_csv.csv`). The `I94_SAS_Labels_Descriptions.SAS` file is also used to look up the description of each column.
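
As a rough illustration of how the labels file can be turned into lookup tables, the sketch below parses `code = 'label'` lines with a regular expression. The sample text is a hypothetical excerpt written for this example, and `parse_sas_labels` is a hypothetical helper, not a function from `etl.py`; the layout of the real file should be checked before relying on this pattern.

```python
import re

# Hypothetical excerpt in the style of I94_SAS_Labels_Descriptions.SAS.
# The real file is much longer and its exact layout should be verified.
SAMPLE_LABELS = """
/* I94ADDR */
    'AL'='ALABAMA'
    'AK'='ALASKA'
    'MA'='MASSACHUSETTS'
;
/* I94MODE */
    1 = 'Air'
    2 = 'Sea'
    3 = 'Land'
    9 = 'Not reported' ;
"""

# Matches lines such as  'AL'='ALABAMA'  or  1 = 'Air'
PAIR_PATTERN = re.compile(r"^\s*'?([^'=\s]+)'?\s*=\s*'([^']*)'")


def parse_sas_labels(text):
    """Return a {code: label} dict for every `code = 'label'` line in text."""
    labels = {}
    for line in text.splitlines():
        match = PAIR_PATTERN.match(line)
        if match:
            code, label = match.groups()
            labels[code] = label.strip()
    return labels


labels = parse_sas_labels(SAMPLE_LABELS)
print(labels["AL"], "/", labels["1"])
```

For simplicity this sketch merges all sections into one dictionary; in practice each column's codes would be kept in a separate mapping.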

In [3]:
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [6]:
! ls

airport-codes_csv.csv		 ppl.cfg
Capstone Project Template.ipynb  sas_data
helper				 sql
I94_SAS_Labels_Descriptions.SAS  us-cities-demographics.csv
immigration_data_sample.csv


In [7]:
# Write to Parquet (overwriting any existing "sas_data" folder), then read it back:
df_spark.write.mode("overwrite").parquet("sas_data")
df_spark = spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

This is included in the ```etl.py``` script.

#### Cleaning Steps
Document steps necessary to clean the data

This is included in the ```etl.py``` script.

- **Immigration Data:**

In [8]:
# Read immigration i94 data:
immigration_fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration = pd.read_sas(immigration_fname, 'sas7bdat', encoding="ISO-8859-1")

In [9]:
# Display the first 5 rows of the immigration data:
immigration.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [10]:
print(f"There are {len(immigration)} rows in the immigration dataset")
print(f"There are {immigration.cicid.nunique()} unique values in the 'cicid' column")

There are 3096313 rows in the immigration dataset
There are 3096313 unique values in the 'cicid' column


**Data Dictionary for Immigration data**:

In [11]:
pd.DataFrame(data={
    "Columns": immigration.columns,
    "Description": [
        "Unique ID value for each record",
        "Year",
        "Month",
        "Country of Citizenship Code",
        "Country of Residence Code",
        "Entry Port",
        "Arrival Date in the USA",
        "Transportation Mode",
        "State of Arrival",
        "Departure Date from the USA",
        "Age of Respondent in Years",
        "Visa Codes Collapsed into Three Categories",
        "Used for Summary Statistics",
        "Character Date Field - Date added to I-94 Files",
        "Department of State Where Visa was Issued",
        "Occupation that will be performed in U.S.",
        "Arrival Flag - admitted or paroled into the U.S.",
        "Departure Flag - Departed, lost I-94 or is deceased",
        "Update Flag - Either apprehended, overstayed, adjusted to perm residence",
        "Match flag - Match of arrival and departure records",
        "Year of Birth",
        "Character Date Field - Date to which admitted to U.S. (allowed to stay until)",
        "Gender",
        "INS Number",
        "Airline used to arrive in U.S.",
        "Admission Number",
        "Flight number of Airline used to arrive in U.S.",
        "Class of admission legally admitting the non-immigrant to temporarily stay in U.S."
    ]
})

Unnamed: 0,Columns,Description
0,cicid,Unique ID value for each record
1,i94yr,Year
2,i94mon,Month
3,i94cit,Country of Citizenship Code
4,i94res,Country of Residence Code
5,i94port,Entry Port
6,arrdate,Arrival Date in the USA
7,i94mode,Transportation Mode
8,i94addr,State of Arrival
9,depdate,Departure Date from the USA


In [49]:
# Check missing values in immigration data:
print(f"There are {len(immigration.columns[immigration.isnull().any()])} columns containing missing values in immigration data \
and those columns are {immigration.columns[immigration.isnull().any()].tolist()}")

There are 17 columns containing missing values in immigration dataset and those columns are ['i94mode', 'i94addr', 'depdate', 'i94bir', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline', 'fltno']


- **Temperature Data:**

In [13]:
# Read temperature dataset:
temperature_fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature = pd.read_csv(temperature_fname)

In [20]:
# Display first 5 rows of temperature data:
temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


**Data Dictionary for Temperature Data:**

In [24]:
pd.DataFrame(data={
    "Column": temperature.columns,
    "Description": [
        "Date (format - YYYY-MM-DD)",
        "Average Temperature of The City At The Given Date",
        "Average Temperature Uncertainty",
        "City",
        "Country",
        "Latitude",
        "Longitude"
    ]
})

Unnamed: 0,Column,Description
0,dt,Date (format - YYYY-MM-DD)
1,AverageTemperature,Average Temperature of The City At The Given Date
2,AverageTemperatureUncertainty,Average Temperature Uncertainty
3,City,City
4,Country,Country
5,Latitude,Latitude
6,Longitude,Longitude
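
The `Latitude` and `Longitude` columns are stored as strings with a hemisphere suffix (for example `57.05N`), so they need converting before any numeric use. The function below is a minimal sketch of one way to do that; `parse_coordinate` is a hypothetical helper name and is not taken from `etl.py`.

```python
def parse_coordinate(value):
    """Convert a string such as '57.05N' or '10.33E' to a signed float.

    North and East are returned as positive numbers,
    South and West as negative numbers.
    """
    value = value.strip()
    magnitude, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere not in "NSEW":
        raise ValueError(f"Unexpected hemisphere in {value!r}")
    return -magnitude if hemisphere in "SW" else magnitude


# Values taken from the first row of the temperature sample above.
print(parse_coordinate("57.05N"), parse_coordinate("10.33E"))
```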


In [50]:
# Check missing values in temperature data:
print(f"There are {len(temperature.columns[temperature.isnull().any()])} columns containing missing values in temperature data \
and those columns are {temperature.columns[temperature.isnull().any()].tolist()}")

There are 2 columns containing missing values in temperature data and those columns are ['AverageTemperature', 'AverageTemperatureUncertainty']


Below, the temperature data is grouped by country and city. For each city, only the first non-null value of each column is displayed in the following dataframe.

In [26]:
temperature.groupby(["Country","City"]).first()

Unnamed: 0_level_0,Unnamed: 1_level_0,dt,AverageTemperature,AverageTemperatureUncertainty,Latitude,Longitude
Country,City,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
Afghanistan,Baglan,1833-01-01,-2.204,2.693,36.17N,69.61E
Afghanistan,Gardez,1833-01-01,5.553,2.453,32.95N,69.89E
Afghanistan,Gazni,1837-10-01,8.807,2.303,32.95N,67.98E
Afghanistan,Herat,1837-07-01,26.466,1.583,34.56N,62.27E
Afghanistan,Jalalabad,1833-01-01,2.290,2.487,34.56N,70.05E
Afghanistan,Kabul,1833-01-01,2.290,2.487,34.56N,70.05E
Afghanistan,Qandahar,1842-09-01,22.898,1.732,31.35N,65.97E
Afghanistan,Qunduz,1833-01-01,-2.204,2.693,36.17N,69.61E
Albania,Durrës,1743-11-01,12.686,2.051,40.99N,19.17E
Albania,Elbasan,1743-11-01,12.686,2.051,40.99N,19.17E


- **Airport Data:**

In [15]:
# Read airport data and display the first 5 rows of it:
airport = pd.read_csv("airport-codes_csv.csv")
airport.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [53]:
# Check missing values in airport data:
print(f"There are {len(airport.columns[airport.isnull().any()])} columns containing missing values in airport data \
and those columns are {airport.columns[airport.isnull().any()].tolist()}")

There are 7 columns containing missing values in airport data and those columns are ['elevation_ft', 'continent', 'iso_country', 'municipality', 'gps_code', 'iata_code', 'local_code']


**Data Dictionary for Airport Data:**


In [30]:
pd.DataFrame(data={
    "Column": airport.columns,
    "Description": [
        "Unique ID for each record",
        "Airport Type",
        "Airport Name",
        "Airport Altitude",
        "Continent",
        "ISO Country Code of The Airport",
        "ISO Region Code of The Airport",
        "City where the airport is located",
        "Airport GPS Code",
        "Airport IATA Code",
        "Airport Local Code",
        "Airport GPS Coordinates"
    ]
})

Unnamed: 0,Column,Description
0,ident,Unique ID for each record
1,type,Airport Type
2,name,Airport Name
3,elevation_ft,Airport Altitude
4,continent,Continent
5,iso_country,ISO Country Code of The Airport
6,iso_region,ISO Region Code of The Airport
7,municipality,City where the airport is located
8,gps_code,Airport GPS Code
9,iata_code,Airport IATA Code
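
Two airport columns pack several values into one string: `iso_region` (for example `US-PA`) and `coordinates` (longitude, then latitude, in the sample rows above). The sketch below shows one possible way to split them; `split_airport_fields` is a hypothetical helper and the real cleaning steps in `etl.py` may differ.

```python
def split_airport_fields(iso_region, coordinates):
    """Return (state_code, longitude, latitude) for one airport row."""
    # 'US-PA' -> 'PA': the part after the first hyphen is the region code.
    _, _, state_code = iso_region.partition("-")
    # The sample rows list longitude first, then latitude.
    longitude, latitude = (float(part) for part in coordinates.split(","))
    return state_code, longitude, latitude


# First row of the airport sample above (Total Rf Heliport).
print(split_airport_fields("US-PA", "-74.93360137939453, 40.07080078125"))
```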


- **U.S. City Demographic Data:**

In [18]:
# Read demographics data and display the first 5 rows of it:
demographics = pd.read_csv("us-cities-demographics.csv", sep=";")
demographics.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [54]:
# Check missing values in demographics data:
print(f"There are {len(demographics.columns[demographics.isnull().any()])} columns containing missing values in demographics data \
and those columns are {demographics.columns[demographics.isnull().any()].tolist()}")

There are 5 columns containing missing values in demographics data and those columns are ['Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size']


**Data Dictionary for Demographics Data:**

In [33]:
pd.DataFrame(data={
    "Column": demographics.columns,
    "Description": [
        "City Name",
        "US State",
        "Median Age of The Population",
        "# of Males",
        "# of Females",
        "# of Total Population",
        "# of Veterans",
        "# of city residents born outside of the U.S.",
        "Average Household Size",
        "Code of the state of the city",
        "Race",
        "# of individual of each race"
    ]
})

Unnamed: 0,Column,Description
0,City,City Name
1,State,US State
2,Median Age,Median Age of The Population
3,Male Population,# of Males
4,Female Population,# of Females
5,Total Population,# of Total Population
6,Number of Veterans,# of Veterans
7,Foreign-born,# of city residents born outside of the U.S.
8,Average Household Size,Average Household Size
9,State Code,Code of the state of the city


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The immigration data is clearly the fact table here, since the analytics we want to look at concern U.S. visitors from all over the world. Based on the data exploration above, I create 3 tables for the data model:  
- Fact table: immigration  
- Dimension tables: state, date

**Fact table schema - Immigration:**  
- cicid INT <font color="green">NOT NULL</font> <font color="red">PRIMARY KEY</font>
- i94yr INT
- i94mon INT
- i94visa INT
- i94res INT <font color="blue">FOREIGN KEY</font>
- i94mode INT
- i94cit INT <font color="blue">FOREIGN KEY</font>
- i94bir INT
- arrdate VARCHAR <font color="blue">FOREIGN KEY</font>
- depdate VARCHAR <font color="blue">FOREIGN KEY</font>
- airline VARCHAR
- i94port VARCHAR
- visatype VARCHAR
- gender VARCHAR
- i94addr VARCHAR <font color="blue">FOREIGN KEY</font>

**Dimension table schema - State**  
- Code VARCHAR <font color="green">NOT NULL</font> <font color="red">PRIMARY KEY</font>
- State VARCHAR
- AmericanIndianAndAlaskaNative INT
- Asian INT
- BlackOrAfricanAmerican INT
- HispanicOrLatino INT
- White INT
- ForeignBorn INT
- NumberVeterans INT
- FemalePopulation INT
- MalePopulation INT
- TotalPopulation INT
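
The state dimension has one row per state with one column per race, while the demographics file has one row per city and race. The sketch below illustrates the idea of that pivot in plain Python. The rows are made-up illustrative values (not taken from the dataset), and `build_state_rows` is a hypothetical function, not the actual `etl_states_data(...)` implementation.

```python
from collections import defaultdict

# Made-up rows shaped like us-cities-demographics.csv (one row per city and race).
rows = [
    {"City": "Springfield", "State Code": "XX", "Total Population": 1000, "Race": "White", "Count": 600},
    {"City": "Springfield", "State Code": "XX", "Total Population": 1000, "Race": "Asian", "Count": 300},
    {"City": "Shelbyville", "State Code": "XX", "Total Population": 500, "Race": "White", "Count": 400},
    {"City": "Ogdenville", "State Code": "YY", "Total Population": 200, "Race": "Asian", "Count": 50},
]


def build_state_rows(rows):
    """Aggregate city/race rows into one dict of totals per state code."""
    totals = defaultdict(lambda: defaultdict(int))
    seen_cities = set()
    for row in rows:
        code = row["State Code"]
        # Race counts are specific to each (city, race) row, so sum them directly.
        totals[code][row["Race"]] += row["Count"]
        # City-level totals repeat on every race row, so count each city once.
        city_key = (code, row["City"])
        if city_key not in seen_cities:
            seen_cities.add(city_key)
            totals[code]["TotalPopulation"] += row["Total Population"]
    return {code: dict(values) for code, values in totals.items()}


state_rows = build_state_rows(rows)
print(state_rows)
```

The important detail is the deduplication by city: summing `Total Population` over every race row would count each city several times.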

**Dimension table schema - Date**
- "date" VARCHAR <font color="green">NOT NULL</font> <font color="red">PRIMARY KEY</font>
- "day" INT
- "month" INT
- "year" INT
- weekofyear INT
- dayofweek INT
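
The date columns in the immigration sample (for example `arrdate` = 20545.0) look like SAS numeric dates, which count days from 1960-01-01. Assuming that is the case, a date dimension row could be derived as sketched below. `date_dimension_row` is a hypothetical helper, and the week and weekday conventions used here (ISO week number, Monday = 1) are assumptions that may differ from what `etl.py` or Spark's date functions produce.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS numeric dates count days from this day


def date_dimension_row(sas_days):
    """Build one date-dimension row from a SAS numeric date."""
    d = SAS_EPOCH + timedelta(days=int(sas_days))
    return {
        "date": d.isoformat(),
        "day": d.day,
        "month": d.month,
        "year": d.year,
        "weekofyear": d.isocalendar()[1],  # ISO 8601 week number
        "dayofweek": d.isoweekday(),       # Monday = 1 ... Sunday = 7
    }


# arrdate value from the immigration sample rows above.
print(date_dimension_row(20545.0))
```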

#### 3.2 Mapping Out Data Pipelines
With the schemas designed, the next step is the ETL pipelines, which are implemented in the Python script ```etl.py``` under the helper folder. For simplicity, I just import etl here.  
- **First**, a Spark session object is created and linked with AWS EMR.
- **Then**, the ETL process is run on the provided immigration dataset; within this process, the date table is also generated, and both tables are saved as Parquet files to their corresponding output paths.
- **Next**, the state table is generated by the ```etl_states_data(...)``` function from the demographics data, which is itself processed by the ```etl_demographics_data(...)``` function.
- **In addition**, the airport and temperature datasets are processed by the ```etl_airport_data(...)``` and ```etl_temperature_data(...)``` functions and saved as Parquet files, although they are not used to generate the fact and dimension tables.  

The ```etl.py``` script documents the full ETL process for each table, including the exact steps taken to extract, transform, and load the original dataset.

In [75]:
import helper.etl as etl

# Create a spark session:
spark = etl.spark_session_create(aws_spark_emr="<EMR-ENDPOINT>")

# ETL process for immigration dataset, create immigration table & date table, save both as Parquet files to S3:
# (the functions live in the imported etl module, so they are called with the etl. prefix)
immigration = etl.etl_immigration_data(spark, input_path='../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat',
                                       output_path="s3a://udacity-dend-capstone/immigration.parquet",
                                       date_output_path="s3a://udacity-dend-capstone/date.parquet",
                                       input_format="com.github.saurfang.sas.spark",
                                       limit=1000, partitionBy=None,
                                       columns_to_save="*")

# ETL process for state dataset, create state table, save it as Parquet files to S3:
state = etl.etl_states_data(spark, output_path="s3a://udacity-dend-capstone/states.parquet")

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
In steps 2 and 3, I completed the exploratory data analysis (EDA), designed the data schemas, and wrote the ETL script. Next, I build the data pipelines that create the data model. The pipeline consists of 2 main steps:  
- **First** - Use Spark:
    - to extract the original data stored on S3; transform it (for example, aggregating columns and deriving new columns from existing ones); and load the processed data back to S3.
- **Second** - Use Airflow:
    - to build a DAG that extracts the data from S3 and loads it into the Redshift database.

In [None]:
# Write code here

#### 4.2 Data Quality Checks
**Last but not least**, it is important to check the quality of the pipeline output so that errors do not go unnoticed. Airflow provides a way not only to automate data pipelines but also to monitor them. Thus, as the last step after loading data into Redshift, the DAG needs a data quality check to make sure that the Redshift tables contain the right information before users pull the data from Redshift or before it is sent to users who want to do analytic work.

Data quality checks include things like:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
The quality check script is included in ```data_quality.py```.
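
To make the count and key checks listed above concrete, here is a minimal sketch that runs them against an in-memory SQLite database. SQLite is only a stand-in for Redshift so the example can run anywhere, and the function names `check_has_rows` and `check_primary_key` are hypothetical; they are not the contents of `data_quality.py`.

```python
import sqlite3


def check_has_rows(conn, table):
    """Completeness check: fail if the table is empty, else return its row count."""
    count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    if count == 0:
        raise ValueError(f"Data quality check failed: {table} has no rows")
    return count


def check_primary_key(conn, table, key):
    """Integrity check: the key column must contain no NULLs and no duplicates."""
    total, distinct, nulls = conn.execute(
        f"SELECT COUNT(*), COUNT(DISTINCT {key}), "
        f"SUM(CASE WHEN {key} IS NULL THEN 1 ELSE 0 END) FROM {table}"
    ).fetchone()
    # SUM over an empty table yields NULL (None), so treat that as zero.
    if (nulls or 0) > 0 or distinct != total:
        raise ValueError(f"Data quality check failed: {table}.{key} is not a valid key")


# In-memory SQLite database standing in for the Redshift warehouse.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE immigration (cicid INTEGER, i94addr TEXT)")
conn.executemany(
    "INSERT INTO immigration VALUES (?, ?)",
    [(7, "AL"), (15, "MI"), (16, "MA")],
)

row_count = check_has_rows(conn, "immigration")
check_primary_key(conn, "immigration", "cicid")
print(f"immigration passed both checks with {row_count} rows")
```

In an Airflow DAG, functions like these would typically run as a final task after the load into Redshift, so that a failed check fails the run.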

#### 4.3 Data dictionary 
Data dictionaries for the immigration, temperature, airport, and demographics tables are shown below.

In [12]:
# Data dictionary for immigration table:
pd.DataFrame(data={
    "Columns": immigration.columns,
    "Description": [
        "Unique ID value for each record",
        "Year",
        "Month",
        "Country of Citizenship Code",
        "Country of Residence Code",
        "Entry Port",
        "Arrival Date in the USA",
        "Transportation Mode",
        "State of Arrival",
        "Departure Date from the USA",
        "Age of Respondent in Years",
        "Visa Codes Collapsed into Three Categories",
        "Used for Summary Statistics",
        "Character Date Field - Date added to I-94 Files",
        "Department of State Where Visa was Issued",
        "Occupation that will be performed in U.S.",
        "Arrival Flag - admitted or paroled into the U.S.",
        "Departure Flag - Departed, lost I-94 or is deceased",
        "Update Flag - Either apprehended, overstayed, adjusted to perm residence",
        "Match flag - Match of arrival and departure records",
        "Year of Birth",
        "Character Date Field - Date to which admitted to U.S. (allowed to stay until)",
        "Gender",
        "INS Number",
        "Airline used to arrive in U.S.",
        "Admission Number",
        "Flight number of Airline used to arrive in U.S.",
        "Class of admission legally admitting the non-immigrant to temporarily stay in U.S."
    ]
})

Unnamed: 0,Columns,Description
0,cicid,Unique ID value for each record
1,i94yr,Year
2,i94mon,Month
3,i94cit,Country of Citizenship Code
4,i94res,Country of Residence Code
5,i94port,Entry Port
6,arrdate,Arrival Date in the USA
7,i94mode,Transportation Mode
8,i94addr,State of Arrival
9,depdate,Departure Date from the USA


In [14]:
# Data dictionary for temperature table:
pd.DataFrame(data={
    "Column": temperature.columns,
    "Description": [
        "Date (format - YYYY-MM-DD)",
        "Average Temperature of The City At The Given Date",
        "Average Temperature Uncertainty",
        "City",
        "Country",
        "Latitude",
        "Longitude"
    ]
})

Unnamed: 0,Column,Description
0,dt,Date (format - YYYY-MM-DD)
1,AverageTemperature,Average Temperature of The City At The Given Date
2,AverageTemperatureUncertainty,Average Temperature Uncertainty
3,City,City
4,Country,Country
5,Latitude,Latitude
6,Longitude,Longitude


In [16]:
# Data dictionary for airport table:
pd.DataFrame(data={
    "Column": airport.columns,
    "Description": [
        "Unique ID for each record",
        "Airport Type",
        "Airport Name",
        "Airport Altitude",
        "Continent",
        "ISO Country Code of The Airport",
        "ISO Region Code of The Airport",
        "City where the airport is located",
        "Airport GPS Code",
        "Airport IATA Code",
        "Airport Local Code",
        "Airport GPS Coordinates"
    ]
})

Unnamed: 0,Column,Description
0,ident,Unique ID for each record
1,type,Airport Type
2,name,Airport Name
3,elevation_ft,Airport Altitude
4,continent,Continent
5,iso_country,ISO Country Code of The Airport
6,iso_region,ISO Region Code of The Airport
7,municipality,City where the airport is located
8,gps_code,Airport GPS Code
9,iata_code,Airport IATA Code


In [19]:
# Data dictionary for demographics table:
pd.DataFrame(data={
    "Column": demographics.columns,
    "Description": [
        "City Name",
        "US State",
        "Median Age of The Population",
        "# of Males",
        "# of Females",
        "# of Total Population",
        "# of Veterans",
        "# of city residents born outside of the U.S.",
        "Average Household Size",
        "Code of the state of the city",
        "Race",
        "# of individual of each race"
    ]
})

Unnamed: 0,Column,Description
0,City,City Name
1,State,US State
2,Median Age,Median Age of The Population
3,Male Population,# of Males
4,Female Population,# of Females
5,Total Population,# of Total Population
6,Number of Veterans,# of Veterans
7,Foreign-born,# of city residents born outside of the U.S.
8,Average Household Size,Average Household Size
9,State Code,Code of the state of the city


### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.  

The entire project is implemented in the cloud, which provides a scalable, low-cost, and highly reliable platform. The AWS tools I used are S3, EMR, and Redshift, together with Airflow for automating and monitoring pipelines. These tools suit a small business that is just starting out, which is the reason for choosing them.

There are some advantages for these services:

S3: Simple Storage Service is, as its name says, simple to use. It is scalable, cheap, performant, and secure, which makes it a good place to store the data of this project.

Spark: A widely used framework for big data processing, with built-in modules for streaming, SQL, machine learning, and graph processing. In this project, Spark is used to run SQL-style transformations on the data loaded from S3.

EMR: A cloud-native big data platform that allows teams to process vast amounts of data quickly and cost-effectively at scale using Spark. EMR is easy to use, secure, elastic, and low-cost, which makes it a good fit for this project.

Redshift: A natural choice since the whole solution is built in the cloud on AWS. Redshift is a massively parallel, column-oriented data warehouse that is easy to scale. The main analytical tools have native interfaces for loading data from Redshift.

Airflow: A workflow management platform structured around DAGs, which is essential for data pipelines. In this project, it is used to automate and monitor the pipelines.

* Propose how often the data should be updated and why.  

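A monthly update should be sufficient, because the I94 immigration data is delivered as monthly files (for example, `i94_apr16_sub.sas7bdat`) and the temperature data also has one record per city per month.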

* Write a description of how you would approach the problem differently under the following scenarios:

    - The data was increased by 100x.  
        Rather than rescaling the entire pipeline, increasing the number of nodes in the EMR cluster should be a good first step, since more nodes are needed to process the much larger volume of data.

    - The data populates a dashboard that must be updated on a daily basis by 7am every day.  
        We can set the 'schedule_interval' of the Airflow DAG to '@daily' and schedule the pipelines to run overnight, so that the data is available before 7am.

    - The database needed to be accessed by 100+ people.  
        We can add or remove users at any time in the Redshift cluster. In addition, the 'elastic resize' feature can adjust the number of nodes in the Redshift cluster to handle the extra load.