# Project: Data Warehouse (S3 to AWS Redshift ETL)

In [21]:
import pandas as pd
import boto3
import json
import psycopg2


# STEP 1: Provision computing resources with Infrasctructure as Code paradigm.

## Load Datawarehouse Params from a file
Loads datawarehouse credentials that are required to provision the computing resource in AWS Redshift datawarehouse. Also, the credentials allows the project to interact with the databases in the Redshift clusters.

        

In [22]:
# open the configuration file so that the credntials values
# can be extracted to initialize the corresponding variables
import configparser
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

# Access Keys
KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

# Cluster Details
DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

# Cluster Database Details
HOST                   = config.get("CLUSTER", "HOST")
DB_NAME                = config.get("CLUSTER","DB_NAME")
DB_USER                = config.get("CLUSTER","DB_USER")
DB_PASSWORD            = config.get("CLUSTER","DB_PASSWORD")
DB_PORT                = config.get("CLUSTER","DB_PORT")


## Create clients for IAM, EC2, S3 and Redshift
**Note** that these resources are created in the the **us-west-2** region. 

In [23]:
import boto3

ec2 = boto3.resource('ec2',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )

iam = boto3.client('iam',
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET,
                       region_name='us-west-2'
                  )

redshift = boto3.client('redshift',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )

## Create IAM ROLE
- Create an IAM Role that makes Redshift able to access S3 bucket (ReadOnly)

In [83]:
from botocore.exceptions import ClientError

#1.1 Create the role, 
try:
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)
    
    
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

1.1 Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhRole already exists.
1.2 Attaching Policy
1.3 Get the IAM role ARN
arn:aws:iam::213424942515:role/dwhRole


## Create Redshift Cluster

Create redshift cluster in **us-west-2** region

In [25]:
try:
    response = redshift.create_cluster(        
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DB_NAME,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DB_USER,
        MasterUserPassword=DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)

An error occurred (ClusterAlreadyExists) when calling the CreateCluster operation: Cluster already exists


### *Describe* the cluster to see its status
- This block od code should be run several times until the cluster status becomes `Available`

In [86]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

ClusterNotFoundFault: An error occurred (ClusterNotFound) when calling the DescribeClusters operation: Cluster dwhcluster not found.

### Take note of the cluster **endpoint** and **role ARN**

<font color='red'>Ensure that the cluster status becomes "Available" before running this code. </font>


In [84]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)

print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

DWH_ENDPOINT ::  dwhcluster.cotzwka5s709.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::213424942515:role/dwhRole


### Open an incoming  TCP port to access the cluster ednpoint

In [29]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DB_PORT),
        ToPort=int(DB_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-000bcd5a66f7fb94f')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


### Validate connection to the cluster

In [30]:
# Connect to the redshift
try: 
    connect = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(DWH_ENDPOINT, DB_NAME, DB_USER, DB_PASSWORD, DB_PORT))
except psycopg2.Error as e: 
    print("Error: Could not make connection to the cluster database")
    print(e)
try: 
    cursor = connect.cursor()
except psycopg2.Error as e: 
    print("Error: Could not get cursor to the Clsuter Database")
    print(e)
connect.set_session(autocommit=True)

## Create Tables Based on Star Schema approach.

# STEP 2: Create staging tables and then load dataset from S3 to the tables in redshift

## 2.1: Review the structure of the datasets from S3
Since the staging tables are copies of the data sources, the structure of the data sources are reviewed before creating the staging tables.

There are three datasets in S3 (Song, log, and log_json_path datasets). The log_json_path file contains the meta information that is required by AWS to correctly load lod datasets. The structure of the datasets are shown below


### Song Dataset


<img src="images/song_data.png" width="400" height="400">

### Log Data

<img src="images/log_data.png" width="100%">

### log_json_path Dataset

<img src="images/log_json_path.png" width="40%">

## 2.2: Create staging song events table
The staging tables are copies of the datasets from S3. Hence, the structure of the staging table aligns with the datasets

### Reset staging tables to facilitate ETL pipeline test
It is recommended to run these block of code after creating tables

In [67]:
# Reset staging song events table
staging_events_table_drop = "DROP TABLE staging_events_table"

try:
    cursor.execute(staging_events_table_drop)
except psycopg2.Error as e: 
    print("Error: Issue dropping table")
    print (e)

In [68]:
# Reset staging song data table
staging_songs_table_drop = "DROP TABLE staging_songs_table"

try:
    cursor.execute(staging_songs_table_drop)
except psycopg2.Error as e: 
    print("Error: Issue dropping table")
    print (e)


