In [1]:
from google.cloud import bigquery

In [2]:
import pandas as pd
from google.api_core.exceptions import NotFound
import os
from datetime import datetime, timezone

# --- Google Sheets Dependencies ---
import gspread
from gspread_dataframe import set_with_dataframe
from datetime import datetime
from google.oauth2.service_account import Credentials

In [None]:
# --- GLOBAL CONFIGURATION (Update these variables) ---
CREDENTIALS_FILE = 'gen-lang-client-0209575391-96d90a513b0b.json'  # Path to your service account JSON key
SHEET_ID = '1Nq9u4bg0tvLnUutVh2TcxXOxe-G2E65kxy_pbJ8pce4'          # The ID of your target Google Sheet
WORKSHEET_DEPOSIT_NAME = 'Deposit Data'
WORKSHEET_WITHDRAW_NAME = 'Withdrawal Data'
# -----------------------------------------------------




# --- FULL LOAD QUERY FUNCTION (FIRST RUN) ---

def run_full_query_and_save(output_filename='daily_funding_full.csv'):
    """
    Runs a BigQuery query for a FULL historical load (no watermark),
    saves the result to a local CSV (overwrite), and returns the DataFrame.
    """

    client = bigquery.Client()

    sql_query = """
      -- BigQuery SQL
 WITH raw_data AS (
    SELECT
      f.type, f.createdAt, f.completedAt, f.providerKey, f.method, f.status, f.reqCurrency, 
      f.accountId, f.netAmount, f.insertedAt, 
      SPLIT(f.method, '/')[SAFE_OFFSET(1)] AS channel_type,

      -- mapping fields
      a.name AS brand,

      DATETIME(f.createdAt, CASE f.reqCurrency
        WHEN 'BDT' THEN '+06:00'
        WHEN 'THB' THEN 'Asia/Bangkok'
        WHEN 'MXN' THEN 'America/Mexico_City'
        WHEN 'IDR' THEN 'Asia/Jakarta'
        WHEN 'BRL' THEN 'America/Sao_Paulo'
        WHEN 'PKR' THEN 'Asia/Karachi'
        WHEN 'INR' THEN '+05:30'
        WHEN 'PHP' THEN 'Asia/Manila'
        ELSE 'UTC'
      END) AS local_ts,

      CASE 
        WHEN f.status = 'completed' AND f.type = 'deposit' THEN
          LEAST(TIMESTAMP_DIFF(f.completedAt, f.createdAt, SECOND), 900)
        WHEN f.status = 'completed' AND f.type = 'withdraw' THEN
          TIMESTAMP_DIFF(f.completedAt, f.createdAt, SECOND)
        ELSE NULL
      END AS transaction_time

    FROM `kz-dp-prod.kz_pg_to_bq_realtime.ext_funding_tx` AS f
    LEFT JOIN `kz-dp-prod.kz_pg_to_bq_realtime.account` AS a ON f.accountId = a.id
    WHERE 
      DATE(f.insertedAt) > '2025-11-01'
      AND f.type IN ('deposit', 'withdraw')
      AND f.reqCurrency IN ('BDT', 'THB', 'MXN', 'IDR', 'BRL', 'PKR', 'INR', 'PHP')
      AND f.status IN ('completed', 'errors', 'timeout', 'error')
    
    QUALIFY ROW_NUMBER() OVER (PARTITION BY f.id ORDER BY f.updatedAt DESC) = 1
  ),

      -- 2. Enrich Dimensions & Join Account Info (No change needed here)
    all_transactions AS (
    SELECT
      r.*,
      CASE WHEN LEFT(a.group,3) = 'kzg' THEN 'KZG' ELSE 'KZP' END AS group_re,
      UPPER(a.group) AS account_group,
      DATE(r.local_ts) AS transaction_date,
      DATE_TRUNC(DATE(r.local_ts), MONTH) AS transaction_month,
      FORMAT_DATETIME('%H:00 - %H:59', r.local_ts) AS Hour,
      UPPER(r.type) AS type_formatted, 
      CASE WHEN r.status = 'errors' THEN 'error' ELSE r.status END AS status_formatted
    FROM raw_data r
    LEFT JOIN `kz-dp-prod.kz_pg_to_bq_realtime.account` a 
      ON r.accountId = a.Id
  ),
      -- 3. Calculate Quantile Bounds (DAILY Stats) - Only for the new/updated transactions
      quantile_stats AS (
        SELECT
          transaction_date,
          providerKey,
          method,
          channel_type,
          type_formatted,
          reqCurrency,
          Hour,
          APPROX_QUANTILES(transaction_time, 101)[OFFSET(5)] AS p05,
          APPROX_QUANTILES(transaction_time, 101)[OFFSET(50)] AS p50,
          APPROX_QUANTILES(transaction_time, 101)[OFFSET(95)] AS p95
        FROM all_transactions
        WHERE status_formatted = 'completed' AND transaction_time IS NOT NULL
        GROUP BY 1, 2, 3, 4, 5, 6, 7
      )

      -- 4. Final Output Structure (The rest remains the same)
      SELECT
        t.transaction_date AS Date,
        t.providerKey,
        t.method,
        t.channel_type,
        t.type_formatted AS type,
        t.reqCurrency,
        account_group,
        group_re,
        
        CASE t.reqCurrency
          WHEN 'BDT' THEN 'Bangladesh'
          WHEN 'THB' THEN 'Thailand'
          WHEN 'MXN' THEN 'Mexico'
          WHEN 'IDR' THEN 'Indonesia'
          WHEN 'BRL' THEN 'Brazil'
          WHEN 'PKR' THEN 'Pakistan'
          WHEN 'INR' THEN 'India'
          WHEN 'PHP' THEN 'Philippines'
          ELSE 'Other'
        END AS Country,
        
        t.status_formatted AS status,
        t.Hour,
        
        COUNT(*) AS Count,
        SUM(t.netAmount) AS Total_Net_Amount,
        MAX(t.insertedAt) AS Max_InsertedAt, -- This will be the new watermark
        
        ROUND(SUM(
          CASE 
            WHEN t.status_formatted = 'completed' AND t.transaction_time IS NOT NULL THEN
              LEAST(GREATEST(t.transaction_time, s.p05), s.p95)
            ELSE NULL 
          END
        ), 2) AS winsorized_total_time_seconds,
        
        DATE_TRUNC(t.transaction_date, MONTH) AS DateMonth,

        COUNTIF(t.status_formatted = 'completed' AND t.transaction_time <= 90) AS Count_01m30s_Below,
        COUNTIF(t.status_formatted = 'completed' AND t.transaction_time > 90 AND t.transaction_time <= 120) AS Count_01m31s_to_02m00s,
        COUNTIF(t.status_formatted = 'completed' AND t.transaction_time > 120 AND t.transaction_time <= 180) AS Count_02m01s_to_03m00s,
        COUNTIF(t.status_formatted = 'completed' AND t.transaction_time > 180) AS Count_03m00s_Above,

        COUNTIF(t.status_formatted = 'completed' AND t.transaction_time <= 180) AS Count_03m00s_Below,
        COUNTIF(t.status_formatted = 'completed' AND t.transaction_time > 180 AND t.transaction_time <= 300) AS Count_03m31s_to_05m00s,
        COUNTIF(t.status_formatted = 'completed' AND t.transaction_time > 300 AND t.transaction_time <= 600) AS Count_05m00s_to_10m00s,
        COUNTIF(t.status_formatted = 'completed' AND t.transaction_time > 600) AS Count_10m00s_Above,

        t.providerKey AS providerName,
        SPLIT(t.channel_type, '-')[SAFE_OFFSET(0)] AS channel_main,

          -- from mapping table
        t.brand,

      FROM all_transactions t
      LEFT JOIN quantile_stats s
        ON t.transaction_date = s.transaction_date
        AND t.providerKey = s.providerKey
        AND t.method = s.method
        AND t.channel_type = s.channel_type
        AND t.type_formatted = s.type_formatted
        AND t.reqCurrency = s.reqCurrency
        AND t.Hour = s.Hour

      GROUP BY 
    t.transaction_date,
    t.providerKey,
    t.method,
    t.channel_type,
    t.type_formatted,
    t.reqCurrency,
    t.account_group,
    group_re,
    Country,
    t.status_formatted,
    t.Hour,
    DateMonth,
    t.providerKey,
    channel_main,
    t.brand
    """

    print("Executing FULL historical query (first run)...")

    try:
        df_full = client.query(sql_query).to_dataframe()
    except NotFound as e:
        print(f"BigQuery Error: {e}")
        return None

    if df_full.empty:
        print("Query returned 0 rows.")
        return None

    print(f"Query returned {len(df_full)} rows (full load).")

    # Overwrite CSV for first load
    df_full.to_csv(output_filename, mode='w', header=True, index=False)
    print(f"Successfully wrote full data to: {os.path.abspath(output_filename)}")

    return df_full


