In [0]:
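"""
Email suppression scoring: combines campaign response data from three sources and
assigns every reachable contact person a suppression Score and Email_Action.
1. Campaign Response Data (sends after min_date_acm) from Wunderman SQL Database
2. Campaign Response Data (sends between min_date_ohub and max_date_ohub) from OHUB Database
3. Campaign Response Data from Marketo (no send-date filter applied in this notebook)
"""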

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.functions import *
import datetime
import time
from datetime import date, timedelta
from datetime import date
from pyspark.sql.window import Window
import pysftp
import os

In [0]:
# ---------------------------------------------------------------------------
# Run configuration
# ---------------------------------------------------------------------------
# OHUB send-date window. Both bounds are exclusive and are compared against
# 'yyyy-MM-dd HH:mm:ss' strings, so this keeps calendar year 2018
# (plus the single instant 2019-01-01 00:00:00).
min_date_ohub                 = '2017-12-31 23:59:59'
max_date_ohub                 = '2019-01-01 00:00:01'
# Exclusive lower send-date bound for the Wunderman/ACM source (sends from 2019 on).
min_date_acm                  = '2018-12-31 23:59:59'

# Feature flags: 0 = off, any truthy value = on.
is_countrySpecific            = 0       # restrict every source to `country_code`
is_onlynewsletters            = 0       # keep newsletter sends only
country_code                  = {"DK"}  # countries used when is_countrySpecific is on

# Read the clock ONCE. The scoring reference date, the export timestamp and the
# table/file-name suffix then all describe the same instant. Three separate
# clock reads could straddle a minute boundary and produce mismatched names.
current_date                  = datetime.datetime.today()
export_timestamp              = current_date.strftime("%Y/%m/%d %H:%M:%S")
file_generated_timestamp      = current_date.strftime('%Y%m%d%H%M')

In [0]:
# ---------------------------------------------------------------------------
# Input tables (with the notebook that prepares each one, where known)
# ---------------------------------------------------------------------------

# Wunderman / ACM campaign responses (Databricks copy, all countries).
#   prepared by: /Users/menaka.viswanathan@unilever.com/Pipelines/ACM/ProcessACM_Databricks/ProcessCampaignData_From_Databricks_ACM_Copy_For_SuppressionModel
acm_input_table_databricks = "data_acm.acmdata_databrickscopy_all_countries"

# OHUB2 campaign responses.
#   prepared by: /Users/menaka.viswanathan@unilever.com/Pipelines/ACM/ProcessACM_OHUB2/Process_ACM_OHUB2
acm_input_table_ohub = "data_acm.ohub2_campaign_response_data_processed"

# Marketo campaign responses.
marketo_input_table_databricks = "data_acm.marketo_campaign_response_data_processed"

# Contact-person consent records (loaded below as `cp`).
#   prepared by: /Users/menaka.viswanathan@unilever.com/Projects/Consent/Consent_of_ContactPersons
cp_consent_input_table_ohub = "data_user_menaka.perfect_golden_record_cp_consent"

In [0]:
# Output Databricks tables. Every name ends with this run's timestamp suffix
# (file_generated_timestamp), so runs started in different minutes write to
# different tables.
_run_suffix = str(file_generated_timestamp)

destination_table_ufs = "data_suppression.perfect_golden_ufs_recipientScores_{}".format(_run_suffix)
destination_table_wunderman = "data_suppression.perfect_golden_wunderman_recipientScores_{}".format(_run_suffix)

ufs_suppression_scores_overview_table = "data_suppression_scores_overview.golden_ufs_scores_overview_{}".format(_run_suffix)
wunderman_suppression_scores_overview_table = "data_suppression_scores_overview.golden_wunderman_scores_overview_{}".format(_run_suffix)

print ('UFS Scores Databricks table : ', destination_table_ufs)
print ('ACM Scores Databricks table : ', destination_table_wunderman)

In [0]:
# DBFS location for CSV exports.
dbfs_mount_path = "dbfs:/mnt/menaka"
csv_folderpath = "{}/suppression_model/production/".format(dbfs_mount_path)

# Export file base names. The (currently commented-out) export cells append
# ".csv" / "_merged.csv" to these.
ufs_export_filename = "PerfectGolden_UFS_RecipientScores_%s" % (file_generated_timestamp,)
wunderman_export_filename = "PerfectGolden_Wunderman_RecipientScores_%s" % (file_generated_timestamp,)

print ('DBFS File path : ' , csv_folderpath)
for _export_name in (ufs_export_filename, wunderman_export_filename):
  print(_export_name)

In [0]:
# Contact persons from the consent table, optionally limited to the configured countries.
cp = spark.table(cp_consent_input_table_ohub)
if is_countrySpecific:
  cp = cp.where(col("cp_countryCode").isin(country_code))

# Only contacts flagged reachable by email AND whose address is marked valid.
_is_reachable = col("reachableEmail") == 'Y'
_has_valid_email = col("cp_isEmailAddressValid") == 'true'
cp_reachable_and_valid = cp.where(_is_reachable & _has_valid_email)

In [0]:
# Marketo campaign responses, optionally narrowed to newsletters and/or countries,
# then projected onto the shared response layout used by the union below.
# NOTE(review): unlike the Wunderman and OHUB sources, no send-date window is
# applied to Marketo here. Confirm this is intended.
marketo_db = spark.table(marketo_input_table_databricks)

if is_onlynewsletters:
  marketo_db = marketo_db.filter(col("CampaignType") == 1)  # CampaignType 1 = newsletter

if is_countrySpecific:
  marketo_db = marketo_db.where(col("countryCode").isin(country_code))

# Column ORDER matters: the three sources are later combined with a positional union().
marketo_db = marketo_db.select(
  col("CountryCode").alias("countryCode"),
  "broadlogId",
  "contactPersonOhubId",
  "waveName",
  "sendDate",
  "NoofOpens",
  "NoofClicks",
  "Min_OpenDate",
  "Max_OpenDate",
  "Min_ClickDate",
  "Max_ClickDate",
  "data_source",
  "isOpenorClick",
  "isNewsletter",
)

In [0]:
# Wunderman/ACM campaign responses, normalised to the shared response layout.
wunderman_db = spark.table(acm_input_table_databricks)

if is_onlynewsletters:
  wunderman_db = wunderman_db.filter(col("CampaignType") == 1)  # CampaignType 1 = newsletter

if is_countrySpecific:
  wunderman_db = wunderman_db.where(col("countryCode").isin(country_code))

# A send counts as engaged when it has at least one open or one click.
_wunderman_engaged = (col("NoofOpens") > 0) | (col("NoofClicks") > 0)

wunderman_db = (
  wunderman_db
  # Sends after min_date_acm only. Open-ended since the 18 Nov 2019 change,
  # which removed the former upper bound (max_date_acm).
  .filter(col("sendDate") > min_date_acm)
  .withColumn("data_source", lit("wunderman_db"))
  .withColumn("isOpenorClick", when(_wunderman_engaged, 1).otherwise(0))
  .withColumn("isNewsletter", when(col("CampaignType") == 1, 1).otherwise(0))
  # Column ORDER matters: the sources are later combined with a positional union().
  .select(
    col("CountryCode").alias("countryCode"),
    "broadlogId",
    col("UniqueContactIdentifier").alias("contactPersonOhubId"),
    "waveName",
    "sendDate",
    "NoofOpens",
    "NoofClicks",
    "Min_OpenDate",
    "Max_OpenDate",
    "Min_ClickDate",
    "Max_ClickDate",
    "data_source",
    "isOpenorClick",
    "isNewsletter",
  )
)

In [0]:
# OHUB campaign responses, normalised to the shared response layout.
ohub_db = spark.table(acm_input_table_ohub)

if is_onlynewsletters:
  ohub_db = ohub_db.filter(col("isNewsletter") == 1)  # newsletters only

if is_countrySpecific:
  ohub_db = ohub_db.where(col("countryCode").isin(country_code))

# sendDate is parsed with the 'dd-MM-yyyy HH:mm:ss' pattern and re-rendered as
# 'yyyy-MM-dd HH:mm:ss'. That format compares correctly, as text, against the
# min_date_ohub / max_date_ohub strings.
_ohub_send_date_iso = from_unixtime(unix_timestamp(col("sendDate"), 'dd-MM-yyyy HH:mm:ss'), 'yyyy-MM-dd HH:mm:ss')
# Exclusive bounds from the config cell: Jan-Dec 2018.
_in_ohub_window = (col("sendDate") > min_date_ohub) & (col("sendDate") < max_date_ohub)

ohub_db = (
  ohub_db
  .withColumn("sendDate", _ohub_send_date_iso)
  .filter(_in_ohub_window)
  .withColumn("data_source", lit("ohub"))
  .withColumnRenamed("deliveryLogId", "broadLogId")
  .withColumn("isOpenorClick", when((col("NoofOpens") > 0) | (col("NoofClicks") > 0), 1).otherwise(0))
  # Column ORDER matters: the sources are later combined with a positional union().
  .select(
    "countryCode",
    "broadLogId",
    "contactPersonOhubId",
    "waveName",
    "sendDate",
    "NoofOpens",
    "NoofClicks",
    "Min_OpenDate",
    "Max_OpenDate",
    "Min_ClickDate",
    "Max_ClickDate",
    "data_source",
    "isOpenorClick",
    "isNewsletter",
  )
)

In [0]:
# Stack Marketo, Wunderman and OHUB responses. DataFrame.union() matches columns
# by POSITION, not by name, so the three select() lists must keep the same order.
combined_data = marketo_db
for _next_source in (wunderman_db, ohub_db):
  combined_data = combined_data.union(_next_source)

# Months (fractional, via months_between) from each send up to this run's reference date.
combined_data = combined_data.withColumn("monthDiff", months_between(lit(current_date), col("sendDate")))


In [0]:
# Per (countryCode, contactPersonOhubId) engagement profile over the combined history.
# Each Sends_*/Opens_* pair counts DISTINCT broadlogIds whose monthDiff lies in a
# window; the Opens_* member additionally requires isOpenorClick > 0.
# NOTE(review): several pairs duplicate each other under different names
# (e.g. Sends_Month_0_3 == Sends_Less_3_Months, Sends_Month_3_6 ==
# Sends_Between_3_and_6_Months). They are kept because they are output columns.

def _sends_and_opens(in_window, sends_name, opens_name):
  """Return [sends, opens] distinct-broadlogId aggregates for rows where `in_window` holds."""
  sends = countDistinct(when(in_window, col("broadlogId"))).alias(sends_name)
  opens = countDistinct(when(in_window & (col("isOpenorClick") > 0), col("broadlogId"))).alias(opens_name)
  return [sends, opens]

_md = col("monthDiff")

# (window predicate, sends column, opens column), listed in output-column order.
_window_specs = [
  (_md <= 3,                 "Sends_Less_3_Months",          "Opens_Less_3_Months"),
  (_md <= 6,                 "Sends_Less_6_Months",          "Opens_Less_6_Months"),
  (_md <= 9,                 "Sends_Less_9_Months",          "Opens_Less_9_Months"),
  (_md <= 11,                "Sends_Less_11_Months",         "Opens_Less_11_Months"),
  (_md <= 12,                "Sends_Less_12_Months",         "Opens_Less_12_Months"),
  (_md > 12,                 "Sends_Greaterthan_12_Months",  "Opens_Greater_12_Months"),
  ((_md > 3) & (_md <= 6),   "Sends_Between_3_and_6_Months", "Opens_Between_3and_6_Months"),
  ((_md > 6) & (_md <= 9),   "Sends_Between_6_and_9_Months", "Opens_Between_6_and_9_Months"),
  (_md <= 18,                "Sends_Less_18_Months",         "Opens_Less_18_Months"),
  (_md <= 3,                 "Sends_Month_0_3",              "Opens_Month_0_3"),
  ((_md > 3) & (_md <= 6),   "Sends_Month_3_6",              "Opens_Month_3_6"),
  ((_md > 6) & (_md <= 9),   "Sends_Month_6_9",              "Opens_Month_6_9"),
  ((_md > 9) & (_md <= 12),  "Sends_Month_9_12",             "Opens_Month_9_12"),
  ((_md > 12) & (_md <= 15), "Sends_Month_12_15",            "Opens_Month_12_15"),
  ((_md > 15) & (_md <= 18), "Sends_Month_15_18",            "Opens_Month_15_18"),
]

# `max` / `min` below are pyspark.sql.functions aggregates (brought in by the
# star import), not the Python builtins.
_insight_aggregations = (
  [countDistinct(col("broadlogId")).alias("Total_NoofNewsLettersReceived")]
  + [agg_col for spec in _window_specs for agg_col in _sends_and_opens(*spec)]
  + [
      max("sendDate").alias("LastSendOutDate"),
      max("Max_OpenDate").alias("LastEngagedDate"),
      max("Max_ClickDate").alias("LastClickDate"),
      min(col("monthDiff")).alias("Min_MonthsDiff"),
      max(col("monthDiff")).alias("Max_MonthsDiff"),
    ]
)

rcp_campaign_data_insights = combined_data.groupby("countryCode", "contactPersonOhubId").agg(*_insight_aggregations)

# Re-render the "last activity" columns as 'yyyy-MM-dd HH:mm:ss'.
# NOTE(review): unix_timestamp() without a format parses strings as
# 'yyyy-MM-dd HH:mm:ss'. Values in another layout would end up null here, so
# confirm the Max_OpenDate / Max_ClickDate formats of all three sources.
for _date_col in ("LastSendOutDate", "LastEngagedDate", "LastClickDate"):
  rcp_campaign_data_insights = rcp_campaign_data_insights.withColumn(
    _date_col, from_unixtime(unix_timestamp(col(_date_col)), "yyyy-MM-dd HH:mm:ss"))

In [0]:
# Assign a suppression Score to each (countryCode, contactPersonOhubId) row from
# its windowed send/open counts. when() branches are evaluated top to bottom and
# the FIRST matching branch wins, so branch order is part of the scoring rules.
# "Sends" = distinct broadlogIds; they are newsletters only when is_onlynewsletters is on.
#
# NOTE(review): `.otherwise('Unknown')` puts a string into an otherwise integer
# chain, so Spark types this Score column as string. A later int cast would turn
# 'Unknown' into null rather than a meaningful code. From the branches alone,
# 'Unknown', the -2 branch and every score-10 branch look unreachable:
#   - Sends_Less_6_Months is a countDistinct, so it is never null;
#   - 0 is caught by the score-0 branch;
#   - 1..3 is caught by the first score-7 branch, whose Opens_* >= 0 checks are always true;
#   - >= 4 is caught by the score-2/3 branches.
# Confirm before relying on those branches.
rcp_campaign_data_insights = rcp_campaign_data_insights.withColumn("Score",
when(((col("Sends_Less_3_Months") >= 2) & (col("Opens_Less_3_Months") >= 1)) , 1) # Score 1: at least 2 sends in the last 3 months and at least 1 open
.when(((col("Sends_Less_6_Months") >= 4) & (col("Opens_Less_6_Months") >= 1)), 2) # Score 2: at least 4 sends in the last 6 months and at least 1 open
.when( (col("Sends_Less_6_Months") >= 4) & (col("Opens_Less_6_Months") < 1), 3)   # Score 3: at least 4 sends in the last 6 months, no opens
.when( (col("Sends_Less_9_Months") >= 5) & (col("Opens_Less_9_Months") < 1), 4)   # Score 4: at least 5 sends in the last 9 months, no opens
.when( (col("Sends_Less_11_Months") >= 6) & (col("Opens_Less_11_Months") < 1), 5) # Score 5: at least 6 sends in the last 11 months, no opens

# Previous score-6 rule (12-month window), kept for reference:
#.when( (col("Sends_Less_6_Months") > 0 ) & (col("Sends_Less_12_Months") > 5 ) & (col("Sends_Less_18_Months") >= 6) & (col("Opens_Less_12_Months") < 1), 6) # Received atleast 6 newsletters in last 12 months and no opens # Changed
.when( (col("Sends_Less_6_Months") > 0 )  & (col("Sends_Less_18_Months") >= 6) & (col("Opens_Less_18_Months") < 1), 6) # Score 6: at least 1 send in the last 6 months and at least 6 in the last 18 months, no opens in 18 months
# Score 7 (exception): exactly 1 send in the last 3 months, or 1-3 sends in the
# last 6 months. The Opens_* >= 0 conditions are always true.
.when(( (col("Sends_Less_3_Months") == 1) & (col("Opens_Less_3_Months") >= 0) ) 
      | ((col("Sends_Less_6_Months") > 0) & (col("Sends_Less_6_Months") < 4) & (col("Opens_Less_6_Months") >= 0))
      ,7)
.when( (col("Sends_Less_9_Months") >= 5)  & (col("Opens_Less_9_Months") > 0), 7) # Score 7: at least 5 sends in the last 9 months and at least 1 open
.when( (col("Sends_Less_11_Months") >= 6)  & (col("Opens_Less_11_Months") > 0), 7) # Score 7: at least 6 sends in the last 11 months and at least 1 open
.when( (col("Sends_Less_12_Months") >= 6)  & (col("Opens_Less_12_Months") > 0), 7) # Score 7: at least 6 sends in the last 12 months and at least 1 open

# Score 10 ("Exceptions_6_12_Months"): some send in the last 6 months plus few
# sends over 9/11/12 months, with or without opens. See the NOTE above on reachability.
.when( (col("Sends_Less_6_Months") > 0 ) & (col("Sends_Less_9_Months") < 5)  & (col("Opens_Less_9_Months") == 0), 10) # fewer than 5 sends in the last 9 months, no opens
.when( (col("Sends_Less_6_Months") > 0 ) & (col("Sends_Less_9_Months") < 5)  & (col("Opens_Less_9_Months") > 0), 10) # fewer than 5 sends in the last 9 months, at least 1 open
.when( (col("Sends_Less_6_Months") > 0 ) & (col("Sends_Less_11_Months") < 6)  & (col("Opens_Less_11_Months") == 0), 10) # fewer than 6 sends in the last 11 months, no opens
.when( (col("Sends_Less_6_Months") > 0 ) & (col("Sends_Less_11_Months") < 6)  & (col("Opens_Less_11_Months") > 0), 10) # fewer than 6 sends in the last 11 months, at least 1 open
.when( (col("Sends_Less_6_Months") > 0 ) & (col("Sends_Less_12_Months") < 6)  & (col("Opens_Less_12_Months") == 0), 10) # fewer than 6 sends in the last 12 months, no opens
.when( (col("Sends_Less_6_Months") > 0 ) & (col("Sends_Less_12_Months") < 6)  & (col("Opens_Less_12_Months") > 0), 10) # fewer than 6 sends in the last 12 months, at least 1 open

.when( (col("Sends_Less_6_Months") == 0 ), 0)    # Score 0: no sends in the last 6 months

.when(col("Sends_Less_12_Months") < 1, -2)       # Score -2: no sends in the last 12 months (shadowed by the score-0 branch above)
.otherwise('Unknown') )   

In [0]:
# Keep one country row per contact: the country whose most recent send is the
# newest (smallest Min_MonthsDiff).
# Nulls are ordered LAST. Min_MonthsDiff is null for a group whose sendDates are
# all null; this can happen for Marketo rows, which get no send-date filter.
# Spark's default ascending order puts nulls FIRST, which would let such a group
# beat a country with real recent activity.
# NOTE(review): rank() gives tied rows the same rank. A contact whose countries
# tie on Min_MonthsDiff therefore keeps several rows, and the "rank" column is
# carried into the joined output. To inspect ties, group by contactPersonOhubId
# and look for countDistinct("Score") > 1.
window                      = Window.partitionBy('contactPersonOhubId').orderBy(col('Min_MonthsDiff').asc_nulls_last())
rcp_campaign_data_insights  = rcp_campaign_data_insights.withColumn("rank", rank().over(window))
rcp_campaign_data_insights  = rcp_campaign_data_insights.where(col("rank") == 1)

In [0]:
# Attach each reachable & valid contact person's campaign profile. The left join
# keeps contacts with no campaign history (their metrics and Score are null).
contactpersons = cp_reachable_and_valid
if is_countrySpecific:
  contactpersons = contactpersons.where(col("cp_countryCode").isin(country_code))

_cp_join_condition = contactpersons.cp_ohubId == rcp_campaign_data_insights.contactPersonOhubId
ufs_suppression_scores_final = contactpersons.join(rcp_campaign_data_insights, _cp_join_condition, how = "left")

# Building blocks for the Score overrides below.
_email_lower = lower(col("cp_emailAddress"))
_is_internal_domain = _email_lower.like('%@unilever.com%') | _email_lower.like('%@wunderman.com%')
_newsletters_or_zero = when(col("Total_NoofNewsLettersReceived").isNull(), 0).otherwise(col("Total_NoofNewsLettersReceived"))
# Evaluated after Total_NoofNewsLettersReceived has had nulls replaced by 0 (step 2 below).
_recent_optin_no_sends = (months_between(lit(current_date), col("emailOptInDate")) < 6) & (col("Total_NoofNewsLettersReceived") == 0)

ufs_suppression_scores_final = (
  ufs_suppression_scores_final
  # 1) @unilever / @wunderman addresses -> -3 (domain exception).
  .withColumn("Score", when(_is_internal_domain, -3).otherwise(col("Score")))
  # 2) Contacts without campaign rows -> 0 newsletters received (changed 11 Dec 2019).
  .withColumn("Total_NoofNewsLettersReceived", _newsletters_or_zero)
  # 3) Opted in less than 6 months ago and never received anything -> 1 (changed 11 Dec 2019).
  #    NOTE(review): this also overwrites a -3 domain exception. Confirm that is intended.
  .withColumn("Score", when(_recent_optin_no_sends, 1).otherwise(col("Score")))
  # 4) Still unscored -> -1; the whole column is cast to int here.
  .withColumn("Score", when(col("Score").isNotNull(), col("Score")).otherwise(-1).cast("int"))
  # 5) Re-classify Score 0 (changed 11 Mar 2020): more than 4 sends in 18 months
  #    with no opens -> 6; fewer than 5 sends -> 3; everything else stays 0.
  .withColumn("Score",
              when((col("Score") == 0) & (col("Sends_Less_18_Months") > 4) & (col("Opens_Less_18_Months") == 0), 6)
              .when((col("Score") == 0) & (col("Sends_Less_18_Months") < 5), 3)
              .otherwise(col("Score")))
)

In [0]:
# Score definitions and email actions, built from (score, label) tables.
# A Score missing from a table maps to null.
# NOTE(review): the score-6 text says "12 months", but the score-6 rules use
# 18-month windows. Score -2 has a definition but no Email_Action.

def _score_lookup(pairs):
  """Build when(Score == k, v).when(...) for every (k, v) in `pairs`; unmatched scores give null."""
  expr = None
  for score_value, label in pairs:
    is_match = col("Score") == score_value
    expr = when(is_match, label) if expr is None else expr.when(is_match, label)
  return expr

_score_definitions = [
  (1,  'Received atleast 2 newsletters in last 3 months and Opened atleast 1'),
  (2,  'Received atleast 4 newsletters in last 6 months and Opened atleast 1'),
  (3,  'Received atleast 4 newsletters in last 6 months and no opens'),
  (4,  'Received atleast 5 newsletters in last 9 months and no opens'),
  (5,  'Received atleast 6 newsletters in last 11 months and no opens'),
  (6,  'Received atleast 6 newsletters in last 12 months and no opens'),
  (7,  'Exceptions'),
  (0,  'Did not receive any newsletters in last 6 months'),
  (10, 'Exceptions_6_12_Months'),
  (-2, 'No Email Activity in last 12 months'),
  (-1, 'No Email Activity in last 18 months (No campaign data)'),
  (-3, 'Unilever & Wunderman Domain Exceptions'),
]

_email_actions = [
  (1,  'Keep_Sending_Emails'),
  (2,  'Keep_Sending_Emails'),
  (3,  'Start_ReEngagement_Campaign'),
  (4,  'Start_ReActivation_Campaign'),
  (5,  'LastOff_ReEngagement_Campaign'),
  (6,  'Suppress_Operator'),
  (7,  'Keep_Sending_Emails'),
  (0,  'Keep_Sending_Emails'),
  (-1, 'Manual_Sendout'),
  (10, 'Keep_Sending_Emails'),
  (-3, 'Keep_Sending_Emails'),
]

ufs_suppression_scores_final = (
  ufs_suppression_scores_final
  .withColumn("Score_Definition", _score_lookup(_score_definitions))
  .withColumn("Email_Action", _score_lookup(_email_actions))
)

# UFS scores overview: distinct reachable & valid contacts and email addresses
# per (country, Score, definition, action).
_overview_reachable_valid = (col("reachableEmail") == 'Y') & (col("cp_isEmailAddressValid") == 'true')
ufs_suppression_scores_overview = (
  ufs_suppression_scores_final
  .groupby("cp_countryCode", "Score", "Score_Definition", "Email_Action")
  .agg(
    countDistinct(when(_overview_reachable_valid, col("cp_ohubId"))).alias("NoofReachableandValidContactPersons"),
    countDistinct(when(_overview_reachable_valid, col("cp_emailAddress"))).alias("NoofReachableandValid_EmailAddresses")
  )
  .sort("cp_countryCode", "Score")
)

In [0]:
# Persist only reachable contacts with a valid email address to this run's UFS scores table.
_persist_filter = (col("reachableEmail") == 'Y') & (col("cp_isEmailAddressValid") == 'true')
ufs_suppression_scores_final = ufs_suppression_scores_final.where(_persist_filter)
(ufs_suppression_scores_final
  .write
  .mode("overwrite")
  .saveAsTable(destination_table_ufs))

In [0]:
# wunderman_suppression_scores_final = ufs_suppression_scores_final.withColumn("Score", when(col("Score").isin(1,2,7,10,-3), 1) #Keep sending all emails
#                                                               .when(col("Score") == 6,3) # Suppress operator 
#                                                               .when( col("Score").isin(3,-1,0), 6) # Re-engagement campaigns  
#                                                               .when(col("Score").isin(4), 7) # Re-engagement campaigns
#                                                               .when(col("Score").isin(5), 8) # Re-engagement campaigns
                                                             
#                                                               .otherwise(col("Score")))

# wunderman_suppression_scores_final = wunderman_suppression_scores_final.withColumn("Email_Action", 
#  when(col("Score") == 1, 'Keep_Sending_Emails')
# .when(col("Score") == 3, 'Suppress_Operator')      
# .when(col("Score") == 6, 'Start_Reactivation _Campaign_1')         
# .when(col("Score") == 7, 'Start_Reactivation _Campaign_2') 
# .when(col("Score") == 8, 'Start_Last_Off_Reactivation')                                                                                   
# )

# # Overview
# wunderman_suppression_scores_overview =  wunderman_suppression_scores_final.groupby("cp_countryCode", "Score","Email_Action").agg(

#         countDistinct(when( (col("reachableEmail") == 'Y') & (col("cp_isEmailAddressValid") == 'true'),col("cp_ohubId"))).alias("NoofReachableandValidContactPersons"),
#         countDistinct(when( (col("reachableEmail") == 'Y') & (col("cp_isEmailAddressValid") == 'true'),col("cp_emailAddress"))).alias("NoofReachableandValid_EmailAddresses")
# ).sort("cp_countryCode","Score")

In [0]:
#ufs_suppression_scores_overview.write.mode("overwrite").saveAsTable(ufs_suppression_scores_overview_table)
#wunderman_suppression_scores_overview.write.mode("overwrite").saveAsTable(wunderman_suppression_scores_overview_table)

In [0]:
#ufs_suppression_scores_final = ufs_suppression_scores_final.where( (col("reachableEmail") == 'Y') & (col("cp_isEmailAddressValid") == 'true'))
#ufs_suppression_scores_final.write.mode("overwrite").saveAsTable(destination_table_ufs)

# wunderman_suppression_scores_final = wunderman_suppression_scores_final.where( (col("reachableEmail") == 'Y') & (col("cp_isEmailAddressValid") == 'true'))
# wunderman_suppression_scores_final.write.mode("overwrite").saveAsTable(destination_table_wunderman)

In [0]:
#%run "/Users/menaka.viswanathan@unilever.com/Pipelines/Library_Functions/S01_Libraries_Functions"

In [0]:
# wunderman_data_format = wunderman_suppression_scores_final.withColumn("ScoreType", lit("EMAIL_SUPPRESSION"))
# wunderman_data_format = wunderman_data_format.withColumn("Calculationdate", lit(export_timestamp))

# wunderman_data_format = wunderman_data_format.select(col("cp_ohubId").alias("RecipientPartyID"), "ScoreType","Calculationdate","Score").distinct()

# wunderman_data_format.repartition(1).write.option("sep",",").csv(csv_folderpath + wunderman_export_filename +".csv" , mode="overwrite" , header=True)
# copyMerge(csv_folderpath + wunderman_export_filename +".csv", csv_folderpath + wunderman_export_filename + "_merged.csv", debug=True, overwrite=True, deleteSource=True)


# ufs_data_format      = ufs_suppression_scores_final.withColumn("ScoreType", lit("EMAIL_SUPPRESSION"))
# ufs_data_format      = ufs_data_format.withColumn("Calculationdate", lit(export_timestamp))

# ufs_data_format      = ufs_data_format.select(col("cp_ohubId").alias("RecipientPartyID"), "ScoreType","Calculationdate","Score").distinct()

# ufs_data_format.repartition(1).write.option("sep",",").csv(csv_folderpath + ufs_export_filename +".csv" , mode="overwrite" , header=True)
# copyMerge(csv_folderpath + ufs_export_filename +".csv", csv_folderpath + ufs_export_filename + "_merged.csv", debug=True, overwrite=True, deleteSource=True)

In [0]:
# host                     = 'unilever-sftp.neolane.net'
# port                     = 22
# user                     = dbutils.secrets.get(scope = 'unileversftp', key = 'username')
# password                 = dbutils.secrets.get(scope = 'unileversftp', key = 'password')



# #local                    = '/dbfs:/mnt/menaka/suppression_model/production/' 
# local_filename           = '/dbfs/mnt/menaka/suppression_model/production/' + str(wunderman_export_filename) +"_merged.csv"
# remote                   = '/incoming/import/RecipientScores_' + str(file_generated_timestamp) + ".csv" 
# cnopts                   = pysftp.CnOpts()
# cnopts.hostkeys          = None

# print(local_filename)
# print(remote)

In [0]:
#time.sleep(5)

In [0]:
# while True:
#   try:
#     with pysftp.Connection(host=host, username=user, password=password, cnopts=cnopts) as sftp:
#       print ("Connection succesfully stablished ... ")
#       sftp.put(local_filename,remote)  
#       sftp.close()
#       print ("File Uploaded succesfully")
#       break     
#   except Exception as e:
#     print('Upload Exception: ' + str(e))