### Create Dataset Staging Table

In [33]:
# Create staging song events table
staging_events_table_create = ("""
    CREATE TABLE IF NOT EXISTS staging_events_table 
        (
            artist text, 
            auth text,
            firstName text,
            gender text,
            itemInSession int4,
            lastName text,
            length float,
            level text,
            location text,
            method text,
            page text,
            registration float,
            sessionId int4,
            song text,
            status int4,
            ts bigint,
            userAgent text,
            userId int
        )
""")

try:
    cursor.execute(staging_events_table_create)
except psycopg2.Error as e: 
    print("Error: Issue creating table")
    print (e)

In [34]:
# Create staging song dataset
staging_songs_table_create = ("""
    CREATE TABLE IF NOT EXISTS staging_songs_table
    (
      num_songs int4,
      artist_id text,
      artist_latitude float,
      artist_longitude float,
      artist_location text,
      artist_name text,
      song_id text,
      title text,
      duration float,
      year int

    )
""")

try:
    cursor.execute(staging_songs_table_create)
except psycopg2.Error as e: 
    print("Error: Issue creating table")
    print (e)

## Copy Datasets from S3 to Staging Staging Tables

In [35]:
#copy staging event datasets
staging_events_copy = ("""
    COPY staging_events_table FROM 's3://udacity-dend/log_data'
    CREDENTIALS 'aws_iam_role={}'
    REGION 'us-west-2'
    json 's3://udacity-dend/log_json_path.json'
    dateformat 'auto';
""").format(DWH_ROLE_ARN)

try:
    cursor.execute(staging_events_copy)
except Exception as e:
    print("Error: Issues copying data from S3")
    print(e)


    COPY staging_events_table FROM 's3://udacity-dend/log_data'
    CREDENTIALS 'aws_iam_role=arn:aws:iam::213424942515:role/dwhRole'
    REGION 'us-west-2'
    json 's3://udacity-dend/log_json_path.json'
    dateformat 'auto';



Query staging song events table to ensure that datasets were copied successfully

In [38]:
query = ("""
    SELECT COUNT(*) AS rows FROM staging_events_table
""")

try:
    cursor.execute(query)
except Exception as e:
    print(e)
    
row = cursor.fetchone()
while row:
   print(row)
   row = cursor.fetchone()

(8056,)


In [39]:
#copy staging song data datasets
staging_songs_copy = ("""
    COPY staging_songs_table FROM 's3://udacity-dend/song_data'
    CREDENTIALS 'aws_iam_role={}'
    REGION 'us-west-2'
    json 'auto';
""").format(DWH_ROLE_ARN)

print(staging_songs_copy)




    COPY staging_songs_table FROM 's3://udacity-dend/song_data'
    CREDENTIALS 'aws_iam_role=arn:aws:iam::213424942515:role/dwhRole'
    REGION 'us-west-2'
    json 'auto';



In [40]:
try:
    cursor.execute(staging_songs_copy)
except Exception as e:
    print("Error: Issues copying data from S3")
    print(e)
 

In [108]:
 query = ("""
    SELECT COUNT(*) AS count FROM staging_songs_table
""")

try:
    cursor.execute(query)
except Exception as e:
    print(e)
    
row = cursor.fetchone()
while row:
   print(row)
   row = cursor.fetchone()

(14896,)


# STEP 3: Create Fact and Dimensional Tables That are Optimized for Data Analysis

To analyse these datasets in redshift, I create dimensional tables from the staging tables based on a **star Schema** approach, as shown below

<img src="images/star_schema.png" width="90%">

## 6.1: Create Tables - Star Schema


### Create Dimensional Tables: users, songs, artist, and time tables
These tables targets meta information about the domain concepts of the datasets


#### Create User Table

In [76]:
# Reset users table to facilitate testing the ETL pipeline
user_table_drop = "DROP TABLE user_table"

try:
    cursor.execute(user_table_drop)
except Exception as e:
    print("Error: Issues deleting a table")
    print(e)


Error: Issues deleting a table
Table "user_table" does not exist



In [44]:
# Create User Table
user_table_create = ("""
    CREATE TABLE IF NOT EXISTS user_table
    (
        user_id INT4 IDENTITY(0, 1) NOT NULL PRIMARY KEY,
        first_name TEXT,
        last_name TEXT,
        gender TEXT,
        level TEXT
    );
""")


try:
    cursor.execute(user_table_create)
except Exception as e:
    print("Error: Issues Creating a table")
    print(e)

#### Create Songs Table

In [77]:
# Reset songs table to facilitate testing the ETL pipeline
song_table_drop = "DROP TABLE song_table"

