## Sparkify Data Warehouse

### Create a data warehouse for the million song dataset and Eventsim

This notebook implements the ETL needed for the Sparkify Cloud Data Warehouse.  Sparkify is a fictitious music streaming app based on data from the following sources:
 * [Million Song Dataset](http://millionsongdataset.com/)
 * [EventSim](https://github.com/Interana/eventsim)
 
The cloud data warehouse is hosted on AWS.  The JSON event logs from EventSim and the JSON song files from the Million Song Dataset are stored in an AWS S3 bucket (`udacity-dend`) hosted by Udacity.
This project runs the ETL processes that load this data into a star schema on AWS Redshift for efficient OLAP queries.

In [1]:
# Imports
import boto3
import configparser
import matplotlib.pyplot as plt
import pandas as pd
from time import time
import json

#### Read Configuration File

The configuration file `dwh.cfg` stores the AWS credentials, the cluster setup parameters, and the database connection settings.  Here we read them into Python variables.

In [2]:
# Read Config file
config = configparser.ConfigParser()
with open('dwh.cfg') as f:  # close the file handle once it has been parsed
    config.read_file(f)
KEY=config.get('IAM_ROLE','key')
SECRET= config.get('IAM_ROLE','secret')

DWH_CLUSTER_TYPE=config.get("SETUP", "DWH_CLUSTER_TYPE")
DWH_NUM_NODES=config.get("SETUP", "DWH_NUM_NODES")
DWH_NODE_TYPE=config.get("SETUP", "DWH_NODE_TYPE")
DWH_IAM_ROLE_NAME=config.get("SETUP", "DWH_IAM_ROLE_NAME")
DWH_CLUSTER_IDENTIFIER=config.get("SETUP", "DWH_CLUSTER_IDENTIFIER")
DWH_DB=config.get("CLUSTER", "DB_NAME")
DWH_DB_USER=config.get("CLUSTER", "DB_USER")
DWH_DB_PASSWORD=config.get("CLUSTER", "DB_PASSWORD")
DWH_PORT=config.get("CLUSTER", "DB_PORT")
DB_HOST = config.get("CLUSTER", "HOST")
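The cell above expects `dwh.cfg` to contain `[IAM_ROLE]`, `[SETUP]` and `[CLUSTER]` sections. Below is a minimal sketch of such a file, parsed with `configparser.read_string` so it runs without the real file. The section and key names mirror the `config.get()` calls above; every value is a hypothetical placeholder, not the project's actual settings.

```python
import configparser

# Hypothetical dwh.cfg contents: section/key names match the notebook, values are placeholders.
SAMPLE_CFG = """
[IAM_ROLE]
key = YOUR_AWS_ACCESS_KEY_ID
secret = YOUR_AWS_SECRET_ACCESS_KEY

[SETUP]
DWH_CLUSTER_TYPE = multi-node
DWH_NUM_NODES = 4
DWH_NODE_TYPE = dc2.large
DWH_IAM_ROLE_NAME = dwhRole
DWH_CLUSTER_IDENTIFIER = dwhcluster

[CLUSTER]
HOST = dwhcluster.example.us-west-2.redshift.amazonaws.com
DB_NAME = dwhcluster
DB_USER = dwhuser
DB_PASSWORD = change-me
DB_PORT = 5439
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)

# configparser returns strings; numeric settings need an explicit conversion,
# which is why the notebook wraps DWH_NUM_NODES and DWH_PORT in int().
num_nodes = config.getint("SETUP", "DWH_NUM_NODES")
port = config.getint("CLUSTER", "DB_PORT")
print(num_nodes, port)  # → 4 5439
```

Key lookups are case-insensitive by default, so `config.get("SETUP", "DWH_NODE_TYPE")` finds `DWH_NODE_TYPE` even though `configparser` stores option names in lowercase.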

#### Create AWS Resources

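We need an EC2 resource to configure the VPC security group that controls network access to the cluster, an S3 resource to read the JSON files from the S3 bucket, an IAM client to create the role Redshift uses to read from S3, and a Redshift client to create and manage the cluster that hosts the DWH.

All four are created in the `us-west-2` region with the credentials read from the config file.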

In [3]:
ec2 = boto3.resource(
    "ec2",
    region_name="us-west-2",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET
)

s3 = boto3.resource(
    "s3",
    region_name="us-west-2",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET
)

iam = boto3.client(
    "iam",
    region_name="us-west-2",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET
)

redshift = boto3.client(
    "redshift",
    region_name="us-west-2",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET
)

#### JSON Log Bucket

Let's check out the S3 bucket and confirm that the event logs, song files, and JSONPaths file are available.  The JSONPaths file is also downloaded locally for inspection.

In [4]:
log_bucket = s3.Bucket("udacity-dend")
# limit(5) stops after five keys instead of paging through every object under the prefix
a_few_logs = list(log_bucket.objects.filter(Prefix="log-data/").limit(5))
print("Event Log Files:")
for el in a_few_logs:
    print(el)
a_few_songs = list(log_bucket.objects.filter(Prefix="song_data/A/A/A").limit(5))
print("Song Log Files:")
for sl in a_few_songs:
    print(sl)
    
log_bucket.download_file("log_json_path.json", "udacity_jsonpaths_file.json")

Event Log Files:
s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/')
s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-01-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-02-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-03-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-04-events.json')
Song Log Files:
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAK128F9318786.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAV128F421A322.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAABD128F429CF47.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAACN128F9355673.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEA128F935A30D.json')
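A Redshift `COPY ... JSON 's3://.../jsonpaths.json'` uses a JSONPaths file: a JSON object with a `jsonpaths` array whose i-th expression selects the value for the i-th column of the target table, and a field missing from a record is loaded as NULL. The pure-Python sketch below mimics that mapping for simple top-level `$.key` expressions only. The paths and the event record are hypothetical, not the actual contents of `log_json_path.json`; the downloaded `udacity_jsonpaths_file.json` can be opened with `json.load` to see the real list.

```python
import json

# Hypothetical JSONPaths file; the real log_json_path.json lists its own fields.
JSONPATHS = json.loads('{"jsonpaths": ["$.artist", "$.sessionId", "$.ts", "$.userId"]}')


def apply_jsonpaths(record, jsonpaths):
    """Return one row (a tuple) in column order, mimicking COPY's JSONPaths mapping.

    Only top-level expressions of the form "$.key" are supported in this sketch;
    a field missing from the record becomes None, like a NULL loaded by COPY.
    """
    row = []
    for path in jsonpaths["jsonpaths"]:
        if not path.startswith("$."):
            raise ValueError(f"unsupported JSONPath expression: {path}")
        row.append(record.get(path[2:]))
    return tuple(row)


# A made-up record in the style of the EventSim logs.
event = {"artist": "Some Artist", "sessionId": 139, "ts": 1541106106796,
         "userId": "8", "page": "NextSong"}
print(apply_jsonpaths(event, JSONPATHS))
# → ('Some Artist', 139, 1541106106796, '8')
```

Because the mapping is positional, the order of the `jsonpaths` array has to match the column order of the staging table; the record's own key order does not matter, and extra fields such as `page` are simply ignored.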


#### Get ARN Role

Here we create a new IAM role that Redshift is allowed to assume, attach the `AmazonS3ReadOnlyAccess` policy to it so the cluster can read from S3, and look up the role's ARN.

In [5]:
try:
    print('1.1 Creating a new IAM Role')
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
                            'Effect': 'Allow',
                            'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'}),
        Description='Allows Redshift clusters to call AWS services on your behalf.',
    )
except Exception as e:
    # e.g. the role already exists from a previous run
    print(e)

# Attach the S3 read-only policy to the role so Redshift can read the bucket
iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
# The role's ARN is passed to the cluster (IamRoles) and to the COPY commands
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']


1.1 Creating a new IAM Role


#### Launch Redshift cluster

Now we are ready to launch the Redshift cluster.  The `IamRoles` parameter attaches the role created above so the cluster can read from S3.

In [6]:
try:
    response = redshift.create_cluster(        
        #Hardware
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
         
    )
except Exception as e:
    print(e)
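`create_cluster` returns as soon as the request is accepted; the cluster then spends several minutes in a non-`available` state. boto3 also ships a built-in waiter for this (`redshift.get_waiter('cluster_available').wait(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)`). As an illustration of what such a wait does, here is a small hand-rolled polling sketch; the function name and its parameters are hypothetical, and `describe` is any zero-argument callable returning a cluster dict like the one `describe_clusters` produces.

```python
import time


def wait_for_cluster_status(describe, target="available", interval=30.0,
                            timeout=1800.0, sleep=time.sleep):
    """Poll describe() until its "ClusterStatus" equals target, else raise TimeoutError.

    With boto3, describe could be:
        lambda: redshift.describe_clusters(
            ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
    """
    waited = 0.0
    while True:
        status = describe()["ClusterStatus"]
        if status == target:
            return status
        if waited >= timeout:
            raise TimeoutError(f"cluster still '{status}' after {waited:.0f}s")
        sleep(interval)
        waited += interval


# Dry run with a fake describe() and a recording sleep, so no AWS calls are made.
statuses = iter(["creating", "creating", "available"])
calls = []
result = wait_for_cluster_status(lambda: {"ClusterStatus": next(statuses)},
                                 interval=1, sleep=calls.append)
print(result, calls)  # → available [1, 1]
```

Injecting `sleep` keeps the function testable; in the notebook you would leave it at the default `time.sleep`.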

In [15]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', None)  # None = no truncation; -1 is deprecated in pandas
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

|   | Key | Value |
|---|---|---|
| 0 | ClusterIdentifier | dwhcluster |
| 1 | NodeType | dc2.large |
| 2 | ClusterStatus | available |
| 3 | MasterUsername | dwhuser |
| 4 | DBName | dwhcluster |
| 5 | Endpoint | {'Address': 'dwhcluster.ck2nmy7mqyc7.us-west-2.redshift.amazonaws.com', 'Port': 5439} |
| 6 | VpcId | vpc-500f6f28 |
| 7 | NumberOfNodes | 4 |


! Don't run the next cell until the cluster status is `available`!

In [None]:
#DWH_ENDPOINT = "dwhcluster.ck2nmy7mqyc7.us-west-2.redshift.amazonaws.com:5439/dwh"
#DWH_HOST = "dwhcluster.ck2nmy7mqyc7.us-west-2.redshift.amazonaws.com"
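# The next line may fail if run before the cluster is available ('Endpoint' not set yet);
# as a fallback, copy the endpoint from the AWS console (the notebook reads DB_HOST from dwh.cfg)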
# DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
#DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
#print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
#print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)
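One plausible reason the endpoint lookup fails is running it before the cluster is ready: we assume (not confirmed by the notebook) that the `Endpoint` key is missing from the `describe_clusters` entry until then. A defensive sketch that returns `None` instead of raising `KeyError`; the helper name is hypothetical and the sample dict copies the shape shown in the `prettyRedshiftProps` output.

```python
def cluster_endpoint(props):
    """Return (host, port) from a describe_clusters() entry, or None if not assigned yet."""
    endpoint = props.get("Endpoint")
    if not endpoint:
        return None
    return endpoint["Address"], endpoint["Port"]


# Shape taken from the prettyRedshiftProps output in this notebook.
props = {"ClusterStatus": "available",
         "Endpoint": {"Address": "dwhcluster.ck2nmy7mqyc7.us-west-2.redshift.amazonaws.com",
                      "Port": 5439}}
print(cluster_endpoint(props))
print(cluster_endpoint({"ClusterStatus": "creating"}))  # → None
```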

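#### Open a TCP port to the cluster

Allow inbound TCP traffic on the cluster port (`DWH_PORT`) through the VPC's security group, so the notebook can connect to the database.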

In [9]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        GroupName='default',
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-d80b6284')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists
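The cell above opens the port to `0.0.0.0/0`, i.e. to any IP address. `authorize_ingress` also accepts an `IpPermissions` list, which makes it easy to restrict access to a single CIDR block. A minimal sketch that builds that structure; the helper name and the example CIDR (taken from the `203.0.113.0/24` documentation range) are hypothetical.

```python
import ipaddress


def redshift_ingress_permissions(cidr, port=5439):
    """Build an IpPermissions list allowing TCP on `port` from one IPv4 CIDR only."""
    network = ipaddress.ip_network(cidr, strict=True)  # ValueError on malformed CIDRs
    if network.version != 4:
        raise ValueError("IpRanges only takes IPv4 CIDRs; use Ipv6Ranges for IPv6")
    return [{
        "IpProtocol": "tcp",
        "FromPort": port,
        "ToPort": port,
        "IpRanges": [{"CidrIp": str(network), "Description": "Redshift access"}],
    }]


perms = redshift_ingress_permissions("203.0.113.0/24")  # hypothetical client network
print(perms[0]["IpRanges"][0]["CidrIp"])  # → 203.0.113.0/24
# With boto3 this would be passed as: defaultSg.authorize_ingress(IpPermissions=perms)
```

`strict=True` rejects inputs such as `203.0.113.5/24` whose host bits are set, which usually indicates a typo.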


#### Test the DB Connection

In [10]:
%load_ext sql

In [11]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DB_HOST, DWH_PORT,DWH_DB)
%sql $conn_string

'Connected: dwhuser@dwhcluster'
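`conn_string` inserts the password into the URL verbatim, so a password containing characters such as `@`, `:` or `/` would break the URL. A small sketch that percent-encodes the credentials with `urllib.parse.quote_plus`, which SQLAlchemy-style URLs (as used by `%sql`) decode again; the helper name and the sample password are hypothetical.

```python
from urllib.parse import quote_plus


def redshift_conn_string(user, password, host, port, db):
    """Build a postgresql:// URL with percent-encoded user name and password."""
    return "postgresql://{}:{}@{}:{}/{}".format(
        quote_plus(user), quote_plus(password), host, port, db)


url = redshift_conn_string("dwhuser", "p@ss:word/1", "localhost", 5439, "dwhcluster")
print(url)
# → postgresql://dwhuser:p%40ss%3Aword%2F1@localhost:5439/dwhcluster
```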

### CREATE TABLES

We'll create the tables needed on the Redshift cluster by executing the `create_tables.py` script.

In [12]:
!python create_tables.py

In [13]:
# after `python create_tables.py`
%sql SELECT * FROM songplay

 * postgresql://dwhuser:***@dwhcluster.ck2nmy7mqyc7.us-west-2.redshift.amazonaws.com:5439/dwhcluster
0 rows affected.


songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location
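`create_tables.py` itself is not shown here. A common structure for such a script (an assumption, not its confirmed contents) is two lists of SQL statements, DROPs then CREATEs, executed in order through a DB-API cursor and committed. The sketch below uses an in-memory `sqlite3` database as a stand-in for the `psycopg2` connection to Redshift, since both expose the same `cursor()` / `execute()` / `commit()` calls. Only the `songplay` column names come from the query result above; the types are simplified and Redshift-specific DDL (e.g. `DISTKEY`, `SORTKEY`, `IDENTITY`) is omitted.

```python
import sqlite3

# Hypothetical query lists; only the songplay column names come from the notebook.
drop_table_queries = ["DROP TABLE IF EXISTS songplay"]
create_table_queries = ["""
    CREATE TABLE IF NOT EXISTS songplay (
        songplay_id INTEGER PRIMARY KEY,
        start_time  TIMESTAMP NOT NULL,
        user_id     TEXT NOT NULL,
        level       TEXT,
        song_id     TEXT,
        artist_id   TEXT,
        session_id  INTEGER,
        location    TEXT
    )
"""]


def run_queries(cur, conn, queries):
    """Execute each statement in order, committing after each one."""
    for query in queries:
        cur.execute(query)
        conn.commit()


conn = sqlite3.connect(":memory:")  # stand-in for psycopg2.connect(...) against Redshift
cur = conn.cursor()
run_queries(cur, conn, drop_table_queries)
run_queries(cur, conn, create_table_queries)

cur.execute("SELECT * FROM songplay")
print([d[0] for d in cur.description])  # column names of the (empty) table
```

Dropping before creating makes the script safe to re-run while iterating on the schema, at the cost of discarding any data already loaded.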


## ETL
We'll execute the `etl.py` script to COPY the JSON files from S3 into staging tables on Redshift, and then transform the staged data into the star schema.

In [17]:
!python etl.py


COPY staging_events
FROM 's3://udacity-dend/log_data'
IAM_ROLE AS 'aws_iam_role=arn:aws:iam::747157885091:role/dwhRole'
JSON 's3://udacity-dend/log_json_path.json'
REGION 'us-west-2'
;

Traceback (most recent call last):
  File "etl.py", line 36, in <module>
    main()
  File "etl.py", line 29, in main
    load_staging_tables(cur, conn)
  File "etl.py", line 9, in load_staging_tables
    cur.execute(query)
psycopg2.InternalError: User arn:aws:redshift:us-west-2:747157885091:dbuser:dwhcluster/dwhuser is not authorized to assume IAM Role aws_iam_role=arn:aws:iam::747157885091:role/dwhRole
DETAIL:  
  -----------------------------------------------
  error:  User arn:aws:redshift:us-west-2:747157885091:dbuser:dwhcluster/dwhuser is not authorized to assume IAM Role aws_iam_role=arn:aws:iam::747157885091:role/dwhRole
  code:      8001
  context:   IAM Role=aws_iam_role=arn:aws:iam::747157885091:role/dwhRole
  query:     506
  location:  xen_aws_credentials_mgr.cpp:324
  process:   padbmast
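The error message shows Redshift trying to assume a role literally named `aws_iam_role=arn:aws:iam::...`: the statement combines the `IAM_ROLE` keyword with the `aws_iam_role=` prefix, which belongs to the alternative `CREDENTIALS 'aws_iam_role=...'` syntax. `IAM_ROLE` expects the bare role ARN. A sketch of a helper that builds the statement with the bare ARN; the function is hypothetical and not part of `etl.py`, and it keeps the `JSON '...'` form used in the printed query.

```python
def staging_copy_sql(table, s3_path, role_arn, jsonpaths="auto", region="us-west-2"):
    """Build a Redshift COPY statement for JSON data, authorized with IAM_ROLE."""
    prefix = "aws_iam_role="
    if role_arn.startswith(prefix):
        # IAM_ROLE takes the ARN alone; the prefix is only valid inside CREDENTIALS '...'
        role_arn = role_arn[len(prefix):]
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{role_arn}'\n"
        f"JSON '{jsonpaths}'\n"
        f"REGION '{region}';"
    )


sql = staging_copy_sql(
    "staging_events",
    "s3://udacity-dend/log_data",
    "aws_iam_role=arn:aws:iam::747157885091:role/dwhRole",
    jsonpaths="s3://udacity-dend/log_json_path.json",
)
print(sql)
```

The same role must also be attached to the cluster, which the `IamRoles=[roleArn]` argument to `create_cluster` already does.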

In [None]:
%sql SELECT * FROM staging_events LIMIT 20;
# misc sql queries, test drop/create tables, copy from JSON, INSERT INTO queries, etc
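Once the staging tables are loaded, the transform step is typically an `INSERT INTO ... SELECT` per star-schema table. For the `songplay` fact table that usually means (an assumption about `etl.py`, which is not shown) keeping only `page = 'NextSong'` events, joining them to the staged songs on title and artist name, and converting the millisecond `ts` epoch into a timestamp. The sketch runs on `sqlite3` as a stand-in; the `staging_songs` table, the staging column names and the rows are hypothetical. On Redshift the timestamp conversion is usually written `TIMESTAMP 'epoch' + e.ts/1000 * INTERVAL '1 second'` instead of SQLite's `datetime(..., 'unixepoch')`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the Redshift connection
cur = conn.cursor()
# Hypothetical, trimmed-down staging tables plus the songplay fact table.
cur.executescript("""
    CREATE TABLE staging_events (artist TEXT, song TEXT, page TEXT, ts INTEGER,
                                 userId TEXT, level TEXT, sessionId INTEGER, location TEXT);
    CREATE TABLE staging_songs (song_id TEXT, title TEXT, artist_id TEXT, artist_name TEXT);
    CREATE TABLE songplay (songplay_id INTEGER PRIMARY KEY, start_time TIMESTAMP,
                           user_id TEXT, level TEXT, song_id TEXT, artist_id TEXT,
                           session_id INTEGER, location TEXT);
""")
cur.executemany("INSERT INTO staging_events VALUES (?, ?, ?, ?, ?, ?, ?, ?)", [
    ("Some Artist", "Some Song", "NextSong", 1541106106796, "8", "free", 139, "Somewhere"),
    (None, None, "Home", 1541106106800, "8", "free", 139, "Somewhere"),  # not a song play
])
cur.execute("INSERT INTO staging_songs VALUES ('SOXXX', 'Some Song', 'ARXXX', 'Some Artist')")

# Fact rows come only from NextSong events; LEFT JOIN keeps plays with no matching song.
cur.execute("""
    INSERT INTO songplay (start_time, user_id, level, song_id, artist_id, session_id, location)
    SELECT datetime(e.ts / 1000, 'unixepoch'), e.userId, e.level,
           s.song_id, s.artist_id, e.sessionId, e.location
    FROM staging_events e
    LEFT JOIN staging_songs s ON e.song = s.title AND e.artist = s.artist_name
    WHERE e.page = 'NextSong'
""")
conn.commit()
print(cur.execute("SELECT start_time, song_id, artist_id FROM songplay").fetchall())
```

Using an inner `JOIN` instead would drop plays whose song is not in the song dataset; which behavior is preferable depends on how the fact table is meant to be queried.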

## Back to the Notebook

Once all the ETL is completed, we can shut down the Redshift cluster to save costs.

In [None]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
# redshift.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)
#### CAREFUL!!

In [None]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)