In [None]:
import json
import os

import boto3
from botocore.exceptions import NoCredentialsError
from fastavro import json_writer, parse_schema, writer
import psycopg2


### -----------------------------------------------------------
# Parse a whitespace-delimited HTTP log into a list of dicts (one per line)
# and dump that list to a JSON file.
http_log = []
fields = ['timestamp', 'http_method', 'http_path', 'user_id']

fname = 'SOME STRING'
foutput = 'SOME STRING'
with open(fname) as data:
    for line_number, line in enumerate(data, start=1):
        # Tokenise once per line instead of once per field.
        tokens = line.split()
        if not tokens:
            # Blank lines (e.g. a trailing newline at end of file) hold no record.
            continue
        if len(tokens) < len(fields):
            raise ValueError(
                '{}:{}: expected at least {} whitespace-separated fields, got {}'.format(
                    fname, line_number, len(fields), len(tokens)
                )
            )
        # zip() stops at the shorter input, so any tokens beyond the fourth
        # are ignored, exactly as the positional indexing did before.
        temp_http = dict(zip(fields, tokens))
        http_log.append(temp_http)


# 'wr' is not a valid open() mode (it raises ValueError); plain 'w' is what
# json.dump needs. The with-block closes the file, so no explicit close().
with open(foutput, 'w') as out_file:
    json.dump(http_log, out_file, indent=4)

### -----------------------------------------------------------

In [None]:
# With the JSON records on disk, re-encode them as an Avro container file.
### -----------------------------------------------------------
schemaFileName = "http_log.avsc"
key = "FILE.json"

schema_path = r'DIRECTORY' + schemaFileName
records_path = r'DIRECTORY' + key

# Read the Avro schema definition and let fastavro parse it.
with open(schema_path) as sc:
    w = json.loads(sc.read())
schema = parse_schema(w)

# Read the JSON document holding the records to be written
# (presumably the output of the log-to-JSON step; verify the path).
with open(records_path) as js:
    x = json.loads(js.read())

# Write the records to a deflate-compressed Avro file.
with open('C:/Path/to/file/output.avro', 'wb') as out:
    writer(out, schema, x, codec='deflate')
### -----------------------------------------------------------

In [None]:
# Move the files into a S3 Server
### -----------------------------------------------------------
# Credentials must never be hard-coded in a notebook: they end up in version
# control and in shared copies. Read them from the standard AWS environment
# variables instead. When a variable is unset the value is None, and boto3
# then falls back to its default credential chain (shared credentials file,
# instance profile, ...).
ACCESS_KEY = os.environ.get('AWS_ACCESS_KEY_ID')
SECRET_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')

def uploadAWS(local_file, bucket, s3_file):
    """Upload one local file to an S3 bucket and report the outcome.

    Args:
        local_file: Path of the file on the local filesystem.
        bucket: Name of the destination S3 bucket.
        s3_file: Object key to store the file under inside the bucket.

    Returns:
        True when the upload call completes, False when the local file is
        missing or no AWS credentials are available. Any other exception
        raised by boto3 propagates to the caller.
    """
    client = boto3.client(
        's3',
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY,
    )
    try:
        client.upload_file(local_file, bucket, s3_file)
    except FileNotFoundError:
        outcome, succeeded = "File was not found", False
    except NoCredentialsError:
        outcome, succeeded = "Credentials not available", False
    else:
        outcome, succeeded = "Upload Success", True
    print(outcome)
    return succeeded

# Upload the Avro schema, the Avro-encoded log and the reward-mapping CSV to S3.
# Each object keeps its local file name as its S3 key. In particular the Avro
# file must be stored as 'http_log.avro' (it was previously uploaded as
# 'http_log.txt'), because the Redshift COPY statement further down reads
# from 's3://perxassessment/http_log.avro'.
# uploadAWS prints the reason for a failed upload; `uploaded` holds the
# result of the most recent call.
for upload_name in ('http_log.avsc', 'http_log.avro', 'campaign_reward_mapping.csv'):
    uploaded = uploadAWS(upload_name, 'perxassessment', upload_name)
### -----------------------------------------------------------


In [None]:
# Using Psycopg2 to enable SQL functions within RedShift
### -----------------------------------------------------------
# NOTE(review): the connection settings below are placeholders; supply real
# values from the environment or a secrets manager rather than committing
# them in the notebook.
conn = psycopg2.connect(
    host = 'Some String',
    user = 'Some String',
    port = 'Some String',
    password = 'Some String',
    dbname = 'perxassessment'
)
# A psycopg2 connection has no connect() method; statements are executed
# through a cursor obtained with cursor().
cur = conn.cursor()

