In [1]:
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder

## pipelines
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import logging
import os
from datetime import datetime

In [2]:

class CustomerChurnModel:
    """Stage-by-stage churn workflow: load, preprocess, split, train, evaluate.

    Each stage is its own method so intermediate results can be inspected from
    the notebook. Every stage logs its progress; when a stage fails, the full
    traceback is logged and the exception is re-raised, so execution stops at
    the failing stage instead of continuing with ``None`` results.
    """

    # Hyperparameters used by ``model_train`` unless overridden by the caller.
    # NOTE(review): ccp_alpha=0.3 is very large relative to Gini impurity
    # (at most 0.5 for a binary target) and presumably prunes each tree down to
    # (almost) its root -- the recorded run reports ~0.50 accuracy on both train
    # and test. Confirm, and pass e.g. model_params={"ccp_alpha": 0.0} to compare.
    DEFAULT_MODEL_PARAMS = {
        "n_estimators": 1000,     # number of trees in forest
        "criterion": "gini",      # criterion method
        "ccp_alpha": 0.3,         # cost complexity pruning
        "max_depth": 100,         # maximum depth
        "min_samples_split": 5,   # minimum number of samples to split a node
        "min_samples_leaf": 2,    # minimum number of samples per leaf
        "max_features": "sqrt",   # maximum features considered per split
        "random_state": 42,
    }

    def __init__(self, data_path):
        """Remember the dataset location and set up file logging.

        Parameters
        ----------
        data_path : str
            Path of the Excel file read by ``data_ingestion``.
        """
        self.data_path = data_path
        self.preprocessor = None

        log_file = f"{datetime.now().strftime('%m_%d_%Y_%H_%M_%S')}.log"
        # Create only the "logs" directory. Including the file name in the
        # directory path would create a folder called "<timestamp>.log" per run.
        logs_dir = os.path.join(os.getcwd(), "logs")
        os.makedirs(logs_dir, exist_ok=True)
        self.log_file_path = os.path.join(logs_dir, log_file)

        # basicConfig is a no-op when the root logger already has handlers, so
        # only the first instance created in a kernel decides the log file.
        logging.basicConfig(
            filename=self.log_file_path,
            format="[%(asctime)s] %(lineno)d %(name)s - %(levelname)s - %(message)s",
            level=logging.INFO
        )

    @staticmethod
    def _as_1d(y):
        """Return a single-column DataFrame as a Series; pass anything else through."""
        if isinstance(y, pd.DataFrame) and y.shape[1] == 1:
            return y.iloc[:, 0]
        return y

    def data_ingestion(self):
        """Read the Excel file and split it into input and output features.

        Returns
        -------
        tuple
            ``(X, Y)`` where ``X`` is the data without the ``Churn``,
            ``CustomerID`` and ``Name`` columns and ``Y`` is a single-column
            DataFrame holding ``Churn``.

        Raises
        ------
        Exception
            Any error from reading the file or selecting columns is logged
            and re-raised.
        """
        try:
            logging.info("Loading data from %s", self.data_path)
            data = pd.read_excel(self.data_path)
            # dropping the target and the unnecessary columns, assigning the rest to X
            X = data.drop(labels=['Churn', "CustomerID", "Name"], axis=1)
            Y = data[['Churn']]  # selecting churn as Y
            logging.info("Data_ingestion completed successfully")
            return X, Y
        except Exception:
            # logging.exception records the message together with the traceback.
            logging.exception('Exception occured in data_ingestion')
            raise

    def cat_num(self, X, Y):
        """Split the columns of ``X`` into categorical and numerical names.

        Parameters
        ----------
        X : pandas.DataFrame
            Input features.
        Y : object
            Unused; kept so existing calls keep working.

        Returns
        -------
        tuple
            ``(categorical_cols, numerical_cols)``: the ``object``-dtype columns
            and all remaining columns, each as a pandas Index.

        Raises
        ------
        Exception
            Any error is logged and re-raised.
        """
        try:
            logging.info("Cat_num started")
            categorical_cols = X.select_dtypes(include='object').columns  # object data
            numerical_cols = X.select_dtypes(exclude='object').columns    # non object data
            logging.info("Cat_num completed successfully")
            return categorical_cols, numerical_cols
        except Exception:
            logging.exception('Exception occured in cat_num')
            raise

    def pipeline(self, categorical_cols, numerical_cols):
        """Build the preprocessing ColumnTransformer and store it on ``self.preprocessor``.

        Numerical columns are median-imputed and standard-scaled; categorical
        columns are imputed with the most frequent value and one-hot encoded
        with the first level dropped.

        Raises
        ------
        Exception
            Any error is logged and re-raised.
        """
        try:
            logging.info("pipeline started")
            num_pipeline = Pipeline(
                steps=[
                    ('imputer', SimpleImputer(strategy='median')),  # impute missing values with the median
                    ('scaler', StandardScaler())                    # scale the numeric data
                ]
            )

            cat_pipeline = Pipeline(
                steps=[
                    ('imputer', SimpleImputer(strategy='most_frequent')),  # impute with the most frequent value
                    ('onehotencoder', OneHotEncoder(drop='first'))         # encode the categorical data
                ]
            )

            # sparse_threshold=0 forces a dense result: feature_scaling wraps the
            # output in a DataFrame, which does not work with a sparse matrix.
            self.preprocessor = ColumnTransformer(
                [
                    ('num_pipeline', num_pipeline, numerical_cols),
                    ('cat_pipeline', cat_pipeline, categorical_cols)
                ],
                sparse_threshold=0
            )
            logging.info("pipeline completed successfully")
        except Exception:
            logging.exception('Exception occured in pipeline')
            raise

    def train_test(self, X, Y):
        """Split ``X`` and ``Y`` into train (70%) and test (30%) parts with a fixed seed.

        Returns
        -------
        tuple
            ``(X_train, X_test, y_train, y_test)``.

        Raises
        ------
        Exception
            Any error is logged and re-raised.
        """
        try:
            logging.info("train_test started")
            X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.30, random_state=30)
            logging.info("train_test completed successfully")
            return X_train, X_test, y_train, y_test
        except Exception:
            logging.exception('Exception occured in train_test')
            raise

    def feature_scaling(self, X_train, X_test):
        """Fit the preprocessor on the train data and transform both splits.

        Returns
        -------
        tuple
            ``(X_train_scaled, X_test_scaled)`` as DataFrames whose columns are
            the preprocessor's output feature names and whose index is that of
            the corresponding input, so rows stay aligned with the targets.

        Raises
        ------
        RuntimeError
            If ``pipeline`` has not been called yet.
        Exception
            Any other error is logged and re-raised.
        """
        try:
            logging.info("feature_scaling started")
            if self.preprocessor is None:
                raise RuntimeError("preprocessor is not built; call pipeline() before feature_scaling()")

            # Fit on the train split only; the test split is transformed with
            # the statistics learned from train.
            train_values = self.preprocessor.fit_transform(X_train)
            feature_names = self.preprocessor.get_feature_names_out()
            test_values = self.preprocessor.transform(X_test)

            X_train_scaled = pd.DataFrame(
                train_values, columns=feature_names, index=getattr(X_train, "index", None)
            )
            X_test_scaled = pd.DataFrame(
                test_values, columns=feature_names, index=getattr(X_test, "index", None)
            )
            logging.info("feature_scaling completed successfully")
            return X_train_scaled, X_test_scaled
        except Exception:
            logging.exception('Exception occured in feature_scaling')
            raise

    def model_train(self, X_train, X_test, y_train, y_test, model_params=None):
        """Train a random forest, then log and print test and train accuracy.

        Parameters
        ----------
        X_train, X_test
            Preprocessed feature matrices.
        y_train, y_test
            Targets; a single-column DataFrame is flattened to a Series before
            use, because the classifier expects a 1-D target.
        model_params : dict, optional
            Overrides applied on top of ``DEFAULT_MODEL_PARAMS``.

        Raises
        ------
        Exception
            Any error is logged and re-raised.
        """
        try:
            logging.info("model_train started")
            params = {**self.DEFAULT_MODEL_PARAMS, **(model_params or {})}
            classifier = RandomForestClassifier(**params)

            y_train_1d = self._as_1d(y_train)
            y_test_1d = self._as_1d(y_test)

            # Fitting the model on the training data
            classifier.fit(X_train, y_train_1d)
            logging.info("model.fit completed successfully")

            # Making predictions on the testing data
            y_pred = classifier.predict(X_test)
            logging.info("model prediction completed")

            # Evaluating the model's performance on the test split
            accuracy = accuracy_score(y_test_1d, y_pred)
            logging.info("accuracy score: %s", accuracy)

            # Checking training score
            train_score = classifier.score(X_train, y_train_1d)
            logging.info("train score: %s", train_score)
            print(f"Accuracy: {accuracy:.4f} Training: {train_score:.4f}")
        except Exception:
            logging.exception('Exception occured in model_train')
            raise

# Run the workflow stage by stage. The order matters: each call consumes the
# results of the previous ones, and all intermediate results stay available
# in the notebook namespace for inspection.

# Relative path: resolved against the kernel's current working directory.
data_path = "data/customer_churn_large_dataset.xlsx"
churn_model = CustomerChurnModel(data_path)
# X: input features; Y: single-column DataFrame holding the "Churn" target.
X, Y = churn_model.data_ingestion()
# Column names of the object-dtype (categorical) and remaining (numerical) features.
categorical_cols, numerical_cols = churn_model.cat_num(X, Y)
# Builds the ColumnTransformer and stores it on churn_model.preprocessor (returns nothing).
churn_model.pipeline(categorical_cols, numerical_cols)
# 70/30 split with a fixed random_state.
X_train, X_test, y_train, y_test = churn_model.train_test(X, Y)
# Preprocessor is fitted on the train split only, then applied to the test split.
X_train_scaled, X_test_scaled = churn_model.feature_scaling(X_train, X_test)
# Trains the random forest and prints test accuracy and training score.
churn_model.model_train(X_train_scaled, X_test_scaled, y_train, y_test)


  return fit_method(estimator, *args, **kwargs)


Accuracy: 0.4982 Training: 0.5039
