#####Create a Unique base data set from crm account based on cust_skey for joining to Horizon. This field is only available in Horizon therefore other integration options were not considered. 

#### Using Catalog Notebook

In [0]:
%run ../General/NB_Configuration

In [0]:
spark.sql(f"""use catalog {catalog}""")

### Date of Death

In [0]:
df_death = spark.sql('''
                select 
                Cust_skey, 
                to_timestamp(cast(RPDOD as string), 'yyyyMMdd') as DateofDeath
                from bronze.ods_rmpdem 
                where currentrecord = 'Yes'
                and RPDOD > 0
''')
df_death.createOrReplaceTempView("vw_death")


In [0]:
%sql
select cust_skey, count(*) from vw_death
group by 1
having count(*) > 1


### BusinessEmail

In [0]:
df_aotm_email = spark.sql('''
                        select 
                        Company_ID, 
                        CASE 
                            WHEN TRIM(Primary_Contact_Email) NOT REGEXP r'^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' 
                            THEN NULL 
                            ELSE TRIM(Primary_Contact_Email) 
                        END as BeB_Email  
                        from bronze.v_ods_beb_customer
                        where CurrentRecord = 'Yes'
                          ''')
df_aotm_email.createOrReplaceTempView("vw_aotm_email")

In [0]:
df_horizon_business_email = spark.sql('''
select cust_skey, email as Hzn_email
from 
(SELECT 
      CASE 
        WHEN TRIM(rminet.riiadr) NOT REGEXP r'^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' 
        THEN NULL 
        ELSE TRIM(rminet.riiadr) 
      END AS Email,
      mast.cust_skey,
      ROW_NUMBER() OVER (PARTITION BY mast.cust_skey ORDER BY rimndt DESC) AS rownum
    FROM bronze.ods_rminet rminet
    INNER JOIN (
      SELECT Cust_Skey 
      FROM bronze.ods_rmmast 
      WHERE RMCTYP = 'N' 
        AND CurrentRecord = 'Yes' 
    ) mast ON rminet.CUST_KEY = mast.Cust_Skey
    where rminet.CurrentRecord = 'Yes'
    AND rminet.riityp = 'EML'
    )tab 

    where tab.rownum = 1
''')
df_horizon_business_email.createOrReplaceTempView("vw_horizon_business_email")

In [0]:
df_beb_hzn_business_email =spark.sql('''
WITH RankedEmails AS (
    SELECT
        a.AccountNumber,  
        a.Cust_Skey,
        a.SF_Cust_Key,
        a.Company_ID,
        b.BeB_Email,
        c.Hzn_email,
        ROW_NUMBER() OVER (PARTITION BY a.SF_Cust_Key ORDER BY CASE WHEN b.BeB_Email IS NOT NULL THEN 1 ELSE 2 END) as rn
    FROM
        (select AccountNumber,  Cust_Skey, SF_Cust_Key , Company_ID
                           from silver.customer_master
                            where CurrentRecord = 'Yes' )a
    LEFT JOIN
        vw_aotm_email b ON a.Company_ID = b.Company_ID
    LEFT JOIN
        vw_horizon_business_email c ON a.Cust_Skey = c.Cust_Skey
)
SELECT
    AccountNumber,
    Cust_Skey,
    SF_Cust_Key,
    Company_ID,
    CASE
        WHEN BeB_Email IS NOT NULL THEN BeB_Email
        ELSE Hzn_email
    END AS BusinessEmail
FROM
    RankedEmails
WHERE rn = 1
''')
df_beb_hzn_business_email.createOrReplaceTempView("vw_beb_hzn_business_email")

In [0]:
%sql
select sf_cust_key, count(*)  from vw_beb_hzn_business_email
group by 1 
having count(*) > 1


###Employee Flag 

In [0]:
from pyspark.sql.functions import regexp_replace, regexp_extract

df = spark.read.csv("/Volumes/ab_uat_catalog/bronze/unstructured/Employee_Flag/employee_flag.csv", header=True, inferSchema=True).withColumnRenamed('TAX ID', 'TaxId')

# Refine TaxId to contain only numbers and valid SSNs
df = df.withColumn('TaxId', regexp_extract('TaxId', r'(\d{3}-\d{2}-\d{4}|\d{9})', 0))

