Importing libraries

In [4]:
import pandas as pd
# Gonna use postgres, since it runs on the same port and has a similar structure than Redshift.
import psycopg2

# Gonna use this library to read config file
import configparser

import json

import boto3

In [5]:
# Reading config file.
config = configparser.ConfigParser()
config.read_file(open('../../AWS credetials/DataPipeline_python_IaC/cluster.config'))

In [6]:
# Using config object to get config parameters.
key = config.get('AWS', 'KEY')
secret = config.get('AWS', 'SECRET')

dwh_cluster_type = config.get('DWH', 'DWH_CLUSTER_TYPE')
dwh_num_nodes = config.get('DWH', 'DWH_NUM_NODES')
dwh_node_type = config.get('DWH', 'DWH_NODE_TYPE')
dwh_cluster_identifier = config.get('DWH', 'DWH_CLUSTER_IDENTIFIER')
dwh_db = config.get('DWH', 'DWH_DB')
dwh_db_user = config.get('DWH', 'DWH_DB_USER')
dwh_db_password = config.get('DWH', 'DWH_DB_PASSWORD')
dwh_port = config.get('DWH', 'DWH_PORT')
dwh_iam_role_name = config.get('DWH', 'DWH_IAM_ROLE_NAME')

In [7]:
# Connecting to diferent services using boto3.
ec2 = boto3.resource('ec2', 
                     region_name='sa-east-1',
                     aws_access_key_id=key,
                     aws_secret_access_key = secret
                     )

s3 = boto3.resource('s3', 
                     region_name='sa-east-1',
                     aws_access_key_id=key,
                     aws_secret_access_key = secret
                     )

iam = boto3.client('iam', 
                     region_name='sa-east-1',
                     aws_access_key_id=key,
                     aws_secret_access_key = secret
                     )

redshift = boto3.client('redshift', 
                     region_name='sa-east-1',
                     aws_access_key_id=key,
                     aws_secret_access_key = secret
                     )

In [8]:
# Obtaning all my files available in S3.
bucket = s3.Bucket('datapipeline-python-iac')

log_data_files = [filename.key for filename in bucket.objects.all()]
log_data_files

['allevents_pipe.txt',
 'allusers_pipe.txt',
 'category_pipe.txt',
 'date2008_pipe.txt',
 'listings_pipe.txt',
 'sales_tab.txt',
 'venue_pipe.txt']

In [9]:
# Getting Iam Role ARN, so i can pass Iam access to my cluster.
role_arn = iam.get_role(RoleName=dwh_iam_role_name)['Role']['Arn']

In [10]:
# Creating my Redshift cluster.
try:
    response = redshift.create_cluster(
        ClusterType = dwh_cluster_type,
        NodeType = dwh_node_type,
        
        # Identifiers and credentials.
        DBName = dwh_db,
        ClusterIdentifier = dwh_cluster_identifier,
        MasterUsername = dwh_db_user,
        MasterUserPassword = dwh_db_password,
        
        # Roles (for s3 acces)
        IamRoles = [role_arn]
    )
# If cluster is already created or there is a problem creating it, exception is raised.
except Exception as e:
    print(e)

An error occurred (ClusterAlreadyExists) when calling the CreateCluster operation: Cluster already exists


In [11]:
# Buckle to wait till cluster is created.
status = redshift.describe_clusters(ClusterIdentifier = dwh_cluster_identifier)['Clusters'][0]['ClusterStatus']

while status == 'creating':
    status = redshift.describe_clusters(ClusterIdentifier = dwh_cluster_identifier)['Clusters'][0]['ClusterStatus']
    if status != 'creating':
        print('Cluster Available')

In [12]:
# Defining a function that takes json values and returns a dataframe, so its easier to read.
def pretty_redshift_properties (props):
    # Defining keys i want to show.
    keys_to_show = ['CusterIdentifier', 'NodeType', 'ClusterStatus', 'MasterUsername', 'DBName', 'Endpoint', 'VpcId']
    # Looking for keys in the json items.
    x = [(key, value) for key, value in props.items() if key in keys_to_show]
    df = pd.DataFrame(data=x, columns=['Key', 'Value'])
    
    return df

# Getting the json value of cluster properties.    
my_cluster_properties = redshift.describe_clusters(ClusterIdentifier = dwh_cluster_identifier)['Clusters'][0]
# Passing previous value to my function.
pretty_redshift_properties(my_cluster_properties)

