In [0]:
%spark.pyspark
import pyspark
from pyspark.context import SparkContext
import pyspark.sql.functions as F
from pyspark.sql.functions import row_number, expr, explode, udf, col,  monotonically_increasing_id, lit
from pyspark.sql.types import StructType, IntegerType, StringType, ByteType
from pyspark.sql.window import Window
import datetime, time
import random
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import OneHotEncoder
from pyspark.sql.functions import *
from pyspark.sql import types as T

In [1]:
%spark.pyspark
# Read one part file of an existing impression log and print its schema.
sample_path = "s3://audience-generation-output-prod-na/1025819546169638082/impression_log/593353618315725011/2022/11/16/part-00000-bd696fa9-e02d-42c1-91f1-e86dde4c27ac-c000.snappy.parquet"
sample = spark.read.parquet(sample_path)
sample.printSchema()

In [2]:
%spark.pyspark
# Preview the first 10 rows of the sample impression log.
sample.show(n=10)

In [3]:
%spark.pyspark
# SD Impression
# Viewed SD ad impressions for region 1 / marketplace 1 with spektr_date strictly after 2023-01-01.
# NOTE(review): the cells that follow work with November 2022 dates (a filter on 2022-11-23,
# the 2022-11-20 ASIN snapshot, output folders under 2022/11/23), which this date window
# excludes -- confirm the intended spektr_date range before re-running.
sd_impression = spark.sql("""
select *
from spektr_ach.d_sd_ad_impressions
where region_id = 1 and spektr_date > '2023-01-01' and marketplace_id = 1 and viewed = 1
""")
sd_impression.printSchema()

In [4]:
%spark.pyspark
# Count impressions dated on or before 2022-11-23.
# NOTE(review): sd_impression is built with spektr_date > '2023-01-01', so this filter
# looks unable to match any row as written -- confirm which date range is intended.
sd_impression.filter(F.col("spektr_date")<="2022-11-23").agg(F.count('*')).show()

In [5]:
%spark.pyspark
# Expose the impressions DataFrame to Spark SQL under the name "sd_impression".
sd_impression.createOrReplaceTempView("sd_impression")

In [6]:
%spark.pyspark
# Distinct combinations of the three impression counters, queried through the temp view.
distinct_counts_sql = """select distinct total_impressions, valid_impressions, invalid_impressions from sd_impression"""
spark.sql(distinct_counts_sql).show()

In [7]:
%spark.pyspark
# Same distinct-counter check, expressed with the DataFrame API.
impression_counters = ["total_impressions", "valid_impressions", "invalid_impressions"]
sd_impression.select(*impression_counters).distinct().show()

In [8]:
%spark.pyspark
# ASIN brand mapping
# ASIN -> brand rows from the 2022-11-20 snapshot, limited to the ten survey brands.
asin_brand_sql = """
SELECT asin, brand_id, merchant_brand_name
FROM spektr_booker.d_mp_asins_snapshot
WHERE region_id = 1 AND marketplace_id = 1 AND spektr_date = '2022-11-20'
AND merchant_brand_name IN ('Nilight', 'OnePlus', 'Columbia', 'Rainbocorns', 'Kingston', 'Utopia Kitchen', '3Doodler', 'Arctix', 'Catalonia', 'Panda Grip')
"""
# Full-row de-duplication of the lookup.
d_mp_asins_snapshot = spark.sql(asin_brand_sql).distinct()

d_mp_asins_snapshot.printSchema()

In [9]:
%spark.pyspark
# List the distinct brand names present in the ASIN snapshot.
d_mp_asins_snapshot.select("merchant_brand_name").distinct().show()

In [10]:
%spark.pyspark
# Expose the ASIN snapshot to Spark SQL under the name "d_mp_asins_snapshot".
d_mp_asins_snapshot.createOrReplaceTempView("d_mp_asins_snapshot")

In [11]:
%spark.pyspark
# Case-insensitive check of which survey brand names appear in the snapshot view.
# The last pattern is '%PANDA GRIP%' so that it can match the brand 'Panda Grip' used
# everywhere else in this notebook; '%PANDAS GRIP%' could never match that name.
spark.sql("""select merchant_brand_name 
             from d_mp_asins_snapshot 
             where upper(merchant_brand_name) like '%NILIGHT%' 
                or upper(merchant_brand_name) like '%ONEPLUS%'
                or upper(merchant_brand_name) like '%COLUMBIA%'
                or upper(merchant_brand_name) like '%RAINBOCORNS%'
                or upper(merchant_brand_name) like '%KINGSTON%'
                or upper(merchant_brand_name) like '%UTOPIA KITCHEN%'
                or upper(merchant_brand_name) like '%3DOODLER%'
                or upper(merchant_brand_name) like '%ARCTIX%'
                or upper(merchant_brand_name) like '%CATALONIA%'
                or upper(merchant_brand_name) like '%PANDA GRIP%' 
            group by 1
                """).show()

In [12]:
%spark.pyspark
# (disabled) One-off export of the distinct brand names to CSV.
# Commented out with '#': the previous '//' prefix is not a Python comment and made the cell fail to parse.
#  d_mp_asins_snapshot.select("merchant_brand_name").distinct().repartition(1).write.option("header", True).csv("s3u://fortunax/survey-analysis/brand_name")

In [13]:
%spark.pyspark
# method 1
# Join SD impressions with the ASIN snapshot to bring in the brand name.
sd_impression_cols = sd_impression.select("customer_id", "asin", "campaign_id", "total_impressions")
brand_imp = sd_impression_cols.join(d_mp_asins_snapshot, on="asin", how="inner")

#  brand_imp.repartition(1).write.option("header", True).csv("s3u://fortunax/survey-analysis/sd_survey_imp/")

# Persist as parquet, one directory per brand, replacing any previous output.
brand_imp_writer = brand_imp.repartition(10, "merchant_brand_name").write.mode('overwrite').partitionBy("merchant_brand_name")
brand_imp_writer.parquet("s3u://fortunax/survey-analysis/sd_survey_imp/")

In [14]:
%spark.pyspark
# method 2
# (disabled) Alternative to method 1: join on a renamed copy of the asin column and drop it afterwards.
# Everything in this cell is commented out, so running it has no effect.
# sd_impression = sd_impression.withColumn("asin_sd", F.col("asin"))
# brand_imp = sd_impression.select("customer_id", "asin_sd", "campaign_id","total_impressions").join(d_mp_asins_snapshot, sd_impression.asin_sd == d_mp_asins_snapshot.asin, "inner")\
#             .drop("asin_sd")

# #  brand_imp.repartition(1).write.option("header", True).csv("s3u://fortunax/survey-analysis/sd_survey_imp/")

# # write temp folder for complete output
# brand_imp.repartition(10, "merchant_brand_name").write.mode('overwrite').partitionBy("merchant_brand_name").parquet("s3u://fortunax/survey-analysis/sd_survey_imp/")

In [15]:
%spark.pyspark
# Reload the brand-level impressions written by the join step.
# That output is written with partitionBy("merchant_brand_name"), so the brand only exists in
# the directory names. Reading the root path with default options lets Spark's partition
# discovery restore it as a column; recursiveFileLookup=true turns partition inference off,
# which would drop merchant_brand_name and break the aggregations that group by it.
# The "header" option is a CSV option and has no meaning for parquet, so it is omitted too.
brand_imp_new = spark.read.parquet("s3u://fortunax/survey-analysis/sd_survey_imp/")
brand_imp_new.printSchema()

In [16]:
%spark.pyspark
# validation of impressions to previous period
# Per brand: total impressions plus distinct customers, campaigns and brand ids.
brand_validation = brand_imp_new.groupby("merchant_brand_name").agg(
    F.sum("total_impressions"),
    F.countDistinct("customer_id"),
    F.countDistinct("campaign_id"),
    F.countDistinct("brand_id"),
)
z.show(brand_validation)

In [17]:
%spark.pyspark
# Create dummy campaign ids, one per survey brand (both columns are strings).
# Built from plain tuples plus explicit column names so the cell does not depend on `Row`,
# which the notebook's import cell never brings into scope.
# NOTE(review): these ids have 19 digits, while the impression_log output folders used later
# in this notebook have 16 digits (e.g. 7777777777777001) -- confirm which form is intended.
campaignid_rows = [
    ("Nilight", "7777777777777777001"),
    ("OnePlus", "7777777777777777002"),
    ("Columbia", "7777777777777777003"),
    ("Rainbocorns", "7777777777777777004"),
    ("Kingston", "7777777777777777005"),
    ("Utopia Kitchen", "7777777777777777006"),
    ("3Doodler", "7777777777777777007"),
    ("Arctix", "7777777777777777008"),
    ("Catalonia", "7777777777777777009"),
    ("Panda Grip", "7777777777777777010"),
]
campaignid_dummy = spark.createDataFrame(campaignid_rows, ["merchant_brand_name", "campaignId"])

campaignid_dummy.show()
campaignid_dummy.printSchema()
   

In [18]:
%spark.pyspark
# Expose the dummy campaign-id lookup to Spark SQL under the name "campaignid_dummy".
campaignid_dummy.createOrReplaceTempView("campaignid_dummy")

In [19]:
%spark.pyspark
# spark.sql("""SET spark.sql.shuffle.partitions = 2""")
# Session-level SQL settings: 1000 shuffle partitions, adaptive execution on,
# adaptive partition coalescing on.
for conf_key, conf_value in [
    ("spark.sql.shuffle.partitions", "1000"),
    ("spark.sql.adaptive.enabled", "true"),
    ("spark.sql.adaptive.coalescePartitions.enabled", "true"),
]:
    sqlContext.setConf(conf_key, conf_value)

# # or
# spark.sql("""SET spark.sql.shuffle.partitions = 1000""")
# spark.sql("""SET spark.sql.adaptive.enabled = true""")
# spark.sql("""SET spark.sql.adaptive.coalescePartitions.enabled = true""")

# Rebuild the lookup from its temp view, clustered by brand name.
clustered_lookup_sql = """select * from campaignid_dummy CLUSTER BY merchant_brand_name"""
campaignid_dummy = spark.sql(clustered_lookup_sql)
campaignid_dummy.show()


In [20]:
%spark.pyspark
# create final data
# Variant with a broadcast hint on the campaign-id lookup; timed around the first show().
start_time = time.time()

# Typed NULL placeholder columns, listed in the order they are appended.
null_columns = [
    ("adProductTypeCode", "string"),
    ("campaignAttributionAlgoId", "int"),
    ("campaignCfId", "long"),
    ("campaignDimId", "long"),
    ("demandChannelCode", "string"),
    ("demandChannelDimId", "int"),
    ("deviceType", "string"),
    ("siteName", "string"),
]

# One row per distinct (customer, brand) pair.
final_brand = brand_imp_new.select(
    F.col("customer_id").alias("AD_USER_ID").cast("string"),
    F.col("merchant_brand_name").cast("string"),
).distinct()
for null_name, null_type in null_columns:
    final_brand = final_brand.withColumn(null_name, lit(None).cast(null_type))

# Attach the dummy campaign id for each brand and fix the output column order.
final_brand = (
    final_brand
    .drop("campaign_id")
    .join(campaignid_dummy.hint("broadcast"), "merchant_brand_name", "left")
    .select(
        "merchant_brand_name", "AD_USER_ID", "adProductTypeCode", "campaignAttributionAlgoId",
        "campaignCfId", "campaignDimId", F.col("campaignId").cast("long"),
        "demandChannelCode", "demandChannelDimId", "deviceType", "siteName",
    )
)

