In [68]:
import pandas as pd 
import numpy as np 
import logging
from typing import Tuple, Union
from abc import ABC, abstractmethod
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from collections import Counter
from imblearn.over_sampling import ADASYN
from imblearn.combine import SMOTETomek
from imblearn.under_sampling import NearMiss, TomekLinks
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler, LabelEncoder
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, roc_auc_score
from sklearn.model_selection import cross_val_score
from sklearn.base import ClassifierMixin, RegressorMixin
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.svm import SVC, SVR
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.feature_selection import mutual_info_classif, mutual_info_regression


class DataStrategy(ABC):

    @abstractmethod
    def handle_data(self, data : pd.DataFrame, target: str) -> Union[pd.DataFrame, pd.Series, pd.DataFrame, pd.Series]:
      pass

class RemoveIdentifierStrategy(DataStrategy):
  def handle_data(self, data: pd.DataFrame, target: str) -> pd.DataFrame:
    try:
      if data is not None and target in data.columns:
        for column in data.columns:
          if data[column].nunique() == 1 or data[column].nunique() == len(data):
            data = data.drop(columns=[column])
        return data
      else:
        raise ValueError("Remove Identifier Strategy Error : Data is None or target column is missing.")
    except Exception as e:
      logging.error(f"Error removing identifier columns: {e}")
      raise
    
class MissingValueStrategy(DataStrategy):
  def handle_data(self, data: pd.DataFrame, target: str) -> pd.DataFrame:
    try:
      if data is not None and target in data.columns:
        for column in data.columns:
          count_nan = data[column].isnull().sum()
          if len(data) <= 2000:
            if data[column].dtype in ['object', 'category']:
              data[column].fillna(data[column].mode()[0], inplace=True)
            else:
              data[column].fillna(data[column].mean(), inplace=True)
          elif len(data) >= 2000 and count_nan < 0.2 * len(data):
            data = data.dropna(subset=[column])
          else:
            data = data.drop(columns=[column])
        logging.info("Missing values handled successfully.")
        return data
      elif data is None or target not in data.columns:
        raise ValueError("Missing values Strategy Error : Data is None or target column is missing.")
    except Exception as e:
        logging.error(f"Error handling missing values: {e}")
        raise

class OutlierStrategy(DataStrategy): 
  def handle_data(self, data: pd.DataFrame, target: str) -> pd.DataFrame:
    try:
      if data is not None and target in data.columns:
        original_rows = len(data)
        logging.info(f"Original number of rows: {original_rows}")
        if len(data) <= 2000:
          numeric_cols = data.select_dtypes(include=[np.number]).columns
          mask = pd.Series(True, index=data.index)
          
          for column in numeric_cols:
            q1 = data[column].quantile(0.25)
            q3 = data[column].quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            col_mask = (data[column] >= lower_bound) & (data[column] <= upper_bound)
            mask &= col_mask
          data = data[mask]
          if data is None or data.empty:
            raise ValueError("Outlier Strategy Error : IQR method, No data left after outlier removal.")
        elif 2000 < len(data) <= 10000:
          lof = LocalOutlierFactor(n_neighbors=20)
          outliers = lof.fit_predict(data.select_dtypes(include=[np.number]))
          data = data[outliers == 1]
          if data is None or data.empty:
            raise ValueError("Outlier Strategy Error : Local Factor method, No data left after outlier removal.")
        else:
          numeric_data = data.select_dtypes(include=[np.number])
          iso_forest = IsolationForest(contamination='auto',
                                    random_state=42,
                                    n_estimators=100)
          outliers = iso_forest.fit_predict(numeric_data)
          data = data[outliers == 1]
          if data is None or data.empty:
            raise ValueError("Outlier Strategy Error : Isolation Forest method, No data left after outlier removal.")
        
        logging.info("Outliers handled successfully.")
        logging.info(f"Removed {original_rows - len(data)} rows ({((original_rows - len(data))/original_rows):.1%})")
        return data
      else:
        raise ValueError("Outlier Strategy Error : Data is None or target column is missing.")
    except Exception as e:
      logging.error(f"Error handling outliers: {e}")
      raise
    