# Remove the '-' from the TaxId
df = df.withColumn('TaxId', regexp_replace('TaxId', '-', ''))
df.createOrReplaceTempView("vw_employee_flag")

###Create Left Join to CRM.Account 

In [0]:
df_final_FA = spark.sql(
    """ select * from (
          select a.sf_cust_key,
          b.DateofDeath
     ,c.BusinessEmail
     ,CASE WHEN empl.TaxId IS NOT NULL THEN True ELSE False END AS IsEmployee
     ,row_number() over(partition by a.sf_cust_key order by a.ParentID desc) as rownum
from 
 silver.customer_master a
left join vw_death b on a.cust_skey = b.cust_skey 
left join vw_beb_hzn_business_email c on a.sf_cust_key = c.SF_Cust_Key
left join vw_employee_flag empl on a.DecryptedTaxIDNumber = empl.TaxId

Where a.CurrentRecord = 'Yes' and a.Cust_Skey !='' and a.Cust_Skey !=' ' and a.Cust_Skey is not null
)
where rownum = 1
"""
).drop("a.DateofDeath","rownum")

df_final_FA.createOrReplaceTempView("vw_final_FAvw_final_FA")

In [0]:
%sql
select * from vw_final_FAvw_final_FA

In [0]:
%sql
select SF_Cust_key,count(*) from vw_final_FAvw_final_FA
group by 1 
having count(*) > 1


## Checks And Write Table

###Dynamic Merge

In [0]:

DestinationSchema = dbutils.widgets.get('DestinationSchema')
DestinationTable = dbutils.widgets.get('DestinationTable')
AddOnType = dbutils.widgets.get('AddOnType')

print(DestinationSchema, DestinationTable, AddOnType)

In [0]:
base_column = spark.read.table(f"{DestinationSchema}.{DestinationTable}").columns  # get all the base columns
set_addon = df_final_FA.columns  # get only the addon columns
get_pk = spark.sql(f"""select * from config.metadata where lower(DWHTableName)='{DestinationTable}' and lower(DWHSchemaName) ='{DestinationSchema}' """).collect()[0]['MergeKey']
get_pk_temp = get_pk.split(',')  # split the get_pk
for pk in get_pk_temp:
    set_addon.remove(pk.lower().strip())  # remove pk from the addon
excluded_columns = ['Start_Date', 'End_Date', 'DW_Created_By', 'DW_Created_Date', 'DW_Modified_By', 'DW_Modified_Date', 'MergeHashKey', 'CurrentRecord'] + set_addon
filtered_basetable_columns = [col for col in base_column if col.lower() not in [ex_col.lower() for ex_col in excluded_columns]]

In [0]:
#get required columns from base table
df_base_required = spark.sql(f"select {','.join(filtered_basetable_columns)} from {DestinationSchema}.{DestinationTable} where currentrecord='Yes' ")
df_base_required.createOrReplaceTempView("vw_base")  #use this as a base table

if AddOnType == 'AddOn':
  if df_base_required.count() > 0:
      join_conditions = " and ".join([f"vw_base.{col.strip()} = vw_final_FAvw_final_FA.{col.strip()}" for col in get_pk.split(',')])

      df_final_base_with_addon = spark.sql(
          f"""
          select
              vw_base.*,
              {','.join([f'vw_final_FAvw_final_FA.{col} as {col}' for col in set_addon])}
          from 
              vw_base 
          left join 
              vw_final_FAvw_final_FA 
          on 
              {join_conditions}
      """)
      df_final_base_with_addon.createOrReplaceTempView("vw_final_base_with_addon")
      print(df_final_base_with_addon.count())
  else:
    df_final_FA.createOrReplaceTempView("vw_final_base_with_addon")
    count = df_final_FA.count()
    display(count)
else:
    df_final_FA.createOrReplaceTempView("vw_final_base_with_addon")
    count = df_final_FA.count()
    display(count)


In [0]:
# Generate the concatenated string
base_without_pk = filtered_basetable_columns.copy()
for pk in get_pk_temp:
    base_without_pk.remove(pk.strip())
Mergehashkey_columns = list(set(set_addon + base_without_pk))
concatenated_columns = ','.join(Mergehashkey_columns)

