In [None]:
import itertools
import math
import os
import random
from dataclasses import dataclass
from typing import List, Optional

import numpy as np
import pandas as pd
from more_itertools import windowed
from scipy.optimize import Bounds, differential_evolution
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.metrics import r2_score, max_error, mean_absolute_error, mean_squared_error, median_absolute_error, \
    mean_absolute_percentage_error
from sklearn.preprocessing import MinMaxScaler, MaxAbsScaler

# GRNN

In [None]:
class GRNN(BaseEstimator, ClassifierMixin):
    """Gaussian-kernel weighted average of training targets (GRNN).

    ``predict`` scores one query point at a time: every training row gets the
    weight ``exp(-d2 / (2 * sigma**2))``, where ``d2`` is its squared Euclidean
    distance to the query, and the result is the weighted mean of ``train_y``.

    Parameters
    ----------
    name : str, default "GRNN"
        Label stored on the instance; nothing in this class reads it.
    sigma : float or array-like, default 0.1
        Kernel width. It is stored exactly as passed, which is what
        ``BaseEstimator.get_params`` / ``sklearn.base.clone`` expect from a
        constructor; the ``2 * sigma**2`` denominator is derived in
        ``predict`` instead of overwriting the parameter.

    NOTE(review): the class mixes in ``ClassifierMixin`` although ``predict``
    returns a continuous value; the bases are left untouched so the class
    hierarchy seen by callers does not change.
    """

    # Floor applied to the sum of kernel weights so the final division never
    # divides by (almost) zero when every training row is far from the query.
    _MIN_WEIGHT_SUM = 1e-7

    def __init__(self, name="GRNN", sigma=0.1):
        self.name = name
        self.sigma = sigma

    def predict(self, instance_X, train_X, train_y):
        """Predict the target for a single query point.

        Parameters
        ----------
        instance_X : array-like
            The query point; it is broadcast against the rows of ``train_X``.
        train_X : ndarray
            Training inputs; squared differences are summed over ``axis=1``,
            so each row is one training point.
        train_y : array-like
            Training targets, multiplied element-wise with the row weights.

        Returns
        -------
        The kernel-weighted mean of ``train_y``.
        """
        kernel_denominator = 2 * np.power(self.sigma, 2)

        # Squared Euclidean distance from the query to every training row.
        squared_distances = np.square(train_X - instance_X).sum(axis=1)
        weights = np.exp(-squared_distances / kernel_denominator)

        weight_sum = weights.sum()
        if weight_sum < self._MIN_WEIGHT_SUM:
            weight_sum = self._MIN_WEIGHT_SUM

        return np.multiply(weights, train_y).sum() / weight_sum


In [None]:
def get_sigma_by_diff_evol(train_X, train_y, test_X, test_y):
    """Search for a GRNN sigma with SciPy's differential evolution.

    The quantity being minimised is the negated R² of GRNN predictions for
    ``test_X`` (built from ``train_X`` / ``train_y``) against ``test_y``.
    The search interval is ``[0.00001, 10.01]``.

    Returns
    -------
    The first (and only) component of the optimiser's solution vector.
    """

    def negative_r2(candidate):
        # ``candidate`` is the parameter vector handed over by the optimiser.
        model = GRNN(sigma=candidate)
        predicted = np.apply_along_axis(
            lambda row: model.predict(row, train_X, train_y),
            axis=1,
            arr=test_X,
        )
        return -r2_score(test_y, predicted)

    search_space = Bounds(0.00001, 10.01)
    optimum = differential_evolution(negative_r2, search_space)
    return optimum.x[0]


def objective(s, train_X, train_y, test_X, test_y):
    """Return the negated R² of a GRNN with kernel width ``s``.

    A ``GRNN(sigma=s)`` predicts every row of ``test_X`` from ``train_X`` /
    ``train_y``; the predictions are scored against ``test_y`` with
    ``r2_score`` and the sign is flipped so that lower is better.
    """
    model = GRNN(sigma=s)

    def predict_row(row):
        return model.predict(row, train_X, train_y)

    predicted = np.apply_along_axis(predict_row, axis=1, arr=test_X)
    return -r2_score(test_y, predicted)