final_brand.show()

end_time = time.time()
print("Total time for join {}".format(end_time - start_time))

final_brand.printSchema()

In [21]:
%spark.pyspark
# Print the physical plan of final_brand (used here to inspect the join strategy).
final_brand.explain()

In [22]:
%spark.pyspark
# create final data
# Same pipeline as the broadcast-hint variant, but joining campaignid_dummy without any hint.
# Steps: distinct (customer, brand) pairs -> eight typed NULL placeholder columns ->
# left join to the dummy campaign ids -> fixed output column order with campaignId cast to long.
# The elapsed time printed below covers building the plan plus the first show().
start_time = time.time()

final_brand = brand_imp_new.select(F.col("customer_id").alias("AD_USER_ID").cast("string"), F.col("merchant_brand_name").cast("string")).distinct()\
                           .withColumn("adProductTypeCode",lit(None).cast("string"))\
                           .withColumn("campaignAttributionAlgoId",lit(None).cast("int"))\
                           .withColumn("campaignCfId", lit(None).cast("long"))\
                           .withColumn("campaignDimId", lit(None).cast("long"))\
                           .withColumn("demandChannelCode", lit(None).cast("string"))\
                           .withColumn("demandChannelDimId", lit(None).cast("int"))\
                           .withColumn("deviceType", lit(None).cast("string"))\
                           .withColumn("siteName", lit(None).cast("string"))\
                           .drop("campaign_id")\
                           .join(campaignid_dummy, "merchant_brand_name", "left")\
                           .select("merchant_brand_name","AD_USER_ID","adProductTypeCode","campaignAttributionAlgoId","campaignCfId","campaignDimId",F.col("campaignId").cast("long"),\
                                   "demandChannelCode","demandChannelDimId", "deviceType", "siteName")

final_brand.show()

end_time = time.time()
print("Total time for join {}".format(end_time - start_time))

final_brand.printSchema()

In [23]:
%spark.pyspark
# Print the physical plan of the no-hint variant of final_brand.
final_brand.explain()

In [24]:
%spark.pyspark
# Print the physical plan of final_brand again (repeats the previous cell).
final_brand.explain()

In [25]:
%spark.pyspark
# Third timing run of the final_brand pipeline: adaptive query execution is switched on via
# SQL SET first, then the same no-hint join is rebuilt and timed around the first show().
spark.sql("""SET spark.sql.adaptive.enabled = true""")
# create final data
start_time = time.time()

final_brand = brand_imp_new.select(F.col("customer_id").alias("AD_USER_ID").cast("string"), F.col("merchant_brand_name").cast("string")).distinct()\
                           .withColumn("adProductTypeCode",lit(None).cast("string"))\
                           .withColumn("campaignAttributionAlgoId",lit(None).cast("int"))\
                           .withColumn("campaignCfId", lit(None).cast("long"))\
                           .withColumn("campaignDimId", lit(None).cast("long"))\
                           .withColumn("demandChannelCode", lit(None).cast("string"))\
                           .withColumn("demandChannelDimId", lit(None).cast("int"))\
                           .withColumn("deviceType", lit(None).cast("string"))\
                           .withColumn("siteName", lit(None).cast("string"))\
                           .drop("campaign_id")\
                           .join(campaignid_dummy, "merchant_brand_name", "left")\
                           .select("merchant_brand_name","AD_USER_ID","adProductTypeCode","campaignAttributionAlgoId","campaignCfId","campaignDimId",F.col("campaignId").cast("long"),\
                                   "demandChannelCode","demandChannelDimId", "deviceType", "siteName")

final_brand.show()

end_time = time.time()
print("Total time for join {}".format(end_time - start_time))

final_brand.printSchema()

In [26]:
%spark.pyspark
# Row count per brand / dummy campaign id, displayed in campaignId order.
# The sort is applied after the aggregation: an ordering placed before groupby is not
# carried through the aggregation shuffle, so it did not order the displayed result.
brand_campaign_counts = (
    final_brand
    .groupby("merchant_brand_name", "campaignId")
    .agg(F.count('*'))
    .sort("campaignId")
)
z.show(brand_campaign_counts)

In [27]:
%spark.pyspark
# write to s3 partition by survey id and campaign id
# One impression log per brand, each written to that brand's survey / campaign folder.
# NOTE(review): the campaign folder names below have 16 digits (e.g. 7777777777777001) while
# the campaignId values in campaignid_dummy have 19 -- confirm which form is intended.
def write_brand_impression_log(brand_name, output_path):
    """Write the rows of final_brand for one brand to output_path as parquet.

    Keeps only rows whose merchant_brand_name equals brand_name, drops that column,
    and overwrites output_path with a single partition (repartitioned on campaignId).

    Returns the filtered DataFrame so it stays available under the brand's variable name.
    """
    brand_df = final_brand.filter(F.col("merchant_brand_name") == brand_name).drop("merchant_brand_name")
    brand_df.repartition(1, "campaignId").write.mode('overwrite').parquet(output_path)
    return brand_df


Nilight = write_brand_impression_log("Nilight", "s3u://audience-generation-output-prod-na/65083402-f90f-4400-b673-d56dcbd53688/impression_log/7777777777777001/2022/11/23/")
OnePlus = write_brand_impression_log("OnePlus", "s3u://audience-generation-output-prod-na/948b9355-445a-4930-b696-c1e336080e4b/impression_log/7777777777777002/2022/11/23/")
Columbia = write_brand_impression_log("Columbia", "s3u://audience-generation-output-prod-na/eed00fe9-f36c-4e04-8396-2e1a523bfa27/impression_log/7777777777777003/2022/11/23/")
Rainbocorns = write_brand_impression_log("Rainbocorns", "s3u://audience-generation-output-prod-na/11c393ba-76c9-42f3-b07e-eaef14572c1f/impression_log/7777777777777004/2022/11/23/")
Kingston = write_brand_impression_log("Kingston", "s3u://audience-generation-output-prod-na/762a62fe-f204-414e-8a75-66418488758a/impression_log/7777777777777005/2022/11/23/")
UtopiaKitchen = write_brand_impression_log("Utopia Kitchen", "s3u://audience-generation-output-prod-na/c0980a5a-867e-4c1f-a1cd-a2a3efaeff68/impression_log/7777777777777006/2022/11/23/")
Doodler = write_brand_impression_log("3Doodler", "s3u://audience-generation-output-prod-na/ac811bb5-7c9a-47c2-80af-1d2e9ec61f3b/impression_log/7777777777777007/2022/11/23/")
Arctix = write_brand_impression_log("Arctix", "s3u://audience-generation-output-prod-na/bf789156-a9f4-49d8-8e61-e9b6c9beecfe/impression_log/7777777777777008/2022/11/23/")
Catalonia = write_brand_impression_log("Catalonia", "s3u://audience-generation-output-prod-na/39ca59c6-4467-4209-b848-1d820443750b/impression_log/7777777777777009/2022/11/23/")
PandaGrip = write_brand_impression_log("Panda Grip", "s3u://audience-generation-output-prod-na/fde0fa22-2b04-49e7-9e5a-6c73b93ea5c2/impression_log/7777777777777010/2022/11/23/")



In [28]:
%spark.pyspark
# final check writing result
# Read back the Panda Grip output folder to inspect what was written.
panda_grip_log_path = "s3u://audience-generation-output-prod-na/fde0fa22-2b04-49e7-9e5a-6c73b93ea5c2/impression_log/7777777777777010/2022/11/23/"
final_brand_check = (
    spark.read
    .option("header", "true")
    .option("recursiveFileLookup", "true")
    .parquet(panda_grip_log_path)
)

final_brand_check.printSchema()
final_brand_check.show()

In [29]:
%spark.pyspark
# Row count per campaignId in the output that was read back.
campaign_row_counts = final_brand_check.groupBy("campaignId").agg(F.count("*"))
campaign_row_counts.show()

In [30]:
%spark.pyspark
# SD Impression
# Viewed SD impressions for region 1 / marketplace 1 from 2023-01-01 onwards (inclusive).
# Rebinds sd_impression and re-registers the temp view of the same name.
sd_impression_sql = """
select *
from spektr_ach.d_sd_ad_impressions
where region_id = 1 and spektr_date >= '2023-01-01' and marketplace_id = 1 and viewed = 1
"""
sd_impression = spark.sql(sd_impression_sql)
sd_impression.printSchema()

sd_impression.createOrReplaceTempView("sd_impression")

sd_impression.show()


In [31]:
%spark.pyspark
# ASIN brand mapping
# Full ASIN -> brand lookup from the 2023-03-15 snapshot (no brand filter), de-duplicated.
# Rebinds d_mp_asins_snapshot and re-registers the temp view of the same name.
asin_brand_sql = """
SELECT asin, brand_id, merchant_brand_name
FROM spektr_booker.d_mp_asins_snapshot
WHERE region_id = 1 AND marketplace_id = 1 AND spektr_date = '2023-03-15'
"""
d_mp_asins_snapshot = spark.sql(asin_brand_sql).distinct()

d_mp_asins_snapshot.createOrReplaceTempView("d_mp_asins_snapshot")

d_mp_asins_snapshot.printSchema()

In [32]:
%spark.pyspark
# Session settings for the top-ASIN aggregation, then the aggregation itself.
# NOTE(review): spark.driver.maxResultSize is a driver-level setting; setting it through the
# SQL conf on a running session may have no effect (or be rejected) -- confirm it is applied.
# NOTE(review): this lowers shuffle partitions to 2 and turns adaptive execution off,
# overriding the 1000-partition / adaptive-on settings made earlier in the notebook for the
# rest of the session -- confirm that is intended for a group-by over the full impression view.
sqlContext.setConf("spark.driver.maxResultSize","100g")
sqlContext.setConf("spark.sql.shuffle.partitions", "2")
sqlContext.setConf("spark.sql.adaptive.enabled", "false")
sqlContext.setConf("spark.sql.adaptive.coalescePartitions.enabled", "false")


# join sd impression with asin to bring in brand name
# (disabled) DataFrame-API version of the aggregation below:
# top_sd_imp =  sd_impression.select("asin", "gl_product_group","total_impressions","total_clicks","total_cost")\
#                           .groupBy("asin", "gl_product_group")\
#                           .agg(F.sum("total_impressions").alias("total_impressions"), F.sum("total_clicks").alias("total_clicks"), F.sum("total_cost").alias("total_cost")).orderBy(F.col("total_impressions").desc()).limit(300)

# Top 300 (asin, gl_product_group) pairs by summed impressions, with summed clicks and cost.
top_sd_imp = spark.sql("""select asin, gl_product_group, sum(total_impressions) as total_impressions, sum(total_clicks) as total_clicks, sum(total_cost) as total_cost 
                          from sd_impression 
                          group by  asin, gl_product_group
                          order by total_impressions desc
                          limit 300 """)
                          
top_sd_imp.createOrReplaceTempView("top_sd_imp")
top_sd_imp.show()

# (disabled) Brand-level roll-up of the top ASINs:
# top_asin = spark.sql("""select asin, merchant_brand_name, brand_id from d_mp_asins_snapshot where asin in (select asin from top_sd_imp)""")

