# In [213]:
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import *
import pyspark.sql.functions as func

# In [214]:
# Seed the combined frame with the sponsored-link click events.
data = spark.read.parquet(
    "gs://ds-url-catag/plenty_stickers_data/event=BBM-STICKER_SPONSORED_LINK-CLICK/pre_final_ds/*.parquet"
)

# Remaining sticker event folders; each one is read and appended in turn.
Sticker_datasources = [
    'event=BBM-STICKER-RECEIVED',
    'event=BBM-STICKER-CLICK',
    'event=BBM-STICKER-DOWNLOAD',
    'event=BBM-STICKER-SEND',
]

for ds in Sticker_datasources:
    data_read = spark.read.parquet(
        "gs://ds-url-catag/plenty_stickers_data/" + ds + "/pre_final_ds/*.parquet"
    )
    # NOTE(review): union() lines columns up by position, so this assumes every
    # event folder shares the same column order -- confirm against the data.
    data = data.union(data_read)

# Drop exact duplicate rows, then discard rows that carry no user id.
new_data = data.dropDuplicates().filter(col('user_id_n').isNotNull())

# Prefix each Abilty_to_pay label with a rank digit (lower digit = higher
# ability) so that a plain string sort puts the highest label first.
# Any value not listed below is left without a match and becomes null.
new_data = new_data.withColumn(
    'Abilty_to_pay',
    when(col('Abilty_to_pay') == 'High', '2High')
    .when(col('Abilty_to_pay') == 'Very High', '1Very High')
    .when(col('Abilty_to_pay') == 'Medium', '3Medium')
    .when(col('Abilty_to_pay') == 'low', '4low')
    .when(col('Abilty_to_pay') == 'very low', '5very low')
    .when(col('Abilty_to_pay') == 'NA', '6NA')
)

# Prefix each Free_sticker_usage label with a rank digit (lower digit = higher
# usage) so that a plain string sort puts the highest label first.
# Any value not listed below is left without a match and becomes null.
new_data = new_data.withColumn(
    'Free_sticker_usage',
    when(col('Free_sticker_usage') == 'High', '1High')
    .when(col('Free_sticker_usage') == 'Medium', '2Medium')
    .when(col('Free_sticker_usage') == 'low', '3low')
    .when(col('Free_sticker_usage') == 'NA', '4NA')
)

# Prefix each paid_sticker_usage label with a rank digit (lower digit = higher
# usage) so that a plain string sort puts the highest label first.
# Any value not listed below is left without a match and becomes null.
new_data = new_data.withColumn(
    'paid_sticker_usage',
    when(col('paid_sticker_usage') == 'High', '1High')
    .when(col('paid_sticker_usage') == 'Medium', '2Medium')
    .when(col('paid_sticker_usage') == 'low', '3low')
    .when(col('paid_sticker_usage') == 'NA', '4NA')
)


# One row per user, holding the Can_Pay values collected for that user.
can_pay_list = (
    new_data
    .groupBy("user_id_n")
    .agg(func.collect_list("Can_Pay").alias("Can_Pay_list"))
)
from pyspark.sql.types import StringType
def can_pay(raw):
    """Collapse a user's collected Can_Pay values into a single flag.

    Args:
        raw: Collection of Can_Pay strings gathered for one user; may be
            empty or None.

    Returns:
        "yes" if any collected value is exactly "yes", otherwise "No".
    """
    # A null or empty collection has nothing to inspect; answer "No" rather
    # than raising inside the UDF.
    if not raw:
        return "No"
    return "yes" if "yes" in raw else "No"
# Expose can_pay as a string-returning Spark UDF and derive the per-user flag
# from the collected values.
can_pay_udf = udf(can_pay, StringType())
can_pay_list = can_pay_list.withColumn(
    "Can_pay",
    can_pay_udf(col('Can_Pay_list'))
)

# Per-user collections of the rank-prefixed labels, one frame per attribute.
Abilty_to_pay_list = (
    new_data
    .groupBy("user_id_n")
    .agg(func.collect_list("Abilty_to_pay").alias("Abilty_to_pay_list"))
)
Free_list = (
    new_data
    .groupBy("user_id_n")
    .agg(func.collect_list("Free_sticker_usage").alias("Free_sticker_usage_list"))
)
paid_list = (
    new_data
    .groupBy("user_id_n")
    .agg(func.collect_list("paid_sticker_usage").alias("paid_sticker_usage_list"))
)

import re
def getting_highest_values(raw):
    """Return the top-ranked label from a user's rank-prefixed values.

    Each value is expected to start with a rank digit (for example
    "1Very High" or "2High"). The value that sorts first is treated as the
    highest and is returned with every digit removed.

    Args:
        raw: Collection of rank-prefixed label strings for one user; may be
            empty or None.

    Returns:
        The first-sorting label with digits stripped, or None when there is
        nothing to choose from.
    """
    # An empty or null collection has no first element; return None (a null
    # cell) instead of raising IndexError inside the UDF.
    if not raw:
        return None
    # min() picks the same element that sorting and taking index 0 would.
    return re.sub(r'[0-9]', '', str(min(raw)))
# A single string-returning UDF reduces each collected list to its
# first-sorting label; it is applied to all three ranked attributes.
get_high_valuesudf = udf(getting_highest_values, StringType())

Abilty_to_pay_list = Abilty_to_pay_list.withColumn(
    "Ability_to_pay",
    get_high_valuesudf(col('Abilty_to_pay_list'))
)
Free_list = Free_list.withColumn(
    "Free_sticker_usage",
    get_high_valuesudf(col('Free_sticker_usage_list'))
)
paid_list = paid_list.withColumn(
    "paid_sticker_usage",
    get_high_valuesudf(col('paid_sticker_usage_list'))
)

# Join the per-user results into one frame keyed on user_id_n.
#
# Each join is expressed on the column name rather than as an equality between
# two frame attributes. That keeps a single user_id_n column in the result and
# guarantees the key is matched between the left and right frames; the Free_list
# join formerly compared Free_list.user_id_n with itself instead of with
# data_one.user_id_n.
data_one = (
    can_pay_list
    .join(Abilty_to_pay_list, on='user_id_n', how='left')
    .select('user_id_n', 'Can_pay', 'Ability_to_pay')
)
data_two = (
    data_one
    .join(Free_list, on='user_id_n', how='left')
    .select('user_id_n', 'Can_pay', 'Ability_to_pay', 'Free_sticker_usage')
)
data_three = (
    data_two
    .join(paid_list, on='user_id_n', how='left')
    .select('user_id_n', 'Can_pay', 'Ability_to_pay',
            'Free_sticker_usage', 'paid_sticker_usage')
)
final_sticker_data = data_three

# OUTPUT_BUCKET is not defined in this section; it must already be in scope.
# Any existing output under Sticker_data/ is overwritten.
final_sticker_data.write.mode('overwrite').parquet(OUTPUT_BUCKET+"Sticker_data/")