# --- Google Sheets Writer Function (same logic, works for first run too) ---

def append_df_to_gsheet_smart(dataframe: pd.DataFrame, worksheet_name: str):
    if dataframe.empty:
        print(f"DataFrame for '{worksheet_name}' is empty. 0 rows appended.")
        return

    scopes = [
        'https://www.googleapis.com/auth/spreadsheets',
        'https://www.googleapis.com/auth/drive'
    ]
    credentials = Credentials.from_service_account_file(CREDENTIALS_FILE, scopes=scopes)
    gc = gspread.authorize(credentials)
    
    sh = gc.open_by_key(SHEET_ID)
    
    # Get or create worksheet
    try:
        worksheet = sh.worksheet(worksheet_name)
    except gspread.WorksheetNotFound:
        worksheet = sh.add_worksheet(title=worksheet_name, rows="1", cols="1")

    # Determine next row
    current_values = worksheet.get_all_values()
    next_row = len(current_values) + 1
    include_headers = (next_row == 1)

    set_with_dataframe(
        worksheet,
        dataframe,
        row=next_row,
        col=1,
        include_index=False,
        include_column_header=include_headers,
        resize=True
    )

    print(f"Successfully appended {len(dataframe)} rows to '{worksheet_name}'.")


# --- Main Execution Block (FIRST RUN) ---

