<a href="https://colab.research.google.com/github/Rifades/ELT-using-Google-Ecosystem/blob/main/ETL_in_Google_Environment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [28]:
import pandas as pd
import numpy as np
import glob
import os
import re
import time
from datetime import date
from pathlib import Path
from zipfile import BadZipFile # Import for openpyxl related errors

# Google Colab & Sheets
from google.colab import drive, auth
from google.auth import default
import gspread
from gspread_dataframe import set_with_dataframe

# Mount Drive
if not os.path.exists('/content/drive'):
    drive.mount('/content/drive')

# Try to import xlrd, which is needed for .xls files
try:
    import xlrd
    from xlrd.biffh import XLRDError # Import specific error for .xls
except ImportError:
    print("xlrd not found. Installing now...")
    !pip install xlrd
    import xlrd
    from xlrd.biffh import XLRDError

# --- OPTIMIZED: Universal File Reader Function ---
def read_and_process_files(folder_path, source_cols, dtype_dict, sheet_name='Sheet1',
                           parse_dates=None, extraction_func=None, skiprows=None):
    """
    Reads all Excel files in a folder using specified columns/dtypes.
    Includes fallback logic for mislabeled .xls/.xlsx files.
    extraction_func: A function to extract metadata (like Dist Code) from the filename.
    """
    files = list(Path(folder_path).glob('*.xlsx')) + \
            list(Path(folder_path).glob('*.XLSX')) + \
            list(Path(folder_path).glob('*.xls')) + \
            list(Path(folder_path).glob('*.XLS'))
    df_list = []

    print(f"Processing {len(files)} files in: {Path(folder_path).name}...")

    for file_path in files:
        temp_df = None
        current_file_name = file_path.name
        suffix = file_path.suffix.lower()

        try:
            if suffix == '.xls':
                # For .xls files, first try xlrd
                try:
                    temp_df = pd.read_excel(
                        file_path,
                        sheet_name=sheet_name,
                        header=0,
                        skiprows=skiprows,
                        dtype=dtype_dict,
                        parse_dates=parse_dates,
                        usecols=source_cols,
                        engine='xlrd'
                    )
                except XLRDError as e:
                    # If xlrd fails with 'Excel xlsx file; not supported', try openpyxl
                    if "Excel xlsx file; not supported" in str(e):
                        print(f"Warning: '{current_file_name}' (.{suffix}) seems to be an XLSX file. Trying openpyxl.")
                        temp_df = pd.read_excel(
                            file_path,
                            sheet_name=sheet_name,
                            header=0,
                            skiprows=skiprows,
                            dtype=dtype_dict,
                            parse_dates=parse_dates,
                            usecols=source_cols,
                            engine='openpyxl'
                        )
                    else:
                        raise e # Re-raise other xlrd errors
                except Exception as e:
                    print(f"Warning: xlrd failed for '{current_file_name}' (.{suffix}) with '{e}'. Trying openpyxl as fallback.")
                    temp_df = pd.read_excel(
                        file_path,
                        sheet_name=sheet_name,
                        header=0,
                        skiprows=skiprows,
                        dtype=dtype_dict,
                        parse_dates=parse_dates,
                        usecols=source_cols,
                        engine='openpyxl'
                    )
            else: # .xlsx or .XLSX
                # For .xlsx files, first try openpyxl
                try:
                    temp_df = pd.read_excel(
                        file_path,
                        sheet_name=sheet_name,
                        header=0,
                        skiprows=skiprows,
                        dtype=dtype_dict,
                        parse_dates=parse_dates,
                        usecols=source_cols,
                        engine='openpyxl'
                    )
                except (BadZipFile, ValueError) as e:
                    # If openpyxl fails with BadZipFile or similar for XLSX, it might be an XLS
                    if "BadZipFile" in str(e) or "File is not a zip file" in str(e):
                         print(f"Warning: '{current_file_name}' (.{suffix}) seems to be an XLS file. Trying xlrd.")
                         temp_df = pd.read_excel(
                            file_path,
                            sheet_name=sheet_name,
                            header=0,
                            skiprows=skiprows,
                            dtype=dtype_dict,
                            parse_dates=parse_dates,
                            usecols=source_cols,
                            engine='xlrd'
                        )
                    else:
                        raise e # Re-raise other openpyxl errors
                except Exception as e:
                    print(f"Warning: openpyxl failed for '{current_file_name}' (.{suffix}) with '{e}'. Trying xlrd as fallback.")
                    temp_df = pd.read_excel(
                        file_path,
                        sheet_name=sheet_name,
                        header=0,
                        skiprows=skiprows,
                        dtype=dtype_dict,
                        parse_dates=parse_dates,
                        usecols=source_cols,
                        engine='xlrd'
                    )

            if temp_df is not None and not temp_df.empty:
                # Apply custom filename extraction logic (e.g., getting Dist Code)
                if extraction_func:
                    temp_df = extraction_func(temp_df, file_path)
                df_list.append(temp_df)
            elif temp_df is not None and temp_df.empty:
                print(f"Warning: No data extracted from '{current_file_name}'. DataFrame was empty after reading.")

        except Exception as e:
            print(f"Final Error processing '{current_file_name}': {e}")
            continue

    if not df_list:
        print("Warning: No data found.")
        return pd.DataFrame(columns=source_cols)

    return pd.concat(df_list, ignore_index=True)

