### Test notebook for connecting to AWS buckets and designing tables/ETL

In [4]:
import configparser
import psycopg2
import pandas as pd
import boto3
import json
import time

### Check datawarehouse config file

In [16]:
!cat song_dwh.cfg

[AWS]
key = AKIARMKLSTNC3GBQLM6A
secret = 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

[DWH]
dwh_region = us-west-2
dwh_cluster_type = multi-node
dwh_num_nodes = 2
dwh_node_type = dc2.large
dwh_iam_role_name = dwhuser
dwh_cluster_identifier = songCluster
dwh_db = songdwh
dwh_db_user = dwhuser
dwh_db_password = Passw0rd
dwh_port = 5439

[ARN]
arn = arn:aws:iam::095184657221:role/dwhuser

[S3]
log_data = 's3://udacity-dend/log_data'
log_jsonpath = 's3://udacity-dend/log_json_path.json'
song_data = 's3://udacity-dend/song_data'



In [17]:
#Get credentials
config = configparser.ConfigParser()
config.read('song_dwh.cfg')

KEY = config.get('AWS', 'key')
SECRET = config.get('AWS', 'secret')
ARN = config.get("ARN", "arn")

DWH_REGION = config.get("DWH", "dwh_region")
DWH_CLUSTER_TYPE = config.get("DWH", "dwh_cluster_type")
DWH_NUM_NODES = config.get("DWH","dwh_num_nodes")
DWH_NODE_TYPE = config.get("DWH","dwh_node_type")
DWH_IAM_ROLE_NAME = config.get("DWH", "dwh_iam_role_name")
DWH_CLUSTER_IDENTIFIER = config.get("DWH","dwh_cluster_identifier")
DWH_DB = config.get("DWH","dwh_db")
DWH_DB_USER = config.get("DWH","dwh_db_user")
DWH_DB_PASSWORD = config.get("DWH","dwh_db_password")
DWH_PORT = config.get("DWH","dwh_port")

LOG_DATA = config.get('S3','log_data')
SONG_DATA = config.get('S3', 'song_data')

In [4]:
s3 = boto3.resource('s3', aws_access_key_id=KEY,
                          aws_secret_access_key=SECRET,
                          region_name="us-west-2")

In [27]:
#Download sample files
sampleDbBucket =  s3.Bucket("udacity-dend")
for obj in sampleDbBucket.objects.filter(Prefix="log-data/2018/11/2018-11-02-events.json"):
    print(obj)
sampleDbBucket.download_file("log-data/2018/11/2018-11-02-events.json", "2018-11-02-events.json")
#sampleDbBucket.download_file("song-data/A/A/A/TRAAAAK128F9318786.json", "TRAAAAK128F9318786.json")
#sampleDbBucket.download_file("log_json_path.json", "log_json_path.json")

s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-02-events.json')


In [44]:
# Check sample log file
with open("samples/2018-11-02-events.json", "r") as f:    
    data = f.readlines()


d = json.loads(data[0])
print(list(d.keys()))
#print(d)
#df = pd.DataFrame(data[0], cols=list(data[0].keys()))
df = pd.DataFrame(d, columns=list(d.keys()), index=[0])
df.head()
#df.head()

['artist', 'auth', 'firstName', 'gender', 'itemInSession', 'lastName', 'length', 'level', 'location', 'method', 'page', 'registration', 'sessionId', 'song', 'status', 'ts', 'userAgent', 'userId']


Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,N.E.R.D. FEATURING MALICE,Logged In,Jayden,M,0,Fox,288.9922,free,"New Orleans-Metairie, LA",PUT,NextSong,1541034000000.0,184,Am I High (Feat. Malice),200,1541121934796,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",101


In [45]:
# Check sample song file
with open("samples/TRAAAAK128F9318786.json", "r") as f:    
    data = json.load(f)

cols = list(data.keys())
print(cols)
df = pd.DataFrame(data, columns=cols, index=[0])

df.head()



['song_id', 'num_songs', 'title', 'artist_name', 'artist_latitude', 'year', 'duration', 'artist_id', 'artist_longitude', 'artist_location']


Unnamed: 0,song_id,num_songs,title,artist_name,artist_latitude,year,duration,artist_id,artist_longitude,artist_location
0,SOBLFFE12AF72AA5BA,1,Scream,Adelitas Way,,2009,213.9424,ARJNIUY12298900C91,,