# top_brand_imp = top_sd_imp.select("asin","gl_product_group","total_impressions","total_clicks","total_cost").join(top_asin, "asin", "inner")\
#                          .groupBy("merchant_brand_name","brand_id","gl_product_group").agg(F.sum("total_impressions").alias("total_impressions"), F.sum("total_clicks").alias("total_clicks"), F.sum("total_cost").alias("total_cost"))\
#                          .orderBy(F.col("total_impressions").desc())
# top_brand_imp.show(300, False)

In [33]:
%spark.pyspark
# Load the aggregated PSM model input and the aggregated survey results (CSV, header row).
model_input_csv = "s3://fortunax/survey-analysis/sd_pilot_result/psm_model_input/model_input_agg.csv"
survey_results_csv = "s3://fortunax/survey-analysis/sd_pilot_result/data_agg.csv"

sd_model_input = spark.read.option("header", True).csv(model_input_csv)
sd_model_input.printSchema()
sd_model_input.show(truncate=False)

sd_results = spark.read.option("header", True).csv(survey_results_csv)
sd_results.printSchema()
sd_results.show(truncate=False)

In [34]:
%spark.pyspark
# Left-join the survey result Type onto the model input, matching on customer and brand,
# and write the result as a single parquet file.
sd_results_join = (
    sd_results
    .select("customer_id", "Brand", "Type")
    .distinct()
    .withColumnRenamed("Brand", "Brand_new")
)
same_customer = sd_model_input.customer_decoration_key == sd_results_join.customer_id
same_brand = sd_model_input.Brand == sd_results_join.Brand_new
sd_final = (
    sd_model_input
    .drop(F.col("_c0"))
    .join(sd_results_join, same_customer & same_brand, how="left")
    .drop("customer_id", "Brand_new")
)
sd_final.printSchema()
sd_final.repartition(1).write.mode('overwrite').parquet("s3u://fortunax/survey-analysis/sd_pilot_result/model_input_results_agg/")

In [35]:
%spark.pyspark
# Inner-join variant: keeps only model-input rows that have a matching survey result
# (same customer and brand) and writes them to the "model_input_results_only_agg" folder.
# Rebinds sd_results_join and sd_final.
sd_results_join = sd_results.select("customer_id","Brand","Type").distinct().withColumnRenamed("Brand","Brand_new")
sd_final = sd_model_input.drop(F.col("_c0")).join(sd_results_join,
           (sd_model_input.customer_decoration_key == sd_results_join.customer_id) & (sd_model_input.Brand == sd_results_join.Brand_new), how = "inner")\
           .drop("customer_id","Brand_new")
sd_final.printSchema()
sd_final.repartition(1).write.mode('overwrite').parquet("s3u://fortunax/survey-analysis/sd_pilot_result/model_input_results_only_agg/")

In [36]:
%spark.pyspark
# Load the model data for one survey run, preview it and register it for SQL.
modeldata_path = "s3://audience-generation-output-prod-na/65083402-f90f-4400-b673-d56dcbd53688/65083402-f90f-4400-b673-d56dcbd53688/65083402-f90f-4400-b673-d56dcbd53688/model_data_IPW_final_launch/*"
modeldata_results = spark.read.parquet(modeldata_path)
modeldata_results.printSchema()
modeldata_results.show(n=10)
modeldata_results.createOrReplaceTempView("modeldata_results")

In [37]:
%spark.pyspark
# write model data to consolidated s3 bucket
brand = 'Catalonia'
modeldata_results.repartition(1).write.mode("overwrite").option("header", True).csv("s3u://fortunax/survey-analysis/sd_pilot_result/psm_model_input/"+brand+"_psm_model_input")

In [38]:
%spark.pyspark
# Re-read the aggregated model input CSV and print its schema.
# header=True matches how this same file is loaded into sd_model_input; without it the
# header row is read as a data row and the columns are named _c0, _c1, ...
model_input_agg = spark.read.option("header", True).csv("s3://fortunax/survey-analysis/sd_pilot_result/psm_model_input/model_input_agg.csv")
model_input_agg.printSchema()

In [39]:
%spark.pyspark
# Load the featurized data for one survey run, preview it and register it for SQL.
feature_path = "s3://audience-generation-output-prod-na/65083402-f90f-4400-b673-d56dcbd53688/65083402-f90f-4400-b673-d56dcbd53688/65083402-f90f-4400-b673-d56dcbd53688/featurized_data_IPW_final_launch/*"
feature_results = spark.read.parquet(feature_path)
feature_results.printSchema()
feature_results.show(n=10)
feature_results.createOrReplaceTempView("feature_results")

In [40]:
%spark.pyspark
# Load the projection output for one survey run, preview it and register it for SQL.
projection_path = "s3://audience-generation-output-prod-na/65083402-f90f-4400-b673-d56dcbd53688/65083402-f90f-4400-b673-d56dcbd53688/65083402-f90f-4400-b673-d56dcbd53688/projection_IPW_final_launch/*"
projection_results = spark.read.parquet(projection_path)
projection_results.printSchema()
projection_results.show(n=10)
projection_results.createOrReplaceTempView("projection_results")

In [41]:
%spark.pyspark
# Load the projection output of each brand's survey run and tag every row with a Brand label.
# The Utopia Kitchen label is spelled out in full ("Utopia_Kitchen"); it was previously
# truncated to "Utopia_Kitche".
Nilight_projection_results = spark.read.parquet("s3://audience-generation-output-prod-na/65083402-f90f-4400-b673-d56dcbd53688/65083402-f90f-4400-b673-d56dcbd53688/65083402-f90f-4400-b673-d56dcbd53688/projection_IPW_final_launch/*")\
    .withColumn("Brand", lit("Nilight"))
OnePlus_projection_results = spark.read.parquet("s3://audience-generation-output-prod-na/948b9355-445a-4930-b696-c1e336080e4b/948b9355-445a-4930-b696-c1e336080e4b/8dddae88-7be0-4899-8b3b-fd6c47615ce8/projection_IPW_final_launch/*")\
    .withColumn("Brand", lit("OnePlus"))
Columbia_projection_results = spark.read.parquet("s3://audience-generation-output-prod-na/eed00fe9-f36c-4e04-8396-2e1a523bfa27/eed00fe9-f36c-4e04-8396-2e1a523bfa27/7ed0ad45-fe64-42ec-99a8-702a6ba534dd/projection_IPW_final_launch/*")\
    .withColumn("Brand", lit("Columbia"))
Rainbocorns_projection_results = spark.read.parquet("s3://audience-generation-output-prod-na/11c393ba-76c9-42f3-b07e-eaef14572c1f/11c393ba-76c9-42f3-b07e-eaef14572c1f/db59afb9-1007-4492-9c09-fbe9b126eca2/projection_IPW_final_launch/*")\
    .withColumn("Brand", lit("Rainbocorns"))
Kingston_projection_results = spark.read.parquet("s3://audience-generation-output-prod-na/762a62fe-f204-414e-8a75-66418488758a/762a62fe-f204-414e-8a75-66418488758a/0e583649-4277-4f5c-8eec-a966490441a8/projection_IPW_final_launch/*")\
    .withColumn("Brand", lit("Kingston"))
Utopia_Kitchen_projection_results = spark.read.parquet("s3://audience-generation-output-prod-na/c0980a5a-867e-4c1f-a1cd-a2a3efaeff68/c0980a5a-867e-4c1f-a1cd-a2a3efaeff68/546d323b-944b-4b2f-9467-711f3d669540/projection_IPW_final_launch/*")\
    .withColumn("Brand", lit("Utopia_Kitchen"))
Catalonia_projection_results = spark.read.parquet("s3://audience-generation-output-prod-na/39ca59c6-4467-4209-b848-1d820443750b/39ca59c6-4467-4209-b848-1d820443750b/2bc6aaa8-1af8-45af-826f-5cf0cb438ffc/projection_IPW_final_launch/*")\
    .withColumn("Brand", lit("Catalonia"))

In [42]:
%spark.pyspark
# Stack the per-brand projection frames into one DataFrame (columns matched by position).
dfs = [
    Nilight_projection_results,
    OnePlus_projection_results,
    Columbia_projection_results,
    Rainbocorns_projection_results,
    Kingston_projection_results,
    Utopia_Kitchen_projection_results,
    Catalonia_projection_results,
]
prediction_union = dfs[0]
for brand_projection in dfs[1:]:
    prediction_union = prediction_union.unionAll(brand_projection)
prediction_union.select("Brand").distinct().show()
prediction_union.printSchema()



In [43]:
%spark.pyspark
# Write the stacked per-brand projections as a single parquet file, replacing earlier output.
# NOTE(review): `brand` is only referenced by the commented-out per-brand write below.
brand = 'Nilight'
# (disabled) per-brand write of projection_results:
# projection_results.repartition(1).write.mode("overwrite").option("header", True).parquet("s3u://fortunax/survey-analysis/sd_pilot_result/psm_model_projection/"+brand+"_psm_projection")
prediction_union.repartition(1).write.mode("overwrite").option("header", True).parquet("s3u://fortunax/survey-analysis/sd_pilot_result/psm_model_projection/psm_model_projection/aggregation/")

In [44]:
%spark.pyspark
# Distinct customers per label in the featurized data.
spark.sql("select label, count(distinct customer_decoration_key) from feature_results group by 1 ").show()

In [45]:
%spark.pyspark
# Distinct customers per label in the projection output.
spark.sql("select label, count(distinct customer_decoration_key) from projection_results group by 1 ").show()

In [46]:
%spark.pyspark
# Distinct customers per label in the model data.
spark.sql("select label, count(distinct customer_decoration_key) from modeldata_results group by 1 ").show()

In [47]:
%spark.pyspark
# Distinct customers per test_group_selected value in the model data.
spark.sql("select test_group_selected, count(distinct customer_decoration_key) from modeldata_results group by 1").show()

In [48]:
%spark.pyspark
# Minimum and maximum rawPrediction per label in the projection output.
spark.sql("select label, min(rawPrediction), max(rawPrediction) from projection_results group by 1 ").show()

In [49]:
%spark.pyspark
# Load the model data of another survey run; rebinds modeldata_results and re-registers its view.
modeldata_path = "s3://audience-generation-output-prod-na/f056f7d5-2899-4afb-8d5a-ca42c86f2189/f056f7d5-2899-4afb-8d5a-ca42c86f2189/3e7999e3-09ea-4fe5-95f5-9bd120f75d11/model_data_IPW_final_launch/*"
modeldata_results = spark.read.parquet(modeldata_path)
modeldata_results.printSchema()
modeldata_results.show(n=10)
modeldata_results.createOrReplaceTempView("modeldata_results")