# --- Regex Patterns (Compiled once for speed) ---
dist_code_pattern = re.compile(r'(\d+)')
date_pattern = re.compile(r'(\d{2})(\d{2})(\d{4})')

In [29]:
# --- 1. Define Paths ---
base_path = '/content/drive/My Drive/Marico JBP Work'
month_folder = 'January 2026'

paths = {
    'billwise': f"{base_path}/Bill Wise Item Wise/{month_folder}",
    'fillrate': f"{base_path}/Fill Rate Loss Report/{month_folder}",
    'billprint': f"{base_path}/Bill Print/{month_folder}",
    'retailer': f"{base_path}/Retailer Master Data/{month_folder}",
    'stock': f"{base_path}/Stock Report/{month_folder}"
}

# --- 2. Define Extraction Logic (Helpers) ---

def extract_dist_code(df, file_path):
    """Helper to extract Dist Code from filename for FillRate Report"""
    match = dist_code_pattern.search(file_path.stem)
    if match:
        df['Dist Code'] = int(match.group(1))
    else:
        df['Dist Code'] = np.nan
    return df

def extract_stock_metadata(df, file_path):
    """Helper to extract Date and Dist Code for Stock Report"""
    # Basic Cleaning
    df = df[df['PrdDcode'].astype(str).str.upper() != 'TOTAL'].copy()
    df = df.dropna(subset=['PrdDcode'])

    # Filename Parsing
    parts = file_path.stem.split('_', 1)
    if len(parts) == 2:
        dist_name = parts[1]
        # Extract Date
        d_match = date_pattern.search(dist_name)
        if d_match:
            month, day, year = map(int, d_match.groups())
            df['Date'] = pd.to_datetime(date(year, month, day))

        # Extract Dist Code
        c_match = dist_code_pattern.search(parts[0])
        if c_match:
            df['Dist Code'] = int(c_match.group(1))

    # Optimization: Map-Reduce (Aggregate immediately to save RAM)
    return df.groupby(['Date', 'Dist Code'], as_index=False).agg({'DisplaySalRate': 'sum'})


# --- 3. Execute Loads ---

# A. Bill Wise
dfs_billwise = read_and_process_files(
    paths['billwise'],
    source_cols=['Bill Date', 'Bill Number', 'Status', 'Dsr Name', 'Beat Name', 'Retailer Name', 'ChannelName', 'NetAmount', 'Dist Code'],
    dtype_dict={'Bill Number': str, 'Status': str, 'Dsr Name': str, 'Beat Name': str, 'Dist Code': int},
    parse_dates=['Bill Date'],
    skiprows=[0, 1]
)

