# Create a Data Warehouse on AWS using the AWS Python SDK (boto3)
## Notebook for development purposes

In [None]:
# import libraries
import pandas as pd
import boto3
import json
import configparser
from botocore.exceptions import ClientError
import pandas as pd

# STEP 0:  AWS secret and access key

- Create a new IAM user in your AWS account
- Give it `AdministratorAccess` from the `Attach existing policies directly` tab
- Take note of the access key and secret
- Edit the file `dwh.cfg` in the same folder as this notebook and fill in the `KEY` and `SECRET` values of its `[AWS]` section

# Load DWH Params from a file

In [None]:
# Read all AWS / cluster parameters from dwh.cfg (expected next to this notebook).
# The `with` block closes the file handle; a bare open() would leak it.
config = configparser.ConfigParser()
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

# Credentials of the IAM user created in STEP 0.
KEY                    = config.get('AWS', 'KEY')
SECRET                 = config.get('AWS', 'SECRET')

# Cluster hardware. configparser returns strings, so numeric values
# (DWH_NUM_NODES, DWH_PORT) are converted with int() where they are used.
DWH_CLUSTER_TYPE       = config.get("DWH", "DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH", "DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH", "DWH_NODE_TYPE")

# Cluster identity, database and master credentials.
DWH_CLUSTER_IDENTIFIER = config.get("DWH", "DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH", "DWH_DB")
DWH_DB_USER            = config.get("DWH", "DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH", "DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH", "DWH_PORT")

# Name of the IAM role Redshift assumes to read from S3 (created in STEP 1).
DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

## Create clients for EC2, S3, IAM, and Redshift

In [None]:
# Every client/resource talks to the same region with the same IAM-user
# credentials, so the connection arguments are built once and shared.
aws_conn_kwargs = dict(
    region_name='us-west-2',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

ec2      = boto3.resource('ec2', **aws_conn_kwargs)     # VPC / security groups (STEP 3)
s3       = boto3.resource('s3', **aws_conn_kwargs)      # S3 resource (e.g. for inspecting source buckets)
iam      = boto3.client('iam', **aws_conn_kwargs)       # IAM role for Redshift (STEP 1)
redshift = boto3.client('redshift', **aws_conn_kwargs)  # cluster lifecycle (STEPS 2 and 5)

## STEP 1: IAM ROLE
- Create an IAM Role that allows Redshift to read from S3 (read-only access)

In [None]:
# 1.1 Create the role. Its trust policy lets the redshift.amazonaws.com service
# principal call sts:AssumeRole, i.e. Redshift clusters may act as this role.
try:
    print("1.1 Creating a new IAM Role")
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description="Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
                            'Effect': 'Allow',
                            'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )
except iam.exceptions.EntityAlreadyExistsException as e:
    # Re-running the notebook: reuse the existing role. Any other error
    # (permissions, malformed policy, ...) propagates instead of being hidden.
    print(e)

# 1.2 Grant read-only S3 access so COPY can read the source files.
print("1.2 Attaching Policy")
attach_status = iam.attach_role_policy(
    RoleName=DWH_IAM_ROLE_NAME,
    PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess",
)['ResponseMetadata']['HTTPStatusCode']
print(attach_status)

# 1.3 The role ARN is passed to create_cluster (STEP 2).
print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']
print(roleArn)

## STEP 2:  Redshift Cluster

- Create a RedShift Cluster
- For complete arguments to `create_cluster`, see [docs](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.create_cluster)

In [None]:
# Launch the cluster described in dwh.cfg, attached to the S3 read-only role.
try:
    response = redshift.create_cluster(
        # Hardware
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        # Identifiers & credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        # Listen on the configured port: the ingress rule (STEP 3) and the
        # connection string (STEP 4) both use DWH_PORT.
        Port=int(DWH_PORT),

        # Roles (for S3 access)
        IamRoles=[roleArn],
    )
except redshift.exceptions.ClusterAlreadyExistsFault as e:
    # Re-running the notebook: keep the existing cluster; other errors propagate.
    print(e)

## 2.1 Describe the cluster to see its status
- Run this cell several times until the cluster status becomes `available`

In [None]:
def prettyRedshiftProps(props):
    """Return a (Key, Value) DataFrame of the cluster properties worth showing.

    Only a fixed set of keys is kept, in the order they appear in ``props``.
    Side effect: disables pandas column-width truncation globally so long
    values (e.g. the Endpoint dict) are displayed in full.
    """
    pd.set_option('display.max_colwidth', None)
    wanted = {"ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername",
              "DBName", "Endpoint", "NumberOfNodes", 'VpcId'}
    rows = []
    for key, value in props.items():
        if key in wanted:
            rows.append((key, value))
    return pd.DataFrame(data=rows, columns=["Key", "Value"])
    

# Fetch the current cluster description and show the key properties;
# re-run until ClusterStatus is 'available'.
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

## 2.2 Get the cluster endpoint and role ARN

<font color='red'>DO NOT RUN THIS unless the cluster status becomes "Available" </font>

In [None]:
# Re-describe the cluster rather than trusting a possibly stale myClusterProps
# from 2.1: 'Endpoint' may be missing until the cluster is available, so fail
# with a clear message instead of a bare KeyError.
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
if myClusterProps['ClusterStatus'] != 'available':
    raise RuntimeError(
        "Cluster {} is '{}', not 'available' yet; re-run 2.1 and wait.".format(
            DWH_CLUSTER_IDENTIFIER, myClusterProps['ClusterStatus']))

DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print('DWH_ENDPOINT and DWH_ROLE_ARN successfully retrieved')


In [None]:
# Record the discovered endpoint and role ARN in the in-memory config and
# persist it to dwh_copy.cfg (the original dwh.cfg is left untouched).
config.set('DWH', 'DWH_ENDPOINT', DWH_ENDPOINT)
config.set('DWH', 'DWH_ROLE_ARN', DWH_ROLE_ARN)
with open('dwh_copy.cfg', 'w') as configfile:
    config.write(configfile)
# No config.close(): ConfigParser has no such method (it raised AttributeError);
# the `with` block already closes the file.

## STEP 3: Open an incoming TCP port to access the cluster endpoint

In [None]:
# Open DWH_PORT on the security group actually attached to the cluster
# (rather than whichever group the VPC happens to list first).
# NOTE(review): 0.0.0.0/0 exposes the port to the whole internet. That is acceptable
# only for a short-lived dev cluster; restrict CidrIp to your own IP otherwise.
cluster_sg_id = myClusterProps['VpcSecurityGroups'][0]['VpcSecurityGroupId']
cluster_sg = ec2.SecurityGroup(cluster_sg_id)
print(cluster_sg)
try:
    cluster_sg.authorize_ingress(
        CidrIp='0.0.0.0/0',
        IpProtocol='tcp',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT),
    )