In [32]:
def create_dwhuser():
    # Create iam client
    iam = boto3.client('iam',aws_access_key_id=KEY,
                         aws_secret_access_key=SECRET,
                         region_name=DWH_REGION)

    try:
        roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']
    except Exception as e:

        try:
            dwhRole = iam.create_role(
                Path='/',
                RoleName=DWH_IAM_ROLE_NAME,
                Description = "Allows Redshift clusters to call AWS services on your behalf.",
                AssumeRolePolicyDocument=json.dumps(
                    {'Statement': [{'Action': 'sts:AssumeRole',
                       'Effect': 'Allow',
                       'Principal': {'Service': 'redshift.amazonaws.com'}}],
                     'Version': '2012-10-17'})
            )    
        except Exception as e:
            print(e)

        iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                               PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                              )['ResponseMetadata']['HTTPStatusCode']


In [33]:
create_dwhuser()

In [34]:
def getroleArn():
    iam = boto3.client('iam',aws_access_key_id=KEY,
                         aws_secret_access_key=SECRET,
                         region_name=DWH_REGION)
    
    return iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

In [35]:
getroleArn()

'arn:aws:iam::095184657221:role/dwhuser'

In [36]:
def create_redshift_cluster():
    
    redshift = boto3.client('redshift',
                       region_name=DWH_REGION,
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )
    
    roleArn = getroleArn()
    
    try:
        response = redshift.create_cluster(        
            #HW
            ClusterType=DWH_CLUSTER_TYPE,
            NodeType=DWH_NODE_TYPE,
            NumberOfNodes=int(DWH_NUM_NODES),

            #Identifiers & Credentials
            DBName=DWH_DB,
            ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
            MasterUsername=DWH_DB_USER,
            MasterUserPassword=DWH_DB_PASSWORD,

            #Roles (for s3 access)
            IamRoles=[roleArn]  
        )
        
        myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
        prettyRedshiftProps(myClusterProps)
    
    except Exception as e:
        print(e)

In [37]:
create_redshift_cluster()

name 'prettyRedshiftProps' is not defined


In [None]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

