# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project builds an analytics database around data on immigration to the United States. Supplementary datasets cover airport codes, U.S. city demographics, and world temperatures, and the SAS labels description file supplies the lookup values for the immigration codes.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [6]:
import pandas as pd
from pyspark.sql import SparkSession
import datetime as dt
from pyspark.sql.functions import isnan, when, count, col, udf, dayofmonth, dayofweek, month, year, weekofyear
from pyspark.sql.functions import monotonically_increasing_id
import os
import configparser

In [14]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

output_data='s3a://datalake-udacity-haipd4'

In [15]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

In [16]:
# create a udf to convert arrival date in SAS format to datetime object
get_datetime = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)
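The conversion can be checked without Spark. The following is a minimal plain-Python sketch of the same arithmetic; `sas_days_to_iso` is a hypothetical helper name, not part of the project code. Unlike the UDF above, it tests for `None` explicitly, so a SAS value of 0 (1960-01-01 itself) is converted rather than treated as missing.

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS dates count days from this date


def sas_days_to_iso(days):
    """Convert a SAS day count to an ISO date string, or None if missing."""
    if days is None:
        return None
    return (SAS_EPOCH + dt.timedelta(days=int(days))).isoformat()


# 20574.0 is an arrdate value that appears in the immigration data
print(sas_days_to_iso(20574.0))  # 2016-04-30
```

The result agrees with the `dtadfile` value `20160430` recorded on the rows that carry this `arrdate`.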

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project we use Spark on **EMR** to extract the SAS data and store it as a fact table and dimension tables in a **data lake** on S3. The project also uses **Airflow** to run tasks such as copying the data from S3 to **Redshift** and checking data quality.

#### Describe and Gather Data 
The following datasets were used to build the analytics database:
1. I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace, along with a sample file in CSV format for previewing the data before reading it all in.
2. World Temperature Data: This dataset comes from Kaggle.
3. U.S. City Demographic Data: This data comes from OpenSoft.
4. Airport Code Table: This is a simple table of airport codes and corresponding cities.

#### I94 Immigration Data

In [5]:
# Read in the data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [7]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [8]:

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [9]:
#write to parquet
df_spark.write.partitionBy('i94yr', 'i94mon') \
                     .parquet(os.path.join(output_data, 'immigrations'), 'overwrite')


In [6]:
# Show the total number of records
df_spark=spark.read.parquet("sas_data")
df_spark.count()

3096313

In [7]:
# Read schema
df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [8]:
df_spark.select(['i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'i94mode', 'i94addr', 'i94bir', 'i94visa']).show()

+------+------+------+------+-------+-------+-------+------+-------+
| i94yr|i94mon|i94cit|i94res|i94port|i94mode|i94addr|i94bir|i94visa|
+------+------+------+------+-------+-------+-------+------+-------+
|2016.0|   4.0| 245.0| 438.0|    LOS|    1.0|     CA|  40.0|    1.0|
|2016.0|   4.0| 245.0| 438.0|    LOS|    1.0|     NV|  32.0|    1.0|
|2016.0|   4.0| 245.0| 438.0|    LOS|    1.0|     WA|  29.0|    1.0|
|2016.0|   4.0| 245.0| 438.0|    LOS|    1.0|     WA|  29.0|    1.0|
|2016.0|   4.0| 245.0| 438.0|    LOS|    1.0|     WA|  28.0|    1.0|
|2016.0|   4.0| 245.0| 464.0|    HHW|    1.0|     HI|  57.0|    2.0|
|2016.0|   4.0| 245.0| 464.0|    HHW|    1.0|     HI|  66.0|    2.0|
|2016.0|   4.0| 245.0| 464.0|    HHW|    1.0|     HI|  41.0|    2.0|
|2016.0|   4.0| 245.0| 464.0|    HOU|    1.0|     FL|  27.0|    2.0|
|2016.0|   4.0| 245.0| 464.0|    LOS|    1.0|     CA|  26.0|    2.0|
|2016.0|   4.0| 245.0| 504.0|    NEW|    1.0|     MA|  44.0|    2.0|
|2016.0|   4.0| 245.0| 504.0|    L

In [12]:
df_spark.select(['cicid', 'arrdate', 'depdate', 'count', 'dtadfile', 'visapost', 'occup']).show()

+---------+-------+-------+-----+--------+--------+-----+
|    cicid|arrdate|depdate|count|dtadfile|visapost|occup|
+---------+-------+-------+-----+--------+--------+-----+
|5748517.0|20574.0|20582.0|  1.0|20160430|     SYD| null|
|5748518.0|20574.0|20591.0|  1.0|20160430|     SYD| null|
|5748519.0|20574.0|20582.0|  1.0|20160430|     SYD| null|
|5748520.0|20574.0|20588.0|  1.0|20160430|     SYD| null|
|5748521.0|20574.0|20588.0|  1.0|20160430|     SYD| null|
|5748522.0|20574.0|20579.0|  1.0|20160430|     ACK| null|
|5748523.0|20574.0|20586.0|  1.0|20160430|     ACK| null|
|5748524.0|20574.0|20586.0|  1.0|20160430|     ACK| null|
|5748525.0|20574.0|20581.0|  1.0|20160430|     ACK| null|
|5748526.0|20574.0|20581.0|  1.0|20160430|     ACK| null|
|5748527.0|20574.0|20576.0|  1.0|20160430|     GUZ| null|
|5748528.0|20574.0|20575.0|  1.0|20160430|     GUZ| null|
|5748529.0|20574.0|20596.0|  1.0|20160430|     PNM| null|
|5748530.0|20574.0|20577.0|  1.0|20160430|     PNM| null|
|5748531.0|205

In [13]:
df_spark.select(['entdepa', 'entdepd', 'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline','admnum','fltno','visatype']).show()

+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|      G|      O|   null|      M| 1984.0|10292016|     F|  null|     VA|9.495562283E10|00007|      B1|
|      G|      O|   null|      M| 1987.0|10292016|     M|  null|     DL|9.495640653E10|00040|      B1|
|      G|      O|   null|      M| 1987.0|10292016|     F|  null|     DL|9.495645143E10|00040|      B1|
|      G|      O|   null|      M| 1988.0|10292016|     M|  null|     DL|9.495638813E10|00040|      B1|
|      G|      O|   null|      M| 1959.0|10292016|     M|  null|     NZ|9.498180283E10|00010|      B2|
|      G|      O|   null|      M| 1950.0|10292016|     F|  null|     NZ|9

#### Temperatures By City Data

In [16]:
# Read data from GlobalLandTemperaturesByCity.csv
file_name = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature_df = spark.read.csv(file_name, header=True, inferSchema=True)

In [17]:
# Show the total number of records
temperature_df.count()

8599212

In [20]:
temperature_df.select('dt').dropDuplicates().count()

3239

In [8]:
temperature_df.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [9]:
# Read schema
temperature_df.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



#### US Cities Demographics Data

In [11]:
# Read data from us-cities-demographics.csv
file_name = "data/us-cities-demographics.csv"
demographics_df = spark.read.csv(file_name, inferSchema=True, header=True, sep=';')

In [None]:
# Show the total number of records
demographics_df.count()

In [12]:
demographics_df.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [13]:
# Read schema
demographics_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



#### Airport Codes Data

In [15]:
# Read data from airport-codes_csv.csv
file_name = "data/airport-codes_csv.csv"
airport_codes_df = spark.read.csv(file_name, inferSchema=True, header=True, sep=',')

In [None]:
# Show the total number of records
airport_codes_df.count()

In [16]:
airport_codes_df.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [17]:
# Read schema
airport_codes_df.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

### Exploratory Data Analysis: General Data

In [17]:
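def sas_value_parser(sas_file, value, columns):
    """Parse one value block of the SAS labels file into a pandas DataFrame.

    sas_file: path to the SAS labels description file
    value:    name of the block to extract, e.g. 'i94cntyl'
    columns:  the two column names for the resulting DataFrame
    """
    with open(sas_file) as f:
        file_string = f.read()

    # Keep only the text from the block name up to the terminating ';'
    file_string = file_string[file_string.index(value):]
    file_string = file_string[:file_string.index(';')]

    # The first line holds the block name; the rest are "code = label" pairs
    line_list = file_string.split('\n')[1:]
    data = []

    for line in line_list:
        if '=' in line:
            # Split on the first '=' only, so a label containing '=' stays whole
            code, val = line.split('=', 1)
            code = code.strip()
            val = val.strip()

            # Drop the first and last character (the surrounding quotes)
            if code.startswith("'"):
                code = code[1:-1]

            if val.startswith("'"):
                val = val[1:-1]

            data.append((code, val))

    return pd.DataFrame(data, columns=columns)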
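
To see what the parser does without the labels file at hand, the same logic can be sketched on an in-memory string. The sample text below is an illustrative excerpt written in the layout the parser expects (a block name, `code = 'label'` lines, and a closing `;`); the exact layout of the real file is an assumption here. `parse_value_block` is a hypothetical name for this sketch, and it returns plain tuples rather than a DataFrame.

```python
# Illustrative sample only; the real labels file may be laid out differently
SAMPLE = """
value i94model
    1 = 'Air'
    2 = 'Sea'
    3 = 'Land'
    9 = 'Not reported'
;
"""


def parse_value_block(text, value):
    """Return (code, label) pairs for the named value block."""
    # Cut the text down to the block: from its name to the closing ';'
    block = text[text.index(value):]
    block = block[:block.index(';')]

    pairs = []
    for line in block.split('\n')[1:]:  # skip the line holding the block name
        if '=' not in line:
            continue
        code, label = (part.strip() for part in line.split('=', 1))
        pairs.append((code.strip("'"), label.strip("'")))
    return pairs


print(parse_value_block(SAMPLE, 'i94model'))
```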

In [18]:
i94cit_res = sas_value_parser('data/I94_SAS_Labels_Descriptions.SAS', 'i94cntyl', ['code', 'name'])
i94cit_res=spark.createDataFrame(i94cit_res)
i94cit_res.show()

+----+--------------------+
|code|                name|
+----+--------------------+
| 582|MEXICO Air Sea, a...|
| 236|         AFGHANISTAN|
| 101|             ALBANIA|
| 316|             ALGERIA|
| 102|             ANDORRA|
| 324|              ANGOLA|
| 529|            ANGUILLA|
| 518|     ANTIGUA-BARBUDA|
| 687|          ARGENTINA |
| 151|             ARMENIA|
| 532|               ARUBA|
| 438|           AUSTRALIA|
| 103|             AUSTRIA|
| 152|          AZERBAIJAN|
| 512|             BAHAMAS|
| 298|             BAHRAIN|
| 274|          BANGLADESH|
| 513|            BARBADOS|
| 104|             BELGIUM|
| 581|              BELIZE|
+----+--------------------+
only showing top 20 rows



In [20]:
i94cit_res.write.csv(os.path.join(output_data, 'i94_countries'), sep=',')

In [22]:
i94prtl_res = sas_value_parser('data/I94_SAS_Labels_Descriptions.SAS', 'i94prtl', ['code', 'name'])
i94prtl_res=spark.createDataFrame(i94prtl_res)
i94prtl_res.show()

+----+--------------------+
|code|                name|
+----+--------------------+
| ALC|ALCAN, AK        ...|
| ANC|ANCHORAGE, AK    ...|
| BAR|BAKER AAF - BAKER...|
| DAC|DALTONS CACHE, AK...|
| PIZ|DEW STATION PT LA...|
| DTH|DUTCH HARBOR, AK ...|
| EGL|EAGLE, AK        ...|
| FRB|FAIRBANKS, AK    ...|
| HOM|HOMER, AK        ...|
| HYD|HYDER, AK        ...|
| JUN|JUNEAU, AK       ...|
| 5KE|       KETCHIKAN, AK|
| KET|KETCHIKAN, AK    ...|
| MOS|MOSES POINT INTER...|
| NIK|NIKISKI, AK      ...|
| NOM|NOM, AK          ...|
| PKC|POKER CREEK, AK  ...|
| ORI|  PORT LIONS SPB, AK|
| SKA|SKAGWAY, AK      ...|
| SNP| ST. PAUL ISLAND, AK|
+----+--------------------+
only showing top 20 rows



In [24]:
i94prtl_res.write.csv(os.path.join(output_data, 'i94_ports'), sep=',')

In [25]:
i94addrl_res = sas_value_parser('data/I94_SAS_Labels_Descriptions.SAS', 'i94addrl', ['code', 'address'])
i94addrl_res=spark.createDataFrame(i94addrl_res)
i94addrl_res.show()

+----+-----------------+
|code|          address|
+----+-----------------+
|  AL|          ALABAMA|
|  AK|           ALASKA|
|  AZ|          ARIZONA|
|  AR|         ARKANSAS|
|  CA|       CALIFORNIA|
|  CO|         COLORADO|
|  CT|      CONNECTICUT|
|  DE|         DELAWARE|
|  DC|DIST. OF COLUMBIA|
|  FL|          FLORIDA|
|  GA|          GEORGIA|
|  GU|             GUAM|
|  HI|           HAWAII|
|  ID|            IDAHO|
|  IL|         ILLINOIS|
|  IN|          INDIANA|
|  IA|             IOWA|
|  KS|           KANSAS|
|  KY|         KENTUCKY|
|  LA|        LOUISIANA|
+----+-----------------+
only showing top 20 rows



In [26]:
i94addrl_res.write.csv(os.path.join(output_data, 'i94_addresses'), sep=',')

In [28]:
i94model_res = sas_value_parser('data/I94_SAS_Labels_Descriptions.SAS', 'i94model', ['mode_id', 'name'])
i94model_res=spark.createDataFrame(i94model_res)
i94model_res.show()

+-------+------------+
|mode_id|        name|
+-------+------------+
|      1|         Air|
|      2|         Sea|
|      3|        Land|
|      9|Not reported|
+-------+------------+



In [29]:
i94model_res.write.csv(os.path.join(output_data, 'i94_models'), sep=',')

In [30]:
I94VISA = sas_value_parser('data/I94_SAS_Labels_Descriptions.SAS', 'I94VISA', ['id', 'name'])
I94VISA=spark.createDataFrame(I94VISA)
I94VISA.show()

+---+--------+
| id|    name|
+---+--------+
|  1|Business|
|  2|Pleasure|
|  3| Student|
+---+--------+



In [31]:
#I94VISA.to_csv('data/i94VISA.csv', sep=',')
I94VISA.write.csv(os.path.join(output_data, 'i94_visas'), sep=',')

### Exploratory Data Analysis: Immigration Data

In [12]:
# Count distinct cicid values; a result equal to the total record count means no cicid is duplicated
duplicatesImmigrationDf = df_spark.select('cicid').dropDuplicates()
duplicatesImmigrationDf.count()

3096313

### Arrival Date

In [14]:
# create df from arrdate column
date_df = df_spark.select(['arrdate']).withColumn("arrdate", get_datetime(df_spark.arrdate)).distinct()
    
# expand df by adding other calendar columns
date_df = date_df.withColumn('arrival_day', dayofmonth('arrdate'))
date_df = date_df.withColumn('arrival_week', weekofyear('arrdate'))
date_df = date_df.withColumn('arrival_month', month('arrdate'))
date_df = date_df.withColumn('arrival_year', year('arrdate'))
date_df = date_df.withColumn('arrival_weekday', dayofweek('arrdate'))


In [15]:
date_df.show()

+----------+-----------+------------+-------------+------------+---------------+
|   arrdate|arrival_day|arrival_week|arrival_month|arrival_year|arrival_weekday|
+----------+-----------+------------+-------------+------------+---------------+
|2016-04-22|         22|          16|            4|        2016|              6|
|2016-04-15|         15|          15|            4|        2016|              6|
|2016-04-18|         18|          16|            4|        2016|              2|
|2016-04-09|          9|          14|            4|        2016|              7|
|2016-04-11|         11|          15|            4|        2016|              2|
|2016-04-12|         12|          15|            4|        2016|              3|
|2016-04-27|         27|          17|            4|        2016|              4|
|2016-04-01|          1|          13|            4|        2016|              6|
|2016-04-08|          8|          14|            4|        2016|              6|
|2016-04-26|         26|    
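
The calendar columns follow Spark's conventions: `weekofyear` returns the ISO 8601 week number, and `dayofweek` numbers the days from 1 (Sunday) to 7 (Saturday). As a cross-check, here is a plain-Python sketch that reproduces those columns for a single ISO date string; `calendar_parts` is a hypothetical helper, not part of the pipeline.

```python
import datetime as dt


def calendar_parts(iso_date):
    """Derive the arrival_* calendar columns from an ISO date string."""
    d = dt.date.fromisoformat(iso_date)
    return {
        "arrival_day": d.day,
        "arrival_week": d.isocalendar()[1],         # ISO 8601 week number
        "arrival_month": d.month,
        "arrival_year": d.year,
        "arrival_weekday": d.isoweekday() % 7 + 1,  # 1 = Sunday ... 7 = Saturday
    }


print(calendar_parts("2016-04-22"))
```

For 2016-04-22, a Friday, this gives week 16 and weekday 6, matching the first row of the table above.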

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

![](star-schema-capstone.png)


This model has one fact table, immigrations, and the following dimension tables: i94_countries, i94_ports, i94_addresses, temperatures, us_cities_demographics, airport_codes, and arrival_date.

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

![](capstone_data_pipeline.png)

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### 1. Process data and load data into S3

python3 elt.py

##### 2. Create Tables in Redshift & Load data from S3 to Redshift

Copy the code under the airflow folder and launch Airflow. Once Airflow is running, start **capstone_dag**.

In [None]:
# Import paths below assume Airflow 1.10
from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class CopyToRedshiftOperator(BaseOperator):
    ui_color = '#358140'
    # Named placeholders keep the template and the format() call in step
    copy_sql = """
        COPY {table}
        FROM '{s3_path}'
        ACCESS_KEY_ID '{access_key}'
        SECRET_ACCESS_KEY '{secret_key}'
        {file_format} '{format_path}'
        REGION '{region}'
        {additional};
    """

    @apply_defaults
    def __init__(self,
                 redshift_conn_id="",
                 aws_credentials_id="",
                 table="",
                 s3_bucket="",
                 s3_key="",
                 region="",
                 format_path="auto",
                 additional="",
                 # Assumption: the original template had a format-keyword slot
                 # that no argument filled; file_format is added to fill it
                 file_format="JSON",
                 *args, **kwargs):

        super(CopyToRedshiftOperator, self).__init__(*args, **kwargs)
        self.redshift_conn_id = redshift_conn_id
        self.aws_credentials_id = aws_credentials_id
        self.table = table
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.region = region
        self.format_path = format_path
        self.additional = additional
        self.file_format = file_format
        self.execution_date = kwargs.get('execution_date')

    def execute(self, context):
        self.log.info('CopyToRedshiftOperator init external connection')
        aws_hook = AwsHook(self.aws_credentials_id)
        credentials = aws_hook.get_credentials()
        redshift = PostgresHook(postgres_conn_id=self.redshift_conn_id)

        # Empty the target table so that a rerun does not duplicate rows
        self.log.info('CopyToRedshiftOperator start executing')
        self.log.info(f'CopyToRedshiftOperator delete data table {self.table}')
        redshift.run(f"DELETE FROM {self.table}")
        self.log.info(f'CopyToRedshiftOperator deleted data table {self.table} successfully')

        self.log.info('CopyToRedshiftOperator copy data from s3 to redshift table')

        # Build s3://bucket[/YYYY/MM/DD]/key
        s3_path = "s3://{}".format(self.s3_bucket)
        if self.execution_date:
            self.log.info('CopyToRedshiftOperator load data for specific date')
            year = self.execution_date.strftime("%Y")
            month = self.execution_date.strftime("%m")
            day = self.execution_date.strftime("%d")
            s3_path = '/'.join([s3_path, str(year), str(month), str(day)])
        s3_path = s3_path + '/' + self.s3_key

        self.log.info('CopyToRedshiftOperator start formatting the sql')

        # Every placeholder in copy_sql is filled; the original call passed
        # six values to a template with eight slots and raised IndexError
        formatted_sql = CopyToRedshiftOperator.copy_sql.format(
            table=self.table,
            s3_path=s3_path,
            access_key=credentials.access_key,
            secret_key=credentials.secret_key,
            file_format=self.file_format,
            format_path=self.format_path,
            region=self.region,
            additional=self.additional
        )

        self.log.info('CopyToRedshiftOperator start running sql to copy')
        redshift.run(formatted_sql)
        self.log.info(f'CopyToRedshiftOperator running {self.table} successfully')
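
The path logic in `execute` is easy to get wrong, so it helps to look at it in isolation. The sketch below mirrors how the operator assembles the S3 location, with an optional `year/month/day` prefix when an execution date is set. `build_s3_path` is a hypothetical standalone function written for illustration; the bucket and the `immigrations` key are the names used earlier when writing the parquet files.

```python
import datetime as dt


def build_s3_path(bucket, key, execution_date=None):
    """Return s3://bucket[/YYYY/MM/DD]/key, as the operator builds it."""
    parts = [f"s3://{bucket}"]
    if execution_date is not None:
        # Zero-padded year/month/day prefix for date-specific loads
        parts.append(execution_date.strftime("%Y/%m/%d"))
    parts.append(key)
    return "/".join(parts)


print(build_s3_path("datalake-udacity-haipd4", "immigrations"))
print(build_s3_path("datalake-udacity-haipd4", "immigrations", dt.date(2016, 4, 30)))
```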

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks
We pass the list of tables to check to the operator, which validates that each table contains at least one record.

In [None]:
# Import paths below assume Airflow 1.10
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class DataQualityOperator(BaseOperator):

    ui_color = '#89DA59'
    select_query = 'SELECT COUNT(*) FROM {}'

    @apply_defaults
    def __init__(self,
                 redshift_conn_id="",
                 tables=None,
                 *args, **kwargs):

        super(DataQualityOperator, self).__init__(*args, **kwargs)
        self.redshift_conn_id = redshift_conn_id
        # tables is the list of table names to check
        self.tables = tables or []

    def execute(self, context):
        self.log.info('DataQualityOperator start executing')
        redshift = PostgresHook(self.redshift_conn_id)

        for table in self.tables:
            # select_query is a class attribute, so it must be reached through self
            records = redshift.get_records(self.select_query.format(table))
            if records is None or len(records) < 1 or len(records[0]) < 1 or records[0][0] < 1:
                self.log.error(f"DataQualityOperator check failed. No records present in destination table {table}")
                raise ValueError(f"DataQualityOperator check failed. No records present in destination table {table}")
            self.log.info(f"DataQualityOperator on table {table} check passed with {records[0][0]} records")
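
The same count check can be tried locally before wiring it into Airflow. This is a minimal sketch that uses an in-memory SQLite database as a stand-in for Redshift (an assumption made only so the example runs anywhere); `check_tables_not_empty` is a hypothetical function name and the tables hold toy data.

```python
import sqlite3


def check_tables_not_empty(conn, tables):
    """Return {table: row_count}; raise ValueError if any table is empty."""
    counts = {}
    for table in tables:
        # Table names are interpolated directly, so pass only trusted names
        row = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
        if row is None or row[0] < 1:
            raise ValueError(f"Data quality check failed. No records present in table {table}")
        counts[table] = row[0]
    return counts


# Toy stand-in for the warehouse: one populated table and one empty table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE i94_visas (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO i94_visas VALUES (?, ?)",
                 [(1, "Business"), (2, "Pleasure"), (3, "Student")])
conn.execute("CREATE TABLE immigrations (cicid REAL)")  # left empty on purpose

print(check_tables_not_empty(conn, ["i94_visas"]))
```

Calling the function with `["immigrations"]` raises `ValueError`, which is the behaviour that makes the Airflow task fail when a load produced no rows.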

#### 4.3 Data dictionary 

### Step 5: Complete Project Write Up

##### Clearly state the rationale for the choice of tools and technologies for the project.
1. Apache Airflow
2. Redshift
3. EMR

##### Propose how often the data should be updated and why.
EMR processes the data and stores it in the data lake, Apache Airflow schedules and runs the tasks that copy data from S3 to Redshift, and Redshift serves as the data warehouse.

##### Write a description of how you would approach the problem differently under the following scenarios:

###### The data was increased by 100x.
The project already stores its data in Redshift with a KEY distribution style, which suits large datasets. To handle 100x more data, we would load the data into Redshift partition by partition, and keep using EMR to process the data and store it as parquet partitioned by year and month.

###### The data populates a dashboard that must be updated on a daily basis by 7am every day
We will update the DAG schedule so that the tasks run and populate the data before 7am every day.
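
One way to pick the schedule is to work backwards from the deadline. The sketch below turns a deadline and an estimated pipeline runtime into a daily cron expression of the kind a DAG schedule can use. `cron_for_deadline` is a hypothetical helper, and the 90-minute runtime is an assumed figure, not a measurement of this pipeline.

```python
def cron_for_deadline(deadline_hour, deadline_minute, runtime_minutes):
    """Return a daily cron expression that starts early enough to meet the deadline."""
    # Work in minutes since midnight
    start = deadline_hour * 60 + deadline_minute - runtime_minutes
    if start < 0:
        raise ValueError("the run would have to start on the previous day")
    return f"{start % 60} {start // 60} * * *"


# Assumed runtime of 90 minutes against the 07:00 deadline
print(cron_for_deadline(7, 0, 90))  # 30 5 * * *
```

The expression is evaluated in the scheduler's time zone, so the deadline should be expressed in that zone as well.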

###### The database needed to be accessed by 100+ people
We will create separate roles for different people or groups and manage permissions according to how each uses the data.