def bruteforce_sigma(train_X, train_y, test_X, test_y, lower=0.00001, upper=1, step=0.01):
    """Grid-search the GRNN sigma that minimises ``objective``.

    Candidates are ``lower, lower + step, lower + 2 * step, ...`` for as long
    as they do not exceed ``upper``. Each candidate is computed from its index
    rather than by repeated addition, so rounding error does not accumulate
    along the grid.

    Parameters
    ----------
    train_X, train_y, test_X, test_y
        Passed through unchanged to ``objective``.
    lower, upper : float
        Inclusive limits of the search grid (defaults: ``0.00001`` and ``1``).
    step : float
        Distance between neighbouring candidates (default ``0.01``); must be
        positive.

    Returns
    -------
    float
        The candidate with the lowest objective value; on ties the smallest
        candidate wins. ``float('inf')`` is returned when no candidate scored
        below infinity (for example when every score is NaN or the grid is
        empty).

    Raises
    ------
    ValueError
        If ``step`` is not positive (the loop would never terminate).
    """
    if step <= 0:
        raise ValueError(f'step must be positive, got {step}')

    best_sigma = float('inf')
    best_score = float('inf')

    index = 0
    candidate = lower
    while candidate <= upper:
        score = objective(candidate, train_X, train_y, test_X, test_y)
        if score < best_score:
            best_sigma = candidate
            best_score = score
        index += 1
        candidate = lower + index * step

    return best_sigma

# Space expander

In [None]:
@dataclass
class Segment:
    """A pair of interval end points, ``from_`` and ``to_``."""

    # First end point of the interval.
    from_: float
    # Second end point of the interval.
    to_: float

    def __repr__(self):
        """Render as ``[from_, to_] `` (the trailing space is part of the text)."""
        start, end = self.from_, self.to_
        return f'[{start}, {end}] '

In [None]:
class SpaceExpander:
    """Container for a lower bound, an upper bound and an optional segment list.

    NOTE(review): ``init_segments``, ``random_segments``, ``detect_segment``,
    ``transform_val`` and the three ``expand`` methods have placeholder bodies
    (``...``) in this cell, so calling them does nothing and returns ``None``.
    Their docstrings describe the visible signatures only.
    """

    def __init__(self, lower_bound: float, upper_bound: float):
        self.lower_bound = lower_bound
        self.upper_bound = upper_bound
        # No segments are configured at construction time.
        self.segments: Optional[List[Segment]] = None

    def init_segments(self, segments: List[Segment]) -> None:
        """Placeholder. Presumably stores ``segments`` on the instance -- TODO confirm."""
        ...

    def random_segments(self, count_of_segments: int) -> None:
        """Placeholder. Presumably draws ``count_of_segments`` segments -- TODO confirm."""
        ...

    def check_configuration(self):
        """Raise ``ValueError`` when ``segments`` is ``None`` or empty."""
        if self.segments:
            return
        raise ValueError()

    def print_state(self):
        """Print the two bounds and the segment list, one per line."""
        print(
            f'Lower bound: {self.lower_bound}',
            f'Upper bound: {self.upper_bound}',
            f'Segments: {self.segments}',
            sep='\n',
        )

    @property
    def segments_count(self):
        """Number of configured segments (``len`` fails while ``segments`` is ``None``)."""
        return len(self.segments)

    def detect_segment(self, x):
        """Placeholder. Presumably locates the segment containing ``x`` -- TODO confirm."""
        ...

    def transform_val(self, x, seg_num) -> List[float]:
        """Placeholder; annotated to return a list of floats."""
        ...

    def expand(self, df: pd.DataFrame | np.ndarray) -> pd.DataFrame | np.ndarray:
        """Placeholder; annotated to accept and return a DataFrame or an ndarray."""
        ...

    def _expand_for_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
        """Placeholder for the DataFrame variant of ``expand``."""
        ...

    def _expand_for_numpy(self, df: np.ndarray) -> np.ndarray:
        """Placeholder for the ndarray variant of ``expand``."""
        ...

