In [10]:
# Import libraries

import psycopg2
import boto
import boto.s3.connection
import json
import sys

# Load and parse the credentials file.
input_data='credentials.json'

# Read through a context manager so the file handle is closed as soon as the
# JSON has been parsed (the previous open(...).read() never closed it).
with open(input_data) as cred_file:
    cred_input = json.load(cred_file)


# Variable assignments
# Redshift credentials (section 'Redshift_credentials' of the JSON file)
redshift_cred = cred_input['Redshift_credentials']
database = redshift_cred['database']
RSURL = redshift_cred['RSURL']
hostname = redshift_cred['hostname']
dbuser = redshift_cred['dbuser']
RSpwd = redshift_cred['pwd']
redshift_port = redshift_cred['redshift_port']
RSSchema = redshift_cred['RSSchema']

# S3 credentials (section 'S3_credentials' of the JSON file)
s3_cred = cred_input['S3_credentials']
S3accesskey = s3_cred['S3accesskey']
S3secretkey = s3_cred['S3secretkey']
S3sourcebucket = s3_cred['S3sourcebucket']
S3destbucket = s3_cred['S3destbucket']
S3bucketloc = s3_cred['S3bucketloc']


# Create the Redshift destination table as part of the ETL if it does not
# already exist.
#########################################
# Destination: Redshift table name. Defined once and reused in the DDL below
# so the table that is created and the table that is loaded cannot drift apart.
desttable='buy_vd'

try:
    con = psycopg2.connect(dbname=database, host=hostname,
                           port=redshift_port, user=dbuser, password=RSpwd)
    print("Redshift Connection Successful!")
except Exception as e:
    print(e)
    sys.exit("Unable to connect to Redshift")


cursor = con.cursor()
sql="CREATE TABLE IF NOT EXISTS %s(  PID  INTEGER NOT NULL,  Ad_id        VARCHAR(50) NOT NULL,  id_type      VARCHAR(20) NOT NULL,  UA       TEXT NOT NULL,  IP   VARCHAR(20) NOT NULL,  epoch_timestamp   TIMESTAMP NOT NULL,  lat DECIMAL(8, 5) NOT NULL,  long  DECIMAL(8, 5) NOT NULL,  accuracy INTEGER  NOT NULL,  optout VARCHAR(1) NOT NULL,  country VARCHAR(20) NOT NULL,  proc_date     VARCHAR(10) NOT NULL);" % desttable
try:
    cursor.execute(sql)
    con.commit()
except Exception as e:
    print(e)
    sys.exit("Unable to create table in Redshift")
finally:
    # Close on every path: sys.exit() raises SystemExit, which does not end
    # an interactive session, so a failed CREATE would otherwise leave the
    # connection open.
    con.close()
#########################################



#list the names (keys) of the files in the S3 source bucket
#########################################
def download_sourcefile_list_S3(S3key,S3secretkey,src_bucket):
    """Return the names of all keys in an S3 bucket.

    Despite the function name, no file content is downloaded; only the key
    names are collected.

    Args:
        S3key: AWS access key id used for the S3 connection.
        S3secretkey: AWS secret access key used for the S3 connection.
        src_bucket: Name of the bucket whose keys are listed.

    Returns:
        A list with the ``name`` of every key yielded by ``bucket.list()``.

    Raises:
        SystemExit: if connecting, opening the bucket or listing its keys
            fails; the underlying error is printed first.
    """
    try:
        # The bucket name contains dots, which breaks the default calling
        # format (ref: https://github.com/boto/boto/issues/2836), hence
        # OrdinaryCallingFormat.
        conn = boto.connect_s3(
            aws_access_key_id=S3key,
            aws_secret_access_key=S3secretkey,
            calling_format=boto.s3.connection.OrdinaryCallingFormat(),
        )
        # Connecting and opening the bucket are inside the try so that bad
        # credentials or a missing bucket produce the same controlled exit
        # as a failed listing.
        bucket = conn.get_bucket(src_bucket)
        file_list = [key.name for key in bucket.list()]
    except Exception as e:
        print(e)
        sys.exit("Unable to access S3 source files")

    print("Source S3 files being retrieved")
    # Names of the files that are in the source bucket
    return file_list

#########################################

