In [1]:

# %% [markdown]
# # Jupyter Notebook Loading Header
#
# This is a custom loading header for Jupyter Notebooks in Visual Studio Code.
# It includes common imports and settings to get you started quickly.
# %% [markdown]
## Import Libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from google.cloud import bigquery
from google.cloud import storage
import os

import time
from datetime import datetime
import uuid
import joblib
import uuid

import gcsfs
import duckdb as dd



# Single source of truth for the GCP project used by the BigQuery client and
# by libraries that read GOOGLE_CLOUD_PROJECT.
PROJECT_ID = 'prj-prod-dataplatform'

# NOTE(review): user-specific absolute path to a local gcloud credentials file;
# it embeds a personal account name and exists on one workstation only.
# Consider supplying credentials through the environment instead.
path = r'C:\Users\Dwaipayan\AppData\Roaming\gcloud\legacy_credentials\dchakroborti@tonikbank.com\adc.json'
# Point the Google client libraries at the file only when it is actually
# present; on any other machine leave the environment untouched so that
# already-configured credentials are used rather than a non-existent path.
if os.path.isfile(path):
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = path
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
client = bigquery.Client(project=PROJECT_ID)
# %% [markdown]
## Configure Settings
# Set options or configurations as needed
# Show every column when a DataFrame is displayed, but cap the rows at 100.
pd.set_option('display.max_columns', None)
# Use the registered lower-case option key; a differently-cased key is only
# resolved through pandas' pattern-matching fallback.
pd.set_option('display.max_rows', 100)


# Constant

In [2]:
# Today's date (local clock) as a compact YYYYMMDD string.
CURRENT_DATE = format(datetime.now(), "%Y%m%d")


# <div align="left" style="color:rgb(51, 250, 250);"> Functions </div>

## <div align="left" style="color:rgb(51, 250, 250);"> Save the data to Google Cloud Storage </div>

In [3]:
def save_df_to_gcs(df, bucket_name, destination_blob_name, file_format='csv'):
    """Saves a pandas DataFrame to Google Cloud Storage.

    The DataFrame is written to a temporary file in the current working
    directory (``temp.csv`` or ``temp.parquet``), uploaded to the bucket, and
    the temporary file is removed afterwards, including when writing or
    uploading fails.

    Args:
        df: The pandas DataFrame to save.
        bucket_name: The name of the GCS bucket.
        destination_blob_name: The name of the blob to be created.
        file_format: The file format to save the DataFrame in ('csv' or 'parquet').

    Raises:
        ValueError: If ``file_format`` is neither 'csv' nor 'parquet'.
    """
    # Reject unsupported formats before anything is written or uploaded.
    if file_format not in ('csv', 'parquet'):
        raise ValueError("Invalid file format. Please choose 'csv' or 'parquet'.")

    temp_file = f'temp.{file_format}'
    try:
        # Serialise the frame locally without the index column.
        if file_format == 'csv':
            df.to_csv(temp_file, index=False)
        else:
            df.to_parquet(temp_file, index=False)

        # One client is enough; project and credentials are resolved by the
        # client library from the environment.
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(destination_blob_name)
        blob.upload_from_filename(temp_file)
    finally:
        # Always clean up so a failed upload does not leave a stale temp file.
        if os.path.exists(temp_file):
            os.remove(temp_file)
    


## <div align="left" style="color:rgb(51, 250, 250);"> Read the Data from Google Cloud Storage </div>

In [4]:
def read_df_from_gcs(bucket_name, source_blob_name, file_format='csv'):
    """Reads a DataFrame from Google Cloud Storage.

    The blob is downloaded to a temporary file in the current working
    directory (``temp.csv`` or ``temp.parquet``), parsed with pandas, and the
    temporary file is removed before returning, even if an error occurs.

    Args:
        bucket_name: The name of the GCS bucket.
        source_blob_name: The name of the blob to read.
        file_format: The file format to read ('csv' or 'parquet').

    Returns:
        pandas.DataFrame: The data loaded from the GCS file.

    Raises:
        ValueError: If ``file_format`` is neither 'csv' nor 'parquet'.
    """
    # Validate first: there is no point downloading a blob that cannot be
    # parsed, and the format string is used to build the temp file name.
    if file_format not in ('csv', 'parquet'):
        raise ValueError("Invalid file format. Please choose 'csv' or 'parquet'.")

    temp_file = f'temp.{file_format}'

    try:
        storage_client = storage.Client()
        blob = storage_client.bucket(bucket_name).blob(source_blob_name)

        # Download the file to a temporary location
        blob.download_to_filename(temp_file)

        if file_format == 'csv':
            # low_memory=False makes pandas read the file in one pass.
            return pd.read_csv(temp_file, low_memory=False)
        return pd.read_parquet(temp_file)

    finally:
        # Clean up the temporary file
        if os.path.exists(temp_file):
            os.remove(temp_file)

## <div align = "left" style="color:rgb(51, 250, 250);"> Data Quality Report </div>

In [5]:
def data_quality_report(df, target_col='ln_fspd30_flag'):
    """Build a per-column data quality summary of a DataFrame.

    One output row is produced for every column of ``df``. All columns get
    their dtype, missing / non-missing counts and percentages, number of
    unique values, and the mode with its share of rows. Columns that pandas
    treats as numeric additionally get min, max, mean, median, standard
    deviation, 25/50/75% quantiles, IQR, skewness, kurtosis, coefficient of
    variation (in percent) and the correlation with ``target_col``; these
    fields are ``None`` for non-numeric columns.

    Args:
        df: The pandas DataFrame to profile.
        target_col: Name of the column to correlate numeric columns against.
            The correlation is only computed when this column exists in
            ``df`` and is numeric; it is ``None`` for the target itself.

    Returns:
        pandas.DataFrame: One row per input column, in the input column order.
    """
    n_rows = len(df)
    nan = float('nan')

    # Whether a correlation can be computed at all does not depend on the
    # column being profiled, so decide it once instead of once per column.
    target_is_numeric = (
        target_col in df.columns
        and pd.api.types.is_numeric_dtype(df[target_col])
    )

    report_data = []
    for col in df.columns:
        series = df[col]
        data_type = series.dtype

        missing_values = series.isnull().sum()
        unique_values = series.nunique()

        if n_rows:
            missing_percentage = (missing_values / n_rows) * 100
            non_missing_percentage = ((n_rows - missing_values) / n_rows) * 100
        else:
            # A frame without rows has no meaningful percentages; report NaN
            # explicitly rather than dividing zero by zero.
            missing_percentage = nan
            non_missing_percentage = nan

        # The mode is needed for every column type; compute it a single time.
        # When several values tie, the first one returned by mode() is used.
        modes = series.mode()
        mode_value = modes.iloc[0] if not modes.empty else None
        mode_percentage = (
            (series == mode_value).sum() / n_rows * 100
            if mode_value is not None else None
        )

        # Numeric-only statistics default to None and are filled in below.
        min_value = max_value = mean_value = median_value = None
        std_dev = quantile_25 = quantile_50 = quantile_75 = None
        iqr = skewness = kurtosis = cv = correlation = None

        if pd.api.types.is_numeric_dtype(series):
            min_value = series.min()
            max_value = series.max()
            mean_value = series.mean()
            median_value = series.median()
            std_dev = series.std()
            quantile_25 = series.quantile(0.25)
            quantile_50 = series.quantile(0.50)  # Same as median
            quantile_75 = series.quantile(0.75)

            # Interquartile range
            iqr = quantile_75 - quantile_25

            skewness = series.skew()
            kurtosis = series.kurt()

            # Coefficient of variation in percent; undefined for a zero mean.
            cv = (std_dev / mean_value) * 100 if mean_value != 0 else None

            if target_is_numeric and col != target_col:
                # Use only rows where both the column and the target are present.
                correlation = df[[col, target_col]].dropna().corr().iloc[0, 1]

        report_data.append({
            'Column': col,
            'Data Type': data_type,
            'Missing Values': missing_values,
            'Missing Percentage': missing_percentage,
            'Unique Values': unique_values,
            'Min': min_value,
            'Max': max_value,
            'Mean': mean_value,
            'Median': median_value,
            'Mode': mode_value,
            'Mode Percentage': mode_percentage,
            'Std Dev': std_dev,
            'Non-missing Percentage': non_missing_percentage,
            '25% Quantile': quantile_25,
            '50% Quantile': quantile_50,
            '75% Quantile': quantile_75,
            'IQR': iqr,
            'Skewness': skewness,
            'Kurtosis': kurtosis,
            'CV (%)': cv,
            f'Correlation with {target_col}': correlation
        })

    # Create the DataFrame from the list of per-column dictionaries
    report = pd.DataFrame(report_data)

    return report