In [None]:
def generate_segments(
    step_size,
    segments_count,
    lower_bound,
    upper_bound,
) -> list[list[Segment]]:
    """Enumerate lists of ``segments_count`` contiguous segments on a grid.

    The grid consists of ``lower_bound``, ``upper_bound`` and every positive
    multiple of ``step_size`` that lies strictly between them. Each returned
    inner list is built from one strictly increasing choice of
    ``segments_count + 1`` grid points: neighbouring points become one
    ``Segment``, so consecutive segments share an end point.

    NOTE(review): the previous implementation repeated the grid list
    ``segments_count`` times inside a one-element list, which after the
    transpose left a single boundary per row, so no valid segment list could
    be produced. The enumeration above was reconstructed from the
    strictly-increasing filter and the pairwise ``Segment`` construction that
    the old code applied to each row -- confirm that this matches the
    intended search space (e.g. whether every list must start at
    ``lower_bound`` and end at ``upper_bound``).

    Parameters
    ----------
    step_size : float
        Grid spacing; must be positive.
    segments_count : int
        Number of segments in every returned list. Values below 1 give ``[]``.
    lower_bound, upper_bound : float
        End points of the range; ``lower_bound`` must be below ``upper_bound``.

    Returns
    -------
    list[list[Segment]]
        One inner list per combination. The number of combinations grows
        combinatorially with the grid size.

    Raises
    ------
    ValueError
        If ``step_size`` is not positive or the bounds are not ordered.
    """
    if step_size <= 0:
        raise ValueError(f'step_size must be positive, got {step_size}')
    if lower_bound >= upper_bound:
        raise ValueError(
            f'lower_bound ({lower_bound}) must be smaller than upper_bound ({upper_bound})'
        )
    if segments_count < 1:
        return []

    # Multiples are computed from their index so rounding error does not
    # accumulate, and only points strictly inside the range are kept, which
    # also rules out duplicates of the two bounds.
    interior_points = []
    multiple = 1
    while multiple * step_size < upper_bound:
        point = multiple * step_size
        if point > lower_bound:
            interior_points.append(point)
        multiple += 1

    # Ascending and duplicate-free by construction.
    boundaries = [lower_bound, *interior_points, upper_bound]

    # combinations() of an ascending, duplicate-free sequence yields exactly
    # the strictly increasing selections.
    return [
        [Segment(start, end) for start, end in zip(points, points[1:])]
        for points in itertools.combinations(boundaries, segments_count + 1)
    ]

# Ensemble

In [None]:
class Ensemble:
    """Base class that records a segment count and offers no-op ``fit`` / ``predict``."""

    def __init__(self, count_of_segments: int):
        self.count_of_segments = count_of_segments

    def fit(self, x, y):
        """Do nothing and return ``None``; subclasses supply the training logic."""
        return None

    def predict(self, x):
        """Do nothing and return ``None``; subclasses supply the prediction logic."""
        return None


In [None]:
class AVGEnsemble(Ensemble):
    """Ensemble whose prediction is the plain average of its members' predictions.

    Every member is a record with an ``'expander'`` entry (its ``expand`` is
    applied to the input) and an ``'estimator'`` entry (its ``predict`` is
    applied to the expanded input).

    NOTE(review): ``fit`` has a placeholder body in this cell, so how
    ``_members`` gets populated is not visible here. It starts as a dict, and
    iterating a dict yields its keys rather than the member records, so
    ``_predict`` accepts both a mapping (its values are used) and a plain
    sequence of records.
    """

    def __init__(self, count_of_segments, member_number: int):
        super().__init__(count_of_segments)
        self.member_number = member_number
        self._members = {}

    def fit(self, x, y):
        """Placeholder; the training logic is not present in this cell."""
        ...

    def _member_records(self):
        """Return the member records as a list, whatever container holds them."""
        members = self._members
        if isinstance(members, dict):
            return list(members.values())
        return list(members)

    def _predict(self, x):
        """Return the transposed array of per-member predictions for ``x``."""
        predictions = np.asarray([
            member['estimator'].predict(member['expander'].expand(x))
            for member in self._member_records()
        ])
        return predictions.T

    def predict(self, x):
        """Average the members' predictions along ``axis=1``.

        Raises
        ------
        ValueError
            If the ensemble has no members yet.
        """
        if len(self._members) == 0:
            raise ValueError('AVGEnsemble has no members; call fit() before predict().')
        return np.average(self._predict(x), axis=1)

# Experiments

In [None]:
def _rmse(x, y):
    """Square root of ``mean_squared_error(x, y)``."""
    return math.sqrt(mean_squared_error(x, y))


# Metric label -> callable invoked as ``func(true_values, predictions)``.
# The insertion order is the order in which the metrics are reported.
metrics_funcs = {
    'max error': max_error,
    'MAE': mean_absolute_error,
    'MSE': mean_squared_error,
    'MedAE': median_absolute_error,
    'RMSE': _rmse,
    'MAPE': mean_absolute_percentage_error,
    'R2': r2_score,
}

