In [1]:
import configparser
import psycopg2
from sql_queries import copy_table_queries, insert_table_queries
import pandas as pd
import boto3
import json


In [None]:
# Load AWS credentials and cluster settings from dwh.cfg. The file is opened
# in a context manager so the handle is closed as soon as parsing finishes.
config = configparser.ConfigParser()
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

KEY = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET')
ROLE_NAME = config.get('IAM_ROLE', 'ROLE_NAME')
CLUSTER_TYPE = config.get('CLUSTER', 'CLUSTER_TYPE')
NUM_NODES = config.get('CLUSTER', 'NUM_NODES')
NODE_TYPE = config.get('CLUSTER', 'NODE_TYPE')

DB_NAME = config.get('CLUSTER', 'DB_NAME')
DB_USER = config.get('CLUSTER', 'DB_USER')
DB_PASSWORD = config.get('CLUSTER', 'DB_PASSWORD')
DB_PORT = config.get('CLUSTER', 'DB_PORT')
CLUSTER_IDENTIFIER = config.get('CLUSTER', 'CLUSTER_IDENTIFIER')

# Parameters whose values must never be rendered: cell output is stored in the
# notebook file and would leak the credentials when the notebook is shared.
MASKED_PARAMS = ('SECRET', 'DB_PASSWORD')

loaded_params = {
    'KEY': KEY,
    'SECRET': SECRET,
    'ROLE_NAME': ROLE_NAME,
    'CLUSTER_TYPE': CLUSTER_TYPE,
    'NUM_NODES': NUM_NODES,
    'NODE_TYPE': NODE_TYPE,
    'DB_NAME': DB_NAME,
    'DB_USER': DB_USER,
    'DB_PASSWORD': DB_PASSWORD,
    'DB_PORT': DB_PORT,
    'CLUSTER_IDENTIFIER': CLUSTER_IDENTIFIER,
}

# Summary table of the loaded settings (last expression, so it is displayed).
pd.DataFrame({
    "Param": list(loaded_params),
    "Value": [
        '********' if name in MASKED_PARAMS else value
        for name, value in loaded_params.items()
    ],
})

#### Create clients for IAM, EC2, S3 and Redshift

In [3]:
# All four handles use the same region and the same credentials loaded from
# the config, so the shared keyword arguments are assembled once.
aws_common_kwargs = dict(
    region_name="us-west-2",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

# Resource-style handles for EC2 and S3.
ec2 = boto3.resource('ec2', **aws_common_kwargs)
s3 = boto3.resource('s3', **aws_common_kwargs)

# Client-style handles for IAM and Redshift.
iam = boto3.client('iam', **aws_common_kwargs)
redshift = boto3.client('redshift', **aws_common_kwargs)

#### Check out the data sources on S3

In [4]:
# Print at most ten object summaries whose key starts with "song_data".
dbBucket = s3.Bucket("udacity-dend")
song_data_sample = dbBucket.objects.filter(Prefix="song_data").limit(10)
for obj in song_data_sample:
    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAK128F9318786.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAV128F421A322.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAABD128F429CF47.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAACN128F9355673.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEA128F935A30D.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAED128E0783FAB.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEM128F93347B9.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEW128F42930C0.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAFD128F92F423A.json')


In [5]:
# Print at most ten object summaries whose key starts with "log_data".
dbBucket = s3.Bucket("udacity-dend")
log_data_sample = dbBucket.objects.filter(Prefix="log_data").limit(10)
for obj in log_data_sample:
    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-01-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-02-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-03-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-04-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-05-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-06-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-07-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-08-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-09-events.json')


In [6]:
# Print at most ten object summaries whose key starts with "log_json_path".
dbBucket = s3.Bucket("udacity-dend")
log_json_path_sample = dbBucket.objects.filter(Prefix="log_json_path").limit(10)
for obj in log_json_path_sample:
    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='log_json_path.json')


#### Create an IAM role for accessing the S3 bucket from Redshift

In [None]:
# Trust policy naming the Redshift service as the principal allowed to assume
# this role. Built outside the try block because it cannot fail.
trust_policy = {
    'Statement': [{
        "Action": "sts:AssumeRole",
        "Effect": "Allow",
        "Principal": {'Service': 'redshift.amazonaws.com'}
    }],
    'Version': '2012-10-17'
}

try:
    nedRole = iam.create_role(
        Path='/',
        RoleName=ROLE_NAME,
        Description="Allows Redshift clusters to call AWS services.",
        AssumeRolePolicyDocument=json.dumps(trust_policy)
    )
    print("New IAM role created")
except iam.exceptions.EntityAlreadyExistsException as e:
    # Re-running the cell when the role already exists is harmless, so only
    # report it. Any other failure (bad credentials, missing permissions, ...)
    # is allowed to propagate instead of being printed and silently ignored.
    print(e)


#### Attach Policy to role and get IAM role ARN