In [0]:
# Use the concatenated string in the SQL query
query = f"""
select
 *,
 MD5(
    CONCAT_WS(',', {concatenated_columns})
  ) AS MergeHashKey
  from
  vw_final_base_with_addon
"""
df_source = spark.sql(query)
set_addon.append('MergeHashKey')
set_addon=set(set_addon)
df_source.createOrReplaceTempView("vw_source")

In [0]:
query = f"""
select 
  {','.join([f'target.{col}' for col in filtered_basetable_columns]) if spark.sql(f"SELECT COUNT(*) FROM {DestinationSchema}.{DestinationTable}").collect()[0][0] > 0 else ','.join([f'source.{col}' for col in get_pk.split(',')])},
  {','.join([f'source.{col}' for col in set_addon])},
  current_user() as DW_Created_By,
  current_timestamp() as DW_Created_Date,
  current_user() as DW_Modified_By,
  current_timestamp() as DW_Modified_Date,
  current_timestamp() as Start_Date,
  NULL as End_Date,
  'Yes' as CurrentRecord,
  CASE 
    WHEN { ' AND '.join([f'target.{col} IS NULL' for col in get_pk.split(',')]) } THEN 'Insert'
    WHEN { ' AND '.join([f'target.{col} = source.{col}' for col in get_pk.split(',')]) } AND source.MergeHashKey != target.MergeHashKey THEN 'Update'
    ELSE 'No Changes' 
  END As Action_Code  
from (select {','.join(set_addon)}, {','.join([f'{col}' for col in get_pk.split(',')])} from vw_source group by all) as source
left join {DestinationSchema}.{DestinationTable} as target
on { ' AND '.join([f'target.{col} = source.{col}' for col in get_pk.split(',')]) }
where target.end_date is null
"""

df_source = spark.sql(query)
df_source = df_source.dropDuplicates()
df_source.createOrReplaceTempView("vw_silver")
final_col = df_source.columns
final_col.remove('Action_Code')

In [0]:
spark.sql(f"""
    INSERT INTO {DestinationSchema}.{DestinationTable}({','.join(final_col)})
    SELECT {','.join(final_col)} FROM vw_silver WHERE Action_Code IN ('Insert', 'Update')
""")

spark.sql(f"""
    MERGE INTO {DestinationSchema}.{DestinationTable} AS Target
    USING (SELECT {','.join(final_col)} FROM VW_silver WHERE Action_Code='Update') AS Source
    ON { ' AND '.join([f'Target.{col} = Source.{col}' for col in get_pk.split(',')])} AND Target.MergeHashKey != Source.MergeHashKey
    WHEN MATCHED THEN UPDATE SET
    Target.End_Date = CURRENT_TIMESTAMP(),
    Target.DW_Modified_Date = Source.DW_Modified_Date,
    Target.DW_Modified_By = Source.DW_Modified_By,
    Target.CurrentRecord = 'No'
""")

In [0]:
df = spark.sql(f"select * from {DestinationSchema}.{DestinationTable} where End_Date is null")
df.createOrReplaceTempView("target_view")

DFSourceNull = spark.sql(f"""
                SELECT t.*,
                    CASE WHEN s.{get_pk.split(',')[0]} IS NULL THEN 'No' ELSE 'Yes' END AS CurrentRecordTmp
                FROM target_view t
                FULL JOIN VW_silver s
                ON { ' AND '.join([f's.{col} = t.{col}' for col in get_pk.split(',')]) }
            """)

# Filter out the 'DeleteFlag' rows for next steps
DFSourceNull.createOrReplaceTempView("SourcetoInsertUpdate")

# Merge operation
MergeQuery = f"""
        MERGE INTO {DestinationSchema}.{DestinationTable} AS target
        USING SourcetoInsertUpdate AS source
        ON { ' AND '.join([f'target.{col} = source.{col}' for col in get_pk.split(',')]) }
        AND source.CurrentRecordTmp = 'No'
        WHEN MATCHED THEN
            UPDATE SET target.CurrentRecord = 'Deleted', target.end_date=current_date(), target.DW_modified_Date=current_date(),target.DW_Modified_By='Databricks'
    """

spark.sql(MergeQuery)