except ClientError as e:
    # Re-running the notebook: the rule already exists. Anything else propagates.
    if e.response['Error']['Code'] != 'InvalidPermission.Duplicate':
        raise
    print(e)

## STEP 4: Connect to the cluster

In [None]:
# Load the `sql` IPython extension, which provides the %sql / %%sql magics used below.
%load_ext sql

In [None]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
# print(conn_string)
%sql $conn_string
print("Connected")

## STEP 4: Create Tables

### Step 4.1: Create staging tables to load the S3 data into

In [None]:
%%sql

-- All tables live in the `leo` schema (created if missing). Setting search_path
-- makes later unqualified table names in this session resolve to leo.
CREATE SCHEMA IF NOT EXISTS leo;
SET search_path TO leo;

-- Drop first so the cell can be re-run from a clean state.
DROP TABLE IF EXISTS staging_events;
DROP TABLE IF EXISTS staging_songs;

-- Raw event-log records, filled by COPY with a JSONPaths file (Step 4.2).
-- ts is declared TIMESTAMP; the COPY uses timeformat 'epochmillisecs' to convert it.
CREATE TABLE staging_events(
    artist              VARCHAR,
    auth                VARCHAR,
    firstName           VARCHAR,
    gender              VARCHAR,
    itemInSession       INTEGER,
    lastName            VARCHAR,
    length              FLOAT,
    level               VARCHAR,
    location            VARCHAR,
    method              VARCHAR,
    page                VARCHAR,
    registration        FLOAT,
    sessionId           INTEGER,
    song                VARCHAR,
    status              INTEGER,
    ts                  TIMESTAMP,
    userAgent           VARCHAR,
    userId              INTEGER 
);

-- Raw song-catalogue records, filled by COPY with JSON 'auto' (Step 4.2).
CREATE TABLE staging_songs(
    num_songs           INTEGER,
    artist_id           VARCHAR,
    artist_latitude     FLOAT,
    artist_longitude    FLOAT,
    artist_location     VARCHAR,
    artist_name         VARCHAR,
    song_id             VARCHAR,
    title               VARCHAR,
    duration            FLOAT,
    year                INTEGER
);

### Step 4.2: Fill tables with data from JSON files in S3 buckets

In [None]:
# COPY statements that bulk-load the raw JSON from S3 into the staging tables,
# authenticating with the cluster's IAM role.
# NOTE(review): the S3 paths and LOG_JSONPATH are inserted unquoted, so the values
# in dwh.cfg presumably carry their own single quotes — confirm.
s3_cfg = config['S3']

