In [1]:
import psycopg2
import boto3
import configparser
from tqdm import tqdm_notebook
import json
import pandas as pd

# 1) Setting Up the Redshift Cluster (IAC)

In [2]:
def read_display_dwh_config(file):
    """
    Reading the Config File
    """
    config = configparser.ConfigParser()
    config.read_file(open(file))
    
    DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
    DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
    DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")
    DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
    DB_NAME                 = config.get("CLUSTER","DB_NAME")
    DB_USER            = config.get("CLUSTER","DB_USER")
    DB_PASSWORD        = config.get("CLUSTER","DB_PASSWORD")
    DB_PORT               = config.get("CLUSTER","DB_PORT")
    DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")
    
    KEY = config.get("AWS", "KEY")
    SECRET = config.get("AWS", "SECRET")
    
    dwh_config=pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DB_NAME, DB_USER, DB_PASSWORD, DB_PORT]
             })
    return dwh_config, KEY, SECRET, DWH_IAM_ROLE_NAME

In [3]:
def create_clients(KEY, SECRET):
    """
    Creating the AWS Clients
    """
    ec2 = boto3.client('ec2', region_name='eu-central-1', aws_access_key_id = KEY, aws_secret_access_key = SECRET)

    s3 = boto3.resource('s3',region_name='eu-central-1', aws_access_key_id = KEY, aws_secret_access_key = SECRET)

    iam = boto3.client('iam',region_name='eu-central-1', aws_access_key_id = KEY, aws_secret_access_key = SECRET)

    redshift = boto3.client('redshift',region_name='eu-central-1', aws_access_key_id = KEY, aws_secret_access_key = SECRET)
    return ec2, s3, iam, redshift

In [4]:
def setup_iam_role(DWH_IAM_ROLE_NAME):
    """
    Setting UP IAM ROLE
    """
    try:
        print('1.1 Creating a new IAM Role')
        dwhRole = iam.create_role(Path='/',
        RoleName= DWH_IAM_ROLE_NAME,
        AssumeRolePolicyDocument= json.dumps({
                        "Version": "2012-10-17",
                        "Statement": [{"Effect": "Allow",
                                "Principal": {"Service": ["redshift.amazonaws.com"]},
                                "Action": ["sts:AssumeRole"]
                            }]
                    }),
        Description='Allows Redshift clusters to call AWS services on your behalf.')
        print("1.2 Attaching Policy")
        iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                           PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                          )['ResponseMetadata']['HTTPStatusCode']
        print('1.3 Get the IAM role ARN')
        roleArn = iam.get_role(RoleName = DWH_IAM_ROLE_NAME)['Role']['Arn']

        print(roleArn)
        
        return roleArn
    
    except Exception as e:
        print(e)

In [5]:
def create_redshift_cluster(dwh_config, roleArn):
    """
    CREATING REDSHIFT CLUSTER
    """
    try:
        response = redshift.create_cluster(        
        # TODO: add parameters for hardware
        ClusterType= dwh_config.loc[dwh_config.Param == 'DWH_CLUSTER_TYPE','Value'].item(),
        NodeType = dwh_config.loc[dwh_config.Param == 'DWH_NODE_TYPE','Value'].item(),
        NumberOfNodes= int(dwh_config.loc[dwh_config.Param == 'DWH_NUM_NODES','Value'].item()),
        # TODO: add parameters for identifiers & credentials
        DBName = dwh_config.loc[dwh_config.Param == 'DB_NAME','Value'].item(),
        ClusterIdentifier=dwh_config.loc[dwh_config.Param == 'DWH_CLUSTER_IDENTIFIER','Value'].item(),
        MasterUsername= dwh_config.loc[dwh_config.Param == 'DB_USER','Value'].item(),
        MasterUserPassword= dwh_config.loc[dwh_config.Param == 'DB_PASSWORD','Value'].item(),
        Port= int(dwh_config.loc[dwh_config.Param == 'DB_PORT','Value'].item()),
        # TODO: add parameter for role (to allow s3 access)
        IamRoles=[roleArn])
    except Exception as e:
        print(e)

In [22]:
def opening_incoming_TCP(myClusterProps, DWH_PORT):
    """
    OPENING INCOMING TCP Traffic PORT
    """
    ec_2 = boto3.resource('ec2', region_name='eu-central-1', aws_access_key_id = KEY, aws_secret_access_key = SECRET)
    try:
        vpc = ec_2.Vpc(id=myClusterProps['VpcId'])
        defaultSg = list(vpc.security_groups.all())[1]
        print(defaultSg)
        defaultSg.authorize_ingress(
            GroupName=defaultSg.group_name,
            CidrIp='0.0.0.0/0',
            IpProtocol='TCP',
            FromPort=int(DWH_PORT),
            ToPort=int(DWH_PORT)
        )
    except Exception as e:
        print(e)

