#### 25HK010BN - Distribution - Agency - Data analytics review on Agency Pre-sales Controls (Python / Databricks)

##### import packages / files from Sharepoint

In [None]:
import pandas as pd
import pyspark.pandas as ps
!pip install openpyxl

In [None]:
# Load the review parameters workbook from the GIA blob volume; rows are keyed by param_name.
df_param = pd.read_excel(
    '/Volumes/aiahk_dna_p_catalog/dna_gia_blob/gia_blob_volume/Agency_PD/parameters.xlsx',
    index_col='param_name',
    engine='openpyxl',
)
# Show the review_period_start row as the cell output.
df_param.loc['review_period_start']

In [None]:
# Echo the whole parameter table as the cell output so every review parameter can be checked.
df_param

In [None]:
from datetime import datetime

def is_invalid_date(date_str):
    """Return True when a date value is blank, unparseable, or not later than 1902-01-01.

    Args:
        date_str: The value to validate. Nulls, strings and date-like objects
            are accepted; the value is inspected through ``str`` and
            ``pd.to_datetime``.

    Returns:
        bool: True if the value is null, empty or whitespace-only, cannot be
        parsed as a date, or falls on or before 1902-01-01. False otherwise,
        including for real calendar dates on or after 2099-12-31.
    """
    if pd.isnull(date_str):
        return True
    text = str(date_str).strip()
    if not text:
        return True

    # Far-future dates (e.g. 9999-12-31) are outside the range pandas can
    # represent and would be coerced to NaT below, so they are accepted here.
    # The leading ten characters must form a real YYYY-MM-DD calendar date:
    # a plain string comparison against '2099-12-31' would also accept garbage
    # such as 'N/A' or '9999-99-99', which merely sorts after that literal.
    try:
        leading_date = datetime.strptime(text[:10], '%Y-%m-%d')
    except ValueError:
        leading_date = None
    if leading_date is not None and leading_date >= datetime(2099, 12, 31):
        return False

    try:
        dt = pd.to_datetime(date_str, errors='coerce')
        # NaT means pandas could not interpret the value as a date at all.
        return bool(pd.isnull(dt) or dt <= pd.Timestamp('1902-01-01'))
    except Exception:
        # Best-effort check: anything pandas refuses outright counts as invalid.
        return True

##### Data integrity scripts

###### [HK & Macau] DAS_DI_001 Identify agents with blank or invalid agent contract dates (with date format other than 'YYYY-MM-DD' or earlier than '1902-01-01') on DAS

In [None]:
# DAS_DI_001 population: agents in _das_agent_master that are inforce (status '00' or '10')
# or whose TERMINATION_DT lies between the review_period_start and review_period_end
# values read from df_param, left-joined to their license type '03' (life) records.
# AGT_CONTRACT_DT is truncated to its first 10 characters in the output.
df_DI001 = spark.sql(
    f"""
    SELECT agt_mas.AGT_CD, agt_mas.AGT_FULL_NAME, agt_mas.CHANNEL_TYPE, agt_mas.REGION, left(agt_mas.AGT_CONTRACT_DT, 10) AS AGT_CONTRACT_DT, agt_mas.AGT_STATUS, agt_mas.TERMINATION_DT, agt_lic.EFF_DT, agt_lic.EXP_DT, agt_lic.LICENSE_DESC, agt_lic.LICENSE_TYPE, agt_lic.LICENSE_NO
    FROM (
        SELECT *
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_master 
        WHERE AGT_STATUS = '00' or AGT_STATUS = '10' -- inforce agents
        OR (TERMINATION_DT >= '{df_param.loc["review_period_start"]["param_value"]}' -- filter: termination date after review period start
          AND TERMINATION_DT <= '{df_param.loc["review_period_end"]["param_value"]}' -- filter: termination date before review period end
        )
    ) agt_mas
    LEFT JOIN (
        SELECT * 
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_license 
        WHERE LICENSE_TYPE = '03' -- filter: license type 03 (life)
    ) agt_lic
      ON agt_mas.AGT_CD = agt_lic.AGT_CD
    """
)

# Expose the Spark result through the pandas-on-Spark API so the Python date check can be applied.
df_DI001_ps = df_DI001.pandas_api()
display(df_DI001_ps)
display(df_DI001_ps.shape[0])  # number of rows in the population

