In [None]:
# Load the `sql` IPython extension; it provides the %sql / %%sql magics used later in this notebook.
%load_ext sql

In [None]:
# import modules to use for AWS SDK for python
import pandas as pd
import boto3
import json
import pandas as pd
import os

In [None]:
# The IAM user dwhadmin was created in the AWS console and has AWS administrator access. Its access key and
# secret key are the credentials this notebook needs in order to access resources on AWS.

In [None]:
# Get the parameters in dwh.cfg
# The key and secret were retrieved from the IAM user (dwhadmin), which
# has the AdministratorAccess policy attached.
import configparser

config = configparser.ConfigParser()
# Use a context manager so the file handle is closed as soon as parsing is done.
with open('dwh.cfg') as config_file:
    config.read_file(config_file)

# AWS credentials (never displayed).
KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

# Cluster hardware.
CLUSTER_TYPE       = config.get("CLUSTER","CLUSTER_TYPE")
NUM_NODES          = config.get("CLUSTER","NUM_NODES")
NODE_TYPE          = config.get("CLUSTER","NODE_TYPE")

# Cluster identity and database credentials.
CLUSTER_IDENTIFIER = config.get("CLUSTER","CLUSTER_IDENTIFIER")
DB_NAME            = config.get("DWH","DB_NAME")
DB_USER            = config.get("DWH","DB_USER")
DB_PASSWORD        = config.get("DWH","DB_PASSWORD")
PORT               = config.get("DWH","DB_PORT")

# IAM role settings.
DB_IAM_ROLE_NAME      = config.get("CLUSTER", "DB_IAM_ROLE_NAME")
ARN                   = config.get("IAM_ROLE", "ARN")