# <div align = "left" style="color:rgb(51,250,250);"> Upload pickle file to Google Cloud Storage Bucket </div>

In [6]:
def upload_to_gcs(bucket_name, source_file_path, destination_blob_name):
    """Upload a local file to a Google Cloud Storage bucket.

    Args:
        bucket_name: Name of the destination bucket.
        source_file_path: Path of the local file to upload.
        destination_blob_name: Name of the blob to create in the bucket.

    Prints a confirmation line once the upload call has returned.
    """
    target_blob = storage.Client().bucket(bucket_name).blob(destination_blob_name)
    target_blob.upload_from_filename(source_file_path)

    confirmation = f"File {source_file_path} uploaded to {bucket_name}/{destination_blob_name}"
    print(confirmation)

In [7]:
import pickle
import io
from google.cloud import storage
def save_pickle_to_gcs(data, bucket_name, destination_blob_name):
    """
    Pickle a Python object in memory and upload it to Google Cloud Storage.

    Args:
        data: The Python object to pickle (DataFrame, dict, list, etc.)
        bucket_name: Name of the GCS bucket
        destination_blob_name: Path/filename in the bucket

    Prints a confirmation line once the upload call has returned.
    """
    # Resolve the destination blob first
    gcs = storage.Client()
    target_blob = gcs.bucket(bucket_name).blob(destination_blob_name)

    # A BytesIO built from bytes is already positioned at offset 0, so it can
    # be handed to the upload call directly.
    payload = io.BytesIO(pickle.dumps(data))

    target_blob.upload_from_file(payload, content_type='application/octet-stream')
    print(f"Pickle file uploaded to gs://{bucket_name}/{destination_blob_name}")

# Table

In [8]:
# Name of the dataset (schema) that the dated tables below are named for.
schema1 = 'risk_mart'


# Table names suffixed with CURRENT_DATE (the notebook's run date as YYYYMMDD).
# NOTE(review): `altrans` carries a 20210701 start marker while the other two
# use 20230101 - confirm this difference is intentional.
# NOTE(review): the SQL cell that follows hard-codes the 20250810 table name and
# the '2025-08-10' snapshot date instead of using these variables - confirm
# whether it should be parameterised.
al = f'applied_loans_20230101_{CURRENT_DATE}'
altrans = f'applied_loans_20210701_{CURRENT_DATE}_trans'
nal = f'tsa_onboarded_but_never_applied_loan_20230101_{CURRENT_DATE}'

# worktable_data_analysis.trench2_never_applied_snapshot_transactions

