In [1]:
######################### NIE ZMIENIAJ TEJ KOMÓRKI PODCZAS WYSYŁANIA ##########################

# W czasie sprawdzania Twojego rozwiązania, wartość flagi FINAL_EVALUATION_MODE zostanie zmieniona na True
FINAL_EVALUATION_MODE = False

In [2]:
######################### NIE ZMIENIAJ TEJ KOMÓRKI PODCZAS WYSYŁANIA ##########################
import cloudpickle

import os
import random
from abc import ABC, abstractmethod

import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import balanced_accuracy_score

In [3]:
######################### NIE ZMIENIAJ TEJ KOMÓRKI PODCZAS WYSYŁANIA ##########################

# Ustawienie ziarna generatora liczb pseudolosowych w celu zapewnienia deterministyczności wyników.
random.seed(42)
np.random.seed(42)

In [4]:
######################### NIE ZMIENIAJ TEJ KOMÓRKI PODCZAS WYSYŁANIA ##########################

train_val_filename = "train_validation_sets.npz"
if not os.path.exists(train_val_filename):
    import gdown
    url = "https://drive.google.com/file/d/1pCqgbsKBQP1UnH2kMmBKRS1AuvmSl9jx/view?usp=sharing"
    gdown.download(url, train_val_filename, quiet=True, fuzzy=True)

train_valid_bundle = np.load("train_validation_sets.npz", allow_pickle=True)
x_train = train_valid_bundle["X_train"]
y_train = train_valid_bundle["y_train"]
y_train_str = train_valid_bundle["anomaly_train"]

x_valid = train_valid_bundle["X_validation"]
y_valid = train_valid_bundle["y_validation"]
y_valid_str = train_valid_bundle["anomaly_validation"]

In [5]:
######################### NIE ZMIENIAJ TEJ KOMÓRKI PODCZAS WYSYŁANIA ##########################

class ISolution(ABC):
    random_forest: RandomForestClassifier | None = None

    @classmethod
    def create_with_training(cls) -> "ISolution":
        """Metoda służąca do stworzenia rozwiązania z wytrenowanym lasem losowym."""
        solution = cls()

        hyperparameters = cls.get_rf_hyperparameters()
        hyperparameters = cls.validate_hyperparameters(hyperparameters)
        solution.random_forest = RandomForestClassifier(**hyperparameters)

        meta_features = solution.compute_meta_features(x_train)
        solution.random_forest.fit(meta_features, y_train)
        return solution

    @staticmethod
    def validate_hyperparameters(hyperparameters: dict[str, int | float | str]) -> dict[str, int | float | str]:
        """
        Funkcja ta sprawdza, czy hiperparametry lasu losowego są zgodne z wymaganiami zadania. Jeśli nie, to poprawia je na
        domyślne wartości.
        """
        hyperparameters["n_estimators"] = min(hyperparameters.get("n_estimators", 10), 10)
        hyperparameters["max_depth"] = min(hyperparameters.get("max_depth", 10), 10)
        hyperparameters["random_state"] = 42
        return hyperparameters

    @abstractmethod
    def compute_meta_features(self, x: np.ndarray) -> np.ndarray:
        """
        Funkcja ta powinna dla każdego przykładu ze zbioru $x$ opisanego 150 cechami zwrócić wektor 4 cech, który będzie
        reprezentował ten przykład. Funkcja ta powinna przekształcać wejściową tablicę o rozmiarze (n, 150) na tablicę o
        rozmiarze (n, 4).
        """

        pass

    @staticmethod
    @abstractmethod
    def get_rf_hyperparameters() -> dict[str, int | float | str]:
        """
        Funkcja ta powinna zwracać słownik z hiperparametrami lasu losowego. Pamiętaj o ograniczeniach na liczbę drzew i ich
        głębokość!
        """

        pass

In [6]:
######################### NIE ZMIENIAJ TEJ KOMÓRKI PODCZAS WYSYŁANIA ##########################

def balanced_accuracy_to_score(balanced_accuracy: float) -> float:
    return min(max((balanced_accuracy - 75.) * (100. / (98. - 75.)), 0.), 100.)


def score_solution(solution: ISolution) -> float:
    x, y = x_valid, y_valid
    meta_features = solution.compute_meta_features(x)
    y_hat = solution.random_forest.predict(meta_features)
    balanced_accuracy = 100. * balanced_accuracy_score(y, y_hat)

    assert meta_features.shape[-1] <= 4
    assert solution.random_forest.n_estimators <= 10
    assert solution.random_forest.max_depth <= 10

    if not FINAL_EVALUATION_MODE:
        print("Ocena działania modelu: \n")
        print(f"Zbalansowana dokładność klasyfikacji: {balanced_accuracy: .4f}")
    return int(round(balanced_accuracy_to_score(balanced_accuracy)))

In [7]:
######################### NIE ZMIENIAJ TEJ KOMÓRKI PODCZAS WYSYŁANIA ##########################

