# Effect of cities, demographics and temperature on US Immigration
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project provides an opportunity to model US immigration data and to answer fundamental questions about immigration inflows in to the US. Questions such which cities do immigrants frequently target, what is the mean age of each immigrant, what visa types do immigrants possess before entering into us and what is the average temperature per month over each immigration city. 

Out of the four datasets provided, this project harnessed the I94 immigration data set obtained from  US National Tourism and Trade Office immigration data, US demographic dataset obtained from OpenSoft and . city temperature dataset obtained from Kaggle.

The database was designed using the Star schema with four dimension tables (immigrants, city, time and monthly city temperatures ) and one fact table (immigration)


The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#df_immig_i94 = spark.read.load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_immig_i94 = spark.read.load('./sas_data')

In [3]:
# Do all imports and installs here
import pandas as pd
import os
import re
from datetime import datetime, timedelta
#from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, udf, year, month, avg, round, dayofweek, weekofyear, isnull
from pyspark.sql.types import StringType, IntegerType

## Step 1: Scope the Project and Gather Data

### Scope 
The objective of the project is to extract data from three sources and stage them. The extracted data would be transformed and aggregated into four dimension tables and one fact table so that analytics on US immigration can be performed based on city demographics and city average temperatures.

#### Describe and Gather Data 

