In [28]:
import configparser
import boto3
import pandas as pd
from io import BytesIO

In [29]:
# Grab our configurations
config_file='dwh.cfg'
config=configparser.ConfigParser()
config.read(config_file)
sec=config.sections()
sec

['CLUSTER', 'IAM_ROLE', 'S3', 'AWS']

In [30]:
print("Our Config Values: \nHOST: {}\nDB_NAME: {}\nDB_USER: {}\nDB_PASSWORD: {}\n"
        "DB_PORT: {}".format(*config['CLUSTER'].values()))

Our Config Values: 
HOST: dwhcluster.cdpdzsz8lijw.us-west-2.redshift.amazonaws.com
DB_NAME: dwh
DB_USER: dwhuser
DB_PASSWORD: Passw0rd
DB_PORT: 5439


In [31]:
# View our Configurations

keys=[]
values=[]
for s in config.sections():
    keys.append(s)
    values.append('-'*30)

# # Method 1. Using Extend
#     keys.extend([*config[s].keys()])
#     values.extend([v for v in config[s].values()])

# Method 2. Using Map
    list(map(keys.append, [k for k in config[s].keys()]))
    any(map(values.append, [*config[s].values()]))

pd.DataFrame({'Key':keys, 'Value':values})

Unnamed: 0,Key,Value
0,CLUSTER,------------------------------
1,host,dwhcluster.cdpdzsz8lijw.us-west-2.redshift.ama...
2,db_name,dwh
3,db_user,dwhuser
4,db_password,Passw0rd
5,db_port,5439
6,dwh_cluster_identifier,dwhCluster
7,dwh_cluster_type,multi-node
8,dwh_num_nodes,4
9,dwh_node_type,dc2.large


# 1. What is our data?
1. connect to S3
1. view a file
1. Create IAM roles?
1. load data into staging tables
1. view data in pandas DF

In [16]:
s3 = boto3.resource('s3',
                    region_name=config['AWS']['REGION'],
                    aws_access_key_id=config['AWS']['KEY'],
                    aws_secret_access_key=config['AWS']['SECRET'])

In [17]:
[*config['S3'].values()]

["'s3://udacity-dend/log_data'",
 "'s3://udacity-dend/log_json_path.json'",
 "'s3://udacity-dend/song_data'",
 "'s3://udacity-demobucket-2/jsonpaths.json'",
 'demobucket-udacity-2022']

# Data Overview

## Data 1: log-data

In [70]:
objects=[]
for obj in s3.Bucket('udacity-dend').objects.filter(Prefix="log-data"):
    objects.append(obj)
pd.DataFrame({'Bucket':[o.bucket_name for o in objects], 
              'Key':[o.key for o in objects],
              'Size (MB)':[round(s3.ObjectSummary(o.bucket_name, o.key).size/1000000, 3) for o in objects]
              }).head()

Unnamed: 0,Bucket,Key,Size (MB)
0,udacity-dend,log-data/,0.0
1,udacity-dend,log-data/2018/11...,0.007
2,udacity-dend,log-data/2018/11...,0.084
3,udacity-dend,log-data/2018/11...,0.054
4,udacity-dend,log-data/2018/11...,0.086


