In [1]:
import pandas as pd
import boto3
import json

In [2]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('dwh-aws-setup.cfg'))


KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,sparkifyDwhCluster
4,DWH_DB,sparkify
5,DWH_DB_USER,etl_dev
6,DWH_DB_PASSWORD,EtlSprkfy_202001
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,sparkifyDwhRole


In [3]:
ec2 = boto3.resource('ec2',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )

iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name='us-west-2'
                  )

redshift = boto3.client('redshift',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )

In [4]:
from botocore.exceptions import ClientError

#1.1 Create the role, 
try:
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
                            'Effect': 'Allow',
                         'Principal': {'Service': 'redshift.amazonaws.com'}
                           }],
               'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)
    
    
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

1.1 Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name sparkifyDwhRole already exists.
1.2 Attaching Policy
1.3 Get the IAM role ARN
arn:aws:iam::624507093404:role/sparkifyDwhRole


In [5]:
try:
    response = redshift.create_cluster(        
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)

An error occurred (ClusterAlreadyExists) when calling the CreateCluster operation: Cluster already exists


In [None]:
print(redshift.waiter_names)
chk = redshift.get_waiter('cluster_available')
chk.wait(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
print('Done - Cluster Created')

In [None]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

In [None]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

In [None]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

In [None]:
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)

In [5]:
sparkify_s3 =  s3.Bucket("udacity-dend")
sparkify_s4 = s3.Bucket("awssampledbuswest2")

s3client = boto3.client(
    's3',
    region_name="us-west-2",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET
)

filedt = s3client.get_object(Bucket='udacity-dend',Key='log_json_path.json')
print(filedt['Body'].read().decode('utf-8'))

filedt = s3client.get_object(Bucket='udacity-dend-bhavik',Key='song_json_path.json')
print(filedt['Body'].read().decode('utf-8'))

file2 = s3client.get_object(Bucket='udacity-dend',Key='song-data/A/A/A/TRAAAAK128F9318786.json')
print(file2['Body'].read().decode('utf-8'))

file2 = s3client.get_object(Bucket='udacity-dend',Key='song-data/A/A/A/TRAAAAV128F421A322.json')
print(file2['Body'].read().decode('utf-8'))

counter=0
for rec in sparkify_s3.objects.filter(Prefix='song-data'):
    counter+=1
print(counter)


    #filedt = s3client.get_object(Bucket='udacity-dend',Key=rec.key)
    #df = pd.read_json(filedt['Body'].read().decode('utf-8'),lines=True)
    #df = df[df['page']=='NextSong']
    #df.sort_values(['ts'],inplace=True)

#fileobj = s3client.get_object(Bucket='udacity-dend',Key='log-data/2018/11/2018-11-01-events.json')
#filedata = fileobj['Body'].read()
#contents = filedata.decode('utf-8')
#print(contents)