In [None]:
sq = """  
create or replace table `prj-prod-dataplatform.worktable_data_analysis.trench2_never_applied_snapshot_transactions` as 
with input_customers as (
SELECT 
customerId,
credolabRefNumber onb_credolabRefNumber,
credo_inquiry_date as onb_credo_inquiry_date,
onb_tsa_onboarding_datetime,
onb_age,
onb_doc_type,
onb_os_version as onb_osversion,
onb_email,
ln_industry_new,
ln_employment_type_new,
DATE('2025-08-10') as snapshot_date,
DATE_DIFF(DATE('2025-08-10'),DATE(onb_tsa_onboarding_datetime),DAY) dob_snapshot_date,
DATE_DIFF(DATE(onb_tsa_onboarding_datetime),credo_inquiry_date,DAY) days_since_credo_call_onb,
DATE_DIFF(DATE('2025-08-10'),credo_inquiry_date,DAY) AS days_since_credo_call_snapshot_date,
ln_self_dec_income,
ln_marital_status,
ln_education_level,
ln_nature_of_work_new,
ln_source_funds_new,
onb_first_name,
onb_middle_name,
onb_city,
onb_province,
onb_last_name,
ln_brand,
ln_cnt_dependents,
ln_doc_type,
ln_osversion,
onb_kyc_status,
onb_latitude,
onb_longitude,
ln_user_type,
ln_loan_type,
ln_prod_type,
ln_loan_applied_flag,
c_credo_score,
cust_status_flag,
cust_status_close_date,

FROM
`risk_mart.tsa_onboarded_but_never_applied_loan_20230101_20250810`
--where DATE_ADD(onb_tsa_onboarding_datetime,INTERVAL 180 DAY) <= '2025-03-09'
),
cust_onboarding_acc_data as (
    SELECT
    DATE(opendate,'Asia/Manila') OFDATEOPENED,
    DATE(closuredate,'Asia/Manila') ofdateclosed,
    closed OFISCLOSED,
    DATE(c.created_dt) registration_date,
    c.created_dt as onboarding_date,
    cust_id,
    DATETIME(reccreatedon,'Asia/Manila') reccreatedon,
    accountid,
    productid,
    accountdescription as account_type,
    clearedbalance,
    snapshot_date
    FROM `dl_customers_db_raw.tdbk_customer_mtb` c    
    JOIN `finastra_raw.account` b ON c.cust_id = b.ubcustomercode
    JOIN input_customers on c.cust_id = CAST(input_customers.customerid AS STRING)
),
main_transaction_data AS 
(
     SELECT 
    transaction_date,
    OFDATEOPENED,
    OFISCLOSED,
    registration_date,
    transaction_id,
    b.cust_id customer_id,
    a.accountid,
    productid,
    b.account_type,
    transaction_code,
    a.status,
    channel,
    credit_debit_indicator,
    inter_exter_flag,
    trx_amount,
    core_narration,
    input_customers.snapshot_date
    -- customer_transactions to get the transactions
    FROM cust_onboarding_acc_data b
    JOIN input_customers on b.cust_id = CAST(input_customers.customerid AS STRING)
    LEFT JOIN `risk_mart.customer_transactions` a ON a.accountid = b.accountid
    and a.transaction_date < input_customers.snapshot_date and a.status = 'Success'
    --and a.transaction_datetime < input_customers.snapshot_date
    WHERE 1=1
   
),


#### Net Cash In ####
  -- 1. Outside Tonik to TSA
  -- 2. Other Tonik user to Own Tonik Account


net_cash_in AS 
(
  ## 1. Outside Tonik to TSA
  SELECT
    transaction_date,
    OFDATEOPENED,
    OFISCLOSED,
    registration_date,
    transaction_id,
    customer_id,
    accountid,
    account_type,
    status,
    channel,
    credit_debit_indicator,
    inter_exter_flag,
    trx_amount,
    core_narration,
    'Net Cash In' main_transaction_type,
    'Outside Tonik to TSA' sub_transaction_type,
    snapshot_date
  FROM main_transaction_data
  WHERE 1=1
  -- main conditions: should be a successful transaciton and credit and all coming from Tonik Account
  AND credit_debit_indicator = 'CREDIT'
  AND account_type = 'Tonik Account' and LOWER(core_narration) NOT LIKE '%blocking%' and transaction_code not like 'A0%'
  AND transaction_code IN ('N01','IP2','XE2','00T','21C','P01')
  -- 1. Outside Tonik to TSA conditions (all cash in)
  AND inter_exter_flag = 'Outside Tonik'

  UNION ALL

  ## 2. Other Tonik user to Own Tonik Account
  SELECT
    transaction_date,
    OFDATEOPENED,
    OFISCLOSED,
    registration_date,
    transaction_id,
    customer_id,
    accountid,
    account_type,
    status,
    channel,
    credit_debit_indicator,
    inter_exter_flag,
    trx_amount,
    core_narration,
    'Net Cash In' main_transaction_type,
    'Other Tonik Users to Town Tonik Account' sub_transaction_type,
    snapshot_date
  FROM main_transaction_data
  WHERE 1=1
  -- main conditions: should be a successful transaciton and credit and all coming from Tonik Account
  AND credit_debit_indicator = 'CREDIT'
  AND account_type = 'Tonik Account' and LOWER(core_narration) NOT LIKE '%blocking%' and transaction_code not like 'A0%'
  AND transaction_code IN ('N01','IP2','XE2','00T','21C','P01')

  -- 2. Other Tonik user to Own Tonik Account
  AND inter_exter_flag = 'Inside Tonik'
  AND core_narration LIKE '%Receive money from other Tonik Account%'
  -- AND LEFT(core_narration,STRPOS(core_narration, ",")-1) = 'Receive money from other Tonik Account'
)

#### Net Cash Out ####
-- 1. Bills Pay
-- 2. Card Transactions
-- 3. Own TSA to other Tonik Users
-- 4. TSA to Outside Tonik

, net_cash_out AS 
(

 ## 1. Bills Pay
  SELECT 
    transaction_date,
    OFDATEOPENED,
    OFISCLOSED,
    registration_date,
    transaction_id,
    customer_id,
    accountid,
    account_type,
    status,
    channel,
    credit_debit_indicator,
    inter_exter_flag,
    trx_amount,
    core_narration,
    'Net Cash Out' main_transaction_type,
    'Bills Pay' sub_transaction_type,
    snapshot_date,
  FROM main_transaction_data
  WHERE 1=1
  -- main conditions: should be a successful transaciton and debit
  AND credit_debit_indicator = 'DEBIT'
  AND LOWER(core_narration) NOT LIKE '%blocking%'

  -- 1. Bills Pay
  AND channel = 'Billspay'


  UNION ALL

  ## 2. Card Transactions (Cash Out)
  SELECT
    a.transaction_date,
    a.OFDATEOPENED,
    a.OFISCLOSED,
    a.registration_date,
    a.transaction_id,
    a.customer_id,
    a.accountid,
    a.account_type,
    a.status,
    a.channel,
    a.credit_debit_indicator,
    a.inter_exter_flag,
    a.trx_amount,
    a.core_narration,
    'Net Cash Out' main_transaction_type,
    'Card Transactions (Cash Out)' sub_transaction_type,
    snapshot_date
  FROM main_transaction_data a
  -- 2. Card Transactions (Cash Out) -- using the table made above
  WHERE 1=1
  -- main conditions: should be a successful transaciton and debit and coming from tonik account
  AND a.credit_debit_indicator = 'DEBIT'
  AND a.account_type = 'Tonik Account'
  AND transaction_code like 'A0%' and core_narration not like '%Blocking%'

  UNION ALL

  ## 3. Own TSA to other Tonik Users
  SELECT DISTINCT
    transaction_date,
    OFDATEOPENED,
    OFISCLOSED,
    registration_date,
    transaction_id,
    customer_id,
    accountid,
    account_type,
    status,
    channel,
    credit_debit_indicator,
    inter_exter_flag,
    trx_amount,
    core_narration,
    'Net Cash Out' main_transaction_type,
    'Own TSA to Other Tonik Users' sub_transaction_type,
    snapshot_date
  FROM main_transaction_data a
  WHERE 1=1
  -- main conditions: should be a successful transaciton and debit
  AND a.credit_debit_indicator = 'DEBIT'
  AND a.account_type = 'Tonik Account'
  AND transaction_code not like 'A0%' and core_narration not like '%Blocking%'

  -- 3. Own TSA to other Tonik Users
  AND a.channel = 'Core transactions'
  AND a.inter_exter_flag = 'Inside Tonik'
  --AND LOWER(core_narration) LIKE '%send money to other tonik account%' 
  -- AND LOWER(core_narration) NOT LIKE '%scontri%'
  -- AND LOWER(core_narration) NOT LIKE '%stash%'
  -- AND LOWER(core_narration) NOT LIKE '%time deposit%'

  UNION ALL

  ## 4. TSA to Outside Tonik (Other banks)
  SELECT DISTINCT
    transaction_date,
    OFDATEOPENED,
    OFISCLOSED,
    registration_date,
    transaction_id,
    customer_id,
    accountid,
    account_type,
    status,
    channel,
    credit_debit_indicator,
    inter_exter_flag,
    trx_amount,
    core_narration,
    'Net Cash Out' main_transaction_type,
    'TSA to Outside Tonik (Other Banks)' sub_transaction_type,
    snapshot_date
  FROM main_transaction_data a
  WHERE 1=1
  -- main conditions: should be a successful transaciton and debit
  AND a.credit_debit_indicator = 'DEBIT'
  AND a.account_type = 'Tonik Account'
  AND core_narration not like '%Blocking%'

  -- channels not in core transactions and billspay with the flag as outside tonik are sending to other banks
  AND a.channel NOT IN  ('Core transactions','Billspay')
  AND a.inter_exter_flag = 'Outside Tonik'
)

, transactions_sub AS 
(
  -- merging the cash ins and cash outs
  SELECT DISTINCT *
  FROM net_cash_in 
  UNION ALL
  SELECT DISTINCT *
  FROM net_cash_out
)

, date_diff_sub AS 
(
    -- to get the date difference between 2 transactions (cash in and cash out)
    SELECT customer_id,
    'Overall' days_diff_type,
    DATE_DIFF(LEAD(transaction_date) OVER (PARTITION BY customer_id ORDER BY transaction_date,core_narration ASC),transaction_date,DAY) days_bt_trans,
    snapshot_date
    FROM 
    (
        SELECT DISTINCT
        transaction_date,
        customer_id,
        main_transaction_type,
        snapshot_date,
        core_narration
        FROM transactions_sub
        WHERE 1=1
        --   AND customer_id IN ('2077378','2081999','2475220','2485072')
    )

    UNION ALL

    -- to get the date difference between 2 cash ins
    SELECT customer_id,
    'Cash In' days_diff_type,
    DATE_DIFF(LEAD(transaction_date) OVER (PARTITION BY customer_id ORDER BY transaction_date ASC),transaction_date,DAY) days_bt_trans,
    snapshot_date
    FROM 
    (
        SELECT DISTINCT
        transaction_date,
        customer_id,
        main_transaction_type,
        snapshot_date,
        core_narration
        FROM transactions_sub
        WHERE 1=1
        --   AND customer_id IN ('2077378','2081999','2475220','2485072')
        AND main_transaction_type = 'Net Cash In'
    )

    UNION ALL

    -- to get the date difference between 2 cash outs
    SELECT customer_id,
    'Cash Out' days_diff_type,
    DATE_DIFF(LEAD(transaction_date) OVER (PARTITION BY customer_id ORDER BY transaction_date,core_narration ASC),transaction_date,DAY) days_bt_trans,
    snapshot_date
    FROM 
    (
        SELECT DISTINCT
        transaction_date,
        customer_id,
        main_transaction_type,
        snapshot_date,
        core_narration
        FROM transactions_sub
        WHERE 1=1
        --   AND customer_id IN ('2077378','2081999','2475220','2485072')
        AND main_transaction_type = 'Net Cash Out'
    )
)

, days_bt_trans_avg AS 
(
-- get the average days in between 
SELECT DISTINCT
customer_id,
AVG(IF(days_diff_type='Overall',days_bt_trans,NULL)) overall_avg_days_bt_trans,
AVG(IF(days_diff_type='Cash In',days_bt_trans,NULL)) net_cash_in_avg_days_bt_trans,
AVG(IF(days_diff_type='Cash Out',days_bt_trans,NULL)) net_cash_out_avg_days_bt_trans
FROM date_diff_sub
GROUP BY 1
)

, days_bt_trans_med AS 
(
-- get the median days in between
SELECT DISTINCT
customer_id,
PERCENTILE_CONT(IF(days_diff_type='Overall',days_bt_trans,NULL), .50) OVER (PARTITION BY customer_id) overall_med_days_bt_trans,
PERCENTILE_CONT(IF(days_diff_type='Cash In',days_bt_trans,NULL), .50) OVER (PARTITION BY customer_id) cash_in_med_days_bt_trans,
PERCENTILE_CONT(IF(days_diff_type='Cash Out',days_bt_trans,NULL), .50) OVER (PARTITION BY customer_id) cash_out_med_days_bt_trans,
FROM date_diff_sub
)

, transactions_final AS 
(
SELECT DISTINCT
acc.customerid as customer_id,
## Number of transactions within the observation window (x days from onboarding date),
## Cash In Count Details
COUNT(DISTINCT(IF(main_transaction_type = 'Net Cash In',transaction_id,NULL))) tx_cnt_cash_in_total,
COUNT(DISTINCT(IF(sub_transaction_type='Outside Tonik to TSA' ,transaction_id,NULL))) tx_cnt_cash_in_ob2t,
COUNT(DISTINCT(IF(sub_transaction_type='Other Tonik Users to Town Tonik Account' ,transaction_id,NULL))) tx_cnt_cash_in_ot2t,

## Cash In Amount Details
SUM(IF(main_transaction_type = 'Net Cash In',trx_amount,NULL)) tx_amt_cash_in_total,
SUM((IF(sub_transaction_type='Outside Tonik to TSA' ,trx_amount,NULL))) tx_amt_cash_in_ob2t,
SUM((IF(sub_transaction_type='Other Tonik Users to Town Tonik Account',trx_amount,NULL))) tx_amt_cash_in_ot2t,

## Cash Out Count Details
COUNT(DISTINCT(IF(main_transaction_type = 'Net Cash Out',transaction_id,NULL))) tx_cnt_cash_out_total,
COUNT(DISTINCT(IF(sub_transaction_type= 'Bills Pay' ,transaction_id,NULL))) tx_cnt_cash_out_billpay,
COUNT(DISTINCT(IF(sub_transaction_type= 'Card Transactions (Cash Out)' ,transaction_id,NULL))) tx_cnt_cash_out_cards,
COUNT(DISTINCT(IF(sub_transaction_type= 'Own TSA to Other Tonik Users' ,transaction_id,NULL))) tx_cnt_cash_out_t2ot,
COUNT(DISTINCT(IF(sub_transaction_type= 'TSA to Outside Tonik (Other Banks)' ,transaction_id,NULL))) tx_cnt_cash_out_t2ob,

## Cash Out Amount Details
SUM(IF(main_transaction_type = 'Net Cash Out',trx_amount,NULL)) tx_amt_cash_out_total,
SUM(IF(sub_transaction_type= 'Bills Pay' ,trx_amount,NULL)) tx_amt_cash_out_billpay,
SUM(IF(sub_transaction_type= 'Card Transactions (Cash Out)' ,trx_amount,NULL)) tx_amt_cash_out_cards,
SUM(IF(sub_transaction_type= 'Own TSA to Other Tonik Users' ,trx_amount,NULL)) tx_amt_cash_out_t2ot,
SUM(IF(sub_transaction_type= 'TSA to Outside Tonik (Other Banks)' ,trx_amount,NULL)) tx_amt_cash_out_t2ob,
FROM input_customers acc 
LEFT JOIN transactions_sub a ON CAST(acc.customerid AS STRING) = a.customer_id and date(transaction_date) < date(acc.snapshot_date)
GROUP BY 1
ORDER BY 2 

),

utility_transaction_data AS (
        SELECT 
        customer_id,
        CASE 
            WHEN SUM(CASE WHEN transaction_code IN ('BP1', 'BP2', 'BP3', 'BP4') THEN 1 ELSE 0 END) > 0 
                THEN MIN(transaction_date) 
            ELSE NULL 
        END AS first_billpay_date,
        CASE 
            WHEN SUM(CASE WHEN transaction_code like 'A0%' AND core_narration NOT LIKE '%Blocking%' THEN 1 ELSE 0 END) > 0 
                THEN MIN(transaction_date) 
            ELSE NULL 
        END AS virtual_transaction_date,
        CASE 
            WHEN SUM(CASE WHEN transaction_code IN ('21C', 'N01', 'IP2', 'XE2','P01') THEN 1 ELSE 0 END) > 0 
                THEN MIN(transaction_date) 
            ELSE NULL 
        END AS first_tsa_topup_date
    FROM 
        main_transaction_data
    GROUP BY 1
),

combined_data AS (
    SELECT 
        COALESCE(acc.cust_id,acc_data.customer_id,utility_transaction_data.customer_id) customer_id,
        productid,
        accountdescription,
        acc_data.opendate as opendate ,
        first_billpay_date,
        virtual_transaction_date,
        first_tsa_topup_date,
        snapshot_date,
        LEAST(
            DATE(snapshot_date),
            IFNULL(DATE(acc_data.opendate),'9999-12-31'), 
            IFNULL(first_billpay_date, '9999-12-31'), 
            IFNULL(virtual_transaction_date, '9999-12-31'), 
            IFNULL(first_tsa_topup_date, '9999-12-31')
        ) AS first_opened_date
        FROM (SELECT DISTINCT cust_id,registration_date,snapshot_date from cust_onboarding_acc_data) acc 
        LEFT JOIN (SELECT cust_id as customer_id,ofdateopened opendate ,productid,account_type as accountdescription from cust_onboarding_acc_data 
        WHERE  account_type NOT IN ('Tonik Account','Tendo Individual Stash') AND productid in ('fixdep','savings','SaveForFuture') AND ofdateopened >= '2023-01-01' AND DATE(reccreatedon) < DATE(snapshot_date)
       QUALIFY ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY reccreatedon asc) = 1) acc_data
       ON acc.cust_id = acc_data.customer_id
      LEFT JOIN utility_transaction_data
        ON 
      acc.cust_id = utility_transaction_data.customer_id
),
first_product_data as (
SELECT
    customer_id,
    --first_opened_date,
    CASE
        WHEN first_opened_date = DATE(opendate) THEN accountdescription
        WHEN first_opened_date = first_billpay_date THEN 'Bills Pay'
        WHEN first_opened_date = first_tsa_topup_date THEN 'TSA Top-Up'
        WHEN first_opened_date = virtual_transaction_date THEN 'Virtual Transaction'
        ELSE 'Unknown'
    END AS first_product,
    CASE
        WHEN first_opened_date = DATE(opendate) and productid in ('fixdep','savings','SaveForFuture') THEN 'Deposit Users'
        --WHEN first_opened_date = DATE(opendate) and productid NOT IN ('fixdep','savings','SaveForFuture') THEN 'Loan Users'
        WHEN first_opened_date = first_tsa_topup_date or first_opened_date = virtual_transaction_date or first_opened_date = first_billpay_date THEN 'Utility Users'
        ELSE 'Ghost Users'
    END AS first_product_user_segment
FROM
    combined_data
),
loan_metrics as (
SELECT 
input_customers.customerid,

count(DISTINCT(IF (lmt.digitalLoanAccountId IS NOT NULL AND coalesce(lmt.termsAndConditionsSubmitDateTime,if (lmt.new_loan_type ='Flex-up',lmt.startApplyDateTime,lmt.termsAndConditionsSubmitDateTime)) is null,lmt.digitalLoanAccountId,NULL))) tx_incomplete_loan_apps,
from input_customers
JOIN `risk_credit_mis.loan_master_table` lmt on cast(lmt.customerId as string) = input_customers.customerid and COALESCE(DATE(termsAndConditionsSubmitDateTime),DATE(startApplyDateTime))  < DATE(input_customers.snapshot_date)
group by 1
),
complete_deposit_metrics as (
with  deposit_acc_main AS 
(
   SELECT
  a.OFDATEOPENED as ofdateopened,
  IF(OFISCLOSED = 'Y',DATE_DIFF(date(ofdateclosed),date(OFDATEOPENED),day),NULL) as stash_duration,
  registration_date,
  duration,
  reccreatedon,
  a.cust_id as customer_id,
  a.account_type,
  a.accountid as ofstandardaccountid,
  --balancedateasof,
  b.clearedbalance,
  a.OFISCLOSED as closed,
  ff.status as td_status,
  a.ofdateclosed,
  autorollover,
  snapshot_date,
  FROM cust_onboarding_acc_data a
  JOIN risk_mart.customer_balance b on a.accountid = b.accountid and date(balanceDateAsOf) = DATE_SUB(DATE(snapshot_date),INTERVAL 1 DAY)
  --JOIN input_customers on input_customers.customerid = a.cust_id 
  LEFT JOIN `finastra_raw.fixturefeature` ff on ff.accountid = a.accountid
  WHERE 1=1 
  --and date(balancedateasof) = DATE_SUB(p.start_date,INTERVAL 1 DAY)
  and productid in ('savings','fixdep','SaveForFuture') and  DATE(reccreatedon) < DATE(snapshot_date)
  )
  
,deposit_days_diff_sub as (
    SELECT customer_id,
    'Between All Deposits' days_diff_type,
    DATE_DIFF(LEAD(ofdateopened) OVER (PARTITION BY customer_id ORDER BY ofdateopened ASC),ofdateopened,DAY) days_bt_trans
    FROM deposit_acc_main

    UNION ALL

    SELECT customer_id,
    'Between TDs' days_diff_type,
    DATE_DIFF(LEAD(ofdateopened) OVER (PARTITION BY customer_id ORDER BY ofdateopened ASC),ofdateopened,DAY) days_bt_trans
    FROM deposit_acc_main
    WHERE account_type not like '%Stash%'

)
,dep_days_bt_trans_med AS 
(
-- get the median days in between
SELECT DISTINCT
customer_id,

PERCENTILE_CONT(IF(days_diff_type='Between All Deposits',days_bt_trans,NULL), .50) OVER (PARTITION BY customer_id) med_days_bw_new_dep_acct_open,
PERCENTILE_CONT(IF(days_diff_type='Between TDs',days_bt_trans,NULL), .50) OVER (PARTITION BY customer_id) med_days_bw_td_acct_open,
FROM deposit_days_diff_sub
)

, deposit_account_counts AS 
(
#### Number of Stash and Time Deposit accounts that are still open until the observation date with balance >= 100
SELECT
customer_id,
CASE WHEN SUM(autorollover) >= 1.0 THEN 1 ELSE 0 END AS tx_td_auto_roll_over_enabled,
MAX((IF(account_type LIKE '%Time Deposit%' and (DATE(ofdateclosed) = '1970-01-01' OR DATE(ofdateclosed) < DATE(snapshot_date)),duration,0))) AS td_max_duration,
AVG((IF(account_type LIKE '%Time Deposit%'and (DATE(ofdateclosed) = '1970-01-01' OR DATE(ofdateclosed) > DATE(snapshot_date)) ,duration,0))) AS td_avg_duration,
MIN((IF(account_type LIKE '%Time Deposit%'and (DATE(ofdateclosed) = '1970-01-01' OR DATE(ofdateclosed) > DATE(snapshot_date)),duration,0))) AS td_min_duration,
MAX((IF(account_type LIKE '%Stash%' and (DATE(ofdateclosed) = '1970-01-01' OR DATE(ofdateclosed) > DATE(snapshot_date)),stash_duration,0))) AS stash_max_duration,
AVG((IF(account_type LIKE '%Stash%' and (DATE(ofdateclosed) = '1970-01-01' OR DATE(ofdateclosed) > DATE(snapshot_date)),stash_duration,0))) AS stash_avg_duration,
MIN((IF(account_type LIKE '%Stash%' and (DATE(ofdateclosed) = '1970-01-01' OR DATE(ofdateclosed) > DATE(snapshot_date)),stash_duration,0))) AS stash_min_duration,
SUM(DISTINCT(IF(account_type LIKE '%Time Deposit%' AND td_status = '1' and (DATE(ofdateclosed) = '1970-01-01' OR DATE(ofdateclosed) > DATE(snapshot_date)),clearedbalance,NULL))) td_balance,
SUM(DISTINCT(IF(account_type LIKE '%Stash%' and (DATE(ofdateclosed) = '1970-01-01' OR DATE(ofdateclosed) > DATE(snapshot_date)),clearedbalance,NULL))) stash_balance,
COUNT(DISTINCT(IF(account_type LIKE '%Time Deposit%' AND td_status = '4' and closed = 'Y' and (DATE(ofdateclosed) = '1970-01-01' OR DATE(ofdateclosed) > DATE(snapshot_date)),ofstandardaccountid,NULL))) td_accounts_completed_cnt,
COUNT(DISTINCT(IF(account_type LIKE '%Time Deposit%' AND td_status = '9' and closed = 'Y' and (DATE(ofdateclosed) = '1970-01-01' OR DATE(ofdateclosed) > DATE(snapshot_date)),ofstandardaccountid,NULL))) td_accounts_broken_cnt,
COUNT(DISTINCT(IF(account_type <> 'Tonik Account', ofstandardaccountid,NULL))) deposit_accs_cnt,
COUNT(DISTINCT(IF(account_type LIKE '%Stash%',ofstandardaccountid,NULL))) stash_accounts_opened_cnt,
COUNT(DISTINCT(IF(account_type LIKE '%Time Deposit%',ofstandardaccountid,NULL))) td_accounts_opened_cnt,
COUNT(DISTINCT(IF(account_type LIKE '%Stash%' and closed = 'Y' and (DATE(ofdateclosed) = '1970-01-01' OR  DATE(ofdateclosed) > DATE(snapshot_date)) ,ofstandardaccountid,NULL))) stash_accounts_closed_cnt,
FROM 
deposit_acc_main
where
DATE(reccreatedon) < DATE(snapshot_date) and account_type <> 'Tonik Account'
GROUP BY 1
)

SELECT DISTINCT 
a.customer_id,
td_balance,
stash_balance,
tx_td_auto_roll_over_enabled,
deposit_accs_cnt,
stash_accounts_opened_cnt,
stash_accounts_closed_cnt,
td_accounts_opened_cnt,
td_accounts_completed_cnt,
td_accounts_broken_cnt,
med_days_bw_new_dep_acct_open,
med_days_bw_td_acct_open,
td_max_duration,
td_min_duration,
td_avg_duration,
stash_max_duration,
stash_avg_duration,
stash_min_duration,
FROM deposit_account_counts a
--LEFT JOIN deposit_account_90th_day_counts d ON a.customer_id = d.customer_id
LEFT JOIN dep_days_bt_trans_med b ON b.customer_id = a.customer_id
ORDER BY 2 DESC
)
SELECT
COALESCE(first_product_data.first_product,'Unknown') as tx_first_product,
COALESCE(first_product_data.first_product_user_segment,'Ghost Users') tx_first_product_user_segment,
acc.customerid as customer_id,
onb_tsa_onboarding_datetime,
onb_credo_inquiry_date,
onb_credolabRefNumber,
null as ln_disb_flag,
null as ln_loan_type,
null as ln_prod_type,
null as ln_loan_appln_time,
null as first_disb_loan_disbursed_time,
null as first_applied_loan_appln_time,
null as first_applied_loan_type,
null as first_applied_loan_tenor,
null as first_applied_loan_amount,
null as first_applied_loan_decision,
null as first_applied_product_type,
null as last_applied_loan_appln_time,
null as last_applied_loan_decision,
null as last_applied_os_type,
null as last_applied_loan_type,
null as last_applied_loan_tenor,
null as last_applied_loan_amount,
null as last_applied_product_type,
null as last_applied_crif_id,
null as last_applied_cic_score,
null as last_applied_credo_ref_no,
null as last_applied_credo_score,
null as last_applied_demo_score,
null as last_applied_apps_score,
null as ln_fpd30_flag,
null as ln_mature_fpd30_flag,
null as ln_mature_fspd30_flag,
null as ln_fspd30_flag,
null as first_reject_loan_appln_time,
null as first_rejected_digitalloanaccountId,
null as first_reject_loan_type,
null as first_reject_loan_amount,
null as first_reject_loan_tenor,
null as last_reject_loan_appln_time,
null as last_rejected_digitalloanaccountId,
null as last_reject_loan_type,
null as last_reject_loan_amount,
null as last_reject_loan_tenor,
dob_snapshot_date,
days_since_credo_call_onb,
days_since_credo_call_snapshot_date,
onb_age,
onb_doc_type,
onb_osversion,
c_credo_score,
ln_self_dec_income,
ln_marital_status,
ln_education_level,
onb_email,
ln_industry_new,
ln_employment_type_new,
snapshot_date,
ln_nature_of_work_new,
ln_source_funds_new,
onb_first_name,
onb_middle_name,
onb_last_name,
ln_brand,
ln_cnt_dependents,
ln_doc_type,
ln_osversion,
ln_user_type,
cust_status_flag,
cust_status_close_date,
0 as ln_ever_applied_flag,
onb_city,
onb_province,
EXTRACT(YEAR FROM onb_tsa_onboarding_datetime) onb_year,
EXTRACT(MONTH FROM onb_tsa_onboarding_datetime) onb_month_of_year,
EXTRACT(WEEK FROM onb_tsa_onboarding_datetime) - EXTRACT(WEEK FROM DATE_TRUNC(onb_tsa_onboarding_datetime, MONTH)) + 1 onb_week_of_month,
EXTRACT(DAY FROM onb_tsa_onboarding_datetime) onb_day_of_month,
EXTRACT(TIME FROM onb_tsa_onboarding_datetime) onb_time_of_day,
a.tx_cnt_cash_in_total,
a.tx_cnt_cash_in_ob2t,
a.tx_cnt_cash_in_ot2t,
a.tx_amt_cash_in_total,
a.tx_amt_cash_in_ob2t,
a.tx_amt_cash_in_ot2t,
a.tx_cnt_cash_out_total,
a.tx_cnt_cash_out_billpay,
a.tx_cnt_cash_out_cards,
a.tx_cnt_cash_out_t2ot,
a.tx_cnt_cash_out_t2ob,
a.tx_amt_cash_out_total,
a.tx_amt_cash_out_billpay,
a.tx_amt_cash_out_cards,
a.tx_amt_cash_out_t2ot,
a.tx_amt_cash_out_t2ob,
overall_avg_days_bt_trans tx_avg_days_bt_trans,
net_cash_in_avg_days_bt_trans tx_avg_days_bt_cash_in_trans,
net_cash_out_avg_days_bt_trans tx_avg_days_bt_cash_out_trans,
overall_med_days_bt_trans tx_med_days_bt_trans,
cash_in_med_days_bt_trans tx_med_days_bt_cash_in_trans,
cash_out_med_days_bt_trans tx_med_days_bt_cash_out_trans,
deposit_accs_cnt tx_deposit_accnt_cnt,
stash_accounts_opened_cnt tx_stash_accnt_opened_cnt,
stash_accounts_closed_cnt tx_stash_accnt_closed_cnt,
stash_balance tx_stash_balance,
td_accounts_opened_cnt tx_td_accnt_opened_cnt,
td_accounts_completed_cnt tx_td_accnt_completed_cnt,
td_accounts_broken_cnt tx_td_accnt_broken_cnt,
tx_td_auto_roll_over_enabled,
td_balance tx_td_balance,
td_max_duration,
td_min_duration,
td_avg_duration,
stash_max_duration,
stash_avg_duration,
stash_min_duration,
med_days_bw_new_dep_acct_open tx_med_days_bw_new_dep_acct_open,
med_days_bw_td_acct_open tx_med_days_bw_td_acct_open,
null as tx_cnt_completed_loans,
null as tx_cnt_rejected_loans,
null as tx_cnt_active_loans,
null as tx_cnt_applied_loan,
null as tx_cnt_approved_loans,
null as tx_cnt_disbursed_loans,
tx_incomplete_loan_apps as tx_cnt_incomplete_loan_apps,
null as tx_cnt_installments_paid_tot,
null as tx_amt_installments_paid_tot,
null as tx_cnt_installments_paid_last_disb,
null as tx_amt_installments_paid_last_disb,
null as tx_cnt_fpd10_ever,
null as tx_cnt_fpd30_ever,
null as tx_cnt_fspd30_ever,
null as tx_cnt_dpd_gt_5_ever,
null as tx_max_ever_dpd,
null as tx_max_dpd_30d,
null as tx_max_dpd_60d,
null as tx_max_dpd_90d,
null as tx_max_dpd_120d,
null as tx_max_dpd_150d,
null as tx_max_dpd_180d,
null as tx_max_current_dpd,
null as ln_any_prev_disb_loan_sil_mobile_flag,
CASE WHEN tdbk_referral_code_mtb.cust_id is not null THEN 1
ELSE 0 END AS referral_flag

FROM input_customers acc
LEFT JOIN first_product_data ON first_product_data.customer_id = acc.customerid 
LEFT JOIN transactions_final a ON acc.customerid = a.customer_id
LEFT JOIN days_bt_trans_avg b ON acc.customerid = b.customer_id 
LEFT JOIN days_bt_trans_med c ON acc.customerid = c.customer_id
LEFT JOIN complete_deposit_metrics d on d.customer_id = acc.customerid
LEFT JOIN loan_metrics ON cast(loan_metrics.customerid as string) = acc.customerid
LEFT JOIN (SELECT cust_id,member_type,referral_type FROM dl_customers_db_raw.tdbk_referral_code_mtb
WHERE tdbk_referral_code_mtb.member_type='REFEREE') tdbk_referral_code_mtb on  tdbk_referral_code_mtb.cust_id = acc.customerid

"""