if __name__ == "__main__":
    # 1. Run the full query and get the data
    full_data_df = run_full_query_and_save("daily_funding_full.csv")

    if full_data_df is not None and not full_data_df.empty:
        print("\nStarting Google Sheets upload process...")
    else:
        print("\nNo data retrieved from BigQuery for Google Sheets.")




Executing FULL historical query (first run)...
Query returned 929355 rows (full load).


In [None]:
full_data_df= pd.read_csv('daily_funding_full.csv')

FileNotFoundError: [Errno 2] No such file or directory: 'daily_funding_full.csv'

In [None]:
brand_merge = pd.read_csv('mapping_brand.csv')
brand_merge = brand_merge.drop_duplicates("brand")

In [None]:
data_full = full_data_df[['Date', 'providerKey', 'method', 'channel_type', 'type', 'reqCurrency',
       'account_group', 'Country', 'status', 'Hour', 'Count',
       'Total_Net_Amount', 'Max_InsertedAt', 'winsorized_total_time_seconds',
       'DateMonth', 'Count_01m30s_Below', 'Count_01m31s_to_02m00s',
       'Count_02m01s_to_03m00s', 'Count_03m00s_Above', 'Count_03m00s_Below',
       'Count_03m31s_to_05m00s', 'Count_05m00s_to_10m00s',
       'Count_10m00s_Above', 'providerName', 'channel_main', 'brand']].drop_duplicates()

In [None]:
data_full_merge = data_full.copy()

In [None]:
data_full_merge["brand"] = data_full_merge["brand"].str.upper().str.strip()

In [None]:
data_full_merge = data_full_merge.merge(brand_merge, on = "brand", how = "left")

In [None]:
data_full_merge = data_full_merge[data_full_merge["whitelabel"].notna()]

In [None]:
df_deposit = data_full_merge[(data_full_merge['type'] == 'DEPOSIT')&(data_full_merge["Date"]>='2025-10-01')]

In [None]:
df_withdraw = data_full_merge[(data_full_merge['type'] == 'WITHDRAW')&(data_full_merge["Date"]>='2025-10-01')]