**U.S. City Demographic Data**: comes from OpenSoft [demographic data](https://public.opendatasoft.com) and includes data by city, state, age, population, veteran status and race.

**I94 Immigration Data**: comes from the US National Tourism and Trade Office [immigration data](https://travel.trade.gov/research/reports/i94/historical/2016.html)
and includes details on incoming immigrants and their ports of entry.

**Airport Code Table**: comes from datahub.io [airport code](https://datahub.io/core/airport-codes#data) and includes airport codes and corresponding cities.

**World Temperature Data**: comes from kaggle [world temperature data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) and
includes data on temperature changes in the U.S. since 1850.

In [4]:
# Read demographics data
demog_df = spark.read.format("csv").option("header", "true").option("delimiter", ";").load("us-cities-demographics.csv")

In [5]:
# Read in temperature data
temp_df = spark.read.format("csv").option("header", "true").load('../../data2/GlobalLandTemperaturesByCity.csv')

In [6]:
df_immig_i94.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,...,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1


In [7]:
df_immig_i94.count()

3096313

In [8]:
# Create dictionary of valid i94 immigration valid port codes
#immig_sas_label_descriptions_fname = "I94_SAS_Labels_Descriptions.SAS"
with open("./I94_SAS_Labels_Descriptions.SAS") as f:
    lines = f.readlines()

re_compiled = re.compile(r"\'(.*)\'.*\'(.*)\'")
ports = {}
for line in lines[302:961]:
    match = re_compiled.search(line)
    ports[match.group(1)] = match.group(2)
print(len(ports))

659


In [10]:
# Create list of states
states = demog_df.toPandas()["State Code"].unique()
print(len(states))
print(states)

49
['MD' 'MA' 'AL' 'CA' 'NJ' 'IL' 'AZ' 'MO' 'NC' 'PA' 'KS' 'FL' 'TX' 'VA' 'NV'
 'CO' 'MI' 'CT' 'MN' 'UT' 'AR' 'TN' 'OK' 'WA' 'NY' 'GA' 'NE' 'KY' 'SC' 'LA'
 'NM' 'IA' 'RI' 'PR' 'DC' 'WI' 'OR' 'NH' 'ND' 'DE' 'OH' 'ID' 'IN' 'AK' 'MS'
 'HI' 'SD' 'ME' 'MT']


In [11]:
# Convert SAS date to PySpark date using udf
@udf(StringType())
def convert_datetime(x):
    if x:
        return (datetime(1960, 1, 1).date() + timedelta(x)).isoformat()
    return None

In [12]:
# Validate state using udf
@udf(StringType())
def state_validation(x):
    if x in states:
        return x
    return 'other'

In [19]:
# immigration cleaning function
def immig_clean(df):
    # Remove all missing values
    clean_immig_df = df.dropna(how="any", subset=["i94port", "i94addr", "gender"])
    # Extract states
    clean_immig_df = clean_immig_df.withColumn("i94addr", state_validation(clean_immig_df.i94addr))
    # Convert arrival_date in SAS format to PySpark format
    clean_immig_df = clean_immig_df.withColumn("arrdate", convert_datetime(clean_immig_df.arrdate))
    # filter relevant immigration data
    clean_immig_df = clean_immig_df.filter(clean_immig_df.i94addr != 'other')
    return clean_immig_df

In [20]:
# Test function
immig_test_df = immig_clean(df_immig_i94)
immig_test_df.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,2016-04-30,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,2016-04-30,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,2016-04-30,1.0,WA,20582.0,...,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,2016-04-30,1.0,WA,20588.0,...,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,2016-04-30,1.0,WA,20588.0,...,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1


In [23]:
# stage immigration dataframe
clean_immig_df = immig_test_df
staging_immig = clean_immig_df.select(col("cicid").alias("id"), 
                                       col("arrdate").alias("date"),
                                       col("i94port").alias("city_code"),
                                       col("i94addr").alias("state_code"),
                                       col("i94bir").alias("age"),
                                       col("gender").alias("gender"),
                                       col("i94visa").alias("visa_type"),
                                       "count").drop_duplicates()

staging_immig.show(5)

+---------+----------+---------+----------+----+------+---------+-----+
|       id|      date|city_code|state_code| age|gender|visa_type|count|
+---------+----------+---------+----------+----+------+---------+-----+
|5925997.0|2016-04-09|      BUF|        NY|null|     U|      2.0|  1.0|
|5928645.0|2016-04-25|      HHW|        HI|null|     U|      2.0|  1.0|
| 158100.0|2016-04-01|      MIA|        FL| 0.0|     F|      2.0|  1.0|
|1868564.0|2016-04-10|      NAS|        FL| 0.0|     M|      2.0|  1.0|
|2686612.0|2016-04-15|      MIA|        FL| 0.0|     F|      2.0|  1.0|
+---------+----------+---------+----------+----+------+---------+-----+
only showing top 5 rows



In [24]:
staging_immig.printSchema()

root
 |-- id: double (nullable = true)
 |-- date: string (nullable = true)
 |-- city_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- visa_type: double (nullable = true)
 |-- count: double (nullable = true)



### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

#### Demographics Data

In [25]:
demog_df.count()

2891

In [28]:
# map city name to city port abbreviation using udf

@udf(StringType())
def city_port(city):
    for key in ports:
        if city.lower() in ports[key].lower():
            return key

In [29]:
# Clean demographics data and create new columns

# Calculate demographics population percentages
clean_demog_df = demog_df.withColumn("median_age", demog_df['Median Age']) \
    .withColumn("pct_male_pop", (demog_df['Male Population'] / demog_df['Total Population']) * 100) \
    .withColumn("pct_female_pop", (demog_df['Female Population'] / demog_df['Total Population']) * 100) \
    .withColumn("pct_veterans", (demog_df['Number of Veterans'] / demog_df['Total Population']) * 100) \
    .withColumn("pct_foreign_born", (demog_df['Foreign-born'] / demog_df['Total Population']) * 100) \
    .withColumn("pct_race", (demog_df['Count'] / demog_df['Total Population']) * 100) \
    .withColumn("city_code", city_port(demog_df["City"])) \
    .dropna(how='any', subset=["city_code"])

In [31]:
clean_demog_df = clean_demog_df.select(col("City").alias("city_name"),
                                        col("State Code").alias("state_code"), 
                         "median_age", "pct_male_pop", "pct_female_pop","pct_veterans", 
                         "pct_foreign_born", 
                        col("Total Population").alias("total_pop"), 
                         col("Race").alias("race"),
                        "pct_race").drop_duplicates()

clean_demog_df.count()

883

In [33]:
#create pivot of the race column
pivot_demog_df = clean_demog_df.groupBy("city_name", "state_code", "median_age", "pct_male_pop",
                                        "pct_female_pop","pct_veterans", "pct_foreign_born",
                                       "total_pop").pivot("Race").avg("pct_race")

pivot_demog_df = pivot_demog_df.withColumn("city_code", city_port(pivot_demog_df["city_name"])) \
    .dropna(how='any', subset=["city_code"])

In [34]:
# stage demographic data
staging_demog_df = pivot_demog_df.select("city_code", "state_code", "city_name", "median_age",
                                    round(col("pct_male_pop"), 1).alias("pct_male_pop"),
                                    round(col("pct_female_pop"), 1).alias("pct_female_pop"),
                                    round(col("pct_veterans"), 1).alias("pct_veterans"),
                                    round(col("pct_veterans"), 1).alias("pct_foreign_born"),
                                    round(col("American Indian and Alaska Native"), 1).alias("pct_native_american"),
                                    round(col("Asian"), 1).alias("pct_asian"),
                                    round(col("Black or African-American"), 1).alias("pct_black"),
                                    round(col("Hispanic or Latino"), 1).alias("pct_hispanic_or_latino"),
                                    round(col("White"), 1).alias("pct_white"), "total_pop")
print(staging_demog_df.count())
staging_demog_df.show(5)

180
+---------+----------+--------------+----------+------------+--------------+------------+----------------+-------------------+---------+---------+----------------------+---------+---------+
|city_code|state_code|     city_name|median_age|pct_male_pop|pct_female_pop|pct_veterans|pct_foreign_born|pct_native_american|pct_asian|pct_black|pct_hispanic_or_latino|pct_white|total_pop|
+---------+----------+--------------+----------+------------+--------------+------------+----------------+-------------------+---------+---------+----------------------+---------+---------+
|      TUC|        AZ|        Tucson|      33.6|        49.8|          50.2|         7.2|             7.2|                4.6|      4.6|      6.4|                  43.5|     76.1|   531674|
|      MCA|        TX|         Allen|      37.2|        52.3|          47.7|         3.6|             3.6|                0.2|     16.1|     13.4|                  10.8|     71.2|    98138|
|      CRP|        TX|Corpus Christi|      35.

In [35]:
staging_demog_df.printSchema()

root
 |-- city_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- pct_male_pop: double (nullable = true)
 |-- pct_female_pop: double (nullable = true)
 |-- pct_veterans: double (nullable = true)
 |-- pct_foreign_born: double (nullable = true)
 |-- pct_native_american: double (nullable = true)
 |-- pct_asian: double (nullable = true)
 |-- pct_black: double (nullable = true)
 |-- pct_hispanic_or_latino: double (nullable = true)
 |-- pct_white: double (nullable = true)
 |-- total_pop: string (nullable = true)



### Temperature Data

In [36]:
temp_df.count()

8599212

In [38]:
# Clean temperature data
#filter U.S temperature data out of the world Temperature Data, and only use the latest year(2013)
# remove wrong ports and map city name to port abbreviation


ustemp_df = temp_df.filter(temp_df["Country"] == "United States") \
    .withColumn("year", year(temp_df['dt'])) \
    .withColumn("month", month(temp_df["dt"])) \
    .withColumn("i94port", city_port(temp_df["City"])) \
    .withColumn("AverageTemperature", temp_df["AverageTemperature"]*9/5+32) \
    .dropna(how='any', subset=["i94port"])

ustemp_df = ustemp_df.filter(year(temp_df["dt"])==2013)

In [39]:
# stage us temperature data
staging_ustemp_df = ustemp_df.select(col("year"), col("month"), 
                                     col("i94port").alias("city_code"),
                                         round(col("AverageTemperature"), 1).alias("avg_temperature"),
                                         col("Latitude").alias("lat"), 
                                     col("Longitude").alias("long")).drop_duplicates()

print(staging_ustemp_df.count())
staging_ustemp_df.show(5)

1044
+----+-----+---------+---------------+------+------+
|year|month|city_code|avg_temperature|   lat|  long|
+----+-----+---------+---------------+------+------+
|2013|    2|      OKC|           40.0|36.17N|97.46W|
|2013|    1|      WAS|           35.3|39.38N|76.99W|
|2013|    5|      BHX|           69.1|32.95N|87.13W|
|2013|    8|      CLE|           69.8|40.99N|80.95W|
|2013|    8|      HSV|           70.7|42.59N|89.45W|
+----+-----+---------+---------------+------+------+
only showing top 5 rows



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

**Star Schema**

#### Mapping Out Data Pipelines¶
The steps necessary to pipeline the data into the chosen data model

Build the data pipeline to create the data model by:
    
1. Dropping duplicates from immigration data
2. Creating dimension tables from cleaned data
3. Writing each dimension table to parquet for downstream query
4. Creating fact table
5. Writing fact table to parquet for downstream query

### Step 4: Run Pipelines to Model the Data 
#### Create the data model
Build the data pipelines to create the data model.

In [41]:
# Create dimension table for immigrant dataframe

immigrants = staging_immig.select("id",
                                     "gender",
                                     "age", 
                                     "visa_type").drop_duplicates()

In [43]:
immigrants.count()

2435922

In [44]:
immigrants.show(5)

+---------+------+----+---------+
|       id|gender| age|visa_type|
+---------+------+----+---------+
| 488015.0|     M| 6.0|      2.0|
|3885998.0|     M|11.0|      2.0|
|3305770.0|     M|13.0|      2.0|
|5087701.0|     F|13.0|      2.0|
| 798492.0|     M|18.0|      2.0|
+---------+------+----+---------+
only showing top 5 rows



In [45]:
# Write immigrant dimension table to parquet
immigrants.write.mode("overwrite").partitionBy("gender", "age").parquet("immigrants")

In [46]:
# Create dimension table for city dataframe

city = staging_demog_df.join(staging_ustemp_df, "city_code") \
    .select("city_code", 
            "state_code", 
            "city_name", 
            "median_age", 
            "pct_male_pop", 
            "pct_female_pop", 
            "pct_veterans",
           "pct_foreign_born", 
            "pct_native_american",
            "pct_asian", "pct_black",
           "pct_hispanic_or_latino",
            "pct_white", 
            "total_pop", 
            "lat",
            "long").drop_duplicates()

In [47]:
city.count()

142

In [48]:
city.show(5)

+---------+----------+-----------+----------+------------+--------------+------------+----------------+-------------------+---------+---------+----------------------+---------+---------+------+-------+
|city_code|state_code|  city_name|median_age|pct_male_pop|pct_female_pop|pct_veterans|pct_foreign_born|pct_native_american|pct_asian|pct_black|pct_hispanic_or_latino|pct_white|total_pop|   lat|   long|
+---------+----------+-----------+----------+------------+--------------+------------+----------------+-------------------+---------+---------+----------------------+---------+---------+------+-------+
|      BRO|        TX|Brownsville|      30.6|        47.7|          52.3|         2.3|             2.3|                0.6|      0.9|      0.7|                  92.5|     95.0|   183888|26.52N| 96.72W|
|      HSV|        WI|    Madison|      30.7|        49.2|          50.8|         3.9|             3.9|                0.9|      9.6|      8.2|                   7.9|     82.1|   248956|34.56N

In [49]:
# Write city dimension table to parquet
city.write.mode("overwrite").partitionBy("state_code").parquet("cities")

In [50]:
# Create dimension table for monthly city temperature data frame

city_month_temp = staging_ustemp_df.select("city_code",
                                                "year",
                                                "month",
                                                "avg_temperature").drop_duplicates()

In [51]:
city_month_temp.count()

1044

In [52]:
city_month_temp.show(5)

+---------+----+-----+---------------+
|city_code|year|month|avg_temperature|
+---------+----+-----+---------------+
|      HOU|2013|    4|           66.2|
|      BTN|2013|    6|           83.0|
|      RDU|2013|    1|           42.4|
|      CIN|2013|    1|           31.5|
|      LCB|2013|    9|           83.8|
+---------+----+-----+---------------+
only showing top 5 rows



In [53]:
# Write montly city temperature dimension table to parquet
city_month_temp.write.mode("overwrite").parquet("city_month_temperatures")

In [54]:
# Create dimension table for time dataframe

time = staging_immig.withColumn("dayofweek", dayofweek("date"))\
                .withColumn("weekofyear", weekofyear("date"))\
                .withColumn("month", month("date"))
                        
time = time.select("date",
                         "dayofweek",
                         "weekofyear",
                         "month").drop_duplicates()

In [55]:
time.count()

30

In [56]:
time.show(5)

+----------+---------+----------+-----+
|      date|dayofweek|weekofyear|month|
+----------+---------+----------+-----+
|2016-04-23|        7|        16|    4|
|2016-04-22|        6|        16|    4|
|2016-04-08|        6|        14|    4|
|2016-04-09|        7|        14|    4|
|2016-04-26|        3|        17|    4|
+----------+---------+----------+-----+
only showing top 5 rows



In [57]:
# Write time dimension table to parquet
time.write.mode("overwrite").parquet("time")

In [58]:
# Create fact table for immigration dataframe

immigration = staging_immig.select("id", 
                                         "state_code", 
                                         "city_code", 
                                         "date", "count").drop_duplicates()

In [59]:
immigration.count()

2435922

In [60]:
immigration.show(5)

+---------+----------+---------+----------+-----+
|       id|state_code|city_code|      date|count|
+---------+----------+---------+----------+-----+
|4750990.0|        HI|      HHW|2016-04-25|  1.0|
|4308064.0|        HI|      HHW|2016-04-23|  1.0|
|4232231.0|        FL|      ATL|2016-04-22|  1.0|
|  56083.0|        HI|      HHW|2016-04-01|  1.0|
|3488147.0|        NY|      NYC|2016-04-19|  1.0|
+---------+----------+---------+----------+-----+
only showing top 5 rows



In [69]:
# Write fact table to parquet
#immigration.write.mode("overwrite").partitionBy("state_code", "city_code").parquet("immigration_facts")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [63]:
# Perform quality checks on dimension tables

def dim_table(df):
    if df is not None:
        return True
    else:
        return False
        
if dim_table(immigrants) & dim_table(city) \
& dim_table(city_month_temp) & dim_table(time):
    print("dimension data quality checks passed")
    print("dimension tables exist")
    print()
else:
    print("dimension data quality check failed")    

dimension data quality checks passed
dimension tables exist



In [64]:
# Perform quality checks on fact table
def fact_table(df):
    if df is not None:
        return True
    else:
        return False
        
if fact_table(immigration):
    print("fact data quality checks passed")
    print("fact table exist")
    print()
else:
    print("fact data quality check failed")

fact data quality checks passed
fact table exist



In [68]:
# Perform quality checks on fact table
def non_zero_records(df):
    return df.count() !=0
            
if non_zero_records(immigration):
    print("data quality checks passed")
    print("non zero records check passed")
    print()
else:
    print("non zero records failed check failed")

data quality checks passed
non zero records check passed



#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

**Step 5: Complete Project Write Up**

* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

**Rationale for the choice of tools and technologies for the project.**

Spark has a Python API, PySpark capable of handling big data with speed due to it's in-memory compute technology. Spark can handle different data types such as JSON, Parquest, SAS and CSV and can also scale very well in distributed or parallel environment. Moreover, Spark integrates very well with cloud data stores such as S3 and cloud databases such as Amazon Redshift.

**How often the data should be updated and why.**

The data should be updated on monthly cycles because the format of the raw files are monthly. This cycle will also work for most entities such as  organizations and governments. 

**How my approach to the problem would differ under the following scenarios:**

* The data was increased by 100x.

Amazon Redshift would be used since it provides analytical database which is optimized for aggregation and read-heavy workloads

* The data populates a dashboard that must be updated on a daily basis by 7am every day.

Apache Airflow or Apache Nifi could be used to create DAG pipelines or automate the process that send failures notices or perform retries.
Daily quality checks that send failure emails to operators could be employed for consistent monitoring in order to meet users and client requiremnts.

* The database needed to be accessed by 100+ people.

The auto scaling capacities of redshift could be used to handle heavy loads and also attain good read performance.