In [1]:
%load_ext sql

In [2]:
# Import required libraries
import configparser
import pandas as pd
import boto3
import json

In [3]:
# Read configuration parameters

config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY=config.get('AWS','key')
SECRET= config.get('AWS','secret')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")


DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DWH_DB,dwh
5,DWH_DB_USER,dwhuser
6,DWH_DB_PASSWORD,Passw0rd
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,dwhRole


In [4]:
# Initialize AWS services

ec2 = boto3.resource('ec2',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )

iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name='us-west-2'
                  )

redshift = boto3.client('redshift',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )

In [6]:
# List top 10 log data files

bucket=s3.Bucket('udacity-dend')
log_data_files = [filename.key for filename in bucket.objects.filter(Prefix='log_data')]
log_data_files[:10]

['log_data/',
 'log_data/2018/11/2018-11-01-events.json',
 'log_data/2018/11/2018-11-02-events.json',
 'log_data/2018/11/2018-11-03-events.json',
 'log_data/2018/11/2018-11-04-events.json',
 'log_data/2018/11/2018-11-05-events.json',
 'log_data/2018/11/2018-11-06-events.json',
 'log_data/2018/11/2018-11-07-events.json',
 'log_data/2018/11/2018-11-08-events.json',
 'log_data/2018/11/2018-11-09-events.json']

In [7]:
# List top 10 song data files

song_data_files = [filename.key for filename in bucket.objects.filter(Prefix='song_data/A/A')]
song_data_files[:10]

['song_data/A/A/A/TRAAAAK128F9318786.json',
 'song_data/A/A/A/TRAAAAV128F421A322.json',
 'song_data/A/A/A/TRAAABD128F429CF47.json',
 'song_data/A/A/A/TRAAACN128F9355673.json',
 'song_data/A/A/A/TRAAAEA128F935A30D.json',
 'song_data/A/A/A/TRAAAED128E0783FAB.json',
 'song_data/A/A/A/TRAAAEM128F93347B9.json',
 'song_data/A/A/A/TRAAAEW128F42930C0.json',
 'song_data/A/A/A/TRAAAFD128F92F423A.json',
 'song_data/A/A/A/TRAAAGR128F425B14B.json']

In [7]:
# Create IAM role

try:
    print('1.1 Creating a new IAM Role')
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )
    

except Exception as e:
    print(e)

1.1 Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhRole already exists.


In [8]:
# Attach Policy to IAM role

print('1.2 Attaching Policy')
iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']


1.2 Attaching Policy


200

In [9]:
# Get and print the IAM role ARN - Update the config file

print('1.3 Get the IAM role ARN')
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

1.3 Get the IAM role ARN
arn:aws:iam::849907613343:role/dwhRole


In [10]:
# Create Redshift cluster

try:
    response = redshift.create_cluster(        
        # parameters for hardware
         ClusterType=DWH_CLUSTER_TYPE,
         NodeType=DWH_NODE_TYPE,
         NumberOfNodes=int(DWH_NUM_NODES),


        # parameters for identifiers & credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        # add parameter for role to allow s3 access
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)

In [5]:
# Verify cluster properties

def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.cjwnfkuqcjgl.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-98bfece0
7,NumberOfNodes,4


In [6]:
# Verify end point and update config file

DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

DWH_ENDPOINT ::  dwhcluster.cjwnfkuqcjgl.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::849907613343:role/dwhRole


In [13]:
# Authorize Security Access Group to Default TCP/IP Address
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name ,
        CidrIp='0.0.0.0/0',  
        IpProtocol='TCP',  
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-7ddbde5f')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


In [17]:
%load_ext sql

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [7]:
# Connect to database

conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

postgresql://dwhuser:Passw0rd@dwhcluster.cjwnfkuqcjgl.us-west-2.redshift.amazonaws.com:5439/dwh


'Connected: dwhuser@dwh'

In [8]:
# Run the script to create staging and analytics tables 

%run create_tables.py

In [11]:
# Run the ETL Pipeline process to copy data from S3 to Staging and from Staging to Analytics tables

%run etl.py

### Display row count of each table

