# In [None]:
from sklearn.linear_model import SGDRegressor
from sklearn.preprocessing import MinMaxScaler
from joblib import dump, load
import pandas
import numpy
import json
import matplotlib.pyplot as plt
from collections import Counter

class ShadowLearning:
    """Select the rows of new data that are worth training on.

    ``fit`` trains an ``SGDRegressor`` on an initial DataFrame, min-max scales
    every row's distance from the fitted hyperplane and records how many rows
    fall into each distance bin.  ``filter`` scores a new DataFrame against
    the artefacts saved by ``fit`` and returns (or marks) the rows selected
    for training.
    """

    # Bin values that scaled distances are snapped to.  Shared by fit() and
    # filter() so both always produce histograms with identical keys.
    _BINS = numpy.arange(0.000, 1.050, 0.025)

    def __init__(self):
        pass

    def scaler(self, values):
        """Min-max scale ``values`` (array-like with ``.min()``/``.max()``) to [0, 1].

        Constant input has a zero range; zeros are returned in that case
        instead of dividing by zero.
        """
        min_value = values.min()
        max_value = values.max()
        span = max_value - min_value

        if span == 0:
            # Keep the container type/shape of the input, but as float zeros.
            return (values - min_value) * 0.0

        return (values - min_value) / span

    def roundToNearestBins(self, values):
        """Snap each value to the nearest entry of the bin grid.

        Values outside the grid are mapped to the closest end of the grid.
        Returns a 1-D numpy array with one bin value per input value.
        """
        column = numpy.array(values).reshape(-1, 1)
        bins = ShadowLearning._BINS.reshape(1, -1)

        # (n_values, n_bins) table of distances; pick the closest bin per row.
        nearest = numpy.argmin(numpy.abs(column - bins), axis=1)

        return bins[0, nearest]

    def refineData(self, df: pandas.core.frame.DataFrame):
        """Placeholder; not implemented (returns None)."""
        pass

    def optimizer(self, df: pandas.core.frame.DataFrame, x: list[str], y: str):
        """Placeholder; not implemented (returns None)."""
        pass

    @staticmethod
    def _validate_frame_args(df, x, y):
        """Raise TypeError unless df is a DataFrame, x a list[str] and y a str."""
        if not isinstance(df, pandas.core.frame.DataFrame):
            raise TypeError("DataFrame provided is not of type pandas.core.frame.DataFrame")

        if not (isinstance(x, list) and all(isinstance(item, str) for item in x)):
            raise TypeError("Provided features list is not of of type list[str]")

        if not isinstance(y, str):
            raise TypeError("Provided target column is not of type str")

    @staticmethod
    def _validate_threshold(value, label):
        """Raise unless ``value`` is an int/float in the closed interval [0, 1]."""
        if not isinstance(value, (int, float)):
            raise TypeError(f"{label} Threshold should be of type int or float")
        if not (0 <= value <= 1):
            raise ValueError(f"{label} Threshold should be between 0 and 1.")

    @staticmethod
    def _residual_distances(features, target, weights, intercept):
        """Return |w.x + b - y| / ||w|| per row as an (n, 1) float array."""
        weights = numpy.asarray(weights, dtype=float)
        norm = numpy.sqrt(numpy.sum(weights ** 2))
        if norm == 0:
            # Dividing by a zero norm would only yield inf/NaN distances.
            raise ValueError(
                "Fitted model has all-zero coefficients; distances from the fit are undefined."
            )

        residual = numpy.abs(numpy.dot(features, weights) + intercept - target)
        return numpy.asarray(residual / norm, dtype=float).reshape(-1, 1)

    @classmethod
    def _bin_counts(cls, binned_values):
        """Count values per bin; every bin of the grid is present (0 if empty).

        Keys are plain Python floats rounded to 3 decimals, so the grid keys
        and the observed keys are produced by the same rounding and the
        result serialises to JSON unchanged.
        """
        counts = {round(float(bin_value), 3): 0 for bin_value in cls._BINS}
        counts.update(Counter(round(float(value), 3) for value in binned_values))
        return counts

    @classmethod
    def _selection_mask(cls, scaled_distances, binned, fitted_distribution,
                        distance_threshold, distribution_threshold):
        """Return (boolean row mask, histogram of ``binned``).

        A row is selected when its scaled distance exceeds
        ``distance_threshold`` OR when it falls in a bin whose count differs
        from the fitted count by more than ``distribution_threshold``
        (relative to the new count).
        """
        binned = [round(float(value), 3) for value in binned]
        counts = cls._bin_counts(binned)

        # Relative change per bin; empty bins contribute no rows, so 0 is fine.
        differences = {
            key: abs(count - fitted_distribution[key]) / count if count != 0 else 0
            for key, count in counts.items()
            if key in fitted_distribution
        }

        far_from_fit = numpy.asarray(scaled_distances, dtype=float).reshape(-1) > distance_threshold
        shifted_bin = numpy.array(
            [differences.get(bin_value, 0) > distribution_threshold for bin_value in binned],
            dtype=bool,
        )
        return far_from_fit | shifted_bin, counts

    def fit(self, df: pandas.core.frame.DataFrame,  x: list[str], y: str, save_files: bool = True):
        """Train the initial model and build the distance distribution.

        Parameters
        ----------
        df : pandas.core.frame.DataFrame
            Training data.
        x : list[str]
            Feature column names.
        y : str
            Target column name.
        save_files : bool
            When True, write 'shadow_fit.pkl', 'shadow_scaler.pkl' and
            'shadow_distribution.json' to the current working directory.

        Returns
        -------
        tuple
            ``(model, counts_dict)``: the fitted SGDRegressor and a dict that
            maps every distance bin to the number of rows in it.

        Raises
        ------
        TypeError
            If ``df``, ``x`` or ``y`` have the wrong type.
        ValueError
            If the fitted coefficients are all zero.
        """
        self._validate_frame_args(df, x, y)

        # Initializing and training the model on initial data
        model = SGDRegressor(
            max_iter=3,
            warm_start=True,
            penalty='l2',
            alpha=0.01,
            learning_rate='constant',
            eta0=0.01
            )
        distance_scaler = MinMaxScaler()

        X = df[x]
        Y = df[y]

        model.partial_fit(X, Y)

        raw_distances = self._residual_distances(X, Y, model.coef_, model.intercept_)
        scaled_distances = distance_scaler.fit_transform(raw_distances)
        counts_dict = self._bin_counts(self.roundToNearestBins(scaled_distances))

        # Saving the model, scaler and distribution file
        if save_files:
            dump(model, 'shadow_fit.pkl')
            dump(distance_scaler, 'shadow_scaler.pkl')
            with open('shadow_distribution.json', 'w') as f:
                json.dump(counts_dict, f, indent=4)

        return model, counts_dict

    def filter(self, df: pandas.core.frame.DataFrame, x: list[str], y: str, fitted_line: str, scaler_path: str, distribution_curve: str, distance_threshold: float, distribution_threshold, filtered: bool = True, visualize: bool = False, update_fits: bool = True):
        """
        Filter rows from the DataFrame object to train with respect to Multiple Linear Regression fitted line and Distance Distribution.

        A row is selected when its scaled distance from the fitted line is
        greater than ``distance_threshold`` OR when it falls in a distance bin
        whose relative count difference to the fitted distribution is greater
        than ``distribution_threshold``.

        Parameters
        ----------
        df : pandas.core.frame.DataFrame
            A pandas DataFrame object from which to filter untrained data (rows).
        x : list[str]
            List of column names from the DataFrame to treat as features.
        y : str
            The name of the column from the DataFrame to treat as target.
        fitted_line : str
            The file path of shadow_fit.pkl
        scaler_path : str
            The file path of shadow_scaler.pkl
        distribution_curve : str
            The file path of shadow_distribution.json
        distance_threshold : float
            The minimum scaled distance from the fitted line to accept a row (0 to 1).
        distribution_threshold : float
            The minimum relative difference from the distribution curve to accept the rows of a distance interval (0 to 1).
        filtered : bool
            When True, return a new DataFrame object with the selected rows only and leave the provided DataFrame untouched. Otherwise add a boolean column named 'SLToTrain' to the provided DataFrame and return it. True by default.
        visualize : bool
            When True, show a bar plot of the fitted distance distribution next to the distribution of the new rows. False by default.
        update_fits : bool
            When True and at least one row was selected, update the loaded model with the selected rows (``partial_fit``), add those rows to the fitted distribution, and write both back to ``fitted_line`` and ``distribution_curve``. The scaler file is left unchanged. True by default.

        Returns
        -------
        pandas.core.frame.DataFrame or None
            Filtered or marked DataFrame object. None is returned (after
            printing a message) when the distribution file cannot be read.

        Raises
        ------
        TypeError
            If an argument has the wrong type.
        ValueError
            If a threshold is outside [0, 1] or the loaded model has all-zero coefficients.
        FileNotFoundError
            If the model or scaler file does not exist.
        """
        # Validate every argument before touching the disk.
        self._validate_frame_args(df, x, y)

        if not isinstance(fitted_line, str):
            raise TypeError("Fitted Line is not of type str")
        if not isinstance(scaler_path, str):
            raise TypeError("Scaler path is not of type str")
        if not isinstance(distribution_curve, str):
            raise TypeError("Distribution Curve is not of type str")

        self._validate_threshold(distance_threshold, "Distance")
        self._validate_threshold(distribution_threshold, "Distribution")

        if not isinstance(filtered, bool):
            raise TypeError("Filter parameter is not of type bool")
        if not isinstance(visualize, bool):
            raise TypeError("Visualize parameter is not of type bool")
        if not isinstance(update_fits, bool):
            raise TypeError("update_fits parameter is not of type bool")

        # NOTE: joblib.load unpickles the file; only load artefacts you trust.
        try:
            model = load(fitted_line)
        except FileNotFoundError as err:
            raise FileNotFoundError(f"Could not find the model file: '{fitted_line}'") from err

        try:
            distance_scaler = load(scaler_path)
        except FileNotFoundError as err:
            raise FileNotFoundError(f"Could not find the scaler file: '{scaler_path}'") from err

        # Best-effort read kept from the original contract: report and return None.
        try:
            with open(distribution_curve, 'r') as file:
                fitted_distribution = json.load(file)
        except FileNotFoundError:
            print(f"Error: The file '{distribution_curve}' doesn't exist.")
            return None
        except json.JSONDecodeError:
            print(f"Error: The file '{distribution_curve}' is not valid JSON.")
            return None
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            return None

        # JSON object keys are strings; convert them back to the float bin values.
        fitted_distribution = {round(float(k), 3): v for k, v in fitted_distribution.items()}

        X = df[x]
        Y = df[y]

        raw_distances = self._residual_distances(X, Y, model.coef_, model.intercept_)
        scaled_distances = distance_scaler.transform(raw_distances).flatten()
        binned = [round(float(value), 3) for value in self.roundToNearestBins(scaled_distances)]

        # NOTE(review): the original computed the distance criterion and then
        # overwrote it with the distribution criterion, so distance_threshold
        # had no effect.  Both criteria are combined with OR here - confirm
        # that OR (rather than AND) is the intended rule.
        to_train, counts_dict = self._selection_mask(
            scaled_distances, binned, fitted_distribution,
            distance_threshold, distribution_threshold,
        )

        if update_fits and to_train.any():
            # Incrementally update the loaded model with the selected rows only.
            model.partial_fit(X[to_train], Y[to_train])

            updated_distribution = dict(fitted_distribution)
            for bin_value, keep in zip(binned, to_train):
                if keep:
                    updated_distribution[bin_value] = updated_distribution.get(bin_value, 0) + 1

            # Persist to the same paths the artefacts were loaded from.
            dump(model, fitted_line)
            with open(distribution_curve, 'w') as f:
                json.dump(updated_distribution, f, indent=4)

        if visualize:
            self.visualizing(fitted_distribution, counts_dict)

        if filtered:
            # Work on a copy so the caller's DataFrame is not modified.
            filtered_df = df[to_train].copy()
            filtered_df.drop(columns=["SLToTrain", "SLDistances"], inplace=True, errors="ignore")
            return filtered_df

        df['SLToTrain'] = to_train
        return df

    def visualizing(self, first_distribution, second_distribution):
        """Show a grouped bar chart comparing two bin -> count dictionaries.

        Bars are paired by position: the i-th value of ``second_distribution``
        is drawn next to the i-th key/value of ``first_distribution``.
        """
        labels = list(first_distribution.keys())
        first_counts = list(first_distribution.values())
        second_counts = list(second_distribution.values())

        positions = numpy.arange(len(labels))  # One slot per bin
        width = 0.35  # Width of each bar

        fig, ax = plt.subplots()
        fitted_bars = ax.bar(positions - width / 2, first_counts, width,
                             label='Fitted Distribution', color='lightblue')
        new_bars = ax.bar(positions + width / 2, second_counts, width,
                          label='New Distribution', color='steelblue')

        ax.set_xlabel('Bins')
        ax.set_ylabel('Frequency')
        ax.set_title('Fitted vs. New Distribution')
        ax.set_xticks(positions)
        ax.set_xticklabels(labels, rotation=90)  # Vertical tick labels
        ax.legend()

        # Write each bar's height just above it.
        for container in (fitted_bars, new_bars):
            for bar in container:
                height = bar.get_height()
                ax.text(
                    bar.get_x() + bar.get_width() / 2, height + 0.05,
                    f'{height}', ha='center', va='bottom', fontsize=5
                )

        plt.show()

