In [4]:
import pandas as pd
import boto3
import json
import psycopg2

In [9]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))


DWH_CLUSTER_TYPE       = config.get("CLUSTER","CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("CLUSTER","NUM_NODES")
DWH_NODE_TYPE          = config.get("CLUSTER","NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("CLUSTER","CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("CLUSTER","DB_NAME")
DWH_DB_USER            = config.get("CLUSTER","DB_USER")
DWH_DB_PASSWORD        = config.get("CLUSTER","DB_PASSWORD")
DWH_HOST               = config.get('CLUSTER','HOST')
DWH_PORT               = config.get("CLUSTER","DB_PORT")

DWH_IAM_ROLE_NAME      = config.get("CLUSTER", "IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,2
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhcluster
4,DWH_DB,dwh
5,DWH_DB_USER,dwhuser
6,DWH_DB_PASSWORD,Passw0rd
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,dwhRole


In [3]:
KEY = 'AKIA533WDJEZCZSFX66H'
SECRET = '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'
s3 = boto3.resource('s3', region_name="us-west-2", aws_access_key_id=KEY, aws_secret_access_key=SECRET)

iam = boto3.client('iam',aws_access_key_id=KEY,aws_secret_access_key=SECRET,region_name='us-west-2')

redshift = boto3.client('redshift',region_name="us-west-2",aws_access_key_id=KEY,aws_secret_access_key=SECRET)

dwhRole = iam.create_role(
    Path='/',
    RoleName=DWH_IAM_ROLE_NAME,
    Description = "Allows Redshift clusters to call AWS services on your behalf.",
    AssumeRolePolicyDocument=json.dumps(
        {'Statement': [{'Action': 'sts:AssumeRole',
           'Effect': 'Allow',
           'Principal': {'Service': 'redshift.amazonaws.com'}}],
         'Version': '2012-10-17'})
)

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

1.3 Get the IAM role ARN
arn:aws:iam::953193548082:role/dwhRole


In [4]:
response = redshift.create_cluster(        
    ClusterType=DWH_CLUSTER_TYPE,
    NodeType=DWH_NODE_TYPE,
    NumberOfNodes=int(DWH_NUM_NODES),

    #Identifiers & Credentials
    DBName=DWH_DB,
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
    MasterUsername=DWH_DB_USER,
    MasterUserPassword=DWH_DB_PASSWORD,

    #Roles (for s3 access)
    IamRoles=[roleArn]  
)

In [7]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.ct9l8tmdfqok.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-40a5d838
7,NumberOfNodes,2


In [6]:
from sql_queries import copy_table_queries, insert_table_queries

def load_staging_tables(cur, conn):
    for query in copy_table_queries:
        cur.execute(query)
        conn.commit()


def insert_tables(cur, conn):
    for query in insert_table_queries:
        print(query)
        cur.execute(query)
        conn.commit()

In [10]:
config = configparser.ConfigParser()
config.read('dwh.cfg')

conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(config['CLUSTER']['HOST'], config['CLUSTER']['DB_NAME'], config['CLUSTER']['DB_USER'], config['CLUSTER']['DB_PASSWORD'], config['CLUSTER']['DB_PORT']))
cur = conn.cursor()

load_staging_tables(cur, conn)

In [35]:
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(config['CLUSTER']['HOST'], config['CLUSTER']['DB_NAME'], config['CLUSTER']['DB_USER'], config['CLUSTER']['DB_PASSWORD'], config['CLUSTER']['DB_PORT']))
cur = conn.cursor()
query = """create table if not exists songplays (
    songplay_id bigint identity(0,1) primary key, 
    start_time bigint, 
    user_id smallint, 
    level text, 
    song_id text, 
    artist_id text, 
    session_id integer, 
    location text,
    user_agent text
)"""
cur.execute(query)
conn.commit()

In [20]:

user_table_insert = ("""
    insert into users (user_id, first_name, last_name, gender, level)
    (
        select user_id, first_name, last_name, gender, level
        from staging_events
        where user_id not in (select distinct user_id from users)
        and user_id is not null
        and page = 'NextSong'
    )
""")

song_table_insert = ("""
    insert into songs (song_id, title, artist_id, year, duration)
    (
        select song_id, title, artist_id, year, duration
        from staging_songs
        where song_id not in (select distinct song_id from songs)
    )
""")

artist_table_insert = ("""
    insert into artists (artist_id, name, location, latitude, longitude)
    (
        select artist_id, artist_name, artist_location, artist_latitude, artist_longitude
        from staging_songs
        where artist_id not in (select distinct artist_id from artists)
    )
""")

time_table_insert = ("""
    insert into time (start_time, hour, day, week, month, year, weekday)
    (
        select start_time, 
            extract(hour from start_time) as hour, 
            extract(day from start_time) as day,
            extract(week from start_time) as week,
            extract(month from start_time) as month,
            extract(year from start_time) as year,
            extract(DOW from start_time) as weekday
        from (select distinct TIMESTAMP 'epoch' + ts/1000 * interval '1 second' as start_time from staging_events)
    )
""")
insert_table_queries = [time_table_insert]

In [21]:
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(config['CLUSTER']['HOST'], config['CLUSTER']['DB_NAME'], config['CLUSTER']['DB_USER'], config['CLUSTER']['DB_PASSWORD'], config['CLUSTER']['DB_PORT']))
cur = conn.cursor()
insert_tables(cur, conn)


    insert into time (start_time, hour, day, week, month, year, weekday)
    (
        select start_time, 
            extract(hour from start_time) as hour, 
            extract(day from start_time) as day,
            extract(week from start_time) as week,
            extract(year from start_time) as year,
            extract(DOW from start_time) as weekday
        from (select distinct TIMESTAMP 'epoch' + ts/1000 * interval '1 second' as start_time from staging_events)
    )



ProgrammingError: INSERT has more target columns than expressions


In [106]:
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)

{'Cluster': {'ClusterIdentifier': 'dwhcluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'MasterUsername': 'dwhuser',
  'DBName': 'dwh',
  'Endpoint': {'Address': 'dwhcluster.ct9l8tmdfqok.us-west-2.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2020, 8, 2, 20, 44, 14, 892000, tzinfo=tzlocal()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-83974bd7',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-40a5d838',
  'AvailabilityZone': 'us-west-2c',
  'PreferredMaintenanceWindow': 'thu:08:00-thu:08:30',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'NumberOfNodes': 2,
  'PubliclyAccessible': True,
  'Encrypted': False,
  'Tags': [],
  'EnhancedVpcRouting': False,
  'Iam

In [109]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

ClusterNotFoundFault: An error occurred (ClusterNotFound) when calling the DescribeClusters operation: Cluster dwhcluster not found.

In [110]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)
#### CAREFUL!!

{'ResponseMetadata': {'RequestId': '0be9162a-effe-4c75-95cc-fb61d1db7945',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '0be9162a-effe-4c75-95cc-fb61d1db7945',
   'content-type': 'text/xml',
   'content-length': '200',
   'date': 'Sun, 02 Aug 2020 21:54:39 GMT'},
  'RetryAttempts': 0}}