In [6]:
'''
Data cleanup. Can be done in python or through SQL. This is the python version. 
'''
import datetime
import os
file_name = 'event_performance.csv'
file_output = 'event_performance_temp.csv'
date_format = '%m/%d/%y'
# Shape of the dates this cell writes out (four-digit year). Accepting it on
# input keeps a second run of the cell from blanking dates that were already
# converted by the first run.
date_format_full_year = '%m/%d/%Y'


def _normalize_date(raw_date):
    '''
    Return `raw_date` with a four-digit year, or '' when it is not a valid date.

    A value matching `date_format` (two-digit year) gets '20' prefixed to its
    year. A value already matching `date_format_full_year` is returned as is.
    Anything else is reported with print() and replaced by an empty string.
    '''
    try:
        datetime.datetime.strptime(raw_date, date_format)
    except ValueError as short_year_error:
        try:
            datetime.datetime.strptime(raw_date, date_format_full_year)
        except ValueError:
            # Report the two-digit-year mismatch, as the cell always has.
            print(short_year_error)
            return ''
        return raw_date
    date_parts = raw_date.split('/')
    date_parts[2] = '20' + date_parts[2]
    return '/'.join(date_parts)


# Rewrite the CSV into a temporary file, then swap it over the original.
# NOTE: fields are split on bare commas, so a quoted field that itself contains
# a comma would be split in the wrong place.
with open(file_name, 'r') as f_in, open(file_output, 'w') as f_out:
    header = f_in.readline()
    f_out.write(header)
    for line in f_in:
        if not line.strip():
            # A blank line has no columns to clean; indexing it would raise.
            continue
        line_arr = line.split(',')
        line_arr[0] = line_arr[0].replace('"', '').strip()
        if line_arr[1] != '':
            line_arr[1] = _normalize_date(line_arr[1])
        line_arr[2] = line_arr[2].replace('"', '').strip()
        # The last column keeps its trailing newline, so it is not stripped.
        line_arr[3] = line_arr[3].replace('"', '').replace('?', '')
        f_out.write(','.join(line_arr))

# os.replace overwrites the destination in a single step, so there is no
# moment at which the data file is missing.
os.replace(file_output, file_name)

time data '19/24/2019' does not match format '%m/%d/%y'


In [None]:
import boto3
import os

'''
    Upload data from local
    to s3 bucket ohm-connect-gregory
'''
# Destination bucket; the COPY statements later in the notebook read from it.
bucket_name = 'ohm-connect-gregory'

# Credentials are taken from the environment rather than written in the notebook.
aws_credentials = {
    'aws_access_key_id': os.getenv('aws_access_key'),
    'aws_secret_access_key': os.getenv('aws_secret_key'),
}
s3 = boto3.resource('s3', **aws_credentials)

# Each local file is uploaded under an object key equal to its file name.
for file_name in ('event_performance.csv', 'users.csv'):
    obj = s3.Object(bucket_name, file_name)
    obj.upload_file(file_name)

In [None]:
'''
    Set up the Redshift clients used by the Python query helper
'''
# Keyword arguments shared by both Redshift clients.
redshift_client_kwargs = dict(
    aws_access_key_id=os.getenv('aws_access_key'),
    aws_secret_access_key=os.getenv('aws_secret_key'),
    region_name='us-west-2',
)

# Management client: only used to look up the cluster description.
cluster_client = boto3.client('redshift', **redshift_client_kwargs)
# The first cluster returned by describe_clusters() is the one queried below.
my_cluster = cluster_client.describe_clusters()['Clusters'][0]

# Data API client: submits SQL and reports statement status.
data_client = boto3.client('redshift-data', **redshift_client_kwargs)