# Create initial campaign reward mapping table.
# "CONSTRAINT name FOREIGN KEY (col) REFERENCES ..." is table-constraint
# syntax, so the two foreign keys are declared after the column list rather
# than inline on each column (which is a syntax error).
# Column order (campaign_id, reward_id) is unchanged so the CSV COPY still
# lines up.
# NOTE(review): assumes the referenced tables campaign(id) and
# reward_campaign(id) already exist in this database -- confirm.
cur.execute('''
CREATE TABLE campaign_reward_mapping (
  campaign_id int,
  reward_id int,
  CONSTRAINT fk_campaign
    FOREIGN KEY (campaign_id)
      REFERENCES campaign(id),
  CONSTRAINT fk_reward
    FOREIGN KEY (reward_id)
      REFERENCES reward_campaign(id)
)
''')


# Create Table for http_log.
# Redshift has no "string" or "DATETIME" types: character data is VARCHAR and
# date-time data is TIMESTAMP. The column name "timestamp" is double-quoted
# because it is also a type keyword.
# NOTE(review): the VARCHAR lengths are assumptions -- size them to the
# longest method/path actually present in the log.
cur.execute('''
CREATE TABLE http_log (
  "timestamp" TIMESTAMP,
  http_method VARCHAR(16),
  http_path VARCHAR(2048),
  user_id INT
)
''')

# COPY the http_log Avro file from S3 into its Redshift table.
# The region is stated explicitly because the S3 bucket and the Redshift
# cluster may live in different AWS regions.
cur.execute('''
COPY http_log
FROM 's3://perxassessment/http_log.avro'
iam_role 'ROLE'
format as avro 'auto'
region 'REGION'
''')

# COPY the campaign_reward_mapping CSV file into its Redshift table.
# The 'auto' argument belongs to the AVRO/JSON formats only; CSV takes no
# such argument, so "format as csv 'auto'" is rejected by Redshift.
# NOTE(review): if the CSV file starts with a header row, add IGNOREHEADER 1.
cur.execute('''
COPY campaign_reward_mapping
FROM 's3://perxassessment/campaign_reward_mapping.csv'
iam_role 'ROLE'
format as csv
region 'REGION'
''')

# Persist the CREATE TABLE and COPY work before querying.
conn.commit()

# Construct Output Query
# Notes/Commentary:
# - A CTE exposes a cleaned view of http_log: the campaign id is the first run
#   of digits in <http_path>, cast to INT so it can be joined to the integer
#   campaign_reward_mapping.campaign_id. REGEXP_SUBSTR returns '' when nothing
#   matches, so NULLIF turns that into NULL before the cast.
# - A LEFT JOIN keeps log rows whose campaign has no reward mapping, so users
#   who viewed a campaign without any reward still appear (reward_id NULL).
# - reward_driven_by_campaign_view is the string 'False' when the campaign has
#   no mapped reward, or when the group holds a single timestamp
#   (min = max); otherwise it is 'True'.
# - session_end is NULL when the group holds a single timestamp.
# Fixes relative to the previous version of this query:
# - the source table is http_log (http_logs does not exist);
# - both CASE expressions are now terminated with END and the second one uses
#   WHEN ... THEN ... ELSE instead of an argument list;
# - the pattern uses [0-9] instead of '\d', which is an invalid escape in a
#   non-raw Python string;
# - "timestamp" is double-quoted because it is also a type keyword.
cur.execute('''
WITH cte AS (
  SELECT
    "timestamp" AS event_time,
    CAST(NULLIF(REGEXP_SUBSTR(http_path, '[0-9]+'), '') AS INT) AS campaign_id,
    user_id
  FROM
    http_log
)

SELECT
  L.user_id,
  MIN(L.event_time) AS session_start,
  CASE
    WHEN MAX(L.event_time) = MIN(L.event_time) THEN NULL
    ELSE MAX(L.event_time)
  END AS session_end,
  L.campaign_id AS campaigns,
  R.reward_id AS rewards_issued,
  CASE
    WHEN L.campaign_id IS NOT NULL AND R.reward_id IS NULL THEN 'False'
    WHEN MIN(L.event_time) = MAX(L.event_time) THEN 'False'
    ELSE 'True'
  END AS reward_driven_by_campaign_view
FROM cte L
LEFT JOIN campaign_reward_mapping R
  ON L.campaign_id = R.campaign_id

GROUP BY L.user_id, L.campaign_id, R.reward_id

ORDER BY L.user_id, L.campaign_id, R.reward_id
''')

# Pull the result set before the cursor is closed; without a fetch the query
# output is discarded.
session_rows = cur.fetchall()

cur.close()
conn.close()