try:
    cursor.execute(song_table_drop)
except Exception as e:
    print("Error: Issues deleting a table")
    print(e)


Error: Issues deleting a table
Table "song_table" does not exist



In [46]:
# Create song table
song_table_create = ("""
    CREATE TABLE IF NOT EXISTS song_table
    (
        song_id INT4 IDENTITY(0, 1) NOT NULL PRIMARY KEY,
        title TEXT,
        artist_id TEXT,
        year INT4,
        duration float
    );
""")


try:
    cursor.execute(song_table_create)
except Exception as e:
    print("Error: Issues Creating a table")
    print(e)

#### Create Artists Table

In [78]:
# Reset songs table to facilitate testing the ETL pipeline
artist_table_drop = "DROP TABLE artist_table"

try:
    cursor.execute(artist_table_drop)
except Exception as e:
    print("Error: Issues deleting a table")
    print(e)



Error: Issues deleting a table
Table "artist_table" does not exist



In [48]:
# Create artist table
artist_table_create = ("""
    CREATE TABLE IF NOT EXISTS artist_table
    (
        artist_id INT4 IDENTITY(0, 1) NOT NULL PRIMARY KEY,
        name TEXT,
        location TEXT,
        latitude float,
        longitude float
    );
""")


try:
    cursor.execute(artist_table_create)
except Exception as e:
    print("Error: Issues Creating a table")
    print(e)

#### Create Time Table

In [79]:
# Reset songs table to facilitate testing the ETL pipeline
time_table_drop = "DROP TABLE time_table CASCADE"

try:
    cursor.execute(time_table_drop)
except Exception as e:
    print("Error: Issues deleting a table")
    print(e)



Error: Issues deleting a table
Table "time_table" does not exist



In [50]:
# Create time table
time_table_create = ("""
    CREATE TABLE IF NOT EXISTS time_table
    (
        time_id INT4 IDENTITY(0, 1) NOT NULL PRIMARY KEY,
        start_time TIMESTAMP,
        hour INT4,
        day INT4,
        week INT4,
        month INT4,
        year INT4,
        weekday TEXT
    );
""")


try:
    cursor.execute(time_table_create)
except Exception as e:
    print("Error: Issues Creating a table")
    print(e)

### Create Fact Table - Song Play Table
This table targets the log details of played songs, i.e., songs with page = `NextPage` 

In [80]:
# Reset datable to facilate testing the ETL pipeline
songplay_table_drop = "DROP TABLE songplay_table"

try:
    cursor.execute(songplay_table_drop)
except Exception as e:
    print("Error: Issues deleting a table")
    print(e)

Error: Issues deleting a table
Table "songplay_table" does not exist



In [52]:
# Create song play query
songplay_table_create = ("""
    CREATE TABLE IF NOT EXISTS songplay_table
    (
        songplay_id INT4 IDENTITY(0, 1) NOT NULL PRIMARY KEY,
        start_time date REFERENCES time_table,
        user_id INT4 REFERENCES user_table,
        level TEXT,
        song_id TEXT REFERENCES song_table,
        artist_id TEXT REFERENCES artist_table,
        sessionId int4,
        location TEXT,
        user_agent TEXT
    );
""")

# Create song play table
try:
    cursor.execute(songplay_table_create)
except Exception as e:
    print("Error: Issues Creating a table")
    print(e)

### Insert Datasets into Song Play Table

In [53]:
songplay_table_insert = ("""
    INSERT INTO songplay_table (start_time, user_id, level, song_id, artist_id, sessionId, location, user_agent)
    SELECT
        to_timestamp(ts, 'YYYYMMDD HHMISS') AS start_time,
        userId AS user_id,
        level,
        song_id,
        artist_id,
        sessionId,
        location,
        userAgent AS user_agent
    FROM staging_events_table e
    JOIN staging_songs_table s
    ON (e.artist = s.artist_name)
    AND (e.length = s.duration)
    AND (e.song = s.title)
    AND page = 'NextSong'
    ;
""")

try:
    cursor.execute(songplay_table_insert)
except Exception as e:
    print("Error: Issues Inserting data to a table")
    print(e)


## Verify the insertion of the datasets into song play table

In [54]:

query = ("""
    SELECT COUNT(*) AS rows FROM songplay_table
""")

try:
    cursor.execute(query)
except Exception as e:
    print(e)
    
n_row = cursor.fetchone()
print(n_row)

(319,)


## Insert Datasets to User Table

