In [1]:
import pandas as pd
import boto3
import json

## Load DWH Params from a file

In [2]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")


DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("CLUSTER","DWH_DB")
DWH_DB_USER            = config.get("CLUSTER","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("CLUSTER","DWH_DB_PASSWORD")
DWH_PORT               = config.get("CLUSTER","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DWH_DB,dwh
5,DWH_DB_USER,dwhuser
6,DWH_DB_PASSWORD,Passw0rd
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,dwhRole


## Create clients for EC2, S3, IAM, and Redshift

In [3]:
ec2 = boto3.resource('ec2', region_name = 'us-west-2',
                     aws_access_key_id = KEY,
                     aws_secret_access_key = SECRET)

s3 = boto3.resource('s3', region_name = 'us-west-2',
                     aws_access_key_id = KEY,
                     aws_secret_access_key = SECRET)

iam = boto3.client('iam', region_name = 'us-west-2',
                     aws_access_key_id = KEY,
                     aws_secret_access_key = SECRET)

redshift = boto3.client('redshift', region_name = 'us-west-2',
                     aws_access_key_id = KEY,
                     aws_secret_access_key = SECRET)

## Check the data on S3

In [4]:
sampleDbBucket =  s3.Bucket("udacity-dend")

# TODO: Iterate over bucket objects starting with "ssbgz" and print
for obj in sampleDbBucket.objects.filter(Prefix="log_data"):
    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-01-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-02-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-03-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-04-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-05-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-06-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-07-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-08-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-09-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-10-events.json')
s3.ObjectSummary(b

In [5]:
from botocore.exceptions import ClientError

#1.1 Create the role, 
try:
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)
    
    
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

1.1 Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhRole already exists.
1.2 Attaching Policy
1.3 Get the IAM role ARN
arn:aws:iam::752661838876:role/dwhRole


In [6]:
try:
    response = redshift.create_cluster(        
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)

An error occurred (ClusterAlreadyExists) when calling the CreateCluster operation: Cluster already exists


In [21]:
def prettyRedshiftProps(props):
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]

In [22]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']

In [9]:
%load_ext sql

In [10]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
%sql $conn_string

In [11]:
from create_tables import main as table_creation

In [12]:
table_creation()


CREATE TABLE IF NOT EXISTS staging_events
    (
        artist        VARCHAR    NULL,
        auth          VARCHAR    NULL,
        firstName     VARCHAR    NULL,
        gender        VARCHAR    NULL,
        itemInSession INT        NULL,
        lastName      VARCHAR    NULL,
        length        DECIMAL    NULL,
        level         VARCHAR    NULL,
        location      VARCHAR    NULL,
        method        VARCHAR    NULL,
        page          VARCHAR    NULL,
        registration  DECIMAL    NULL,
        sessionId     INT        NOT NULL,
        song          VARCHAR    NULL,
        status        INTEGER    NULL,
        ts            BIGINT     NOT NULL,
        userAgent     VARCHAR    NULL,
        userId        INTEGER    NULL
    )



CREATE TABLE IF NOT EXISTS staging_songs
  (
    num_songs        INT            NULL,
    artist_id        VARCHAR        NULL,
    artist_longitude DECIMAL        NULL,
    artist_latitude  DECIMAL        NULL,
    artist_location 

In [14]:
%sql select * from songplays;

 * postgresql://dwhuser:***@dwhcluster.czjp6tcnbz96.us-west-2.redshift.amazonaws.com:5439/dwh
0 rows affected.


songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent


In [15]:
from etl import main as table_insertions

In [16]:
table_insertions()

In [19]:
%sql SELECT * FROM songplays limit 5;

 * postgresql://dwhuser:***@dwhcluster.czjp6tcnbz96.us-west-2.redshift.amazonaws.com:5439/dwh
5 rows affected.


songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
32,2018-11-09 17:47:05,36,free,SOLNAVA12AB01842E0,AR15R2V1187B9A9803,392,"Janesville-Beloit, WI","""Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"""
96,2018-11-29 02:36:13,54,free,SOTNHIP12AB0183131,ARD46C811C8A414F3F,951,"Yuba City, CA",Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0
160,2018-11-24 11:19:06,80,paid,SOAIUDT12AF729F399,AR87P0Q1187B989ADF,903,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"""
224,2018-11-29 16:58:01,49,paid,SOGXSWA12A6D4FBC99,ARPFHN61187FB575F6,1041,"San Francisco-Oakland-Hayward, CA",Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20100101 Firefox/31.0
288,2018-11-04 06:51:12,25,paid,SORKKTY12A8C132F3E,ARIH5GU1187FB4C958,128,"Marinette, WI-MI","""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"""


In [20]:
%sql SELECT * FROM staging_songs LIMIT 5;

 * postgresql://dwhuser:***@dwhcluster.czjp6tcnbz96.us-west-2.redshift.amazonaws.com:5439/dwh
5 rows affected.


num_songs,artist_id,artist_longitude,artist_latitude,artist_location,artist_name,song_id,title,duration,year
1,ARGE7G11187FB37E05,,,"Brooklyn, NY",Cyndi Lauper,SONRWUU12AF72A4283,Into The Nightlife,240,2008
1,AR4E4121187FB51F4E,,,Los Angeles,A Fine Frenzy,SOBBGQK12AB0183F1E,The Beacon,201,2009
1,AREFFQF1187FB3F845,,,"New York, NY",The Hold Steady,SOKGRUO12A67ADBA54,Don't Let Me Explode,141,2005
1,AR1RHCO1187B9AF0BF,-122.0,37.0,"San Francisco, CA",Vince Guaraldi / Bola Sete,SODLSHA12AAF3B29F2,Choro,294,0
1,ARBBHDX1187B9B2EE7,-1.0,52.0,"Birmingham, England",Godflesh,SOPDWEJ12AB018BC6D,Spite,271,1992


# DELETE CLUSTER

In [27]:
try:
    redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
except:
    print("CLUSTER DELETING")

CLUSTER DELETING


In [28]:
try:
    myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
except:
    print("CLUSTER DELETED")

CLUSTER DELETED


In [29]:
try:
    redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
except:
    print("CLUSTER DELETED")

CLUSTER DELETED


In [31]:
try:
    iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")

except:
    print("ROLE DELETED")

ROLE DELETED
