# ***Pipeline To Fetch Data From Google Doc***

You’ll need to create a service account in Google Cloud Console → enable Google Sheets API → download the JSON key → and share your Sheet with the service account’s email (e.g. service-account@yourproject.iam.gserviceaccount.com) with Editor permission.

In [7]:
import pyodbc
import os
import pandas as pd
from datetime import datetime, date
import gspread
from google.oauth2.service_account import Credentials
from gspread_dataframe import get_as_dataframe
from dotenv import load_dotenv , find_dotenv # to load .env files

load_dotenv()

True

In [91]:
warehouse_servr = os.environ.get("DB1_HOST")
warehouse_user = os.environ.get("DB1_USER")
warehouse_pass = os.environ.get("DB1_PASS")
warehouse_db = os.environ.get("DB1_NAME") 

conn = pyodbc.connect(
            f"Driver={{ODBC Driver 17 for SQL Server}};"
            f"Server={warehouse_servr};"  # Warehouse server
            f"Database={warehouse_db};"  # Warehouse database
            f"UID={warehouse_user};"
            f"PWD={warehouse_pass};"
        )

warehouse_cursor = conn.cursor()

warehouse_cursor.fast_executemany = True

# Connecting through Json Key

In [9]:
# Path to your downloaded key
service_acc_key = os.environ.get("gcp_bot_key")

# Define the required scopes
SCOPES = [
    "https://www.googleapis.com/auth/spreadsheets",
    "https://www.googleapis.com/auth/drive"
]

# Authorize
creds = Credentials.from_service_account_file(service_acc_key, scopes=SCOPES)
gc = gspread.authorize(creds)

In [65]:
# sheet_url = 'https://docs.google.com/spreadsheets/d/1SKgKmXeInkKkhEB4eIj0bEBMXel-fvYEnqe4n7OE1hc/edit?pli=1&gid=0#gid=0'

# sh = gc.open_by_url(sheet_url) # Stores Whole Google Sheet as an Object

***Gspreas Operations like Cursor***

* **sh.title**   # returns "Title" 
* **sh.worksheets()**  # returns a list of all sheet objects inside this file
# worksheet = sh.worksheet("Sheet1")
* **worksheet.title**  # 'Sheet1'
* **worksheet.row_count**  # total number of rows
* **worksheet.col_count**  # total number of columns
* **worksheet.get_all_records()**  # returns list of dicts for all rows

In [7]:
# sheetName = sh.worksheet('Sheet1')

# ld = sheetName.get_all_records() # List of Dictionaries
# df = pd.DataFrame(ld) # Converting List of Dictionaries to DataFrame
# df

# OR

In [15]:
# df2 = get_as_dataframe(sheetName, evaluate_formulas=True, header=0)
# df2

# Bulk Insert def

In [90]:
def bulk_insert(table_name, data, batch_size):
    try:
        columns = data.columns.tolist()
        # columns_str = ', '.join(columns)
        columns_str = ', '.join([f'[{col}]' for col in columns])
        placeholders = ', '.join(['?' for _ in range(len(columns))])
        insert_query = f"INSERT INTO WAVE..{table_name} ({columns_str}) VALUES ({placeholders})"
        
        total_inserted = 0
        
        # Process in batches
        for i in range(0, len(data), batch_size):
            batch = data.iloc[i: i + batch_size]
            records = [tuple(row) for _, row in batch.iterrows()]
            
            print(f"Executing batch insert for records {i} to {i + len(records) - 1}")
            warehouse_cursor.executemany(insert_query, records)
            
            total_inserted += len(records) 
            print(f"Inserted batch: {total_inserted}/{len(data)} records")
                
        
        print(f"✓ Successfully inserted all {total_inserted} records into {table_name}")
        conn.commit()
        # conn.close()
        
    except Exception as e:
        # conn.rollback()
        print(f"✗ Error inserting data: {str(e)}")
        
    # finally:
    #     warehouse_cursor.close()
    #     conn.close()

# CSP List / CSP Master

In [3]:
csp_list_url = 'https://docs.google.com/spreadsheets/d/1rVg5FKcyeg1Df4oUdc6ak-wka_YO4e8Gxs_FBexK5LU/edit?gid=0#gid=0'
csp_sheet = gc.open_by_url(csp_list_url)

In [4]:
csp_sheet.worksheets()

[<Worksheet 'Sheet1' id:0>, <Worksheet '>9 Months CSP' id:581720717>]

# Converting into DataFrame

In [12]:
csp_df = pd.DataFrame(csp_sheet.worksheet('Sheet1').get_all_records())
csp_df

Unnamed: 0,BANK,CSPCODE,CSP Name,State,Territory,District,BLOCK,Branch,PINCODE,Code Creation Date,...,Vatika ID,Vatika Name,Status,Refund Amount,Licence_Fee_Refund_Date,Employee\nMapped,Gender,Location Type,Tenure,Productive\nStatus
0,BOB,11710778,Jitesh Agrawal,Dadra And Nagar Haveli,Virtual Terr_Dadra And Nagar Haveli,Dadra And Nagar Haveli,Silvassa,Silvassa Vapi Mail Road,396230,2022-02-03,...,,,Active,,,Rahul Sain,M,URBAN,3.9,Active
1,BOB,11710813,Mahla Monalikumari Gulabbhai,Dadra And Nagar Haveli,Virtual Terr_Dadra And Nagar Haveli,Dadra And Nagar Haveli,Dadarand Nagar Haveli,Kilavani,396230,2022-02-03,...,,,Replacement,,,Rahul Sain,M,RURAL,3.9,
2,BOB,11710694,Abhinavkumar Rushabhkumar Yagnik,Gujarat,Virtual Terr_Gujarat,Ahmadabad,Vastral,"Naroda Road, Ahmedabad",,2022-03-31,...,,,Replacement,,,,M,,3.7,
3,BOB,11710706,Divyaben Nepubhai Makwana,Gujarat,Virtual Terr_Gujarat,Ahmadabad,bavla,Bavla,,2022-03-31,...,,,Replacement,,,,F,,3.7,
4,BOB,11710652,Mohasinbhai Ishakbhai Chandi,Gujarat,Virtual Terr_Gujarat,Ahmadabad,Ghanchiwada,Meghraj,383350,2022-02-01,...,,,Active,,,Rahul Sain,M,URBAN,3.9,Active
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4400,SBI,10022740,,Assam,,Cachar,,,,,...,,,Closed,,,,,,,
4401,SBI,10022769,,Assam,,Cachar,,,,,...,,,Closed,,,,,,,
4402,SBI,10022881,,Maharashtra,,Ahmednagar,,,,,...,,,Closed,,,,,,,
4403,BOM,12320010,Rohit Kumar Ramani,Jharkhand,,Deoghar,,,,2025-12-15,...,,,Active,,,,,,0.0,


