### We first set up PySpark

In [1]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F 

# Configuration for a local, single-process Spark run.
conf = SparkConf().setAppName('appName').setMaster('local')

# getOrCreate() reuses an already-running context/session instead of building
# a second one, so re-running this cell in the same kernel no longer fails with
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

### We set up the tables as dataframes and output them

In [2]:
# Mock-up data for the Clicks table: one (eventid, cpc) row per click.
# eventid is assumed to be unique.
click_rows = [
    (0, 5),
    (1, 2),
    (2, 3),
    (3, 1),
    (4, 12),
    (5, 7),
]
Clicks = spark.createDataFrame(sc.parallelize(click_rows), ['eventid', 'cpc'])

# Mock-up data for the Impressions table: (eventid, userid, testgroup).
# Assumptions baked into the data:
#   * a user can appear in more than one testgroup (see userid 3 and userid 6)
#   * there can be more impressions than clicks
impression_rows = [
    (0, 1, 'A'),
    (1, 1, 'A'),
    (2, 2, 'B'),
    (3, 2, 'B'),
    (4, 3, 'A'),
    (5, 3, 'B'),
    (6, 2, 'B'),
    (8, 4, 'A'),
    (9, 3, 'A'),
    (10, 4, 'A'),
    (11, 5, 'B'),
    (12, 6, 'A'),
    (13, 6, 'B'),
]
Impressions = spark.createDataFrame(
    sc.parallelize(impression_rows),
    ['eventid', 'userid', 'testgroup'],
)

# Show both tables so the reader can check the inputs.
print("Clicks dataframe 'Clicks':")
Clicks.show()
print("Impressions dataframe 'Impressions':")
Impressions.show()

Clicks dataframe 'Clicks':
+-------+---+
|eventid|cpc|
+-------+---+
|      0|  5|
|      1|  2|
|      2|  3|
|      3|  1|
|      4| 12|
|      5|  7|
+-------+---+

Impressions dataframe 'Impressions':
+-------+------+---------+
|eventid|userid|testgroup|
+-------+------+---------+
|      0|     1|        A|
|      1|     1|        A|
|      2|     2|        B|
|      3|     2|        B|
|      4|     3|        A|
|      5|     3|        B|
|      6|     2|        B|
|      8|     4|        A|
|      9|     3|        A|
|     10|     4|        A|
|     11|     5|        B|
|     12|     6|        A|
|     13|     6|        B|
+-------+------+---------+



### We join the two tables with an inner join and a left join, and show both results. These joined dataframes are reused in the sections below.

In [3]:
# Inner join: keeps only the impressions that have a matching click
# (one row per click, given that eventid is unique in both tables).
df_inner_join = Impressions.join(Clicks, on='eventid', how='inner').orderBy('eventid')
print("Inner joined dataframe 'df_inner_join':")
df_inner_join.show()

# Left join: keeps every impression; impressions without a click get a null cpc.
# The fill is restricted to the cpc column so that a null in another numeric
# column (eventid, userid) is never silently turned into the valid-looking id 0.
df_left_join = (
    Impressions.join(Clicks, on='eventid', how='left')
    .na.fill(0, subset=['cpc'])
    .orderBy('eventid')
)
print("Left joined dataframe 'df_left_join':")
df_left_join.show()

Inner joined dataframe 'df_inner_join':
+-------+------+---------+---+
|eventid|userid|testgroup|cpc|
+-------+------+---------+---+
|      0|     1|        A|  5|
|      1|     1|        A|  2|
|      2|     2|        B|  3|
|      3|     2|        B|  1|
|      4|     3|        A| 12|
|      5|     3|        B|  7|
+-------+------+---------+---+

Left joined dataframe 'df_left_join':
+-------+------+---------+---+
|eventid|userid|testgroup|cpc|
+-------+------+---------+---+
|      0|     1|        A|  5|
|      1|     1|        A|  2|
|      2|     2|        B|  3|
|      3|     2|        B|  1|
|      4|     3|        A| 12|
|      5|     3|        B|  7|
|      6|     2|        B|  0|
|      8|     4|        A|  0|
|      9|     3|        A|  0|
|     10|     4|        A|  0|
|     11|     5|        B|  0|
|     12|     6|        A|  0|
|     13|     6|        B|  0|
+-------+------+---------+---+



### We output the Click-Through Rate (CTR) per user of each test group (‘A’ and ‘B’)
- We assume that CTR is defined as the total number of clicks divided by the total number of impressions, computed for each user within each test group

In [4]:
# Clicks per (userid, testgroup): every row of df_inner_join is a clicked
# impression, so the row count per group is the number of clicks.
# No sort is done before groupBy: aggregation does not depend on row order.
df_user_CTR_pergroup_1 = (
    df_inner_join
    .groupBy('userid', 'testgroup')
    .agg(F.count('userid').alias('user_clicks_pergroup'))
)