In [7]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

## 1.1) Creating the Cluster

In [8]:
dwh_config, KEY, SECRET, DWH_IAM_ROLE_NAME = read_display_dwh_config('dwh.cfg')
dwh_config

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DB_NAME,dwh
5,DB_USER,dwhuser
6,DB_PASSWORD,Passw0rd
7,DB_PORT,5439


In [9]:
ec2, s3, iam, redshift = create_clients(KEY, SECRET)

In [10]:
roleArn = setup_iam_role(DWH_IAM_ROLE_NAME)

1.1 Creating a new IAM Role
1.2 Attaching Policy
1.3 Get the IAM role ARN
arn:aws:iam::057666384869:role/dwhRole


In [12]:
create_redshift_cluster(dwh_config, roleArn)

In [16]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=dwh_config.loc[dwh_config.Param=='DWH_CLUSTER_IDENTIFIER','Value'].item())['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.csymdzczedw2.eu-central-1.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-596c8f32
7,NumberOfNodes,4


In [17]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)

DWH_ROLE_ARN ::  arn:aws:iam::057666384869:role/dwhRole
DWH_ENDPOINT ::  dwhcluster.csymdzczedw2.eu-central-1.redshift.amazonaws.com


In [23]:
opening_incoming_TCP(myClusterProps, int(dwh_config.loc[dwh_config.Param == 'DB_PORT','Value'].item()))

ec2.SecurityGroup(id='sg-1f41d875')


In [24]:
DWH_DB = dwh_config.loc[dwh_config.Param == 'DB_NAME','Value'].item()
DWH_DB_USER= dwh_config.loc[dwh_config.Param == 'DB_USER','Value'].item()
DWH_DB_PASSWORD= dwh_config.loc[dwh_config.Param == 'DB_PASSWORD','Value'].item()
DWH_PORT = int(dwh_config.loc[dwh_config.Param == 'DB_PORT','Value'].item())

conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(DWH_ENDPOINT,DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD,DWH_PORT))
print('Testing Open Connection -- 0 is Active: ')
print(conn.closed)

Testing Open Connection -- 0 is Active: 
0


## 1.2) Creating DB and Tables in Redshift

In [26]:
!python create_tables.py

----- DROPPING EXISTING TABLES -----
100%|█████████████████████████████████████████████| 7/7 [00:02<00:00,  2.50it/s]
----- DROPPING FINISHED -----
----- CREATING NEW TABLES -----
100%|█████████████████████████████████████████████| 7/7 [00:02<00:00,  2.40it/s]


## 1.3) Staging S3 Data into staging Tables and ETL From S3 to Redshift Star Schema

In [27]:
!python etl.py

----- LOADING FROM S3 TO STAGING TABLES -----
100%|████████████████████████████████████████████| 2/2 [44:56<00:00, 814.82s/it]
----- FINISHED LOADING INTO STAGING TABLES -----
100%|█████████████████████████████████████████████| 5/5 [00:03<00:00,  1.28it/s]
----- REDSHIFT INSERTION FINISHED -----


## 1.4) Sample SQL queries to see the performance

In [86]:
config = configparser.ConfigParser()
config.read('dwh.cfg')

conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()

In [72]:
query1 = ("""select count(*) from songs;""")
query2 = ("""select count(*) from songplays;""")
query3 = ("""select count(*) from users;""")
query4 = ("""select count(*) from artists;""")
query5 = ("""select count(*) from time;""")
query6 = ("""select count(*) from staging_events;""")
query7 = ("""select count(*) from staging_songs;""")

queries = {'songs':query1, 'songplays':query2, 'users':query3, 'artists':query4, 'time':query5, 'staging_events':query6, 'staging_songs':query7}

for query in queries.keys():
    cur.execute(queries.get(query))
    print('Number of Records in '+query+': '+str(cur.fetchall()[0][0]))

Number of Records in songs: 14896
Number of Records in songplays: 333
Number of Records in users: 105
Number of Records in artists: 10025
Number of Records in time: 6813
Number of Records in staging_events: 8056
Number of Records in staging_songs: 14896


#### Q1) List the number of songs and number of artists listened by users in the 11th month in the DB