In [50]:
%spark.pyspark
# Reference list of survey name -> model-data location.
# These were free-text notes inside a pyspark cell, which fails to parse as Python;
# they are kept here as a dict so the cell runs and the paths can be looked up by name.
# NOTE(review): the 20230109_NBA path mixes a different first folder id with the same
# second/third ids as 20230109_H1_Brita -- confirm it is correct.
SURVEY_MODEL_DATA_PATHS = {
    "20230109_NBA": "s3://audience-generation-output-prod-na/82599fa8-7b41-4e8a-bc38-90d748db4093/44d145d6-8a55-4bd5-9e50-a0713e261218/3b7e8b8d-1422-4400-aaa7-0fb3f0048884/model_data_IPW_final_launch/*",
    "20230109_H1_Brita": "s3://audience-generation-output-prod-na/44d145d6-8a55-4bd5-9e50-a0713e261218/44d145d6-8a55-4bd5-9e50-a0713e261218/3b7e8b8d-1422-4400-aaa7-0fb3f0048884/model_data_IPW_final_launch/*",
    "20221208_Venmo_Caitlin": "s3://audience-generation-output-prod-na/c9fbc167-edf3-4c39-9748-b1edc0cdaf92/c9fbc167-edf3-4c39-9748-b1edc0cdaf92/48759272-2e28-46b4-bf27-72b44be43483/model_data_IPW_final_launch/*",
    "20221206_Ed_FX_Kindred_STV": "s3://audience-generation-output-prod-na/ead972d3-b50b-4626-b1b7-c48a85d2d728/ead972d3-b50b-4626-b1b7-c48a85d2d728/2f45a367-c6e9-4d2e-ae4c-1a77dc28b3b2/model_data_IPW_final_launch/*",
    "20221205_H1_Venmo": "s3://audience-generation-output-prod-na/2354f96c-4706-4a94-b455-3fb32a2d4c82/2354f96c-4706-4a94-b455-3fb32a2d4c82/5120ee9f-cf49-4a45-a0d5-9b245ce6bc11/model_data_IPW_final_launch/*",
    "20221109_Kodiak_Cakes": "s3://audience-generation-output-prod-na/f056f7d5-2899-4afb-8d5a-ca42c86f2189/f056f7d5-2899-4afb-8d5a-ca42c86f2189/3e7999e3-09ea-4fe5-95f5-9bd120f75d11/model_data_IPW_final_launch/*",
}

In [51]:
%spark.pyspark
# Write the currently loaded model data as a single CSV (with header) under the survey's name.
survey = '20221109_Kodiak_Cakes'
psm_input_path = "s3u://fortunax/survey-analysis/adsp_bl_model/psm_model_input/"+survey+"_psm_model_input"
modeldata_results.repartition(1).write.mode("overwrite").option("header", True).csv(psm_input_path)

In [52]:
%spark.pyspark
# Load the model data of four ADSP BL surveys, tag each with its survey name,
# stack them (columns matched by position) and write the result as a single parquet file.
model_input_sources = [
    ("20221208_Venmo_Caitlin", "s3://audience-generation-output-prod-na/c9fbc167-edf3-4c39-9748-b1edc0cdaf92/c9fbc167-edf3-4c39-9748-b1edc0cdaf92/48759272-2e28-46b4-bf27-72b44be43483/model_data_IPW_final_launch/*"),
    ("20221206_Ed_FX_Kindred_STV", "s3://audience-generation-output-prod-na/ead972d3-b50b-4626-b1b7-c48a85d2d728/ead972d3-b50b-4626-b1b7-c48a85d2d728/2f45a367-c6e9-4d2e-ae4c-1a77dc28b3b2/model_data_IPW_final_launch/*"),
    ("20221205_H1_Venmo", "s3://audience-generation-output-prod-na/2354f96c-4706-4a94-b455-3fb32a2d4c82/2354f96c-4706-4a94-b455-3fb32a2d4c82/5120ee9f-cf49-4a45-a0d5-9b245ce6bc11/model_data_IPW_final_launch/*"),
    ("20221109_Kodiak_Cakes", "s3://audience-generation-output-prod-na/f056f7d5-2899-4afb-8d5a-ca42c86f2189/f056f7d5-2899-4afb-8d5a-ca42c86f2189/3e7999e3-09ea-4fe5-95f5-9bd120f75d11/model_data_IPW_final_launch/*"),
]
Model_input1, Model_input2, Model_input3, Model_input4 = [
    spark.read.parquet(source_path).withColumn("Survey", lit(survey_name))
    for survey_name, source_path in model_input_sources
]

dfs = [Model_input1, Model_input2, Model_input3, Model_input4]
model_input_union = dfs[0]
for survey_frame in dfs[1:]:
    model_input_union = model_input_union.unionAll(survey_frame)
model_input_union.select("Survey").distinct().show()
model_input_union.printSchema()

model_input_union.repartition(1).write.mode("overwrite").option("header", True).parquet("s3u://fortunax/survey-analysis/adsp_bl_model/psm_model_input/aggregation")

# add new ADSP BL survey
# model_input_union_old = spark.read.parquet("s3://fortunax/survey-analysis/adsp_bl_model/psm_model_input/aggregation")
# Model_input5 = spark.read.parquet("s3://audience-generation-output-prod-na/44d145d6-8a55-4bd5-9e50-a0713e261218/44d145d6-8a55-4bd5-9e50-a0713e261218/3b7e8b8d-1422-4400-aaa7-0fb3f0048884/model_data_IPW_final_launch/*")
# Model_input5 = Model_input5.withColumn("Survey",lit("20230109_H1_Brita"))

# dfs = [model_input_union_old,Model_input5]
# model_input_union = reduce(DataFrame.unionAll, dfs)
# model_input_union.select("Survey").distinct().show()
# model_input_union.printSchema()

# model_input_union.repartition(1).write.mode("overwrite").option("header", True).parquet("s3u://fortunax/survey-analysis/adsp_bl_model/psm_model_input/aggregation")

In [53]:
%spark.pyspark
# Load the control-group CSVs (header row) of four ADSP BL surveys, tag each with its survey
# name, stack them (columns matched by position) and write a single parquet file.
control_sources = [
    ("20221208_Venmo_Caitlin", "s3://audience-generation-output-prod-na/c9fbc167-edf3-4c39-9748-b1edc0cdaf92/c9fbc167-edf3-4c39-9748-b1edc0cdaf92/48759272-2e28-46b4-bf27-72b44be43483/control_group_final_launch/*"),
    ("20221206_Ed_FX_Kindred_STV", "s3://audience-generation-output-prod-na/ead972d3-b50b-4626-b1b7-c48a85d2d728/ead972d3-b50b-4626-b1b7-c48a85d2d728/2f45a367-c6e9-4d2e-ae4c-1a77dc28b3b2/control_group_final_launch/*"),
    ("20221205_H1_Venmo", "s3://audience-generation-output-prod-na/2354f96c-4706-4a94-b455-3fb32a2d4c82/2354f96c-4706-4a94-b455-3fb32a2d4c82/5120ee9f-cf49-4a45-a0d5-9b245ce6bc11/control_group_final_launch/*"),
    ("20221109_Kodiak_Cakes", "s3://audience-generation-output-prod-na/f056f7d5-2899-4afb-8d5a-ca42c86f2189/f056f7d5-2899-4afb-8d5a-ca42c86f2189/3e7999e3-09ea-4fe5-95f5-9bd120f75d11/control_group_final_launch/*"),
]
Control1, Control2, Control3, Control4 = [
    spark.read.option("header", True).csv(source_path).withColumn("Survey", lit(survey_name))
    for survey_name, source_path in control_sources
]

dfs = [Control1, Control2, Control3, Control4]
control_union = dfs[0]
for survey_frame in dfs[1:]:
    control_union = control_union.unionAll(survey_frame)
control_union.select("Survey").distinct().show()
control_union.printSchema()

control_union.repartition(1).write.mode("overwrite").option("header", True).parquet("s3u://fortunax/survey-analysis/adsp_bl_model/control_group_final_launch/aggregation")


# # add new ADSP BL survey
# control_union_old =  spark.read.parquet("s3://fortunax/survey-analysis/adsp_bl_model/control_group_final_launch/aggregation")
# Control5 = spark.read.option("header", True).csv("s3://audience-generation-output-prod-na/44d145d6-8a55-4bd5-9e50-a0713e261218/44d145d6-8a55-4bd5-9e50-a0713e261218/3b7e8b8d-1422-4400-aaa7-0fb3f0048884/control_group_final_launch/*")
# Control5 = Control5.withColumn("Survey",lit("20230109_H1_Brita"))

# dfs = [control_union_old,Control5]
# control_union = reduce(DataFrame.unionAll, dfs)
# control_union.select("Survey").distinct().show()
# control_union.printSchema()

# control_union.repartition(1).write.mode("overwrite").option("header", True).parquet("s3u://fortunax/survey-analysis/adsp_bl_model/control_group_final_launch/aggregation")


In [54]:
%spark.pyspark
# Load the test-group CSVs (header row) of four ADSP BL surveys, tag each with its survey
# name, stack them (columns matched by position) and write a single parquet file.
test_sources = [
    ("20221208_Venmo_Caitlin", "s3://audience-generation-output-prod-na/c9fbc167-edf3-4c39-9748-b1edc0cdaf92/c9fbc167-edf3-4c39-9748-b1edc0cdaf92/48759272-2e28-46b4-bf27-72b44be43483/test_group_final_launch/*"),
    ("20221206_Ed_FX_Kindred_STV", "s3://audience-generation-output-prod-na/ead972d3-b50b-4626-b1b7-c48a85d2d728/ead972d3-b50b-4626-b1b7-c48a85d2d728/2f45a367-c6e9-4d2e-ae4c-1a77dc28b3b2/test_group_final_launch/*"),
    ("20221205_H1_Venmo", "s3://audience-generation-output-prod-na/2354f96c-4706-4a94-b455-3fb32a2d4c82/2354f96c-4706-4a94-b455-3fb32a2d4c82/5120ee9f-cf49-4a45-a0d5-9b245ce6bc11/test_group_final_launch/*"),
    ("20221109_Kodiak_Cakes", "s3://audience-generation-output-prod-na/f056f7d5-2899-4afb-8d5a-ca42c86f2189/f056f7d5-2899-4afb-8d5a-ca42c86f2189/3e7999e3-09ea-4fe5-95f5-9bd120f75d11/test_group_final_launch/*"),
]
Test1, Test2, Test3, Test4 = [
    spark.read.option("header", True).csv(source_path).withColumn("Survey", lit(survey_name))
    for survey_name, source_path in test_sources
]

dfs = [Test1, Test2, Test3, Test4]
test_union = dfs[0]
for survey_frame in dfs[1:]:
    test_union = test_union.unionAll(survey_frame)
test_union.select("Survey").distinct().show()
test_union.printSchema()

test_union.repartition(1).write.mode("overwrite").option("header", True).parquet("s3u://fortunax/survey-analysis/adsp_bl_model/test_group_final_launch/aggregation")



In [55]:
%spark.pyspark
def _read_projection(path, survey):
    """Read one survey's projection output (parquet) and tag each row with the survey label."""
    return spark.read.parquet(path).withColumn("Survey", lit(survey))


# One frame per survey; projection1 is reused by a later sanity-check cell.
projection1 = _read_projection(
    "s3://audience-generation-output-prod-na/c9fbc167-edf3-4c39-9748-b1edc0cdaf92/c9fbc167-edf3-4c39-9748-b1edc0cdaf92/48759272-2e28-46b4-bf27-72b44be43483/projection_IPW_final_launch/*",
    "20221208_Venmo_Caitlin",
)
projection2 = _read_projection(
    "s3://audience-generation-output-prod-na/ead972d3-b50b-4626-b1b7-c48a85d2d728/ead972d3-b50b-4626-b1b7-c48a85d2d728/2f45a367-c6e9-4d2e-ae4c-1a77dc28b3b2/projection_IPW_final_launch/*",
    "20221206_Ed_FX_Kindred_STV",
)
projection3 = _read_projection(
    "s3://audience-generation-output-prod-na/2354f96c-4706-4a94-b455-3fb32a2d4c82/2354f96c-4706-4a94-b455-3fb32a2d4c82/5120ee9f-cf49-4a45-a0d5-9b245ce6bc11/projection_IPW_final_launch/*",
    "20221205_H1_Venmo",
)
projection4 = _read_projection(
    "s3://audience-generation-output-prod-na/f056f7d5-2899-4afb-8d5a-ca42c86f2189/f056f7d5-2899-4afb-8d5a-ca42c86f2189/3e7999e3-09ea-4fe5-95f5-9bd120f75d11/projection_IPW_final_launch/*",
    "20221109_Kodiak_Cakes",
)

