# Udacity DEND Project-3: AWS Set-up
### IaC set-up for Project-3 AWS Redshift

In [33]:
import pandas as pd
import boto3
import json
!pip install psycopg2-binary



# STEP 0: Make sure you have an AWS secret and access key

- Create a new IAM user in your AWS account
- Give it `AdministratorAccess`, From `Attach existing policies directly` Tab
- Take note of the access key and secret 
- Edit the file `dwh.cfg` in the same folder as this notebook and fill
<font color='red'>
<BR>
[AWS]<BR>
KEY= YOUR_AWS_KEY<BR>
SECRET= YOUR_AWS_SECRET<BR>
<font/>


# Load DWH Params from a file

In [34]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

# NOTE: Un-comment this to print the result.
pd.DataFrame({"Param":
                 ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
             "Value":
                 [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
            })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,redshift-cluster
4,DWH_DB,redshift-cluster
5,DWH_DB_USER,dev
6,DWH_DB_PASSWORD,Passw0rd
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,redshift_role


## Create clients for EC2, S3, IAM, and Redshift

In [35]:
import boto3

ec2 = boto3.resource(   'ec2', 
                      region_name="us-west-2",
                      aws_access_key_id=KEY,
                      aws_secret_access_key=SECRET)

s3 = boto3.resource(  's3',
                    region_name="us-west-2",
                    aws_access_key_id=KEY,
                    aws_secret_access_key=SECRET)

iam = boto3.client('iam',
                    region_name="us-west-2",
                    aws_access_key_id=KEY,
                    aws_secret_access_key=SECRET)

redshift = boto3.client('redshift',
                        region_name="us-west-2",
                        aws_access_key_id=KEY,
                        aws_secret_access_key=SECRET)

## Check out the sample data sources on S3

In [36]:
LOG_DATA      = config.get("S3", "BUCKET")
logDataBucket = s3.Bucket(LOG_DATA)
count = 0

# Iterate over log_data bucket objects and print
for object in logDataBucket.objects.filter(Prefix='log_data'):
    count += 1
    print(object)
print("COUNT: " + str(count))
# => COUNT: 31

s3.ObjectSummary(bucket_name='temporary-sathwik', key='log_data/2018-11-01-events.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='log_data/2018-11-02-events.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='log_data/2018-11-03-events.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='log_data/2018-11-04-events.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='log_data/2018-11-05-events.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='log_data/2018-11-06-events.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='log_data/2018-11-07-events.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='log_data/2018-11-08-events.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='log_data/2018-11-09-events.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='log_data/2018-11-10-events.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='log_data/2018-11-11-events.json')
s3.ObjectSummary(buck

In [37]:
size = sum(1 for _ in logDataBucket.objects.filter(Prefix='log_data'))
print(size)

30


In [38]:
SONG_DATA      = config.get("S3", "BUCKET")
songDataBucket = s3.Bucket(SONG_DATA)
count = 0

# Iterate over song_data bucket objects and print
for object in songDataBucket.objects.filter(Prefix='song_data'):
    count += 1
    print(object)
print("COUNT: " + str(count))

s3.ObjectSummary(bucket_name='temporary-sathwik', key='song_data/A/A/A/TRAAAAW128F429D538.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='song_data/A/A/A/TRAAABD128F429CF47.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='song_data/A/A/A/TRAAADZ128F9348C2E.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='song_data/A/A/A/TRAAAEF128F4273421.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='song_data/A/A/A/TRAAAFD128F92F423A.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='song_data/A/A/A/TRAAAMO128F1481E7F.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='song_data/A/A/A/TRAAAMQ128F1460CD3.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='song_data/A/A/A/TRAAAPK128E0786D96.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='song_data/A/A/A/TRAAARJ128F9320760.json')
s3.ObjectSummary(bucket_name='temporary-sathwik', key='song_data/A/A/A/TRAAAVG12903CFA543.json')
s3.ObjectSummary(bucket_name='

In [39]:
SONG_DATA      = config.get("S3", "BUCKET")
songDataBucket = s3.Bucket(SONG_DATA)
count = 0

# Iterate over song_data bucket objects and print
for object in songDataBucket.objects.filter(Prefix='songs_json'):
    count += 1
    print(object)
print("COUNT: " + str(count))

s3.ObjectSummary(bucket_name='temporary-sathwik', key='songs_json_paths.json')
COUNT: 1


In [40]:
LOG_DATA      = config.get("S3", "BUCKET")
LOCAL_PATH    = config.get("S3", "LOCAL_PATH")
SONGS_JSONPATH = config.get("S3", "SONGS_JSONPATH")
print(LOG_DATA)
print(SONGS_JSONPATH)

temporary-sathwik
's3://temporary-sathwik/songs_json_path.json'


## STEP 1: IAM ROLE
- Create an IAM Role that makes Redshift able to access S3 bucket (ReadOnly)

In [41]:
# Create the IAM role (if not exists)

try:
    print('1.1 Creating a new IAM Role')
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description="Allow Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
                'Effect': 'Allow',
                'Principal': {'Service': 'redshift.amazonaws.com'}}],
            'Version': '2012-10-17'})
    )
    