# Disabled: would save the whole population (not only the exceptions) to ca_agy_DI001.
# df_DI001.write.mode('overwrite').option("overwriteSchema", "True").saveAsTable('aiahk_dna_p_catalog.dna_gia_blob.ca_agy_DI001')

In [None]:
# Run the date validator over every contract date and store the verdict on the frame.
contract_dt_is_invalid = df_DI001_ps['AGT_CONTRACT_DT'].apply(is_invalid_date)
df_DI001_ps['invalid_contract_date'] = contract_dt_is_invalid
display(df_DI001_ps.head())

# Exceptions are the rows whose contract date failed validation.
di001_exception_mask = df_DI001_ps['invalid_contract_date']
df_DI001_exception = df_DI001_ps[di001_exception_mask]
display(df_DI001_exception)

In [None]:
# Persist the DI001 exceptions, replacing the results of any previous run.
#
# The filtered frame is written as-is even when it has no rows: it still carries
# the typed columns of the source query, whereas an empty placeholder built with
# ps.DataFrame([], columns=...) has untyped columns. Skipping the len() check
# also avoids an extra count job.
df_to_write = df_DI001_exception

# 'overwriteSchema' is the Delta writer option (the same key the saveAsTable(...)
# calls in this notebook pass); the previous 'overwrite_schema' keyword was
# forwarded to the writer under a key that is not that option.
df_to_write.to_table(
    'aiahk_dna_p_catalog.dna_gia_blob.ca_agy_DI001',
    mode='overwrite',
    overwriteSchema='true',
    mergeSchema='true',
)

###### [HK & Macau] DAS_DI_002 Identify agents with blank or invalid agent termination dates (with date format other than 'YYYY-MM-DD' or earlier than '1902-01-01') on DAS

In [None]:
# DAS_DI_002 population: agents in _das_agent_master whose status is neither '00' nor '10',
# left-joined to their license type '03' (life) records. No review-period filter is applied here.
df_DI002 = spark.sql(
    f"""
    SELECT agt_mas.AGT_CD, agt_mas.AGT_FULL_NAME, agt_mas.CHANNEL_TYPE, agt_mas.REGION, left(agt_mas.AGT_CONTRACT_DT, 10) AS AGT_CONTRACT_DT, agt_mas.AGT_STATUS, agt_mas.TERMINATION_DT, agt_lic.EFF_DT, agt_lic.EXP_DT, agt_lic.LICENSE_DESC, agt_lic.LICENSE_TYPE, agt_lic.LICENSE_NO
    FROM (
        SELECT *
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_master 
        WHERE AGT_STATUS NOT IN ('00','10') 
        )
     agt_mas
    LEFT JOIN (
        SELECT * 
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_license 
        WHERE LICENSE_TYPE = '03' -- filter: license type 03 (life)
    ) agt_lic
      ON agt_mas.AGT_CD = agt_lic.AGT_CD
    """
)

# Expose the Spark result through the pandas-on-Spark API so the Python date check can be applied.
df_DI002_ps = df_DI002.pandas_api()
display(df_DI002_ps)
display(df_DI002_ps.shape[0])  # number of rows in the population


In [None]:
# Run the date validator over every termination date and store the verdict on the frame.
term_dt_is_invalid = df_DI002_ps['TERMINATION_DT'].apply(is_invalid_date)
df_DI002_ps['invalid_term_date'] = term_dt_is_invalid
display(df_DI002_ps.head())

# Exceptions are the rows whose termination date failed validation.
di002_exception_mask = df_DI002_ps['invalid_term_date']
df_DI002_exception = df_DI002_ps[di002_exception_mask]
display(df_DI002_exception)

In [None]:
# Persist the DI002 exceptions, replacing the results of any previous run.
#
# The filtered frame is written as-is even when it has no rows: it still carries
# the typed columns of the source query, whereas an empty placeholder built with
# ps.DataFrame([], columns=...) has untyped columns. Skipping the len() check
# also avoids an extra count job.
df_to_write = df_DI002_exception

# 'overwriteSchema' is the Delta writer option (the same key the saveAsTable(...)
# calls in this notebook pass); the previous 'overwrite_schema' keyword was
# forwarded to the writer under a key that is not that option.
df_to_write.to_table(
    'aiahk_dna_p_catalog.dna_gia_blob.ca_agy_DI002',
    mode='overwrite',
    overwriteSchema='true',
    mergeSchema='true',
)