{
    "jsonpaths": [
        "$['artist']",
        "$['auth']",
        "$['firstName']",
        "$['gender']",
        "$['itemInSession']",
        "$['lastName']",
        "$['length']",
        "$['level']",
        "$['location']",
        "$['method']",
        "$['page']",
        "$['registration']",
        "$['sessionId']",
        "$['song']",
        "$['status']",
        "$['ts']",
        "$['userAgent']",
        "$['userId']"
    ]
}
{
    "jsonpaths": [
        "$['song_id']",
        "$['num_songs']",
        "$['title']",
        "$['artist_name']",
        "$['artist_latitude']",
        "$['year']",
        "$['duration']",
        "$['artist_id']",
        "$['artist_longitude']",
        "$['artist_location']"
    ]
}
{"song_id": "SOBLFFE12AF72AA5BA", "num_songs": 1, "title": "Scream", "artist_name": "Adelitas Way", "artist_latitude": null, "year": 2009, "duration": 213.9424, "artist_id": "ARJNIUY12298900C91", "artist_longitude": null, "artist_location": ""}
{

In [6]:
%load_ext sql
import os 
conn_string="postgresql://{}:{}@{}:{}/{}".format('etl_dev', 'EtlSprkfy_202001', 'sparkifydwhcluster.c1wvqxfi6z1g.us-west-2.redshift.amazonaws.com', '5439','sparkify')
print(conn_string)
%sql $conn_string

postgresql://etl_dev:EtlSprkfy_202001@sparkifydwhcluster.c1wvqxfi6z1g.us-west-2.redshift.amazonaws.com:5439/sparkify


'Connected: etl_dev@sparkify'

In [None]:
%%sql
create table events(
                   artist varchar
                   ,auth varchar
                   ,firstName varchar
                   ,gender varchar
                   ,itemInSession varchar
                   ,lastName varchar
                   ,length varchar
                   ,level varchar
                   ,location varchar
                   ,method varchar
                   ,page varchar
                   ,registration varchar
                   ,sessionId varchar
                   ,song varchar
                   ,status varchar
                   ,ts varchar
                   ,userAgent varchar
                   ,userId varchar
                   )

In [None]:
#copy events from 's3://udacity-dend/log_data/2018/11/2018-11-01-events.json' 
#iam_role 'arn:aws:iam::624507093404:role/sparkifyDwhRole'
#format as json 's3://udacity-dend/log_json_path.json';

#for rec in sparkify_s3.objects.filter(Prefix='log-data/2018'):
    
#    sql_copy="""copy events from 's3://udacity-dend/{}' 
#    iam_role 'arn:aws:iam::624507093404:role/sparkifyDwhRole' 
#    timeformat 'epochmillisecs'
#    format as json 's3://udacity-dend/log_json_path.json';""".format(rec.key)
    
#    %sql $sql_copy

sql_copy1="""copy events from 's3://udacity-dend/log-data/'
        iam_role 'arn:aws:iam::624507093404:role/sparkifyDwhRole'
        timeformat 'epochmillisecs'
        maxerror as 10
        format as json 's3://udacity-dend/log_json_path.json'
        """
%sql $sql_copy1

In [None]:

#for rec in sparkify_s3.objects.filter(Prefix='song-data'):
#    
#    if rec.key != 'song-data/':
#        sql_copy="""copy songs_raw from 's3://udacity-dend/{}' 
#        iam_role 'arn:aws:iam::624507093404:role/sparkifyDwhRole' 
#        format as json 'auto'""".format(rec.key)
#
#        %sql $sql_copy

import string
a2z = list(string.ascii_uppercase)
print(a2z)
for rec in a2z:
    sql_copy="""copy songs_raw from 's3://udacity-dend/song-data/{}'
    iam_role 'arn:aws:iam::624507093404:role/sparkifyDwhRole' 
    format as json 'auto'maxerror as 100""".format(rec)
    %sql $sql_copy

In [None]:
import string
dicta = list(string.ascii_uppercase)
print(dicta)

In [None]:
sql_copy="""copy songs_raw from 's3://udacity-dend/song-data'
iam_role 'arn:aws:iam::624507093404:role/sparkifyDwhRole' 
format as json 'auto'
maxerror as 100"""

%sql $sql_copy

In [None]:
counter = 1
for rec in sparkify_s3.objects.filter(Prefix='log-data/2018'):
    sql_copy="""copy events from 's3://udacity-dend/{}'
        iam_role 'arn:aws:iam::624507093404:role/sparkifyDwhRole'
        timeformat 'epochmillisecs'
        maxerror as 10
        format as json 's3://udacity-dend/log_json_path.json'""".format(rec.key)
    %sql $sql_copy

print(counter)
#385254

In [None]:
%%sql
insert into songs
select distinct song_id
       ,title
       ,artist_id
       ,case when year= 0 then null else year end as year
       ,duration
 from songs_raw;

In [None]:
%%sql
insert into time
select distinct ts
       ,extract(hour from ts)
       ,extract(day from ts)
       ,extract(week from ts)
       ,extract(month from ts)
       ,extract(year from ts)
       ,extract(dow from ts)
 from events;

In [55]:
%%sql
select userid
       ,firstname
       ,lastname
       ,gender
       ,level
  from
       (
        select userid
               ,firstname
               ,lastname
               ,gender
               ,level
               ,rank() over(partition by userid order by ts) as rnk
          from events
        ) events_rnk
 where rnk = 1;

 * postgresql://etl_dev:***@sparkifydwhcluster.c1wvqxfi6z1g.us-west-2.redshift.amazonaws.com:5439/sparkify
102 rows affected.


userid,firstname,lastname,gender,level
5.0,Elijah,Davis,M,free
33.0,Bronson,Harris,M,free
35.0,Molly,Taylor,F,free
39.0,Walter,Frye,M,free
44.0,Aleena,Kirby,F,paid
60.0,Devin,Larson,M,free
62.0,Connar,Moreno,M,free
66.0,Kevin,Arellano,M,free
85.0,Kinsley,Young,F,free
86.0,Aiden,Hess,M,free


In [47]:
%%sql
insert into songplays(start_time,user_id,level,song_id,artist_id,session_id,location,user_agent)
select  e.ts
        ,e.userid
        ,e.level
        ,s.song_id
        ,a.artist_id
        ,e.sessionid
        ,e.location
        ,e.useragent
  from events e
  left join songs s
    on e.song = s.title
   and e.length = s.duration
  left join artists a
    on s.artist_id = a.artist_id
   and e.artist = a.name;

 * postgresql://etl_dev:***@sparkifydwhcluster.c1wvqxfi6z1g.us-west-2.redshift.amazonaws.com:5439/sparkify
6836 rows affected.


[]

In [12]:
from time import time

sql_sel="""
select t.month
       ,u.first_name
       ,count(*)
  from songplays s
 inner join time t
    on s.start_time = t.start_time
 inner join users u
    on s.user_id = u.user_id
 group by t.month,u.first_name
"""

t0 = time()
%sql $sql_sel
t1 = time() - t0

print(t1)

 * postgresql://etl_dev:***@sparkifydwhcluster.c1wvqxfi6z1g.us-west-2.redshift.amazonaws.com:5439/sparkify
84 rows affected.
0.1671280860900879