In [None]:
# --- Google Sheets Writer Function ---
def append_df_to_gsheet(dataframe: pd.DataFrame, worksheet_name: str, SHEET_ID: str):
    
    if dataframe.empty:
        print(f"DataFrame for '{worksheet_name}' is empty. 0 rows appended.")
        return

    # --- AUTHENTICATION ---
    scopes = ['https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive']
    credentials = Credentials.from_service_account_file(CREDENTIALS_FILE, scopes=scopes)
    gc = gspread.authorize(credentials)
    
    sh = gc.open_by_key(SHEET_ID)
    
    # Get or Create Worksheet
    try:
        worksheet = sh.worksheet(worksheet_name)
    except gspread.WorksheetNotFound:
        worksheet = sh.add_worksheet(title=worksheet_name, rows="1", cols="1")

    # --- CALCULATE POSITION ---
    current_values = worksheet.get_all_values()
    # If sheet is empty, next_row is 1. If has data, it's len + 1
    next_row = len(current_values) + 1
    
    # Write headers only if the sheet is brand new (row 1)
    include_headers = (next_row == 1)

    # --- WRITE DATA ---
    set_with_dataframe(
        worksheet, 
        dataframe, 
        row=next_row, 
        col=1, 
        include_index=False, 
        include_column_header=include_headers,
        resize=True
    )
    
    # --- PRINT ROW COUNT ---
    # The number of data rows appended is simply the length of the dataframe
    print(f"Successfully appended {len(dataframe)} rows to '{worksheet_name}'.")

In [None]:
# --- GLOBAL CONFIGURATION (Update these variables) ---
CREDENTIALS_FILE = 'gen-lang-client-0209575391-96d90a513b0b.json' # Path to your service account JSON key

In [None]:
append_df_to_gsheet(df_deposit, 'DEPOSIT_DATA', '1hHV98ZAqng6ogy48iKDTP_tMVP8siwzgB8ybLSq40kU')

APIError: APIError: [400]: Invalid requests[0].updateSheetProperties: This action would increase the number of cells in the workbook above the limit of 10000000 cells.

In [None]:
# --- GLOBAL CONFIGURATION (Update these variables) ---
CREDENTIALS_FILE = 'gen-lang-client-0209575391-96d90a513b0b.json' # Path to your service account JSON key
SHEET_ID = '1Nq9u4bg0tvLnUutVh2TcxXOxe-G2E65kxy_pbJ8pce4' # The ID of your target Google Sheet
# WORKSHEET_DEPOSIT_NAME = 'Deposit Data' 
# WORKSHEET_WITHDRAW_NAME = 'Withdrawal Data' 
# --------------------------------------------------


# --- Helper Function to Get Watermark ---

def get_last_run_watermark(output_filename: str) -> str:
    """
    Reads the existing CSV file to find the highest 'Max_InsertedAt' timestamp.
    This timestamp will be used to filter the next BigQuery run.
    """
    print(f"Checking for existing data file: {output_filename}")
    
    if not os.path.exists(output_filename):
        print("Existing file not found. Performing full historical load.")
        # Return a very old timestamp to ensure full data load on the first run
        return '1970-01-01 00:00:00+00' 

    try:
        # We only need to load the column necessary for filtering
        # Force reading as string first to handle mixed types/formats gracefully
        df_existing = pd.read_csv(output_filename, usecols=['Max_InsertedAt'])
        
        # Ensure the column is datetime for accurate comparison
        df_existing['Max_InsertedAt'] = pd.to_datetime(df_existing['Max_InsertedAt'], utc=True, errors='coerce')
        df_existing.dropna(subset=['Max_InsertedAt'], inplace=True)

        # Find the latest timestamp and format it for the SQL query
        max_ts = df_existing['Max_InsertedAt'].max()
        
        # Format as a string with UTC timezone offset for BigQuery TIMESTAMP comparison
        watermark = max_ts.strftime('%Y-%m-%d %H:%M:%S.%f')
        # BigQuery expects the +HH:MM format, but BigQuery is usually fine with the full string
        watermark += '+00:00' 
        
        print(f"Found existing watermark: {watermark}")
        
        return watermark

    except Exception as e:
        print(f"Error reading existing CSV file for watermark ({e}). Performing full load as fallback.")
        return '1970-01-01 00:00:00+00' 