###### [HK & Macau] DAS_DI_003 Identify agents with contract dates later than start dates of any policies sold

In [None]:
# DAS_DI_003: in-scope agents (inforce, or terminated within the review period read
# from df_param) whose earliest policy application date is before their contract date.
#
# earliest_policy_no uses MIN_BY so it is the policy that actually carries the
# earliest app_date. The former COLLECT_SET(...)[0] returned an arbitrary policy
# of the agent, because the element order of a collected set is not defined.
df_DI003 = spark.sql(
    f"""
    SELECT agt_mas.AGT_CD, agt_mas.AGT_FULL_NAME, agt_mas.CHANNEL_TYPE, agt_mas.REGION, left(agt_mas.AGT_CONTRACT_DT, 10) AS AGT_CONTRACT_DT, agt_mas.AGT_STATUS, agt_mas.TERMINATION_DT, agt_lic.EFF_DT, agt_lic.EXP_DT, agt_lic.LICENSE_DESC, agt_lic.LICENSE_TYPE, agt_lic.LICENSE_NO, UI.*
    FROM (
       SELECT *
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_master
        WHERE AGT_STATUS = '00' or AGT_STATUS = '10' -- inforce agents
        OR (TERMINATION_DT >= '{df_param.loc["review_period_start"]["param_value"]}' -- filter: termination date after review period start
          AND TERMINATION_DT <= '{df_param.loc["review_period_end"]["param_value"]}' -- filter: termination date before review period end
        )
    )
     agt_mas
    LEFT JOIN (
        SELECT *
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_license
        WHERE LICENSE_TYPE = '03' -- filter: license type 03 (life)
    ) agt_lic
      ON agt_mas.AGT_CD = agt_lic.AGT_CD
    LEFT JOIN (
        SELECT AGT_CODE, MIN(app_date) AS earliest_app_date,
               MIN_BY(ui_pol.POLICY_NO, app_date) AS earliest_policy_no -- policy holding the earliest application date
        FROM aiahk_dna_p_catalog.dna_gia_blob._ui_policy_app_review_period ui_pol
        INNER JOIN
        aiahk_dna_p_catalog.dna_gia_blob._ui_policy_app_review_period_agt_mapping ui_mapping
        ON ui_pol.POLICY_NO = ui_mapping.POLICY_NO
        GROUP BY ui_mapping.AGT_CODE
    ) UI
    ON UI.AGT_CODE = agt_mas.AGT_CD
    WHERE UI.earliest_app_date < agt_mas.AGT_CONTRACT_DT
    """
)

# Expose the Spark result through the pandas-on-Spark API for display.
df_DI003_ps = df_DI003.pandas_api()
display(df_DI003_ps)
display(df_DI003_ps.shape[0])  # number of exception rows

# The query already returns only exceptions, so the Spark result is saved directly.
df_DI003.write.mode('overwrite').option("overwriteSchema", "True").saveAsTable('aiahk_dna_p_catalog.dna_gia_blob.ca_agy_DI003')

###### [HK & Macau] DAS_DI_004 Identify terminated agents with termination date earlier than agent contract date

In [None]:
# DAS_DI_004: agents whose status is neither '00' nor '10' and whose TERMINATION_DT
# compares as earlier than AGT_CONTRACT_DT, left-joined to their license type '03' (life) records.
# NOTE(review): the WHERE clause compares the raw AGT_CONTRACT_DT while the SELECT truncates it
# to 10 characters. If both columns are strings and AGT_CONTRACT_DT carries a time part,
# a same-day termination could compare as earlier. Confirm the column types.
df_DI004 = spark.sql(
    f"""
    SELECT agt_mas.AGT_CD, agt_mas.AGT_FULL_NAME, agt_mas.CHANNEL_TYPE, agt_mas.REGION, left(agt_mas.AGT_CONTRACT_DT, 10) AS AGT_CONTRACT_DT, agt_mas.AGT_STATUS, agt_mas.TERMINATION_DT, agt_lic.EFF_DT, agt_lic.EXP_DT, agt_lic.LICENSE_DESC, agt_lic.LICENSE_TYPE, agt_lic.LICENSE_NO
    FROM (
        SELECT *
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_master 
        WHERE AGT_STATUS NOT IN ('00','10') 
        )
     agt_mas
    LEFT JOIN (
        SELECT * 
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_license 
        WHERE LICENSE_TYPE = '03' -- filter: license type 03 (life)
    ) agt_lic
      ON agt_mas.AGT_CD = agt_lic.AGT_CD
    WHERE TERMINATION_DT < AGT_CONTRACT_DT
    """
)