Unnamed: 0,Key,Value
0,NodeType,dc2.large
1,ClusterStatus,available
2,MasterUsername,awsuser
3,DBName,pipeline_python_iac
4,Endpoint,{'Address': 'data-pipeline-python-iac-cluster....
5,VpcId,vpc-0bc0d1833e8dfbcb3


In [13]:
# Fetching all information from the json values, i need all this information in core, to create vpc.
dwh_endpoint = my_cluster_properties['Endpoint']['Address']
dwh_role_arn = my_cluster_properties['IamRoles'][0]['IamRoleArn']
db_name = my_cluster_properties['DBName']
db_user = my_cluster_properties['MasterUsername']

In [14]:
# Creating security group
try:
    vpc_id = my_cluster_properties['VpcId']
    vpc = ec2.Vpc(id=vpc_id)
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort= int(dwh_port),
        ToPort=int(dwh_port)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-041025bcadc8b904a')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


In [15]:
try:
    # Connectint to database.
    conn = psycopg2.connect(host = dwh_endpoint, dbname = db_name, user = db_user, password = dwh_db_password, port = dwh_port)
    conn.set_session(autocommit=True)
    
    # Creating cursor instance.
    cur = conn.cursor()
except psycopg2.Error as e:
    print('Error: Could not connect to postgre database')
    print(e)



In [16]:
# Creating data model.
try:
    create_users_query = ('''
                         CREATE TABLE users(
                             user_id INT NOT NULL DISTKEY SORTKEY,
                             username VARCHAR,
                             first_name VARCHAR,
                             last_name VARCHAR,
                             city VARCHAR,
                             state VARCHAR,
                             email VARCHAR,
                             phone VARCHAR,
                             like_sport BOOLEAN,
                             like_theater BOOLEAN,
                             like_concerts BOOLEAN,
                             like_jazz BOOLEAN,
                             like_classical BOOLEAN,
                             like_opera BOOLEAN,
                             like_rock BOOLEAN,
                             like_vegas BOOLEAN,
                             like_broadway BOOLEAN,
                             like_musicales BOOLEAN
                        )''')    
    cur.execute(create_users_query)
    
    create_venue_query = ('''
                          CREATE TABLE venue(
                              venue_id INT NOT NULL DISTKEY SORTKEY,
                              venue_name VARCHAR,
                              venue_city VARCHAR,
                              venue_state VARCHAR,
                              venue_seats VARCHAR
                        )''')
    cur.execute(create_venue_query)
    
    create_category_query = ('''
                             CREATE TABLE category(
                                 cat_id INT NOT NULL DISTKEY SORTKEY,
                                 cat_group VARCHAR,
                                 cat_name VARCHAR,
                                 cat_desc VARCHAR
                            )''')
    cur.execute(create_category_query)
    
    create_date_query = ('''
                        CREATE TABLE date(
                            date_id INT NOT NULL DISTKEY SORTKEY,
                            cal_date DATE NOT NULL,
                            day VARCHAR NoT NULL,
                            week INT NOT NULL,
                            month VARCHAR NOT NULL,
                            qtr VARCHAR NOT NULL,
                            year VARCHAR NOT NULL,
                            holiday BOOLEAN DEFAULT('N')
                        )''')
    cur.execute(create_date_query)
    
    create_event_query = ('''
                        CREATE TABLE event(
                            event_id INT NOT NULL DISTKEY,
                            venue_id INT NOT NULL,
                            cat_id INT NOT NULL,
                            date_id INT NOT NULL,
                            event_name VARCHAR,
                            start_time TIMESTAMP
                        )''')
    cur.execute(create_event_query)
    
    create_listing_query = ('''
                            CREATE TABLE listing(
                                list_id INT NOT NULL DISTKEY,
                                seller_id INT NOT NULL,
                                event_id INT NOT NULL,
                                date_id INT NOT NULL,
                                num_tickets INT NOT NULL,
                                price_per_ticket DECIMAL(8,2),
                                total_price DECIMAL(8,2),
                                list_time TIMESTAMP
                            )''')
    cur.execute(create_listing_query)

except psycopg2.Error as e:
    print('Error: issue creating table')
    print(e)

Error: issue creating table
Relation "users" already exists



In [17]:
# I already have my data on S3, so now im going to load it to Redshift.
try:
    s3_copy_query = ('''
                     COPY users from 's3://datapipeline-python-iac/allusers_pipe.txt'
                     CREDENTIALS 'aws_iam_role=arn:aws:iam::961251108862:role/redshift-S3-acces'
                     delimiter '|'
                     region 'sa-east-1'
                     ''')
    cur.execute(s3_copy_query)
    
    # Selecting all from table
    cur.execute('''SELECT * FROM users''')
    # Fetching data
    result = cur.fetchall()
    
    # Getting column names
    cols = []
    for etl in cur.description:
        cols.append(etl[0])
    
    # Creating Dataframe
    df_users = pd.DataFrame(data=result, columns=cols)
    
except psycopg2.Error as e:
    print(e)

df_users.sample(3)

Unnamed: 0,user_id,username,first_name,last_name,city,state,email,phone,like_sport,like_theater,like_concerts,like_jazz,like_classical,like_opera,like_rock,like_vegas,like_broadway,like_musicales
248807,47735,APM23BQM,Kelsie,Mcclure,Clovis,DC,vitae.dolor@ametfaucibusut.org,(448) 833-7345,True,,True,,True,,,True,,
195864,46146,QVK69RZD,Jack,Howell,Troy,PE,eget@sodalesatvelit.edu,(150) 824-8019,,True,,,True,True,False,True,,False
186376,38626,BWH07GDX,Caleb,Burris,Duluth,WA,vitae.velit@faucibusorci.org,(913) 492-3436,,,True,,False,True,True,,True,False