In [100]:
# First object is a folder and second object is rather small
object_json=objects[20].get()['Body'].read()
log_data_df = pd.read_json(BytesIO(object_json), lines=True)
log_data_df

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,The Killers,Logged In,Jayden,M,32,Graves,246.80444,paid,"Marinette, WI-MI",PUT,NextSong,1.540664e+12,594,Read My Mind,200,1542672042796,"""Mozilla/5.0 (Wi...",25
1,Tamia,Logged In,Jayden,M,33,Graves,243.09506,paid,"Marinette, WI-MI",PUT,NextSong,1.540664e+12,594,Officially Missi...,200,1542672288796,"""Mozilla/5.0 (Wi...",25
2,Randy Crawford,Logged In,Jayden,M,34,Graves,270.75873,paid,"Marinette, WI-MI",PUT,NextSong,1.540664e+12,594,Almaz,200,1542672531796,"""Mozilla/5.0 (Wi...",25
3,Frumpies,Logged In,Jayden,M,35,Graves,134.47791,paid,"Marinette, WI-MI",PUT,NextSong,1.540664e+12,594,Fuck Kitty,200,1542672801796,"""Mozilla/5.0 (Wi...",25
4,Julia Fordham,Logged In,Jayden,M,36,Graves,279.50975,paid,"Marinette, WI-MI",PUT,NextSong,1.540664e+12,594,Girlfriend,200,1542672935796,"""Mozilla/5.0 (Wi...",25
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
374,The Darkness,Logged In,Chloe,F,61,Cuevas,206.81098,paid,San Francisco-Oa...,PUT,NextSong,1.540941e+12,758,One Way Ticket [...,200,1542752226796,Mozilla/5.0 (Win...,49
375,Florence + The M...,Logged In,Chloe,F,62,Cuevas,168.64608,paid,San Francisco-Oa...,PUT,NextSong,1.540941e+12,758,You've Got The Love,200,1542752432796,Mozilla/5.0 (Win...,49
376,,Logged In,Chloe,F,63,Cuevas,,paid,San Francisco-Oa...,PUT,Logout,1.540941e+12,758,,307,1542752433796,Mozilla/5.0 (Win...,49
377,,Logged Out,,,64,,,paid,,GET,Home,,758,,200,1542752925796,,


In [99]:
# Visually Inspect
unique_list=[]
for o in objects[1:-1]:
    object_json=o.get()['Body'].read()
    log_data_df = pd.read_json(BytesIO(object_json), lines=True)
    unique_list.extend(log_data_df['userId'].unique())
print([i for i in unique_list], end=" ")

[39, 8, 10, 26, 101, 101, 83, 66, 48, 86, 17, 15, 89, 80, 44, 88, 49, 100, 26, 61, 75, 50, 10, 12, 71, 54, 3, '', '53', '69', '62', '101', '95', '10', '15', '63', '49', '6', '52', '99', '43', '6', '25', '', '51', '26', '44', '16', '69', '80', '32', '10', '37', '28', '77', '78', '74', '100', '55', '33', '61', '73', '58', '83', '69', '44', '52', '94', '32', '43', '57', '49', '26', '', '42', '28', '60', '95', '10', '80', '84', '91', '24', '97', '101', '75', '73', '6', '35', '81', '25', '27', '29', '12', '100', '61', '55', '37', '32', '26', '80', '66', '12', '53', '88', '50', '33', '16', '51', '81', '34', '', '30', '2', '57', '97', '92', '97', '63', '55', '', '8', '9', '89', '49', '50', '101', '10', '14', '100', '80', '86', '26', '66', '15', '44', '32', '23', '29', '84', '2', '98', '33', '52', '54', '45', '20', '11', '', '85', '43', '66', '12', '29', '48', '80', '83', '72', '58', '69', '9', '26', '63', '101', '61', '81', '36', '7', '16', '24', '35', '33', '27', '42', '42', '49', '6', '24',

In [113]:
%sql SELECT * FROM users LIMIT 4;

 * postgresql://dwhuser:***@dwhcluster.cdpdzsz8lijw.us-west-2.redshift.amazonaws.com:5439/dwh
4 rows affected.


user_id,first_name,last_name,gender,level
55,Martin,Johnson,M,free
80,Tegan,Levine,F,free
66,Kevin,Arellano,M,free
50,Ava,Robinson,F,free


In [72]:
log_data_df.columns

Index(['artist', 'auth', 'firstName', 'gender', 'itemInSession', 'lastName',
       'length', 'level', 'location', 'method', 'page', 'registration',
       'sessionId', 'song', 'status', 'ts', 'userAgent', 'userId'],
      dtype='object')

In [104]:
type(log_data_df['userId'].iloc[0])

str

In [101]:
log_data_df['userId'].unique()

array(['25', '51', '94', '', '85', '32', '86', '15', '79', '6', '96',
       '60', '26', '58', '42', '54', '41', '83', '55', '44', '8', '35',
       '100', '81', '18', '49', '40', '63', '67', '10', '101', '69', '64'],
      dtype=object)

## Data 2: log_json_path.json

In [31]:
objects=[]
for obj in s3.Bucket('udacity-dend').objects.filter(Prefix="log_json_path.json"):
    objects.append(obj)
pd.DataFrame({'Bucket':[o.bucket_name for o in objects], 
              'Key':[o.key for o in objects],
              'Size (MB)':[round(s3.ObjectSummary(o.bucket_name, o.key).size/1000000, 3) for o in objects]
              })

Unnamed: 0,Bucket,Key,Size (MB)
0,udacity-dend,log_json_path.json,0.0


In [32]:
object_json=objects[0].get()['Body'].read()
object_json

b'{\n    "jsonpaths": [\n        "$[\'artist\']",\n        "$[\'auth\']",\n        "$[\'firstName\']",\n        "$[\'gender\']",\n        "$[\'itemInSession\']",\n        "$[\'lastName\']",\n        "$[\'length\']",\n        "$[\'level\']",\n        "$[\'location\']",\n        "$[\'method\']",\n        "$[\'page\']",\n        "$[\'registration\']",\n        "$[\'sessionId\']",\n        "$[\'song\']",\n        "$[\'status\']",\n        "$[\'ts\']",\n        "$[\'userAgent\']",\n        "$[\'userId\']"\n    ]\n}'

In [15]:
# There's only one object to look at, so the we'll look at the first
object_json=objects[0].get()['Body'].read()
pd.read_json(BytesIO(object_json))

Unnamed: 0,jsonpaths
0,$['artist']
1,$['auth']
2,$['firstName']
3,$['gender']
4,$['itemInSession']
5,$['lastName']
6,$['length']
7,$['level']
8,$['location']
9,$['method']


## Data 3. song_data

In [16]:
# About 1 minute to run
objects = []
for obj in s3.Bucket('udacity-dend').objects.filter(Prefix='song_data'):
    objects.append(obj)
objects

[s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAK128F9318786.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAV128F421A322.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAABD128F429CF47.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAACN128F9355673.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEA128F935A30D.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAED128E0783FAB.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEM128F93347B9.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEW128F42930C0.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAFD128F92F423A.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAGR128F425B14B.jso

In [17]:
# This can take 15 minutes
pd.DataFrame({
             'Bucket':[o.bucket_name for o in objects],
             'Key':[o.key for o in objects],
             'Size (MB)':[round(s3.ObjectSummary(o.bucket_name, o.key).size/1000000, 3) for o in objects]
            })

Unnamed: 0,Bucket,Key,Size (MB)
0,udacity-dend,song_data/,0.0
1,udacity-dend,song_data/A/A/A/TRAAAAK128F9318786.json,0.0
2,udacity-dend,song_data/A/A/A/TRAAAAV128F421A322.json,0.0
3,udacity-dend,song_data/A/A/A/TRAAABD128F429CF47.json,0.0
4,udacity-dend,song_data/A/A/A/TRAAACN128F9355673.json,0.0
...,...,...,...
14892,udacity-dend,song_data/A/Z/Z/TRAZZUM128F4288C2A.json,0.0
14893,udacity-dend,song_data/A/Z/Z/TRAZZVZ128F9326FE1.json,0.0
14894,udacity-dend,song_data/A/Z/Z/TRAZZWL128F4239037.json,0.0
14895,udacity-dend,song_data/A/Z/Z/TRAZZXF128F4247094.json,0.0


In [18]:
# First object is a folder, so we'll look at the second
object_json=objects[1].get()['Body'].read()
song_data_df = pd.read_json(BytesIO(object_json), lines=True)
song_data_df

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARJNIUY12298900C91,,,,Adelitas Way,213.9424,1,SOBLFFE12AF72AA5BA,Scream,2009


In [19]:
for i in BytesIO(objects[60].get()['Body'].read()):
    print(i)
    print('k')

b'{"artist_id":"ARNCNV91187FB4D552","artist_latitude":31.3893,"artist_location":"Israel","artist_longitude":35.36124,"artist_name":"Astral Projection","duration":444.83873,"num_songs":1,"song_id":"SOIGIVK12AB018E9AA","title":"Ionized","year":1996}'
k


In [20]:
# This can take 20 minutes
# Build a DataFrame of all our song data
multiple_song_df_list = []
for o in objects:
    object_json = o.get()['Body'].read()
    _df = pd.read_json(BytesIO(object_json), lines=True)
    multiple_song_df_list.append(_df)

In [21]:
multiple_songs_df = pd.concat(multiple_song_df_list, ignore_index=True)
multiple_songs_df.head()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARJNIUY12298900C91,,,,Adelitas Way,213.9424,1,SOBLFFE12AF72AA5BA,Scream,2009
1,AR73AIO1187B9AD57B,37.77916,"San Francisco, CA",-122.42005,Western Addiction,118.07302,1,SOQPWCR12A6D4FB2A3,A Poor Recipe For Civic Cohesion,2005
2,ARMJAGH1187FB546F3,35.14968,"Memphis, TN",-90.04892,The Box Tops,148.03546,1,SOCIWDW12A8C13D406,Soul Deep,1969
3,AR9Q9YC1187FB5609B,,New Jersey,,Quest_ Pup_ Kevo,252.94322,1,SOFRDWL12A58A7CEF7,Hit Da Scene,0
4,ARSVTNL1187B992A91,51.50632,"London, England",-0.12714,Jonathan King,129.85424,1,SOEKAZG12AB018837E,I'll Slap Your Face (Entertainment USA Theme),2001


In [22]:
for c in song_data_df.columns: print(c)

artist_id
artist_latitude
artist_location
artist_longitude
artist_name
duration
num_songs
song_id
title
year


In [23]:
# multiple_song_df_list = []
# for num in range(0,5):
#     object_json=objects[num].get()['Body'].read()
#     _df = pd.read_json(BytesIO(object_json), lines=True)
#     print(_df)
#     multiple_song_df_list.append(_df)


# # Crude method to look at multiple data
# song_data_df = pd.read_json(BytesIO(objects[1].get()['Body'].read()), lines=True)
# for num in range(2,5):
#     object_json=objects[num].get()['Body'].read()
#     song_data_df = song_data_df.append(pd.read_json(BytesIO(object_json), lines=True))
# song_data_df

In [280]:
# We can print everthing but this can get very long
# for obj in s3.Bucket('udacity-dend').objects.all():
#     print(obj)
# sampleDbBucket = s3.Bucket('awssampledbuswest2')
# for obj in sampleDbBucket.objects.filter(Prefix='ssbgz'): print(obj)

Now that we know what the actual data look like, we can create our sql tables

## Create tables using create_tables.py

Need to:
1. Create IAM Role
1. Attach Policy
1. Create Redshift Cluster

# Create IAM Role in order to access S3

In [32]:
ec2 = boto3.resource('ec2',
                     region_name=config['AWS']['REGION'],
                     aws_access_key_id=config['AWS']['KEY'],
                     aws_secret_access_key=config['AWS']['secret'])
s3 = boto3.resource('s3',
                    region_name=config['AWS']['REGION'],
                    aws_access_key_id=config['AWS']['KEY'],
                    aws_secret_access_key=config['AWS']['SECRET'])
iam = boto3.client('iam',
                    region_name=config['AWS']['REGION'],
                    aws_access_key_id=config['AWS']['KEY'],
                    aws_secret_access_key=config['AWS']['SECRET'])
redshift = boto3.client('redshift',
                    region_name=config['AWS']['REGION'],
                    aws_access_key_id=config['AWS']['KEY'],
                    aws_secret_access_key=config['AWS']['SECRET'])

In [33]:
from botocore.exceptions import ClientError
import json
# Create IAM Role
try:
    print("Creating new IAM Role")
    dwhRole = iam.create_role(      # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iam.html#IAM.Client.create_role
        Path='/',    # this is default anyways. Used for organizing roles: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_identifiers.html
        RoleName=config['CLUSTER']['DWH_IAM_ROLE_NAME'],
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
                            'Effect': 'Allow',
                            'Principal': {
                                            'Service': 'redshift.amazonaws.com'
                                          }
                            }],
             'Version': '2012-10-17'
             }
        )
    )
except Exception as e:
    print(e)

Creating new IAM Role
An error occurred (InvalidClientTokenId) when calling the CreateRole operation: The security token included in the request is invalid.


In [7]:
# Attach Policy
# We want to give this role S3 Read-only Access
print('Attaching Policy')
iam.attach_role_policy(RoleName=config['CLUSTER']['DWH_IAM_ROLE_NAME'],         # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iam.html#IAM.Client.attach_role_policy
                       PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'   # Copied the Policy ARN from AWS 'Policies' where we selected AmazonS3ReadOnlyAccess
)['ResponseMetadata']

Attaching Policy


{'RequestId': '55d476bc-7f1f-47e8-ba4d-8fabd12da9f9',
 'HTTPStatusCode': 200,
 'HTTPHeaders': {'x-amzn-requestid': '55d476bc-7f1f-47e8-ba4d-8fabd12da9f9',
  'content-type': 'text/xml',
  'content-length': '212',
  'date': 'Sun, 10 Apr 2022 01:35:35 GMT'},
 'RetryAttempts': 0}

In [8]:
# Get IAM Role ARN
print("Getting IAM role Amazon Resource Number (ARN)")
roleArn = iam.get_role(RoleName=config['CLUSTER']['DWH_IAM_ROLE_NAME'])     # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iam.html#IAM.Client.get_role
roleArn['Role']['Arn']

Getting IAM role Amazon Resource Number (ARN)


'arn:aws:iam::134015651665:role/dwhRole'

In [9]:
# Update our dwh.cfg to include our Role ARN
# This might need to be more explicitly done, but for now we can just update the variable
config['IAM_ROLE']['ARN'] = roleArn['Role']['Arn']

# Create Redshift Cluster

In [10]:
try:
    response = redshift.create_cluster(
        # Hardware
        ClusterType=config['CLUSTER']['DWH_CLUSTER_TYPE'],
        NodeType=config['CLUSTER']['DWH_NODE_TYPE'],
        NumberOfNodes=int(config['CLUSTER']['DWH_NUM_NODES']),

        # Identifiers & Credentials
        DBName=config['CLUSTER']['DB_NAME'],
        ClusterIdentifier=config['CLUSTER']['DWH_CLUSTER_IDENTIFIER'],
        MasterUsername=config['CLUSTER']['DB_USER'],
        MasterUserPassword=config['CLUSTER']['DB_PASSWORD'],

        # Roles
        IamRoles=[config['IAM_ROLE']['ARN']]
    )
except Exception as e:
    print(f"Error: {e}")

In [13]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', 20)
    keysToShow = ['ClusterIdentifier', 'NodeType', 'ClusterStatus', 'MasterUsername', 'DBName', 'Endpoint', 
                  'NumberOfNodes', 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=config['CLUSTER']['DWH_CLUSTER_IDENTIFIER'])['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,{'Address': 'dwh...
6,VpcId,vpc-0795b0f1cc2b...
7,NumberOfNodes,4


In [12]:
# look at tqdm and /r carriage return to improve the progress bar aspect
from time import sleep
while True:
    myClusterProps = redshift.describe_clusters(ClusterIdentifier=config['CLUSTER']['DWH_CLUSTER_IDENTIFIER'])['Clusters'][0]
    _df = prettyRedshiftProps(myClusterProps)
    done = _df[_df['Key']=='ClusterStatus']['Value'].values[0]
    if done == 'creating':
        print('.', end=" ")
        pass
    else:
        print('Done')
        break
    sleep(10)

. . . . . . . . . . . . . Done


# Make sure ClusterStatus says available before moving on

In [14]:
config['CLUSTER']['HOST']

'dwhcluster.cdpdzsz8lijw.us-west-2.redshift.amazonaws.com'

In [15]:
config['CLUSTER']['HOST']=myClusterProps['Endpoint']['Address']
# myClusterProps['IamRoles'][0]['IamRoleArn']

In [16]:
myClusterProps['Endpoint']['Address'] == config['CLUSTER']['HOST']

True

## Open an incoming TCP port to access the cluster endpoint

In [17]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(config['CLUSTER']['DB_PORT']),
        ToPort=int(config['CLUSTER']['DB_PORT'])
    )
except Exception as e:
    print(f"Error: {e}")

ec2.SecurityGroup(id='sg-0953ef2f3d6ad00a9')
Error: An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


# Connect to cluster

In [21]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(
                config['CLUSTER']['DB_USER'],
                config['CLUSTER']['DB_PASSWORD'],
                config['CLUSTER']['HOST'],
                config['CLUSTER']['DB_PORT'],
                config['CLUSTER']['DB_NAME']
)
print(conn_string)

postgresql://dwhuser:Passw0rd@dwhcluster.cdpdzsz8lijw.us-west-2.redshift.amazonaws.com:5439/dwh


In [22]:
%load_ext sql

In [23]:
%sql $conn_string

(psycopg2.OperationalError) connection to server at "dwhcluster.cdpdzsz8lijw.us-west-2.redshift.amazonaws.com" (100.21.214.74), port 5439 failed: Connection timed out
	Is the server running on that host and accepting TCP/IP connections?

(Background on this error at: https://sqlalche.me/e/14/e3q8)
Connection info needed in SQLAlchemy format, example:
               postgresql://username:password@hostname/dbname
               or an existing connection: dict_keys([])


In [26]:
%sql SELECT * FROM pg_stat_activity;

 * postgresql://dwhuser:***@dwhcluster.cdpdzsz8lijw.us-west-2.redshift.amazonaws.com:5439/dwh
8 rows affected.


datid,datname,procpid,usesysid,usename,current_query,query_start
101811,dev,1073832323,1,rdsdb,<command string not enabled>,
101811,dev,1073815879,1,rdsdb,<command string not enabled>,
101811,dev,1073955126,1,rdsdb,<command string not enabled>,
101811,dev,1073856681,1,rdsdb,<command string not enabled>,
101811,dev,1073774854,1,rdsdb,<command string not enabled>,
101811,dev,1073946748,1,rdsdb,<command string not enabled>,
101811,dev,1073742077,1,rdsdb,<command string not enabled>,
108036,dwh,1073799296,100,dwhuser,<command string not enabled>,


### Using psycopg2

In [19]:
print(config['CLUSTER']['DB_NAME'],
        config['CLUSTER']['DB_USER'],
        config['CLUSTER']['DB_PASSWORD'],
        config['CLUSTER']['HOST'],
        config['CLUSTER']['DB_PORT'])

dwh dwhuser Passw0rd dwhcluster.cdpdzsz8lijw.us-west-2.redshift.amazonaws.com 5439


In [20]:
import psycopg2
conn = psycopg2.connect(dbname=config['CLUSTER']['DB_NAME'],
                        user=config['CLUSTER']['DB_USER'],
                        password=config['CLUSTER']['DB_PASSWORD'],
                        host=config['CLUSTER']['HOST'],
                        port=config['CLUSTER']['DB_PORT'],
                        connect_timeout=5
)

In [21]:
cur = conn.cursor()

In [22]:
cur.execute('SELECT * FROM pg_stat_activity')
conn.commit()

In [23]:
data_ =cur.fetchall()
print(data_)

[(108036, 'dwh', 1073799270, 100, 'dwhuser', '<command string not enabled>', None)]


Load json paths to S3 programatically

In [24]:
cool_bucket = s3.Bucket('arn:aws:s3:::demobucket-udacity-2022')

In [45]:
cool_bucket

s3.Bucket(name='arn:aws:s3:::demobucket-udacity-2022')

In [21]:
# import logging
# import boto3
# from botocore.exceptions import ClientError
# import os


# def upload_file(file_name, bucket, object_name=None):
#     """Upload a file to an S3 bucket

#     :param file_name: File to upload
#     :param bucket: Bucket to upload to
#     :param object_name: S3 object name. If not specified then file_name is used
#     :return: True if file was uploaded, else False
#     """

#     # If S3 object_name was not specified, use file_name
#     if object_name is None:
#         object_name = os.path.basename(file_name)

#     # Upload the file
#     s3_client = boto3.client('s3')
#     try:
#         response = s3_client.upload_file(file_name, bucket, object_name)
#     except ClientError as e:
#         logging.error(e)
#         return False
#     return True

# s3_upload = boto3.client('s3',
#                         region_name=config['AWS']['REGION'],
#                         aws_access_key_id=config['AWS']['KEY'],
#                         aws_secret_access_key=config['AWS']['SECRET'])
# filename = '/home/george/src/cloud_data_warehouses/data_warehouse/jsonpaths.json'
# object_name = os.path.basename(filename)
# with open(filename, 'rb') as f:
#     s3_upload.upload_fileobj(f, 'demobucket-udacity-2022', object_name)

# Check for Errors

In [14]:
# Pull error messages
conn_string="postgresql://{}:{}@{}:{}/{}".format(
                config['CLUSTER']['DB_USER'],
                config['CLUSTER']['DB_PASSWORD'],
                config['CLUSTER']['HOST'],
                config['CLUSTER']['DB_PORT'],
                config['CLUSTER']['DB_NAME']
)
print(conn_string)

postgresql://dwhuser:Passw0rd@'dwhcluster.cdpdzsz8lijw.us-west-2.redshift.amazonaws.com':5439/dwh


In [17]:
%load_ext sql

In [18]:
%sql $conn_string

In [69]:
%%sql
SELECT colname, type, raw_field_value, err_reason FROM 
stl_load_errors

ORDER BY starttime DESC
LIMIT 4

 * postgresql://dwhuser:***@dwhcluster.cdpdzsz8lijw.us-west-2.redshift.amazonaws.com:5439/dwh
4 rows affected.


colname,type,raw_field_value,err_reason
,,,Extra column(s) found
,,,Extra column(s) found
,,,Extra column(s) found
,,,Extra column(s) found


In [30]:
%%sql
SELECT starttime, filename, colname, type, position, raw_line, raw_field_value, err_code, err_reason FROM 
stl_load_errors
ORDER BY starttime DESC
LIMIT 4

 * postgresql://dwhuser:***@dwhcluster.cdpdzsz8lijw.us-west-2.redshift.amazonaws.com:5439/dwh
4 rows affected.


starttime,filename,colname,type,position,raw_line,raw_field_value,err_code,err_reason
2022-04-07 17:10:38.452182,s3://udacity-dend/song_data/A/A/A/TRAAAHJ128F931194C.json,,,214,"{""artist_id"":""ARSZ7L31187FB4E610"",""artist_latitude"":39.74001,""artist_location"":""Denver, CO"",""artist_longitude"":-104.99226,""artist_name"":""Devotchka"",""duration"":337.81506,""num_songs"":1,""song_id"":""SORRNOC12AB017F52B"",""title"":""The Last Beat Of My Heart (b-side)"",""year"":2004}",,1202,Extra column(s) found
2022-04-07 17:10:38.452182,s3://udacity-dend/song_data/A/A/A/TRAAABD128F429CF47.json,,,217,"{""artist_id"":""ARMJAGH1187FB546F3"",""artist_latitude"":35.14968,""artist_location"":""Memphis, TN"",""artist_longitude"":-90.04892,""artist_name"":""The Box Tops"",""duration"":148.03546,""num_songs"":1,""song_id"":""SOCIWDW12A8C13D406"",""title"":""Soul Deep"",""year"":1969}",,1202,Extra column(s) found
2022-04-07 17:10:38.452182,s3://udacity-dend/song_data/A/A/A/TRAAAEA128F935A30D.json,,,221,"{""artist_id"":""ARSVTNL1187B992A91"",""artist_latitude"":51.50632,""artist_location"":""London, England"",""artist_longitude"":-0.12714,""artist_name"":""Jonathan King"",""duration"":129.85424,""num_songs"":1,""song_id"":""SOEKAZG12AB018837E"",""title"":""I'll Slap Your Face (Entertainment USA Theme)"",""year"":2001}",,1202,Extra column(s) found
2022-04-07 17:10:38.452182,s3://udacity-dend/song_data/A/A/A/TRAAAAV128F421A322.json,,,229,"{""artist_id"":""AR73AIO1187B9AD57B"",""artist_latitude"":37.77916,""artist_location"":""San Francisco, CA"",""artist_longitude"":-122.42005,""artist_name"":""Western Addiction"",""duration"":118.07302,""num_songs"":1,""song_id"":""SOQPWCR12A6D4FB2A3"",""title"":""A Poor Recipe For Civic Cohesion"",""year"":2005}",,1202,Extra column(s) found


### Convert ts into a real timestamp

In [67]:
%sql SELECT timestamp 'epoch' + cast(1541121934796 AS bigint)/1000 * interval '1 second' AS timestamp

 * postgresql://dwhuser:***@dwhcluster.cdpdzsz8lijw.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


timestamp
2018-11-02 01:25:34


In [60]:
%%sql SELECT timestamp 'epoch' + cast(timestamp_col AS bigint)/1000 * interval '1 second' AS epoch_to_timestamp
FROM (SELECT 1541121934796 AS timestamp_col) AS a;

 * postgresql://dwhuser:***@dwhcluster.cdpdzsz8lijw.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


epoch_to_timestamp
2018-11-02 01:25:34


In [19]:
%%sql
SELECT *
FROM staging_songs
LIMIT 5

 * postgresql://dwhuser:***@dwhcluster.cdpdzsz8lijw.us-west-2.redshift.amazonaws.com:5439/dwh
5 rows affected.


artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
AR5KOSW1187FB35FF4,49.80388,Dubai UAE,,Elena,269.58322,1,SOZCTXZ12AB0182364,Setanta matins,0
AR7RUDQ1187B98C147,,,,Alabama Thunderpussy,274.49424,1,SOBKPPQ12A58A78407,Hunting By Echo,2001
AR3GZLR1187FB3D817,,,,Biffy Clyro,255.16363,1,SOQEYZY12A58A77AF2,Folding Stars,2007
ARMTTNS1187FB39C92,,,,Saratoga,573.04771,1,SOQPIFV12AB017C713,Semillas de odio,0
ARSRAKS11F4C83FDB5,,,,Santa Claws and the Naughty But Nice Orchestra,311.82322,1,SOFZCOT12A8C1403F4,Master of Puppets,2007


In [20]:
%%sql
SELECT 
    (SELECT count(*)
    FROM songplay) AS songplay,
    (SELECT count(*) 
    FROM time) AS time,
    (SELECT count(*)
    FROM artist) AS artist_,
    (SELECT count(*)
    FROM users) AS users_,
    (SELECT count(*)
    FROM song) AS song_

 * postgresql://dwhuser:***@dwhcluster.cdpdzsz8lijw.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


songplay,time,artist_,users_,song_
119,119,14896,8056,14896


# Make sure you're done before continuing

# Cleanup your resources and shutdown Cluster

In [None]:
redshift.delete_cluster(ClusterIdentifier=config['CLUSTER']['DWH_CLUSTER_IDENTIFIER'], 
                        SkipFinalClusterSnapshot=True)

In [98]:
deleted_cluster_props = redshift.describe_clusters(ClusterIdentifier=config['CLUSTER']['DWH_CLUSTER_IDENTIFIER'])['Clusters'][0]
prettyRedshiftProps(deleted_cluster_props)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,deleting
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,{'Address': 'dwh...
6,VpcId,vpc-0795b0f1cc2b...
7,NumberOfNodes,4


# Wait until cluster is actually deleted before moving on

### Detatch Policy

In [100]:
iam.detach_role_policy(RoleName=config['CLUSTER']['DWH_IAM_ROLE_NAME'],
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")

{'ResponseMetadata': {'RequestId': '2840a3db-f14e-4d9a-baec-c8a8450a1ee4',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '2840a3db-f14e-4d9a-baec-c8a8450a1ee4',
   'content-type': 'text/xml',
   'content-length': '212',
   'date': 'Sat, 09 Apr 2022 00:52:13 GMT'},
  'RetryAttempts': 0}}

### Delete Role

In [101]:
iam.delete_role(RoleName=config['CLUSTER']['DWH_IAM_ROLE_NAME'])

{'ResponseMetadata': {'RequestId': '7914f706-1667-4345-a09a-6edf9e34fc69',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '7914f706-1667-4345-a09a-6edf9e34fc69',
   'content-type': 'text/xml',
   'content-length': '200',
   'date': 'Sat, 09 Apr 2022 00:52:18 GMT'},
  'RetryAttempts': 0}}