# Submit the SQL held in `sq` to BigQuery and block until the job has finished.
job = client.query(sq)
job.result()  # Wait for the job to complete.
time.sleep(5) # Delays for 5 seconds
# NOTE(review): `schema1` and `al` are not assigned in the visible part of this cell;
# confirm they still name the table that the statement above creates.
print(f'Table {schema1}.{al} created successfully')

# worktable_data_analysis.trench2_never_applied_snapshot_call_count

In [None]:
# SQL that (re)creates `worktable_data_analysis.trench2_never_applied_snapshot_call_count`:
# contact-centre call and email counts per customer of the never-applied cohort.
#   input_customers         - cohort rows from risk_mart plus a constant snapshot_date (2025-09-01).
#   cust_emails             - Genesys email transcripts unnested per recipient; each row is tagged
#                             inbound/outbound from whether the sender/recipient domain contains
#                             'tonik', and the non-Tonik side is kept as customer_email_address.
#   cust_mobile_genesys_data- connected voice calls dated before the snapshot, matched on the last
#                             10 digits of the customer's mobile number; also a flag/count for calls
#                             falling in the 90 days up to the snapshot date.
#   cust_email_genesys_data - outbound/inbound email counts for emails created before the snapshot.
# The final SELECT left-joins both aggregates back onto input_customers.
# NOTE(review): output is one row per customer only if customerId is unique in the cohort table - confirm.
sq = """  
create or replace table `worktable_data_analysis.trench2_never_applied_snapshot_call_count` as
with input_customers as (
SELECT 
customerId,
onb_tsa_onboarding_datetime,
onb_age,
onb_email,
ln_industry_new,
ln_employment_type_new,
DATE('2025-09-01') as snapshot_date,
ln_nature_of_work_new,
ln_doc_type,
ln_osversion,
onb_kyc_status,
onb_latitude,
onb_longitude,
ln_user_type,
ln_loan_type,
ln_prod_type,
ln_loan_applied_flag,
FROM
`risk_mart.tsa_onboarded_but_never_applied_loan_20230101_20250831`
--where DATE_ADD(onb_tsa_onboarding_datetime,INTERVAL 90 DAY) <= '2025-04-15'
),
cust_emails as (
  with temp_output as (
select too.email as to_email, emailTranscript.from.email as from_email,
 
CASE WHEN SPLIT(emailTranscript.from.email,'@')[SAFE_OFFSET(1)] like '%tonik%' THEN 'outbound'
WHEN SPLIT(too.email,'@')[SAFE_OFFSET(1)] like '%tonik%' THEN 'inbound'
ELSE 'outbound'
END AS category,
creationTime
from `genesys_raw.emails`,
unnest(emailTranscript) emailTranscript,unnest(emailTranscript.to) too
)
SELECT
*,
CASE WHEN category = 'outbound' THEN to_email
WHEN category = 'inbound'THEN from_email
WHEN from_email not like '%tonik%' THEN from_email
END AS customer_email_address,
FROM temp_output
),
 
cust_mobile_genesys_data as (
SELECT
input_customers.customerid,
MAX(
    CASE
      WHEN DATE(call_history.callDatetime) between DATE_SUB(input_customers.snapshot_date, INTERVAL 90 DAY) and DATE(input_customers.snapshot_date) THEN 1
      ELSE 0
  END
    ) AS flag_contactable_last90D,
  count(
    CASE
      WHEN DATE(call_history.callDatetime) between DATE_SUB(input_customers.snapshot_date, INTERVAL 90 DAY) and DATE(input_customers.snapshot_date) THEN 1
      ELSE null
  END
    ) AS count_contactable_last90D,
COUNT(CASE WHEN calldirection = 'outbound' THEN 1 ELSE null END) AS outbound_call_count,
COUNT(CASE WHEN calldirection = 'inbound' THEN 1 ELSE null END) AS inbound_call_count,

FROM input_customers
LEFT JOIN (SELECT DISTINCT cust_id,mobile_no, valid_from_dt FROM `datalake_worktables.customer_mobile_mail_dtls`
QUALIFY ROW_NUMBER() OVER (PARTITION BY cust_id,mobile_no order by valid_from_dt asc) = 1
) mobile_details on input_customers.customerid = mobile_details.cust_id and DATE(mobile_details.valid_from_dt) < DATE(snapshot_date)
LEFT JOIN `risk_credit_mis.call_attempt_history_gensys` call_history ON RIGHT(mobile_details.mobile_no,10) = right(call_history.mobileNumber,10) and DATE(call_history.callDatetime) < DATE(input_customers.snapshot_date) and connected = 1 and mediaType = 'voice'
GROUP BY 1
),
cust_email_genesys_data as (
SELECT
input_customers.customerid,
COUNT(CASE WHEN category = 'outbound' THEN 1 ELSE null END) AS outbound_email_count,
COUNT(CASE WHEN category = 'inbound'  THEN 1 ELSE null END) AS inbound_email_count
FROM input_customers
LEFT JOIN (SELECT DISTINCT cust_id,email, valid_from_dt FROM `datalake_worktables.customer_mobile_mail_dtls`
QUALIFY ROW_NUMBER() OVER (PARTITION BY cust_id,email order by valid_from_dt asc) = 1
) email_details on input_customers.customerid = email_details.cust_id and DATE(email_details.valid_from_dt) < DATE(snapshot_date)
left JOIN cust_emails ON email_details.email = cust_emails.customer_email_address and DATE(cust_emails.creationTime) < DATE(input_customers.snapshot_date)
GROUP BY 1
)
SELECT
  input_customers.customerid,
  input_customers.snapshot_date,
 flag_contactable_last90D,
 count_contactable_last90D,
inbound_call_count,
outbound_call_count,
outbound_email_count,
inbound_email_count,
FROM input_customers
LEFT JOIN cust_mobile_genesys_data on cust_mobile_genesys_data.customerid = CAST(input_customers.customerid AS STRING) --and cust_mobile_genesys_data.snapshot_date = input_customers.snapshot_date
LEFT JOIN cust_email_genesys_data ON  cust_email_genesys_data.customerid = CAST(input_customers.customerid AS STRING) --and cust_email_genesys_data.snapshot_date = input_customers.snapshot_date
 
 ;
"""

