In [1]:
import boto3
from botocore.exceptions import ClientError
import configparser
import json

from boto3_utils import (
    get_bucket,
    get_subbucket,
)

In [2]:
config = configparser.ConfigParser()
with open('dwh.cfg') as inp:
    config.read_file(inp)

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

HOST                   = config.get('CLUSTER', 'HOST')
CLUSTER_NAME           = config.get('CLUSTER', 'CLUSTER_NAME')
DWH_CLUSTER_TYPE       = config.get('CLUSTER', 'DWH_CLUSTER_TYPE')
DWH_NUM_NODES          = config.get('CLUSTER', 'DWH_NUM_NODES')
DWH_NODE_TYPE          = config.get('CLUSTER', 'DWH_NODE_TYPE')

DB_NAME                = config.get('CLUSTER', 'DB_NAME')
DB_USER                = config.get('CLUSTER', 'DB_USER')
DB_PASSWORD            = config.get('CLUSTER', 'DB_PASSWORD')
DB_PORT                = config.get('CLUSTER', 'DB_PORT')

ARN                    = config.get('IAM_ROLE', 'ARN')
IAM_ROLE_NAME          = config.get('IAM_ROLE', 'IAM_ROLE_NAME')

S3_LOG_DATA            = config.get('S3', 'LOG_DATA')
S3_LOG_JSONPATH        = config.get('S3', 'LOG_JSONPATH')
S3_SONG_DATA           = config.get('S3', 'SONG_DATA')

In [3]:
common_kwargs = {
    'region_name': 'us-west-2',
    'aws_access_key_id': KEY,
    'aws_secret_access_key': SECRET,
}

ec2 = boto3.resource(
    'ec2',
    **common_kwargs,
)

s3_resource = boto3.resource(
    's3',
    **common_kwargs,
)

iam = boto3.client(
    'iam',
    **common_kwargs,
)

redshift = boto3.client(
    'redshift',
    **common_kwargs,
)

## Let's first check out the log data

In [4]:
bucket = s3_resource.Bucket(get_bucket(S3_LOG_DATA))
for obj in bucket.objects.filter(Prefix=get_subbucket(S3_LOG_DATA)):
    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-01-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-02-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-03-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-04-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-05-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-06-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-07-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-08-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-09-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-10-events.json')
s3.ObjectSummary(b

In [5]:
json_filename = obj.key.split('/')[-1]
s3_resource.meta.client.download_file(
    obj.bucket_name,
    obj.key,
    json_filename,
)

with open(json_filename) as inp:
    for line in inp.readlines():
        log_json = json.loads(line)
        break
    
log_json

{'artist': 'Stephen Lynch',
 'auth': 'Logged In',
 'firstName': 'Jayden',
 'gender': 'M',
 'itemInSession': 0,
 'lastName': 'Bell',
 'length': 182.85669,
 'level': 'free',
 'location': 'Dallas-Fort Worth-Arlington, TX',
 'method': 'PUT',
 'page': 'NextSong',
 'registration': 1540991795796.0,
 'sessionId': 829,
 'song': "Jim Henson's Dead",
 'status': 200,
 'ts': 1543537327796,
 'userAgent': 'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; WOW64; Trident/6.0)',
 'userId': '91'}

## Now, let's check out the song data

In [6]:
bucket = s3_resource.Bucket(get_bucket(S3_SONG_DATA))
for i, obj in enumerate(bucket.objects.filter(Prefix=get_subbucket(S3_SONG_DATA))):
    print(obj)
    if i == 10:
        break

s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAK128F9318786.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAV128F421A322.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAABD128F429CF47.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAACN128F9355673.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEA128F935A30D.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAED128E0783FAB.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEM128F93347B9.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEW128F42930C0.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAFD128F92F423A.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAGR128F425B14B.json')


In [7]:
json_filename = obj.key.split('/')[-1]
s3_resource.meta.client.download_file(
    obj.bucket_name,
    obj.key,
    json_filename,
)

with open(json_filename) as inp:
    for line in inp.readlines():
        log_json = json.loads(line)
        break
    
log_json

{'artist_id': 'ARGE7G11187FB37E05',
 'artist_latitude': None,
 'artist_location': 'Brooklyn, NY',
 'artist_longitude': None,
 'artist_name': 'Cyndi Lauper',
 'duration': 240.63955,
 'num_songs': 1,
 'song_id': 'SONRWUU12AF72A4283',
 'title': 'Into The Nightlife',
 'year': 2008}

## Finally, let's see what's in the log data json path

In [8]:
bucket = s3_resource.Bucket(get_bucket(S3_LOG_JSONPATH))
for obj in bucket.objects.filter(Prefix=get_subbucket(S3_LOG_JSONPATH)):
    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='log_json_path.json')