# --- Incremental Query Function ---

def run_incremental_query_and_save(output_filename='daily_funding_aggregation.csv'):
    """
    Runs a BigQuery query incrementally, filtering for data inserted
    after the last recorded Max_InsertedAt timestamp.
    Appends new data to the local CSV file and returns the new DataFrame.
    """
    
    # 1. Get the Watermark
    last_inserted_at = get_last_run_watermark(output_filename)
    
    # 2. Initialize the BigQuery Client
    client = bigquery.Client()

    # Define your SQL Query with a placeholder for the watermark
    sql_query_template = """
     WITH raw_data AS (
    SELECT
      f.type, f.createdAt, f.completedAt, f.providerKey, f.method, f.status, f.reqCurrency, 
      f.accountId, f.netAmount, f.insertedAt, 
      SPLIT(f.method, '/')[SAFE_OFFSET(1)] AS channel_type,

      -- mapping fields
      a.name AS brand,

      DATETIME(f.createdAt, CASE f.reqCurrency
        WHEN 'BDT' THEN '+06:00'
        WHEN 'THB' THEN 'Asia/Bangkok'
        WHEN 'MXN' THEN 'America/Mexico_City'
        WHEN 'IDR' THEN 'Asia/Jakarta'
        WHEN 'BRL' THEN 'America/Sao_Paulo'
        WHEN 'PKR' THEN 'Asia/Karachi'
        WHEN 'INR' THEN '+05:30'
        WHEN 'PHP' THEN 'Asia/Manila'
        ELSE 'UTC'
      END) AS local_ts,

      CASE 
        WHEN f.status = 'completed' AND f.type = 'deposit' THEN
          LEAST(TIMESTAMP_DIFF(f.completedAt, f.createdAt, SECOND), 900)
        WHEN f.status = 'completed' AND f.type = 'withdraw' THEN
          TIMESTAMP_DIFF(f.completedAt, f.createdAt, SECOND)
        ELSE NULL
      END AS transaction_time

    FROM `kz-dp-prod.kz_pg_to_bq_realtime.ext_funding_tx` AS f
    LEFT JOIN `kz-dp-prod.kz_pg_to_bq_realtime.account` AS a ON f.accountId = a.id
    WHERE 
      f.insertedAt > TIMESTAMP('{last_inserted_at}') 
      AND f.type IN ('deposit', 'withdraw')
      AND f.reqCurrency IN ('BDT', 'THB', 'MXN', 'IDR', 'BRL', 'PKR', 'INR', 'PHP')
      AND f.status IN ('completed', 'errors', 'timeout', 'error')
    
    QUALIFY ROW_NUMBER() OVER (PARTITION BY DATE(local_ts), f.id ORDER BY f.updatedAt DESC) = 1
  ),

      -- 2. Enrich Dimensions & Join Account Info (No change needed here)
    all_transactions AS (
    SELECT
      r.*,
      CASE WHEN LEFT(a.group,3) = 'kzg' THEN 'KZG' ELSE 'KZP' END AS group_re,
      UPPER(a.group) AS account_group,
      DATE(r.local_ts) AS transaction_date,
      DATE_TRUNC(DATE(r.local_ts), MONTH) AS transaction_month,
      FORMAT_DATETIME('%H:00 - %H:59', r.local_ts) AS Hour,
      UPPER(r.type) AS type_formatted, 
      CASE WHEN r.status = 'errors' THEN 'error' ELSE r.status END AS status_formatted
    FROM raw_data r
    LEFT JOIN `kz-dp-prod.kz_pg_to_bq_realtime.account` a 
      ON r.accountId = a.Id
  ),
      -- 3. Calculate Quantile Bounds (DAILY Stats) - Only for the new/updated transactions
      quantile_stats AS (
        SELECT
          transaction_date,
          providerKey,
          method,
          channel_type,
          type_formatted,
          reqCurrency,
          Hour,
          APPROX_QUANTILES(transaction_time, 101)[OFFSET(5)] AS p05,
          APPROX_QUANTILES(transaction_time, 101)[OFFSET(50)] AS p50,
          APPROX_QUANTILES(transaction_time, 101)[OFFSET(95)] AS p95
        FROM all_transactions
        WHERE status_formatted = 'completed' AND transaction_time IS NOT NULL
        GROUP BY 1, 2, 3, 4, 5, 6, 7
      )

      -- 4. Final Output Structure (The rest remains the same)
      SELECT
        t.transaction_date AS Date,
        t.providerKey,
        t.method,
        t.channel_type,
        t.type_formatted AS type,
        t.reqCurrency,
        account_group,
        group_re,
        
        CASE t.reqCurrency
          WHEN 'BDT' THEN 'Bangladesh'
          WHEN 'THB' THEN 'Thailand'
          WHEN 'MXN' THEN 'Mexico'
          WHEN 'IDR' THEN 'Indonesia'
          WHEN 'BRL' THEN 'Brazil'
          WHEN 'PKR' THEN 'Pakistan'
          WHEN 'INR' THEN 'India'
          WHEN 'PHP' THEN 'Philippines'
          ELSE 'Other'
        END AS Country,
        
        t.status_formatted AS status,
        t.Hour,
        
        COUNT(*) AS Count,
        SUM(t.netAmount) AS Total_Net_Amount,
        MAX(t.insertedAt) AS Max_InsertedAt, -- This will be the new watermark
        
        ROUND(SUM(
          CASE 
            WHEN t.status_formatted = 'completed' AND t.transaction_time IS NOT NULL THEN
              LEAST(GREATEST(t.transaction_time, s.p05), s.p95)
            ELSE NULL 
          END
        ), 2) AS winsorized_total_time_seconds,
        
        DATE_TRUNC(t.transaction_date, MONTH) AS DateMonth,

        COUNTIF(t.status_formatted = 'completed' AND t.transaction_time <= 90) AS Count_01m30s_Below,
        COUNTIF(t.status_formatted = 'completed' AND t.transaction_time > 90 AND t.transaction_time <= 120) AS Count_01m31s_to_02m00s,
        COUNTIF(t.status_formatted = 'completed' AND t.transaction_time > 120 AND t.transaction_time <= 180) AS Count_02m01s_to_03m00s,
        COUNTIF(t.status_formatted = 'completed' AND t.transaction_time > 180) AS Count_03m00s_Above,

        COUNTIF(t.status_formatted = 'completed' AND t.transaction_time <= 180) AS Count_03m00s_Below,
        COUNTIF(t.status_formatted = 'completed' AND t.transaction_time > 180 AND t.transaction_time <= 300) AS Count_03m31s_to_05m00s,
        COUNTIF(t.status_formatted = 'completed' AND t.transaction_time > 300 AND t.transaction_time <= 600) AS Count_05m00s_to_10m00s,
        COUNTIF(t.status_formatted = 'completed' AND t.transaction_time > 600) AS Count_10m00s_Above,

        t.providerKey AS providerName,
        SPLIT(t.channel_type, '-')[SAFE_OFFSET(0)] AS channel_main,

          -- from mapping table
        t.brand,

      FROM all_transactions t
      LEFT JOIN quantile_stats s
        ON t.transaction_date = s.transaction_date
        AND t.providerKey = s.providerKey
        AND t.method = s.method
        AND t.channel_type = s.channel_type
        AND t.type_formatted = s.type_formatted
        AND t.reqCurrency = s.reqCurrency
        AND t.Hour = s.Hour

      GROUP BY 
    t.transaction_date,
    t.providerKey,
    t.method,
    t.channel_type,
    t.type_formatted,
    t.reqCurrency,
    t.account_group,
    group_re,
    Country,
    t.status_formatted,
    t.Hour,
    DateMonth,
    t.providerKey,
    channel_main,
    t.brand 
    """
    
    # 3. Insert the Watermark into the Query
    query = sql_query_template.format(last_inserted_at=last_inserted_at)
    
    print(f"Executing incremental query since: {last_inserted_at}")
    
    # 4. Execute Query and Download to DataFrame
    try:
        df_new = client.query(query).to_dataframe()
    except NotFound as e:
        print(f"BigQuery Error: {e}")
        return None

    # Check if any new rows were returned
    if df_new.empty:
        print("Query returned 0 new rows. File remains unchanged.")
        return None

    print(f"Query returned {len(df_new)} new rows.")

    # 5. Save/Append to CSV (maintaining the watermark for next run)
    
    # Check if the file exists to determine if we need to write headers
    file_exists = os.path.exists(output_filename)
    
    # Use 'a' (append mode) and set header=False if the file already exists
    df_new.to_csv(output_filename, 
                  mode='a', 
                  header=not file_exists, 
                  index=False)
    
    print(f"Successfully appended new data to: {os.path.abspath(output_filename)}")
    
    # Return the new DataFrame for GSheet processing
    return df_new