'''
    Helper that submits a SQL statement to Redshift and waits for its final status
'''
def execute_redshift_query(sql):
    '''
    Submit `sql` through the Redshift Data API and poll until it settles.

    The statement is sent with the module-level `data_client` to the cluster
    described by `my_cluster`, database 'ohmconnect', as user 'gregory'.
    Progress is reported with print().

    Parameters:
        sql: SQL text to run.

    Returns:
        A tuple (statement_id, status). `status` is 'FINISHED', 'FAILED' or
        'ABORTED' when the statement settled, otherwise the last status seen
        before polling gave up.
    '''
    # Imported here because no cell of the notebook imports `time`; without it
    # the first sleep below raises NameError on a fresh kernel.
    import time

    print(sql)
    output = data_client.execute_statement(
                    ClusterIdentifier=my_cluster['ClusterIdentifier'],
                    Database='ohmconnect',
                    DbUser='gregory',
                    Sql=sql)
    statement_id = output['Id']
    status = 'SUBMITTED'
    # `cnt` is both the poll counter and the back-off: each wait lasts
    # min(cnt, 10) seconds, so waits grow from 2s up to a 10s ceiling.
    cnt = 2
    time.sleep(min(cnt, 10))
    while cnt < 1000:
        result = data_client.describe_statement(Id=statement_id)
        status = result['Status']
        if status in {'FAILED', 'ABORTED'}:
            # Do not assume the response carries an 'Error' entry; indexing it
            # directly would turn a reported failure into a KeyError.
            print(result.get('Error', 'No error message returned.'))
            print(status)
            return (statement_id, status)
        elif status in {'FINISHED'}:
            print(status)
            return (statement_id, status)
        else:
            print('Query status: ' + status)
            print(f'Will sleep for {min(cnt, 10)}s \n')
            time.sleep(min(cnt, 10))
            cnt += 1
    print('Query has been running for too long, result will not be checked anymore.')
    print('Last status: ' + status)
    return (statement_id, status)

In [None]:
'''
    Create schema
    
'''

schema_name = 'ohm_connect'
query = f'''CREATE SCHEMA IF NOT EXISTS {schema_name};'''
print(execute_redshift_query(query))


'''
    Create all the tables, sorted and distributed as necessary
'''
# Both tables are distributed and sorted on userid.
#
# event_performance does NOT declare userid as PRIMARY KEY: the table carries
# date and hour columns, so a user is expected to appear on many rows
# (NOTE(review): confirm against the data). Redshift does not enforce key
# constraints but its planner relies on them, so declaring a key that is not
# actually unique can produce wrong query results.
query = f'''
drop table if exists {schema_name}.users;
CREATE table {schema_name}.users
(
    userid VARCHAR(36) NOT NULL PRIMARY KEY
    , attribute1 BIGINT encode zstd
    , attribute2 VARCHAR encode zstd
)
DISTSTYLE KEY
DISTKEY (userid)
SORTKEY (userid);


drop table if exists {schema_name}.event_performance;
CREATE table {schema_name}.event_performance
(
    userid VARCHAR(36) NOT NULL
    , date DATE encode zstd
    , hour INT encode zstd
    , points BIGINT encode zstd
)
DISTSTYLE KEY
DISTKEY (userid)
SORTKEY (userid);
'''
print(execute_redshift_query(query))

In [None]:
'''
    Load the data from s3 bucket into Redshift cluster. 
'''


# IAM role Redshift assumes to read the bucket.
iam_role_arn = 'arn:aws:iam::648560186937:role/redshift_role'

# (table, extra COPY option lines) in load order. The extra text is spliced in
# just before the delimiter clause and must carry its own indentation and
# trailing newline; only event_performance needs one, for its date column.
copy_jobs = (
    ('users', ''),
    ('event_performance', "    DATEFORMAT AS 'MM/DD/YYYY'\n"),
)

for table_name, extra_options in copy_jobs:
    # Each table is loaded from the object named <table>.csv in the bucket.
    file_name = f'{table_name}.csv'
    query = f'''
    copy {schema_name}.{table_name}
    from 's3://{bucket_name}/{file_name}'
    iam_role '{iam_role_arn}'
    format as CSV
    IGNOREHEADER 1
{extra_options}    delimiter ','
    '''
    print(execute_redshift_query(query))