# Expose the Spark result through the pandas-on-Spark API for display.
df_DI004_ps = df_DI004.pandas_api()
display(df_DI004_ps)
display(df_DI004_ps.shape[0])  # number of exception rows

# The query already returns only exceptions, so the Spark result is saved directly.
df_DI004.write.mode('overwrite').option("overwriteSchema", "True").saveAsTable('aiahk_dna_p_catalog.dna_gia_blob.ca_agy_DI004')

###### [HK & Macau] DAS_DI_005 Identify agents with blank or invalid license effective dates (with date format other than 'YYYY-MM-DD' or earlier than '1902-01-01') on DAS

In [None]:
# DAS_DI_005 population: agents that are inforce (status '00' or '10') or were terminated within
# the review period read from df_param, inner-joined to ALL of their license records
# (no LICENSE_TYPE filter). Agents without any license record are therefore excluded.
# The trailing "agt_lic.AGT_CD IS NOT NULL" is already guaranteed by the INNER JOIN.
df_DI005 = spark.sql(
    f"""
    SELECT agt_mas.AGT_CD, agt_mas.AGT_FULL_NAME, agt_mas.CHANNEL_TYPE, agt_mas.REGION, left(agt_mas.AGT_CONTRACT_DT, 10) AS AGT_CONTRACT_DT, agt_mas.AGT_STATUS, agt_mas.TERMINATION_DT, agt_lic.EFF_DT, agt_lic.EXP_DT, agt_lic.LICENSE_DESC, agt_lic.LICENSE_TYPE, agt_lic.LICENSE_NO
    FROM (
        SELECT *
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_master 
        WHERE AGT_STATUS = '00' or AGT_STATUS = '10' -- inforce agents
        OR (TERMINATION_DT >= '{df_param.loc["review_period_start"]["param_value"]}' -- filter: termination date after review period start
          AND TERMINATION_DT <= '{df_param.loc["review_period_end"]["param_value"]}' -- filter: termination date before review period end
        )
    ) agt_mas
    INNER JOIN (
        SELECT * 
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_license 
    ) agt_lic
      ON agt_mas.AGT_CD = agt_lic.AGT_CD
    WHERE agt_lic.AGT_CD IS NOT NULL
    """
)

# Expose the Spark result through the pandas-on-Spark API so the Python date check can be applied.
df_DI005_ps = df_DI005.pandas_api()
display(df_DI005_ps)
display(df_DI005_ps.shape[0])  # number of rows in the population

# NOTE(review): disabled leftover copied from the DI001 cell; it references df_DI001 and
# ca_agy_DI001, not DI005. Do not re-enable as written.
# df_DI001.write.mode('overwrite').option("overwriteSchema", "True").saveAsTable('aiahk_dna_p_catalog.dna_gia_blob.ca_agy_DI001')

In [None]:
# Run the date validator over every license effective date and store the verdict on the frame.
eff_dt_is_invalid = df_DI005_ps['EFF_DT'].apply(is_invalid_date)
df_DI005_ps['invalid_licenseEff_date'] = eff_dt_is_invalid
display(df_DI005_ps.head())

# Exceptions are the rows whose license effective date failed validation.
di005_exception_mask = df_DI005_ps['invalid_licenseEff_date']
df_DI005_exception = df_DI005_ps[di005_exception_mask]
display(df_DI005_exception)

In [None]:
# Persist the DI005 exceptions, replacing the results of any previous run.
#
# The filtered frame is written as-is even when it has no rows: it still carries
# the typed columns of the source query, whereas an empty placeholder built with
# ps.DataFrame([], columns=...) has untyped columns. Skipping the len() check
# also avoids an extra count job.
df_to_write = df_DI005_exception

# 'overwriteSchema' is the Delta writer option (the same key the saveAsTable(...)
# calls in this notebook pass); the previous 'overwrite_schema' keyword was
# forwarded to the writer under a key that is not that option.
df_to_write.to_table(
    'aiahk_dna_p_catalog.dna_gia_blob.ca_agy_DI005',
    mode='overwrite',
    overwriteSchema='true',
    mergeSchema='true',
)