In [None]:
redshift = boto3.client('redshift',
                       region_name=DWH_REGION,
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

In [44]:
def create_tcp_route():
    
    redshift = boto3.client('redshift',
                       region_name=DWH_REGION,
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )
    myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
    
    while myClusterProps["ClusterStatus"] != "Available":
        print("sleeping 60 sec......")
        time.sleep(60)
    
    ec2 = boto3.resource('ec2',
                       region_name=DWH_REGION,
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )
    
    try:
        vpc = ec2.Vpc(id=myClusterProps['VpcId'])
        defaultSg = list(vpc.security_groups.all())[0]
        print(defaultSg)
        defaultSg.authorize_ingress(
            GroupName=defaultSg.group_name,
            CidrIp='0.0.0.0/0',
            IpProtocol='TCP',
            FromPort=int(DWH_PORT),
            ToPort=int(DWH_PORT)
        )
    except Exception as e:
        print(e)
    

In [47]:
create_tcp_route()

sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......
sleeping 60 sec......


KeyboardInterrupt: 

In [52]:
ec2 = boto3.resource('ec2',
                       region_name=DWH_REGION,
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )
redshift = boto3.client('redshift',
                       region_name=DWH_REGION,
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

vpc = ec2.Vpc(id=myClusterProps['VpcId'])
defaultSg = list(vpc.security_groups.all())[0]
print(defaultSg)
        
defaultSg.authorize_ingress(
            GroupName=defaultSg.group_name,
            CidrIp='0.0.0.0/0',
            IpProtocol='TCP',
            FromPort=int(DWH_PORT),
            ToPort=int(DWH_PORT)
        )

ec2.SecurityGroup(id='sg-12d2990e')


{'ResponseMetadata': {'RequestId': 'e3c37591-7517-491e-9d5b-3de73924a0d0',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'e3c37591-7517-491e-9d5b-3de73924a0d0',
   'cache-control': 'no-cache, no-store',
   'strict-transport-security': 'max-age=31536000; includeSubDomains',
   'content-type': 'text/xml;charset=UTF-8',
   'content-length': '714',
   'date': 'Fri, 20 Aug 2021 14:23:35 GMT',
   'server': 'AmazonEC2'},
  'RetryAttempts': 0}}

In [8]:
def update_arn(ARN):
        
    config = configparser.ConfigParser()
    config.read('song_dwh.cfg')
    
    config.set("ARN","ARN", ARN)
    
    with open("song_dwh.cfg", "w") as con:
        config.write(con)
    
update_arn("arn:aws:iam::095184657221:role/dwhuser")

In [9]:
%load_ext sql

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [6]:
DWH_ENDPOINT='songcluster.cy513anz522l.us-west-2.redshift.amazonaws.com:5439/songdwh'
#conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT, DWH_DB)
conn_string="postgresql://{}:{}@{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT)
print(conn_string)
%sql $conn_string

postgresql://dwhuser:Passw0rd@songcluster.cy513anz522l.us-west-2.redshift.amazonaws.com:5439/songdwh


'Connected: dwhuser@songdwh'

In [10]:
%%sql

CREATE TABLE IF NOT EXISTS staging_events (
        artist varchar(100),
        auth varchar(50),
        firstName varchar(100),
        gender varchar(1),
        itemInSession int,
        lastName varchar(100), 
        length decimal(7,5),
        level varchar(5), 
        location varchar(255),
        method varchar(5),
        page varchar(25), 
        registration varchar(100),
        sessionId int,
        song varchar(200),
        status varchar(5),
        ts timestamp,
        userAgent varchar(255),
        userId int
    );

 * postgresql://dwhuser:***@songcluster.cy513anz522l.us-west-2.redshift.amazonaws.com:5439/songdwh
Done.


[]

In [11]:
%%sql
CREATE TABLE IF NOT EXISTS staging_songs (
        song_id varchar(100),
        num_songs int,
        title varchar(200),
        artist_name varchar(100),
        artist_latitude decimal(8,6),
        year int,
        duration decimal(7,4),
        artist_id varchar(200),
        artist_longitude decimal(9,6),
        artist_location varchar(255)
    );

 * postgresql://dwhuser:***@songcluster.cy513anz522l.us-west-2.redshift.amazonaws.com:5439/songdwh
Done.


[]

In [20]:
staging_events_copy = (""" 
    COPY staging_events 
    FROM {}
    IAM_ROLE '{}'
    REGION '{}'
""").format(LOG_DATA, ARN, DWH_REGION)
print(staging_events_copy)

staging_songs_copy = ("""
    COPY staging_songs
    FROM {}
    IAM_ROLE '{}'
    REGION '{}'
""").format(SONG_DATA, ARN, DWH_REGION)
print(staging_songs_copy)

 
    COPY staging_events 
    FROM ''s3://udacity-dend/log_data''
    IAM_ROLE 'arn:aws:iam::095184657221:role/dwhuser'
    REGION 'us-west-2'


    COPY staging_songs
    FROM ''s3://udacity-dend/song_data''
    IAM_ROLE 'arn:aws:iam::095184657221:role/dwhuser'
    REGION 'us-west-2'



In [26]:
%%sql
COPY staging_events 
    FROM 's3://udacity-dend/log_data'
    IAM_ROLE 'arn:aws:iam::095184657221:role/dwhuser'
    FORMAT AS JSON 'auto'
    REGION 'us-west-2'
    ;

 * postgresql://dwhuser:***@songcluster.cy513anz522l.us-west-2.redshift.amazonaws.com:5439/songdwh


InternalError: (psycopg2.InternalError) Load into table 'staging_events' failed.  Check 'stl_load_errors' system table for details.
 [SQL: "COPY staging_events \n    FROM 's3://udacity-dend/log_data'\n    IAM_ROLE 'arn:aws:iam::095184657221:role/dwhuser'\n    FORMAT AS JSON 'auto'\n    REGION 'us-west-2'\n    ;"]

In [24]:
%%sql
COPY staging_songs
    FROM 's3://udacity-dend/song_data'
    IAM_ROLE 'arn:aws:iam::095184657221:role/dwhuser'
    FORMAT AS JSON 'auto'
    REGION 'us-west-2'
    ;

 * postgresql://dwhuser:***@songcluster.cy513anz522l.us-west-2.redshift.amazonaws.com:5439/songdwh


InternalError: (psycopg2.InternalError) Load into table 'staging_songs' failed.  Check 'stl_load_errors' system table for details.
 [SQL: "COPY staging_songs\n    FROM 's3://udacity-dend/song_data'\n    IAM_ROLE 'arn:aws:iam::095184657221:role/dwhuser'\n    REGION 'us-west-2'"]

In [28]:
%%sql
select * from stl_load_errors
WHERE starttime = (select max(starttime) from stl_load_errors);

 * postgresql://dwhuser:***@songcluster.cy513anz522l.us-west-2.redshift.amazonaws.com:5439/songdwh
2 rows affected.


userid,slice,tbl,starttime,session,query,filename,line_number,colname,type,col_length,position,raw_line,raw_field_value,err_code,err_reason,is_partial,start_offset
100,0,101596,2021-08-21 22:09:25.396879,12903,387,s3://udacity-dend/log_data/2018/11/2018-11-03-events.json,1,ts,timestamp,0,0,"{""artist"":null,""auth"":""Logged Out"",""firstName"":null,""gender"":null,""itemInSession"":0,""lastName"":null,""length"":null,""level"":""free"",""location"":null,""method"":""PUT"",""page"":""Login"",""registration"":null,""sessionId"":52,""song"":null,""status"":307,""ts"":1541207073796,""userAgent"":null,""userId"":""""} {""artist"":null,""auth"":""Logged In"",""firstName"":""Celeste"",""gender"":""F"",""itemInSession"":1,""lastName"":""Williams"",""length"":null,""level"":""free"",""location"":""Klamath Falls, OR"",""method"":""GET"",""page"":""Home"",""registration"":1541077528796.0,""sessionId"":52,""song"":null,""status"":200,""ts"":1541207123796,""userAgent"":""\\""Mozilla\\/5.0 (Windows NT 6.1; WOW64) AppleWebKit\\/537.36 (KHTML, like Gecko) Chrome\\/37.0.2062.103 Safari\\/537.36\\"""",""userId"":""53""} {""artist"":""Mynt"",""auth"":""Logged In"",""firstName"":""Celeste"",""gender"":""F"",""itemInSession"":2,""lastName"":""Williams"",""length"":166.94812,""level"":""free"",""location"":""Klamath Falls, OR"",""method"":""PUT"",""page"":""NextSong"",""registration"":1541077528796.0,""sessionId"":52,""song"":""Playa Haters"",""status"":200,""ts"":1541",,1206,Invalid timestamp format or value [YYYY-MM-DD HH24:MI:SS],0,0
100,2,101596,2021-08-21 22:09:25.396879,12903,387,s3://udacity-dend/log_data/2018/11/2018-11-02-events.json,1,length,numeric,"7, 5",0,"{""artist"":""N.E.R.D. FEATURING MALICE"",""auth"":""Logged In"",""firstName"":""Jayden"",""gender"":""M"",""itemInSession"":0,""lastName"":""Fox"",""length"":288.9922,""level"":""free"",""location"":""New Orleans-Metairie, LA"",""method"":""PUT"",""page"":""NextSong"",""registration"":1541033612796.0,""sessionId"":184,""song"":""Am I High (Feat. Malice)"",""status"":200,""ts"":1541121934796,""userAgent"":""\\""Mozilla\\/5.0 (Windows NT 6.3; WOW64) AppleWebKit\\/537.36 (KHTML, like Gecko) Chrome\\/36.0.1985.143 Safari\\/537.36\\"""",""userId"":""101""} {""artist"":null,""auth"":""Logged In"",""firstName"":""Stefany"",""gender"":""F"",""itemInSession"":0,""lastName"":""White"",""length"":null,""level"":""free"",""location"":""Lubbock, TX"",""method"":""GET"",""page"":""Home"",""registration"":1540708070796.0,""sessionId"":82,""song"":null,""status"":200,""ts"":1541122176796,""userAgent"":""\\""Mozilla\\/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit\\/537.36 (KHTML, like Gecko) Chrome\\/36.0.1985.143 Safari\\/537.36\\"""",""userId"":""83""} {""artist"":""Death Cab for Cutie"",""auth"":""Logged In"",""firstName"":""Stefany"",""gender"":""F",,1207,"Overflow for NUMERIC(7,5)",0,0