In [9]:
S3_LOG_JSONPATH

's3://udacity-dend/log_json_path.json'

In [10]:
json_filename = S3_LOG_JSONPATH.split('/')[-1]
s3_resource.meta.client.download_file(
    get_bucket(S3_LOG_JSONPATH),
    get_subbucket(S3_LOG_JSONPATH),
    json_filename,
)

In [11]:
with open(json_filename) as inp:
    log_json = json.load(inp)
    
log_json

{'jsonpaths': ["$['artist']",
  "$['auth']",
  "$['firstName']",
  "$['gender']",
  "$['itemInSession']",
  "$['lastName']",
  "$['length']",
  "$['level']",
  "$['location']",
  "$['method']",
  "$['page']",
  "$['registration']",
  "$['sessionId']",
  "$['song']",
  "$['status']",
  "$['ts']",
  "$['userAgent']",
  "$['userId']"]}

# Creating an IAM Role that makes Redshift able to access S3 bucket (ReadOnly)


In [12]:
if ARN:
    roleArn = iam.get_role(
        RoleName=IAM_ROLE_NAME
    )['Role']['Arn']
else:    
    iam.create_role(
        Path='/',
        RoleName=IAM_ROLE_NAME,
        Description = 'Allows Redshift clusters to call AWS services on your behalf.',
        AssumeRolePolicyDocument=json.dumps({
            'Statement': [{
                'Action': 'sts:AssumeRole',
                'Effect': 'Allow',
                'Principal': {
                    'Service': 'redshift.amazonaws.com'
                }
            }],
            'Version': '2012-10-17'
        })
    )

    iam.attach_role_policy(
        RoleName=IAM_ROLE_NAME,
        PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
    )['ResponseMetadata']['HTTPStatusCode']
    
    roleArn = iam.get_role(RoleName=IAM_ROLE_NAME)['Role']['Arn']

# Creating a Redshift cluster

In [13]:
try:
    response = redshift.create_cluster(        
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DB_NAME,
        ClusterIdentifier=CLUSTER_NAME,
        MasterUsername=DB_USER,
        MasterUserPassword=DB_PASSWORD,

        # Roles (for s3 access)
        IamRoles=[roleArn]  
    )
except ClientError as e:
    print(e)

In [14]:
myClusterProps = redshift.describe_clusters(
    ClusterIdentifier=CLUSTER_NAME
)['Clusters'][0]

print(myClusterProps)