# Run the CREATE OR REPLACE TABLE statement held in `sq` and block until it has finished.
# The table name is spelled out here because `schema1` / `al` are not assigned in this
# cell: the previous message depended on values left over from an earlier cell and so
# reported the wrong table (or raised NameError on a fresh kernel).
target_table = 'worktable_data_analysis.trench2_never_applied_snapshot_call_count'
job = client.query(sq)
job.result()  # Wait for the job to complete.
time.sleep(5)  # Pause for 5 seconds before the next cell runs.
print(f'Table {target_table} created successfully')

# worktable_data_analysis.trench2_never_applied_snapshot_events

In [None]:
# SQL that (re)creates `worktable_data_analysis.trench2_never_applied_snapshot_events`:
# app-engagement and acquisition-channel features per customer of the never-applied cohort.
#   input_customers - cohort rows from risk_mart plus a constant snapshot_date.
#   af_link         - earliest AppsFlyer install row per customer, taken separately from the
#                     in-app and organic in-app event reports (partitions before the snapshot).
#   events          - MoEngage counts (App_Launch, Loans_QL_Calculator) for events before the snapshot.
#   campaign_data   - the row with the latest install_time per customer across the install and
#                     retarget reports, mapped to a source / source_group.
#   final_output    - minutes from the earliest AppsFlyer install to the earliest created_dt
#                     of the customer record.
# The cohort table and snapshot_date are aligned with the call-count cell and with the
# final `..._20250831` table (cohort ..._20250831, snapshot 2025-09-01). This cell
# previously still read the ..._20250810 cohort with snapshot 2025-08-10, which left
# customers onboarded after 2025-08-10 without event features in the final join.
sq = """  
CREATE OR REPLACE TABLE worktable_data_analysis.trench2_never_applied_snapshot_events as
with input_customers as ( 
SELECT 
customerId,
onb_tsa_onboarding_datetime,
onb_age,
onb_email,
ln_industry_new,
ln_employment_type_new,
DATE('2025-09-01') as snapshot_date,
ln_nature_of_work_new,
ln_doc_type,
ln_osversion,
onb_kyc_status,
onb_latitude,
onb_longitude,
ln_user_type,
ln_loan_type,
ln_prod_type,
ln_loan_applied_flag,
FROM
`risk_mart.tsa_onboarded_but_never_applied_loan_20230101_20250831`
),
af_link AS
(
  ## To get the AF ID and Customer ID Link (using the first install of a customer)
  SELECT DISTINCT appsflyer_id, customer_user_id, install_time,snapshot_date
  FROM `appsflyer_raw.in_app_events_report` in_apps_events
  JOIN `dl_customers_db_raw.tdbk_customer_mtb` c ON c.cust_id = in_apps_events.customer_user_id
  JOIN input_customers ON In_apps_events.customer_user_id = input_customers.customerid and DATE(in_apps_events._partitiondate) < DATE(input_customers.snapshot_date)
  WHERE 1=1
  AND customer_user_id IS NOT NULL
  QUALIFY ROW_NUMBER() OVER (PARTITION BY customer_user_id ORDER BY install_time ASC) = 1
 
  UNION ALL
 
  SELECT DISTINCT appsflyer_id, customer_user_id, install_time,snapshot_date
  FROM `appsflyer_raw.organic_in_app_events_report` organic_in_apps_events
  JOIN `dl_customers_db_raw.tdbk_customer_mtb` c ON c.cust_id = organic_in_apps_events.customer_user_id
 JOIN input_customers ON organic_in_apps_events.customer_user_id = input_customers.customerid and DATE(organic_in_apps_events._partitiondate) < DATE(input_customers.snapshot_date)
  WHERE 1=1
  AND customer_user_id IS NOT NULL
  QUALIFY ROW_NUMBER() OVER (PARTITION BY customer_user_id ORDER BY install_time ASC) = 1
)
, events AS
(
  SELECT DISTINCT
  cust_id as customer_id,
  --snapshot_date,
COUNT(DISTINCT IF(event_name = 'App_Launch',event_uuid,NULL)) meng_no_of_logins,
COUNT(DISTINCT IF(
    event_name IN ('Loans_QL_Calculator'),moengagerefid,NULL)) meng_ql_calculator_count,
COUNT(DISTINCT IF(
    event_name IN ('Loans_QL_Calculator'),event_uuid,NULL)) meng_ql_calculator_tot_visit_cnt
  FROM `dl_customers_db_raw.tdbk_customer_mtb` c
  LEFT JOIN `moengage_raw.events_hourly` a ON c.cust_id = cast(a.customer_id as string)
 JOIN input_customers ON cast(a.customer_id as string) = input_customers.customerid 
  WHERE 1=1 and DATE(event_time) < DATE(input_customers.snapshot_date)
  --AND event_name IN ('App_Launch','Loans_QL_Calculator','Loans_QL_Launch','Loans_SIL_Launch')
  group by 1
  ),

campaign_data as
(  SELECT DISTINCT a.customer_user_id, a.appsflyer_id, a.media_source, a.partner, a.campaign, a.Retargeting_Conversion_Type, 
  CASE
  WHEN a.media_source = 'website_channel=website_ss_ui=true_ss_gtm_ui=true_ss_qr=c' THEN 'Website'
  WHEN a.media_source IS NULL AND a.partner IS NULL AND a.campaign IS NULL THEN 'Organic'
  ELSE b.source END source,
CASE
  WHEN a.media_source = 'website_channel=website_ss_ui=true_ss_gtm_ui=true_ss_qr=c' THEN 'Website'
  WHEN a.media_source IS NULL AND a.partner IS NULL AND a.campaign IS NULL THEN 'Organic'
  ELSE b.source_group END source_group,
  FROM 
  (
    SELECT DISTINCT
    install_time,
    customer_user_id,
    AppsFlyer_ID,
    media_source,
    partner,
    campaign,
    'Install' Retargeting_Conversion_Type,
    FROM `appsflyer_raw.in_app_events_report`
    WHERE 1=1

    UNION ALL

    SELECT DISTINCT
    install_time,
    customer_user_id,
    AppsFlyer_ID,
    media_source,
    partner,
    campaign,
    'Install' Retargeting_Conversion_Type
    FROM `appsflyer_raw.organic_in_app_events_report`
    WHERE 1=1

    UNION ALL

    SELECT DISTINCT
    install_time,
    customer_user_id,
    AppsFlyer_ID,
    media_source,
    partner,
    campaign,
    Retargeting_Conversion_Type
    FROM `appsflyer_raw.in_app_events_retarget`
    WHERE 1=1
  ) a
  LEFT JOIN `prj-prod-dataplatform.worktable_datachampions.installs_attribution_mapping` b
  ON 
    COALESCE(a.media_source,a.partner,a.campaign) = COALESCE(b.media_source,b.partner,b.campaign)

  WHERE 1=1
 
  QUALIFY ROW_NUMBER() OVER (PARTITION BY a.customer_user_id ORDER BY install_time DESC) = 1
  ORDER BY customer_user_id
 ),
final_output as (
SELECT DISTINCT
input_customers.customerId,
--input_customers.snapshot_date,
TIMESTAMP_DIFF(MIN(cust_mtb.created_dt),MIN(install_time),MINUTE) appsflyer_install_to_registration_minutes,
FROM input_customers
JOIN `dl_customers_db_raw.tdbk_customer_mtb` cust_mtb ON cust_mtb.cust_id = input_customers.customerId
--LEFT JOIN events c ON c.customer_id = input_customers.customerId and input_customers.snapshot_date = c.snapshot_date
LEFT JOIN af_link b ON b.customer_user_id = input_customers.customerId and input_customers.snapshot_date = b.snapshot_date
--LEFT JOIN campaign_data ON campaign_data.customer_user_id = input_customers.customerId
group by 1
)
SELECT 
final_output.*,
meng_no_of_logins,
meng_ql_calculator_count,
COALESCE(meng_ql_calculator_tot_visit_cnt,0) meng_ql_calculator_tot_visit_cnt,
CASE WHEN campaign_data.source_group = 'Organic' THEN 'Organic'
ELSE 'InOrganic' END AS channel_source_group,
source as marketing_source_name,
from final_output
LEFT JOIN campaign_data ON campaign_data.customer_user_id = final_output.customerId
LEFT JOIN events c ON c.customer_id = final_output.customerId-- and final_output.ln_loan_appln_time = c.ln_loan_appln_time
;
"""




