# Loading

In [1]:
%load_ext sql

In [2]:
from boto3 import client, resource
import configparser
import json
import pandas as pd
import time

# Setup

In [3]:
CONFIG_FILE = "dwh.cfg"
config = configparser.ConfigParser()
config.read(CONFIG_FILE)

REGION = config.get("COMMON", "REGION")

IAM_ROLE_NAME = config.get("IAM", "ROLE_NAME")
IAM_ROLE_ARN = config.get("IAM", "ROLE_NAME")

DWH_HOST = config.get("CLUSTER", "HOST")
DWH_DB = config.get("CLUSTER", "DB_NAME")
DWH_DB_USER = config.get("CLUSTER", "DB_USER")
DWH_DB_PASSWORD = config.get("CLUSTER", "DB_PASSWORD")
DWH_PORT = config.get("CLUSTER", "DB_PORT")

CLUSTER_TYPE = config.get("DWH", "TYPE")
CLUSTER_NUM_NODES = config.get("DWH", "NUM_NODES")
CLUSTER_NODE_TYPE = config.get("DWH", "NODE_TYPE")
CLUSTER_IDENTIFIER = config.get("DWH", "IDENTIFIER")

# AWS Setup

## Clients

In [4]:
s3_client = resource("s3", region_name=REGION)
iam_client = client("iam", region_name=REGION)
redshift_client = client("redshift", region_name=REGION)
ec2_client = resource("ec2", region_name=REGION)

## Pinging source bucket

In [5]:
sample_db = s3_client.Bucket("udacity-dend")

for obj in sample_db.objects.filter(Prefix="log_data"):
    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-01-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-02-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-03-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-04-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-05-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-06-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-07-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-08-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-09-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-10-events.json')
s3.ObjectSummary(b

## IAM Role

In [6]:
try:
    print("Creating a new IAM Role")
    dwh_role = iam_client.create_role(
        Path="/",
        RoleName=IAM_ROLE_NAME,
        Description="Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {
                "Statement": [
                    {
                        "Action": "sts:AssumeRole",
                        "Effect": "Allow",
                        "Principal": {"Service": "redshift.amazonaws.com"},
                    }
                ],
                "Version": "2012-10-17",
            }
        ),
    )
except Exception as e:
    print(e)


print("Attaching Policy")

iam_client.attach_role_policy(
    RoleName=IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
)["ResponseMetadata"]["HTTPStatusCode"]

print("Get the IAM role ARN")
role_arn = iam_client.get_role(RoleName=IAM_ROLE_NAME)["Role"]["Arn"]

print(role_arn)

Creating a new IAM Role
Attaching Policy
Get the IAM role ARN
arn:aws:iam::698233440053:role/sparkify-cluster-s3-role


## Cluster

In [7]:
try:
    print("Creating cluster")
    redshift_client.create_cluster(
        ClusterType=CLUSTER_TYPE,
        NodeType=CLUSTER_NODE_TYPE,
        NumberOfNodes=int(CLUSTER_NUM_NODES),
        DBName=DWH_DB,
        ClusterIdentifier=CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        IamRoles=[role_arn],
    )
except Exception as e:
    print(e)


def get_cluster_props():
    return redshift_client.describe_clusters(ClusterIdentifier=CLUSTER_IDENTIFIER)[
        "Clusters"
    ][0]


cluster_raw_props = get_cluster_props()

Creating cluster


## Waiting cluster up

In [8]:
while "Endpoint" not in cluster_raw_props:
    cluster_raw_props = get_cluster_props()
    time.sleep(5)

In [9]:
def pretty_redshift_props(props):
    keys_to_show = [
        "ClusterIdentifier",
        "NodeType",
        "ClusterStatus",
        "MasterUsername",
        "DBName",
        "NumberOfNodes",
        "VpcId",
    ]
    pretty_props = [(k, v) for k, v in props.items() if k in keys_to_show]

    endpoint = ("Endpoint", props["Endpoint"]["Address"])
    pretty_props.append(endpoint)

    return pd.DataFrame(data=pretty_props, columns=["Key", "Value"])


