Exception code testing

In [1]:
import sys

class CustomException(Exception):

    def __init__(self, error_message, error_details:sys):
        self.error_message = error_message
        _, _, exc_tb = error_details.exc_info()
        self.lineno = exc_tb.tb_lineno
        self.file_name = exc_tb.tb_frame.f_code.co_filename

    def __str__(self):
        return "Error occured in python script name [{0}] line number [{1}] error message [{2}]".format(
            self.file_name, self.lineno, str(self.error_message))


try:
    1 / 0
except Exception as e:
    raise CustomException(e, sys)

CustomException: Error occured in python script name [C:\Users\vchar\AppData\Local\Temp\ipykernel_36868\4159084028.py] line number [17] error message [division by zero]

Logging testing

In [2]:
import logging
import os
from datetime import datetime as dt

LOG_FILE = f"{dt.now().strftime('%m_%d_%Y_%H_%M_%S')}.log"

log_path = os.path.join(os.getcwd(), "logs")

os.makedirs(log_path, exist_ok=True)

LOG_FILEPATH = os.path.join(log_path, LOG_FILE)

logging.basicConfig(
    level=logging.INFO,
    filename=LOG_FILEPATH,
    format="[%(asctime)s] %(lineno)d %(name)s - %(levelname)s - %(message)s"
)

logging.info("Log testing executed!!!")

Testing mongodb

In [2]:
import os 
from pathlib import Path

company_bankruptcy_path = Path(r"C:/\Users/\vchar/\OneDrive/\Desktop/\ML Projects/\portfolio/\CompanyBankruptcy")
current_dir = os.getcwd()

os.chdir(company_bankruptcy_path)

In [None]:
current_dir = os.getcwd()

In [4]:
import pandas as pd
import pymongo
import json

# company_bankruptcy_path = Path(r"C:/\Users/\vchar/\OneDrive/\Desktop/\ML Projects/\portfolio/\CompanyBankruptcy/\company_bankruptcy")
# current_dir = os.getcwd()

# os.chdir(company_bankruptcy_path)

from company_bankruptcy.exception.exception import CustomException
from company_bankruptcy.logger.logger import logging
from company_bankruptcy.constants.constants import DATABASE_NAME, COLLECTION_NAME, MONGODB_COLLECTION_STR

# os.chdir(current_dir)

import sys


class MongoOps:

    def __init__(self, client_url:str, database_name:str=None, collection_name:str=None):
        self.client_url = client_url
        self.database_name = database_name
        self.collection_name = collection_name

    def create_client(self):
        logging.info('Initiating MongoClient')
        client = pymongo.MongoClient(self.client_url)
        logging.info('MongoClient initiated')
        return client

    def create_database(self):
        logging.info('Creating Mongo database')
        client = self.create_client()
        database = client[self.database_name]
        logging.info(f'Mongo database {self.database_name} created')
        return database

    def create_collection(self):
        logging.info('Creating Mongo collection')
        database = self.create_database()
        collection = database[self.collection_name]
        logging.info(f'Mongo collection {self.collection_name} created')
        return collection
    
    def get_database(self, db_name:str):
        logging.info(f'Accessing {db_name} database')
        client = self.create_client()
        database = client[db_name]
        logging.info(f'{db_name} database accessed')
        return database

    def get_collection(self, coll_name:str, db_name:str):
        logging.info(f'Accessing {coll_name} collection')
        database = self.get_database(db_name)
        collection = database[coll_name]
        logging.info(f'{coll_name} collection accessed')
        return collection

    def insert_record(self, record:dict, coll_name:str, db_name:str):
        collection = self.get_collection(coll_name, db_name)
        logging.info(f'Starting record insertion into {coll_name} collection of {db_name} database')
        if isinstance(record, list):
            for data in record:
                if type(data) != dict:
                    logging.info("Records' list should have elements as dict")
                    raise TypeError("Records' list should have elements as dict")
            collection.insert_many(record)
        elif isinstance(record, dict):
            collection.insert_one(record)
        logging.info(f'Insertion into {coll_name} collection of {db_name} database completed')

    def insert_from_file(self, datafile:str, coll_name:str, db_name:str):
        logging.info(f'Starting record insertion into {coll_name} collection of {db_name} database from {datafile}')
        self.path = datafile

        if self.path.endswith('.csv'):
            df = pd.read_csv(self.path, encoding='utf-8')
        elif self.path.endswith('.xlsx'):
            df = pd.read_excel(self.path, encoding='utf-8')
        logging.info('Data is loaded as a pandas dataframe')

        logging.info('Converting the data into json')
        datajson = json.loads(df.to_json(orient='record'))
        logging.info('Conversion to json completed')

        collection = self.get_collection(coll_name, db_name)

        logging.info('Inserting json data')
        collection.insert_many(datajson)
        logging.info('Insertion completed')

    def get_records(self, coll_name:str, db_name:str):
        collection = self.get_collection(coll_name, db_name)
        retrieved_data = pd.DataFrame(list(collection.find()))
        try:
            retrieved_data.drop(columns='_id', inplace=True)
            logging.info('Loading the data from the database completed')
        except Exception as e:
            retrieved_data = pd.DataFrame()
            logging.info('Loading the data from the database failed')
            raise CustomException(e, sys)
        return retrieved_data

mongo_instance = MongoOps(
    client_url=MONGODB_COLLECTION_STR
)

retrieved_data = mongo_instance.get_records(coll_name=COLLECTION_NAME, db_name=DATABASE_NAME)
retrieved_data.head()