In [None]:
# Attach the AmazonS3ReadOnlyAccess managed policy to the role. The HTTP
# status code is looked up on the response (raising KeyError if the expected
# metadata is missing) but is not displayed, as it is not the cell's last
# expression.
attach_response = iam.attach_role_policy(
    PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess",
    RoleName=ROLE_NAME,
)
attach_response['ResponseMetadata']['HTTPStatusCode']

# Fetch the role description and keep its ARN for the cluster creation step.
role_description = iam.get_role(RoleName=ROLE_NAME)
roleArn = role_description['Role']['Arn']
print(roleArn)

#### Create Redshift Cluster

In [9]:
# Request a Redshift cluster built from the settings loaded from the config.
try:
    response = redshift.create_cluster(
        ClusterType=CLUSTER_TYPE,
        NodeType=NODE_TYPE,
        NumberOfNodes=int(NUM_NODES),

        # Identifiers & credentials
        DBName=DB_NAME,
        MasterUsername=DB_USER,
        MasterUserPassword=DB_PASSWORD,
        ClusterIdentifier=CLUSTER_IDENTIFIER,

        # Listen on the configured port so that the cluster matches the
        # DB_PORT value the rest of the project reads from the same config.
        Port=int(DB_PORT),

        # Role (for S3 access)
        IamRoles=[roleArn]
    )
except redshift.exceptions.ClusterAlreadyExistsFault as e:
    # Re-running the cell against an existing cluster is harmless, so only
    # report it. Other failures (invalid parameters, quota, permissions, ...)
    # propagate here instead of surfacing later as a confusing error.
    print(e)

#### Describe the cluster to see its status

In [None]:
# Fetch the description of the cluster; the first (only requested) entry of
# the 'Clusters' list is kept for the cells below.
myClusterProps = redshift.describe_clusters(ClusterIdentifier=CLUSTER_IDENTIFIER)['Clusters'][0]

# None disables column-width truncation. The former -1 sentinel has been
# deprecated since pandas 1.0 and is rejected by current pandas releases.
pd.set_option('display.max_colwidth', None)

# Subset of properties worth showing in the summary table.
keysToShow = ['ClusterIdentifier', 'NodeType', 'ClusterStatus', 'MasterUsername', 'DBName', 'Endpoint', 'NumberOfNodes', 'VpcId']

x = [(k, v) for k, v in myClusterProps.items() if k in keysToShow]

# Last expression of the cell, so the table is rendered as its output.
pd.DataFrame(data=x, columns=['key', 'value'])

#### Cluster endpoint and role ARN

In [33]:
# Pull the endpoint address and the ARN of the first attached IAM role out of
# the cluster description, then echo both.
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']

for label, value in (
    ("DWH_ENDPOINT :: ", DWH_ENDPOINT),
    ("DWH_ROLE_ARN :: ", DWH_ROLE_ARN),
):
    print(label, value)

DWH_ENDPOINT ::  nedcluster24.cy9k8ycfdhaa.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::647329836013:role/nedRole


In [None]:
# Connect to the cluster with explicit keyword arguments. Unpacking
# config['CLUSTER'].values() positionally depends on the order of keys in
# dwh.cfg (that section holds more than the five connection settings), and
# interpolating values into a DSN string breaks on values containing spaces.
conn = psycopg2.connect(
    host=DWH_ENDPOINT,
    dbname=DB_NAME,
    user=DB_USER,
    password=DB_PASSWORD,
    port=DB_PORT,
)

cur = conn.cursor()

In [None]:
# Run the same COUNT(*) query against each table, in this order, and bind the
# resulting single-row frames to one name per table.
count_query_template = "SELECT  COUNT(*) FROM {};"
counted_tables = (
    "staging_events",
    "staging_songs",
    "songplays",
    "artists",
    "songs",
    "users",
)

(
    stg_events,
    stg_songs,
    songplay_count,
    artist_count,
    song_count,
    user_count,
) = [pd.read_sql(count_query_template.format(table), conn) for table in counted_tables]

In [55]:
# Pair each message template with the frame holding its count, then print the
# 'count' value of the first row for every pair.
count_reports = (
    ('count of staging_events is {}.', stg_events),
    ('count of staging_songs is {}.', stg_songs),
    ('count from songplays is {}.', songplay_count),
    ('count of artists is {}.', artist_count),
    ('count of songs is {}.', song_count),
    ('count from users is {}.', user_count),
)

for template, frame in count_reports:
    print(template.format(frame.iloc[0]['count']))


count of staging_events is 8056.
count of staging_songs is 14896.
count from songplays is 9957.
count of artists is 10025.
count of songs is 14896.
count from users is 104.


#### Clean up resources

In [None]:
# Request deletion of the cluster, skipping the final snapshot.
redshift.delete_cluster(
    SkipFinalClusterSnapshot=True,
    ClusterIdentifier=CLUSTER_IDENTIFIER,
)

In [None]:
# Detach the S3 read-only managed policy from the role, then delete the role.
s3_read_only_policy_arn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
iam.detach_role_policy(
    PolicyArn=s3_read_only_policy_arn,
    RoleName=ROLE_NAME,
)
iam.delete_role(RoleName=ROLE_NAME)