# Define Library

In [1]:
# %% [markdown]
# # Jupyter Notebook Loading Header
#
# This is a custom loading header for Jupyter Notebooks in Visual Studio Code.
# It includes common imports and settings to get you started quickly.
# %% [markdown]
## Import Libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from google.cloud import bigquery
from google.cloud import storage
import os
import tempfile
import time
from datetime import datetime
import uuid
import joblib
import uuid

import gcsfs
import duckdb as dd
import pickle
import joblib
from typing import Union
import io

path = r'C:\Users\Dwaipayan\AppData\Roaming\gcloud\legacy_credentials\dchakroborti@tonikbank.com\adc.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = path
client = bigquery.Client(project='prj-prod-dataplatform')
os.environ["GOOGLE_CLOUD_PROJECT"] = "prj-prod-dataplatform"
# %% [markdown]
## Configure Settings
# Set options or configurations as needed
pd.set_option('display.max_columns', None)
pd.set_option("Display.max_rows", 100)


# Constant

In [2]:
CURRENT_DATE = datetime.now().strftime("%Y%m%d")


# Config

In [3]:
unique_id = str(uuid.uuid4()).replace('-', '')[-12:]
print(f"The unique Id is: {unique_id}")
BUCKETNAME = 'prod-asia-southeast1-tonik-aiml-workspace'
CLOUDPATH = 'DC/Tendo_Data'
LOCALPATH = r'D:\OneDrive - Tonik Financial Pte Ltd\MyStuff\Data Engineering\Model_Data_Set_preparation\Tendo_Data_Preparation\Data'
VERSION = 'V1'

The unique Id is: 5e110d4c771f


# <div align="left" style="color:rgb(51, 250, 250);"> Functions </div>

## <div align="left" style="color:rgb(51, 250, 250);"> Save the data to google clound storage </div>

In [4]:
def save_df_to_gcs(df, bucket_name, destination_blob_name, file_format='csv'):
    """Saves a pandas DataFrame to Google Cloud Storage.

    Args:
        df: The pandas DataFrame to save.
        bucket_name: The name of the GCS bucket.
        destination_blob_name: The name of the blob to be created.
        file_format: The file format to save the DataFrame in ('csv' or 'parquet').
    """

    # Create a temporary file
    if file_format == 'csv':
        temp_file = 'temp.csv'
        df.to_csv(temp_file, index=False)
    elif file_format == 'parquet':
        temp_file = 'temp.parquet'
        df.to_parquet(temp_file, index=False)
    else:
        raise ValueError("Invalid file format. Please choose 'csv' or 'parquet'.")

    # Upload the file to GCS
    storage_client = storage.Client(project="prj-prod-dataplatform")

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_filename(temp_file)

    # Remove the temporary file
    import os
    os.remove(temp_file)
    


## <div align="left" style="color:rgb(51, 250, 250);"> Read the Data from Google Cloud Storage </div>

In [5]:
def read_df_from_gcs(bucket_name, source_blob_name, file_format='csv'):
    """Reads a DataFrame from Google Cloud Storage.

    Args:
        bucket_name: The name of the GCS bucket.
        source_blob_name: The name of the blob to read.
        file_format: The file format to read ('csv' or 'parquet').

    Returns:
        pandas.DataFrame: The data loaded from the GCS file.
    """
    # Create a temporary file name
    temp_file = f'temp.{file_format}'
    
    try:
        # Initialize GCS client
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(source_blob_name)

        # Download the file to a temporary location
        blob.download_to_filename(temp_file)

        # Read the file into a DataFrame
        if file_format == 'csv':
            df = pd.read_csv(temp_file, low_memory=False)
        elif file_format == 'parquet':
            df = pd.read_parquet(temp_file)
        else:
            raise ValueError("Invalid file format. Please choose 'csv' or 'parquet'.")

        return df

    finally:
        # Clean up the temporary file
        if os.path.exists(temp_file):
            os.remove(temp_file)

