In [None]:
import re
from datetime import datetime
from pyspark.sql import Window
from pyspark.sql.types import *
from pyspark.sql.functions import lit
import pyspark.sql.functions as fn
from pyspark.sql.functions import col
from pyspark import SparkContext, SparkConf, SQLContext
from pgcopy import CopyManager
import psycopg2

In [None]:
# Start the Spark context.
# getOrCreate is called on the class (not on a freshly constructed instance) so
# that `conf` is actually used when a context has to be created, and so that
# re-running this cell reuses the running context instead of trying to build a
# second one.
conf = SparkConf().setMaster('local[*]')
sc = SparkContext.getOrCreate(conf)
sqlc = SQLContext(sc)

In [None]:
# Function to parse data from pageviews file

# Layout of one pageview log line. Raw strings keep the regex escapes
# (\[ \] \|) from being read as invalid Python string escapes.
_PAGEVIEW_RE = re.compile(r'(.*) - \[(.*)\] "(.*)" ([0-9]*) (.*) \| (.*): (.*) \| (.*): (http://www.facebook.com|http://google.com.br)')
_FACEBOOK_REFERER = "http://www.facebook.com"
_GOOGLE_REFERER = "http://google.com.br"
# The campaign id pattern differs per referer: for Facebook the "=" is optional.
_CAMPAIGN_ID_RES = {
    _FACEBOOK_REFERER: re.compile(r'campaign_id=?([0-9]*)'),
    _GOOGLE_REFERER: re.compile(r'campaign_id=([0-9]*)'),
}
_AD_CREATIVE_ID_RE = re.compile(r'ad_creative_id=([0-9]*)')
# Value returned for lines that cannot be parsed; element 1 == -1 marks failure.
_PARSE_FAILURE = ('', -1, '', -1, -1)


def _first_int(pattern, text):
    """Return group 1 of the first match of `pattern` in `text` as an int, or None."""
    found = pattern.search(text)
    if found is None or not found.group(1):
        # No occurrence, or the first occurrence carries no digits.
        return None
    try:
        return int(found.group(1))
    except ValueError:
        return None


def parseLog(data):
    """Parse one pageview log line into a row tuple.

    Args:
        data: a single line of the pageview log.

    Returns:
        On success, the 7-tuple
        ``(host, dateview, page_url, device_id, referer, campaign_id,
        ad_creative_id)`` where ``dateview`` is a ``datetime`` and the two ids
        are ``int`` or ``None`` when missing from the URL. ``ad_creative_id``
        is always ``None`` for the Facebook referer.
        On failure (line does not match the layout, timestamp is not
        ``%Y-%m-%d %H:%M:%S``, or input is not a string), the 5-tuple
        ``('', -1, '', -1, -1)``. The notebook drops these rows by testing
        element 1 against -1.
    """
    try:
        parsed = _PAGEVIEW_RE.match(data)
    except TypeError:
        # Non-string input is treated like any other unparseable line.
        return _PARSE_FAILURE
    if parsed is None:
        return _PARSE_FAILURE

    try:
        dateview = datetime.strptime(parsed.group(2), "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return _PARSE_FAILURE

    host = parsed.group(1)
    page_url = parsed.group(5)
    device_id = str(parsed.group(7))
    referer = str(parsed.group(9))

    if referer not in _CAMPAIGN_ID_RES:
        # The dots in the referer alternatives are unescaped, so the line
        # pattern can match a near-miss URL; such lines are rejected.
        return _PARSE_FAILURE

    campaign_id = _first_int(_CAMPAIGN_ID_RES[referer], page_url)
    if referer == _GOOGLE_REFERER:
        ad_creative_id = _first_int(_AD_CREATIVE_ID_RE, page_url)
    else:
        ad_creative_id = None

    return host, dateview, page_url, device_id, referer, campaign_id, ad_creative_id

In [None]:
# Data schema applied to the parsed pageview rows.
# Every field is declared nullable.
pageview_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('host', StringType()),
        ('dateview', TimestampType()),
        ('page_url', StringType()),
        ('device_id', StringType()),
        ('referer', StringType()),
        ('campaign_id', StringType()),
        ('ad_creative_id', IntegerType()),
    )
])