{'ClusterIdentifier': 'dwhcluster', 'NodeType': 'dc2.large', 'ClusterStatus': 'available', 'ClusterAvailabilityStatus': 'Available', 'MasterUsername': 'dwhuser', 'DBName': 'dwh', 'Endpoint': {'Address': 'dwhcluster.c2ff0nc4tz4r.us-west-2.redshift.amazonaws.com', 'Port': 5439}, 'ClusterCreateTime': datetime.datetime(2020, 4, 28, 14, 24, 44, 7000, tzinfo=tzutc()), 'AutomatedSnapshotRetentionPeriod': 1, 'ManualSnapshotRetentionPeriod': -1, 'ClusterSecurityGroups': [], 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-6e97ea35', 'Status': 'active'}], 'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0', 'ParameterApplyStatus': 'in-sync'}], 'ClusterSubnetGroupName': 'default', 'VpcId': 'vpc-ba1981c2', 'AvailabilityZone': 'us-west-2b', 'PreferredMaintenanceWindow': 'mon:09:30-mon:10:00', 'PendingModifiedValues': {}, 'ClusterVersion': '1.0', 'AllowVersionUpgrade': True, 'NumberOfNodes': 4, 'PubliclyAccessible': True, 'Encrypted': False, 'ClusterPublicKey': 'ssh-rsa AAAAB3Nz

# Opening an incoming TCP port to access the cluster endpoint

In [15]:
vpc = ec2.Vpc(id=myClusterProps['VpcId'])
defaultSg = list(vpc.security_groups.all())[0]
print(defaultSg)
try:
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DB_PORT),
        ToPort=int(DB_PORT)
    )
except ClientError as e:
    print(e)

ec2.SecurityGroup(id='sg-032764bbae8e21bf0')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


### Connect to the cluster

In [16]:
if not HOST:
    HOST = myClusterProps['Endpoint']['Address']
    
HOST

'dwhcluster.c2ff0nc4tz4r.us-west-2.redshift.amazonaws.com'

In [17]:
%load_ext sql

In [29]:
conn_string = f'postgresql://{DB_USER}:{DB_PASSWORD}@{HOST}:{DB_PORT}/{DB_NAME}'
# print(conn_string)
%sql $conn_string



'Connected: dwhuser@dwh'

This is an example event JSON:
```json
{'artist': 'Stephen Lynch',
 'auth': 'Logged In',
 'firstName': 'Jayden',
 'gender': 'M',
 'itemInSession': 0,
 'lastName': 'Bell',
 'length': 182.85669,
 'level': 'free',
 'location': 'Dallas-Fort Worth-Arlington, TX',
 'method': 'PUT',
 'page': 'NextSong',
 'registration': 1540991795796.0,
 'sessionId': 829,
 'song': "Jim Henson's Dead",
 'status': 200,
 'ts': 1543537327796,
 'userAgent': 'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; WOW64; Trident/6.0)',
 'userId': '91'}
```
and let's remember:
```json
{'jsonpaths': ["$['artist']",
  "$['auth']",
  "$['firstName']",
  "$['gender']",
  "$['itemInSession']",
  "$['lastName']",
  "$['length']",
  "$['level']",
  "$['location']",
  "$['method']",
  "$['page']",
  "$['registration']",
  "$['sessionId']",
  "$['song']",
  "$['status']",
  "$['ts']",
  "$['userAgent']",
  "$['userId']"]}
```

In [30]:
%%sql

DROP TABLE IF EXISTS events_staging;

CREATE TABLE events_staging (
    artist          TEXT,
    auth            TEXT        NOT NULL,
    firstName       TEXT,
    gender          CHAR(1),
    itemInSession   INTEGER     NOT NULL,
    lastName        TEXT,
    length          NUMERIC,
    level           TEXT        NOT NULL,
    location        TEXT,
    method          TEXT        NOT NULL,
    page            TEXT        NOT NULL,
    registration    NUMERIC,
    sessionId       INTEGER     NOT NULL,
    song            VARCHAR,
    status          INTEGER     NOT NULL,
    ts              NUMERIC     NOT NULL,
    userAgent       VARCHAR,
    userId          INTEGER
);


 * postgresql://dwhuser:***@dwhcluster.c2ff0nc4tz4r.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

OK, now let's remind ourselves how a song JSON looks like:
```json
{'artist_id': 'ARGE7G11187FB37E05',
 'artist_latitude': None,
 'artist_location': 'Brooklyn, NY',
 'artist_longitude': None,
 'artist_name': 'Cyndi Lauper',
 'duration': 240.63955,
 'num_songs': 1,
 'song_id': 'SONRWUU12AF72A4283',
 'title': 'Into The Nightlife',
 'year': 2008}
```

