In [5]:
import io
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions as F, Window

In [6]:
# Start the notebook's Spark session, or attach to one that already exists.
spark = (
    SparkSession.builder
    .appName('PySpark')
    .getOrCreate()
)

In [8]:
# Load the raw event log; every step below adds columns to `df`.
df = spark.read.parquet('dataset.parquet')

# Window over all rows that share an ad_id, so each aggregate below is
# computed per ad and repeated on every row of that ad.
windowSpec = Window.partitionBy('ad_id')

# Exact number of distinct dates per ad. approx_count_distinct is only a
# HyperLogLog++ estimate, and an exact countDistinct is not supported over a
# window, so collect the distinct dates and take the size of the set.
# Cast to long to keep the column type the approximate aggregate produced.
df = df.withColumn(
    'day_count',
    F.size(F.collect_set('date').over(windowSpec)).cast('long'))

# Indicator columns (1/0) for the ad cost type.
df = df.withColumn('is_cpm', (F.col('ad_cost_type') == 'CPM').cast('integer'))
df = df.withColumn('is_cpc', (F.col('ad_cost_type') == 'CPC').cast('integer'))

# Indicator columns (1/0) for the event type; they feed the CTR below and are
# kept in the output.
df = df.withColumn('is_click', (F.col('event') == 'click').cast('integer'))
df = df.withColumn('is_view', (F.col('event') == 'view').cast('integer'))

# CTR per ad = clicks / views, with 0 for ads that have no views so the
# division by zero is never evaluated.
clicks_per_ad = F.sum('is_click').over(windowSpec)
views_per_ad = F.sum('is_view').over(windowSpec)
df = df.withColumn(
    'CTR',
    F.when(views_per_ad == 0, F.lit(0.0)).otherwise(clicks_per_ad / views_per_ad))

# Split 75/25 with a fixed seed so re-running the notebook reproduces the same
# train/test assignment.
SPLIT_SEED = 42
train, test = df.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

# Write each split as a single parquet file. Overwrite mode lets the cell be
# re-run; NOTE: this replaces any existing 'train' / 'test' output.
train.coalesce(1).write.mode('overwrite').parquet('train')
test.coalesce(1).write.mode('overwrite').parquet('test')