# Stack the four frames positionally (unionAll matches columns by position, not by name).
dfs = [projection1, projection2, projection3, projection4]
projection_union = dfs[0]
for next_frame in dfs[1:]:
    projection_union = projection_union.unionAll(next_frame)

# Quick check: which survey labels are present, and what the combined schema looks like.
projection_union.select("Survey").distinct().show()
projection_union.printSchema()

# Persist the combined projections as a single parquet part file.
projection_union_writer = projection_union.repartition(1).write.mode("overwrite").option("header", True)
projection_union_writer.parquet("s3u://fortunax/survey-analysis/adsp_bl_model/projection/aggregation")

In [56]:
%spark.pyspark
# Reload the four aggregated parquet datasets written under adsp_bl_model/.
model_input_union, control_union, test_union, projection_union = [
    spark.read.parquet(aggregate_path)
    for aggregate_path in (
        "s3://fortunax/survey-analysis/adsp_bl_model/psm_model_input/aggregation/*",
        "s3://fortunax/survey-analysis/adsp_bl_model/control_group_final_launch/aggregation/*",
        "s3://fortunax/survey-analysis/adsp_bl_model/test_group_final_launch/aggregation/*",
        "s3://fortunax/survey-analysis/adsp_bl_model/projection/aggregation/*",
    )
]

In [57]:
%spark.pyspark
# Attach the selected group (Control/Test) and the projection score to every model-input row.
# Both joins are left joins on (customer_decoration_key, Survey), so model-input rows without
# a group or a projection are kept with nulls.
control_keys = control_union.select("customer_decoration_key", "Survey").withColumn("Type", lit("Control"))
test_keys = test_union.select("customer_decoration_key", "Survey").withColumn("Type", lit("Test"))
dfs = [control_keys, test_keys]
group_union = control_keys.unionAll(test_keys)

join_keys = ["customer_decoration_key", "Survey"]
projection_scores = projection_union.select("customer_decoration_key", "Survey", "rawPrediction")
with_group = model_input_union.join(group_union, join_keys, "left")
final_agg = with_group.join(projection_scores, join_keys, "left")

final_agg.printSchema()

# Persist as a single parquet part file.
final_agg_writer = final_agg.repartition(1).write.mode("overwrite").option("header", True)
final_agg_writer.parquet("s3u://fortunax/survey-analysis/adsp_bl_model/final_agg/")


In [58]:
%spark.pyspark
# Add new ADSP BL survey
def _tag_brita(frame):
    """Label every row of `frame` with the 20230109_H1_Brita survey name."""
    return frame.withColumn("Survey", lit("20230109_H1_Brita"))


# Model input and projection are parquet; the control/test group exports are CSV with a header row.
Model_input5 = _tag_brita(spark.read.parquet(
    "s3://audience-generation-output-prod-na/44d145d6-8a55-4bd5-9e50-a0713e261218/44d145d6-8a55-4bd5-9e50-a0713e261218/3b7e8b8d-1422-4400-aaa7-0fb3f0048884/model_data_IPW_final_launch/*"
))
Control5 = _tag_brita(spark.read.option("header", True).csv(
    "s3://audience-generation-output-prod-na/44d145d6-8a55-4bd5-9e50-a0713e261218/44d145d6-8a55-4bd5-9e50-a0713e261218/3b7e8b8d-1422-4400-aaa7-0fb3f0048884/control_group_final_launch/*"
))
Test5 = _tag_brita(spark.read.option("header", True).csv(
    "s3://audience-generation-output-prod-na/44d145d6-8a55-4bd5-9e50-a0713e261218/44d145d6-8a55-4bd5-9e50-a0713e261218/3b7e8b8d-1422-4400-aaa7-0fb3f0048884/test_group_final_launch/*"
))
projection5 = _tag_brita(spark.read.parquet(
    "s3://audience-generation-output-prod-na/44d145d6-8a55-4bd5-9e50-a0713e261218/44d145d6-8a55-4bd5-9e50-a0713e261218/3b7e8b8d-1422-4400-aaa7-0fb3f0048884/projection_IPW_final_launch/*"
))

# Previously written aggregate that the new survey is appended to.
final_agg = spark.read.parquet("s3u://fortunax/survey-analysis/adsp_bl_model/final_agg/")
final_agg.printSchema()

# Control/Test membership for the new survey.
control5_keys = Control5.select("customer_decoration_key", "Survey").withColumn("Type", lit("Control"))
test5_keys = Test5.select("customer_decoration_key", "Survey").withColumn("Type", lit("Test"))
dfs = [control5_keys, test5_keys]
group_union = control5_keys.unionAll(test5_keys)

# Same left-join shape as the original aggregate: model input + group + projection score.
survey_keys = ["customer_decoration_key", "Survey"]
projection5_scores = projection5.select("customer_decoration_key", "Survey", "rawPrediction")
final_agg5 = Model_input5.join(group_union, survey_keys, "left").join(projection5_scores, survey_keys, "left")
final_agg5.printSchema()

# Positional union of the old aggregate and the new survey.
dfs2 = [final_agg, final_agg5]
final_agg_new = final_agg.unionAll(final_agg5)
final_agg_new.printSchema()

final_agg_new.select("Survey").distinct().show()

final_agg_new_writer = final_agg_new.repartition(1).write.mode("overwrite").option("header", True)
final_agg_new_writer.parquet("s3u://fortunax/survey-analysis/adsp_bl_model/final_agg_new/")


