In [12]:
import pyodbc
import logging
import time
import pandas as pd
from datetime import datetime
from typing import List, Dict, Optional

class BaseDBComponent:
    def __init__(self):
        self.connection = None
        self.cursor = None
        self.logger = logging.getLogger(self.__class__.__name__)

    def connect(self):
        raise NotImplementedError

    def disconnect(self):
        if self.cursor:
            self.cursor.close()
        if self.connection:
            self.connection.close()
        self.logger.info("Successfully disconnected from the database")


def get_secret(secret_name: str) -> Optional[Dict[str, str]]:
    """
    Retrieves secrets. In a real application, this would fetch
    credentials from a secure vault.
    """
    if secret_name == "sqlserver_cred":
        return {
            "server": "192.168.83.9",
            "database": "THIRDPARTY",
            "username": "digital_login",
            "password": "Digital123!@#",
            "port": "1433"
        }
    return None



class SQLServerComponent(BaseDBComponent):
    """Component for SQL Server database operations"""

    def __init__(self):
        super().__init__()
        self._load_credentials()

    def _load_credentials(self) -> None:
        """Load SQL Server credentials"""
        creds = get_secret("sqlserver_cred")
        if not creds:
            raise ConnectionError("SQL Server credentials not found")

        self.server = creds.get('server')
        self.database = creds.get('database')
        self.username = creds.get('username')
        self.password = creds.get('password')
        self.port = creds.get('port', '1433')
        self.corporate_view_table = 'VIEW_CLA_CORPORATE'
        self.individual_view_table = 'VIEW_CLA_INDIVIDUAL'

    def connect(self) -> None:
        """Connect to SQL Server database"""
        try:
            conn_str = (
                f"DRIVER={{ODBC Driver 17 for SQL Server}};"
                f"SERVER={self.server},{self.port};"
                f"DATABASE={self.database};"
                f"UID={self.username};"
                f"PWD={self.password};"
            )
            self.connection = pyodbc.connect(conn_str)
            self.cursor = self.connection.cursor()
            self.logger.info("Successfully connected to SQL Server database")

        except pyodbc.Error as ex:
            sqlstate = ex.args[0]
            self.logger.error(f"Failed to connect to SQL Server database: {sqlstate}")
            self.disconnect()
            raise ConnectionError(f"Failed to connect to SQL Server database: {str(ex)}")

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            self.logger.error(f"Error during database operation: {exc_type}")
        self.disconnect()
        return not exc_type

    def _execute_query(self, query: str, params: tuple = None, fetch_one: bool = False) -> List[Dict]:
        """Execute query and return results as list of dictionaries"""
        try:
            start_time = time.time()
            if params:
                self.cursor.execute(query, params)
            else:
                self.cursor.execute(query)

            if query.strip().lower().startswith("select"):
                if fetch_one:
                    row = self.cursor.fetchone()
                    if row:
                        columns = [column[0] for column in self.cursor.description]
                        return [dict(zip(columns, row))]
                    return []

                rows = self.cursor.fetchall()
                if not rows:
                    return []

                columns = [column[0] for column in self.cursor.description]
                result = [dict(zip(columns, row)) for row in rows]

                end_time = time.time()
                execution_time = end_time - start_time
                self.logger.info(f"Query executed in {execution_time:.4f} seconds")
                return result
            
            self.connection.commit()
            return []

        except pyodbc.Error as e:
            self.logger.error(f"Error executing query: {e}")
            raise Exception(f"Error executing query: {str(e)}")

    def fetch_corporate_data(self) -> Optional[List[Dict]]:
        """Fetch data from the corporate view"""
        try:
            self.logger.info("Initiating corporate data fetch from SQL Server...")
            start_time = time.time()

            query = f"SELECT * FROM {self.corporate_view_table}"
            results = self._execute_query(query)

            elapsed_time = time.time() - start_time
            self.logger.info(f"Corporate data fetch completed in {elapsed_time:.2f} seconds")

            if not results:
                self.logger.warning("No corporate data found")
                return []
            return results
        except Exception as e:
            self.logger.error(f"Error fetching corporate data: {e}")
            raise

    def fetch_individual_data(self) -> Optional[List[Dict]]:
        """Fetch data from the individual view"""
        try:
            self.logger.info("Initiating individual data fetch from SQL Server...")
            start_time = time.time()

            query = f"SELECT * FROM {self.individual_view_table}"
            results = self._execute_query(query)

            elapsed_time = time.time() - start_time
            self.logger.info(f"Individual data fetch completed in {elapsed_time:.2f} seconds")

            if not results:
                self.logger.warning("No individual data found")
                return []
            return results
        except Exception as e:
            self.logger.error(f"Error fetching individual data: {e}")
            raise