class ImbalancedDataStrategy(DataStrategy):
  def handle_data(self, data: pd.DataFrame, target: str) -> pd.DataFrame:
    try:
      data_size = None
      if len(data) <= 2000:
        data_size = "small"
      elif 2000 < len(data) <= 20000:
        data_size = "medium"
      else: 
        data_size = "large"
      class_counts = Counter(data[target])
      perform_test = False
      for class_value, count in class_counts.items():
        if count >= 0.75 * len(data):
          perform_test = True
          break
      if perform_test:
        X = data.drop(columns=[target])
        y = data[target]
        logging.warning(f"Class {class_value} is a majority class with {count/len(data)*100}% of the total records.")
        if data_size == "small":
          ada = ADASYN(sampling_strategy='minority', n_neighbors=3)
          X_res, y_res = ada.fit_resample(X, y)
          data = pd.concat([X_res, y_res], axis=1)
        elif data_size == "medium":
          smote_tomek = SMOTETomek(sampling_strategy='auto', tomek=TomekLinks(sampling_strategy='majority'))
          X_res, y_res = smote_tomek.fit_resample(X, y)
          data = pd.concat([X_res, y_res], axis=1)
        else:
          nm = NearMiss(version=3, n_jobs=-1)  
          X_res, y_res = nm.fit_resample(X, y)
          data = pd.concat([X_res, y_res], axis=1)
        logging.info("Imbalanced data handled successfully.")
      return data
    except Exception as e:
      logging.error(f"Error handling imbalanced data: {e}")
      raise

class SplitDataStrategy(DataStrategy):
  def handle_data(self, data: pd.DataFrame, target: str) -> Tuple[pd.DataFrame, pd.Series, pd.DataFrame, pd.Series]:
    try:
      x_train, x_test, y_train, y_test = train_test_split(data.drop(columns=[target]), 
                                                        data[target], 
                                                        test_size=0.2, 
                                                        random_state=42)
      logging.info("Data split into training and testing sets successfully.")
      return x_train, x_test, y_train, y_test
    except Exception as e:
      logging.error(f"Error splitting data: {e}")
      raise

class ScaleDataStrategy(DataStrategy):
  def handle_data(self, data: pd.DataFrame, target: str) -> pd.DataFrame:
    try:
      X = data.drop(columns=[target])
      y = data[target]
      scaler = RobustScaler()
      X_scaled = scaler.fit_transform(X)
      scaled_data = pd.DataFrame(X_scaled, columns=X.columns)
      scaled_data[target] = y.reset_index(drop=True)
      logging.info("Data scaled successfully.")
      return scaled_data
    except Exception as e:
      logging.error(f"Error scaling data: {e}")
      raise
    
class EncodeDataStrategy(DataStrategy):
  def handle_data(self, data: pd.DataFrame, target: str) -> pd.DataFrame:
    try:
      X = data.drop(columns=[target])
      y = data[target]
      categorical_cols = X.select_dtypes(include=['object', 'category']).columns
      
      # Encode features
      for col in categorical_cols:
        le = LabelEncoder()
        X[col] = le.fit_transform(X[col])
      
      # Encode target variable
      if y.dtype == 'object' or y.dtype.name == 'category':
        le = LabelEncoder()
        y_encoded = le.fit_transform(y)
        y_encoded = pd.Series(y_encoded, name=target, index=y.index)  # Keep original index
      else:
        y_encoded = y
      
      X[target] = y_encoded
      logging.info("Data encoded successfully.")
      return X
    except Exception as e:
      logging.error(f"Error encoding data: {e}")
      raise
  
class FeatureSelectionStrategy(DataStrategy):
  def handle_data(self, data: pd.DataFrame, target: str) -> pd.DataFrame:
    try:
      X = data.drop(columns=[target])
      y = data[target]
      if pd.api.types.is_categorical_dtype(y) or not pd.api.types.is_numeric_dtype(y):
        mi = mutual_info_classif(X, y, random_state=42)
      else:
          mi = mutual_info_regression(X, y, random_state=42)
      threshold = max(0.1, np.percentile(mi, 50))
      selected = mi >= threshold
      if not any(selected): 
          selected = mi >= mi.max()
      X_filtered = X.loc[:, selected]
      logging.info(f"Selected {X_filtered.shape[1]} features with threshold {threshold:.3f}")
      return pd.concat([X_filtered, y], axis=1)
    except Exception as e:
      logging.error(f"Feature selection failed, returning original data. Error: {e}")
      return data 

# class FeatureEngineeringStrategy(DataStrategy):
#   def handle_data(self, data: pd.DataFrame) -> pd.DataFrame:
#     pass

class DimensionalityReductionStrategy(DataStrategy):
  def handle_data(self, data: pd.DataFrame, target: str) -> pd.DataFrame:
    try:
      X = data.drop(columns=[target])
      y = data[target]
      pca = PCA(n_components=0.95, random_state=42)
      X_reduced = pca.fit_transform(X)
      reduced_data = pd.DataFrame(X_reduced, columns=[f'PC{i+1}' for i in range(X_reduced.shape[1])])
      reduced_data[target] = y.reset_index(drop=True)
      logging.info("Dimensionality reduction applied successfully.")
      return reduced_data
    except Exception as e:
      logging.error(f"Error applying dimensionality reduction: {e}")
      raise

