In [3]:
from pyspark.sql.functions import *
from pyspark.sql.functions import udf,col,desc
import matplotlib.pyplot as plt
import numpy as np
import datetime
from datetime import datetime,date,timedelta
import time

## Obtain reach, number of impressions, frequency and spend over a specified period of time for chosen brands

You will be asked to specify a start date, an end date and a list of brands to include. The code pulls data from PartG (to extract ad views) and Insights (to extract spend) and returns a table containing, for each brand: the brand id (a number identifying the brand), the brand name, the total brand spend over the period, the total airings over the period, the total reach over the period (number of distinct households) and the average frequency over the period (airings divided by reach).

In [4]:
# The user specifies a start date, an end date and the brands included in the report.
# Surrounding whitespace is stripped from every answer: a stray space would make
# the date fail to parse, or make a brand silently miss the exact-match name filter.
print("Enter the start Date (YYYY-MM-DD format)")
start_date=str(input()).strip()
print("Enter the end Date (YYYY-MM-DD format)")
end_date=str(input()).strip()
print("Enter the brands you want to include in your report (one by one, press enter between each, write STOP when you are done)")
# Brand names are collected one per line until a sentinel (STOP or END) is entered.
brand_list=[]
inp=str(input()).strip()
while inp!="STOP" and inp!="END":
    if inp:  # ignore blank lines instead of recording an empty brand name
        brand_list.append(inp)
    inp=str(input()).strip()
print("===================================================================")

# Start the wall-clock timer once all user input has been collected.
start_time = time.time()

# Parse the input date strings (YYYY-MM-DD) into datetime objects.
# strptime raises ValueError if a string does not match the format.
start_ymd = datetime.strptime(start_date, "%Y-%m-%d")
end_ymd = datetime.strptime(end_date, "%Y-%m-%d")
delta = end_ymd - start_ymd

# An inverted range would make the day-by-day loops run zero times, leaving
# `airings` undefined (or stale from a previous run of this cell), so fail early.
if delta.days < 0:
    raise ValueError(
        "End date {0} is before start date {1}".format(end_date, start_date)
    )

# Pull the data from Insights in order to get brand spend.
# Build one glob path per day of the inclusive [start, end] range and read them
# all in a single call, rather than chaining one union per day.
insights_paths = []
for day_offset in range(delta.days + 1):
    day = start_ymd + timedelta(days=day_offset)
    insights_paths.append(
        "/insights_v2/airings_all/v2/yr={0}/dt={0}-{1:02d}-{2:02d}/*.parquet".format(
            day.year, day.month, day.day
        )
    )

# With an inverted date range there is nothing to read; fail with a clear
# message instead of leaving `airings` undefined or stale.
if not insights_paths:
    raise ValueError("The end date must not be before the start date")

airings = spark.read.parquet(*insights_paths)

# Brand spend from the Insights data: total `spend2` per (brand_id, brand_name),
# restricted to the brand names the user specified, exposed as column `spend`.
spend_by_brand = airings.groupBy("brand_id", "brand_name").agg({"spend2": "sum"})
requested_spend = spend_by_brand.where(spend_by_brand["brand_name"].isin(brand_list))
spend = requested_spend.withColumnRenamed("sum(spend2)", "spend")

# Fetch the ids of the requested brands from the spend table (Insights data),
# which already maps each requested brand_name to its brand_id.
brandids = spend.select('brand_id').collect()
brandIds = [row['brand_id'] for row in brandids]

# In PartG, brands are placed in different buckets. The bucket id of a brand is
# the last digit of its brand id (e.g. brand id 1234 -> bucket 4). Collect the
# distinct buckets of the requested brands, in first-seen order, so that only
# those buckets are loaded.
brand_bucket_list = []
for brand_id in brandIds:
    bucket = str(brand_id)[-1]
    if bucket not in brand_bucket_list:
        brand_bucket_list.append(bucket)

# Pull the data from PartG in order to get brand airings and reach.
# Without any bucket there is no path to read and the load below would fail
# with an obscure Spark error, so report the actual cause instead.
if not brand_bucket_list:
    raise ValueError(
        "None of the requested brands were found in the Insights data for this period"
    )

# One glob path per (day, bucket) pair over the inclusive [start, end] range.
file_path = []
for day_offset in range(delta.days + 1):
    day = start_ymd + timedelta(days=day_offset)
    day_str = "{0}-{1:02d}-{2:02d}".format(day.year, day.month, day.day)
    for bucket in brand_bucket_list:
        file_path.append(
            "/data/part-g/slice/v5.0/adviews/parquet/date=" + day_str
            + "/*/brandBucket=" + bucket + "/p=*/*.parquet"
        )

# All paths are read in a single call.
adviews = sqlContext.read.load(file_path)

print("Data has been imported")
print("===================================================================")

# Keep only the two columns needed, and only the rows of the requested brands.
brand_household_views = adviews.select('brandId', 'householdId')
adviews = brand_household_views.where(adviews.brandId.isin(brandIds))

# Total airings per brand: every ad view counts, households may repeat.
adviews_count = (
    adviews.groupBy('brandId')
    .count()
    .withColumnRenamed('count', 'airings')
)

# Unique households reached per brand: drop duplicate (brand, household) pairs
# before counting. The brand column is renamed to `brandId2` so the two tables
# can be joined without a column-name clash.
adviews_distinct = adviews.distinct()
adviews_distinct_count = (
    adviews_distinct.groupBy('brandId')
    .count()
    .withColumnRenamed('brandId', 'brandId2')
    .withColumnRenamed('count', 'reach')
)

# Join the three tables on brand id: airings with reach first, then spend.
reach_join_on = adviews_count.brandId == adviews_distinct_count.brandId2
airings_with_reach = adviews_count.join(
    adviews_distinct_count, reach_join_on, 'inner'
).select('brandId', 'airings', 'reach')

spend_join_on = airings_with_reach.brandId == spend.brand_id
adviews_count = airings_with_reach.join(
    spend, spend_join_on, 'inner'
).select('brandId', 'airings', 'reach', 'spend', 'brand_name')

# Frequency = airings per reached household; the 1.0 factor forces a
# floating-point division.
frequency = 1.0 * col('airings') / col('reach')
adviews_count = adviews_count.withColumn('frequency', frequency)
adviews_count.show()

# Report the elapsed wall-clock time since the timer was started.
end_time = time.time()
print("DONE - It took {0} s".format(end_time - start_time))

Enter the start Date (YYYY-MM-DD format)
2018-12-24
Enter the end Date (YYYY-MM-DD format)
2019-01-23
Enter the brands you want to include in your report (one by one, press enter between each, write STOP when you are done)
Axe
Secret
Old Spice
Degree
Amazon
Apple
Google
Facebook
KFC
Bose
JBL
Walmart
END
Data has been imported
+-------+--------+-------+---------+----------+------------------+
|brandId| airings|  reach|    spend|brand_name|         frequency|
+-------+--------+-------+---------+----------+------------------+
|   3396|91241158|4725364|380508677|     Apple|19.308810495868677|
|   2621| 1439255| 935179| 22099501|      Bose|1.5390155253700093|
|    883|76866802|4216849|175074283|    Google|18.228492886513127|
|1259515|27659709|3429698| 20434582|    Secret| 8.064765177575401|
|   1425|   71642|  63513|    47981|       JBL|1.1279895454473887|
|   3795|  779044| 451558|   774495|  Facebook| 1.725235739373458|
|    746|85008349|4655454|220250729|    Amazon|18.259948224168898|
| 