In [9]:

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    try:
        with SQLServerComponent() as db:
            corporate_data = db.fetch_corporate_data()
            if corporate_data:
                print(f"Fetched {len(corporate_data)} corporate records.")
                # print(corporate_data[0]) # To print the first record

            individual_data = db.fetch_individual_data()
            if individual_data:
                print(f"Fetched {len(individual_data)} individual records.")
                # print(individual_data[0]) # To print the first record

    except ConnectionError as e:
        print(e)
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

INFO:SQLServerComponent:Successfully connected to SQL Server database
INFO:SQLServerComponent:Initiating corporate data fetch from SQL Server...
INFO:SQLServerComponent:Query executed in 11.8328 seconds
INFO:SQLServerComponent:Corporate data fetch completed in 11.85 seconds
INFO:SQLServerComponent:Initiating individual data fetch from SQL Server...


Fetched 132942 corporate records.


INFO:SQLServerComponent:Query executed in 34.4662 seconds
INFO:SQLServerComponent:Individual data fetch completed in 35.05 seconds


Fetched 1842336 individual records.


INFO:SQLServerComponent:Successfully disconnected from the database


In [10]:
corporate_data

[{'Cif_Id': 'C000120384',
  'FORACID': '0330100000069201',
  'Company_Name': 'ADVANCE BOOK and ORDER SUPPLIERS',
  'Company_Registeration_Number': None,
  'Company_Registeration_Date': None,
  'Company_Registration_Authority': None,
  'Pan_Number': '305482286',
  'Pan_Issue_Date': datetime.datetime(2012, 8, 26, 12, 0),
  'Phone_Number': '9804515829'},
 {'Cif_Id': 'C000120856',
  'FORACID': '0330100000071201',
  'Company_Name': 'BASU ENTERPRISES',
  'Company_Registeration_Number': None,
  'Company_Registeration_Date': None,
  'Company_Registration_Authority': None,
  'Pan_Number': '304782060',
  'Pan_Issue_Date': datetime.datetime(2012, 9, 3, 0, 0),
  'Phone_Number': '0000000000'},
 {'Cif_Id': 'C000120975',
  'FORACID': '0330100000073201',
  'Company_Name': 'PRAVAKAR KHADAYA TATHA KIRANA STORE',
  'Company_Registeration_Number': None,
  'Company_Registeration_Date': None,
  'Company_Registration_Authority': None,
  'Pan_Number': '304680569',
  'Pan_Issue_Date': datetime.datetime(2012, 9

In [11]:
individual_data

[{'CIF_ID': 'R000003561',
  'FORACID': '0012100000325202',
  'Customer_Name': 'GYANENDRA  PD. KAYASTHA',
  'Father_Name': 'KRISHNA DAS KAYASTHA',
  'Grandfather_Name': 'GANGA DAS KAYASTHA',
  'Spouse_Name': 'DIBYASWARI KAYASTHA',
  'Permanent_Address': ' ',
  'Temporary_Address': 'BIJAYA BASTI, SANEPA-2, LALITPUR, NEPAL',
  'Citizenship_Number': '259/032/33',
  'Citizenship_Issue_Date': datetime.datetime(2008, 3, 13, 12, 0),
  'Citizenship_Issue_District': 'LALITPUR',
  'NID_Number': None,
  'NID_Issue_Date': None,
  'NID_Issue_District': None,
  'PAN_Number': '101563291',
  'Phone_Number': '9841225992'},
 {'CIF_ID': 'R000003404',
  'FORACID': '0040100000075010',
  'Customer_Name': 'TRILOCHAN  POUDEL',
  'Father_Name': 'LAXMI PRASAD POUDEL',
  'Grandfather_Name': 'MIG',
  'Spouse_Name': 'LAXMI POUDEL',
  'Permanent_Address': ' ',
  'Temporary_Address': 'NIRMAL POKHARI 9 KASKI ',
  'Citizenship_Number': '114202',
  'Citizenship_Issue_Date': datetime.datetime(2008, 3, 11, 0, 0),
  'Citiz

In [13]:
df = pd.DataFrame(corporate_data)

In [14]:
df.to_csv('corporate_data.csv', index=False)

In [15]:
df = pd.DataFrame(individual_data)
df.to_csv('individual_data.csv', index=False)