# B. Fill Rate
dfs_fillrate = read_and_process_files(
    paths['fillrate'],
    source_cols=['Salesman Name', 'Route Name', 'Retailer Name', 'Sales Invoice No', 'Total Ordered Value', 'Total Delivered Value', 'Order Date'],
    dtype_dict={'Sales Invoice No': str},
    parse_dates=['Order Date'],
    skiprows=[0, 1],
    extraction_func=extract_dist_code
)
# Post-process fillrate dates
if not dfs_fillrate.empty:
    dfs_fillrate['Order Date'] = dfs_fillrate['Order Date'].dt.date
    dfs_fillrate = dfs_fillrate.groupby(
        ['Order Date', 'Sales Invoice No', 'Salesman Name', 'Route Name', 'Retailer Name', 'Dist Code'], as_index=False
    ).sum()

# C. Bill Print
dfs_billprint = read_and_process_files(
    paths['billprint'],
    source_cols=['SalInvno', 'NoofCopy', 'BilledUser'],
    dtype_dict={'SalInvno': str, 'NoofCopy': int, 'BilledUser': str},
    sheet_name='BillPrintUser'
)
if not dfs_billprint.empty:
    dfs_billprint['BillPrinted'] = 'Yes'

# D. Retailer Master
dfs_retailer = read_and_process_files(
    paths['retailer'],
    source_cols=['Dist Code', 'Salesman Name', 'Route Name', 'Retailer Name', 'Retailer Status'],
    dtype_dict={'Dist Code': int, 'Retailer Status': str},
    skiprows=[0, 1]
)
if not dfs_retailer.empty:
    dfs_retailer = dfs_retailer[dfs_retailer['Retailer Status'] == 'Active']

# E. Stock Report (Note: This one aggregates inside the loop via helper function)
dfs_stock = read_and_process_files(
    paths['stock'],
    source_cols=['PrdDcode', 'DisplaySalRate'],
    dtype_dict={'PrdDcode': str, 'DisplaySalRate': float},
    skiprows=[0, 1],
    extraction_func=extract_stock_metadata
)
# Final aggregation for Stock
if not dfs_stock.empty:
    dfs_stock = dfs_stock.groupby(['Date', 'Dist Code'], as_index=False).sum()

print("\nAll files loaded successfully.")

Processing 15 files in: January 2026...
Processing 15 files in: January 2026...
Processing 15 files in: January 2026...
Processing 15 files in: January 2026...
Processing 164 files in: January 2026...

All files loaded successfully.


In [30]:
# --- 1. Clean & Filter Transactions ---
def remove_psr(df, col1, col2):
    """Filters out rows where 'psr' appears in col1 or col2 (case insensitive)"""
    mask = (
        ~df[col1].str.contains('psr', case=False, na=False) &
        ~df[col2].str.contains('psr', case=False, na=False)
    )
    return df[mask].copy()

# Filter Data
df_trans_clean = remove_psr(dfs_billwise, 'Dsr Name', 'Beat Name')
df_master_clean = remove_psr(dfs_retailer, 'Salesman Name', 'Route Name')

# Ensure Merge Keys Match (String vs String)
df_trans_clean['Dist Code'] = df_trans_clean['Dist Code'].astype(str).str.strip()
df_master_clean['Dist Code'] = df_master_clean['Dist Code'].astype(str).str.strip()

# --- 2. Calculate Served % ---
# Aggregation
df_served = df_trans_clean.groupby(['Dist Code', 'Beat Name', 'Dsr Name'], as_index=False)['Retailer Name'].nunique()
df_served.rename(columns={'Retailer Name': 'served_retailers'}, inplace=True)

df_total = df_master_clean.groupby(['Dist Code', 'Route Name', 'Salesman Name'], as_index=False)['Retailer Name'].nunique()
df_total.rename(columns={'Retailer Name': 'total_retailers'}, inplace=True)

# Merge
df_final = pd.merge(
    df_served,
    df_total,
    left_on=['Dist Code', 'Beat Name', 'Dsr Name'],
    right_on=['Dist Code', 'Route Name', 'Salesman Name'],
    how='left'
)

# Cleanup
df_final.drop(columns=['Salesman Name', 'Route Name'], inplace=True)
df_final['total_retailers'] = df_final['total_retailers'].fillna(0)

# Vectorized Calculation (Safe Division)
df_final['Served %'] = np.where(
    df_final['total_retailers'] > 0,
    (df_final['served_retailers'] / df_final['total_retailers']) * 100,
    0
).round(2)