In [59]:
%spark.pyspark
# Session settings for the per-survey aggregation below.
# NOTE(review): spark.driver.maxResultSize is normally fixed when the driver starts;
# confirm that setting it here actually takes effect on this cluster.
sqlContext.setConf("spark.driver.maxResultSize","30g")
sqlContext.setConf("spark.sql.shuffle.partitions", "10")
sqlContext.setConf("spark.sql.adaptive.enabled", "true")
sqlContext.setConf("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Survey master list: one row per survey with its id, title and creation date.
# The pattern uses lowercase "yyyy" (calendar year). Uppercase "YYYY" is the week-based year,
# which yields the wrong year for dates around New Year.
survey_list  =spark.read.csv("s3://fortunax/survey-analysis/adsp_bl_model/100surveylist/ADSP_SurveyAll0127.csv", header = True)\
                        .withColumn("Created_Date",date_format("Created_Date","yyyy-MM-dd").cast("date"))\
                        .withColumnRenamed("Survey_Title","Survey")\
                         .withColumnRenamed("StudyID","SurveyId")
survey_list.printSchema()
# SurveyId -> survey title, collected to the driver so the loop below can build S3 paths.
name_dict = {row['SurveyId']: row['Survey'] for row in survey_list.collect()}
print(name_dict)

# Target layout for the aggregate: the columns of final_agg_new plus a SurveyId column.
schema = spark.read.parquet("s3://fortunax/survey-analysis/adsp_bl_model/final_agg_new/*").withColumn("SurveyId", lit("NA")).schema

# Empty seed frame; it fixes the column names and types for the positional unions in the loop.
final_agg100 = spark.createDataFrame([], schema)
no_result = 0

for surveyid, surveyname in name_dict.items():
    try:
        # model input
        print(surveyid, surveyname)
        indi_survey = spark.read.parquet("s3://audience-generation-output-prod-na/"+surveyid+"/*/*/model_data_IPW_final_launch/")
        indi_survey =  indi_survey.withColumn("Survey",lit(surveyname))
        # control and test
        indi_control = spark.read.option("header", True).csv("s3://audience-generation-output-prod-na/"+surveyid+"/*/*/control_group_final_launch/*")\
                                 .withColumn("Survey",lit(surveyname))\
                                 .withColumn("Type",lit("Control"))\
                                 .select("customer_decoration_key","Survey","Type")
        indi_test = spark.read.option("header", True).csv("s3://audience-generation-output-prod-na/"+surveyid+"/*/*/test_group_final_launch/*")\
                                 .withColumn("Survey",lit(surveyname))\
                                 .withColumn("Type",lit("Test"))\
                                 .select("customer_decoration_key","Survey","Type")

        dfs_tc = [indi_control, indi_test]
        tc_union = reduce(DataFrame.unionAll, dfs_tc)
        # projection
        indi_project = spark.read.parquet("s3://audience-generation-output-prod-na/"+surveyid+"/*/*/projection_IPW_final_launch/*")\
                                 .withColumn("Survey",lit(surveyname))\
                                 .select("customer_decoration_key","Survey","rawPrediction")
        # final agg individual survey
        indi_final_agg = indi_survey.join(tc_union, ["customer_decoration_key","Survey"],"left")\
                                    .join(indi_project, ["customer_decoration_key","Survey"],"left")\
                                    .withColumn("SurveyId", lit(surveyid))

        # This action forces the reads/joins to execute, so a broken survey fails here,
        # before it is appended to the aggregate.
        indi_final_agg.groupBy("Survey").agg(F.count("test_group_selected")).show(truncate = False)
        # union all survey (de-duplicated once, after the loop)
        dfs = [final_agg100,  indi_final_agg]
        final_agg100 = reduce(DataFrame.unionAll, dfs)

    except Exception as err:
        # Best effort: a survey whose outputs cannot be loaded is skipped, but the reason is
        # reported so that real failures are not mistaken for "no results".
        print(f"{surveyid}, {surveyname}  does not have model generated results")
        print(f"    reason: {type(err).__name__}: {(str(err).splitlines() or [''])[0]}")
        no_result += 1

# A single de-duplication over the whole union gives the same rows as de-duplicating after
# every append, without adding one shuffle per survey to the query plan.
final_agg100 = final_agg100.distinct()

final_agg100.groupBy(["Survey"]).agg(F.count("test_group_selected")).show(200, truncate = False)
final_agg100.agg(F.countDistinct("Survey")).show()
print(f"Total {no_result} surveys no found model generated results")

# join survey info
if "Created_Date" not in final_agg100.schema.fieldNames():
    final_agg100 = final_agg100.join(survey_list, "SurveyId","left").drop(survey_list.SurveyId).drop(survey_list.Survey)

# add old adsp survey
final_agg_add =  spark.read.parquet("s3://fortunax/survey-analysis/adsp_bl_model/100final_agg/*").filter("Survey='Starbucks_AudSurvey_SnS_Dec22'")\
                      .join(survey_list, "Survey","left").drop(survey_list.Survey)
offline_bl = spark.read.parquet("s3://fortunax/survey-analysis/adsp_bl_model/final_agg_new/*")\
                      .join(survey_list, "Survey","left").drop(survey_list.Survey)
final_agg100_new = final_agg100.unionByName(final_agg_add).unionByName(offline_bl)
final_agg100_new.agg(F.countDistinct("Survey")).show()

# write to s3
final_agg100_new.repartition(1).write.mode("overwrite").option("header", True).parquet("s3u://fortunax/survey-analysis/adsp_bl_model/100final_agg_0128/")
        

In [60]:
%spark.pyspark

# Session settings for the per-survey aggregation below.
# NOTE(review): spark.driver.maxResultSize is normally fixed when the driver starts;
# confirm that setting it here actually takes effect on this cluster.
sqlContext.setConf("spark.driver.maxResultSize","30g")
sqlContext.setConf("spark.sql.shuffle.partitions", "10")
sqlContext.setConf("spark.sql.adaptive.enabled", "true")
sqlContext.setConf("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Survey master list (0228 version): one row per survey with its id, title and creation date.
# The pattern uses lowercase "yyyy" (calendar year). Uppercase "YYYY" is the week-based year,
# which yields the wrong year for dates around New Year.
survey_list  =spark.read.csv("s3://fortunax/survey-analysis/adsp_bl_model/100surveylist/ADSP_Survey_All_0228.csv", header = True)\
                        .withColumn("Created_Date",date_format("Created_Date","yyyy-MM-dd").cast("date"))\
                        .withColumnRenamed("Survey_Title","Survey")\
                         .withColumnRenamed("StudyID","SurveyId")
survey_list.printSchema()
# SurveyId -> survey title, collected to the driver so the loop below can build S3 paths.
name_dict = {row['SurveyId']: row['Survey'] for row in survey_list.collect()}
print(name_dict)

# Target layout for the aggregate: the columns of final_agg_new plus a SurveyId column.
schema = spark.read.parquet("s3://fortunax/survey-analysis/adsp_bl_model/final_agg_new/*").withColumn("SurveyId", lit("NA")).schema

# Empty seed frame; it fixes the column names and types for the positional unions in the loop.
final_agg100 = spark.createDataFrame([], schema)
no_result = 0

for surveyid, surveyname in name_dict.items():
    try:
        # model input
        print(surveyid, surveyname)
        indi_survey = spark.read.parquet("s3://audience-generation-output-prod-na/"+surveyid+"/*/*/model_data_IPW_final_launch/")
        indi_survey =  indi_survey.withColumn("Survey",lit(surveyname))
        # control and test
        indi_control = spark.read.option("header", True).csv("s3://audience-generation-output-prod-na/"+surveyid+"/*/*/control_group_final_launch/*")\
                                 .withColumn("Survey",lit(surveyname))\
                                 .withColumn("Type",lit("Control"))\
                                 .select("customer_decoration_key","Survey","Type")
        indi_test = spark.read.option("header", True).csv("s3://audience-generation-output-prod-na/"+surveyid+"/*/*/test_group_final_launch/*")\
                                 .withColumn("Survey",lit(surveyname))\
                                 .withColumn("Type",lit("Test"))\
                                 .select("customer_decoration_key","Survey","Type")

        dfs_tc = [indi_control, indi_test]
        tc_union = reduce(DataFrame.unionAll, dfs_tc)
        # projection
        indi_project = spark.read.parquet("s3://audience-generation-output-prod-na/"+surveyid+"/*/*/projection_IPW_final_launch/*")\
                                 .withColumn("Survey",lit(surveyname))\
                                 .select("customer_decoration_key","Survey","rawPrediction")
        # final agg individual survey
        indi_final_agg = indi_survey.join(tc_union, ["customer_decoration_key","Survey"],"left")\
                                    .join(indi_project, ["customer_decoration_key","Survey"],"left")\
                                    .withColumn("SurveyId", lit(surveyid))

        # This action forces the reads/joins to execute, so a broken survey fails here,
        # before it is appended to the aggregate.
        indi_final_agg.groupBy("Survey").agg(F.count("test_group_selected")).show(truncate = False)
        # union all survey (de-duplicated once, after the loop)
        dfs = [final_agg100,  indi_final_agg]
        final_agg100 = reduce(DataFrame.unionAll, dfs)

    except Exception as err:
        # Best effort: a survey whose outputs cannot be loaded is skipped, but the reason is
        # reported so that real failures are not mistaken for "no results".
        print(f"{surveyid}, {surveyname}  does not have model generated results")
        print(f"    reason: {type(err).__name__}: {(str(err).splitlines() or [''])[0]}")
        no_result += 1

# A single de-duplication over the whole union gives the same rows as de-duplicating after
# every append, without adding one shuffle per survey to the query plan.
final_agg100 = final_agg100.distinct()

final_agg100.groupBy(["Survey"]).agg(F.count("test_group_selected")).show(200, truncate = False)
final_agg100.agg(F.countDistinct("Survey")).show()
print(f"Total {no_result} surveys no found model generated results")

# join survey info
if "Created_Date" not in final_agg100.schema.fieldNames():
    final_agg100 = final_agg100.join(survey_list, "SurveyId","left").drop(survey_list.SurveyId).drop(survey_list.Survey)


# write to s3
final_agg100.repartition(1).write.mode("overwrite").option("header", True).parquet("s3u://fortunax/survey-analysis/adsp_bl_model/100final_agg_0228/")
        

In [61]:
%spark.pyspark
# Remove duplicated columns left behind by the survey_list join (handled in a single pass).
# On a fresh kernel final_agg100_s does not exist yet, so it is seeded from final_agg100;
# on a re-run the existing frame is reused.
try:
    final_agg100_s
except NameError:
    final_agg100_s = final_agg100

# Attach the survey info only if it is not already present.
if "Created_Date" not in final_agg100_s.schema.fieldNames():
    final_agg100_s = final_agg100.join(survey_list, "SurveyId","left").drop(survey_list.SurveyId)
final_agg100_s.printSchema()

# drop duplicated columns
df_cols = final_agg100_s.columns
# Position of the last occurrence of every column name; that occurrence is the one kept.
last_position = {name: idx for idx, name in enumerate(df_cols)}

# Give every earlier occurrence a unique, flagged name so that it can be dropped by name.
flagged_cols = [
    name if last_position[name] == idx else f"{name}_duplicated_{idx}"
    for idx, name in enumerate(df_cols)
]
cols_to_remove = [flagged for flagged, original in zip(flagged_cols, df_cols) if flagged != original]

# rename the columns in the DF, then remove the flagged ones
final_agg100_s = final_agg100_s.toDF(*flagged_cols).drop(*cols_to_remove)
final_agg100_s.printSchema()

In [62]:
%spark.pyspark
# Stack the de-duplicated survey aggregate with the two extra frames built in an earlier cell
# (final_agg_add, offline_bl), matching columns by name.
# NOTE(review): the output prefix is 100final_agg_0128 although the neighbouring cells work
# from the 0228 survey list - confirm that overwriting the 0128 dataset is intended.
final_agg100_new = reduce(
    lambda stacked, extra: stacked.unionByName(extra),
    [final_agg100_s, final_agg_add, offline_bl],
)
distinct_survey_count = final_agg100_new.agg(F.countDistinct("Survey"))
distinct_survey_count.show()

# write to s3
final_agg100_new_writer = final_agg100_new.repartition(1).write.mode("overwrite").option("header", True)
final_agg100_new_writer.parquet("s3u://fortunax/survey-analysis/adsp_bl_model/100final_agg_0128/")

In [63]:
%spark.pyspark
# Re-read the 0228 aggregate from S3 and run a few sanity checks on it.
final_agg100_check = spark.read.parquet("s3://fortunax/survey-analysis/adsp_bl_model/100final_agg_0228/*")
final_agg100_check.printSchema()

# Number of distinct survey ids, then the (Survey, SurveyId) pairs that are present.
survey_id_count = final_agg100_check.agg(F.countDistinct("SurveyId"))
survey_id_count.show(300, truncate=False)
survey_pairs = final_agg100_check.select("Survey", "SurveyId").distinct().orderBy("Survey")
survey_pairs.show(300, truncate=False)

# Rows with a non-null Type per survey.
typed_rows_per_survey = final_agg100_check.groupBy("Survey", "SurveyId").agg(F.count("Type"))
typed_rows_per_survey.show(300, truncate=False)

# Distinct panelists per segment, and the distinct segment-code combinations.
panelists_per_segment = final_agg100_check.groupBy("segment").agg(F.countDistinct("customer_decoration_key"))
panelists_per_segment.show(300, truncate=False)
segment_combinations = final_agg100_check.select(
    "purchase_segment_code", "recency_segment_id", "tenure_segment_id", "segment"
).distinct()
segment_combinations.show(300, truncate=False)

In [64]:
%spark.pyspark
# Survey ids selected for the shadow test.
shadow_test = ['1010199001735609393',
 '1013478236810494046',
 '1011410602925589885',
 '1022953695740577527',
 '1016559059479652690',
 '1021307103947166529',
 '1024260143819101763',
 '1025409659045532045',
 '1019913940017188116',
 '1022500192045985969',
 '1015215940946229133',
 '1018995918905193997',
 '1010391264163697834']
# Restrict the checked aggregate to the shadow-test surveys. This definition must be active:
# every statement below reads final_agg100_check_shadow, which is defined nowhere else in this cell.
final_agg100_check_shadow = final_agg100_check.filter(F.col("SurveyId").isin(shadow_test))
final_agg100_check_shadow.agg(F.countDistinct("SurveyId")).show(300, truncate = False)
# Optional bulk exports, left disabled:
# final_agg100_check_shadow.repartition(1).write.mode("overwrite").option("header", True).parquet("s3u://fortunax/survey-analysis/adsp_bl_model/100final_agg_0314_shadow2/")
# final_agg100_check.repartition(1).write.mode("overwrite").option("header", True).csv("s3u://fortunax/survey-analysis/100final_agg_0314_shadow2/")

# Write one CSV (with header) per survey and group under <survey_id>/test/ and <survey_id>/control/.
for survey_id in shadow_test:
    df_test =final_agg100_check_shadow.filter((F.col("SurveyId")==survey_id) & (F.col("Type")=="Test"))
    df_control =final_agg100_check_shadow.filter((F.col("SurveyId")==survey_id) & (F.col("Type")=="Control"))
    df_test.repartition(1).write.mode("overwrite").option("header", True).csv("s3u://fortunax/survey-analysis/brand_lift_test&control/"+survey_id+"/test/")
    df_control.repartition(1).write.mode("overwrite").option("header", True).csv("s3u://fortunax/survey-analysis/brand_lift_test&control/"+survey_id+"/control/")
    

In [65]:
%spark.pyspark
# Spot-check a single survey: compare its raw model input with what ended up in the aggregate.
surveyid = '1016799845470573444'
# indi_check = spark.read.option("header", True).csv("s3://audience-generation-output-prod-na/"+surveyid+"/*/*/test_group_final_launch/*")
model_input_path = "s3://audience-generation-output-prod-na/"+surveyid+"/*/*/model_data_IPW_final_launch/"
indi_check = spark.read.parquet(model_input_path)

# Distinct vs. total panelist keys in the model input.
key_counts = indi_check.agg(F.countDistinct("customer_decoration_key"), F.count("customer_decoration_key"))
key_counts.show(300, truncate=False)

# Non-null test_group_selected values: over all rows, then over distinct (key, flag) pairs.
indi_check.agg(F.count("test_group_selected")).show(300, truncate=False)
distinct_key_flags = indi_check.select("customer_decoration_key", "test_group_selected").distinct()
distinct_key_flags.agg(F.count("test_group_selected")).show(300, truncate=False)

# The same survey inside the checked aggregate, after dropping exact duplicate rows.
survey_rows = final_agg100_check.filter(F.col("SurveyId") == surveyid).distinct()
survey_rows.groupBy("Survey", "SurveyId").agg(F.count("Type")).show(300, truncate=False)

In [66]:
%spark.pyspark
# Survey master list (0228 version) with Created_Date normalised to a date column.
# The pattern uses lowercase "yyyy" (calendar year). Uppercase "YYYY" is the week-based year,
# which yields the wrong year for dates around New Year.
survey_list  =spark.read.csv("s3://fortunax/survey-analysis/adsp_bl_model/100surveylist/ADSP_Survey_All_0228.csv", header = True)\
                        .withColumn("Created_Date",date_format("Created_Date","yyyy-MM-dd").cast("date"))
# Bias-improvement figures; only the "diff" and "SurveyId" columns are used.
bias_diff = spark.read.csv("s3://klensink-dev/surveys/shadow-test-control-groups/2023-03-03/bias_improvement.csv", header = True)
# Left join keeps every survey; surveys without a bias row get a null diff.
# StudyId is cast to string for the CSV export below.
survey_bias_join = survey_list.join(bias_diff.select("diff","SurveyId"),survey_list.StudyId == bias_diff.SurveyId, "left").drop(bias_diff.SurveyId)\
                              .withColumn("StudyId",col("StudyId").cast("string"))
survey_bias_join.printSchema()
survey_bias_join.show()
survey_bias_join.repartition(1).write.mode("overwrite").option("header", True).csv("s3u://fortunax/survey-analysis/adsp_bl_model/100surveylist/ADSP_Survey_All_0228_with_bias")

In [67]:
%spark.pyspark
# number of survey per panelist
def _surveys_per_panelist(frame, *extra_keys):
    """Count distinct SurveyId per panelist (optionally per extra key); the count column is named "Survey"."""
    return frame.groupBy(*extra_keys, "customer_decoration_key").agg(F.countDistinct("SurveyId").alias("Survey"))


def _panelists_by_survey_count(per_panelist):
    """Histogram: number of distinct panelists for each surveys-per-panelist value, ascending by count."""
    return per_panelist.groupBy("Survey")\
                       .agg(F.countDistinct("customer_decoration_key").alias("Count_Cust"))\
                       .orderBy("Count_Cust")


# Rows whose Type is not null, i.e. panelists that were placed in a group.
assigned = final_agg100_check.filter(F.col("Type").isNotNull())
per_panelist = _surveys_per_panelist(assigned)

per_panelist.agg(F.max("Survey")).show()
max_survey = per_panelist.agg(F.max("Survey")).collect()[0][0]
print(max_survey)
per_panelist.orderBy(F.col("Survey").desc()).show(truncate=False)
_panelists_by_survey_count(per_panelist).show(truncate=False)
assigned.agg(F.countDistinct("customer_decoration_key")).show()

### by type
test_rows = final_agg100_check.filter(F.col("Type")=='Test')
_panelists_by_survey_count(_surveys_per_panelist(test_rows)).show(truncate=False)
control_rows = final_agg100_check.filter(F.col("Type")=='Control')
_panelists_by_survey_count(_surveys_per_panelist(control_rows)).show(truncate=False)
assigned.groupBy("Type").agg(F.countDistinct("customer_decoration_key")).show()

# final_agg100_check.filter(F.col("Type").isNotNull()).select("customer_decoration_key","Survey").filter(F.col("customer_decoration_key") == '01010e41e1dde97d5696d2ce3bdd4d4a14df64b2673c17fd65ba65b7a98da4825613').distinct().show(truncate = False)
by_survey_and_type = assigned.groupBy("Survey","Type")
by_survey_and_type.agg(F.countDistinct("customer_decoration_key")).show(truncate=False)
# Mean group size across all (Survey, Type) groups.
group_sizes = by_survey_and_type.agg(F.countDistinct("customer_decoration_key").alias("Count_Cust"))
group_sizes.agg(F.mean("Count_Cust")).show(truncate=False)

### by create day
per_day_panelist = _surveys_per_panelist(assigned, "Created_Date")
per_day_panelist.orderBy(F.col("Survey").desc()).show(truncate=False)
_panelists_by_survey_count(per_day_panelist).show(truncate=False)

# Drill into one panelist on one creation date.
one_panelist = assigned.filter(F.col("customer_decoration_key") == '0101e130ef7ab0d6dcf468f39ffc3668c5dc6d3722e477c0021e09a580ca79b91063')
one_panelist.filter(F.col("Created_Date") == '2022-12-30').select("Survey","Type").distinct().show(truncate=False)

In [68]:
%spark.pyspark
# Distinct panelists that have a non-null Type.
assigned_panelists = final_agg100_check.filter(F.col("Type").isNotNull())
assigned_panelists.agg(F.countDistinct("customer_decoration_key")).show()

In [69]:
%spark.pyspark
# Control rows only: for each number of surveys per panelist, how many panelists have it.
control_per_panelist = final_agg100_check.filter(F.col("Type")=='Control')\
                                         .groupBy("customer_decoration_key")\
                                         .agg(F.countDistinct("SurveyId").alias("Survey"))
control_histogram = control_per_panelist.groupBy("Survey")\
                                        .agg(F.countDistinct("customer_decoration_key").alias("Count_Cust"))\
                                        .orderBy("Count_Cust")
z.show(control_histogram)

In [70]:
%spark.pyspark
# by day
# Distinct surveys per (Created_Date, panelist), restricted to rows with a non-null Type.
per_day_panelist = final_agg100_check.filter(F.col("Type").isNotNull())\
                                     .groupBy("Created_Date","customer_decoration_key")\
                                     .agg(F.countDistinct("SurveyId").alias("Survey"))

# Panelists per surveys-per-day value.
z.show(
    per_day_panelist.groupBy("Survey")
                    .agg(F.countDistinct("customer_decoration_key").alias("Count_Cust"))
                    .orderBy("Count_Cust")
)
# Largest surveys-per-panelist value on each creation date.
z.show(
    per_day_panelist.groupBy("Created_Date")
                    .agg(F.max("Survey").alias("Max_Survey"))
                    .orderBy("Created_Date")
)
# Raw (date, count) pairs.
z.show(per_day_panelist.select("Created_Date","Survey"))


In [71]:
%spark.pyspark
# Survey ids to inspect.
shadow_test_list = [
    '1015930497617062914',
    '1012205384885535143',
    '1022024940649855646',
    '1018740571717925961',
    '1015352820333445760',
    '1013328087937099804',
    '1009907040961550818',
    '1024809370913572286',
    '1015750600094221428',
    '1015381936683407494',
    '1009186255172395887',
    '1016157025496975007',
    '1022446369444034105',
    '1012657064956340970',
    '1015348741617230198',
]
# Distinct panelists per (Survey, Type) for those surveys, rows with a non-null Type only.
shadow_assigned = final_agg100_check.filter(F.col("SurveyId").isin(shadow_test_list))\
                                    .filter(F.col("Type").isNotNull())
shadow_group_sizes = shadow_assigned.groupBy("Survey","Type")\
                                    .agg(F.countDistinct("customer_decoration_key"))\
                                    .orderBy("Survey","Type")
shadow_group_sizes.show(100, False)

In [72]:
%spark.pyspark
# Collapse the per-survey counts into one "|"-separated string of survey names.
survey_counts = final_agg100.groupBy(["Survey"]).agg(F.count("test_group_selected")).alias("test")
joined_survey_names = survey_counts.agg(F.concat_ws("|", F.collect_list(F.col("Survey"))))
joined_survey_names.first()[0]
# final_agg100.select("Survey").distinct().agg(F.concat_ws("|",F.collect_list(F.col("Survey")))).first()[0]

In [73]:
%spark.pyspark
from pyspark.sql.functions import monotonically_increasing_id 
from pyspark.sql.window import Window  

# Number the distinct survey names 1..N in alphabetical order.
final_agg_100check = spark.read.parquet("s3u://fortunax/survey-analysis/adsp_bl_model/100final_agg/")
w = Window.orderBy("Survey")
# final_agg100.select("Survey").distinct().withColumn("id", monotonically_increasing_id()).show(100)
distinct_surveys = final_agg_100check.select("Survey").distinct()
numbered_surveys = distinct_surveys.withColumn("id", row_number().over(w)).select("id", "Survey")
numbered_surveys.show(300, truncate=False)

In [74]:
%spark.pyspark
# Surveys that appear in the model input but are missing from the final aggregate (left anti join).
final_agg_100check = spark.read.parquet("s3u://fortunax/survey-analysis/adsp_bl_model/100final_agg/")
model_input100check = spark.read.parquet("s3u://fortunax/survey-analysis/adsp_bl_model/100psm_model_input/")

input_surveys = model_input100check.select("Survey").distinct()
aggregated_surveys = final_agg_100check.select("Survey").distinct()
input_surveys.join(aggregated_surveys, "Survey", "leftanti").show(truncate=False)

In [75]:
%spark.pyspark
# Consistency checks between test_group_selected, Type and rawPrediction in final_agg_new.
final_agg_new = spark.read.parquet("s3u://fortunax/survey-analysis/adsp_bl_model/final_agg_new/")
final_agg_new.printSchema()

# Non-null counts per survey, first for Test rows and then for Control rows.
for group_type in ('Test', 'Control'):
    rows_of_type = final_agg_new.filter(F.col("Type") == group_type)
    rows_of_type.groupBy("Survey").agg(
        F.count("test_group_selected"), F.count("Type"), F.count("rawPrediction")
    ).show()

# The same two counts broken down by Type and by test_group_selected.
for breakdown_col in ("Type", "test_group_selected"):
    final_agg_new.groupBy(["Survey", breakdown_col])\
                 .agg(F.count("test_group_selected"), F.count("Type"))\
                 .orderBy("Survey", breakdown_col)\
                 .show()

# Rows flagged 'Y' whose Type is something other than 'Test'.
flagged_not_test = (F.col("test_group_selected")=='Y') & (F.col("Type") != 'Test')
final_agg_new.filter(flagged_not_test).show()
# Rows that have a Type but no test_group_selected value.
final_agg_new.filter(F.col("test_group_selected").isNull()).filter(F.col("Type").isNotNull()).show()

In [76]:
%spark.pyspark
# Load the combined CSV aggregate (first row is the header) and inspect its columns.
final_agg_all_reader = spark.read.option("header", True)
final_agg_all = final_agg_all_reader.csv("s3://science-onlinepanel/klensink-dev/surveys/abl_sd_adsp_agg.csv")
final_agg_all.printSchema()

In [77]:
%spark.pyspark
# Copy the feature-selection dataset into the project bucket, then derive and store the
# subset that has a qualified answer on question 1.
final_agg_response = spark.read.parquet("s3u://qqian-test/temp_data/feature_selection/")
final_agg_response.printSchema()

# Rows with a non-null has_qualified_answer; distinct panelists per survey.
answered = final_agg_response.filter(F.col("has_qualified_answer").isNotNull())
answered.groupby("Survey").agg(F.countDistinct("customer_decoration_key")).show()

response_writer = final_agg_response.repartition(1).write.mode("overwrite").option("header", True)
response_writer.parquet("s3u://fortunax/survey-analysis/adsp_bl_model/feature_selection/")

# Keep only question_sequence_number '1'.
final_agg_response_bl = answered.filter("question_sequence_number == '1'")
final_agg_response_bl.groupby("Survey").agg(F.countDistinct("customer_decoration_key")).show()

response_bl_writer = final_agg_response_bl.repartition(1).write.mode("overwrite").option("header", True)
response_bl_writer.parquet("s3u://fortunax/survey-analysis/adsp_bl_model/feature_selection_bl/")

In [78]:
%spark.pyspark
# Inspect the featurized data of one survey and expose it to SQL as "feature_results".
featurized_path = "s3://audience-generation-output-prod-na/44d145d6-8a55-4bd5-9e50-a0713e261218/44d145d6-8a55-4bd5-9e50-a0713e261218/3b7e8b8d-1422-4400-aaa7-0fb3f0048884/featurized_data_IPW_final_launch/*"
feature_results = spark.read.parquet(featurized_path)
feature_results.printSchema()
feature_results.show(10, truncate=False)
feature_results.createOrReplaceTempView("feature_results")

In [79]:
%spark.pyspark
def _show_key_counts(frame):
    """Show total vs. distinct customer_decoration_key counts for `frame`."""
    frame.agg(F.count("customer_decoration_key"), F.countDistinct("customer_decoration_key")).show()


# NOTE(review): Control1 must already exist in the session; its definition is not in this part of the notebook.
_show_key_counts(projection1)
_show_key_counts(Test1)
_show_key_counts(Control1)

In [80]:
%spark.pyspark
# Control sample of the first survey (CSV with a header row): schema plus total vs. distinct keys.
control_sample_path = "s3://audience-generation-output-prod-na/c9fbc167-edf3-4c39-9748-b1edc0cdaf92/c9fbc167-edf3-4c39-9748-b1edc0cdaf92/48759272-2e28-46b4-bf27-72b44be43483/control_sample_final_launch/*"
Control1s = spark.read.option("header", True).csv(control_sample_path)
Control1s.printSchema()
control_key_counts = Control1s.agg(F.count("customer_decoration_key"), F.countDistinct("customer_decoration_key"))
control_key_counts.show()

In [81]:
%spark.pyspark
# Test sample of the first survey (parquet): schema plus total vs. distinct keys.
test_sample_path = "s3://audience-generation-output-prod-na/c9fbc167-edf3-4c39-9748-b1edc0cdaf92/c9fbc167-edf3-4c39-9748-b1edc0cdaf92/48759272-2e28-46b4-bf27-72b44be43483/test_sample_final_launch/*"
Test1s = spark.read.parquet(test_sample_path)
Test1s.printSchema()
test_key_counts = Test1s.agg(F.count("customer_decoration_key"), F.countDistinct("customer_decoration_key"))
test_key_counts.show()

In [82]:
%spark.pyspark
## dim_campaign
# Campaign dimension rows for marketplace 1.
dim_campaign_query = """
select *
from spektr_campaign.dim_campaigns
where marketplace_id = 1
"""
dim_campaign = spark.sql(dim_campaign_query)
dim_campaign.printSchema()
dim_campaign.show()

# Look up a handful of specific campaigns by id.
campaign_ids_of_interest = [
    579861703287912924,
    593949548380621494,
    580036844078265991,
    590778467115612870,
    588546538790250873,
]
dim_campaign.filter(F.col("campaign_id").isin(campaign_ids_of_interest)).show()

In [83]:
%spark.pyspark
# Distinct program types, then a small sample of distinct experiment ids.
program_types = dim_campaign.select("program_type").distinct()
program_types.show(truncate=False)

experiment_ids = dim_campaign.select("experiment_id").distinct()
experiment_ids.show(5, truncate=False)

In [84]:
// onehotencoder
// Scratch experiment: how a StringIndexer -> OneHotEncoderEstimator -> VectorAssembler pipeline
// behaves when the categorical inputs contain nulls, and whether filling nulls before fitting
// and/or after transforming changes the output.
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder,OneHotEncoderEstimator, VectorAssembler }
import org.apache.spark.ml.Pipeline

// Toy data: two string categories, each with one null value.
val df = sqlContext.createDataFrame(Seq(
  (0, "a","d"),
  (1, "b","e"),
  (2, "c","c"),
  (3, null,"c"),         //<- original example has "a" here
  (4, "a",null),
  (5, "c","d")
)).toDF("id", "category1","category2")

// Names of the index columns produced by the two indexers below; these feed the encoder.
val dummy_col_set = Array("categoryIndex1", "categoryIndex2")

// Encoder inputs, encoder outputs (input name + "_vector"), and the columns to assemble.
val pre_dummy = dummy_col_set
val post_dummy = pre_dummy.map(_ + "_vector")
val assembleCols = post_dummy
    
// One indexer per category column; handleInvalid is set to "keep" instead of the default "error".
val indexer1 = new StringIndexer()
  .setHandleInvalid("keep") //default is "error", or "skip", "keep"
  .setInputCol("category1")
  .setOutputCol("categoryIndex1")

val indexer2 = new StringIndexer()
  .setHandleInvalid("keep") //default is "error", or "skip", "keep"
  .setInputCol("category2")
  .setOutputCol("categoryIndex2")

// val indexed = indexer1.transform(df)

// val encoder = new OneHotEncoder()
//   .setInputCol("categoryIndex1")
//   .setOutputCol("categoryIndex1Encoded")
  
// Multi-column one-hot encoder over both index columns.
val encoder = new OneHotEncoderEstimator()
  .setInputCols(pre_dummy)
  .setOutputCols(post_dummy)

// val encoded = encoder.transform(indexed)

// encoded.show()

// Concatenate the encoded vectors into a single "featureVector" column.
val feature_assembler = new VectorAssembler()
      .setInputCols(assembleCols)
      .setOutputCol("featureVector")

// Stage order matters: indexers first, then the encoder, then the assembler.
val feature_pipe = new Pipeline()
      .setStages(Array(indexer1, indexer2, encoder, feature_assembler))

// Model fitted on the raw data (nulls present).
val FeatureModel = feature_pipe.fit(df)

// Model fitted on data where na.fill("0") was applied first.
val FeatureModel2 = feature_pipe.fit(df.na.fill("0"))

// Variant 1: raw-fit model applied to the raw data.
val featurized_data =   FeatureModel.transform(df)
featurized_data.show()

// Variant 2: raw-fit model applied to the raw data, with na.fill("0") applied to the result.
val featurized_data_na =   FeatureModel.transform(df).na.fill("0")  
featurized_data_na.show()

// Variant 3: filled-fit model applied to the raw data (nulls present at transform time).
val featurized_data_na2 =   FeatureModel2.transform(df)
featurized_data_na2.show()

// Variant 4: filled-fit model applied to the raw data, with na.fill("0") applied to the result.
val featurized_data_na3 =   FeatureModel2.transform(df).na.fill("0")  
featurized_data_na3.show()

In [85]:
%spark.pyspark
# taxonomy
# Read the taxonomy parquet files stored under the 2023-02-01 prefix.
taxonomy = spark.read.parquet(
    "s3://panel-cradle-testing/2023-02-01/*"
)
taxonomy.printSchema()

# Preview the first 100 rows without truncating cell values.
taxonomy.show(n=100, truncate=False)


In [86]:
%spark.pyspark
# Side-by-side view of the segment id and the nested taxonomy fields.
taxonomy.select(
    "segmentId",
    "taxonomy.localizedCategories.en.path.name",
    "taxonomy.localizedCategories.en.path.id",
    "taxonomy.description",
    "taxonomy.name",
    "taxonomy.id",
).show(truncate=False)

# Number of rows with a non-null segmentId.
taxonomy.agg(F.count("segmentId")).show()

# For each listed column, count the rows whose value is NaN or null.
null_check_columns = ["taxonomy.description", "taxonomy.name", "taxonomy.id", "segmentId"]
nan_or_null_counts = [
    F.count(F.when(F.isnan(name) | F.col(name).isNull(), name)).alias(name)
    for name in null_check_columns
]
taxonomy.select(nan_or_null_counts).show(truncate=False)

# Largest segmentId after casting the column to an integer.
segment_id_as_int = F.col("segmentId").cast("Integer")
taxonomy.select(segment_id_as_int).agg(F.max("segmentId")).show(n=1000, truncate=False)

# Distinct values of the English category path name.
taxonomy.select("taxonomy.localizedCategories.en.path.name").dropDuplicates().show(n=300, truncate=False)

# Disabled draft for splitting the category name into parts.
# NOTE(review): withColumn needs a Column as its second argument; the plain string
# below would have to be wrapped in F.col(...) before this can be re-enabled.
# taxonomy_new = taxonomy.withColumn("cate_name","taxonomy.localizedCategories.en.path.name")
# split_col = pyspark.sql.functions.split(taxonomy_new["cate_name"], ', ')
# taxonomy_new.select(split_col.getItem(1)).alias("name").distinct().show(300,truncate = False)

In [87]:
%spark.pyspark
# Largest segmentId once the column has been cast to an integer.
max_segment_id_df = (
    taxonomy
    .select(F.col("segmentId").cast("Integer"))
    .agg(F.max("segmentId"))
)
max_segment_id_df.show(n=1000, truncate=False)

In [88]:
%spark.pyspark


In [89]:
%spark.pyspark
# Show the distinct (path name, path id) pairs whose path id is 17, 18 or 19.
#
# The predicate has to reference the column itself. Inside a SQL expression
# string, single quotes around the dotted path turn it into a string literal, so
# the previous filter compared constant text with the numbers and never matched
# a row. Filtering on a Column object before the select avoids that and keeps
# the nested field resolvable.
# NOTE(review): assumes `path.id` resolves to a scalar per row. If `path` is an
# array of structs, replace `isin` with F.arrays_overlap against an array of the
# ids -- confirm against taxonomy.printSchema().
path_id = F.col("taxonomy.localizedCategories.en.path.id")
(
    taxonomy
    .filter(path_id.isin(17, 18, 19))
    .select(
        "taxonomy.localizedCategories.en.path.name",
        "taxonomy.localizedCategories.en.path.id",
    )
    .distinct()
    .show(1000, truncate=False)
)

In [90]:
%spark.pyspark
# Preview the three stored layouts of the panel customers segments data
# for 2023/02/01: denormalizedData, encodedData and Bucketed.

# denormalizedData
in_market_seg = spark.read.parquet(
    "s3://customers-segments-data-prod-na/SegmentsData/PanelCustomersSegmentsData/denormalizedData/2023/02/01/*"
)
in_market_seg.show()

# encodedData
in_market_seg2 = spark.read.parquet(
    "s3://customers-segments-data-prod-na/SegmentsData/PanelCustomersSegmentsData/encodedData/2023/02/01/*"
)
in_market_seg2.show()

# Bucketed
in_market_seg3 = spark.read.parquet(
    "s3://customers-segments-data-prod-na/SegmentsData/PanelCustomersSegmentsData/Bucketed/2023/02/01/*"
)
in_market_seg3.show()


In [91]:
%spark.pyspark
# Read the raking output stored under raking_bucket/10/ and preview five rows
# without truncating cell values.
raking_check = spark.read.parquet(
    "s3://analytics-emr-output-bucket/raking_bucket/10/"
)
raking_check.show(n=5, truncate=False)

In [92]:
%spark.pyspark
# Read the feature-selection data and preview two rows at full width.
feature_selection = spark.read.parquet(
    "s3://qqian-test/temp_data/feature_selection/"
)
feature_selection.show(n=2, truncate=False)

# Sum of has_qualified_answer for each Survey value.
qualified_by_survey = feature_selection.groupBy("Survey").agg(F.sum("has_qualified_answer"))
qualified_by_survey.show(n=300, truncate=False)

In [93]:
%spark.pyspark
# Distinct transaction_date values present in panel_transactions_v3.
transaction_dates = spark.sql("""select distinct transaction_date from spektr_shopperpanel.panel_transactions_v3""")
transaction_dates.show()

In [94]:
%spark.pyspark
