In [None]:
#OUTPUT_BUCKET_FOLDER = "gs://<GCS_BUCKET_NAME>/outbrain-click-prediction/output/"
#DATA_BUCKET_FOLDER = "gs://<GCS_BUCKET_NAME>/outbrain-click-prediction/data/"

## Loading data

In [None]:
truncate_day_from_timestamp_udf = F.udf(lambda ts: int(ts / 1000 / 60 / 60 / 24), IntegerType())

In [None]:
events_schema = StructType(
                    [StructField("display_id", IntegerType(), True),
                    StructField("uuid_event", StringType(), True),                    
                    StructField("document_id_event", IntegerType(), True),
                    StructField("timestamp_event", IntegerType(), True),
                    StructField("platform_event", IntegerType(), True),
                    StructField("geo_location_event", StringType(), True)]
                    )

events_df = spark.read.schema(events_schema).options(header='true', inferschema='false', nullValue='\\N') \
                .csv(DATA_BUCKET_FOLDER + "events.csv") \
                .withColumn('day_event', truncate_day_from_timestamp_udf('timestamp_event')) \
                .alias('events')   

In [None]:
promoted_content_schema = StructType(
                    [StructField("ad_id", IntegerType(), True),
                    StructField("document_id_promo", IntegerType(), True),                    
                    StructField("campaign_id", IntegerType(), True),
                    StructField("advertiser_id", IntegerType(), True)]
                    )

promoted_content_df = spark.read.schema(promoted_content_schema).options(header='true', inferschema='false', nullValue='\\N') \
                .csv(DATA_BUCKET_FOLDER+"promoted_content.csv") \
                .alias('promoted_content')

In [None]:
clicks_train_schema = StructType(
                    [StructField("display_id", IntegerType(), True),
                    StructField("ad_id", IntegerType(), True),                    
                    StructField("clicked", IntegerType(), True)]
                    )

clicks_train_df = spark.read.schema(clicks_train_schema).options(header='true', inferschema='false', nullValue='\\N') \
                .csv(DATA_BUCKET_FOLDER+"clicks_train.csv") \
                .alias('clicks_train')

In [None]:
clicks_train_joined_df = clicks_train_df \
                         .join(promoted_content_df, on='ad_id', how='left') \
                         .join(events_joined_df, on='display_id', how='left')                         
clicks_train_joined_df.createOrReplaceTempView('clicks_train_joined')

In [None]:
validation_display_ids_df = clicks_train_joined_df.select('display_id','day_event').distinct() \
                                .sampleBy("day_event", fractions={0: 0.2, 1: 0.2, 2: 0.2, 3: 0.2, 4: 0.2, \
                                                                5: 0.2, 6: 0.2, 7: 0.2, 8: 0.2, 9: 0.2, 10: 0.2, \
                                                               11: 1.0, 12: 1.0}, seed=0)   
validation_display_ids_df.createOrReplaceTempView("validation_display_ids")                                                                 

In [None]:
validation_set_df = spark.sql('''SELECT display_id, ad_id, uuid_event, day_event, timestamp_event,
                                        document_id_promo, platform_event, geo_location_event FROM clicks_train_joined t 
             WHERE EXISTS (SELECT display_id FROM validation_display_ids 
                           WHERE display_id = t.display_id)''')

In [None]:
validation_set_gcs_output = "validation_set.parquet"
validation_set_df.write.parquet(OUTPUT_BUCKET_FOLDER+validation_set_gcs_output, mode='overwrite')