# --- Google Sheets Writer Function ---
def append_df_to_gsheet_smart(dataframe: pd.DataFrame, worksheet_name: str):
    
    if dataframe.empty:
        print(f"DataFrame for '{worksheet_name}' is empty. 0 rows appended.")
        return

    # --- AUTHENTICATION ---
    scopes = ['https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive']
    credentials = Credentials.from_service_account_file(CREDENTIALS_FILE, scopes=scopes)
    gc = gspread.authorize(credentials)
    
    sh = gc.open_by_key(SHEET_ID)
    
    # Get or Create Worksheet
    try:
        worksheet = sh.worksheet(worksheet_name)
    except gspread.WorksheetNotFound:
        worksheet = sh.add_worksheet(title=worksheet_name, rows="1", cols="1")

    # --- CALCULATE POSITION ---
    current_values = worksheet.get_all_values()
    # If sheet is empty, next_row is 1. If has data, it's len + 1
    next_row = len(current_values) + 1
    
    # Write headers only if the sheet is brand new (row 1)
    include_headers = (next_row == 1)

    # --- WRITE DATA ---
    set_with_dataframe(
        worksheet, 
        dataframe, 
        row=next_row, 
        col=1, 
        include_index=False, 
        include_column_header=include_headers,
        resize=True
    )
    
    # --- PRINT ROW COUNT ---
    # The number of data rows appended is simply the length of the dataframe
    print(f"Successfully appended {len(dataframe)} rows to '{worksheet_name}'.")