In [17]:
%%sql
select 'user count-' as entity, count(*)  from dim_user 
UNION ALL
select 'song count-' as entity, count(*)  from dim_song
UNION ALL
select 'artist count-' as entity, count(*)  from dim_artist
UNION ALL
select 'time count-' as entity,count(*)  from dim_time
UNION ALL
select 'songplay count-' as entity,count(*)  from fact_songplay
UNION ALL
select 'stag_event count-' as entity,count(*)  from staging_events
UNION ALL
select 'stag_songs count-'as entity ,count(*)  from staging_songs


 * postgresql://dwhuser:***@dwhcluster.cjwnfkuqcjgl.us-west-2.redshift.amazonaws.com:5439/dwh
7 rows affected.


entity,count
song count-,14896
artist count-,10025
stag_event count-,8056
stag_songs count-,14896
songplay count-,319
user count-,104
time count-,319


### Which song is most listened to 

In [18]:
%%sql

SELECT TOP 10 fs.song_id , ds.Title, COUNT(1) 
FROM fact_songplay fs
JOIN dim_song ds ON fs.song_id=ds.song_id
Group BY fs.song_id,ds.Title
ORDER by 3 desc


 * postgresql://dwhuser:***@dwhcluster.cjwnfkuqcjgl.us-west-2.redshift.amazonaws.com:5439/dwh
10 rows affected.


song_id,title,count
SOBONKR12A58A7A7E0,You're The One,37
SOUNZHU12A8AE47481,I CAN'T GET STARTED,9
SOHTKMO12AB01843B0,Catch You Baby (Steve Pitron & Max Sanna Radio Edit),9
SOULTKQ12AB018A183,Nothin' On You [feat. Bruno Mars] (Album Version),8
SOLZOBD12AB0185720,Hey Daddy (Daddy's Home),6
SOTNHIP12AB0183131,Make Her Say,5
SOARUPP12AB01842E0,Up Up & Away,5
SOIOESO12A6D4F621D,Unwell (Album Version),4
SONQEYS12AF72AABC9,Mr. Jones,4
SOIZLKI12A6D4F7B61,Supermassive Black Hole (Album Version),4


### Which artist is most popular amongst users 

In [19]:
%%sql

SELECT TOP 10
fs.artist_id ,name, COUNT(distinct fs.user_id ) 
FROM fact_songplay fs
JOIN dim_artist da ON fs.artist_id=da.artist_id
Group BY fs.artist_id,name
ORDER by 3 desc

 * postgresql://dwhuser:***@dwhcluster.cjwnfkuqcjgl.us-west-2.redshift.amazonaws.com:5439/dwh
10 rows affected.


artist_id,name,count
AR5E44Z1187B9A1D74,Dwight Yoakam,22
ARD46C811C8A414F3F,Kid Cudi / Kanye West / Common,10
ARD46C811C8A414F3F,Kid Cudi,10
AR5EYTL1187B98EDA0,Lonnie Gordon,9
ARKQQZA12086C116FC,B.o.B,7
AR37SX11187FB3E164,Ron Carter,6
ARR3ONV1187B9A2F59,Muse,6
ARQUMH41187B9AF699,Linkin Park,4
ARM0P6Z1187FB4D466,Arctic Monkeys,4
ARPDVPJ1187B9ADBE9,Usher,4


In [23]:
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)

{'Cluster': {'ClusterIdentifier': 'dwhcluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'MasterUsername': 'dwhuser',
  'DBName': 'dwh',
  'Endpoint': {'Address': 'dwhcluster.cjwnfkuqcjgl.us-west-2.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2021, 1, 22, 3, 25, 34, 487000, tzinfo=tzlocal()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-7ddbde5f',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-98bfece0',
  'AvailabilityZone': 'us-west-2c',
  'PreferredMaintenanceWindow': 'wed:13:00-wed:13:30',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'NumberOfNodes': 4,
  'PubliclyAccessible': True,
  'Encrypted': False,
  'Tags': [],
  'EnhancedVpcRouting': False,
  'Iam

In [24]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

ClusterNotFoundFault: An error occurred (ClusterNotFound) when calling the DescribeClusters operation: Cluster dwhcluster not found.

In [25]:
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)

{'ResponseMetadata': {'RequestId': '6f733186-f8e6-4ad6-adc6-1ebfcca27910',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '6f733186-f8e6-4ad6-adc6-1ebfcca27910',
   'content-type': 'text/xml',
   'content-length': '200',
   'date': 'Fri, 22 Jan 2021 04:20:42 GMT'},
  'RetryAttempts': 0}}