###### [HK & Macau] DAS_DI_006 Identify agents with blank or invalid license expiry dates (with date format other than 'YYYY-MM-DD' or earlier than '1902-01-01') on DAS

In [None]:
# DAS_DI_006 reuses the DI005 population frame (df_DI005_ps) and checks EXP_DT instead of EFF_DT.
# NOTE(review): confirm that rows with a null EXP_DT survive this filter; blank expiry
# dates have to reach the validity check below to be reported.
df_DI006_ps = df_DI005_ps[~(df_DI005_ps['EXP_DT']>=pd.Timestamp.max)] ## exclude out of range future dates
df_DI006_ps['invalid_licenseExp_date'] = df_DI006_ps['EXP_DT'].apply(is_invalid_date)
display(df_DI006_ps.head())
df_DI006_exception = df_DI006_ps[df_DI006_ps['invalid_licenseExp_date']]
display(df_DI006_exception)

# Persist the DI006 exceptions, replacing the results of any previous run.
#
# The filtered frame is written as-is even when it has no rows: it still carries
# the typed columns of the source frame, whereas an empty placeholder built with
# ps.DataFrame([], columns=...) has untyped columns. Skipping the len() check
# also avoids an extra count job.
df_to_write = df_DI006_exception

# 'overwriteSchema' is the Delta writer option (the same key the saveAsTable(...)
# calls in this notebook pass); the previous 'overwrite_schema' keyword was
# forwarded to the writer under a key that is not that option.
df_to_write.to_table(
    'aiahk_dna_p_catalog.dna_gia_blob.ca_agy_DI006',
    mode='overwrite',
    overwriteSchema='true',
    mergeSchema='true',
)


###### [HK & Macau] DAS_DI_007 Identify agents with blank regulatory license no. on DAS

In [None]:
# DAS_DI_007: in-scope agents (inforce, or terminated within the review period read from df_param)
# inner-joined to one row per agent holding max(LICENSE_NO) over license types
# '01', '02', '03', '04' and '99'; an agent is reported when that value is NULL or blank.
# NOTE(review): because the check is on the per-agent maximum, an agent presumably is only
# reported when none of its licenses of those types has a non-blank number. Confirm this is intended.
df_DI007 = spark.sql(
    f"""
    SELECT agt_mas.AGT_CD, agt_mas.AGT_FULL_NAME, agt_mas.CHANNEL_TYPE, agt_mas.REGION, left(agt_mas.AGT_CONTRACT_DT, 10) AS AGT_CONTRACT_DT, agt_mas.AGT_STATUS, agt_mas.TERMINATION_DT, agt_lic.MAX_LICENSE_NO
    FROM (
        SELECT *
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_master 
        WHERE AGT_STATUS = '00' or AGT_STATUS = '10' -- inforce agents
        OR (TERMINATION_DT >= '{df_param.loc["review_period_start"]["param_value"]}' -- filter: termination date after review period start
          AND TERMINATION_DT <= '{df_param.loc["review_period_end"]["param_value"]}' -- filter: termination date before review period end
        )
    ) agt_mas
    INNER JOIN (
        SELECT AGT_CD, max(LICENSE_NO) as MAX_LICENSE_NO
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_license 
        WHERE LICENSE_TYPE in ('01','02','03','04','99') 
        GROUP BY AGT_CD 
    ) agt_lic
      ON agt_mas.AGT_CD = agt_lic.AGT_CD
    WHERE agt_lic.AGT_CD IS NOT NULL
    AND (MAX_LICENSE_NO IS NULL OR TRIM(MAX_LICENSE_NO) = '')
    """
)

# Expose the Spark result through the pandas-on-Spark API for display.
df_DI007_ps = df_DI007.pandas_api()
display(df_DI007_ps)
display(df_DI007_ps.shape[0])  # number of exception rows

# The query already returns only exceptions, so the Spark result is saved directly.
df_DI007.write.mode('overwrite').option("overwriteSchema", "True").saveAsTable('aiahk_dna_p_catalog.dna_gia_blob.ca_agy_DI007')

###### [HK & Macau] DAS_DI_008 Identify agents whose license flag is blank or has a value other than '0' or '1'

