# Creating cdd_odl_customer table in ce schema 

In [0]:
from pyspark.sql import functions as F #import libraries 
import pandas as pn, functools, operator

### Function to flag whether an account is a test account or a valid one

In [0]:
def with_column_test_account(inputdf, column_mapping):
    """Append an ``is_test_account`` flag column to ``inputdf``.

    The rules are read from ``coreprod.clbadm.vw_test_account_string``. Each
    rule row pairs a ``field`` name with a SQL ``LIKE`` pattern held in the
    ``string`` column. A row of ``inputdf`` is flagged ``'1'`` when any
    pattern matches the column mapped to that pattern's field; every other
    row keeps ``'0'``.

    Args:
        inputdf: Spark DataFrame to annotate.
        column_mapping: dict mapping each rule ``field`` value to the name of
            the ``inputdf`` column its patterns are applied to.

    Returns:
        ``inputdf`` with an additional string column ``is_test_account``.

    Raises:
        AssertionError: if a rule field has no key in ``column_mapping`` or a
            mapped column name is not a column of ``inputdf``.
    """
    rules_sdf = spark.read.table('coreprod.clbadm.vw_test_account_string')

    # Every distinct rule field needs an entry in the mapping.
    rule_fields = rules_sdf.select('field').distinct().toPandas()['field'].to_list()
    unmapped_fields = [name for name in rule_fields if name not in column_mapping.keys()]
    assert not unmapped_fields, f'column_mapping keys has to contain the field:{rule_fields} in coreprod.clbadm.vw_test_account_string'

    # Every mapped column must exist in the input DataFrame.
    absent_columns = [name for name in column_mapping.values() if name not in inputdf.columns]
    assert not absent_columns, 'column_mapping values has to exists in the input Spark DataFrame'

    # Collect the rules to the driver as [pattern, field] pairs.
    pattern_field_pairs = rules_sdf.select("string", "field").toPandas().values.tolist()

    # OR all rules together, starting from a literal False.
    # The patterns use the T-SQL form `[_]` for a literal underscore; the
    # Spark LIKE syntax needs it written as `\_` instead.
    matches_any_rule = F.lit(False)
    for pattern, field in pattern_field_pairs:
        single_rule = F.col(column_mapping[field]).like(pattern.replace('[_]', r'\_'))
        matches_any_rule = matches_any_rule | single_rule

    # Default the flag to '0', then overwrite it with '1' where a rule matches.
    flagged_df = (
        inputdf
        .withColumn('is_test_account', F.lit('0'))
        .withColumn(
            'is_test_account',
            F.when(matches_any_rule, '1').otherwise(F.col('is_test_account')),
        )
    )
    return flagged_df
        

### Function to create tnc_accepted_at_sng column from kiosk and mobile

In [0]:
  
def getSNGData(df):
    """Add ``tnc_accepted_at_sng`` as the earlier of the kiosk and mobile values.

    Args:
        df: Spark DataFrame containing the columns
            ``tnc_accepted_at_sng_kiosk`` and ``tnc_accepted_at_sng_mobile``.

    Returns:
        ``df`` with an extra column ``tnc_accepted_at_sng`` holding the
        smaller of the two source columns. When only one of them is non-null
        that value is used; the result is null only when both are null.
    """
    # F.least skips nulls. The previous `when(kiosk < mobile, kiosk)
    # .otherwise(mobile)` form returned null whenever the mobile value was
    # null, discarding a valid kiosk value (a null comparison falls through
    # to `otherwise`).
    earliest_acceptance = F.least(
        F.col("tnc_accepted_at_sng_kiosk"),
        F.col("tnc_accepted_at_sng_mobile"),
    )
    return df.withColumn("tnc_accepted_at_sng", earliest_acceptance)

### Main 