In [55]:
user_table_insert = ("""
    INSERT INTO user_table (first_name, last_name, gender, level)
    SELECT DISTINCT
        firstName AS first_name,
        lastName AS last_name,
        gender,
        level
    FROM staging_events_table;
""")

try:
    cursor.execute(user_table_insert)
except Exception as e:
    print("Error: Issues Inserting data to a table")
    print(e)


## Verify the insertion of the datasets into user table

In [56]:

query = ("""
    SELECT COUNT(*) AS rows FROM user_table
""")

try:
    cursor.execute(query)
except Exception as e:
    print(e)
    
row = cursor.fetchone()
while row:
   print(row)
   row = cursor.fetchone()

(106,)


## Insert Datasets into Songs Table

In [57]:
song_table_insert = ("""
    INSERT INTO song_table (title, artist_id, year, duration)
    SELECT DISTINCT
        title,
        artist_id,
        year,
        duration
    FROM staging_songs_table;
""")

try:
    cursor.execute(song_table_insert)
except Exception as e:
    print("Error: Issues Inserting data to a table")
    print(e)

## Verify the insertion of the datasets into song table

In [58]:
query = ("""
    SELECT COUNT(*) AS rows FROM song_table
""")

try:
    cursor.execute(query)
except Exception as e:
    print(e)
    
row = cursor.fetchone()
while row:
   print(row)
   row = cursor.fetchone()

(14896,)


## Insert Datasets into Artist Table

In [60]:
artist_table_insert = ("""
    INSERT INTO artist_table (name, location, latitude, longitude)
    SELECT DISTINCT
        artist_name AS name,
        artist_location AS location,
        artist_latitude AS latitude,
        artist_longitude AS longitude
    FROM staging_songs_table;
""")

try:
    cursor.execute(artist_table_insert)
except Exception as e:
    print("Error: Issues Inserting data to a table")
    print(e)

## Verify the insertion of the datasets into song table

In [61]:
query = ("""
    SELECT COUNT(*) AS rows FROM artist_table
""")

try:
    cursor.execute(query)
except Exception as e:
    print(e)
    
nRows = cursor.fetchone()
print(nRows)

(10001,)


## Insert Datasets into Time Table

In [62]:
time_table_insert = ("""
    INSERT INTO time_table (start_time, hour, day, week, month, year, weekday)
    SELECT DISTINCT
        start_time,
        EXTRACT(hour FROM start_time) AS hour,
        EXTRACT(day FROM start_time) AS day,
        EXTRACT(week FROM start_time) AS week,
        EXTRACT(month FROM start_time) AS month,
        EXTRACT(year FROM start_time) AS year,
        EXTRACT(weekday FROM start_time) AS weekday
    FROM songplay_table;
""")

try:
    cursor.execute(time_table_insert)
except Exception as e:
    print("Error: Issues Inserting data to a table")
    print(e)

## Verify the insertion of the datasets into time table

In [66]:
query = ("""
    SELECT COUNT(*) FROM time_table
""")

try:
    cursor.execute(query)
except Exception as e:
    print(e)
    
count = cursor.fetchone()
print(row)


(319,)


In [187]:
connect.close()
cursor.close()

# STEP 5: Clean up your resources

<b><font color='red'>DO NOT RUN THIS UNLESS YOU ARE SURE <br/> 
    We will be using these resources in the next exercises</span></b>

In [70]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
#### CAREFUL!!

{'Cluster': {'ClusterIdentifier': 'dwhcluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'ClusterAvailabilityStatus': 'Modifying',
  'MasterUsername': 'dwh_user',
  'DBName': 'dwh',
  'Endpoint': {'Address': 'dwhcluster.cotzwka5s709.us-west-2.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2023, 2, 10, 3, 33, 47, 28000, tzinfo=tzutc()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ManualSnapshotRetentionPeriod': -1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-000bcd5a66f7fb94f',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-0da6892cf67a5882d',
  'AvailabilityZone': 'us-west-2c',
  'PreferredMaintenanceWindow': 'wed:06:30-wed:07:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'NumberOfNodes': 4,
  'P

- run this block several times until the cluster really deleted

In [81]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

ClusterNotFoundFault: An error occurred (ClusterNotFound) when calling the DescribeClusters operation: Cluster dwhcluster not found.

In [72]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)
#### CAREFUL!!

{'ResponseMetadata': {'RequestId': '1c94f719-386d-4674-bc27-e2513f219979',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '1c94f719-386d-4674-bc27-e2513f219979',
   'content-type': 'text/xml',
   'content-length': '200',
   'date': 'Fri, 10 Feb 2023 05:29:02 GMT'},
  'RetryAttempts': 0}}