In [None]:
# DAS_DI_008: agents contracted on or before review_period_end that are inforce or were terminated
# within the review period, left-joined to one row per agent holding the MAX of each license column.
# An agent is reported when MAX_GI_LICENSE_IND is NULL, blank, or not in (1, 0).
# Because of the LEFT JOIN, agents without any license record also have a NULL flag and are reported.
df_DI008 = spark.sql(
    f"""
    SELECT agt_mas.AGT_CD, agt_mas.AGT_FULL_NAME, agt_mas.CHANNEL_TYPE, agt_mas.REGION, left(agt_mas.AGT_CONTRACT_DT, 10) AS AGT_CONTRACT_DT, agt_mas.AGT_STATUS, agt_mas.TERMINATION_DT, agt_mas.AGT_TYPE_DESC, agt_lic.EFF_DT, agt_lic.EXP_DT, agt_lic.LICENSE_DESC, agt_lic.LICENSE_TYPE, agt_lic.LICENSE_NO, agt_lic.MAX_GI_LICENSE_IND
    FROM (
        SELECT *
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_master 
        WHERE (AGT_CONTRACT_DT <= '{df_param.loc["review_period_end"]["param_value"]}') AND -- exclude agents contracted after review period
        (
          AGT_STATUS = '00' or AGT_STATUS = '10' -- inforce agents
          OR  (TERMINATION_DT >= '{df_param.loc["review_period_start"]["param_value"]}' -- filter: termination date after review period start
              AND TERMINATION_DT <= '{df_param.loc["review_period_end"]["param_value"]}' -- filter: termination date before review period end
        ))
    ) agt_mas
    LEFT JOIN (
        SELECT AGT_CD,
               MAX(EFF_DT) AS EFF_DT,
               MAX(EXP_DT) AS EXP_DT,
               MAX(LICENSE_DESC) AS LICENSE_DESC,
               MAX(LICENSE_TYPE) AS LICENSE_TYPE,
               MAX(LICENSE_NO) AS LICENSE_NO,
               MAX(GI_LICENSE_IND) AS MAX_GI_LICENSE_IND
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_license 
        GROUP BY AGT_CD
    ) agt_lic
      ON agt_mas.AGT_CD = agt_lic.AGT_CD
    WHERE agt_lic.MAX_GI_LICENSE_IND IS NULL OR trim(agt_lic.MAX_GI_LICENSE_IND) = "" OR agt_lic.MAX_GI_LICENSE_IND NOT IN (1,0) 
    """
)

# Expose the Spark result through the pandas-on-Spark API for display.
df_DI008_ps = df_DI008.pandas_api()
display(df_DI008_ps)
display(df_DI008_ps.shape[0])  # number of exception rows

# The query already returns only exceptions, so the Spark result is saved directly.
df_DI008.write.mode('overwrite').option("overwriteSchema", "True").saveAsTable('aiahk_dna_p_catalog.dna_gia_blob.ca_agy_DI008')

###### [HK & Macau] DAS_DI_009 Identify agents with blank or null license type code

In [None]:
# DAS_DI_009: same agent population and per-agent license aggregation as the DI008 query
# (contracted on or before review_period_end; inforce or terminated within the review period),
# reporting agents whose aggregated LICENSE_TYPE is NULL or blank.
# Because of the LEFT JOIN, agents without any license record also have a NULL type and are reported.
df_DI009 = spark.sql(
    f"""
    SELECT agt_mas.AGT_CD, agt_mas.AGT_FULL_NAME, agt_mas.CHANNEL_TYPE, agt_mas.REGION, left(agt_mas.AGT_CONTRACT_DT, 10) AS AGT_CONTRACT_DT, agt_mas.AGT_STATUS, agt_mas.TERMINATION_DT, agt_mas.AGT_TYPE_DESC, agt_lic.EFF_DT, agt_lic.EXP_DT, agt_lic.LICENSE_DESC, agt_lic.LICENSE_TYPE, agt_lic.LICENSE_NO, agt_lic.MAX_GI_LICENSE_IND
    FROM (
        SELECT *
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_master 
        WHERE (AGT_CONTRACT_DT <= '{df_param.loc["review_period_end"]["param_value"]}') AND -- exclude agents contracted after review period
        (
          AGT_STATUS = '00' or AGT_STATUS = '10' -- inforce agents
          OR  (TERMINATION_DT >= '{df_param.loc["review_period_start"]["param_value"]}' -- filter: termination date after review period start
              AND TERMINATION_DT <= '{df_param.loc["review_period_end"]["param_value"]}' -- filter: termination date before review period end
        ))
    ) agt_mas
    LEFT JOIN (
        SELECT AGT_CD,
               MAX(EFF_DT) AS EFF_DT,
               MAX(EXP_DT) AS EXP_DT,
               MAX(LICENSE_DESC) AS LICENSE_DESC,
               MAX(LICENSE_TYPE) AS LICENSE_TYPE,
               MAX(LICENSE_NO) AS LICENSE_NO,
               MAX(GI_LICENSE_IND) AS MAX_GI_LICENSE_IND
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_license 
        GROUP BY AGT_CD
    ) agt_lic
      ON agt_mas.AGT_CD = agt_lic.AGT_CD
    WHERE agt_lic.LICENSE_TYPE IS NULL OR trim(agt_lic.LICENSE_TYPE) = ""
    """
)