In [0]:
class SinglProflCustomer:
    """Build the ``custanwo.ce.cdd_odl_customer`` table.

    ``__init__`` loads the source data and materialises two intermediate
    Delta tables (``custanwo.ce.lastLoginData`` and
    ``custanwo.ce.serviceNameData``). ``run`` joins customers, logins,
    T&C acceptance values and loyalty wallets and writes the final table.

    Relies on a global ``spark`` session and on the module-level helpers
    ``with_column_test_account`` and ``getSNGData``.
    """

    def __init__(self):
        """Load every source and persist the login and T&C intermediate tables."""
        print("Ingesting data ...")
        # Customer attributes, renamed to the target column names.
        # unified_cust_id and gdpr_del_ind are NULL placeholders here.
        self.dim_cust = (spark.sql("""
            SELECT NULL AS unified_cust_id,
                   bk_crm_customer_id AS crm_id,
                   bk_singl_profl_id AS singl_profl_id,
                   secondary_loginid AS scndry_login_id,
                   account_status,
                   prefix,
                   first_name AS first_nm,
                   initcap(first_name) as contactable_first_nm,
                   last_name AS last_nm,
                   email,
                   is_guest_customer AS guest_ind,
                   registration_date,
                   registration_channel,
                   date_of_birth,
                   NULL AS gdpr_del_ind,
                   suspend_status,
                   suspend_reason,
                   suspend_start_date AS suspend_ts
            FROM coreprod.infomart.dim_customer
            
             """)
        )
        # Filter by date when loading history, e.g.:
        #--where registration_date between '2021-10-01' and '2024-11-30'

        # Add the is_test_account flag, matching rules against email and first name.
        self.clean_cust_data = with_column_test_account(self.dim_cust, {
                    "EMAIL":"email",
                    'FIRST_NAME': 'first_nm'
                })
        
        # LAST_LOGIN_DATE measures, ranked per (profile, channel) by measure_val descending.
        self.cust_agg_attr = spark.sql("""select measure_nm, channel_cd
                            , singl_profl_userid
                            , measure_val
                            , ROW_NUMBER() over (partition by singl_profl_userid, channel_cd order by measure_val desc) rn
                            from coreprod.gb_mb_secured_aggregate_dl_tables.cust_agg_attr where measure_nm = 'LAST_LOGIN_DATE'""")        
        # Keep only the top-ranked row per (profile, channel) and drop rows without a channel.
        self.cust_agg_attr = self.cust_agg_attr.filter((F.col("rn") == '1') & F.col("channel_cd").isNotNull()).drop("rn")
        
        self.channels = ["GROCERY", "SNGKIOSK", "SNGMOBILE", "GEORGE", "ASDA", "GIFTCARDS", "LOYALTY"]

        # One DataFrame per channel: (singl_profl_userid, last_login_at_<channel>).
        self.df_login = {}

        for channel in self.channels:
            self.df_login[channel] = self.cust_agg_attr.filter(
                (F.upper(F.col("channel_cd")) == (channel).upper())) \
                .select(F.col('singl_profl_userid'),
                        F.col('measure_val').alias(f'last_login_at_{channel.lower()}'))

        # Combine the channels into one row per profile.
        # Full outer joins keep every profile that logged in on ANY channel.
        # Previously the chain was left-joined onto the GROCERY frame, which
        # silently dropped the other channels' login values for profiles
        # that had no GROCERY login. Joining on the column name yields a
        # single coalesced singl_profl_userid column.
        self.logins = (self.df_login.get("GROCERY").join(self.df_login.get("SNGKIOSK"), on='singl_profl_userid', how='fullouter')
                       .join(self.df_login.get("SNGMOBILE"), on='singl_profl_userid', how='fullouter')
                       .join(self.df_login.get("GEORGE"), on='singl_profl_userid', how='fullouter')
                       .join(self.df_login.get("ASDA"), on='singl_profl_userid', how='fullouter')
                       .join(self.df_login.get("GIFTCARDS"), on='singl_profl_userid', how='fullouter')
                       .join(self.df_login.get("LOYALTY"), on='singl_profl_userid', how='fullouter'))
        
        # Persist the login data and read it back from the table.
        self.logins.write.mode("overwrite").format("delta").saveAsTable("custanwo.ce.lastLoginData")

        self.logins = spark.sql("select * from custanwo.ce.lastLoginData")
        
        # Loyalty wallets keyed by single-profile id (aliased la_spid).
        self.loyalty_acct = spark.sql("""select distinct wallet_id, singl_profl_id as la_spid from coreprod.eagleeye.loyalty_acct""")
        
        # T&C records that carry a service name.
        self.df_ctc = spark.sql(""" SELECT distinct bk_singl_profl_id, lastmodifieddate, servicename__c from coreprod.infomart.fact_customer_terms_condition where servicename__c is not null 
                             """)
        
        # Distinct non-null profile ids: the base every T&C column is left-joined onto.
        self.distinct_ids = self.clean_cust_data.filter(F.col("singl_profl_id").isNotNull()).select("singl_profl_id").distinct().withColumnRenamed("singl_profl_id", "bk_singl_profl_id")
        self.distinct_ids.createOrReplaceTempView("distinct_ids")

        # Service names present in the source, collected to the driver.
        service_names = spark.sql("select distinct servicename__c from coreprod.infomart.fact_customer_terms_condition where servicename__c is not null").toPandas().values.tolist()
        service_nameslst = [x[0] for x in service_names]


        # One temp view per service, named after the lower-cased service name
        # with spaces replaced by underscores. Each view holds the minimum
        # lastmodifieddate per profile as tnc_accepted_at_<service>.
        self.df_tnc = {}

        for service_name in service_nameslst:
            self.df_tnc[service_name] = self.df_ctc.groupBy("bk_singl_profl_id", "servicename__c").agg(F.min("lastmodifieddate").alias("lastmodifieddate")).filter(F.col("servicename__c") == service_name) \
                .select(F.col('bk_singl_profl_id'),
                        F.col('lastmodifieddate').alias(f'tnc_accepted_at_{service_name.lower().replace(" ", "_")}'))
            self.df_tnc[service_name].createOrReplaceTempView(service_name.lower().replace(" ", "_"))    
        
        # NOTE(review): the joins below reference the views groceries,
        # sng_kiosk, sng_mobile, george, gift_card, asda_rewards, credit_card
        # and asda_mobile by name. They exist only if the matching service
        # names were returned by the query above.
        # NOTE(review): each print(...count()) below triggers a Spark action
        # on the plan built so far.
        # The "servicetable" view is redefined after every step, so the
        # statement order matters.
        self.df_servicenames = spark.sql("""select distinct_ids.bk_singl_profl_id, groceries.tnc_accepted_at_groceries
                                         from distinct_ids left join (select distinct bk_singl_profl_id, tnc_accepted_at_groceries from groceries)
                                         groceries on distinct_ids.bk_singl_profl_id = groceries.bk_singl_profl_id 
                                         
                                         """)
        print("spids and groceries joined ", self.df_servicenames.count())
        self.df_servicenames.createOrReplaceTempView("servicetable")

        self.df_servicenames = spark.sql("""select distinct servicetable.bk_singl_profl_id
                                         , servicetable.tnc_accepted_at_groceries
                                         , sng_kiosk.tnc_accepted_at_sng_kiosk 
                                                          from servicetable 
                                                          left join (select distinct bk_singl_profl_id, tnc_accepted_at_sng_kiosk from sng_kiosk) sng_kiosk 
                                                          on servicetable.bk_singl_profl_id = sng_kiosk.bk_singl_profl_id 
                                         """)
        print("spids, groceries and kiosk joined ", self.df_servicenames.count())
        self.df_servicenames.createOrReplaceTempView("servicetable")


        self.df_servicenames = spark.sql("""select distinct servicetable.bk_singl_profl_id
                                         ,servicetable.tnc_accepted_at_groceries
                                         ,servicetable.tnc_accepted_at_sng_kiosk
                                         ,sng_mobile.tnc_accepted_at_sng_mobile 
                                         from servicetable left join (select bk_singl_profl_id, tnc_accepted_at_sng_mobile from sng_mobile ) sng_mobile on servicetable.bk_singl_profl_id = sng_mobile.bk_singl_profl_id
                                         """)
        print("spids, groceries, kiosk and mobile joined ", self.df_servicenames.count())
        self.df_servicenames.createOrReplaceTempView("servicetable")

        self.df_servicenames = spark.sql("""select distinct servicetable.bk_singl_profl_id
                                         ,servicetable.tnc_accepted_at_groceries
                                         ,servicetable.tnc_accepted_at_sng_kiosk
                                         ,servicetable.tnc_accepted_at_sng_mobile
                                         ,george.tnc_accepted_at_george 
                                         from servicetable left join (select bk_singl_profl_id, tnc_accepted_at_george from george) george on servicetable.bk_singl_profl_id = george.bk_singl_profl_id
                                         """)
        print("spids, groceries, kiosk, mobile and george joined ", self.df_servicenames.count())
        self.df_servicenames.createOrReplaceTempView("servicetable")

        self.df_servicenames = spark.sql("""select distinct servicetable.bk_singl_profl_id
                                         , servicetable.tnc_accepted_at_groceries
                                         , servicetable.tnc_accepted_at_sng_kiosk
                                         , servicetable.tnc_accepted_at_sng_mobile
                                         , servicetable.tnc_accepted_at_george
                                         , gift_card.tnc_accepted_at_gift_card
                                         from servicetable left join (select distinct bk_singl_profl_id, tnc_accepted_at_gift_card from gift_card) gift_card on servicetable.bk_singl_profl_id = gift_card.bk_singl_profl_id
                                         """)
        print("spids, groceries, kiosk, mobile, george and gift card joined ", self.df_servicenames.count())
        self.df_servicenames.createOrReplaceTempView("servicetable")

        self.df_servicenames = spark.sql("""select distinct servicetable.bk_singl_profl_id
                                         , servicetable.tnc_accepted_at_groceries
                                         , servicetable.tnc_accepted_at_sng_kiosk
                                         , servicetable.tnc_accepted_at_sng_mobile
                                         , servicetable.tnc_accepted_at_george
                                         , servicetable.tnc_accepted_at_gift_card
                                         , asda_rewards.tnc_accepted_at_asda_rewards
                                         from servicetable left join (select distinct bk_singl_profl_id, tnc_accepted_at_asda_rewards from asda_rewards) asda_rewards on servicetable.bk_singl_profl_id = asda_rewards.bk_singl_profl_id
                                         """)
        print("spids, groceries, kiosk, mobile, george, gift card and asda rewards joined ", self.df_servicenames.count())
        self.df_servicenames.createOrReplaceTempView("servicetable")

        self.df_servicenames = spark.sql("""select distinct servicetable.bk_singl_profl_id, servicetable.tnc_accepted_at_groceries, servicetable.tnc_accepted_at_sng_kiosk, servicetable.tnc_accepted_at_sng_mobile, servicetable.tnc_accepted_at_george, servicetable.tnc_accepted_at_gift_card, servicetable.tnc_accepted_at_asda_rewards, credit_card.tnc_accepted_at_credit_card
                                         from servicetable left join (select distinct bk_singl_profl_id, tnc_accepted_at_credit_card from credit_card) credit_card on servicetable.bk_singl_profl_id = credit_card.bk_singl_profl_id
                                         """)
        print("spids, groceries, kiosk, mobile, george, gift card, asda rewards and credit card joined ", self.df_servicenames.count())
        self.df_servicenames.createOrReplaceTempView("servicetable")

        self.df_servicenames = spark.sql("""select distinct servicetable.bk_singl_profl_id, servicetable.tnc_accepted_at_groceries, servicetable.tnc_accepted_at_sng_kiosk, servicetable.tnc_accepted_at_sng_mobile, servicetable.tnc_accepted_at_george, servicetable.tnc_accepted_at_gift_card, servicetable.tnc_accepted_at_asda_rewards, servicetable.tnc_accepted_at_credit_card, asda_mobile.tnc_accepted_at_asda_mobile
                                         from servicetable left join (select distinct bk_singl_profl_id, tnc_accepted_at_asda_mobile from asda_mobile) asda_mobile on servicetable.bk_singl_profl_id = asda_mobile.bk_singl_profl_id
                                         """)
        print("spids, groceries, kiosk, mobile, george, gift card, asda rewards, credit card and asda mobile joined ", self.df_servicenames.count())
        self.df_servicenames.createOrReplaceTempView("servicetable")

        # Persist the per-profile T&C acceptance columns and read them back from the table.
        self.df_servicenames.write.mode("overwrite").option("overwriteSchema", "true").format("delta").saveAsTable("custanwo.ce.serviceNameData")

        self.df2_servicenames = spark.sql("select * from custanwo.ce.serviceNameData")
        
    def run(self):
        """Join all inputs and overwrite ``custanwo.ce.cdd_odl_customer``.

        Also overwrites ``custanwo.ce.loginData_joinedCustdata`` with the
        customer-plus-logins join.
        """
        print("** Execution Start **")
        
        #spids = self.clean_cust_data.select("singl_profl_id").distinct()
        #self.logins = self.logins.join(spids, on=spids.singl_profl_id == self.logins.singl_profl_userid, how='inner').drop("singl_profl_id")
        
        #self.loyalty_acct = self.loyalty_acct.join(spids, on=spids.singl_profl_id == self.loyalty_acct.la_spid, how='inner').drop("singl_profl_id")

        # Customers with their last-login columns (all customers are kept).
        df1 = self.clean_cust_data.join(self.logins, on=self.clean_cust_data.singl_profl_id == self.logins.singl_profl_userid, how='left')

        df1.write.mode("overwrite").format("delta").saveAsTable("custanwo.ce.loginData_joinedCustdata")

        # Add the T&C acceptance columns, dropping the duplicate join key.
        df2 = df1.join(self.df2_servicenames, on=df1.singl_profl_id == self.df2_servicenames.bk_singl_profl_id, how='left').drop("bk_singl_profl_id")
    
        # Add wallet_id from the loyalty accounts.
        df3 = df2.join(self.loyalty_acct, on=df2.singl_profl_id == self.loyalty_acct.la_spid, how='left')

        # Rename to the target column names.
        df4 = df3.withColumnsRenamed({"tnc_accepted_at_asda_rewards": "tnc_accepted_at_loyalty", "last_login_at_sngkiosk": "last_login_at_sng_kiosk", "last_login_at_sngmobile": "last_login_at_sng_mobile", "last_login_at_giftcards":"last_login_at_gift_card", "is_test_account":"test_account_ind"})

        # Derive tnc_accepted_at_sng from the kiosk and mobile columns.
        df5 = getSNGData(df4)

        # Final column set and order; helper join keys are left out and duplicate rows removed.
        df = df5.select('unified_cust_id', 'crm_id','singl_profl_id','wallet_id','scndry_login_id'
                       ,'account_status','prefix','first_nm','contactable_first_nm','last_nm','email','guest_ind','registration_date','registration_channel','date_of_birth','gdpr_del_ind','suspend_status'
                        ,'suspend_reason','suspend_ts','tnc_accepted_at_groceries','tnc_accepted_at_sng'
                        ,'tnc_accepted_at_sng_kiosk','tnc_accepted_at_sng_mobile','tnc_accepted_at_george'
                        ,'tnc_accepted_at_gift_card','tnc_accepted_at_loyalty','tnc_accepted_at_credit_card'
                        ,'tnc_accepted_at_asda_mobile','last_login_at_grocery','last_login_at_sng_kiosk'
                       ,'last_login_at_sng_mobile','last_login_at_george','last_login_at_asda','last_login_at_gift_card','last_login_at_loyalty','test_account_ind').distinct()

        # Replace the two placeholder columns with string-typed nulls
        # (presumably because the SQL NULL literals carry no concrete type).
        df = df.withColumn("unified_cust_id", F.lit(None).cast("string")).withColumn("gdpr_del_ind", F.lit(None).cast("string"))
        df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("custanwo.ce.cdd_odl_customer")
        
        print("** Execution End **")


### Run

In [0]:
# Constructing the object loads the sources and writes the intermediate
# tables; run() then writes custanwo.ce.cdd_odl_customer.
aa = SinglProflCustomer()
aa.run()