In [None]:
# Facebook media costs: campaign columns are renamed to the shared names, then
# a constant ad_creative_id (0), a null ad_creative_name and a 'facebook'
# source tag are appended.
facebook_ads = (
    sqlc.read.json('../data/datasets/facebook_ads_media_costs.jsonl')
    .select(
        col('clicks'),
        col('cost'),
        col('date'),
        col('facebook_campaign_id').alias('campaign_id'),
        col('facebook_campaign_name').alias('campaign_name'),
        col('impressions'),
    )
    .withColumn('ad_creative_id', lit(0))
    .withColumn('ad_creative_name', lit(None).cast(StringType()))
    .withColumn('source', lit('facebook'))
)

# Google media costs: same renaming, creative columns come from the file, plus
# a 'google' source tag.
google_ads = (
    sqlc.read.json('../data/datasets/google_ads_media_costs.jsonl')
    .select(
        col('clicks'),
        col('cost'),
        col('date'),
        col('google_campaign_id').alias('campaign_id'),
        col('google_campaign_name').alias('campaign_name'),
        col('impressions'),
        col('ad_creative_id'),
        col('ad_creative_name'),
    )
    .withColumn('source', lit('google'))
)

# Stack both sources into one DataFrame. union() matches columns by position,
# so both frames above must keep the same column order.
media_ads = google_ads.union(facebook_ads)

In [None]:
# Schema for the customer leads funnel file (all fields nullable)
schema = StructType([
    StructField("device_id", StringType(), True),
    StructField("lead_id", IntegerType(), True),
    StructField("registered_at", TimestampType(), True),
    StructField("credit_decision", StringType(), True),
    StructField("credit_decision_at", TimestampType(), True),
    StructField("signed_at", TimestampType(), True),
    StructField("revenue", FloatType(), True),
])

# Load the header-less CSV into the customer_leads DataFrame using that schema
customer_leads = sqlc.read.load(
    "../data/datasets/customer_leads_funnel.csv",
    format='com.databricks.spark.csv',
    schema=schema,
    header=False,
)

In [None]:
# Read the raw pageview log, parse every line and keep only the rows that
# parseLog did not flag as failed (it puts -1 in position 1 on failure).
pageviews_raw = sc.textFile('../data/datasets/pageview.txt')
parsed_rows = pageviews_raw.map(parseLog)
pageviews_data = parsed_rows.filter(lambda row: row[1] != -1)

# Turn the surviving rows into a DataFrame with the pageview schema
pageviews = sqlc.createDataFrame(pageviews_data, pageview_schema)

In [None]:
# Preview the first two rows of the combined media costs
media_ads.show(n=2, truncate=True)

In [None]:
# Answer questions

# What was the most expensive campaign?
# Total the cost per (campaign, source) and rank by that total, highest first,
# so the single row kept is the campaign with the largest spend.
expensive_campaign = (
    media_ads
    .groupBy('campaign_id', 'source')
    .agg(fn.sum('cost').alias('cost'))
    .orderBy('cost', ascending=False)
    .limit(1)
)
# show() returns None, so it is called separately to keep the DataFrame in
# expensive_campaign.
expensive_campaign.show()

In [None]:
# What was the most profitable campaign?
# Attach lead revenue to pageviews through the device id.
# NOTE(review): this inner join yields one row per (pageview, lead) pair, so a
# device with several pageviews contributes its revenue once per pageview;
# confirm that is the intended attribution.
profitable = pageviews.join(customer_leads, pageviews.device_id == customer_leads.device_id)