## <div align = "left" style="color:rgb(51, 250, 250);"> Data Quality Report </div>

In [6]:
def data_quality_report(df, target_col='ln_fspd30_flag'):
    # Initialize an empty list to store each row of data
    report_data = []
    # Iterate over each column in the DataFrame to compute metrics
    for col in df.columns:
        # Determine the data type of the column
        data_type = df[col].dtype
       
        # Calculate the number of missing values in the column
        missing_values = df[col].isnull().sum()
       
        # Calculate the percentage of missing values relative to the total number of rows
        missing_percentage = (missing_values / len(df)) * 100
       
        # Calculate the number of unique values in the column
        unique_values = df[col].nunique()
       
        # Calculate the percentage of non-missing values
        non_missing_percentage = ((len(df) - missing_values) / len(df)) * 100
       
        # Check if the column is numeric to compute additional metrics
        if pd.api.types.is_numeric_dtype(df[col]):
            # Compute minimum, maximum, mean, median, mode, mode percentage, standard deviation, and quantiles
            min_value = df[col].min()
            max_value = df[col].max()
            mean_value = df[col].mean()
            median_value = df[col].median()
            mode_value = df[col].mode().iloc[0] if not df[col].mode().empty else None
            mode_percentage = (df[col] == mode_value).sum() / len(df) * 100 if mode_value is not None else None
            std_dev = df[col].std()
            quantile_25 = df[col].quantile(0.25)
            quantile_50 = df[col].quantile(0.50)  # Same as median
            quantile_75 = df[col].quantile(0.75)
            
            # Calculate the Interquartile Range (IQR)
            iqr = quantile_75 - quantile_25
            
            # Calculate Skewness and Kurtosis
            skewness = df[col].skew()
            kurtosis = df[col].kurt()
            
            # Calculate Coefficient of Variation (CV) - standardized measure of dispersion
            cv = (std_dev / mean_value) * 100 if mean_value != 0 else None
            
            # Calculate correlation with target variable if target exists in dataframe
            if target_col in df.columns and col != target_col and pd.api.types.is_numeric_dtype(df[target_col]):
                # Calculate correlation only using rows where both columns have non-null values
                correlation = df[[col, target_col]].dropna().corr().iloc[0, 1]
            else:
                correlation = None
        else:
            # Assign None for non-numeric columns where appropriate
            min_value = None
            max_value = None
            mean_value = None
            median_value = None
            mode_value = df[col].mode().iloc[0] if not df[col].mode().empty else None
            mode_percentage = (df[col] == mode_value).sum() / len(df) * 100 if mode_value is not None else None
            std_dev = None
            quantile_25 = None
            quantile_50 = None
            quantile_75 = None
            iqr = None
            skewness = None
            kurtosis = None
            cv = None
            correlation = None
       
        # Append the computed metrics for the current column to the list
        report_data.append({
            'Column': col,
            'Data Type': data_type,
            'Missing Values': missing_values,
            'Missing Percentage': missing_percentage,
            'Unique Values': unique_values,
            'Min': min_value,
            'Max': max_value,
            'Mean': mean_value,
            'Median': median_value,
            'Mode': mode_value,
            'Mode Percentage': mode_percentage,
            'Std Dev': std_dev,
            'Non-missing Percentage': non_missing_percentage,
            '25% Quantile': quantile_25,
            '50% Quantile': quantile_50,
            '75% Quantile': quantile_75,
            'IQR': iqr,
            'Skewness': skewness,
            'Kurtosis': kurtosis,
            'CV (%)': cv,
            f'Correlation with {target_col}': correlation
        })
    # Create the DataFrame from the list of dictionaries
    report = pd.DataFrame(report_data)
   
    # Return the complete data quality report DataFrame
    return report

# <div align = "left" style="color:rgb(51,250,250);"> Upload pickle file to Google Cloud Storage Bucke </div>