staging_events_copy = """
    copy staging_events from {data_bucket}
    credentials 'aws_iam_role={role_arn}'
    region 'us-west-2' format as JSON {log_json_path}
    timeformat as 'epochmillisecs';
""".format(
    data_bucket=s3_cfg['LOG_DATA'],
    role_arn=DWH_ROLE_ARN,
    log_json_path=s3_cfg['LOG_JSONPATH'],
)

# Song files are mapped onto the staging_songs columns with JSON 'auto'.
staging_songs_copy = """
    copy staging_songs from {data_bucket}
    credentials 'aws_iam_role={role_arn}'
    region 'us-west-2' format as JSON 'auto';
""".format(
    data_bucket=s3_cfg['SONG_DATA'],
    role_arn=DWH_ROLE_ARN,
)

In [None]:
# Load the event-log JSON from S3 into staging_events.
%sql $staging_events_copy

In [None]:
# Load the song-catalogue JSON from S3 into staging_songs.
%sql $staging_songs_copy

#### Check how many rows these tables have

In [None]:
# Sanity check: number of rows loaded into staging_events.
%sql SELECT COUNT(*) FROM staging_events;

In [None]:
# Sanity check: number of rows loaded into staging_songs.
%sql SELECT COUNT(*) FROM staging_songs;


### Step 4.3: Create the star schema

In [None]:
%%sql 

-- Star schema: songplays is the fact table; users, songs, artists and time are dimensions.
-- Tables are dropped first so re-running the cell rebuilds them from scratch.
DROP TABLE IF EXISTS songplays;
DROP TABLE IF EXISTS users;
DROP TABLE IF EXISTS songs;
DROP TABLE IF EXISTS artists;
DROP TABLE IF EXISTS time;

-- Fact table, distributed and sorted on start_time (the same DISTKEY as the time dimension).
-- songplay_id is generated by IDENTITY(0,1).
-- NOTE(review): Redshift treats PRIMARY KEY as informational only (not enforced), so the
-- INSERT statements must avoid duplicates themselves.
CREATE TABLE IF NOT EXISTS songplays(
    songplay_id         INTEGER     IDENTITY(0,1)   PRIMARY KEY, 
    start_time          TIMESTAMP   NOT NULL        SORTKEY DISTKEY,
    user_id             INTEGER     NOT NULL, 
    level               VARCHAR,
    song_id             VARCHAR     NOT NULL, 
    artist_id           VARCHAR     NOT NULL,  
    session_id          INTEGER, 
    location            VARCHAR, 
    user_agent          VARCHAR
    );

-- User dimension, replicated to every node (diststyle all).
CREATE TABLE IF NOT EXISTS users(
    user_id             INTEGER     NOT NULL    SORTKEY PRIMARY KEY, 
    first_name          VARCHAR     NOT NULL, 
    last_name           VARCHAR     NOT NULL,
    gender              VARCHAR     NOT NULL, 
    level               VARCHAR     NOT NULL
    )diststyle all;

-- Song dimension, sorted on song_id.
CREATE TABLE IF NOT EXISTS songs(
    song_id             VARCHAR     NOT NULL    SORTKEY PRIMARY KEY,
    title               VARCHAR     NOT NULL, 
    artist_id           VARCHAR     NOT NULL,  
    year                INTEGER     NOT NULL,  
    duration            FLOAT
    );

-- Artist dimension, sorted on artist_id.
CREATE TABLE IF NOT EXISTS artists(
    artist_id           VARCHAR     NOT NULL    SORTKEY PRIMARY KEY,
    name                VARCHAR     NOT NULL,
    location            VARCHAR,
    latitude            FLOAT,
    longitude           FLOAT
    );

-- Time dimension: calendar breakdown of each songplay start_time; distributed
-- on start_time like songplays.
CREATE TABLE IF NOT EXISTS time(
    start_time          TIMESTAMP   NOT NULL    DISTKEY SORTKEY PRIMARY KEY,
    hour                INTEGER     NOT NULL, 
    day                 INTEGER     NOT NULL, 
    week                INTEGER     NOT NULL, 
    month               INTEGER     NOT NULL, 
    year                INTEGER     NOT NULL, 
    weekday             VARCHAR     NOT NULL
    );

### Step 4.4: Fill the fact and dimension tables

In [None]:
%%sql 