cluster_props = pretty_redshift_props(cluster_raw_props)

## Cluster network

In [10]:
try:
    print("Configuring cluster network")
    vpc_id = cluster_props.loc[cluster_props["Key"] == "VpcId"].Value.values[0]
    vpc = ec2_client.Vpc(id=vpc_id)

    sg = list(vpc.security_groups.all())[0]
    sg.authorize_ingress(
        GroupName=sg.group_name,
        CidrIp="0.0.0.0/0",
        IpProtocol="TCP",
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT),
    )
    print(sg)
except Exception as e:
    print(e)

Configuring cluster network
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


## Pinging cluster

In [11]:
conn_string = "postgresql://{}:{}@{}:{}/{}".format(
    DWH_DB_USER, DWH_DB_PASSWORD, DWH_HOST, DWH_PORT, DWH_DB
)
%sql $conn_string

In [12]:
%sql SELECT count(*) FROM staging_events;

 * postgresql://admin:***@sparkify-cluster.cbrfk5wat3qt.us-east-1.redshift.amazonaws.com:5439/sparkify
(psycopg2.errors.UndefinedTable) relation "staging_events" does not exist

[SQL: SELECT count(*) FROM staging_events;]
(Background on this error at: https://sqlalche.me/e/14/f405)


# ELT

## Creating tables

In [13]:
!python create_tables.py

# Loading staging

In [14]:
!python etl.py

# Counting

In [15]:
%sql SELECT count(*) from staging_events;

 * postgresql://admin:***@sparkify-cluster.cbrfk5wat3qt.us-east-1.redshift.amazonaws.com:5439/sparkify
1 rows affected.


count
8056


In [16]:
%sql SELECT count(*) from staging_songs;

 * postgresql://admin:***@sparkify-cluster.cbrfk5wat3qt.us-east-1.redshift.amazonaws.com:5439/sparkify
1 rows affected.


count
14896


In [17]:
%sql SELECT count(*) from song_plays;

 * postgresql://admin:***@sparkify-cluster.cbrfk5wat3qt.us-east-1.redshift.amazonaws.com:5439/sparkify
1 rows affected.


count
333


In [18]:
%sql SELECT count(*) from songs;

 * postgresql://admin:***@sparkify-cluster.cbrfk5wat3qt.us-east-1.redshift.amazonaws.com:5439/sparkify
1 rows affected.


count
14896


In [19]:
%sql SELECT count(*) from users;

 * postgresql://admin:***@sparkify-cluster.cbrfk5wat3qt.us-east-1.redshift.amazonaws.com:5439/sparkify
1 rows affected.


count
104


In [20]:
%sql SELECT count(*) from artists;

 * postgresql://admin:***@sparkify-cluster.cbrfk5wat3qt.us-east-1.redshift.amazonaws.com:5439/sparkify
1 rows affected.


count
10025


In [21]:
%sql SELECT count(*) from times;

 * postgresql://admin:***@sparkify-cluster.cbrfk5wat3qt.us-east-1.redshift.amazonaws.com:5439/sparkify
1 rows affected.


count
333


# Cleaning up

In [22]:
redshift_client.delete_cluster(
    ClusterIdentifier=CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True
)
iam_client.detach_role_policy(
    RoleName=IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
)
iam_client.delete_role(RoleName=IAM_ROLE_NAME)

{'ResponseMetadata': {'RequestId': '1306475f-2981-4232-b50e-85200df10488',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '1306475f-2981-4232-b50e-85200df10488',
   'content-type': 'text/xml',
   'content-length': '200',
   'date': 'Tue, 29 Aug 2023 17:37:08 GMT'},
  'RetryAttempts': 0}}