# Wykrywanie Zaburzeń Sygnału EKG

![ECG_anomaly_detection_intro.png](https://live.staticflickr.com/65535/54253200317_77c251c48c_o.png)

*Obraz wygenerowany przy użyciu modelu DALL-E.*

## Wstęp

Rozwój sztucznej inteligencji otwiera nowe możliwości w diagnostyce medycznej, zwłaszcza w analizie złożonych danych, takich jak [sygnały elektrokardiograficzne (EKG)](https://www.healio.com/cardiology/learn-the-heart/ecg-review/ecg-interpretation-tutorial). EKG to jedno z najczęściej stosowanych narzędzi diagnostycznych w medycynie, umożliwiające ocenę i wykrywanie nieprawidłowości pracy serca.

Tradycyjnie sygnał EKG powstaje z dwunastu odprowadzeń, jednak w tym zadaniu skupimy się na sygnale jednoodprowadzeniowym, czyli dysponujemy
jedną zmienną reprezentującą napięcie elektryczne generowane przez serce w czasie. Dane te są rejestrowane w postaci krzywej zależnej od czasu, czyli można tutaj mówić o szeregu czasowym. Z sygnału EKG można wydzielić charakterystyczne fragmenty, czyli **załamki** (ang. waves) P, Q, R, S, T oraz **odstępy** pomiędzy dwoma zdarzeniami w EKG (ang. intervals), spośród których istotną rolę odgrywa odstęp R-R (czas między wystąpieniem dwóch kolejnych załamków R). Oprócz tego mówimy o **odcinkach** (ang. segments), czyli długości między dwoma określonymi załamkami w EKG, pomiędzy którymi powinna występować bazowa amplituda sygnału. Z kolei **zespół** (ang. complex) stanowi kilka zgrupowanych załamków. Głównie wyrózniamy tutaj zespół QRS. Schematyczny rysunek EKG wraz z podpisanymi fragmentami, jest prezentowany poniżej.

![ECG.png](https://live.staticflickr.com/65535/54254099351_213d47784d_o.png)

W EKG pochodzącym od zdrowej osoby można zauważyć sekwencję PQRST. Na początku wyróżniamy załamek P, która reprezentuje skurcz przedsionków i jest małym pionowym wychyleniem przed zespołem QRS. Następnie, zespół QRS wskazuje na skurcz komór i tworzony jest przez trzy wygięcia: załamek Q, załamek R oraz załamek S. Dalej można zauważyć odcinek ST, czyli płaski odcinek między zespołem QRS a załamkiem T, który odpowiada wczesnej fazie repolaryzacji komór. Finalnie, załamek T, który jest zaokrąglonym, pionowym wychyleniem, dotyczy repolaryzacji komór i ich powrotu do wyjściowego stanu.  Sekwencja PQRST przypomina sinusoidę, której maksimum jest osiągane dla załamka R. W przypadku zaburzeń pracy serca, EKG może wykazywać różne anomalie, takie jak dodatkowe minima lub maksima, czy też znacznie zwiększone odchylenie standardowe podczas całego pomiaru. Charakterystyka tych anomalii zależy od rodzaju i przyczyny zaburzenia.

W poniższym zadaniu musisz zmierzyć się z próbkami zawierającymi pojedyncze sekwencje PQRST oraz ich okolice. Większość próbek będzie odpowiadała danym bez anomalii, których przykład zaprezentowany jest na poniższym obrazku:

![normal_example.png](https://live.staticflickr.com/65535/54253200322_f0173af129_o.png)

Występować będą też pomiary odpowiadające czterem rodzajom zaburzeń:  **AFib**, czyli migotaniu przedsionków, **PAC**, czyli przedwczesnemu pobudzeniu przedsionkowemu, **PVC**, czyli przedwczesnym skurczom komorowym oraz **STEMI**, czyli zawałowi mięśnia sercowego z uniesieniem odcinka ST.

**UWAGA**: Poniższe dane są danymi syntetycznymi i są tylko pewnym przybliżeniem rzeczywistych danych EKG!

EKG jest typowym przykładem szeregu czasowego, który można analizować za pomocą dedykowanych metod uczenia maszynowego, w tym sieci neuronowych, np. sieci rekurencyjnych. Jednak nie zawsze wykorzystanie sieci neuronowych jest konieczne, a nawet wskazane. W przypadku niektórych problemów satysfakcjonujące wyniki można uzyskać za pomocą prostszych metod, gdzie kluczowe jest odpowiednie przygotowanie danych. Ich umiejętna analiza pozwala na selekcję kliku metacech - cech zwięźle opisujących próbki ze zbioru danych, np. średniej, minimum, maksimum, odchylenia standardowego, itp. Mogą być one wykorzystane do klasyfikacji zamiast oryginalnych cech. Dzięki temu korzystamy z niskowymiarowych danych wejściowych, przykładowo redukujemy 150-wymiarowy wektor zawierający informacje z oryginalnych kroków czasowych do wektora 4-wymiarowego zawierającego specjalnie przygotowane cechy.

Przykładem zastosowania niewielkiej liczby metacech są modele uczenia maszynowego, które mają działać na urządzeniach wbudowanych lub małych urządzeniach mobilnych, gdzie kluczowe są takie ograniczenia, jak wymóg niskiego poboru energii, mała ilość dostępnej pamięci operacyjnej czy ograniczona moc obliczeniowa. W takich przypadkach wymagane jest zastosowanie prostszych modeli, które są w stanie zapewnić odpowiednią dokładność klasyfikacji przy jednoczesnym zachowaniu wymaganych ograniczeń.

## Zadanie

Przygotuj rozwiązanie (wraz z wytrenowaniem modelu lasu losowego), które spełni wymagania naszego urządzenia wbudowanego. Przeanalizuj dane i przygotuj zestaw **4 metacech**, które dadzą najlepszą zrównoważoną dokładność (ang. *balanced accuracy*) dla problemu klasyfikacji sygnałów EKG. Zbiór danych składa się ze zbioru treningowego oraz walidacyjnego (wraz z etykietami), na którym możesz weryfikować swoje podejście. Twoje rozwiązanie będzie sprawdzane na osobnym (tajnym) zbiorze testowym, w którym liczba obserwacji będzie się różnić od liczby obserwacji w zbiorach treningowym i walidacyjnym. Każda próbka jest opisana 150 wartościami odpowiadającymi kolejnym krokom czasowym oraz jest przypisana do jednej z pięciu następujących klas:

| ID klasy  | Nazwa klasy   | Opis  | Próbki w zbiorze treningowym | Próbki w zbiorze walidacyjnym |
| ------    | ------        | ----  | ---------------------------- | ----------------------------- |
| 0         | normal        | brak anomalii | 1400 | 819 |
| 1         | afib          | Atrial Fibrillation (migotanie przedsionków)| 150 | 142 |
| 2         | pac           | Premature Atrial Contractions (przedwczesne pobudzenie przedsionkowe)| 150 | 191 |
| 3         | pvc           | Premature Ventricular Complex (przedwczesne skurcze komorowe)| 150 | 197 |
| 4         | st_elevation  | ST-elevation myocardial infarction (zawał mięśnia sercowego z uniesieniem odcinka ST) | 150 | 151 |

Nowe cechy powinny zawierać kluczowe informacje diagnostyczne róznicujące powyższe klasy, które pozwolą na skuteczną klasyfikację wymienionych anomalii.

Klasyfikatorem dla tego zadania jest [las losowy](https://pl.wikipedia.org/wiki/Las_losowy) z liczbą drzew decyzyjnych nie większą niż 10 oraz maksymalną głębokością 10. **Rozwiązania niespełniające tych warunków będą dyskwalifikowane!** W przypadku innych parametrów lasu nie ma ograniczeń. Dozwolony jest także preprocessing, czyli wstępne przetwarzanie danych wejściowych (np. zastosowanie normalizacji danych).

### Kryterium Oceny

Twoje rozwiązanie oceniane będzie na tajnym zbiorze testowym na podstawie [zrównoważonej dokładności klasyfikacji (balanced accuracy)](https://scikit-learn.org/dev/modules/generated/sklearn.metrics.balanced_accuracy_score.html):

$$\text{score}(balanced\_accuracy) = 
\begin{cases} 
    0 &\quad \text{jeżeli }  balanced\_accuracy \leq 75 \% \\
    100 &\quad \text{jeżeli }  balanced\_accuracy \geq 98 \% \\
    \dfrac{balanced\_accuracy - 75 \%}{98 \% - 75 \%} &\quad \text{w pozostałych przypadkach}
\end{cases}$$

Oznacza to, że wszystkie rozwiązania, które na zbiorze testowym uzyskają do $75\%$ zrównoważonej dokładności klasyfikacji, otrzymają $0$ punktów, zaś co najmniej $98\%$ zrównoważonej dokładności klasyfikacji, uzyskają maksymalną liczbę punktów za zadanie. Wszystkie zaś wartości z przedziału $75-98\%$ zostaną zamienione na liczbę punktów (między $0$ a $100$) zgodnie z powyższym wzorem.

*Wskazówka*: Twoim wyznacznikiem jakości proponowanego rozwiązania powinien być wynik na zbiorze walidacyjnym.

W zagadnieniach dot. wykrywania chorób dość często mamy do czynienia z niezrównoważonym (niezbalansowanym) zbiorem danych. Chodzi o to, że zazwyczaj wśród danych dominują przykłady *normalne*, odpowiadające osobom zdrowym, a próbki reprezentujące osoby chore zwykle należą do mniejszości. Wyobraźmy sobie sytuację, w której na 100 próbek jedynie 10 dotyczy osób chorych, a pozostałe 90 zdrowych. Wówczas model, który każdej próbce przyporządkowywałby klasę *zdrowy*, osiągnąłby 90% dokładności klasyfikacji, lecz jedynie 50% zrównoważonej dokładności klasyfikacji! Oczywiście taki model byłby bezużyteczny. W takich przypadkach potrzebujemy miary, która lepiej odpowiada potrzebom wynikającym z postawionego problemu i informuje o skuteczności modelu w sposób użyteczny z punktu widzenia jego późniejszego użytkownika.

W tym zadaniu musisz się więc skupić się na tym, by każda z klas była przyporządkowywana prawidłowo.

## Ograniczenia
- Twoje rozwiazanie będzie testowane na Platformie Konkursowej bez dostępu do internetu oraz w środowisku bez GPU.
- Ewaluacja Twojego finalnego rozwiązania na Platformie Konkursowej nie może trwać dłużej niż 1 minutę bez GPU.
- Podczas przygotowania danych należy pamiętać o tym, że:
    - zakazane jest korzystanie z innych niż lasy losowe metod uczenia maszynowego, zarówno nadzorowanego jak i nienadzorowanego (np. autokodery, wielowarstwowe perceptrony i inne sieci neuronowe, maszyny wektorów nośnych (SVM), i inne), dozwolone są jednak metody redukcji wymiarowości, w stylu analizy składowych głównych (PCA);
    - przy konstrukcji metacech można korzystać wyłącznie z funkcji dostępnych standardowo w Pythonie (`v3.11`), a także Numpy (`v2.0.2`) oraz Scipy (`v1.14.1`);
    - można wyznaczyć maksymalnie 4 metacechy,
- Do klasyfikacji można wykorzystać wyłącznie [las losowy (RandomForestClassifier) z biblioteki scikit-learn](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html) (`v1.5.2`):
    - złożony z maksymalnie 10 drzew decyzyjnych (`n_estimators` $\leq 10$);
    - każde drzewo ma mieć maksymalną głębokość równą 10 (`max_depth` $\leq 10$);
    - pozostałe hiperparametry można modyfikować bez ograniczeń;

## Pliki Zgłoszeniowe
Ten notebook uzupełniony o Twoje rozwiązanie (patrz klasa `YourSolution`), w którym przygotujesz zestaw 4 metacech opisujących zbiór danych oraz zestaw hiperparametrów lasu losowego.

## Ewaluacja
Pamiętaj, że podczas sprawdzania flaga `FINAL_EVALUATION_MODE` zostanie ustawiona na `True`.

Za to zadanie możesz zdobyć pomiędzy 0 a 100 punktów. Liczba punktów, którą zdobędziesz, będzie wyliczona na (tajnym) zbiorze testowym na Platformie Konkursowej na podstawie wyżej wspomnianego wzoru, zaokrąglona do liczby całkowitej. Jeśli Twoje rozwiązanie nie będzie spełniało powyższych kryteriów lub nie będzie wykonywać się prawidłowo, otrzymasz za zadanie 0 punktów.

---

## Informacje Uzupełniające

### Zrównoważona Dokładność Klasyfikacji

Niech $C$ będzie liczbą klas, a $N_j$ odpowiada ilości próbek należących do $j$-tej klasy, gdzie $j \in \lbrace 1, ..., C \rbrace$. Ponadto, niech $\hat{y}_{i,j}$ będzie przewidywaną przez model klasą dla $j$-tej próbki należącej w rzeczywistości do $i$-tej klasy. Wówczas zrównoważoną (zbalansowaną) dokładność klasyfikacji możemy wyliczyć następująco:

$$
balanced\_accuracy = \dfrac{1}{C} \sum\limits_{i=1}^{C} \sum\limits_{j=1}^{|N_c|} \dfrac{1}{|N_c|} \cdot \mathbf{1} \left( \hat{y}_{i, j} = i \right),
$$

gdzie $\mathbf{1} \left( \hat{y}_{i, j} = i \right)$ jest funkcją indykatorową, która przyjmuje wartość 1, jeśli $\hat{y}_{i, j} = i$, czyli w sytuacji, w której klasa przewidywana dla $j$-tej próbki jest taka sama jak rzeczywista klasa tej próbki oraz 0 w przeciwnym przypadku. Suma zewnętrzna przebiega po kolejnych klasach, a wewnętrzna po kolejnych próbkach należących do danej klasy.

**Przykład**: Niech
$$\mathbf{y} = [0, 0, 0, 1, 1, 2, 2, 2, 2, 3, 3, 3]$$

będzie wektorem reprezentującym rzeczywiste klasy dla kolejnych próbek, a

$$\mathbf{\hat{y}} = [0, 0, 0, 0, 0, 2, 2, 2, 2, 3, 3, 3]$$

wektorem reprezentującym predykcje modelu dla tychże próbek. Mamy więc do czynienia z czterema klasami, gdzie model miał problem z klasą o numerze $1$. Wszystkie pozostałe przykłady zostały przypisane bezbłędnie. Łącznie 10 na 12 próbek zostało sklasyfikowanych prawidłowo, co oznacza, że gdybyśmy mieli mierzyć "zwykłą" dokładność klasyfikacji, otrzymalibyśmy ok. $83.3\%$ . Jednak gdy przyjrzymy się zbalansowanej dokładności klasyfikacji, otrzymamy wynik $75\%$.

Załóżmy teraz, że

$$\mathbf{\hat{y}} = [0, 0, 0, 1, 0, 2, 2, 2, 2, 3, 3, 3]$$

czyli model klasyfikuje poprawnie $50\%$ próbek z klasy 1 oraz $100\%$ próbek z pozostałych klas. "Zwykła" dokładność klasyfikacji wynosi tutaj niespełna $92\%$, podczas gdy zbalansowana dokładność klasyfikacji wynosi $87.5\%$.

### Anomalie Występujące w Rozważanym Zbiorze Danych

#### AFib
**Atrial Fibrillation (AFib)**, czyli migotanie przedsionków, występuje wtedy, gdy potencjały czynnościowe wyzwalają się bardzo szybko i chaotycznie, w związku z tym rytm serca jest nieregularny. W tym zaburzeniu załamki P mogą nie być widoczne w EKG, a zespół QRS staje się nieregularny.

![AFIB_example.png](https://live.staticflickr.com/65535/54253219357_78c51f06ff_o.png)

#### PAC
**Premature Atrial Contractions (PACs)**, czyli przedwczesne pobudzenie przedsionkowe, związane jest z nieprawidłowym załamkiem P, po którym następuje prawidłowy zespół QRS. *Uwaga!* W próbkach z zadania występują przykłady, w których w ramach jednej próbki ze zbioru danych widoczny jest jedynie przedwczesny załamek P.

![PAC_example.png](https://live.staticflickr.com/65535/54254324138_df77fdd62a_o.png)

#### PVC
**Premature Ventricular Contractions (PVCs)**, czyli przedwczesne skurcze komorowe, są dodatkowymi uderzeniami serca, które rozpoczynają się w jednej z dwóch komór serca i zakłócają jego regularny rytm. Są jednym z powszechnych rodzajów arytmii. Skurcze te zachodzą wcześniej niż byłoby to oczekiwane na podstawie poprzednich odstępów R-R.

![PVC_example.png](https://live.staticflickr.com/65535/54254518540_12ba23c53f_o.png)

#### STEMI
**ST-elevation myocardial infarction (STEMI)**, czyli zawał mięśnia sercowego z uniesieniem odcinka ST, powoduje zablokowanie przepływu krwi do mięśnia sercowego i jego obumarcie. Segment ST występuje tuż po zespole QRS. W normalnej sytuacji nie ma tam żadnej aktywności elektrycznej, przez co jest on płaski. Jeśli zaś występuje uniesienie odcinka ST, oznacza to blokadę jednej z głównych tętnic doprowadzających krew do serca.

![STEMI_example.png](https://live.staticflickr.com/65535/54254099356_422c23144e_o.png)

---

**Źródła opisu medycznego dot. EKG**: [1](https://www.healio.com/cardiology/learn-the-heart/ecg-review/ecg-topic-reviews-and-criteria/premature-ventricular-contractions-review), [2](https://www.mayoclinic.org/diseases-conditions/premature-ventricular-contractions/symptoms-causes/syc-20376757), [3](https://litfl.com/premature-atrial-complex-pac/), [4](https://www.healio.com/cardiology/learn-the-heart/ecg-review/ecg-topic-reviews-and-criteria/premature-atrial-contractions-review), [5](https://www.mayoclinic.org/diseases-conditions/atrial-fibrillation/symptoms-causes/syc-20350624), [6](https://www.healio.com/cardiology/learn-the-heart/ecg-review/ecg-topic-reviews-and-criteria/atrial-fibrillation-review), [7](https://my.clevelandclinic.org/health/diseases/22068-stemi-heart-attack), obraz załamków PQRST na podstawie [8](https://www.sciencedirect.com/science/article/pii/S0213911121002466).

# Kod Startowy

W tej sekcji inicjalizujemy środowisko poprzez zaimportowanie potrzebnych bibliotek i funkcji. Przygotowany kod ułatwi Tobie efektywne operowanie na danych i budowanie właściwego rozwiązania.

In [None]:
######################### NIE ZMIENIAJ TEJ KOMÓRKI PODCZAS WYSYŁANIA ##########################

# W czasie sprawdzania Twojego rozwiązania, wartość flagi FINAL_EVALUATION_MODE zostanie zmieniona na True
FINAL_EVALUATION_MODE = False

In [None]:
######################### NIE ZMIENIAJ TEJ KOMÓRKI PODCZAS WYSYŁANIA ##########################
import cloudpickle

import os
import random
from abc import ABC, abstractmethod

import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import balanced_accuracy_score

In [None]:
######################### NIE ZMIENIAJ TEJ KOMÓRKI PODCZAS WYSYŁANIA ##########################

# Ustawienie ziarna generatora liczb pseudolosowych w celu zapewnienia deterministyczności wyników.
random.seed(42)
np.random.seed(42)

## Ładowanie Danych
Za pomocą poniższego kodu wczytujemy dane.

In [None]:
######################### NIE ZMIENIAJ TEJ KOMÓRKI PODCZAS WYSYŁANIA ##########################

train_val_filename = "train_validation_sets.npz"
if not os.path.exists(train_val_filename):
    import gdown
    url = "https://drive.google.com/file/d/1pCqgbsKBQP1UnH2kMmBKRS1AuvmSl9jx/view?usp=sharing"
    gdown.download(url, train_val_filename, quiet=True, fuzzy=True)

train_valid_bundle = np.load("train_validation_sets.npz", allow_pickle=True)
x_train = train_valid_bundle["X_train"]
y_train = train_valid_bundle["y_train"]
y_train_str = train_valid_bundle["anomaly_train"]

x_valid = train_valid_bundle["X_validation"]
y_valid = train_valid_bundle["y_validation"]
y_valid_str = train_valid_bundle["anomaly_validation"]

## Publiczny Interfejs Rozwiązania

Tylko tego wymagamy od Twojej klasy. W Twoim rozwiązaniu możesz modyfikować swoją klasę do woli dodając atrybuty oraz nowe metody obliczające metacechy - cokolwiek co będzie Ci potrzebne do rozwiązania zadania.

In [None]:
######################### NIE ZMIENIAJ TEJ KOMÓRKI PODCZAS WYSYŁANIA ##########################

class ISolution(ABC):
    random_forest: RandomForestClassifier | None = None

    @classmethod
    def create_with_training(cls) -> "ISolution":
        """Metoda służąca do stworzenia rozwiązania z wytrenowanym lasem losowym."""
        solution = cls()

        hyperparameters = cls.get_rf_hyperparameters()
        hyperparameters = cls.validate_hyperparameters(hyperparameters)
        solution.random_forest = RandomForestClassifier(**hyperparameters)

        meta_features = solution.compute_meta_features(x_train)
        solution.random_forest.fit(meta_features, y_train)
        return solution

    @staticmethod
    def validate_hyperparameters(hyperparameters: dict[str, int | float | str]) -> dict[str, int | float | str]:
        """
        Funkcja ta sprawdza, czy hiperparametry lasu losowego są zgodne z wymaganiami zadania. Jeśli nie, to poprawia je na
        domyślne wartości.
        """
        hyperparameters["n_estimators"] = min(hyperparameters.get("n_estimators", 10), 10)
        hyperparameters["max_depth"] = min(hyperparameters.get("max_depth", 10), 10)
        hyperparameters["random_state"] = 42
        return hyperparameters

    @abstractmethod
    def compute_meta_features(self, x: np.ndarray) -> np.ndarray:
        """
        Funkcja ta powinna dla każdego przykładu ze zbioru $x$ opisanego 150 cechami zwrócić wektor 4 cech, który będzie
        reprezentował ten przykład. Funkcja ta powinna przekształcać wejściową tablicę o rozmiarze (n, 150) na tablicę o
        rozmiarze (n, 4).
        """

        pass

    @staticmethod
    @abstractmethod
    def get_rf_hyperparameters() -> dict[str, int | float | str]:
        """
        Funkcja ta powinna zwracać słownik z hiperparametrami lasu losowego. Pamiętaj o ograniczeniach na liczbę drzew i ich
        głębokość!
        """

        pass


## Kod z Kryterium Oceniającym
Kod, zbliżony do poniższego, będzie używany do oceny rozwiązania na zbiorze testowym.

In [None]:
######################### NIE ZMIENIAJ TEJ KOMÓRKI PODCZAS WYSYŁANIA ##########################

def balanced_accuracy_to_score(balanced_accuracy: float) -> float:
    return min(max((balanced_accuracy - 75.) * (100. / (98. - 75.)), 0.), 100.)


def score_solution(solution: ISolution) -> float:
    x, y = x_valid, y_valid
    meta_features = solution.compute_meta_features(x)
    y_hat = solution.random_forest.predict(meta_features)
    balanced_accuracy = 100. * balanced_accuracy_score(y, y_hat)

    assert meta_features.shape[-1] <= 4
    assert solution.random_forest.n_estimators <= 10
    assert solution.random_forest.max_depth <= 10

    if not FINAL_EVALUATION_MODE:
        print("Ocena działania modelu: \n")
        print(f"Zbalansowana dokładność klasyfikacji: {balanced_accuracy: .4f}")
    return int(round(balanced_accuracy_to_score(balanced_accuracy)))

## Przykładowe Rozwiązanie

Poniżej przedstawiamy uproszczone rozwiązanie, które służy jako przykład demonstrujący podstawową funkcjonalność notatnika. Może ono posłużyć jako punkt wyjścia do opracowania Twojego rozwiązania.

In [None]:
######################### NIE ZMIENIAJ TEJ KOMÓRKI PODCZAS WYSYŁANIA ##########################

class ExemplarySolution(ISolution):
    def compute_meta_features(self, x: np.ndarray) -> np.ndarray:
        return np.array([
            np.min(x, axis=1),
            np.max(x, axis=1),
            np.mean(x, axis=1),
            np.std(x, axis=1)
        ]).T

    @staticmethod
    def get_rf_hyperparameters() -> dict[str, int | float | str]:
        return {
            "n_estimators": 3,
            "random_state": 42
        }

In [None]:
if not FINAL_EVALUATION_MODE:
    exemplary_solution = ExemplarySolution.create_with_training()
    print(f"Ocena: {score_solution(exemplary_solution)} pkt")

# Twoje Rozwiązanie

W tej sekcji należy umieścić Twoje rozwiązanie. Wprowadzaj zmiany wyłącznie tutaj!

In [None]:
def detect_qrs(ecg_signal):
    signal_length = len(ecg_signal)
    middle = signal_length // 2
    try:
        r_peak = np.argmax(ecg_signal)
        q_point = np.argmin(ecg_signal[:r_peak])
        p_point = q_point
        for i in range(q_point - 1, 0, -1):
            if (ecg_signal[i] > ecg_signal[i - 1] and 
                ecg_signal[i] > ecg_signal[i + 1] and 
                (ecg_signal[i] - ecg_signal[q_point]) > 0.5):
                p_point = i
                break
        s_point = r_peak
        last_valid_s_point = r_peak
        non_decreasing_points_count = 0
        p_height = ecg_signal[p_point]
        s_threshold = p_height - 0.1
        for i in range(r_peak + 1, len(ecg_signal) - 1):
            if ecg_signal[i] < s_threshold:
                closest_diff = float('inf')
                adjusted_s_point = r_peak
                for j in range(r_peak + 1, i):
                    current_diff = abs(ecg_signal[j] - p_height)
                    if current_diff < closest_diff:
                        closest_diff = current_diff
                        adjusted_s_point = j
                s_point = adjusted_s_point
                break
            if ecg_signal[i] < ecg_signal[s_point]:
                s_point = i
                last_valid_s_point = i
                non_decreasing_points_count = 0
            elif ecg_signal[i] >= ecg_signal[s_point]:
                if (i - r_peak) > 15:
                    s_point = last_valid_s_point
                    break
                else:
                    non_decreasing_points_count += 1
                    if non_decreasing_points_count > 1:
                        s_point = last_valid_s_point
                        break
        if (ecg_signal[r_peak] - ecg_signal[s_point]) <= 0.5:
            s_point = r_peak
        if ecg_signal[s_point] < s_threshold:
            closest_diff = float('inf')
            adjusted_s_point = r_peak
            for i in range(r_peak + 1, min(len(ecg_signal), r_peak + 15)):
                current_diff = abs(ecg_signal[i] - p_height)
                if current_diff < closest_diff:
                    closest_diff = current_diff
                    adjusted_s_point = i
                    if current_diff < 0.05:
                        break
            s_point = adjusted_s_point
    except:
        r_peak = middle
        s_point = min(signal_length - 1, middle + 15)
        q_point = max(0, middle - 15)
        p_point = max(0, middle - 35)
    
    return p_point, q_point, r_peak, s_point



def compute_pac_feature(ecg_signal, p_point, s_point):
    pac_score = 0.0
    post_s_score = 0.0
    pre_p_score = 0.0
    start_post_s = s_point + 2
    post_s = ecg_signal[start_post_s:]
    if len(post_s) < 5:
        post_s_score = 0.0
    else:
        b = np.max(post_s)
        max_index = np.argmax(post_s)
        start_neighbor_index = max(0, max_index - 10)
        end_neighbor_index = min(len(post_s), max_index + 11)
        left_neighbors = post_s[start_neighbor_index:max_index]
        right_neighbors = post_s[max_index+1:end_neighbor_index]
        left_valley_depth = 0
        for neighbor_point in left_neighbors:
            if neighbor_point < b:
                current_left_valley_depth = b - neighbor_point - 0.05
                left_valley_depth = max(left_valley_depth, current_left_valley_depth)
        right_valley_depth = 0
        for neighbor_point in right_neighbors:
            if neighbor_point < b:
                current_right_valley_depth = b - neighbor_point - 0.05
                right_valley_depth = max(right_valley_depth, current_right_valley_depth)
        right_valley_truncated = (end_neighbor_index == len(post_s))
        if right_valley_truncated and len(right_neighbors) < 11:
            neighbor_height = left_valley_depth
        else:
            neighbor_height = min(left_valley_depth, right_valley_depth)
        post_s_score = neighbor_height * 4
    end_pre_p = p_point
    pre_p = ecg_signal[:end_pre_p]
    if len(pre_p) < 5:
        pre_p_score = 0.0
    else:
        b = np.max(pre_p)
        max_index = np.argmax(pre_p)
        start_neighbor_index = max(0, max_index - 10)
        end_neighbor_index = min(len(pre_p), max_index + 11)
        neighbor_region = np.concatenate((pre_p[start_neighbor_index:max_index],
                                         pre_p[max_index+1:end_neighbor_index]))
        has_lower_neighbor = False
        for neighbor_point in neighbor_region:
            if neighbor_point < b:
                has_lower_neighbor = True
                neighbor_height = b - neighbor_point - 0.05
                break
        if has_lower_neighbor:
            pre_p_score = neighbor_height * 4
        else:
            pre_p_score = 0.0
    pac_score = max(pre_p_score, post_s_score)
    return pac_score

def compute_pvc_feature(ecg_signal, s_point):
    t_end = s_point
    post_t = ecg_signal[t_end:]
    if len(post_t) < 5:
        return 0
    post_t_mean = np.mean(post_t)
    num_blocks = 10
    block_size = len(post_t) // num_blocks
    if block_size == 0:
        block_size = len(post_t)
        num_blocks = 1
    max_valley_depth = 0
    for block_index in range(num_blocks):
        start_block = block_index * block_size
        end_block = start_block + block_size
        if block_index == num_blocks - 1:
            end_block = len(post_t)
        current_block = post_t[start_block:end_block]
        if len(current_block) == 0:
            continue
        block_sorted = np.sort(current_block)
        top_half_block = block_sorted[len(block_sorted)//10:]
        top_half_mean = np.mean(top_half_block)
        if top_half_mean > post_t_mean:
            continue
        b_block = np.min(current_block)
        min_index_block_relative = np.argmin(current_block)
        min_index_block_absolute = start_block + min_index_block_relative
        start_neighbor_index = max(0, min_index_block_absolute - 10)
        end_neighbor_index = min(len(post_t), min_index_block_absolute + 11)
        neighbor_region = np.concatenate((post_t[start_neighbor_index:min_index_block_absolute],
                                            post_t[min_index_block_absolute+1:end_neighbor_index]))
        has_higher_neighbor = False
        current_block_valley_depth = 0
        for neighbor_point in neighbor_region:
            if neighbor_point > b_block:
                has_higher_neighbor = True
                neighbor_height = neighbor_point - b_block - 0.05
                current_block_valley_depth = neighbor_height
                break
        if has_higher_neighbor:
            max_valley_depth = max(max_valley_depth, current_block_valley_depth)
    return max_valley_depth * 4

def compute_afib_feature(p_point, s_point, ecg_signal):
    afib_score_pre_p = 0.0
    afib_score_post_s = 0.0
    if p_point > 1:
        segment_pre_p = ecg_signal[:p_point]
        for i in range(len(segment_pre_p) - 1):
            diff = abs(segment_pre_p[i+1] - segment_pre_p[i])
            afib_score_pre_p += diff
    if s_point < len(ecg_signal) - 2:
        segment_post_s = ecg_signal[s_point+1:]
        for i in range(len(segment_post_s) - 1):
            diff = abs(segment_post_s[i+1] - segment_post_s[i])
            afib_score_post_s += diff
    afib_score = afib_score_post_s + afib_score_pre_p
    return max(0, afib_score - 4.5)


def check_st_elevation(ecg_signal, s_point, threshold=0.04):
    st_segment = ecg_signal[s_point: min(len(ecg_signal), s_point + 7)]
    
    if len(st_segment) == 0:
        return 0.0
    max_index_in_segment = np.argmax(st_segment)
    max_index = s_point + max_index_in_segment
    max_st_point = ecg_signal[max_index]
    st_elevation = max_st_point - ecg_signal[s_point]
    higher_point_found = False
    for i in range(max_index_in_segment + 1, len(st_segment)):
        if st_segment[i] - 0.05 > max_st_point:
            higher_point_found = True
            break
    if st_elevation >= threshold and not higher_point_found:
        return st_elevation * 3
    else:
        return 0.0


class YourSolution(ISolution):
    def compute_meta_features(self, x: np.ndarray) -> np.ndarray:
        n_samples = x.shape[0]
        meta_features = np.zeros((n_samples, 4))
        for i in range(n_samples):
            signal = x[i]
            p, q, r, s = detect_qrs(signal)
            st_elevation= check_st_elevation(signal, s)
            pac_score = compute_pac_feature(signal, p, s)
            pvc_likelihood = compute_pvc_feature(signal, s)
            afib_likelihood = compute_afib_feature(p, s, signal)
            meta_features[i, 0] = st_elevation
            meta_features[i, 1] = pac_score
            meta_features[i, 2] = pvc_likelihood
            meta_features[i, 3] = afib_likelihood
        return meta_features

    def get_rf_hyperparameters() -> dict[str, int | float | str]:
        return {
            "n_estimators": 10,
            "max_depth": 9,
            "random_state": 42,
            "min_samples_split": 20,
            "min_samples_leaf": 5,
            "class_weight": "balanced_subsample",
        }

# Ewaluacja

Uruchomienie poniższej komórki pozwoli sprawdzić, ile punktów zdobyłoby Twoje rozwiązanie na danych walidacyjnych. Przed wysłaniem upewnij się, że cały notebook wykonuje się od początku do końca bez błędów i bez konieczności ingerencji użytkownika po wybraniu opcji "Run All".

Podczas sprawdzania model zostanie zapisany jako `your_model.pkl` i oceniony na zbiorze testowym.

In [None]:
######################### NIE ZMIENIAJ TEJ KOMÓRKI PODCZAS WYSYŁANIA ##########################
if FINAL_EVALUATION_MODE:
    your_solution = YourSolution.create_with_training()
    print(f"Ocena: {score_solution(your_solution)} pkt")

    OUTPUT_PATH = "file_output"
    FUNCTION_FILENAME = "your_solution"
    FUNCTION_OUTPUT_PATH = os.path.join(OUTPUT_PATH, FUNCTION_FILENAME)

    if not os.path.exists(OUTPUT_PATH):
        os.makedirs(OUTPUT_PATH)
    
    with open("file_output/your_model.pkl", "wb") as model_out:
        cloudpickle.dump(your_solution, model_out)