class ExemplarySolution(ISolution):
    def compute_meta_features(self, x: np.ndarray) -> np.ndarray:
        return np.array([
            np.min(x, axis=1),
            np.max(x, axis=1),
            np.mean(x, axis=1),
            np.std(x, axis=1)
        ]).T

    @staticmethod
    def get_rf_hyperparameters() -> dict[str, int | float | str]:
        return {
            "n_estimators": 3,
            "random_state": 42
        }

In [8]:
if not FINAL_EVALUATION_MODE:
    exemplary_solution = ExemplarySolution.create_with_training()
    print(f"Ocena: {score_solution(exemplary_solution)} pkt")

Ocena działania modelu: 

Zbalansowana dokładność klasyfikacji:  45.3163
Ocena: 0 pkt


In [9]:
import numpy as np
from scipy import signal
from sklearn.ensemble import RandomForestClassifier
class YourSolution(ISolution):
    def compute_meta_features(self, x: np.ndarray, filter_params: tuple[float, float, int] = (0.5, 40, 2)) -> np.ndarray:
        meta_features = []
        fs = 81
        lowcut, highcut, order = filter_params
        for sample in x:
            if len(sample) < 15:
                sample_padded = np.pad(sample, (0, max(0, 15 - len(sample))), mode='constant', constant_values=0)
            else:
                sample_padded = sample
            b, a = signal.butter(order, [lowcut, highcut], btype='band', fs=fs)
            sample_filt = signal.filtfilt(b, a, sample_padded)
            sample_filt = (sample_filt - np.mean(sample_filt)) / (np.std(sample_filt) + 1e-6)
            avg_peak_height = np.mean(sample_filt) + 0.5 * np.std(sample_filt)
            peaks, _ = signal.find_peaks(sample_filt, height=avg_peak_height, distance=len(sample_filt) // 15)
            rr_intervals = np.diff(peaks) if len(peaks) > 1 else [0]
            rr_median = np.median(rr_intervals)
            fft_vals = np.abs(np.fft.rfft(sample_filt))
            freqs = np.fft.rfftfreq(len(sample_filt), d=1 / fs)
            qrs_energy = np.sum(fft_vals[(freqs >= 5) & (freqs <= 15)])
            r_amplitude = np.max(sample_filt[peaks]) if len(peaks) > 0 else np.max(sample_filt)
            ##############################################
            def higuchi_fd(ts, kmax=10):
                n = len(ts)
                lk = np.zeros(kmax)
                x = np.array(ts)
                for k in range(1, kmax + 1):
                    lm = np.zeros(k)
                    for m in range(k):
                        lm[m] = np.sum(np.abs(x[m + k::k] - x[m:-k:k])) * (n - 1) / (k * ((n - m) / k))
                    lk[k - 1] = np.mean(lm)
                return -np.polyfit(np.log(range(1, kmax + 1)), np.log(lk), 1)[0]
            higuchi = higuchi_fd(sample_filt)
            meta_features.append([rr_median, qrs_energy, r_amplitude, higuchi])
        return np.array(meta_features)
    @staticmethod
    def get_rf_hyperparameters() -> dict[str, int | float | str]:
        #parametry z GridSarchCV
        return {
            "n_estimators": 10,
            "max_depth": 8,
            "random_state": 42,
            "max_features": 2,                # 'sqrt'
            "min_samples_split": 2,
            "min_samples_leaf": 8,            #  1
            "class_weight": 'balanced_subsample',  #  'balanced'
            "n_jobs": -1
        }
    @classmethod
    ###################################################
    def create_with_training(cls) -> "YourSolution":
        solution = cls()
        # ustawienie preprocessingu
        best_filter = (0.5, 40, 2)
        x_train_processed = solution.compute_meta_features(x_train, best_filter)
        rf = RandomForestClassifier(**cls.get_rf_hyperparameters())
        rf.fit(x_train_processed, y_train)
        #############################################
        solution.random_forest = rf
        solution.best_filter = best_filter
        return solution
    def predict(self, x: np.ndarray) -> np.ndarray:
        x_processed = self.compute_meta_features(x, self.best_filter)
        return self.random_forest.predict(x_processed)


In [10]:
FINAL_EVALUATION_MODE=True

In [11]:
######################### NIE ZMIENIAJ TEJ KOMÓRKI PODCZAS WYSYŁANIA ##########################
if FINAL_EVALUATION_MODE:
    your_solution = YourSolution.create_with_training()
    print(f"Ocena: {score_solution(your_solution)} pkt")

    OUTPUT_PATH = "file_output"
    FUNCTION_FILENAME = "your_solution"
    FUNCTION_OUTPUT_PATH = os.path.join(OUTPUT_PATH, FUNCTION_FILENAME)

    if not os.path.exists(OUTPUT_PATH):
        os.makedirs(OUTPUT_PATH)

    with open("file_output/your_model.pkl", "wb") as model_out:
        cloudpickle.dump(your_solution, model_out)

Ocena: 81 pkt