In [61]:
def remove_identifiers(data: pd.DataFrame, target: str) -> pd.DataFrame:
  try:
    data = RemoveIdentifierStrategy().handle_data(data, target)
    logging.info("Identifiers removed successfully.")
    return data
  except Exception as e:
    logging.error(f"Error removing identifiers: {e}")
    raise RuntimeError(f"Error in removing identifiers: {e}") 
  
def fill_missing_values(data: pd.DataFrame, target: str) -> pd.DataFrame:
  try:
    data = MissingValueStrategy().handle_data(data, target)
    return data
  except Exception as e:
    raise RuntimeError(f"Error in filling missing values: {e}")
  
def remove_outliers(data: pd.DataFrame, target: str) -> pd.DataFrame:
  try:
    data = OutlierStrategy().handle_data(data, target)
    
    return data
  except Exception as e:
    raise RuntimeError(f"Error in removing outliers: {e}")
  
def encode_data(data: pd.DataFrame, target: str) -> pd.DataFrame:
  try:
    data = EncodeDataStrategy().handle_data(data, target)
    
    return data
  except Exception as e:
    raise RuntimeError(f"Error in encoding data: {e}")
  
def scale_data(data: pd.DataFrame, target: str) -> pd.DataFrame:
  try:
    data = ScaleDataStrategy().handle_data(data, target)
    
    return data
  except Exception as e:
    raise RuntimeError(f"Error in scaling data: {e}")
  
def select_features(data: pd.DataFrame, target: str) -> pd.DataFrame:
  try:
    data = FeatureSelectionStrategy().handle_data(data, target)
    
    return data
  except Exception as e:
    raise RuntimeError(f"Error in feature selection: {e}")
  
def reduce_dimensions(data: pd.DataFrame, target: str) -> pd.DataFrame:
  try:
    data = DimensionalityReductionStrategy().handle_data(data, target)
    
    return data
  except Exception as e:
    raise RuntimeError(f"Error in dimensionality reduction: {e}")
  
def split_data(data: pd.DataFrame, target: str, test_size: float = 0.2, random_state: int = 42) -> tuple: 
  try:
    x_train, x_test, y_train, y_test = SplitDataStrategy().handle_data(data, target)
    logging.info('data shape after splitting: x_train: {}, y_train: {}, x_test: {}, y_test: {}'.format(
      x_train.shape, y_train.shape, x_test.shape, y_test.shape))
    return x_train, x_test, y_train, y_test
  except Exception as e:
    raise RuntimeError(f"Error in splitting data: {e}")

In [65]:
data = pd.read_excel('H:\DATA\MY\Projects\supervised_learning_prediction_Saas\data\Raisin_Dataset.xlsx')
data.head()

Unnamed: 0,Area,MajorAxisLength,MinorAxisLength,Eccentricity,ConvexArea,Extent,Perimeter,Class
0,87524,442.246011,253.291155,0.819738,90546,0.758651,1184.04,Kecimen
1,75166,406.690687,243.032436,0.801805,78789,0.68413,1121.786,Kecimen
2,90856,442.267048,266.328318,0.798354,93717,0.637613,1208.575,Kecimen
3,45928,286.540559,208.760042,0.684989,47336,0.699599,844.162,Kecimen
4,79408,352.19077,290.827533,0.564011,81463,0.792772,1073.251,Kecimen


In [66]:
data = pd.read_excel('H:\DATA\MY\Projects\supervised_learning_prediction_Saas\data\Raisin_Dataset.xlsx')
data = remove_identifiers(data, 'Class')
data = fill_missing_values(data, 'Class')
data = remove_outliers(data, 'Class')
data = encode_data(data, 'Class')
data = scale_data(data, 'Class')

data = select_features(data, 'Class')
x_train, x_test, y_train, y_test = split_data(data, 'Class')
data.isnull().sum()  # Check for missing values

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  data[column].fillna(data[column].mean(), inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  data[column].fillna(data[column].mode()[0], inplace=True)
  if pd.api.types.is_categorical_dtype(y) or not pd.api.types.is_numeric_dtype(y):


ConvexArea    0
Class         0
dtype: int64

In [69]:
model = RandomForestClassifier(random_state=42)
model.fit(x_train, y_train)

y_pred = model.predict(x_test)


In [72]:
print(y_test.shape)
print(y_pred.shape)

(172,)
(172,)


In [73]:
evaluation_results = {
    'accuracy': accuracy_score(y_test, y_pred),
    'f1_score': f1_score(y_test, y_pred, average='weighted'),  # Automatically set to 'weighted' for multi-class
    'confusion_matrix': confusion_matrix(y_test, y_pred),
    'mean_cross_val_score': cross_val_score(model, x_test, y_test, 
                                             cv=5, scoring='accuracy').mean(),
    'cross_val_scores': cross_val_score(model, x_test, y_test, 
                                        cv=5, scoring='accuracy')
}