# In [None]:
###################################################################
###     TESTING     ###
# Manual scratch cell: loads a sample CSV and prepares the arguments for
# ShadowLearning.fit / ShadowLearning.filter.  The actual calls are commented
# out below.  Commented-out assignments (541, 50, 0.0) are deliberately
# wrong-typed alternatives, presumably for exercising the TypeError checks.
# df  = pandas.read_csv('random_dataframe.csv')
df  = pandas.read_csv('random2.csv')
# df  = 541
x = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5']  # feature columns
# x = 50
y = 'target'  # target column
# y = 0.0
fitted = 'shadow_fit.pkl'  # model file written by fit()
scale = 'shadow_scaler.pkl'  # scaler file written by fit()
curve = 'shadow_distribution.json'  # distribution file written by fit()
dist = 0.521  # distance_threshold
distri = 0.62  # distribution_threshold
filter_ = False  # passed as `filtered`
visi = True  # passed as `visualize`
# NOTE(review): `ulr` and `udist` are not referenced by any call visible here;
# they look like leftovers from an earlier signature - confirm before removing.
ulr = True
udist = True


sl = ShadowLearning()
# sl.fit(df, x, y)
# output = sl.filter(df, x, y, fitted, scale, curve, dist, distri, filter_, visi, update_fits=False)
# print(output)

###     TESTING     ###
###################################################################