In [13]:
warehouse_cursor.execute("SELECT * FROM WAVE..CSP_Master")

rows_tuple = [tuple(i) for i in warehouse_cursor.fetchall()]

column = [column[0] for column in warehouse_cursor.description]

csp_master_df = pd.DataFrame(rows_tuple, columns=column)
csp_master_df

Unnamed: 0,BANK,CSPCODE,CSP_Name,Key,State,Territory,District,BLOCK,bhk_block_code,Status,...,IIBF_Certificate_Number,Printer,MATM,PinPad,Licence_Fee_Amount,MR_Date,MR_No,Received_Amount,Vatika_ID,Vatika_Name
0,BOB,11710778,Jitesh Agrawal,Dadra And Nagar Haveli-Dadra And Nagar Haveli-...,Dadra And Nagar Haveli,Virtual Terr_Dadra And Nagar Haveli,Dadra And Nagar Haveli,Silvassa,0,Active,...,,NO,0,0,0E-10,2000-01-01,0,0E-10,0,
1,BOB,11710813,Mahla Monalikumari Gulabbhai,Dadra And Nagar Haveli-Dadra And Nagar Haveli-...,Dadra And Nagar Haveli,Virtual Terr_Dadra And Nagar Haveli,Dadra And Nagar Haveli,Dadarand Nagar Haveli,0,Replacement,...,,NO,0,0,0E-10,2000-01-01,0,0E-10,0,
2,BOB,11710694,Abhinavkumar Rushabhkumar Yagnik,Gujarat-Ahmadabad-Vastral,Gujarat,Virtual Terr_Gujarat,Ahmadabad,Vastral,0,Replacement,...,,NO,0,0,0E-10,2000-01-01,0,0E-10,0,
3,BOB,11710706,Divyaben Nepubhai Makwana,Gujarat-Ahmadabad-bavla,Gujarat,Virtual Terr_Gujarat,Ahmadabad,bavla,4051,Replacement,...,,NO,0,0,0E-10,2000-01-01,0,0E-10,0,
4,BOB,11710652,Mohasinbhai Ishakbhai Chandi,Gujarat-Ahmadabad-Ghanchiwada,Gujarat,Virtual Terr_Gujarat,Ahmadabad,Ghanchiwada,0,Active,...,801852292,NO,0,0,0E-10,2000-01-01,0,0E-10,0,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5096,SBI,10022740,,,Assam,,Cachar,,0,Closed,...,,NO,0,0,0E-10,2000-01-01,0,0E-10,0,
5097,SBI,10022769,,,Assam,,Cachar,,0,Closed,...,,NO,0,0,0E-10,2000-01-01,0,0E-10,0,
5098,SBI,10022881,,,Maharashtra,,Ahmednagar,,0,Closed,...,,NO,0,0,0E-10,2000-01-01,0,0E-10,0,
5099,BOM,12320010,Rohit Kumar Ramani,,Jharkhand,,Deoghar,,0,Active,...,,NO,0,0,11800.0000000000,2025-12-12,8115,11800.0000000000,0,


# Transformation

In [14]:
csp_df.columns.tolist

<bound method IndexOpsMixin.tolist of Index(['BANK', 'CSPCODE', 'CSP Name', 'State', 'Territory', 'District',
       'BLOCK', 'Branch', 'PINCODE', 'Code Creation Date', 'Agreement Date',
       'Agreement Renewal Date', 'IIBF Certificate\nNumber', 'Printer', 'MATM',
       'PinPad', 'Licence Fee\n Amount', 'MR Date', 'MR No.',
       'Received Amount', 'Vatika ID', 'Vatika Name', 'Status',
       'Refund Amount', 'Licence_Fee_Refund_Date', 'Employee\nMapped',
       'Gender', 'Location Type', 'Tenure', 'Productive\nStatus'],
      dtype='object')>

# Replacing Null Values

In [15]:
csp_df['PINCODE'] = pd.to_numeric(csp_df['PINCODE'], errors='coerce').fillna(0).astype('int')
csp_df['MATM'] = pd.to_numeric(csp_df['MATM'], errors='coerce').fillna(0).astype('int')
csp_df['PinPad'] = pd.to_numeric(csp_df['PinPad'], errors='coerce').fillna(0).astype('int')
csp_df['Vatika ID'] = pd.to_numeric(csp_df['Vatika ID'], errors='coerce').fillna(0).astype('int')
csp_df['MR No.'] = pd.to_numeric(csp_df['MR No.'], errors='coerce').fillna(0).astype('int')
csp_df['Licence Fee\n Amount'] = pd.to_numeric(csp_df['Licence Fee\n Amount'], errors='coerce').fillna(0.0)
csp_df['Received Amount'] = pd.to_numeric(csp_df['Received Amount'], errors='coerce').fillna(0.0)

In [None]:
#  'Vatika Name', 'Status',
#        'Refund Amount', 'Licence_Fee_Refund_Date', 'Employee\nMapped',
#        'Gender', 'Location Type']