In [7]:
def upload_to_gcs(bucket_name, source_file_path, destination_blob_name):
    """Uploads a file to Google Cloud Storage"""
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    
    blob.upload_from_filename(source_file_path)
    print(f"File {source_file_path} uploaded to {bucket_name}/{destination_blob_name}")

In [8]:
import pickle
import io
from google.cloud import storage
def save_pickle_to_gcs(data, bucket_name, destination_blob_name):
    """
    Save any Python object as a pickle file to Google Cloud Storage
    
    Args:
        data: The Python object to pickle (DataFrame, dict, list, etc.)
        bucket_name: Name of the GCS bucket
        destination_blob_name: Path/filename in the bucket
    """
    # Initialize the GCS client
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    
    # Serialize the data to pickle format in memory
    pickle_buffer = io.BytesIO()
    pickle.dump(data, pickle_buffer)
    pickle_buffer.seek(0)
    
    # Upload the pickle data to GCS
    blob.upload_from_file(pickle_buffer, content_type='application/octet-stream')
    print(f"Pickle file uploaded to gs://{bucket_name}/{destination_blob_name}")

# save_dataframe_multi_format

In [9]:
def save_dataframe_multi_format(
    dataframe: pd.DataFrame, 
    cloud_path: str, 
    filename: str, 
    client: bigquery.Client = None,
    bucket_name: str = None
) -> dict:
    """
    Save a pandas DataFrame to Google Cloud Storage in multiple formats (CSV, Pickle, Parquet, Joblib).
    
    Args:
        dataframe (pd.DataFrame): The DataFrame to save
        cloud_path (str): The cloud path (e.g., 'DC/Model_Monitoring/cash_beta_trench1_data')
        filename (str): The base filename without extension
        client (bigquery.Client, optional): BigQuery client (for project reference)
        bucket_name (str, optional): GCS bucket name. If None, will try to extract from client
        
    Returns:
        dict: Dictionary with status of each file saved
        
    Example:
        client = bigquery.Client(project='prj-prod-dataplatform')
        CLOUDPATH = 'DC/Model_Monitoring/cash_beta_trench1_data'
        
        results = save_dataframe_multi_format(
            dataframe=d1,
            cloud_path=CLOUDPATH,
            filename='my_data',
            client=client,
            bucket_name='your-bucket-name'  # Replace with your actual bucket name
        )
    """
    
    # Initialize Google Cloud Storage client
    storage_client = storage.Client(project=client.project if client else None)
    
    # You'll need to specify your bucket name here
    # Common bucket names in GCP data platforms might be like:
    # - 'prj-prod-dataplatform-storage'
    # - 'dataplatform-storage'
    # - or similar pattern
    if bucket_name is None:
        # You need to replace this with your actual bucket name
        raise ValueError("Please provide the bucket_name parameter")
    
    bucket = storage_client.bucket(bucket_name)
    
    # Results dictionary to track saves
    results = {}
    
    # Ensure cloud_path doesn't start with '/'
    cloud_path = cloud_path.lstrip('/')
    
    try:
        # 1. Save as CSV
        csv_buffer = io.StringIO()
        dataframe.to_csv(csv_buffer, index=False)
        csv_blob = bucket.blob(f"{cloud_path}/{filename}.csv")
        csv_blob.upload_from_string(csv_buffer.getvalue(), content_type='text/csv')
        results['csv'] = f"gs://{bucket_name}/{cloud_path}/{filename}.csv"
        
        # 2. Save as Pickle
        pickle_buffer = io.BytesIO()
        pickle.dump(dataframe, pickle_buffer)
        pickle_blob = bucket.blob(f"{cloud_path}/{filename}.pkl")
        pickle_blob.upload_from_string(pickle_buffer.getvalue(), content_type='application/octet-stream')
        results['pickle'] = f"gs://{bucket_name}/{cloud_path}/{filename}.pkl"
        
        # 3. Save as Parquet
        parquet_buffer = io.BytesIO()
        dataframe.to_parquet(parquet_buffer, index=False)
        parquet_blob = bucket.blob(f"{cloud_path}/{filename}.parquet")
        parquet_blob.upload_from_string(parquet_buffer.getvalue(), content_type='application/octet-stream')
        results['parquet'] = f"gs://{bucket_name}/{cloud_path}/{filename}.parquet"
        
        # 4. Save as Joblib
        joblib_buffer = io.BytesIO()
        joblib.dump(dataframe, joblib_buffer)
        joblib_blob = bucket.blob(f"{cloud_path}/{filename}.joblib")
        joblib_blob.upload_from_string(joblib_buffer.getvalue(), content_type='application/octet-stream')
        results['joblib'] = f"gs://{bucket_name}/{cloud_path}/{filename}.joblib"
        
        print("All files saved successfully!")
        for format_type, path in results.items():
            print(f"{format_type.upper()}: {path}")
            
    except Exception as e:
        print(f"Error occurred: {str(e)}")
        results['error'] = str(e)
    
    return results