#Steps performed for each source file:
#1. Create the staging table
#2. Copy the file into the Redshift staging table
#3. Query the staged data to get the processing date of the file (proc_date without dashes)
#4. Unload the required data (format) from the staging table to the destination S3 bucket
#5. Get the unloaded row count
#6. Load the data from the staging table into the destination table, converting epoch time to a timestamp
#7. Drop the staging table

#########################################
def _execute_or_exit(con, cursor, sql, failure_message, fetch=False):
    """Execute one SQL statement, exiting the program if it fails.

    With ``fetch=False`` the statement is committed and None is returned.
    With ``fetch=True`` the result rows are returned and nothing is committed.
    On any exception the error is printed and ``sys.exit(failure_message)``
    is called.
    """
    try:
        cursor.execute(sql)
        if fetch:
            return cursor.fetchall()
        con.commit()
    except Exception as e:
        print(e)
        sys.exit(failure_message)
    return None


def copy_to_redshift_cluster_and_export_to_S3(db,hst,prt,usr,pwd,desttable,S3path,S3key,S3secretkey,S3bucketloc,delim,quote,zipsol,dest_path):
    """Stage one S3 file in Redshift, export part of it to S3 and load it.

    Steps, in order: create the staging table, COPY the file into it, read
    the dash-stripped ``proc_date`` of one staged row, UNLOAD selected
    columns to ``dest_path`` + that date, print the unloaded row count,
    insert the staged rows into ``desttable`` (converting the epoch seconds
    to a timestamp) and drop the staging table.

    Args:
        db, hst, prt, usr, pwd: Redshift database name, host, port, user and
            password used to open the connection.
        desttable: Name of the final table the staged rows are inserted into.
        S3path: S3 URL of the source file passed to COPY.
        S3key, S3secretkey: AWS credentials embedded in COPY and UNLOAD.
        S3bucketloc: Region given to COPY.
        delim: Field delimiter given to COPY.
        quote, zipsol: Extra COPY options inserted verbatim (the caller in
            this file passes 'removequotes' and 'GZIP').
        dest_path: S3 URL prefix for UNLOAD; the date string is appended.

    Returns:
        None.

    Raises:
        SystemExit: if any SQL step fails; the underlying error is printed
            first. A failure to connect propagates as raised by psycopg2.
    """
    Rstagstable='buy_vd_stage'

    # Use the pwd argument; the previous code ignored it and read the
    # module-level RSpwd instead.
    con=psycopg2.connect(dbname=db, host=hst, port=prt, user=usr, password=pwd)
    try:
        cursor = con.cursor()

        # create staging table (epoch_timestamp is kept as raw BIGINT here)
        sql="CREATE TABLE IF NOT EXISTS %s(  PID  INTEGER NOT NULL,  Ad_id        VARCHAR(50) NOT NULL,  id_type      VARCHAR(20) NOT NULL,  UA       TEXT NOT NULL,  IP   VARCHAR(20) NOT NULL,  epoch_timestamp   BIGINT NOT NULL,  lat DECIMAL(8, 5) NOT NULL,  long  DECIMAL(8, 5) NOT NULL,  accuracy INTEGER  NOT NULL,  optout VARCHAR(1) NOT NULL,  country VARCHAR(20) NOT NULL,  proc_date     VARCHAR(10) NOT NULL);" % Rstagstable
        _execute_or_exit(con, cursor, sql, 'Staging table creation failed')

        # copy to redshift staging table
        # NOTE(review): the statements below are built by string
        # interpolation and embed the AWS keys; values containing a single
        # quote would break or alter the SQL.
        cpy_cmd = (
            "COPY %s FROM '%s' "
            "credentials 'aws_access_key_id=%s;aws_secret_access_key=%s' "
            "region '%s' delimiter '%s' %s %s;"
        ) % (Rstagstable, S3path, S3key, S3secretkey, S3bucketloc, delim, quote, zipsol)
        # The helper really calls con.commit(); the previous code wrote
        # `con.commit` without parentheses, which committed nothing.
        _execute_or_exit(con, cursor, cpy_cmd,
                         "Unable to copy file data to redshift cliuster(table)")

        # get the processing date (proc_date with the dashes removed)
        gtyrsql="SELECT REPLACE(proc_date,'-','') AS YR FROM %s LIMIT 1;" % Rstagstable
        rows = _execute_or_exit(con, cursor, gtyrsql,
                                "Unable to query staging table", fetch=True)
        # Empty staging table -> empty suffix, as before.
        YR = rows[-1][0] if rows else ''
        print(YR)

        dest_path=dest_path+YR
        print(dest_path)

        # Unloading to S3 destination bucket
        # NOTE(review): the SELECT emits a quote after the id_type and lat
        # values without a matching opening quote, so each record is not
        # well-formed JSON - confirm what the consumer expects before
        # changing it.
        unld_cmd="""UNLOAD ('SELECT DISTINCT CHR(123)||  \''"Ad_id":"\''||nvl(Ad_id,\''\'')||\''",\''||  \''"id_type":\''||nvl(id_type)||\''",\''||  \''"lat":\''||nvl(CAST(round(lat,3)AS FLOAT))||\''",\''||  \''"long":\''||nvl(CAST(round(long,3)AS FLOAT))|| chr(125) FROM %s;') TO '%s'  \
            credentials 'aws_access_key_id=%s;aws_secret_access_key=%s' \
            GZIP ALLOWOVERWRITE   parallel off;""" \
            % (Rstagstable,dest_path,S3key,S3secretkey)
        _execute_or_exit(con, cursor, unld_cmd,
                         "Unable to unload redshift table data to S3 bucket")

        # get count of rows unloaded
        rows = _execute_or_exit(con, cursor, "select pg_last_unload_count();",
                                "Unable to get unloaded data rowcount", fetch=True)
        # Default to 0 so the print below cannot hit an unbound name when
        # no row comes back.
        rowcount = rows[-1][0] if rows else 0

        print('Unloaded rows to S3 Destination Bucket')
        print(rowcount)

        # loading data into final table, converting epoch seconds to TIMESTAMP
        finaltableload = (
            "INSERT INTO %s (PID, Ad_id, id_type, UA, IP, epoch_timestamp, lat, long, accuracy, optout, country, proc_date) "
            "SELECT PID, Ad_id, id_type, UA, IP, (TIMESTAMP 'epoch' + epoch_timestamp * INTERVAL '1 Second ')  AS val_timestamp, "
            "lat, long, accuracy, optout, country, proc_date FROM %s ;"
        ) % (desttable, Rstagstable)
        _execute_or_exit(con, cursor, finaltableload,
                         "Final table loading form stage table failed")

        # drop staging table
        drpcmd="DROP TABLE %s;" % Rstagstable
        _execute_or_exit(con, cursor, drpcmd, "Unable to drop staging table")
    finally:
        # close the connection on success and on every failure path
        con.close()
    
    
    
    