# --- Main Execution Block ---

if __name__ == "__main__":
    # 1. Run the incremental query and get the new data
    new_data_df = run_incremental_query_and_save("daily_funding_full.csv")

    if new_data_df is not None and not new_data_df.empty:
        print("\nStarting Google Sheets upload process...")
        
        # 2. Split the DataFrame based on 'type' (which contains 'DEPOSIT' or 'WITHDRAW')
        # df_deposit = new_data_df[new_data_df['type'] == 'DEPOSIT'].copy()
        # df_withdraw = new_data_df[new_data_df['type'] == 'WITHDRAW'].copy()
        
        # We need to drop the 'Max_InsertedAt' column from the data being uploaded
        # as it contains BigQuery timestamp data useful for the watermark but usually not for the report.
        # if not df_deposit.empty:
            # 3. Write DEPOSIT data to Google Sheet (APPENDS data now)
            # append_df_to_gsheet_smart(df_deposit, WORKSHEET_DEPOSIT_NAME)
        # else:
            # print(f"No new DEPOSIT records found in this run to upload to '{WORKSHEET_DEPOSIT_NAME}'.")

        # if not df_withdraw.empty:
            # 4. Write WITHDRAW data to a separate sheet (APPENDS data now)
            # append_df_to_gsheet_smart(df_withdraw, WORKSHEET_WITHDRAW_NAME)
        # else:
            # print(f"No new WITHDRAW records found in this run to upload to '{WORKSHEET_WITHDRAW_NAME}'.")

    else:
        print("\nNo new data retrieved from BigQuery to process for Google Sheets.")

Checking for existing data file: daily_funding_full.csv
Found existing watermark: 2026-01-05 09:41:28.039808+00:00




Executing incremental query since: 2026-01-05 09:41:28.039808+00:00
Query returned 24441 new rows.
Successfully appended new data to: d:\Career\Working\KZ GROUP\Data Synchronization\PGW Health Dashboard\daily_funding_full.csv

Starting Google Sheets upload process...