except Exception as e:
    print(e)


1.1 Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name redshift_role already exists.


In [43]:
# Attach Policy

print('1.2 Attaching Policy')
iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                      PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']


1.2 Attaching Policy


200

In [44]:
# Get and print the IAM role ARN
print('1.3 Get the IAM role ARN')
iam_role = iam.get_role(
                        RoleName=DWH_IAM_ROLE_NAME
                        )
roleArn = iam_role['Role']['Arn']
# NOTE: Un-comment this to print the result.
print(roleArn)

1.3 Get the IAM role ARN
arn:aws:iam::354280233834:role/redshift_role


## STEP 2:  Redshift Cluster

- Create a RedShift Cluster
- For complete arguments to `create_cluster`, see [docs](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.create_cluster)

In [18]:
try:
    response = redshift.create_cluster( 
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        IamRoles=[roleArn]
    )
except Exception as e:
    print(e)

## 2.1 *Describe* the cluster to see its status
- run this block several times until the cluster status becomes `Available`

In [16]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', None)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
# NOTE: Un-comment this to print the result.
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,redshift-cluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dev
4,DBName,redshift-cluster
5,Endpoint,"{'Address': 'redshift-cluster.c9irqexzniz5.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-056f292b4663a579e
7,NumberOfNodes,4


<h2> 2.2 Take note of the cluster <font color='red'> endpoint and role ARN </font> </h2>

<font color='red'>DO NOT RUN THIS unless the cluster status becomes "Available" </font>

In [17]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']

print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

DWH_ENDPOINT ::  redshift-cluster.c9irqexzniz5.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::354280233834:role/redshift_role


## STEP 3: Open an incoming  TCP port to access the cluster endpoint

In [30]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        GroupName= defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP', 
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-0852bdada106e19f2')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


## STEP 4: Make sure you can connect to the cluster

In [23]:
%load_ext sql
!pip install --upgrade psycopg2-binary
import sys
print(sys.executable)

The sql extension is already loaded. To reload it, use:
  %reload_ext sql
/Users/sathwikmaddi/opt/anaconda3/bin/python


In [45]:
import psycopg2

conn_string = "dbname='{}' port='{}' user='{}' password='{}' host='{}'".format(DWH_DB, DWH_PORT, DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT)

conn = psycopg2.connect(conn_string)

print(conn)

<connection object at 0x7f93680b22e0; dsn: 'user=dev password=xxx dbname=redshift-cluster host=redshift-cluster.c9irqexzniz5.us-west-2.redshift.amazonaws.com port=5439', closed: 0>


## STEP 5: Test COPIED and INSERTED data

### 5.1: Query staging tables

In [46]:
#%load_ext sql

In [47]:
# Number of items in staging_events table
%%time
%%sql
SELECT COUNT(*)
FROM staging_events;

SyntaxError: invalid syntax (1260358806.py, line 4)

In [None]:
# Number of items in staging_songs table
%%time
%%sql
SELECT COUNT(*)
FROM staging_songs;

### 5.2 Query Analysis tables

In [32]:
# Number of items in users table
%%time
%%sql
SELECT COUNT(*)
FROM users;

SyntaxError: invalid syntax (2724355246.py, line 4)

In [None]:
# Number of items in songs table
%%time
%%sql
SELECT COUNT(*)
FROM songs;

In [None]:
# Number of items in artists table
%%time
%%sql
SELECT COUNT(*)
FROM artists;

In [None]:
# Number of items in time table
%%time
%%sql
SELECT COUNT(*)
FROM time;

In [None]:
# Number of items in songplay table
%%time
%%sql
SELECT COUNT(*)
FROM songplays;

In [29]:
# Query to answer a question: Who played which song and when.
%%time
%%sql
SELECT  sp.songplay_id,
        u.user_id,
        u.last_name,
        u.first_name,
        sp.start_time,
        a.name,
        s.title
FROM songplays AS sp
        JOIN users   AS u ON (u.user_id = sp.user_id)
        JOIN songs   AS s ON (s.song_id = sp.song_id)
        JOIN artists AS a ON (a.artist_id = sp.artist_id)
        JOIN time    AS t ON (t.start_time = sp.start_time)
ORDER BY (u.last_name)
LIMIT 100;

## STEP 6: Clean up your resources

<b><font color='red'>DO NOT RUN THIS UNLESS YOU ARE SURE <br/> 
    We will be using these resources in the next exercises</span></b>

In [69]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
#redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
#### CAREFUL!!

- run this block several times until the cluster really deleted

In [27]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
# NOTE: Un-comment this to print the result.
#prettyRedshiftProps(myClusterProps)

In [28]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
#iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
#iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)
#### CAREFUL!!