### Notebook to download, process and upload json files from s3 bucket to Redshift

<a id='Pre'></a>
### Pre-requisites to run notebook   
1. [s3 Credentials](#s3Credentials)

2. [Enter the name of a new s3 bucket where the processed files would be stored temporarily before being copied to Redshift DB](#Ns3Creds)

3. [Redshift Credentials](#RCredentials)



### Steps implemented in the notebook:

1. [Downloads batches of 20 files from s3 bucket (batch size can be changed to optimize the process)](#P1)  
2. [Processes the files:](#P1)   
    a)Converts to dataframe    
    b)Changes date format to a more readable format  
    c)Drops repetitive columns  
    d)Changes column names to ensure adherence to Redshift's column naming convention    
3. [Uploads the dataframe to the new s3 bucket in csv format](#P3)  
4. [Appends the processed s3 file to Redshift table](#P4)  
5. [Deletes the processed file from s3 to free up storage space](#P5)  

In [1]:
import pandas as pd
import numpy as np
import boto3
import os
import glob
import json
from pandas.io.json import json_normalize
import datetime
import psycopg2
import re
import sys
import copy

In [2]:
#Don't include this in t
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))

In [3]:
# Function to convert unix time to readable date 

def date_clean(df):
    
    dates = [ 'EventCreatedOn','ImpressionCreatedOn','CreatedOn']
    
    for date in dates:
        if date == 'CreatedOn':
            df[date] = df[date].str.extract('(\d+)')
            df[date] = df[date].astype(int)
        df[date] = df[date]/1000
        df[date] = pd.to_datetime(df[date],unit='s')
    
    return df    

In [4]:
# Function to change column names to ensure adherence to Redshift's column naming convention

def column_renaming(df):
    
    column_list = df.columns.tolist()
    
    for col in column_list:
        
        col_old = copy.copy(col)
        
        # Replacing '.' and ':' with '_' as these characters are not allowed
        col = (col.replace('.', '_'))
        col = (col.replace(':', '_'))
        
        # As Redshift columns are case insensitive, column names with upper case letter to indicate start of a new word will not work
        # Adding an '_' before upper case letters to aid comprehension after making the names lower case
        col= '_'.join(re.sub( r"([A-Z])", r" \1", col).split())
        col = col.lower()
        
        # Replacing '__' with '_'
        col = (col.replace('__', '_'))
        
        # Also checked for all column names being within the required length of 1 to 127 bytes
        
        # Rename column in the dataframe
        df.rename(columns= {col_old:col}, inplace = True)
    
    
    return df  

In [5]:
# Function to extract values from list columns as comma separated strings 

def extract_list(df,var1,var2):
    
    
    df[var1] = [','.join(map(str, l)) for l in df[var1]]
    df[var2] = [','.join(map(str, l)) for l in df[var2]]
    
    return df

<a id='P3'></a>
##### Uploads the dataframe to the new s3 bucket in csv format

In [6]:
# Function to upload file to S3

def upload_file_S3(data_frame,tempfile_upload_bucket_name):
    
    print('Uploading single processed file:')
    csv_buffer = StringIO()
    data_frame.to_csv(csv_buffer, index=False)
    s3 = boto3.resource('s3')
    #Adding date and time to file name to identify the file
    ts = datetime.datetime.now()
    temp_file_name = 'temp_file'+str(ts)+'.csv'
    s3.Object(tempfile_upload_bucket_name, temp_file_name).put(Body=csv_buffer.getvalue())
    
    return temp_file_name

[Back](#Pre)

<a id='P4'></a>
##### Appends the processed s3 file to Redshift table

In [7]:
# Function to upload file from s3 to Redshift

def upload_redshift(DBname,Port,User,Password,Host,aws_access_key_id,aws_secret_access_key,file_path,schema,table):

    # Make conection to Redshift DB
    try:
        con = psycopg2.connect(
                host='%s' % (Host),
                user='%s' % (User),
                port='%s' % (Port),
                password='%s' % (Password),
                dbname='%s' % (DBname)   )
        print("Connection Successful!")
    except:
        print("Unable to connect to Redshift")
        
    cur = con.cursor()    
     
        
    # query string to upload S3 file to table  
      
    sql='''COPY {schema}.{table}
           FROM '{file_path}'
           credentials 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}'
           IGNOREHEADER 1
           DELIMITER ','
           ACCEPTINVCHARS
           EMPTYASNULL
           MAXERROR 10 ;'''.format(schema=schema,
                                   table=table, 
                                   file_path=file_path, 
                                   aws_access_key_id=aws_access_key_id,
                                   aws_secret_access_key=aws_secret_access_key)
    

    #Execute the sql query 
    
    try:
        cur.execute(sql)
        con.commit()
        print("Copy command executed successfully")
    except:
        print("Failed to execute copy command")
    con.close() 
    

[Back](#Pre)

<a id='P5'></a>
###### Deletes the processed file from s3 to free up storage space

In [8]:
# Function to delete temporary file from S3 (This is to ensure we are not taking up uncessary storage space)

def delete_file(tempfile_upload_bucket_name,file_to_delete_name):
    
    s3 = boto3.resource("s3")
    s3.Object(tempfile_upload_bucket_name, file_to_delete_name).delete()

[Back](#Pre)

<a id='s3Credentials'></a>
#### s3  Credentials

In [17]:
# s3 bucket details
s3 = boto3.resource('s3')
bucket_name ='leafliink-data-interview-exercise'
my_bucket = s3.Bucket(bucket_name)

#Enter the aws credential here
aws_access_key_id = 'Enter access key id'
aws_secret_access_key = 'Enter secret accesskey'



[Back](#Pre)

<a id='RCredentials'></a>
#### Redshift redentials


In [18]:
# Enter the Redshift DB credentials here
DBname= 'Enter the dbname'
Port= 'Enter port'
User ='Enter user'
Password ='Enter password'
Host ='Enter host'

[Back](#Pre)

<a id='Ns3Creds'></a>


#### New s3 Bucket for temporary storage of processed files

In [20]:
# Create a s3 bucket to store temporary files for copying to Redshift table
tempfile_upload_bucket_name ='Enter the s3 bucketname to store temporary processed files'

[Back](#Pre)

<a id='P1'></a>
### Batch processing and uploading files       
Steps:    
1. Download batches of 20 files from S3 bucket (batch size can be changed to optimize the process)
2. Process the files (convert to dataframe, change date format, drop repetitive columns)
3. Upload the dataframe to S3 bucket in csv format
4. Append the processed S3 file to Redshift table
5. Delete the processed file from S3 (save on storage space)

In [10]:
#Making a list of all keys in S3 bucket to aid in batch processing of files
all_keys =[]
for my_bucket_object in my_bucket.objects.all():
    all_keys.append(my_bucket_object.key)

batch = 20
start = 0
count = 0

# Process s3 json files in batches of 20, i.e. download from s3, clean up and upload

while start < len(all_keys):
    
    df_clicks = pd.DataFrame()
    df_impressions = pd.DataFrame()
    
    print('Batch Details:\n','Start:',start, 'Upto:', start+batch-1)
    
 
    for key in all_keys[start:start+batch]:
        count+=1
        if 'clicks' in key: 
            s3_object = s3.Object(bucket_name, key).get()['Body'].read().decode('utf-8')
            for each_dict in s3_object.split('\n')[:-1]:
                each_dict = json.loads(each_dict)
                
                # Using json_normalize to convert keys in nested dictionaries to individual columns
                df_new= pd.DataFrame( json_normalize(each_dict))
                
                # Change date format 
                df_new = date_clean(df_new)
                
                # Drop repetative columns with the same informations ('User.Key' and 'UserKey')
                del df_new['User.Key']
                
                # Function to extract values from list columns as comma separated strings 
                df_new = extract_list(df_new,'MatchingKeywords','Categories')
                
                # Change column names in accordance to Redshift convention
                df_new = column_renaming(df_new)
                
                # Append processed dataframe to main dataframe for batch batch upload later
                df_clicks = df_clicks.append(df_new, ignore_index = True)
                          
                
        else:
            if 'impressions' in key: 
                s3_object = s3.Object(bucket_name, key).get()['Body'].read().decode('utf-8')
                for each_dict in s3_object.split('\n')[:-1]:
                    each_dict = json.loads(each_dict)
                    
                    # Using json_normalize to convert keys in nested dictionaries like User to individual columns User.Key and User.IsNew
                    df_new= pd.DataFrame( json_normalize(each_dict))

                    # Change date format
                    df_new = date_clean(df_new)
                    
                    # Drop repetitive columns with the same informations ('User.Key' and 'UserKey')
                    del df_new['User.Key']
                    
                    # Function to extract values from list columns as comma separated strings 
                    df_new = extract_list(df_new,'MatchingKeywords','Categories')
                    
                    # Change column names in accordance to Redshift convention
                    df_new = column_renaming(df_new)

                    # Append processed dataframe to main dataframe for batch batch upload later
                    df_impressions = df_impressions.append(df_new, ignore_index = True)
            else:
                pass
    
    
    print(' Clicks:', df_clicks.shape, 'Imp:',df_impressions.shape,'Count:',count)
    
    
    # Upload to processed batch of 20 json files to redshift database table
    
    # upload files to S3
    filename_clicks = upload_files_S3(df_clicks,tempfile_upload_bucket_name) 
    filename_imp = upload_files_S3(df_impressions,tempfile_upload_bucket_name) 
     
    # upload files from S3 to Redshift table
    file_path_clicks = 's3://'+tempfile_upload_bucket_name+'/'+filename_clicks
    file_path_imp = 's3://'+tempfile_upload_bucket_name+'/'+filename_imp
    upload_redshift(DBname,Port,User,Password,Host,aws_access_key_id,aws_secret_access_key,\
                    file_path_clicks ,'ll_schema','clicks') 
    upload_redshift(DBname,Port,User,Password,Host,aws_access_key_id,aws_secret_access_key,\
                    file_path_imp ,'ll_schema','impressions')
    
    # Delete temporary files from S3 
    delete_file(tempfile_upload_bucket_name,file_to_delete_name) 
    delete_file(tempfile_upload_bucket_name,file_to_delete_name) 
    
    
    #Changing start value for processing the next batch of 20 json files
    start = start + batch
    


Batch Details:
 Start: 0 Upto: 19


of pandas will change to not sort by default.

To accept the future behavior, pass 'sort=False'.


  sort=sort,


 Clicks: (20, 57) Imp: (0, 0) Count: 20
Batch Details:
 Start: 20 Upto: 39
 Clicks: (20, 57) Imp: (0, 0) Count: 40
Batch Details:
 Start: 40 Upto: 59
 Clicks: (20, 57) Imp: (0, 0) Count: 60
Batch Details:
 Start: 60 Upto: 79
 Clicks: (9, 57) Imp: (19, 62) Count: 80


[Back](#Pre)

In [11]:
df_clicks.head()

Unnamed: 0,meta_schema,meta_version,gdpr_computed,gdpr_source,remote_i_p,user_agent,ecpm,datacenter,burn_in,is_valid_u_a,...,device_brand_name,device_model_name,device_os_raw_version,device_os_major_version,device_os_minor_version,device_browser,device_browser_raw_version,device_browser_major_version,device_browser_minor_version,device_form_factor
0,event,1.3,True,none,0.0.0.0,,0,False,False,True,...,Google,Chrome,0,0,0,Chrome Desktop,79.0,79,0,desktop
1,event,1.3,True,none,0.0.0.0,,0,False,False,True,...,Google,Chrome,0,0,0,Chrome Desktop,79.0,79,0,desktop
2,event,1.3,True,none,0.0.0.0,,0,False,False,True,...,Google,Chrome,0,0,0,Chrome Desktop,79.0,79,0,desktop
3,event,1.3,True,none,0.0.0.0,,0,False,False,True,...,Google,Chrome,0,0,0,Chrome Desktop,79.0,79,0,desktop
4,event,1.3,True,none,0.0.0.0,,0,False,False,True,...,Google,Chrome,0,0,0,Chrome Desktop,79.0,79,0,desktop


In [12]:
df_impressions.head()

Unnamed: 0,meta_schema,meta_version,gdpr_computed,gdpr_source,remote_i_p,user_agent,ecpm,datacenter,burn_in,is_valid_u_a,...,device_brand_name,device_model_name,device_os_raw_version,device_os_major_version,device_os_minor_version,device_browser,device_browser_raw_version,device_browser_major_version,device_browser_minor_version,device_form_factor
0,event,1.3,True,none,0.0.0.0,,0,False,False,True,...,Google,Chrome,0.0,0,0,Chrome Desktop,79.0,79,0,desktop
1,event,1.3,True,none,0.0.0.0,,0,False,False,True,...,Google,Chrome,0.0,0,0,Chrome Desktop,79.0,79,0,desktop
2,event,1.3,True,none,0.0.0.0,,0,False,False,True,...,Mozilla,Firefox,,0,0,Firefox Desktop,72.0,72,0,desktop
3,event,1.3,True,none,0.0.0.0,,0,False,False,True,...,Google,Chrome,0.0,0,0,Chrome Desktop,79.0,79,0,desktop
4,event,1.3,True,none,0.0.0.0,,0,False,False,True,...,Mozilla,Firefox,,0,0,Firefox Desktop,72.0,72,0,desktop


[Back](#Pre)