# COPY options used for every source file: ',' as delimiter, 'removequotes'
# as the quote option and 'GZIP' as the compression option.
# Get the list of file names in the S3 source bucket.
SrcS3files=download_sourcefile_list_S3(S3accesskey,S3secretkey,S3sourcebucket)
# Load each file and export the required data to the destination S3 bucket.
for file in SrcS3files:
    print(file)
    source_url = 's3://'+S3sourcebucket+'/'+file
    dest_prefix = 's3://'+S3destbucket+'/'+'buy_VD-'
    copy_to_redshift_cluster_and_export_to_S3(
        db=database,
        hst=hostname,
        prt=redshift_port,
        usr=dbuser,
        pwd=RSpwd,
        desttable=desttable,
        S3path=source_url,
        S3key=S3accesskey,
        S3secretkey=S3secretkey,
        S3bucketloc=S3bucketloc,
        delim=',',
        quote='removequotes',
        zipsol='GZIP',
        dest_path=dest_prefix,
    )


Redshift Connection Successful!
Source S3 files being retrieved
20170701_20170701165514569.gz
20170701
s3://tmp2.sl.com/scanbuy_VD-20170701
Unloaded rows to S3 Destination Bucket
144923
20170701_20170702004210139.gz
20170702
s3://tmp2.sl.com/scanbuy_VD-20170702
Unloaded rows to S3 Destination Bucket
156539
20170701_20170702013337800.gz
20170702
s3://tmp2.sl.com/scanbuy_VD-20170702
Unloaded rows to S3 Destination Bucket
39503
20170701_20170702154430430.gz
20170702
s3://tmp2.sl.com/scanbuy_VD-20170702
Unloaded rows to S3 Destination Bucket
258260


s3://tmp1.sl.com/20170701_20170701165514569.gz


's3://tmp2.sl.com/scanbuy-201707011045'