# Impressions per (userid, testgroup): every row of df_left_join is an
# impression, clicked or not.
df_user_CTR_pergroup_2 = (
    df_left_join
    .groupBy('userid', 'testgroup')
    .agg(F.count('userid').alias('user_impressions_pergroup'))
)

# Left join starting from the impressions so that groups without any click are
# kept; their missing click count becomes 0. Only the click count is filled, so
# the key columns are never altered by the fill.
df_user_CTR_pergroup = (
    df_user_CTR_pergroup_2
    .join(df_user_CTR_pergroup_1, ['userid', 'testgroup'], 'left')
    .na.fill(0, subset=['user_clicks_pergroup'])
    # CTR = clicks / impressions for each (userid, testgroup)
    .withColumn(
        'user_CTR_pergroup',
        F.col('user_clicks_pergroup') / F.col('user_impressions_pergroup'),
    )
)

# Column order matches the SQL output; rows are sorted only for display.
print("Click-Through Rate per user of each test group dataframe 'df_user_CTR_pergroup':")
df_user_CTR_pergroup.select(
    'userid',
    'testgroup',
    'user_clicks_pergroup',
    'user_impressions_pergroup',
    'user_CTR_pergroup',
).orderBy('userid', 'testgroup').show()

Click-Through Rate per user of each test group dataframe 'df_user_CTR_pergroup':
+------+---------+--------------------+-------------------------+------------------+
|userid|testgroup|user_clicks_pergroup|user_impressions_pergroup| user_CTR_pergroup|
+------+---------+--------------------+-------------------------+------------------+
|     1|        A|                   2|                        2|               1.0|
|     2|        B|                   2|                        3|0.6666666666666666|
|     3|        A|                   1|                        2|               0.5|
|     3|        B|                   1|                        1|               1.0|
|     4|        A|                   0|                        2|               0.0|
|     5|        B|                   0|                        1|               0.0|
|     6|        A|                   0|                        1|               0.0|
|     6|        B|                   0|                        1|               0.0|
+------+---------+--------------------+-------------------------+------------------+

### We output the average Revenue per user of each test group (‘A’ and ‘B’)
- Revenue was not defined in the exercise, so we assume the total revenue is equal to the total cpc
- We assume average revenue (per user of each test group) is equal to the total cpc (per user of each test group) divided by number of impressions (per user of each test group)

In [5]:
# Average revenue per impression for each (userid, testgroup).
# df_left_join (not df_inner_join) is used on purpose: it has one row per
# impression with cpc = 0 for impressions that were not clicked, so the mean of
# cpc equals total cpc / number of impressions, as assumed above.
# No projection or sort is needed before groupBy; the aggregation is
# independent of row order.
df_user_avg_cpc_pergroup = (
    df_left_join
    .groupBy('userid', 'testgroup')
    .agg(F.mean('cpc').alias('user_avg_cpc_pergroup'))
)

# The label now names the dataframe that is actually shown.
print("Average Cost Per Click per user of each test group dataframe 'df_user_avg_cpc_pergroup':")
# Rows are sorted only for display, matching the SQL output.
df_user_avg_cpc_pergroup.orderBy('userid', 'testgroup').show()

Average Cost Per Click per user of each test group dataframe 'df_user_CTR_pergroup':
+------+---------+---------------------+
|userid|testgroup|user_avg_cpc_pergroup|
+------+---------+---------------------+
|     1|        A|                  3.5|
|     2|        B|   1.3333333333333333|
|     3|        A|                  6.0|
|     3|        B|                  7.0|
|     4|        A|                  0.0|
|     5|        B|                  0.0|
|     6|        A|                  0.0|
|     6|        B|                  0.0|
+------+---------+---------------------+



---------------------------------------------------------------------
##### Some extra calculations that show my step-by-step process and give additional insights

##### Click-Through Rate 

In [6]:
# Overall number of clicks and impressions, and the overall CTR.
# Each count() triggers a Spark job, so both are computed once and reused
# instead of being evaluated twice each.
clicks_total = Clicks.count()
impressions_total = Impressions.count()

# A one-row summary frame; a plain Python list is enough as input, no RDD needed.
df1 = spark.createDataFrame(
    [(clicks_total, impressions_total, clicks_total / impressions_total)],
    ['clicks_total', 'impressions_total', 'CTR_total'],
)

df1.show()

+------------+-----------------+-------------------+
|clicks_total|impressions_total|          CTR_total|
+------------+-----------------+-------------------+
|           6|               13|0.46153846153846156|
+------------+-----------------+-------------------+



In [7]:
# Clicks, impressions and CTR per user, disregarding the testgroup.
# The frames are grouped directly: a sort before groupBy has no effect on the
# aggregated result.

# Clicks per user: each row of df_inner_join is one clicked impression.
df_user_CTR_total_1 = (
    df_inner_join
    .groupBy('userid')
    .agg(F.count('userid').alias('user_clicks_total'))
)
# Impressions per user: each row of df_left_join is one impression.
df_user_CTR_total_2 = (
    df_left_join
    .groupBy('userid')
    .agg(F.count('userid').alias('user_impressions_total'))
)