In [31]:
%%sql

DROP TABLE IF EXISTS songs_staging;

CREATE TABLE songs_staging (
    artist_id        TEXT      NOT NULL,
    artist_latitude  TEXT,
    artist_longitude TEXT,
    artist_location  TEXT,
    artist_name      TEXT      NOT NULL,
    duration         NUMERIC   NOT NULL,
    num_songs        INTEGER   NOT NULL,
    song_id          TEXT      NOT NULL,
    title            TEXT      NOT NULL,
    year             INTEGER   NOT NULL
);


 * postgresql://dwhuser:***@dwhcluster.c2ff0nc4tz4r.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

In [32]:
%%sql

DROP TABLE IF EXISTS songplays CASCADE;

CREATE TABLE songplays (
  songplay_id    TEXT            PRIMARY KEY,
  start_time     TIMESTAMP       NOT NULL,
  user_id        INTEGER         NOT NULL,
  level          TEXT            NOT NULL,
  song_id        TEXT,
  artist_id      TEXT,
  session_id     INTEGER         NOT NULL,
  location       TEXT,
  user_agent     TEXT            NOT NULL
);



 * postgresql://dwhuser:***@dwhcluster.c2ff0nc4tz4r.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

In [33]:
%%sql

DROP TABLE IF EXISTS users;

CREATE TABLE users (
  user_id        INTEGER                            PRIMARY KEY,
  first_name     TEXT            NOT NULL,
  last_name      TEXT            NOT NULL,
  gender         CHAR(1)         NOT NULL,
  level          TEXT            NOT NULL
);


 * postgresql://dwhuser:***@dwhcluster.c2ff0nc4tz4r.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

In [34]:
%%sql

DROP TABLE IF EXISTS songs;

CREATE TABLE songs (
  song_id        TEXT                               PRIMARY KEY,
  title          TEXT            NOT NULL,
  artist_id      TEXT            NOT NULL,
  year           INTEGER         NOT NULL,
  duration       NUMERIC         NOT NULL
);

 * postgresql://dwhuser:***@dwhcluster.c2ff0nc4tz4r.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

In [35]:
%sql SELECT * from songs LIMIT 2;

 * postgresql://dwhuser:***@dwhcluster.c2ff0nc4tz4r.us-west-2.redshift.amazonaws.com:5439/dwh
0 rows affected.


song_id,title,artist_id,year,duration


In [36]:
%%sql

DROP TABLE IF EXISTS time;

CREATE TABLE time (
  start_time    TIMESTAMP                           PRIMARY KEY,
  hour          INTEGER          NOT NULL,
  day           INTEGER          NOT NULL,
  week          INTEGER          NOT NULL,
  month         INTEGER          NOT NULL,
  year          INTEGER          NOT NULL,
  weekday       INTEGER          NOT NULL
);

 * postgresql://dwhuser:***@dwhcluster.c2ff0nc4tz4r.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

In [37]:
%sql SELECT * FROM time LIMIT 2;

 * postgresql://dwhuser:***@dwhcluster.c2ff0nc4tz4r.us-west-2.redshift.amazonaws.com:5439/dwh
0 rows affected.


start_time,hour,day,week,month,year,weekday


In [38]:
%%sql

DROP TABLE IF EXISTS artists;

CREATE TABLE artists (
  artist_id       TEXT                                PRIMARY KEY,
  name            TEXT             NOT NULL,
  location        TEXT,
  latitude        NUMERIC,
  longitude       NUMERIC
);


 * postgresql://dwhuser:***@dwhcluster.c2ff0nc4tz4r.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

# Delete the Redshift cluster and IAM role

In [39]:
# redshift.delete_cluster(
#     ClusterIdentifier=CLUSTER_NAME,
#     SkipFinalClusterSnapshot=True,
# )
# 
# iam.detach_role_policy(
#     RoleName=IAM_ROLE_NAME,
#     PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess',
# )
# iam.delete_role(RoleName=IAM_ROLE_NAME)