# Expose the Spark result through the pandas-on-Spark API for display.
df_DI009_ps = df_DI009.pandas_api()
display(df_DI009_ps)
display(df_DI009_ps.shape[0])  # number of exception rows

# The query already returns only exceptions, so the Spark result is saved directly.
df_DI009.write.mode('overwrite').option("overwriteSchema", "True").saveAsTable('aiahk_dna_p_catalog.dna_gia_blob.ca_agy_DI009')

##### Summary figures

###### Number of Records Analysed

In [None]:
# Number of agents analysed, per REGION.
#
# The population filter mirrors the one used by the data-integrity queries in this
# notebook: inforce agents (status '00' or '10') plus agents terminated within the
# review period read from df_param. The previous version hard-coded
# TERMINATION_DT >= '2025-01-01' with no upper bound, so the reported count could
# drift from the population actually tested whenever the parameters changed.
df = spark.sql(
    f"""
    SELECT REGION, count(distinct agt_mas.AGT_CD)
    FROM (
        SELECT *
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_master
        WHERE AGT_STATUS = '00' or AGT_STATUS = '10' -- inforce agents
        OR (TERMINATION_DT >= '{df_param.loc["review_period_start"]["param_value"]}' -- filter: termination date after review period start
          AND TERMINATION_DT <= '{df_param.loc["review_period_end"]["param_value"]}' -- filter: termination date before review period end
        )
    ) agt_mas
    GROUP BY REGION
    """
)
display(df)

In [None]:
# Number of distinct inforce agents (status '00' or '10') per agt_type_desc / AGT_DUMMY_TYPE.
# The Staff / Intern / Corporate / dummy-type exclusions sit inside a SQL block comment,
# so they are NOT applied by this query.
df = spark.sql(
    f"""
    SELECT agt_type_desc, AGT_DUMMY_TYPE, count(distinct agt_mas.AGT_CD)
    FROM (
        SELECT *
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_master 
        where (agt_status in ('00','10')) 
 /*       and AGT_TYPE_DESC not like '%Staff%'
        and AGT_TYPE_DESC not like '%Intern%'
        and AGT_TYPE_DESC not like '%Corporate%'
        and AGT_DUMMY_TYPE is null
   */ ) agt_mas
    group by  agt_type_desc, AGT_DUMMY_TYPE
    """
)
display(df)

In [None]:
# Number of distinct review-period policies per agent REGION.
# Policies are left-joined to the policy/agent mapping and then inner-joined to the agent
# master, so a policy with no mapped agent present in _das_agent_master is not counted.
# No agent status or review-period filter is applied to the agent master here.
df = spark.sql(
    f"""
    SELECT REGION, count(distinct UI.POLICY_NO)
    FROM (
        SELECT *
        FROM aiahk_dna_p_catalog.dna_gia_blob.`_ui_policy_app_review_period` 
    ) UI
    LEFT JOIN 
      aiahk_dna_p_catalog.dna_gia_blob._ui_policy_app_review_period_agt_mapping agt_map
      ON UI.POLICY_NO = agt_map.POLICY_NO
    INNER JOIN  
    (
        SELECT * 
        FROM aiahk_dna_p_catalog.dna_gia_blob._das_agent_master 
    ) agt_mas
     on agt_map.AGT_CODE = agt_mas.AGT_CD
     GROUP BY REGION
     """
)
display(df)