df_attendance_masterdata = df_final.sort_values(['Dist Code'])

# --- 3. Bill Print Report ---
df_Billprint_report = pd.merge(
    dfs_billwise,
    dfs_billprint,
    left_on='Bill Number',
    right_on='SalInvno',
    how='left'
)
df_Billprint_report['BillPrinted'] = df_Billprint_report['BillPrinted'].fillna('No')
# Drop unnecessary columns
cols_to_drop_bp = ['BilledUser', 'NoofCopy', 'NetAmount', 'Status', 'ChannelName', 'SalInvno']
df_Billprint_report.drop(columns=[c for c in cols_to_drop_bp if c in df_Billprint_report.columns], inplace=True)

# --- 4. Order vs Delivery Report (ADDED) ---
# Merging Fill Rate (Left) with Bill Wise (Right) to check which orders were billed
df_ordervsdelivery_report = pd.merge(
    dfs_fillrate,
    dfs_billwise,
    left_on=['Sales Invoice No', 'Salesman Name', 'Route Name', 'Retailer Name'],
    right_on=['Bill Number', 'Dsr Name', 'Beat Name', 'Retailer Name'],
    how='left'
)

# [cite_start]Drop unnecessary columns [cite: 317]
cols_to_drop_ovd = ['Bill Date', 'Status', 'NetAmount', 'Dsr Name', 'Beat Name', 'ChannelName']
df_ordervsdelivery_report.drop(columns=[c for c in cols_to_drop_ovd if c in df_ordervsdelivery_report.columns], inplace=True)

# [cite_start]Drop rows where Bill Number is missing (means no matching bill found) [cite: 319]
df_ordervsdelivery_report = df_ordervsdelivery_report.dropna(subset=['Bill Number'])

# --- 5. Stock Norms Report ---
dfs_stocknormsreport = dfs_stock.groupby(['Dist Code']).agg({'DisplaySalRate': 'mean'}).reset_index()

print("Analysis Reports Generated.")

Analysis Reports Generated.


In [31]:
auth.authenticate_user()
creds, _ = default()
gc = gspread.authorize(creds)

# Replace with your actual Spreadsheet URL
spreadsheet_url = 'https://docs.google.com/spreadsheets/d/1zxw_7Yo8EOAr9CJgIbb5daDLjDZSAO6hohBZcaC7kYo/edit'

my_reports = {
    'Attendance_Master_Data': df_attendance_masterdata,
    'Stock_Summary': dfs_stocknormsreport,
    'Bill_Print_Analysis': df_Billprint_report,
    'Order_vs_Delivery_Analysis': df_ordervsdelivery_report # <--- ADDED HERE
}

def export_to_gsheet(url, data_dict):
    try:
        sh = gc.open_by_url(url)
    except Exception as e:
        print(f"Error opening sheet: {e}")
        return

    for sheet_name, df in data_dict.items():
        try:
            print(f"Exporting {sheet_name}...")
            # 1. Get or Create Worksheet
            try:
                worksheet = sh.worksheet(sheet_name)
            except gspread.exceptions.WorksheetNotFound:
                worksheet = sh.add_worksheet(title=sheet_name, rows="1", cols="1")

            # 2. Resize Sheet (Critical for API limits)
            rows, cols = df.shape
            # Resize needs at least 1 row/col, adds buffer for header
            worksheet.resize(rows=max(1, rows + 1), cols=max(1, cols))

            # 3. Clear & Write
            worksheet.clear()
            set_with_dataframe(worksheet, df, include_index=False)
            print(f"✅ {sheet_name} updated successfully.")

            # 4. Anti-Quota Pause
            time.sleep(2)

        except Exception as e:
            print(f"❌ Failed to update {sheet_name}: {e}")

export_to_gsheet(spreadsheet_url, my_reports)

Exporting Attendance_Master_Data...
✅ Attendance_Master_Data updated successfully.
Exporting Stock_Summary...
✅ Stock_Summary updated successfully.
Exporting Bill_Print_Analysis...
✅ Bill_Print_Analysis updated successfully.
Exporting Order_vs_Delivery_Analysis...
✅ Order_vs_Delivery_Analysis updated successfully.