In [93]:
large_query1=("""explain SELECT
  songplays.user_id,
  users.first_name,
  COUNT(songplays.songplay_id) AS song_count,
  COUNT(artists.name) AS artist_count
FROM
  songplays
  INNER JOIN songs ON songplays.song_id = songs.song_id
  INNER JOIN time ON songplays.start_time = time.start_time
  INNER JOIN artists ON songplays.artist_id = artists.artist_id
  INNER JOIN users ON songplays.user_id = users.user_id
WHERE
  time.month = 11
GROUP BY
  songplays.user_id,
  users.first_name
""")

In [104]:
cur.execute(large_query1)
print(cur.fetchall())

[('XN HashAggregate  (cost=146524201.02..146524202.69 rows=334 width=34)',), ('  ->  XN Hash Join DS_DIST_ALL_NONE  (cost=3948.39..146524197.68 rows=334 width=34)',), ('        Hash Cond: ("outer".user_id = "inner".user_id)',), ('        ->  XN Hash Join DS_BCAST_INNER  (cost=3943.14..146524184.92 rows=333 width=25)',), ('              Hash Cond: ("outer".start_time = "inner".start_time)',), ('              ->  XN Seq Scan on "time"  (cost=0.00..85.16 rows=6813 width=8)',), ('                    Filter: ("month" = 11)',), ('              ->  XN Hash  (cost=3942.30..3942.30 rows=333 width=33)',), ('                    ->  XN Hash Join DS_DIST_NONE  (cost=4.16..3942.30 rows=333 width=33)',), ('                          Hash Cond: (("outer".artist_id)::text = ("inner".artist_id)::text)',), ('                          ->  XN Seq Scan on artists  (cost=0.00..100.25 rows=10025 width=39)',), ('                          ->  XN Hash  (cost=3.33..3.33 rows=333 width=38)',), ('                   

#### Q2)  Number of songs per year per month

In [91]:
large_query2=("""explain SELECT time.month, time.year, COUNT(sp.songplay_id) as songplay_count FROM songplays sp LEFT JOIN time ON sp.start_time = time.start_time GROUP BY time.month, time.year;""")

In [92]:
cur.execute(large_query2)
print(cur.fetchall())

[('XN HashAggregate  (cost=425430231.41..425430231.42 rows=1 width=12)',), ('  ->  XN Hash Right Join DS_DIST_BOTH  (cost=4.16..425430228.92 rows=333 width=12)',), ('        Outer Dist Key: "time".start_time',), ('        Inner Dist Key: sp.start_time',), ('        Hash Cond: ("outer".start_time = "inner".start_time)',), ('        ->  XN Seq Scan on "time"  (cost=0.00..68.13 rows=6813 width=16)',), ('        ->  XN Hash  (cost=3.33..3.33 rows=333 width=12)',), ('              ->  XN Seq Scan on songplays sp  (cost=0.00..3.33 rows=333 width=12)',), ('----- Tables missing statistics: songplays, time -----',), ('----- Update statistics by running the ANALYZE command on these tables -----',)]


# 2) Cleaning up Resources

In [106]:
redshift.delete_cluster( ClusterIdentifier=dwh_config.loc[dwh_config.Param=='DWH_CLUSTER_IDENTIFIER','Value'].item(),  SkipFinalClusterSnapshot=True)

{'Cluster': {'ClusterIdentifier': 'dwhcluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'MasterUsername': 'dwhuser',
  'DBName': 'dwh',
  'Endpoint': {'Address': 'dwhcluster.csymdzczedw2.eu-central-1.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2020, 3, 22, 13, 36, 2, 274000, tzinfo=tzlocal()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-1f41d875',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-596c8f32',
  'AvailabilityZone': 'eu-central-1b',
  'PreferredMaintenanceWindow': 'tue:13:30-tue:14:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'NumberOfNodes': 4,
  'PubliclyAccessible': True,
  'Encrypted': False,
  'Tags': [],
  'EnhancedVpcRouting': False,


In [112]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=dwh_config.loc[dwh_config.Param=='DWH_CLUSTER_IDENTIFIER','Value'].item())['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,deleting
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.csymdzczedw2.eu-central-1.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-596c8f32
7,NumberOfNodes,4


In [113]:
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)

{'ResponseMetadata': {'RequestId': '8fa2e06b-71dd-40a4-bbf0-b7dfd09effbb',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '8fa2e06b-71dd-40a4-bbf0-b7dfd09effbb',
   'content-type': 'text/xml',
   'content-length': '200',
   'date': 'Sun, 22 Mar 2020 16:50:56 GMT'},
  'RetryAttempts': 0}}