# Sum revenue per campaign, ignore pageviews without a campaign id, and keep
# the campaign with the highest total.
result_profitable = (
    profitable
    .groupBy('campaign_id')
    .agg(fn.sum('revenue').alias('total_revenue'))
    .where(col('campaign_id').isNotNull())
    .orderBy('total_revenue', ascending=False)
    .limit(1)
)
# show() returns None, so it is called separately to keep the DataFrame in
# result_profitable.
result_profitable.show()

In [None]:
# Which ad creative is the most effective in terms of clicks?
# Total the clicks per creative and keep the one with the most clicks.
# NOTE(review): rows that were given ad_creative_id 0 and a null name when
# media_ads was built are grouped together here and can rank first; confirm
# whether they should be excluded.
clicks_sum = (
    media_ads
    .groupBy('ad_creative_id', 'ad_creative_name')
    .agg(fn.sum('clicks').alias('total_clicks'))
    .orderBy('total_clicks', ascending=False)
    .limit(1)
)
# show() returns None, so it is called separately to keep the DataFrame in
# clicks_sum.
clicks_sum.show()

In [None]:
# media_ads.groupBy('campaign_id','source').count().orderBy('campaign_id').show()
# NOTE(review): this prints whatever the earlier cell stored in
# result_profitable; if that was the return value of .show(), it prints None.
print(result_profitable)
# What was the most expensive campaign?
# Sorted with the highest total cost first so the most expensive campaign is
# the first row displayed.
# NOTE(review): this groups by campaign_id only, without 'source'; confirm ids
# cannot collide between sources.
media_ads.groupBy('campaign_id').agg(fn.sum('cost').alias('Campaign_cost')).orderBy('Campaign_cost', ascending=False).show()

In [None]:
# Preview the first two parsed pageview rows
pageviews.show(n=2, truncate=True)

In [None]:
# Open the connection to the creditas database
connection_params = dict(
    host='localhost',
    port='5432',
    database='creditas',
    user='denniscardoso',
)
conn = psycopg2.connect(**connection_params)

# Open a cursor on that connection
cur = conn.cursor()

In [None]:
# Create a table on the creditas database
def create_table(cursor, query):
    """Run the SQL script stored at `query` on `cursor` and commit it.

    Args:
        cursor: an open database cursor. The statement is committed on the
            connection this cursor belongs to (``cursor.connection``).
        query: path of the file containing the SQL to execute.

    The cursor is left open so the caller can reuse it for further calls.
    """
    # Context manager closes the script file even if reading fails.
    with open(query, 'r') as script:
        command = script.read()

    cursor.execute(command)
    # Commit on the cursor's own connection: committing on any other
    # connection would leave this statement uncommitted.
    cursor.connection.commit()
# Create the media_ads, pageview and customer leads tables, in that order,
# each from its own SQL script.
for ddl_script in (
    '../scripts/media_ads.sql',
    "../scripts/pageview.sql",
    "../scripts/customer_leads.sql",
):
    create_table(cur, ddl_script)

In [None]:
# Open a fresh connection to the creditas database
connection_params = dict(
    host='localhost',
    port='5432',
    database='creditas',
    user='denniscardoso',
)
conn = psycopg2.connect(**connection_params)

# Open a cursor on that connection
cur = conn.cursor()

def append_to_table(row):
    """Insert one media_ads row through the module-level cursor `cur`.

    Args:
        row: object exposing the attributes clicks, cost, date, campaign_id,
            campaign_name, impressions, ad_creative_id, ad_creative_name and
            source. Values are passed as query parameters, not interpolated
            into the SQL text.
    """
    # The cursor opened in this cell is named `cur`; no name `cursor` is
    # defined at module level in the visible notebook.
    cur.execute("INSERT INTO media_ads (clicks, cost, date, campaign_id, campaign_name, impressions, ad_creative_id, ad_creative_name, source) VALUES(%s, %s,%s, %s,%s, %s,%s, %s, %s)", (row.clicks, row.cost, row.date, row.campaign_id, row.campaign_name, row.impressions, row.ad_creative_id, row.ad_creative_name, row.source))