# Data Engineering - Capstone Project - ETL Pipeline

### Step 1: Scope the Project and Gather Data

This project gathers electrical consumption data from multiple sources, then cleans and aggregates it into a single consolidated source-of-truth database that analysts can query.  
Data will be aggregated into daily and monthly values and placed into separate tables to speed up analytical queries. The final dataset will enable analysts to identify the submetering coverage percentage of each site.
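
As a minimal sketch of the daily and monthly roll-up described above, assuming the half-hourly readings are already in a pandas DataFrame with `date_time` and `consumption` columns (the sample values here are invented for illustration), the aggregation could look like this. In the project itself the aggregates live in separate database tables, so this only illustrates the arithmetic.

```python
import pandas as pd

# Hypothetical half-hourly readings for one meter: two full days at 0.5 kWh each.
readings = pd.DataFrame({
    "date_time": pd.date_range("2019-07-01", periods=96, freq="30min"),
    "consumption": [0.5] * 96,
})

indexed = readings.set_index("date_time")["consumption"]

# Sum half-hourly kWh into one value per calendar day.
daily = indexed.resample("D").sum()

# Sum the same readings into one value per calendar month ("MS" = month start).
monthly = indexed.resample("MS").sum()

print(daily)
print(monthly)
```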

For the purposes of this project, data comes from three separate sources. These contain actual *anonymised* submeter and main meter data, which have been placed in an S3 bucket for learning purposes. This is a proof of concept, intended to demonstrate the data engineering principles (AWS and data warehousing) learnt throughout the Nanodegree.
Each source contains half-hourly kilowatt-hour consumption values, with some dating back to 2015.  

A more detailed breakdown of these can be found below.

#### Source 1:
- Subset of 100 different submeter values dating back to 2015.
- Derived from Tridium JACE devices.
- Values are given as sequential meter reads.
- Values have been exported in CSV format and are located on an S3 bucket.
- An example file `SOURCE_ONE/AAA069_AAA069_dba_kwh.csv` has been included.

#### Source 2: 
- Subset of 200 different submeter values dating back to 2019.
- Values derived from a proprietary sub meter data provider.
- Values are given as the delta between sequential meter reads, i.e. the actual amount consumed in that half-hourly period.
- Values have been exported to JSON format and are located on an S3 bucket.
- An example file `SOURCE_TWO/AAA013_AAA013_dba_kwh.json` has been included.

#### Source 3: 
- Subset of monthly main meter reads for June 2018.
- Values are given as the actual amount consumed in that half-hourly period.
- An example file `SOURCE_THREE/2019_05.csv` has been included.

### Step 2: Explore and Assess the Data


In [1]:
# Required Imports
import boto3
import os
import configparser
import pandas as pd
import json
import transformations as tf
import psycopg2

from io import StringIO
from utils import get_matching_s3_keys

config = configparser.ConfigParser()
config.read("Pre-Processing/config.ini")

ACCESS_KEY = config['AWS']['ACCESS_KEY']
SECRET_KEY = config['AWS']['SECRET_KEY']
BUCKET_NAME = 'dend-capstone-sample'

#### Exploring Source 1

Source ONE contains a subset of submeter data stored in CSV files.
Each file represents a different submeter for a particular site.  Each file is uniformly partitioned using the following convention... 

- `SITEID_SITEID_meterid_kwh.csv`
where SITEID is a combination of REGIONCODE (AAA, BBB, ..., FFF) and a three-digit numeric ID.
- `TIMESTAMP` is a Unix timestamp in milliseconds, recorded at half-hourly intervals
- `VALUE` represents the current kWh meter read


In [4]:
SOURCE_ONE_FILE = "SOURCE_1/AAA069_AAA069_dba_kwh.csv"

s3 = boto3.client('s3', aws_access_key_id=ACCESS_KEY,
                      aws_secret_access_key=SECRET_KEY)
obj = s3.get_object(Bucket=BUCKET_NAME, Key=SOURCE_ONE_FILE)
example_source_one = pd.read_csv(obj['Body'])


In [13]:
print(example_source_one[['meter_name','TIMESTAMP','VALUE']].head())

              meter_name      TIMESTAMP         VALUE
0  AAA069_AAA069_dba_kwh  1455521400013  55228.500000
1  AAA069_AAA069_dba_kwh  1455523200018  55232.800781
2  AAA069_AAA069_dba_kwh  1455525000010  55237.398438
3  AAA069_AAA069_dba_kwh  1455526800012  55241.500000
4  AAA069_AAA069_dba_kwh  1455528600004  55246.101562
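
The `TIMESTAMP` values in the output above are 13-digit integers, which is consistent with milliseconds since the Unix epoch, and they sit a few milliseconds off the half-hour mark. A minimal sketch of turning them into clean half-hourly datetimes, assuming UTC and assuming that rounding to the nearest 30 minutes is acceptable (the new column name `date_time` is chosen to match the collated files later in this notebook):

```python
import pandas as pd

# First rows of the Source 1 sample shown above.
sample = pd.DataFrame({
    "TIMESTAMP": [1455521400013, 1455523200018, 1455525000010],
    "VALUE": [55228.5, 55232.800781, 55237.398438],
})

# Interpret the integers as milliseconds since the Unix epoch.
sample["date_time"] = pd.to_datetime(sample["TIMESTAMP"], unit="ms")

# Snap each reading to the nearest half hour to drop the millisecond jitter.
sample["date_time"] = sample["date_time"].dt.round("30min")

print(sample[["date_time", "VALUE"]])
```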


#### Exploring Source 2

Source 2 has a similar structure to Source 1; however, files are stored in JSON format.
It is important to note that `VALUE` reads are the actual kWh consumed in that half-hourly interval.

- `SITEID_SITEID_meterid_kwh.json`
where SITEID is a combination of REGIONCODE (AAA, BBB, ..., FFF) and a three-digit numeric ID.
- `TIMESTAMP` is a Unix timestamp in milliseconds, recorded at half-hourly intervals
- `VALUE` represents the kWh consumed in that half-hourly interval

In [11]:
SOURCE_TWO_FILE = "SOURCE_2/AAA013_AAA013_dba_kwh.json"

s3 = boto3.client('s3', aws_access_key_id=ACCESS_KEY,
                      aws_secret_access_key=SECRET_KEY)
obj = s3.get_object(Bucket=BUCKET_NAME, Key=SOURCE_TWO_FILE)
json_content = obj["Body"].read().decode()
json_obj = json.loads(json_content)


In [12]:
print(json_obj['data'])

[{'meter_name': 'AAA013_AAA013_dba_kwh', 'TIMESTAMP': 1529280000000, 'TRENDFLAGS': None, 'STATUS': None, 'VALUE': 3.59375, 'TRENDFLAGS_TAG': None, 'STATUS_TAG': None}, {'meter_name': 'AAA013_AAA013_dba_kwh', 'TIMESTAMP': 1529281800000, 'TRENDFLAGS': None, 'STATUS': None, 'VALUE': 3.40625, 'TRENDFLAGS_TAG': None, 'STATUS_TAG': None}, {'meter_name': 'AAA013_AAA013_dba_kwh', 'TIMESTAMP': 1529283600000, 'TRENDFLAGS': None, 'STATUS': None, 'VALUE': 3.40625, 'TRENDFLAGS_TAG': None, 'STATUS_TAG': None}, {'meter_name': 'AAA013_AAA013_dba_kwh', 'TIMESTAMP': 1529285400000, 'TRENDFLAGS': None, 'STATUS': None, 'VALUE': 3.5, 'TRENDFLAGS_TAG': None, 'STATUS_TAG': None}, {'meter_name': 'AAA013_AAA013_dba_kwh', 'TIMESTAMP': 1529287200000, 'TRENDFLAGS': None, 'STATUS': None, 'VALUE': 3.375, 'TRENDFLAGS_TAG': None, 'STATUS_TAG': None}, {'meter_name': 'AAA013_AAA013_dba_kwh', 'TIMESTAMP': 1529289000000, 'TRENDFLAGS': None, 'STATUS': None, 'VALUE': 3.40625, 'TRENDFLAGS_TAG': None, 'STATUS_TAG': None}, {'m

#### Exploring Source 3
Source 3 is a CSV file of main meter data, received monthly from a data collector.

In [8]:
SOURCE_THREE_FILE = "SOURCE_3/2019_07.csv"

s3 = boto3.client('s3', aws_access_key_id=ACCESS_KEY,
                      aws_secret_access_key=SECRET_KEY)
obj = s3.get_object(Bucket=BUCKET_NAME, Key=SOURCE_THREE_FILE)
df = pd.read_csv(obj['Body'])


In [9]:
df.head()

Unnamed: 0.1,Unnamed: 0,Company Name,Date,00:30,01:00,01:30,02:00,02:30,03:00,03:30,...,19:30,20:00,20:30,21:00,21:30,22:00,22:30,23:00,23:30,00:00
0,0,AAA001,30/06/2019,15.7,14.7,14.1,13.45,13.32,12.9,11.77,...,11.67,12.61,11.83,12.26,14.84,15.47,14.75,12.95,13.18,13.57
1,1,AAA001,01/07/2019,12.12,11.82,11.92,11.91,12.7,14.01,13.13,...,24.91,26.53,27.5,25.08,22.57,14.58,14.16,13.11,12.3,12.65
2,2,AAA001,02/07/2019,12.26,11.23,11.69,11.53,11.81,11.36,12.67,...,24.4,26.32,25.14,25.35,22.8,14.44,13.53,12.99,12.35,12.75
3,3,AAA001,03/07/2019,11.76,11.68,11.41,11.07,13.6,13.37,11.01,...,25.03,27.4,27.36,27.95,25.07,15.37,14.75,13.54,13.71,14.48
4,4,AAA001,04/07/2019,13.24,12.16,12.3,11.44,12.31,13.8,12.98,...,25.11,27.52,26.94,27.35,23.94,14.84,14.34,13.73,13.67,13.21


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
Listed below are the steps necessary to pipeline the data into the final database.

For each of the source files the following process will need to be completed.

- Take the source file from the unprocessed S3 SOURCE_X bucket.
- Transform it (using pandas) into the following structure: `SITE_ID`, `REGION_ID`, `METER_NAME`, `TIMESTAMP`, `CONSUMPTION`.
- Save the result as CSV in the S3 SOURCE_X_PROCESSED bucket.
- Use Redshift COPY to stage SOURCE_X_PROCESSED from S3 into Redshift.

The transformation process for each source will be as follows...

###### Source 1
- Create a delta value between half-hourly intervals.
- Transform `meter_name` into `SITE_ID`, `REGION_ID` and `METER_NAME` columns.
- Only import valid meter names
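
The delta step can be sketched as follows. This is a minimal illustration, not the project's `transformations` module: the helper name `reads_to_consumption` is hypothetical, the sample reads are invented, and dropping the first row (which has no previous read) is an assumption about how that edge case should be handled.

```python
import pandas as pd

def reads_to_consumption(df: pd.DataFrame) -> pd.DataFrame:
    """Convert cumulative meter reads into per-interval consumption (hypothetical helper)."""
    out = df.sort_values("TIMESTAMP").copy()
    # Each interval's consumption is the current read minus the previous read.
    out["consumption"] = out["VALUE"].diff()
    # The first row has no previous read, so its delta is undefined; drop it.
    return out.dropna(subset=["consumption"]).reset_index(drop=True)

# Invented cumulative reads at half-hourly intervals (millisecond timestamps).
reads = pd.DataFrame({
    "TIMESTAMP": [1455521400013, 1455523200018, 1455525000010],
    "VALUE": [55228.5, 55232.75, 55237.5],
})

deltas = reads_to_consumption(reads)
print(deltas[["TIMESTAMP", "consumption"]])
```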

###### Source 2
- Transform `meter_name` into `SITE_ID`, `REGION_ID` and `METER_NAME` columns.
- Only import valid meter names
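
Sources 1 and 2 both split `meter_name` and filter out invalid names. A minimal sketch of one way to do this with a regular expression, assuming that "valid" means the name follows the `SITEID_SITEID_meterid_kwh` convention, that region codes are three letters in the range A to F, and that the output `meter_name` holds only the meter ID portion. The pattern and the helper `parse_meter_name` are hypothetical and not taken from the project code.

```python
import re
from typing import Optional

# Assumed pattern: SITEID_SITEID_meterid_kwh, where SITEID is three letters
# (A-F) followed by three digits and must appear twice, identically.
METER_NAME_RE = re.compile(
    r"^(?P<site_id>(?P<region>[A-F]{3})\d{3})_(?P=site_id)_(?P<meter>[A-Za-z0-9]+)_kwh$"
)

def parse_meter_name(meter_name: str) -> Optional[dict]:
    """Return site_id, region and meter_name parts, or None if the name is invalid."""
    match = METER_NAME_RE.match(meter_name)
    if match is None:
        return None
    return {
        "site_id": match.group("site_id"),
        "region": match.group("region"),
        "meter_name": match.group("meter"),
    }

print(parse_meter_name("AAA013_AAA013_dba_kwh"))
print(parse_meter_name("AAA013_BBB014_dba_kwh"))  # mismatched site IDs
```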

###### Source 3
- Transform the dataset from its current "wide" format (one column per half-hourly interval) into "tidy" format (one row per reading).
- Transform `Company Name` into `SITE_ID`, `REGION_ID` and `METER_NAME` columns.
- Only import valid meter names
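
The reshaping step for Source 3 can be sketched with `DataFrame.melt`. This is a minimal illustration on two of the half-hourly columns from the sample above, not the project's `transform_s3_df`. It assumes dates are in `DD/MM/YYYY` format; the intermediate column name `time` is hypothetical.

```python
import pandas as pd

# Two rows and two time columns taken from the Source 3 sample.
wide = pd.DataFrame({
    "Company Name": ["AAA001", "AAA001"],
    "Date": ["30/06/2019", "01/07/2019"],
    "00:30": [15.7, 12.12],
    "01:00": [14.7, 11.82],
})

# Turn each time-of-day column into its own row.
tidy = wide.melt(
    id_vars=["Company Name", "Date"],
    var_name="time",
    value_name="consumption",
)

# Combine the date and time-of-day strings into a single timestamp.
tidy["date_time"] = pd.to_datetime(
    tidy["Date"] + " " + tidy["time"], format="%d/%m/%Y %H:%M"
)

tidy = tidy.sort_values(["Company Name", "date_time"]).reset_index(drop=True)
print(tidy[["Company Name", "date_time", "consumption"]])
```

The `00:00` column appears last in the source file, which suggests it marks the end of the day rather than the start. This sketch does not handle that case, so a real transformation would need to decide which date that reading belongs to.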

### Step 4: Run ETL to Model the Data

In [2]:
def transform_to_formatted(transformation_method, prefix, src_format="csv"):
    """ Given specified transformation method (Source 1, Source 2, Source 3)
        this method will connect to S3, download the file, transform the file
        and upload back to the FORMATTED S3 folder"""
    
    s3 = boto3.client('s3', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY)

    #Loop through each key found in bucket
    for key in get_matching_s3_keys(s3, BUCKET_NAME, prefix):

        obj = s3.get_object(Bucket=BUCKET_NAME, Key=key)
        
        print("reading file... {}".format(key))
        
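        #Read into DF
        if src_format == "csv":
            df_raw = pd.read_csv(obj['Body'])
        elif src_format == "json":
            json_content = obj["Body"].read().decode()
            json_obj = json.loads(json_content)
            df_raw = pd.DataFrame(json_obj["data"])
        else:
            #Fail early rather than hitting an undefined df_raw below
            raise ValueError("Unsupported src_format: {}".format(src_format))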
        
        
        #Transform DF
        print("transforming file... {}".format(key))
        df_transformed = transformation_method(df=df_raw)

        #Upload to "FORMATTED" folder for staging to redshift
        data_out_key = key
        data_out_key = data_out_key.replace(prefix,"FORMATTED_" + prefix)
        data_out_key = data_out_key.replace("json", "csv")
        
        print("loading file... {}".format(data_out_key))
        csv_buffer = StringIO()
        df_transformed.to_csv(csv_buffer, index=False)
        s3.put_object(Bucket=BUCKET_NAME, 
                      Key=data_out_key,
                      Body=csv_buffer.getvalue())

In [3]:
transform_to_formatted(tf.transform_s1_df, prefix="SOURCE_1", src_format="csv")

reading file... SOURCE_1/AAA069_AAA069_dba_kwh.csv
transforming file... SOURCE_1/AAA069_AAA069_dba_kwh.csv
loading file... FORMATTED_SOURCE_1/AAA069_AAA069_dba_kwh.csv


In [None]:
transform_to_formatted(tf.transform_s2_df, prefix="SOURCE_2", src_format="json")

In [3]:
transform_to_formatted(tf.transform_s3_df, prefix="SOURCE_3", src_format="csv")

reading file... SOURCE_3/2019_07.csv
transforming file... SOURCE_3/2019_07.csv
loading file... FORMATTED_SOURCE_3/2019_07.csv


### Collate Source Files

In [4]:
#Collate all files into a single "collated file", ready for upload.
def collate_source_files():
    
    s3 = boto3.client('s3', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY)
    source_folders = ["FORMATTED_SOURCE_1", "FORMATTED_SOURCE_2", "FORMATTED_SOURCE_3"]
    
    #Loop through each of the source folders, collecting one DataFrame per file
    columns = ['site_id', 'region', 'meter_name', 'date_time', 'consumption']
    frames = []
    
    for prefix in source_folders:   
        for key in get_matching_s3_keys(s3, BUCKET_NAME, prefix):
            try:
                print("reading file... {}".format(key))
                obj = s3.get_object(Bucket=BUCKET_NAME, Key=key)
                df_raw = pd.read_csv(obj['Body'])
                frames.append(df_raw[columns])
            except KeyError:
                print("Could not append file...{}".format(key))
    
    #DataFrame.append was removed in pandas 2.0, so concatenate once at the end
    df_collated = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame(columns=columns)
    
    print("Uploading collated CSV")
    csv_buffer = StringIO()
    df_collated.to_csv(csv_buffer, index=False)
    s3.put_object(Bucket=BUCKET_NAME, 
                      Key="COLLATED/COLLATED.csv",
                      Body=csv_buffer.getvalue())
    print("Uploaded CSV")
    
    
collate_source_files()

reading file... FORMATTED_SOURCE_1/AAA069_AAA069_dba_kwh.csv
reading file... FORMATTED_SOURCE_2/AAA013_AAA013_dba_kwh.csv
reading file... FORMATTED_SOURCE_3/2019_07.csv
Uploading collated CSV
Uploaded CSV


### Upload to Redshift
Once all of the files have been transformed, the next job is to stage the formatted data in Redshift using the COPY command.

In [5]:
def stage_to_redshift():
    #Format BUCKET/ARN
    s3_copy = ("""
    copy staging_collated_energy 
    from {}
    credentials 'aws_iam_role={}'
    TIMEFORMAT 'auto'
    compupdate off region 'us-west-1'
    IGNOREHEADER 1
    CSV;
    """)
    
    print("Connecting to db...")
    conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
    cur = conn.cursor()
    print("Running copy query...")
    query = s3_copy.format("'s3://dend-capstone-sample/COLLATED'",config['IAM']['ARN'])
    cur.execute(query)
    conn.commit()
    conn.close()
    print("Stage to redshift completed")
    

stage_to_redshift()
    

Connecting to db...
Running copy query...


InternalError_: Load into table 'staging_collated_energy' failed.  Check 'stl_load_errors' system table for details.
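
The error message above points at Redshift's `stl_load_errors` system table. A minimal sketch of how the most recent load errors could be inspected, assuming an open psycopg2 cursor such as the `cur` created in `stage_to_redshift`. The helper name `fetch_recent_load_errors` is hypothetical and not part of the original notebook.

```python
def fetch_recent_load_errors(cur, limit=5):
    """Return the most recent rows from Redshift's stl_load_errors table.

    `cur` is assumed to be an open DB-API cursor (e.g. from psycopg2).
    """
    cur.execute(
        """
        SELECT starttime, filename, line_number, colname, err_code, err_reason
        FROM stl_load_errors
        ORDER BY starttime DESC
        LIMIT %s;
        """,
        (limit,),
    )
    return cur.fetchall()
```

Calling `fetch_recent_load_errors(cur)` after a failed COPY returns the file, line, column and reason for each rejected row, which is usually enough to tell whether the problem lies in the data or in the table definition.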


In [56]:
def create_energy_database():
    sql = """CREATE TABLE IF NOT EXISTS {} (              energy_id    INT     IDENTITY(0,1)  NOT NULL,
                                                          store_id     varchar(10)             NOT NULL,
                                                          region       varchar(10)             NOT NULL,
                                                          meter_name   varchar(20)            NOT NULL,
                                                          date_time    TIMESTAMP              NOT NULL,
                                                          consumption  numeric(18,0)          NOT NULL
                                                            )"""
    conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
    print("Connected to cluster")
    cur = conn.cursor()
    cur.execute(sql.format("staging_collated_energy"))
    cur.execute(sql.format("collated_energy"))
    conn.commit()
    print("Closing connection")
    conn.close()
    
create_energy_database()

Connected to cluster
Closing connection