fill_values = {
    'BANK': "",
    'CSPCODE': "",
    'CSP Name': "",
    'Key': "",
    'State': "",
    'Territory': "",
    'District': "",
    'BLOCK': "",
    'Status': "",
    'Branch': "",
    'Code Creation Date': '1999-01-01',        # keep datetime Null
    'Agreement Date': '1999-01-01',
    'Agreement Renewal Date': '1999-01-01',
    'IIBF Certificate\nNumber': "",
    'Printer': "",
    'MR Date': '1999-01-01',
    'Vatika Name': ""
}

for col, default_value in fill_values.items():
    if col in csp_df.columns:
        csp_df[col] = csp_df[col].fillna(default_value)

# Correcting Datatypes

In [None]:
dtype_map = {
    'BANK': 'object',
    'CSPCODE': 'object',
    'CSP Name': 'object',
    'Key': 'object',
    'State': 'object',
    'Territory': 'object',
    'District': 'object',
    'BLOCK': 'object',
    'Status': 'object',
    'Branch': 'object',
    'PINCODE': 'int',
    'Code Creation Date': 'datetime64[ns]',
    'Agreement Date': 'datetime64[ns]',
    'Agreement Renewal Date': 'datetime64[ns]',
    'IIBF Certificate\nNumber': 'object',
    'Printer': 'object',
    'MATM': 'int',
    'PinPad': 'int',
    'Licence Fee\n Amount': 'float',
    'MR Date': 'datetime64[ns]',
    'MR No': 'int',
    'Received Amount': 'float',
    'Vatika ID': 'int',
    'Vatika Name': 'object'
}

# Apply safe conversions
for col, new_type in dtype_map.items():
    if col in csp_df.columns:
        try:
            if "datetime" in new_type:
                csp_df[col] = pd.to_datetime(csp_df[col], errors='coerce')
            else:
                csp_df[col] = csp_df[col].astype(new_type)
        except Exception as e:
            print(f"Datatype conversion failed for {col}: {e}")

In [None]:
csp_df['Code Creation Date'] = pd.to_datetime(csp_df['Code Creation Date'])
csp_df['Agreement Date'] = pd.to_datetime(csp_df['Agreement Date'])
csp_df['Agreement Renewal Date'] = pd.to_datetime(csp_df['Agreement Renewal Date'])
# csp_df['Licence Fee Refund Date'] = pd.to_datetime(csp_df['Licence Fee Refund Date'])
csp_df['MR Date'] = pd.to_datetime(csp_df['MR Date'])
# csp_df['Licence Fee Amount'] = csp_df['Licence Fee Amount'].astype('float')
csp_df['Received Amount'] = csp_df['Received Amount'].astype('float') 
csp_df['Vatika ID'] = csp_df['Vatika ID'].astype('int')

In [19]:
# [f"{i} - {csp_df[i].dtype}" for i in csp_df.columns]
# print([f"{i} - {csp_master_df[i].dtype}" for i in csp_master_df.columns])

In [20]:
csp_df['CSPCODE'] = csp_df['CSPCODE'].astype(str).str.strip()
csp_master_df['CSPCODE'] = csp_master_df['CSPCODE'].astype(str).str.strip()

# Removing Cancelled & TBA from csp_master

In [21]:
csp_master_df_1 = csp_master_df[~csp_master_df['CSPCODE'].isin(['Cancelled', 'TBA'])]

In [16]:
csp_master_df_1.columns.to_list

<bound method IndexOpsMixin.tolist of Index(['BANK', 'CSPCODE', 'CSP_Name', 'Key', 'State', 'Territory', 'District',
       'BLOCK', 'bhk_block_code', 'Status', 'Branch', 'PINCODE',
       'Code_Creation_Date', 'Agreement_Date', 'Agreement_Renewal_Date',
       'IIBF_Certificate_Number', 'Printer', 'MATM', 'PinPad',
       'Licence_Fee_Amount', 'MR_Date', 'MR_No', 'Received_Amount',
       'Vatika_ID', 'Vatika_Name'],
      dtype='object')>

In [17]:
# set(csp_df.columns) - set(csp_master_df.columns) 

In [22]:
csp_1 = pd.merge(csp_df, csp_master_df_1, left_on='CSPCODE', right_on = 'CSPCODE', how='left')

# csp_1 = csp_df.merge(csp_master_df_1,on="CSPCODE",how="left",suffixes=("", "_master"))

In [163]:
# csp_1['CSPCODE'].count()
more_than_1 = csp_1.groupby(['CSPCODE'])['CSPCODE'].agg(['count']).sort_values('count', ascending=False).reset_index()[csp_1.groupby(['CSPCODE'])['CSPCODE'].agg(['count']).sort_values('count', ascending=False).reset_index()['count'] > 1]

In [36]:
more_than_1

Unnamed: 0,CSPCODE,count
0,Cancelled,2209
1,TBA,784
2,10022394 (R),2
3,10022485 (R),2
4,10022267 (R),2
5,10022005 (R),2
6,10022591 (R),2
7,10022206 (R),2
8,K3800744,2
9,10022131 (R),2


In [95]:
# csp_1.columns.to_list

In [96]:
# csp_master_df.columns.to_list

In [23]:
csp_1 = csp_1[['BANK_x', 'CSPCODE', 'CSP Name', 'State_x', 'Territory_x', 'District_x',
       'BLOCK_x', 'bhk_block_code', 'Status_x', 'Branch_x', 'PINCODE_x', 'Code Creation Date',
       'Agreement Date', 'Agreement Renewal Date', 'IIBF Certificate\nNumber',
       'Printer_x', 'MATM_x', 'PinPad_x', 'Licence Fee\n Amount', 'MR Date',
       'MR No.', 'Received Amount', 'Vatika ID', 'Vatika Name','Key']]

# Renaming Columns