-- Fact rows: each log event matched to the song catalogue on (title, artist name).
-- Events that match no song are dropped by the inner join.
-- NOTE - DISTINCT applies to the whole selected row, not only to ts.
INSERT INTO songplays (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
SELECT DISTINCT
       ev.ts          AS start_time,
       ev.userId      AS user_id,
       ev.level       AS level,
       so.song_id     AS song_id,
       so.artist_id   AS artist_id,
       ev.sessionId   AS session_id,
       ev.location    AS location,
       ev.userAgent   AS user_agent
FROM staging_events AS ev
INNER JOIN staging_songs AS so
        ON ev.song = so.title
       AND ev.artist = so.artist_name
WHERE so.song_id IS NOT NULL;

In [None]:
%%sql 

-- User dimension: exactly one row per userId.
-- A plain DISTINCT would apply to the whole row, so a user seen with two different
-- levels (e.g. free, then paid) would be inserted twice; PRIMARY KEY is not enforced.
-- Keep each user's most recent event instead (latest ts wins).
INSERT INTO users (user_id, first_name, last_name, gender, level)
SELECT  user_id,
        first_name,
        last_name,
        gender,
        level
FROM (
    SELECT  userId      AS user_id,
            firstName   AS first_name,
            lastName    AS last_name,
            gender,
            level,
            ROW_NUMBER() OVER (PARTITION BY userId ORDER BY ts DESC) AS rn
    FROM staging_events
    -- Filter on the staging column itself, not on the select-list alias.
    WHERE userId IS NOT NULL
) AS latest
WHERE rn = 1;

In [None]:
%%sql 

-- Song dimension, taken straight from the song catalogue.
-- DISTINCT removes exact duplicate rows (it applies to all selected columns).
INSERT INTO songs (song_id, title, artist_id, year, duration)
SELECT DISTINCT
       ss.song_id,
       ss.title,
       ss.artist_id,
       ss.year,
       ss.duration
FROM staging_songs AS ss
WHERE ss.song_id IS NOT NULL;

In [None]:
%%sql 

-- Artist dimension: exactly one row per artist_id.
-- The same artist can appear on many songs with differing name or location values,
-- so a row-level DISTINCT would insert duplicates (PRIMARY KEY is not enforced).
-- Keep one row per artist; ordering by song_id is an arbitrary but deterministic choice.
INSERT INTO artists (artist_id, name, location, latitude, longitude)
SELECT  artist_id,
        name,
        location,
        latitude,
        longitude
FROM (
    SELECT  artist_id,
            artist_name         AS name,
            artist_location     AS location,
            artist_latitude     AS latitude,
            artist_longitude    AS longitude,
            ROW_NUMBER() OVER (PARTITION BY artist_id ORDER BY song_id) AS rn
    FROM staging_songs
    WHERE artist_id IS NOT NULL
) AS ranked
WHERE rn = 1;

In [None]:
%%sql 

-- Time dimension: one row per distinct songplay start_time, split into calendar parts.
-- NOTE(review) - EXTRACT(dayofweek ...) yields a number while time.weekday is VARCHAR,
-- so the value is stored in its text form; confirm this is intended.
INSERT INTO time (start_time, hour, day, week, month, year, weekday)
SELECT  t.start_time                          AS start_time,
        EXTRACT(hour FROM t.start_time)       AS hour,
        EXTRACT(day FROM t.start_time)        AS day,
        EXTRACT(week FROM t.start_time)       AS week,
        EXTRACT(month FROM t.start_time)      AS month,
        EXTRACT(year FROM t.start_time)       AS year,
        EXTRACT(dayofweek FROM t.start_time)  AS weekday
FROM (SELECT DISTINCT start_time FROM songplays) AS t;

## STEP 5: Clean up your resources

<b><font color='red'>DO NOT RUN THIS UNLESS YOU ARE SURE <br/> 
    We will be using these resources in the next exercises</font></b>

In [None]:
#### CAREFUL!!
# Deletes the cluster without a final snapshot; this cannot be undone.
# The flag defaults to False so "Restart & Run All" cannot destroy the cluster
# that later exercises rely on. Set it to True deliberately to delete.
CONFIRM_DELETE_CLUSTER = False
if CONFIRM_DELETE_CLUSTER:
    redshift.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)
else:
    print("Skipped: set CONFIRM_DELETE_CLUSTER = True to delete the cluster.")
#### CAREFUL!!

- Run this cell several times until the cluster is actually deleted

In [None]:
# Poll the cluster while it is being deleted. Once it is gone, describe_clusters
# raises ClusterNotFoundFault, which here means the deletion has finished.
try:
    myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
    display(prettyRedshiftProps(myClusterProps))
except redshift.exceptions.ClusterNotFoundFault:
    print("Cluster {} no longer exists - deletion complete.".format(DWH_CLUSTER_IDENTIFIER))

In [None]:
#### CAREFUL!!
# Detaches the S3 read-only policy and deletes the IAM role (the policy must be
# detached before the role can be deleted). The flag defaults to False so
# "Restart & Run All" cannot remove a role later exercises rely on.
CONFIRM_DELETE_ROLE = False
if CONFIRM_DELETE_ROLE:
    iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
    iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)
else:
    print("Skipped: set CONFIRM_DELETE_ROLE = True to delete the IAM role.")
#### CAREFUL!!