# Summarise the settings as a table. The password is masked so that the
# rendered (and saved) cell output does not leak it; the DB_PASSWORD variable
# itself still holds the real value for the cells below.
pd.DataFrame({"Param":
                  ["CLUSTER_TYPE", "NUM_NODES", "NODE_TYPE", "CLUSTER_IDENTIFIER", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT", "DB_IAM_ROLE_NAME", "ARN"],
              "Value":
                  [CLUSTER_TYPE, NUM_NODES, NODE_TYPE, CLUSTER_IDENTIFIER, DB_NAME, DB_USER, "********" if DB_PASSWORD else "", PORT, DB_IAM_ROLE_NAME, ARN]
             })

In [None]:
# Create AWS resources and clients
# Every handle targets the same region and authenticates with the same
# access key pair, so the shared keyword arguments are defined once.
import boto3

aws_settings = {
    "region_name": "us-west-2",
    "aws_access_key_id": KEY,
    "aws_secret_access_key": SECRET,
}

# Resource-style handles.
ec2 = boto3.resource('ec2', **aws_settings)
s3 = boto3.resource('s3', **aws_settings)

# Low-level clients.
iam = boto3.client('iam', **aws_settings)
redshift = boto3.client('redshift', **aws_settings)
s3client = boto3.client('s3', **aws_settings)



In [None]:
### LIST json files in s3://udacity-dend/log_data
# Prints one object summary for every key in the bucket that starts with 'log_data'.
sampleDbBucket_log = s3.Bucket('udacity-dend')
for obj_log in sampleDbBucket_log.objects.filter(Prefix='log_data'):
    print(obj_log)

In [None]:
### LIST contents of s3://udacity-dend/log_data/2018/11/2018-11-01-events.json
# lines=True tells pandas to parse the body as one JSON object per line.
obj_log_file = s3client.get_object(Bucket='udacity-dend', Key='log_data/2018/11/2018-11-01-events.json')
df_log = pd.read_json(obj_log_file['Body'], lines=True)
df_log.head()


In [None]:
### LIST some json files in s3://udacity-dend/song_data, for example those under the A/A/A subdirectory
# Prints one object summary for every key that starts with 'song_data/A/A/A'.
sampleDbBucket_song = s3.Bucket('udacity-dend')
for obj_song in sampleDbBucket_song.objects.filter(Prefix='song_data/A/A/A'):
    print(obj_song)
 

In [None]:
### LIST contents of s3://udacity-dend/song_data/A/A/A/TRAAAAK128F9318786.json
# typ='series' makes pandas parse the JSON document into a Series instead of a DataFrame.
obj_song_file = s3client.get_object(Bucket='udacity-dend', Key='song_data/A/A/A/TRAAAAK128F9318786.json')
df_song = pd.read_json(obj_song_file['Body'],typ='series')
df_song
# Disabled exploration, kept for reference:
#df_song_frame = df_song.to_frame().reset_index().T
#df_song_frame.dtypes

In [None]:
############### START HERE #####################

In [None]:
### LIST contents of s3://udacity-dend/song_data/A/A/A/TRAAAAK128F9318786.json
#obj_song_file = s3client.get_object(Bucket='udacity-dend', Key='song_data/A/A/A/TRAAAAK128F9318786.json')
#df_song = pd.read_json(obj_song_file)
#df_song

In [None]:
### LIST the s3://udacity-dend/log_json_path.json file
# Prints the object summary of every key that starts with 'log_json_path.json'.
sampleDbBucket_logpath = s3.Bucket('udacity-dend')
for obj_logpath in sampleDbBucket_logpath.objects.filter(Prefix='log_json_path.json'):
    print(obj_logpath)


In [None]:
### LIST contents of s3://udacity-dend/log_json_path.json
# Downloads the object and parses its body into a DataFrame for display.
obj_logpath_file = s3client.get_object(Bucket='udacity-dend', Key='log_json_path.json')
df_logpath = pd.read_json(obj_logpath_file['Body'])
df_logpath

In [None]:
# The IAM role, dwhRole, was created manually using the AWS console. The role gives Redshift access to S3.

In [None]:
# Create the Redshift cluster identified by CLUSTER_IDENTIFIER.
# Every setting comes from dwh.cfg so the notebook carries no
# account-specific values.

try:
    response = redshift.create_cluster(
        # Hardware
        ClusterType=CLUSTER_TYPE,
        NodeType=NODE_TYPE,
        NumberOfNodes=int(NUM_NODES),

        # Identifiers & credentials
        DBName=DB_NAME,
        ClusterIdentifier=CLUSTER_IDENTIFIER,
        MasterUsername=DB_USER,
        MasterUserPassword=DB_PASSWORD,
        # Create the cluster on the same port the connection cell uses later.
        Port=int(PORT),

        # Roles: use the ARN from the [IAM_ROLE] section instead of a
        # hard-coded one. Surrounding quotes are stripped in case the config
        # stores the value quoted for use in SQL -- TODO confirm against dwh.cfg.
        IamRoles=[ARN.strip("'\"")]
    )
except Exception as e:
    # Deliberately best-effort: report the problem (for example the cluster
    # already exists) and let the notebook carry on.
    print(e)

In [None]:
# This will give the status of the cluster. The cluster will eventually become available.
def prettyRedshiftProps(props):
    """Summarise a Redshift cluster description as a two-column DataFrame.

    Parameters
    ----------
    props : dict
        A cluster description, such as one element of the ``Clusters`` list
        returned by ``redshift.describe_clusters``.

    Returns
    -------
    pandas.DataFrame
        Columns ``Key`` and ``Value`` with one row for each selected property
        that is present in ``props``, in the order ``props`` yields them.

    Notes
    -----
    Sets the pandas option ``display.max_colwidth`` globally so that long
    values are not truncated when the frame is displayed.
    """
    # None means "no truncation". The old sentinel -1 is deprecated since
    # pandas 1.0 and rejected by pandas 2.x; fall back to it only on pandas
    # versions that do not accept None.
    try:
        pd.set_option('display.max_colwidth', None)
    except ValueError:
        pd.set_option('display.max_colwidth', -1)

    keysToShow = ("ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername",
                  "DBName", "Endpoint", "NumberOfNodes", "VpcId")
    rows = [(key, value) for key, value in props.items() if key in keysToShow]
    return pd.DataFrame(data=rows, columns=["Key", "Value"])

# Fetch the current description of the cluster and show its key properties.
# Re-run this cell to refresh the displayed ClusterStatus.
myClusterProps = redshift.describe_clusters(ClusterIdentifier=CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

In [None]:
# Get the endpoint of the cluster and update HOST in the dwh.cfg file.
# Get the role ARN as well.
#
# The cluster must be available before its endpoint can be read, and a
# description fetched while it was still being created may not include one.
# Block on the boto3 'cluster_available' waiter, then take a fresh
# description, so this cell also works under "Restart & Run All".
redshift.get_waiter('cluster_available').wait(ClusterIdentifier=CLUSTER_IDENTIFIER)
myClusterProps = redshift.describe_clusters(ClusterIdentifier=CLUSTER_IDENTIFIER)['Clusters'][0]

DB_ENDPOINT = myClusterProps['Endpoint']['Address']
DB_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DB_ENDPOINT :: ", DB_ENDPOINT)
print("DB_ROLE_ARN :: ", DB_ROLE_ARN)

In [None]:
# Open the tcp port to open connection to cluster
# The security group, sg-ac82dfd1, was created using the AWS console. This security group
# allows any incoming public traffic to connect to the Redshift cluster on TCP port 5439.

In [None]:
# Confirm that you can connect to the cluster , dwhCluster 
conn_string="postgresql://{}:{}@{}:{}/{}".format(DB_USER, DB_PASSWORD, DB_ENDPOINT, PORT, DB_NAME)
print(conn_string)
%sql $conn_string

In [None]:
# Before running the SQL SELECT queries below, run 'create_tables.py' first and then 'etl.py' to create all
# staging, fact, and dimension tables and then load data into these tables.

In [None]:
%%sql
SELECT *
FROM staging_events  -- peek at the event staging table
LIMIT 5;

In [None]:
%%sql
SELECT *
FROM staging_songs  -- peek at the song staging table
LIMIT 5;

In [None]:
%%sql
INSERT INTO users
(
    -- one row per distinct combination of user id, name, gender and level
    SELECT DISTINCT
        se.userid    AS user_id,
        se.firstname AS first_name,
        se.lastname  AS last_name,
        se.gender,
        se.level
    FROM staging_events AS se
    WHERE se.userid IS NOT NULL
      AND se.level IS NOT NULL
);

In [None]:
%%sql
SELECT *
FROM users  -- peek at the users table
LIMIT 5;

In [None]:
%%sql
INSERT INTO songs
(
    -- keep only staged songs whose title, artist, year and duration are all present
    SELECT
        ss.song_id,
        ss.title,
        ss.artist_id,
        ss.year,
        ss.duration
    FROM staging_songs AS ss
    WHERE ss.title IS NOT NULL
      AND ss.artist_id IS NOT NULL
      AND ss.year IS NOT NULL
      AND ss.duration IS NOT NULL
);

In [None]:
%%sql
SELECT *
FROM songs  -- peek at the songs table
LIMIT 5;

In [None]:
%%sql
INSERT INTO artists
(
    -- DISTINCT collapses identical artist rows, the same artist can appear on many staging_songs rows
    -- NOTE(review) the longtitude spelling must match the staging_songs column name, confirm against create_tables.py
    SELECT DISTINCT artist_id, artist_name AS name, artist_location AS location,
    artist_latitude AS latitude, artist_longtitude AS longtitude
    FROM staging_songs
    WHERE (staging_songs.artist_name IS NOT NULL AND staging_songs.artist_location IS NOT NULL)
);

In [None]:
%%sql
SELECT *
FROM artists  -- peek at the artists table
LIMIT 5;

In [None]:
%%sql
INSERT INTO time
(
  -- DISTINCT keeps a single row per start_time even when several events share a timestamp
  SELECT DISTINCT
         start_time,
         EXTRACT(HOUR  FROM start_time) AS hour,
         EXTRACT(DAY   FROM start_time) AS day,
         EXTRACT(WEEK  FROM start_time) AS week,
         EXTRACT(MONTH FROM start_time) AS month,
         EXTRACT(YEAR  FROM start_time) AS year,
         EXTRACT(DOW   FROM start_time) AS weekday
  FROM (
      -- ts is treated as epoch milliseconds and converted to a timestamp once, here
      SELECT TIMESTAMP 'epoch' + ts/1000 * interval '1 second' AS start_time
      FROM staging_events
      WHERE staging_events.ts IS NOT NULL
  ) AS event_times
);

In [None]:
%%sql
SELECT *
FROM time  -- peek at the time table
LIMIT 5;

In [None]:
#%%sql
#INSERT INTO songplays
#(start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
#(
    #SELECT TIMESTAMP 'epoch' + se.ts/1000 * interval '1 second' AS start_time,
    #se.userid AS user_id, 
    #se.level AS level, 
    #songs.song_id AS song_id,
    #artists.artist_id AS artist_id, 
    #se.sessionid AS session_id, 
    #se.location AS location, 
    #se.useragent AS user_agent
    #FROM staging_events se, artists
    #LEFT JOIN songs ON songs.artist_id = artists.artist_id
    #WHERE (page = 'NextSong' AND start_time IS NOT NULL)
#);

In [None]:
%%sql
SELECT *
FROM songplays  -- peek at the songplays table
LIMIT 5;

In [None]:
#### CAREFUL!!
# Deleting the cluster is destructive: SkipFinalClusterSnapshot=True means no
# final snapshot is taken. The call is guarded so that "Run All" cannot delete
# the cluster by accident. Set DELETE_CLUSTER = True and re-run this cell to
# delete the created resources.
DELETE_CLUSTER = False

if DELETE_CLUSTER:
    delete_response = redshift.delete_cluster( ClusterIdentifier=CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
    print("Deletion requested; cluster status:", delete_response['Cluster']['ClusterStatus'])
else:
    print("Skipped: set DELETE_CLUSTER = True to delete cluster", CLUSTER_IDENTIFIER)
#### CAREFUL!!

In [None]:
# Check the status of the cluster: fetch a fresh description and display its key properties.
# NOTE(review): describe_clusters presumably raises once the cluster has been fully deleted -- confirm.
myClusterProps = redshift.describe_clusters(ClusterIdentifier=CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)