In [24]:
csp_1 = csp_1.rename(columns={
    'BANK_x' : 'BANK',
    'CSP Name': 'CSP_Name',
    'State_x': 'State',
    'Territory_x': 'Territory',
    'District_x': 'District',
    'BLOCK_x': 'BLOCK',
    'Branch_x': 'Branch',
    'PINCODE_x': 'PINCODE',
    'Code Creation Date': 'Code_Creation_Date',
    'Agreement Date': 'Agreement_Date',
    'Agreement Renewal Date': 'Agreement_Renewal_Date',
    'IIBF Certificate\nNumber': 'IIBF_Certificate_Number',
    'Printer_x': 'Printer',
    'MATM_x': 'MATM',
    'PinPad_x': 'PinPad',
    'Licence Fee\n Amount': 'Licence_Fee_Amount',
    'MR Date': 'MR_Date',
    'MR No.': 'MR_No',
    'Received Amount': 'Received_Amount',
    'Vatika ID': 'Vatika_ID',
    'Vatika Name': 'Vatika_Name',
    'Status_x': 'Status'
    # Any extra columns (Refund Amount, Employee Mapped etc.) will remain
})

In [25]:
csp_1['bhk_block_code'] = csp_1['bhk_block_code'].fillna(0)
csp_1['bhk_block_code'] = csp_1['bhk_block_code'].astype(int)
csp_1['Code_Creation_Date'].fillna('2000-01-01', inplace=True)
csp_1['Agreement_Date'].fillna('2000-01-01', inplace=True)
csp_1['Agreement_Renewal_Date'].fillna('2000-01-01', inplace=True)
csp_1['MR_Date'].fillna('2000-01-01', inplace=True)
csp_1['Key'].fillna('', inplace=True)

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  csp_1['Code_Creation_Date'].fillna('2000-01-01', inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  csp_1['Agreement_Date'].fillna('2000-01-01', inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on

In [29]:
warehouse_cursor.execute("TRUNCATE TABLE WAVE..CSP_Master")

conn.commit()

In [30]:


warehouse_cursor.execute("SELECT * FROM WAVE..CSP_Master")
rows_tuple = [tuple(i) for i in warehouse_cursor.fetchall()]
column = [column[0] for column in warehouse_cursor.description]
csp_master_df_check = pd.DataFrame(rows_tuple, columns=column)

if len(csp_master_df_check) == 0:
    print(f"CSP_Master Truncated...! {len(csp_master_df_check)} Records Found")


CSP_Master Truncated...! 0 Records Found


In [28]:
table_name = "CSP_Master"  # Replace with your actual table name
batch_size = 1000
idf = csp_1

try:
    columns = idf.columns.tolist()
    # columns_str = ', '.join(columns)
    columns_str = ', '.join([f'[{col}]' for col in columns])
    placeholders = ', '.join(['?' for _ in range(len(columns))])
    insert_query = f"INSERT INTO WAVE..{table_name} ({columns_str}) VALUES ({placeholders})"
    
    total_inserted = 0
    
    # Process in batches
    for i in range(0, len(idf), batch_size):
        batch = idf.iloc[i: i + batch_size]
        records = [tuple(row) for _, row in batch.iterrows()]
        
        print(f"Executing batch insert for records {i} to {i + len(records) - 1}")
        warehouse_cursor.executemany(insert_query, records)
        
        total_inserted += len(records) 
        print(f"Inserted batch: {total_inserted}/{len(idf)} records")
              
    
    print(f"✓ Successfully inserted all {total_inserted} records into {table_name}")
    conn.commit()
    conn.close()
    
except Exception as e:
    # conn.rollback()
    print(f"✗ Error inserting data: {str(e)}")
    
# finally:
#     warehouse_cursor.close()
#     conn.close()

Executing batch insert for records 0 to 999
Inserted batch: 1000/4471 records
Executing batch insert for records 1000 to 1999
Inserted batch: 2000/4471 records
Executing batch insert for records 2000 to 2999
Inserted batch: 3000/4471 records
Executing batch insert for records 3000 to 3999
Inserted batch: 4000/4471 records
Executing batch insert for records 4000 to 4470
Inserted batch: 4471/4471 records
✓ Successfully inserted all 4471 records into CSP_Master


# WAVE Data

In [4]:
# Path to your downloaded key
service_acc_key = "cool-ocean-477314-v4-c7ffb11c1359.json"

# Define the required scopes
SCOPES = [
    "https://www.googleapis.com/auth/spreadsheets",
    "https://www.googleapis.com/auth/drive"
]

# Authorize
creds = Credentials.from_service_account_file(service_acc_key, scopes=SCOPES)
gc = gspread.authorize(creds)

In [5]:
wave_sheet = gc.open_by_url('https://docs.google.com/spreadsheets/d/1SKgKmXeInkKkhEB4eIj0bEBMXel-fvYEnqe4n7OE1hc/edit?usp=sharing')

wave_sheet.worksheets()

[<Worksheet 'Sheet1' id:0>,
 <Worksheet 'Block Correction' id:585178158>,
 <Worksheet 'Block Master' id:721843181>,
 <Worksheet 'Activity' id:2142096583>,
 <Worksheet '120925' id:1932757771>]

# DataFrames

In [6]:
wave_sheet_1 = wave_sheet.worksheet('Sheet1')

wave_sheet_df = pd.DataFrame(wave_sheet_1.get_all_records())

wave_sheet_df


Unnamed: 0,Wave,Update,State,Territory,Cluster,Block,Block Status,Vatika,Employee Name,Activity,Target_Date,Remarks,Identify,Expected Date,Complete_Date,BlockId,Id
0,0,2025-12-03,Assam,Dhubri,Barpeta,Chakchaka,New,Not Applicable,Without Employee,CSP Potential,2025-07-31,,,,,2492,2
1,0,2025-12-03,Assam,Dhubri,Barpeta,Chakchaka,New,Not Applicable,Without Employee,Dhavak Onboard,2025-07-31,,,,,2492,3
2,0,2025-12-03,Assam,Dhubri,Barpeta,Chakchaka,New,Not Applicable,Without Employee,Distributor Identify,2025-07-31,,,2025-07-31,,2492,4
3,0,2025-12-03,Assam,Dhubri,Barpeta,Chakchaka,New,Not Applicable,Without Employee,Product Survey,2025-07-31,,,,,2492,5
4,0,2025-12-03,Assam,Dhubri,Barpeta,Chakchaka,New,Not Applicable,Without Employee,Route Mapping,2025-07-31,,,,,2492,6
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5250,1,2025-12-03,Maharashtra,Nagpur,Wardha,Seloo,Existing,Mahabala,Priyanka Nagtode,Workshop 1A,2025-11-30,,1,,2025-11-02,4610,
5251,1,2025-12-03,Maharashtra,Nagpur,Wardha,Seloo,Existing,Mahabala,Priyanka Nagtode,FGD Complete,2025-09-30,,13,,2025-11-02,4610,
5252,1,2025-12-03,Maharashtra,Nagpur,Wardha,Seloo,Existing,Vadgaon,Priyanka Nagtode,FGD Complete,2025-09-30,,9,,2025-11-20,4610,
5253,0,2025-12-03,West Bengal,Durgapur,Bankura,Taldangra,New,Not Applicable,Kerim Pahalowan,Stock Point,,Not yet,,20-09-2025,2025-12-04,2794,


In [7]:
warehouse_cursor.execute("SELECT * FROM WAVE..[Wave_Progress_2]")

rows_tuple = [tuple(i) for i in warehouse_cursor.fetchall()]
column = [column[0] for column in warehouse_cursor.description]
wave_progress_df = pd.DataFrame(rows_tuple, columns=column)
wave_progress_df

Unnamed: 0,Wave,Update,State,Territory,Cluster,Block,BlockId,Block_Status,Vatika,Employee_Name,Activity,Target_Date,Remarks,Identify,Expected_Date,Complete_Date,Block_Key,insert_on
0,0,2025-11-06,Assam,Dhubri,Barpeta,Chakchaka,2492,New,Not Applicable,Without Employee,CSP Potential,2025-07-31,,,NaT,NaT,,
1,0,2025-11-06,Assam,Dhubri,Barpeta,Chakchaka,2492,New,Not Applicable,Without Employee,Dhavak Onboard,2025-07-31,,,NaT,NaT,,
2,0,2025-11-06,Assam,Dhubri,Barpeta,Chakchaka,2492,New,Not Applicable,Without Employee,Distributor Identify,2025-07-31,,,2025-07-31,NaT,,
3,0,2025-11-06,Assam,Dhubri,Barpeta,Chakchaka,2492,New,Not Applicable,Without Employee,Product Survey,2025-07-31,,,NaT,NaT,,
4,0,2025-11-06,Assam,Dhubri,Barpeta,Chakchaka,2492,New,Not Applicable,Without Employee,Route Mapping,2025-07-31,,,NaT,NaT,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5130,0,2025-11-06,West Bengal,Durgapur,Purba bardhaman,Memari,0,Existing,Not Applicable,Dhiraj Gupta,Product Survey,NaT,Done,,2025-09-07,2025-11-03,,
5131,0,2025-11-06,West Bengal,Darjeeling,Cooch Behar,Tufanganj 2,0,Existing,Not Applicable,Ashika Oraon,Volunteer Identify,NaT,Done,,NaT,2025-11-05,,
5132,1,2025-11-06,Assam,Sonitpur,Biswanath,Sootea,2715,Existing,Niz-Sootea,Bashab Das,Workshop 1B,2025-09-06,,,NaT,2025-11-06,,
5133,1,2025-11-06,Assam,Sonitpur,Sonitpur,Gabhoru,2710,Existing,Bhitor Porua,Sabita Biswas,Workshop 1B,2025-09-06,,,NaT,2025-11-06,,


In [8]:
wave_progress_df.columns.to_list

<bound method IndexOpsMixin.tolist of Index(['Wave', 'Update', 'State', 'Territory', 'Cluster', 'Block', 'BlockId',
       'Block_Status', 'Vatika', 'Employee_Name', 'Activity', 'Target_Date',
       'Remarks', 'Identify', 'Expected_Date', 'Complete_Date', 'Block_Key',
       'insert_on'],
      dtype='object')>

In [12]:
wave_sheet_df.columns.to_list

<bound method IndexOpsMixin.tolist of Index(['Wave', 'Update', 'State', 'Territory', 'Cluster', 'Block',
       'Block Status', 'Vatika', 'Employee Name', 'Activity', 'Target_Date',
       'Remarks', 'Identify', 'Expected Date', 'Complete_Date', 'BlockId',
       'Id'],
      dtype='object')>

# BHK BLock Master

In [13]:

prod_server = os.environ.get("DB2_HOST")
prod_user = os.environ.get("DB2_USER")     
prod_pass = os.environ.get("DB2_PASS")
prod_db = os.environ.get("DB2_NAME")

conn_prod = pyodbc.connect(
            f"Driver={{ODBC Driver 17 for SQL Server}};"
            f"Server={prod_server};"
            f"Database={prod_db};"
            f"UID={prod_user};"
            f"PWD={prod_pass};"
        )
prod_cursor = conn_prod.cursor()
prod_cursor.fast_executemany = True


In [15]:
prod_cursor.execute('''
SELECT bl.block_code , bl.block_name , d.district_code , d.district_name , 
tr.territory_name, st.state_code, st.state_name
FROM [Drishtee_stats_new2024].[dbo].[tbl_block_master] bl
LEFT JOIN [Drishtee_stats_new2024].[dbo].[tbl_district_master] d ON d.district_code = bl.district_code
LEFT JOIN [Drishtee_stats_new2024].[dbo].tbl_territory_master tr ON d.territory_code = tr.territory_code
LEFT JOIN [Drishtee_stats_new2024].[dbo].[tbl_state_master] st ON st.state_code = d.state_code
                    ''')

rows_tuple = [tuple(i) for i in prod_cursor.fetchall()]
column = [column[0] for column in prod_cursor.description]
block_df = pd.DataFrame(rows_tuple, columns=column)
block_df

Unnamed: 0,block_code,block_name,district_code,district_name,territory_name,state_code,state_name
0,1,Achabal,1,Anantnag,,1,Jammu And Kashmir
1,2,Breng,1,Anantnag,,1,Jammu And Kashmir
2,3,Dachnipora,1,Anantnag,,1,Jammu And Kashmir
3,4,Devsar,622,Kulgam,,1,Jammu And Kashmir
4,5,D.H. Pora,622,Kulgam,,1,Jammu And Kashmir
...,...,...,...,...,...,...,...
7299,1116535,Udainagar,402,Dewas,Madhya Pradesh Virtual,23,Madhya Pradesh
7300,1116983,Jharda,435,Ujjain,Madhya Pradesh Virtual,23,Madhya Pradesh
7301,1116995,Makdon,435,Ujjain,Madhya Pradesh Virtual,23,Madhya Pradesh
7302,1117032,Rampura,419,Neemuch,Madhya Pradesh Virtual,23,Madhya Pradesh


# Checking Blocks

In [20]:
wave_sheet_df[['State', 'Cluster', 'Block']].drop_duplicates().reset_index(drop=True).to_excel('wave_blocks.xlsx', index=False)

In [17]:
final_df = pd.merge(wave_sheet_df , 
    block_df, left_on=['State', 'Cluster', 'Block'], right_on=['state_name', 'district_name', 'block_name'], how='left'
)

final_df

Unnamed: 0,Wave,Update,State,Territory,Cluster,Block,Block Status,Vatika,Employee Name,Activity,...,Complete_Date,BlockId,Id,block_code,block_name,district_code,district_name,territory_name,state_code,state_name
0,0,2025-12-03,Assam,Dhubri,Barpeta,Chakchaka,New,Not Applicable,Without Employee,CSP Potential,...,,2492,2,2492.0,Chakchaka,280.0,Barpeta,Dhubri,18.0,Assam
1,0,2025-12-03,Assam,Dhubri,Barpeta,Chakchaka,New,Not Applicable,Without Employee,Dhavak Onboard,...,,2492,3,2492.0,Chakchaka,280.0,Barpeta,Dhubri,18.0,Assam
2,0,2025-12-03,Assam,Dhubri,Barpeta,Chakchaka,New,Not Applicable,Without Employee,Distributor Identify,...,,2492,4,2492.0,Chakchaka,280.0,Barpeta,Dhubri,18.0,Assam
3,0,2025-12-03,Assam,Dhubri,Barpeta,Chakchaka,New,Not Applicable,Without Employee,Product Survey,...,,2492,5,2492.0,Chakchaka,280.0,Barpeta,Dhubri,18.0,Assam
4,0,2025-12-03,Assam,Dhubri,Barpeta,Chakchaka,New,Not Applicable,Without Employee,Route Mapping,...,,2492,6,2492.0,Chakchaka,280.0,Barpeta,Dhubri,18.0,Assam
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5250,1,2025-12-03,Maharashtra,Nagpur,Wardha,Seloo,Existing,Mahabala,Priyanka Nagtode,Workshop 1A,...,2025-11-02,4610,,4610.0,Seloo,498.0,Wardha,Nagpur,27.0,Maharashtra
5251,1,2025-12-03,Maharashtra,Nagpur,Wardha,Seloo,Existing,Mahabala,Priyanka Nagtode,FGD Complete,...,2025-11-02,4610,,4610.0,Seloo,498.0,Wardha,Nagpur,27.0,Maharashtra
5252,1,2025-12-03,Maharashtra,Nagpur,Wardha,Seloo,Existing,Vadgaon,Priyanka Nagtode,FGD Complete,...,2025-11-20,4610,,4610.0,Seloo,498.0,Wardha,Nagpur,27.0,Maharashtra
5253,0,2025-12-03,West Bengal,Durgapur,Bankura,Taldangra,New,Not Applicable,Kerim Pahalowan,Stock Point,...,2025-12-04,2794,,2794.0,Taldangra,305.0,Bankura,Durgapur,19.0,West Bengal


In [None]:
# final_df[['State', 'Cluster', 'Block', 'block_code']].drop_duplicates().to_excel('wave_blocks_with_codes.xlsx', index=False)

# CSP Commission

In [70]:
csp_comm_sheet = gc.open_by_url('https://docs.google.com/spreadsheets/d/1r7u4fBEp5gU-4wtcQayBGQynJ4onWA76mM62eQA-7Rc/edit?usp=sharing')

csp_comm_sheet.worksheets()

[<Worksheet 'Sheet1' id:355993851>, <Worksheet 'Summary' id:478215777>]

In [71]:
comm_sheet_df = pd.DataFrame(csp_comm_sheet.worksheet('Sheet1').get_all_records())
comm_sheet_df

Unnamed: 0,Comm_Month,Accounting_Month,Bank,CSP_Code,State,Territorry,District,Comm_Pena,Revenue_Category,Revenue_Head,Count,Comm,Tag,Last Update
0,2025-03-31,2025-04-30,SBI,10021602,West Bengal,Darjeeling,Darjeeling,Comm,Social Security Scheme & A/C Opening,ACC OPEN WITHOUT EKYC AND INIT DEP 100 OR ABOVE,4,40.0,,16-Dec-2025
1,2025-03-31,2025-04-30,SBI,10021602,West Bengal,Darjeeling,Darjeeling,Comm,Social Security Scheme & A/C Opening,ACC OPEN WITH EKYC AND INIT DEP BTWN 100 AND 499,1,20.0,,16-Dec-2025
2,2025-03-31,2025-04-30,SBI,10021602,West Bengal,Darjeeling,Darjeeling,Comm,Non Transactional,AEPS MINI STATEMENT - ONUS,22,44.0,,16-Dec-2025
3,2025-03-31,2025-04-30,SBI,10021602,West Bengal,Darjeeling,Darjeeling,Comm,Withdrawal,AEPS ONUS WITHDRAWAL,737,7073.0,,16-Dec-2025
4,2025-03-31,2025-04-30,SBI,10021602,West Bengal,Darjeeling,Darjeeling,Comm,Fund Transfer,MONEY TRANSFER,99,861.0,,16-Dec-2025
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
288061,2025-11-30,2025-12-31,UGB,15010008,Uttarakhand,,Haldwani,Comm,Incentive,Points Earned,109,1500.0,LM,16-Dec-2025
288062,2025-11-30,2025-12-31,UGB,15010015,Uttarakhand,,Haldwani,Comm,Incentive,Points Earned,150,2500.0,LM,16-Dec-2025
288063,2025-11-30,2025-12-31,UGB,15010021,Uttarakhand,,Tehri Garhwal,Comm,Incentive,Points Earned,236,3200.0,LM,16-Dec-2025
288064,2025-11-30,2025-12-31,UGB,15010007,Uttarakhand,,Udham Singh Nagar,Comm,Incentive,Points Earned,229,2500.0,LM,16-Dec-2025


# tmp_csp_commission

In [41]:
warehouse_cursor.execute("SELECT TOP(1000) * FROM WAVE..tmp_csp_commission ORDER BY id DESC")

rows_tuple = [tuple(i) for i in warehouse_cursor.fetchall()]

column = [column[0] for column in warehouse_cursor.description]

tmp_csp_commission_df = pd.DataFrame(rows_tuple, columns=column)
tmp_csp_commission_df

Unnamed: 0,Comm_Month,Accounting_Month,Bank,CSP_Code,State,Territorry,District,Comm_Pena,Revenue_Category,Revenue_Head,Count,Comm,Tag,imported_by,imported_at,is_processed,id,error_in_process
0,2025-09-30,2025-10-31,UBI,0,0,0,0,0,0,0,0E-10,0E-10,LM,EMP123,2025-12-16 16:11:57.463,0,1720831,
1,2025-09-30,2025-10-31,PNB,K3800739,West Bengal,0,Darjeeling,Comm,Withdrawal,AEPS_ONUS_WDL,1.0000000000,0.4000000000,LM,EMP123,2025-12-16 16:11:57.463,0,1720830,
2,2025-09-30,2025-10-31,PNB,K3800735,Assam,0,Cachar,Comm,Withdrawal,AEPS_ONUS_WDL,1.0000000000,0.4000000000,LM,EMP123,2025-12-16 16:11:57.463,0,1720829,
3,2025-09-30,2025-10-31,PNB,K3800735,Assam,0,Cachar,Comm,Deposit,AEPS_ONUS_DEPOSIT,1.0000000000,0.4000000000,LM,EMP123,2025-12-16 16:11:57.463,0,1720828,
4,2025-09-30,2025-10-31,PNB,K3800733,Assam,0,Cachar,Comm,Non Transactional,UID_SEEDING,1.0000000000,5.0000000000,LM,EMP123,2025-12-16 16:11:57.463,0,1720827,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,2025-09-30,2025-10-31,PNB,K3800717,Chhattisgarh,0,Dhamtari,Comm,Deposit,AEPS_OFFUS_DEPOSIT,13.0000000000,100.0000000000,LM,EMP123,2025-12-16 12:26:47.160,0,1719836,
996,2025-09-30,2025-10-31,PNB,K3800716,West Bengal,0,Bardhaman,Comm,Social Security Scheme & A/C Opening,PMSBY,1.0000000000,1.0000000000,LM,EMP123,2025-12-16 12:26:47.160,0,1719835,
997,2025-09-30,2025-10-31,PNB,K3800716,West Bengal,0,Bardhaman,Comm,Social Security Scheme & A/C Opening,PMJDY,1.0000000000,5.0000000000,LM,EMP123,2025-12-16 12:26:47.160,0,1719834,
998,2025-09-30,2025-10-31,PNB,K3800716,West Bengal,0,Bardhaman,Comm,Fund Transfer,NEFT,1.0000000000,0.4000000000,LM,EMP123,2025-12-16 12:26:47.160,0,1719833,


# CSP_log

In [72]:
warehouse_cursor.execute("SELECT TOP(1000) * FROM WAVE..COMMISSION_Update_log")

rows_tuple = [tuple(i) for i in warehouse_cursor.fetchall()]

column = [column[0] for column in warehouse_cursor.description]

commission_log_df = pd.DataFrame(rows_tuple, columns=column)
commission_log_df

Unnamed: 0,last_update_date,table_name


In [73]:
comm_sheet_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 288066 entries, 0 to 288065
Data columns (total 14 columns):
 #   Column            Non-Null Count   Dtype 
---  ------            --------------   ----- 
 0   Comm_Month        288066 non-null  object
 1   Accounting_Month  288066 non-null  object
 2   Bank              288066 non-null  object
 3   CSP_Code          288066 non-null  object
 4   State             288066 non-null  object
 5   Territorry        288066 non-null  object
 6   District          288066 non-null  object
 7   Comm_Pena         288066 non-null  object
 8   Revenue_Category  288066 non-null  object
 9   Revenue_Head      288066 non-null  object
 10  Count             288066 non-null  object
 11  Comm              288066 non-null  object
 12  Tag               288066 non-null  object
 13  Last Update       288066 non-null  object
dtypes: object(14)
memory usage: 30.8+ MB


In [74]:
comm_sheet_df['Comm'].unique()

array([40.0, 20.0, 44.0, ..., 1663.75, 184.88, 440.05], dtype=object)

# Transformation

In [75]:
comm_sheet_df['Comm_Month'] = pd.to_datetime(comm_sheet_df['Comm_Month'])
comm_sheet_df['Accounting_Month'] = pd.to_datetime(comm_sheet_df['Accounting_Month'])
comm_sheet_df['Last Update'] = pd.to_datetime(comm_sheet_df['Last Update'])
comm_sheet_df['Comm'] = pd.to_numeric(comm_sheet_df['Comm'], errors='coerce').fillna(0).astype('float')
comm_sheet_df['Count'] = pd.to_numeric(comm_sheet_df['Count'], errors='coerce').fillna(0).astype('int')

In [76]:
comm_sheet_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 288066 entries, 0 to 288065
Data columns (total 14 columns):
 #   Column            Non-Null Count   Dtype         
---  ------            --------------   -----         
 0   Comm_Month        288066 non-null  datetime64[ns]
 1   Accounting_Month  288066 non-null  datetime64[ns]
 2   Bank              288066 non-null  object        
 3   CSP_Code          288066 non-null  object        
 4   State             288066 non-null  object        
 5   Territorry        288066 non-null  object        
 6   District          288066 non-null  object        
 7   Comm_Pena         288066 non-null  object        
 8   Revenue_Category  288066 non-null  object        
 9   Revenue_Head      288066 non-null  object        
 10  Count             288066 non-null  int32         
 11  Comm              288066 non-null  float64       
 12  Tag               288066 non-null  object        
 13  Last Update       288066 non-null  datetime64[ns]
dtypes: d

In [77]:
comm_sheet_df.columns.to_list

<bound method IndexOpsMixin.tolist of Index(['Comm_Month', 'Accounting_Month', 'Bank', 'CSP_Code', 'State',
       'Territorry', 'District', 'Comm_Pena', 'Revenue_Category',
       'Revenue_Head', 'Count', 'Comm', 'Tag', 'Last Update'],
      dtype='object')>

In [78]:
tmp_csp_commission_df.columns.to_list

<bound method IndexOpsMixin.tolist of Index(['Comm_Month', 'Accounting_Month', 'Bank', 'CSP_Code', 'State',
       'Territorry', 'District', 'Comm_Pena', 'Revenue_Category',
       'Revenue_Head', 'Count', 'Comm', 'Tag', 'imported_by', 'imported_at',
       'is_processed', 'id', 'error_in_process'],
      dtype='object')>

In [47]:
datetime.now().strftime('%Y-%m-%d %H:%M:%S')

'2025-12-19 16:54:22'

# CSP Logs

In [99]:
commission_log_df = pd.DataFrame([{
    "last_update_date": comm_sheet_df['Last Update'].max(),
    "table_name": "CSP_Commission"
}])
commission_log_df

Unnamed: 0,last_update_date,table_name
0,2025-12-16,CSP_Commission


# Filtering csp_comm_sheet

In [108]:
mx_1 = commission_log_df[commission_log_df['table_name'] == 'CSP_Commission']
mx_1['last_update_date'].max() 

Timestamp('2025-12-16 00:00:00')

In [None]:
comm_sheet_df_1 = comm_sheet_df[comm_sheet_df['Last Update'] > mx_1['last_update_date'].max()]

In [114]:
comm_sheet_df_1['imported_by'] = 'DF1002'
comm_sheet_df_1['imported_at'] = pd.to_datetime(datetime.now().strftime('%Y-%m-%d %H:%M:%S')) 
comm_sheet_df_1['is_processed'] = 0
comm_sheet_df_1['error_in_process'] = 0

In [None]:
comm_sheet_df_1 = comm_sheet_df_1[['Comm_Month', 'Accounting_Month', 'Bank', 'CSP_Code', 'State',
       'Territorry', 'District', 'Comm_Pena', 'Revenue_Category',
       'Revenue_Head', 'Count', 'Comm', 'Tag', 'imported_by', 'imported_at',
       'is_processed', 'error_in_process']]

# Insert into tmp_csp_comm

In [None]:
table_name = "tmp_csp_commission"  # Replace with your actual table name
batch_size = 1000
idf = comm_sheet_df_1

bulk_insert(table_name, idf, batch_size)

Executing batch insert for records 0 to 999
Inserted batch: 1000/30172 records
Executing batch insert for records 1000 to 1999
Inserted batch: 2000/30172 records
Executing batch insert for records 2000 to 2999
Inserted batch: 3000/30172 records
Executing batch insert for records 3000 to 3999
Inserted batch: 4000/30172 records
Executing batch insert for records 4000 to 4999
Inserted batch: 5000/30172 records
Executing batch insert for records 5000 to 5999
Inserted batch: 6000/30172 records
Executing batch insert for records 6000 to 6999
Inserted batch: 7000/30172 records
Executing batch insert for records 7000 to 7999
Inserted batch: 8000/30172 records
Executing batch insert for records 8000 to 8999
Inserted batch: 9000/30172 records
Executing batch insert for records 9000 to 9999
Inserted batch: 10000/30172 records
Executing batch insert for records 10000 to 10999
Inserted batch: 11000/30172 records
Executing batch insert for records 11000 to 11999
Inserted batch: 12000/30172 records
E

In [89]:
table_name = "COMMISSION_Update_log"  # Replace with your actual table name
batch_size = 1000
idf = commission_log_df

bulk_insert(table_name, idf, batch_size)

Executing batch insert for records 0 to 0
✗ Error inserting data: The cursor's connection has been closed.


# Insert_into_CSP_Commission

In [None]:
insert_date = datetime.now().strftime('%Y-%m-%d')
warehouse_cursor.execute(f'''
                         USE WAVE;
                         exec sp_Insert_into_CSP_Commission '{insert_date}'
                         ''')
conn.commit()

<pyodbc.Cursor at 0x172f990dab0>

# Insert Into COMMISSION_Update_log

In [None]:
table_name = "COMMISSION_Update_log"  # Replace with your actual table name
batch_size = 1000
idf = commission_log_df

bulk_insert(table_name, idf, batch_size)

Executing batch insert for records 0 to 0
Inserted batch: 1/1 records
✓ Successfully inserted all 1 records into COMMISSION_Update_log