# Left join from the impressions so users without clicks are kept with 0 clicks;
# only the click count is filled so the userid key is never altered.
df_user_CTR_total = (
    df_user_CTR_total_2
    .join(df_user_CTR_total_1, ['userid'], 'left')
    .na.fill(0, subset=['user_clicks_total'])
)
# NOTE(review): the column is called 'user_CTR_pergroup' although it holds the
# per-user CTR across all testgroups; the name is kept so existing references
# to it keep working.
df_user_CTR_total = df_user_CTR_total.withColumn(
    'user_CTR_pergroup',
    F.col('user_clicks_total') / F.col('user_impressions_total'),
)
df_user_CTR_total.select(
    'userid', 'user_clicks_total', 'user_impressions_total', 'user_CTR_pergroup'
).orderBy('userid').show()

+------+-----------------+----------------------+------------------+
|userid|user_clicks_total|user_impressions_total| user_CTR_pergroup|
+------+-----------------+----------------------+------------------+
|     1|                2|                     2|               1.0|
|     2|                2|                     3|0.6666666666666666|
|     3|                2|                     3|0.6666666666666666|
|     4|                0|                     2|               0.0|
|     5|                0|                     1|               0.0|
|     6|                0|                     2|               0.0|
+------+-----------------+----------------------+------------------+



In [8]:
# Clicks, impressions and CTR per testgroup.
# The frames are grouped directly: a sort before groupBy has no effect on the
# aggregated result.

# Clicks per testgroup: each row of df_inner_join is one clicked impression.
df_CTR_pergroup_1 = (
    df_inner_join
    .groupBy('testgroup')
    .agg(F.count('testgroup').alias('clicks_pergroup'))
)
# Impressions per testgroup: each row of df_left_join is one impression.
df_CTR_pergroup_2 = (
    df_left_join
    .groupBy('testgroup')
    .agg(F.count('testgroup').alias('impressions_pergroup'))
)

# Left join from the impressions so a testgroup without clicks is kept with
# 0 clicks; only the click count is filled.
df_CTR_pergroup = (
    df_CTR_pergroup_2
    .join(df_CTR_pergroup_1, ['testgroup'], 'left')
    .na.fill(0, subset=['clicks_pergroup'])
)
# CTR = clicks / impressions for each testgroup.
df_CTR_pergroup = df_CTR_pergroup.withColumn(
    'CTR_pergroup',
    F.col('clicks_pergroup') / F.col('impressions_pergroup'),
)
df_CTR_pergroup.select(
    'testgroup', 'clicks_pergroup', 'impressions_pergroup', 'CTR_pergroup'
).orderBy('testgroup').show()

+---------+---------------+--------------------+-------------------+
|testgroup|clicks_pergroup|impressions_pergroup|       CTR_pergroup|
+---------+---------------+--------------------+-------------------+
|        A|              3|                   7|0.42857142857142855|
|        B|              3|                   6|                0.5|
+---------+---------------+--------------------+-------------------+



##### Average Revenue

In [9]:
# The total cpc over all clicks (treated as the total revenue in this notebook).
# The cpc column is aggregated by name: the previous groupBy().sum() summed
# every numeric column (eventid included) and then picked cpc by position,
# which silently returns the wrong number if the column order ever changes.
Clicks.agg(F.sum('cpc')).collect()[0][0]

30

In [10]:
# Average cpc per impression for each user (disregarding the testgroup).
# df_left_join has cpc = 0 for impressions without a click, so this is the
# user's total cpc divided by the user's number of impressions.
# The frame is grouped directly; projecting and sorting before groupBy does not
# affect the aggregated result.
df_user_avg_cpc = (
    df_left_join
    .groupBy('userid')
    .agg(F.mean('cpc').alias('user_avg_cpc'))
)
# Rows are sorted only for display.
df_user_avg_cpc.orderBy('userid').show()

+------+------------------+
|userid|      user_avg_cpc|
+------+------------------+
|     1|               3.5|
|     2|1.3333333333333333|
|     3| 6.333333333333333|
|     4|               0.0|
|     5|               0.0|
|     6|               0.0|
+------+------------------+



In [11]:
# Average cpc per impression for each testgroup.
# df_left_join has cpc = 0 for impressions without a click, so those
# impressions count towards the mean.
df_group_avg_cpc = (
    df_left_join
    .select('testgroup', 'cpc')
    .orderBy('testgroup', 'cpc')
)
(
    df_group_avg_cpc
    .groupBy('testgroup')
    .agg(F.mean('cpc').alias('group_avg_cpc'))
    .orderBy('testgroup')
    .show()
)

+---------+------------------+
|testgroup|     group_avg_cpc|
+---------+------------------+
|        A|2.7142857142857144|
|        B|1.8333333333333333|
+---------+------------------+

