In [1]:
from time import time
import configparser
import matplotlib.pyplot as plt
import pandas as pd
import boto3
import json
import sql_queries
import psycopg2
import create_tables
import etl

### This function shows the key properties and status of the Redshift Cluster

In [2]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

### Here, we read the config doc for this project including the DWH properties, key & secret and ARN & DWH Endpoints

In [3]:
config = configparser.ConfigParser()

In [4]:
config.read_file(open('project_dwh.cfg'))

In [5]:
KEY = config.get('AWS','KEY')
SECRET = config.get('AWS','SECRET')
s3_song = config.get("S3","SONG_DATA")
DWH_IAM_ROLE_NAME = config.get('IAM_ROLE','DWH_IAM_ROLE_NAME')

DWH_CLUSTER_TYPE = config.get('CLUSTER','DWH_CLUSTER_TYPE')
DWH_NODE_TYPE = config.get('CLUSTER','DWH_NODE_TYPE')
DWH_NUM_NODES = config.get('CLUSTER','DWH_NUM_NODES')
DB_NAME = config.get('CLUSTER','DB_NAME')
HOST = config.get('CLUSTER','HOST')
DB_USER = config.get('CLUSTER','DB_USER')
DB_PASSWORD = config.get('CLUSTER','DB_PASSWORD')
DB_PORT = config.get('CLUSTER','DB_PORT')

## Read Sample Data from the S3 Bucket
#### Goal is to see the column names and datatypes

In [6]:
s3 = boto3.resource('s3',region_name = 'us-west-2', aws_access_key_id = KEY,aws_secret_access_key = SECRET)

In [7]:
#sampleDbBucket =  s3.Bucket(s3_song)
#print(sampleDbBucket)
#for obj in sampleDbBucket.objects.filter(Prefix = "song-data/A/A"):
 #   print(obj)
    

In [8]:
df_log_data = pd.read_json('https://udacity-dend.s3.us-west-2.amazonaws.com/log-data/2018/11/2018-11-01-events.json',  lines=True)
df_log_data.head()
#Somehow this Object URL works for me the S3 URL did not work.

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged In,Walter,M,0,Frye,,free,"San Francisco-Oakland-Hayward, CA",GET,Home,1540919166796,38,,200,1541105830796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",39
1,,Logged In,Kaylee,F,0,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Home,1540344794796,139,,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
3,,Logged In,Kaylee,F,2,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Upgrade,1540344794796,139,,200,1541106132796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
4,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Flat 55,200,1541106352796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


In [9]:
#df_log_data = pd.read_json('s3://udacity-dend/log-data/2018/11/2018-11-01-events.json',  lines=True)
#df_log_data.head()
#Somehow the Object URL works for me the S3 URL did not work.

In [10]:
df_log_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 15 entries, 0 to 14
Data columns (total 18 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   artist         11 non-null     object 
 1   auth           15 non-null     object 
 2   firstName      15 non-null     object 
 3   gender         15 non-null     object 
 4   itemInSession  15 non-null     int64  
 5   lastName       15 non-null     object 
 6   length         11 non-null     float64
 7   level          15 non-null     object 
 8   location       15 non-null     object 
 9   method         15 non-null     object 
 10  page           15 non-null     object 
 11  registration   15 non-null     int64  
 12  sessionId      15 non-null     int64  
 13  song           11 non-null     object 
 14  status         15 non-null     int64  
 15  ts             15 non-null     int64  
 16  userAgent      15 non-null     object 
 17  userId         15 non-null     int64  
dtypes: float64(1

In [11]:
df_song_data = pd.read_json('https://udacity-dend.s3.us-west-2.amazonaws.com/song_data/A/A/A/TRAAAAK128F9318786.json',  lines=True)
df_song_data.head()
#Somehow this Object URL works for me the S3 URL did not work.

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARJNIUY12298900C91,,,,Adelitas Way,213.9424,1,SOBLFFE12AF72AA5BA,Scream,2009


## Create Redshift Cluster

In [12]:
iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name='us-west-2'
                  )

redshift = boto3.client('redshift',region_name = 'us-west-2', 
                        aws_access_key_id = KEY,
                        aws_secret_access_key = SECRET
                       )


ec2 = boto3.resource('ec2',region_name = 'us-west-2', 
                        aws_access_key_id = KEY,
                        aws_secret_access_key = SECRET
                       )


### Creating the IAM role with the relevant policies attached for the Redshift Cluster to be allowed to communicate with the necessary S3 buckets

In [13]:
from botocore.exceptions import ClientError

#1.1 Create the role, 
try:
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)
    
    
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

1.1 Creating a new IAM Role
An error occurred (InvalidClientTokenId) when calling the CreateRole operation: The security token included in the request is invalid.
1.2 Attaching Policy


ClientError: An error occurred (InvalidClientTokenId) when calling the AttachRolePolicy operation: The security token included in the request is invalid.

### Here, I created the Redshift cluster with the same specs as in the exercises

In [None]:
try:
    response = redshift.create_cluster(        
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DB_NAME,
        ClusterIdentifier=HOST,
        MasterUsername=DB_USER,
        MasterUserPassword=DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)

In [None]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=HOST)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

#### I saved the endpoint and ARN for this DWH into the project_dwh.config file. (manually)

In [None]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

### Here I opened a TCP port for the cluster endpoint. 
#### This part is the part that I have least understanding about. Ec2 server issues commands such as ordering Redshift which resides in a VPC to take the data from the S3 bucket.

In [None]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

### Here, I tested the connection to the Redshift Cluster I created.

In [None]:
%reload_ext sql

In [None]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DB_USER, DB_PASSWORD, DWH_ENDPOINT, DB_PORT,DB_NAME)
print(conn_string)

In [None]:
%sql $conn_string

### This is to delete the Redshift cluster when the job is done.

In [None]:
redshift.delete_cluster( ClusterIdentifier=HOST,  SkipFinalClusterSnapshot=True)
myClusterProps = redshift.describe_clusters(ClusterIdentifier=HOST)['Clusters'][0]
prettyRedshiftProps(myClusterProps)