In [None]:
def stand_grnn(df_name='zone3', segment_counts=range(1, 7), members_per_count=100):
    """Run the single-GRNN experiment and write its metrics to an Excel file.

    For every segment count and every member index a ``SpaceExpander`` with
    random segments expands the min-max-scaled inputs, a sigma is picked by
    ``bruteforce_sigma`` and a GRNN predicts the test rows. Each result row
    holds the member's own metrics plus ``avg_*`` metrics computed from the
    mean prediction of all members produced so far for that segment count.

    NOTE(review): sigma is selected with the test inputs and targets, and the
    metrics are then computed on the same test targets, so the reported
    scores are optimistic.

    Parameters
    ----------
    df_name : str
        Dataset stem; ``./datasets/{df_name}_train.txt`` and
        ``./datasets/{df_name}_test.txt`` are read, and the report goes to
        ``results/{df_name}_result.xlsx``.
    segment_counts : iterable of int
        Segment counts to evaluate (default ``range(1, 7)``).
    members_per_count : int
        Number of random expanders evaluated per segment count (default 100).
    """
    output_path = f'results/{df_name}_result.xlsx'
    # Create the output folder before the long search starts, so a missing
    # directory cannot discard the results at the very end.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    df_train = pd.read_csv(f'./datasets/{df_name}_train.txt', header=None)
    df_test = pd.read_csv(f'./datasets/{df_name}_test.txt', header=None)

    # The last column is the target, everything before it is an input.
    train_X = df_train.iloc[:, :-1].to_numpy()
    train_y = df_train.iloc[:, -1].to_numpy()
    test_X = df_test.iloc[:, :-1].to_numpy()
    test_y = df_test.iloc[:, -1]

    # The scaler is fitted on the training inputs only.
    scaler = MinMaxScaler()
    scaler.fit(train_X)
    train_X = scaler.transform(train_X)
    test_X = scaler.transform(test_X)

    def evaluate(predicted, prefix=''):
        """Score ``predicted`` against ``test_y`` with every metric in ``metrics_funcs``."""
        return {
            f'{prefix}{m_name}': m_func(test_y, predicted)
            for m_name, m_func in metrics_funcs.items()
        }

    result = []

    for segments_count in segment_counts:
        # Predictions of all members evaluated so far for this segment count.
        member_predictions = []

        for member in range(members_per_count):
            print(f'[+] segments: {segments_count}, member: {member}')
            space_expander = SpaceExpander(0.0, 1.0)
            space_expander.random_segments(segments_count)
            expanded_train_x = space_expander.expand(train_X)
            expanded_test_x = space_expander.expand(test_X)

            sigma = bruteforce_sigma(expanded_train_x, train_y, expanded_test_x, test_y)
            model = GRNN(sigma=sigma)

            predictions = np.apply_along_axis(
                lambda row: model.predict(row, expanded_train_x, train_y),
                axis=1,
                arr=expanded_test_x,
            )
            member_predictions.append(predictions)

            # Running ensemble: mean over the members collected so far.
            avg_predictions = np.average(np.asarray(member_predictions).T, axis=1)

            result.append({
                'segments_count': segments_count,
                'segments_bounds': list(space_expander.segments),
                'member N': member,
                'sigma': sigma,
                **evaluate(predictions),
                **evaluate(avg_predictions, prefix='avg_'),
            })

    res_df = pd.DataFrame.from_records(result)
    res_df.to_excel(excel_writer=output_path, sheet_name='res')

In [None]:
def stand_ensemble():
    """Fit an ``AVGEnsemble`` on the marketing dataset and print its test metrics.

    The CSV files are read without a header row, column 1 is multiplied by
    1000 in both frames, the inputs are scaled with a ``MaxAbsScaler`` fitted
    on the training inputs, and every metric in ``metrics_funcs`` is computed
    for the ensemble's predictions on the test inputs.
    """
    # Dataset pairs used here earlier: OBDTrain/OBDTest and procom_train/procom_test.
    df_train = pd.read_csv('./datasets/marketingTrain.csv', header=None)
    df_test = pd.read_csv('./datasets/marketingTest.csv', header=None)

    # Earlier runs also tried a factor of 1_000_000 on column 1 and a factor
    # of 1_000 on column 0.
    for frame in (df_train, df_test):
        frame[1] = frame[1].mul(1_000)

    print(df_train)

    # The last column is the target, everything before it is an input.
    train_X = df_train.iloc[:, :-1].to_numpy()
    train_y = df_train.iloc[:, -1].to_numpy()
    test_X = df_test.iloc[:, :-1].to_numpy()
    test_y = df_test.iloc[:, -1]

    scaler = MaxAbsScaler().fit(train_X)
    train_X, test_X = scaler.transform(train_X), scaler.transform(test_X)

    ens = AVGEnsemble(count_of_segments=1, member_number=3)
    ens.fit(train_X, train_y)
    predictions = ens.predict(test_X)

    metrics_results = {
        m_name: m_func(test_y, predictions)
        for m_name, m_func in metrics_funcs.items()
    }

    print(metrics_results)