# Run the CREATE OR REPLACE TABLE statement held in `sq` and block until it has finished.
# The table name is spelled out here because `schema1` / `al` are not assigned in this
# cell: the previous message depended on values left over from an earlier cell and so
# reported the wrong table (or raised NameError on a fresh kernel).
target_table = 'worktable_data_analysis.trench2_never_applied_snapshot_events'
job = client.query(sq)
job.result()  # Wait for the job to complete.
time.sleep(5)  # Pause for 5 seconds before the next cell runs.
print(f'Table {target_table} created successfully')

# worktable_data_analysis.gamma_model_never_applied_snapshot_20250831

In [None]:
# SQL that (re)creates `worktable_data_analysis.gamma_model_never_applied_snapshot_20250831`,
# the assembled feature table for the never-applied cohort.
#   a              - trench2_never_applied_snapshot_transactions (driving table).
#   b              - trench2_never_applied_snapshot_events, joined on customer id.
#   contactability - trench2_never_applied_snapshot_call_count, joined on customer id.
#   ever_applied   - customer ids from gamma_model_ever_applied_snapshot_20250831; the
#                    LEFT JOIN + `IS NULL` filter drops every customer present in that table.
# Several columns are renamed on the way out (tx_/cs_/onb_ prefixes), negative
# appsflyer_install_to_registration_minutes values are replaced with NULL, and
# tx_cnt_rejected_loans defaults to 0.
# NOTE(review): last_applied_digitalloanaccountId is populated from last_applied_crif_id - confirm this is intended.
sq = """ 
create or replace table `worktable_data_analysis.gamma_model_never_applied_snapshot_20250831` as
SELECT 
a.customer_id,
null as digitalloanaccountid,
dob_snapshot_date,
days_since_credo_call_onb,
days_since_credo_call_snapshot_date,
onb_credolabRefNumber,
onb_credo_inquiry_date,
onb_email,
onb_age,
onb_city,
onb_province,
ln_industry_new,
ln_employment_type_new,
a.snapshot_date,
ln_nature_of_work_new,
ln_source_funds_new,
onb_first_name,
onb_middle_name,
onb_last_name,
ln_brand,
ln_cnt_dependents,
onb_doc_type,
onb_osversion onb_osversion,
c_credo_score,
cust_status_flag,
cust_status_close_date,
ln_self_dec_income,
ln_marital_status,
ln_education_level,
0 as ln_ever_applied_flag,
tx_first_product,
tx_first_product_user_segment,
ln_user_type,
ln_loan_type,
ln_prod_type ln_product_type,
CASE WHEN a.last_rejected_digitalloanaccountId is not null then 'Rejected_Before'
else 'Never_Applied_Before' end as existing_user_type,
onb_tsa_onboarding_datetime onboarding_date,
a.ln_loan_appln_time,
first_disb_loan_disbursed_time ln_loan_disb_time,
ln_mature_fspd30_flag,
--CASE WHEN LOANMATURITYDATE < secondDueDate THEN 0 ELSE ln_mature_fspd30_flag
--END AS  ln_mature_fspd30_flag,
ln_fspd30_flag,
ln_mature_fpd30_flag,
--CASE WHEN LOANMATURITYDATE < firstDueDate THEN 0 ELSE ln_mature_fpd30_flag
--END AS ln_mature_fpd30_flag,
ln_fpd30_flag,
first_applied_loan_appln_time,
first_applied_loan_decision,
first_applied_loan_type,
first_applied_product_type,
first_applied_loan_amount,
first_applied_loan_tenor,
last_applied_loan_appln_time,
last_applied_loan_decision,
last_applied_loan_type,
last_applied_product_type,
last_applied_loan_amount,
last_applied_loan_tenor,
last_applied_os_type,
last_applied_crif_id last_applied_digitalloanaccountId,
last_applied_crif_id,
last_applied_cic_score,
last_applied_credo_ref_no,
last_applied_credo_score,
last_applied_demo_score,
last_applied_apps_score,
onb_year,
onb_month_of_year,
onb_week_of_month,
onb_day_of_month,
onb_time_of_day,
tx_cnt_cash_in_total,
tx_cnt_cash_in_ob2t,
tx_cnt_cash_in_ot2t,
tx_amt_cash_in_total,
tx_amt_cash_in_ob2t,
tx_amt_cash_in_ot2t,
tx_cnt_cash_out_total,
tx_cnt_cash_out_billpay,
tx_cnt_cash_out_cards,
tx_cnt_cash_out_t2ot,
tx_cnt_cash_out_t2ob,
tx_amt_cash_out_total,
tx_amt_cash_out_billpay,
tx_amt_cash_out_cards,
tx_amt_cash_out_t2ot,
tx_amt_cash_out_t2ob,
tx_deposit_accnt_cnt,
tx_stash_accnt_opened_cnt,
tx_stash_accnt_closed_cnt,
tx_stash_balance,
tx_td_accnt_opened_cnt,
tx_td_accnt_completed_cnt,
tx_td_accnt_broken_cnt,
tx_td_auto_roll_over_enabled,
tx_td_balance,
td_max_duration tx_td_max_duration,
td_min_duration tx_td_min_duration,
td_avg_duration tx_td_avg_duration,
stash_max_duration tx_stash_max_duration,
stash_avg_duration tx_stash_avg_duration,
stash_min_duration tx_stash_min_duration,
tx_med_days_bw_new_dep_acct_open,
tx_med_days_bw_td_acct_open,
tx_avg_days_bt_trans,
tx_avg_days_bt_cash_in_trans,
tx_avg_days_bt_cash_out_trans,
tx_med_days_bt_trans,
tx_med_days_bt_cash_in_trans,
tx_med_days_bt_cash_out_trans,
tx_cnt_applied_loan,
COALESCE(tx_cnt_rejected_loans,0) tx_cnt_rejected_loans,
tx_cnt_approved_loans,
tx_cnt_disbursed_loans,
tx_cnt_completed_loans,
tx_cnt_active_loans,
tx_cnt_incomplete_loan_apps,
referral_flag onb_referral_flag,
flag_contactable_last90D cs_contactable_last_90d_flag,
ln_any_prev_disb_loan_sil_mobile_flag,
CASE WHEN appsflyer_install_to_registration_minutes < 0 THEN NULL
ELSE appsflyer_install_to_registration_minutes END appsflyer_install_to_registration_minutes,
meng_no_of_logins,
meng_ql_calculator_count,
meng_ql_calculator_tot_visit_cnt,
channel_source_group,
marketing_source_name,
outbound_call_count cs_cnt_outbound_calls,
inbound_call_count cs_cnt_inbound_calls,
outbound_email_count cs_cnt_outbound_emails,
inbound_email_count cs_cnt_inbound_emails,
FROM 
 `worktable_data_analysis.trench2_never_applied_snapshot_transactions` a
LEFT JOIN `worktable_data_analysis.trench2_never_applied_snapshot_events` b on a.customer_id = b.customerid 
LEFT JOIN `worktable_data_analysis.trench2_never_applied_snapshot_call_count` contactability ON cast(contactability.customerid as string)= a.customer_id 
LEFT JOIN (select customer_id from `worktable_data_analysis.gamma_model_ever_applied_snapshot_20250831`) ever_applied on ever_applied.customer_id = a.customer_id
where ever_applied.customer_id is null
"""

# Run the CREATE OR REPLACE TABLE statement held in `sq` and block until it has finished.
# The table name is spelled out here because `schema1` / `al` are not assigned in this
# cell: the previous message depended on values left over from an earlier cell and so
# reported the wrong table (or raised NameError on a fresh kernel).
target_table = 'worktable_data_analysis.gamma_model_never_applied_snapshot_20250831'
job = client.query(sq)
job.result()  # Wait for the job to complete.
time.sleep(5)  # Pause for 5 seconds before the next cell runs.
print(f'Table {target_table} created successfully')