# Table

In [10]:
schema1 = 'worktable_data_analysis'
tendo_app_package = f'tendo_user_app_package_{CURRENT_DATE}'
print(f"The table name is {tendo_app_package}")

The table name is tendo_user_app_package_20250915


# Query

In [12]:
sq = f"""
create or replace table {schema1}.{tendo_app_package} as
with rn as 
(
SELECT a2.ee_customer_id, a1.id, a1.user_id, a1.reference_number, 
  COALESCE(
    -- Try to extract UUID pattern anywhere in the string
    REGEXP_EXTRACT(a1.reference_number, r'([a-f0-9]{{8}}-[a-f0-9]{{4}}-[a-f0-9]{{4}}-[a-f0-9]{{4}}-[a-f0-9]{{12}})'),
    -- Try to extract after colon
    REGEXP_EXTRACT(a1.reference_number, r':([^:#W]+)'),
    -- Try to extract after hash and digits
    REGEXP_EXTRACT(a1.reference_number, r'#[0-9]+:([^:#W]+)'),
    -- Try to extract any long alphanumeric string
    REGEXP_EXTRACT(a1.reference_number, r'([a-zA-Z0-9-]{{20,}})'),
    -- Fallback: remove common prefixes
    REGEXP_REPLACE(a1.reference_number, r'^[#0-9:W]*', ''),
    -- Last resort: use the whole reference number
    a1.reference_number
  ) as cleaned_rn, 
  a1.deleted_at, a1.created_at, a1.updated_at 
FROM prj-prod-dataplatform.tendopay_raw.credo_lab_reference_numbers a1
join worktable_data_analysis.tendo_scorecard_features_data_20250915 a2 on a2.ee_customer_id = a1.user_id
where date(a1.created_at) <= date(a2.ee_onboarding_date)
qualify row_number() over(partition by a1.user_id order by date(a1.created_at) desc) = 1
),
aca as 
(select deviceId, REGEXP_REPLACE(deviceId, r'^#[0-9]+:', '') deviceId_cleaned, run_date,
A.package_name,A.first_install_time,A.last_update_time,A.version_name,A.flags 
from prj-prod-dataplatform.credolab_tendo_raw.android_credolab_Application,unnest(Application) A)
select rn.ee_customer_id, rn.id, rn.user_id, rn.cleaned_rn reference_number 
, rn.deleted_at
, rn.created_at 
, DATETIME(rn.created_at, "Asia/Manila") created_date_manila
, rn.updated_at
, aca.deviceId_cleaned deviceId
, aca.run_date
, aca.package_name
, aca.first_install_time
, aca.last_update_time
, aca.version_name
, aca.flags
from rn  
inner join aca on aca.deviceId_cleaned = rn.cleaned_rn
where date(rn.created_at) <= '2025-09-14';
"""
job = client.query(sq)
job.result()  # Wait for the job to complete.
time.sleep(5) # Delays for 5 seconds
print(f"Table {schema1}.{tendo_app_package} created")

Table worktable_data_analysis.tendo_user_app_package_20250915 created


Found no duplicate digitalLoanAccountId