# Тестирование классической модели KNN для предсказания увеличения/уменьшения котировки

In [None]:
import numpy as np
import pandas as pd
from numpy.typing import ArrayLike
from scipy.fft import dct

## Data processing

In [None]:
def load_data(dt_path: str) -> pd.DataFrame:
    """Read a candle CSV and keep only its OHLC price columns.

    Args:
        dt_path: Location of the CSV file; handed straight to ``pandas.read_csv``.

    Returns:
        A DataFrame holding the ``open``, ``high``, ``low`` and ``close``
        columns, in that order.

    Raises:
        KeyError: If one of the four columns is missing from the file.
    """
    ohlc_columns = ['open', 'high', 'low', 'close']
    candles = pd.read_csv(dt_path)
    return candles[ohlc_columns]

def preproccess_data(dts: pd.DataFrame, window_size: int) -> list[tuple[int, ArrayLike, ArrayLike, ArrayLike]]:
    """Cut the candle history into sliding windows labelled by the next candle.

    Each sample describes one target candle: the window is the ``close - open``
    difference of the ``window_size`` candles right before it, and the label
    says whether the target candle itself closed above its open.

    Args:
        dts: Candle table with at least the ``open`` and ``close`` columns.
        window_size: Number of consecutive candles in every window.

    Returns:
        A list of ``(label, body, open, close)`` tuples where ``label`` is
        ``1`` when the target candle has ``close - open > 0`` and ``-1``
        otherwise, ``body`` is the window of differences, and ``open`` /
        ``close`` are the target candle's prices.
    """
    closes = dts['close'].array
    opens = dts['open'].array
    # Signed candle body: positive when the candle closed above its open.
    bodies = (dts['close'] - dts['open']).array

    # ``target`` is the index of the candle being predicted; its window is the
    # ``window_size`` candles immediately before it.
    # NOTE(review): the upper bound leaves the very last candle of ``dts``
    # unused as a target - confirm that this is intended.
    return [
        (
            2 * (bodies[target] > 0) - 1,
            bodies[target - window_size: target],
            opens[target],
            closes[target],
        )
        for target in range(window_size, len(dts) - 1)
    ]

def dct_composition(dataset: list, main_components: None | int) -> list[tuple[int, ArrayLike, ArrayLike, ArrayLike]]:
    # if main_components is None => identity mapping
    composition = dct if main_components is None else lambda x: dct(x)[:main_components]
    return [
        (label, composition(body), open, close) for label, body, open, close in dataset
    ]
    

In [None]:
def dataset_fabric(
    dts_path: str,
    window_size: int,
    dct_transform: bool = False,
    main_components: int | None = None
) -> list[tuple[int, ArrayLike, ArrayLike, ArrayLike]]:
    """Load a candle CSV and turn it into a list of labelled windows.

    Args:
        dts_path: Path of the CSV file, forwarded to ``load_data``.
        window_size: Window length, forwarded to ``preproccess_data``.
        dct_transform: When true, every window is passed through
            ``dct_composition`` before being returned.
        main_components: Number of DCT coefficients to keep (``None`` keeps
            all). Only used when ``dct_transform`` is true; ignored otherwise.

    Returns:
        Samples shaped ``(label, body, open, close)`` - the 4-tuples produced
        by ``preproccess_data`` / ``dct_composition``.
    """
    dataset = preproccess_data(load_data(dts_path), window_size)

    if not dct_transform:
        return dataset
    return dct_composition(dataset, main_components)

## Metrics for KNN

In [None]:
from functools import partial
from pyts.metrics import dtw

In [None]:
def create_metric(
    metric_type: str,
    distance_type: str = 'square',
    dtw_method = 'classic',
    dtw_options = None
):
    """Build the distance callable used by the KNN classifier.

    Args:
        metric_type: Metric family; only ``'DTW'`` is supported.
        distance_type: Forwarded to ``dtw`` as ``dist``; must be ``'square'``
            or ``'absolute'``.
        dtw_method: Forwarded to ``dtw`` as ``method``.
        dtw_options: Forwarded to ``dtw`` as ``options``.

    Returns:
        A ``functools.partial`` of ``dtw`` with the arguments above bound.

    Raises:
        ValueError: If ``metric_type`` or ``distance_type`` is not supported.
    """
    # Explicit raises instead of ``assert``: assertions are stripped under
    # ``python -O``, which would let an unsupported metric silently yield None.
    if metric_type != 'DTW':
        raise ValueError(f'wrong metric_type={metric_type}')
    if distance_type not in ('square', 'absolute'):
        raise ValueError(f'wrong distance_type={distance_type}')

    return partial(dtw, dist=distance_type, method=dtw_method, options=dtw_options)

## KNN

In [None]:
from scipy.special import softmax
import numpy as np

In [None]:
class DummyClassifier:
    """Minimal k-nearest-neighbours classifier over an arbitrary distance.

    The model only memorises ``(label, x)`` pairs. A prediction is the sign of
    the (optionally weighted) vote of the ``k_neighbours`` stored objects with
    the smallest ``metric`` distance to the query. The vote is thresholded at
    zero, so labels are expected to be ``-1`` / ``1``.
    """

    def __init__(self, metric, k_neighbours = 1, weighted = True):
        """Configure the classifier.

        Args:
            metric: Callable ``metric(x, stored)`` returning a distance
                (smaller means more similar).
            k_neighbours: Number of nearest stored objects that vote.
            weighted: When true, votes are weighted by the softmax of the
                inverse distances; otherwise every neighbour counts equally.

        Raises:
            ValueError: If ``k_neighbours`` is smaller than 1.
        """
        if k_neighbours < 1:
            raise ValueError(f'k_neighbours must be >= 1, got {k_neighbours}')

        # settings of model
        self.metric = metric
        self.k_neighbours = k_neighbours
        self.weighted = weighted

        # model's memory
        self.neighbours = []

    def add_object(self, label, x):
        """Memorise one labelled object."""
        self.neighbours.append((label, x))

    def __call__(self, x):
        """Classify ``x``.

        Returns:
            ``1`` when the vote of the nearest neighbours is positive,
            ``-1`` otherwise (ties included).

        Raises:
            ValueError: If no object has been stored yet.
        """
        if not self.neighbours:
            raise ValueError('classifier has no stored objects; call add_object first')

        labels = np.array([label for label, _ in self.neighbours])
        distances = np.array([self.metric(x, stored) for _, stored in self.neighbours])

        # Never ask for more neighbours than are stored, and partition the
        # distances themselves (not their negation) so that the *closest*
        # objects are selected.
        k = min(self.k_neighbours, len(distances))
        nearest = np.argpartition(distances, k - 1)[:k]
        votes = labels[nearest]

        if not self.weighted:
            return 2 * (votes.mean() > 0) - 1

        nearest_dist = distances[nearest]
        exact = nearest_dist == 0
        if exact.any():
            # 1 / 0 would turn the softmax into NaNs; in the limit the whole
            # weight belongs to the exact matches, shared equally.
            weights = exact / exact.sum()
        else:
            weights = softmax(1 / nearest_dist)
        return 2 * (votes @ weights > 0) - 1

## Validation

In [None]:
from sklearn.metrics import f1_score, recall_score, precision_score, accuracy_score
import numpy as np

In [None]:
def validate(
    model,
    dataset: list,
    warm_start: int = 30
):
    """Back-test ``model`` on ``dataset`` with a simple long-only strategy.

    The first ``warm_start`` samples are stored in the model via
    ``add_object``; every remaining sample is predicted and traded on. A
    positive prediction while out of the market buys with the whole bank at
    the sample's open price; a negative prediction while holding sells
    everything, also at the sample's open price. A position that is still
    open at the end is valued at the close price of the last sample.

    Args:
        model: Object exposing ``add_object(label, body)`` and ``model(body)``.
        dataset: Samples shaped ``(label, body, open, close)``.
        warm_start: Number of leading samples used to fill the model.

    Returns:
        A tuple ``(final_bank, return_multiplier, metrics)`` where
        ``return_multiplier`` is ``final_bank / initial_bank`` and ``metrics``
        holds accuracy, precision, recall, F1 and the same multiplier.

    Raises:
        ValueError: If the dataset has no sample left after the warm-up.
    """
    if len(dataset) <= warm_start:
        raise ValueError(f"Dataset size ({len(dataset)}) must exceed warm_start ({warm_start})")

    # Fill the model's memory with the warm-up samples.
    for idx in range(warm_start):
        seed_label, seed_body, _, _ = dataset[idx]
        model.add_object(seed_label, seed_body)

    # Starting capital: the price of ten units at the first traded open.
    initial_bank = 10 * dataset[warm_start][2]
    cash = initial_bank
    coins = 0
    holding = False

    y_true = []
    y_pred = []  # binarised predictions (1 / -1)

    for idx in range(warm_start, len(dataset)):
        label, body, open_price, _ = dataset[idx]

        signal = 1 if model(body) > 0 else -1
        y_true.append(label)
        y_pred.append(signal)

        if signal > 0:
            if not holding:
                # Enter: convert the whole bank at this candle's open.
                coins = cash / open_price
                cash = 0
                holding = True
        elif holding:
            # Exit: the signal is -1 here; sell everything at this candle's open.
            cash = coins * open_price
            coins = 0
            holding = False

    # Value a still-open position at the last sample's close price.
    if holding:
        cash = coins * dataset[-1][3]

    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    # Force the reference labels into 1 / -1 unless both classes are already
    # present in exactly that encoding.
    if not np.array_equal(np.unique(y_true), [-1, 1]):
        y_true = np.where(y_true > 0, 1, -1)

    return_multiplier = cash / initial_bank
    metrics = {
        'accuracy': accuracy_score(y_true, y_pred),
        'precision': precision_score(y_true, y_pred, pos_label=1),
        'recall': recall_score(y_true, y_pred, pos_label=1),
        'f1_score': f1_score(y_true, y_pred, pos_label=1),
        'return_multiplier': return_multiplier
    }

    return cash, return_multiplier, metrics

## Optuna tuning

In [None]:
import optuna
import warnings
warnings.filterwarnings("ignore")
from functools import partial

In [None]:
def trial(trial: optuna.trial.Trial):
    """Optuna objective: back-test one KNN configuration on the TONUSDT candles.

    Args:
        trial: Optuna trial used to sample the hyper-parameters.

    Returns:
        The return multiplier (second element) reported by ``validate``.
    """
    # Classifier hyper-parameters.
    k_neighbours = trial.suggest_categorical("k_neighbours", range(1, 10))
    weighted = trial.suggest_categorical("KNN_weighted", [True, False])
    distance_type = trial.suggest_categorical('distance_type', ['square', 'absolute'])

    # Dataset hyper-parameters.
    window_size = trial.suggest_categorical('window_len', range(5, 21))
    transform_dct = trial.suggest_categorical('dct_transform', [True, False])
    kept_fraction = trial.suggest_categorical('main_components', [None, 0.5, 0.7])

    # Turn the sampled fraction of the window into a number of DCT coefficients.
    components = None if kept_fraction is None else int(window_size * kept_fraction)

    model = DummyClassifier(
        metric=create_metric('DTW', distance_type),
        k_neighbours=k_neighbours,
        weighted=weighted,
    )
    dataset = dataset_fabric('../data/TONUSDT.csv', window_size, transform_dct, components)

    # Only the return multiplier is optimised; bank and metrics are discarded.
    _, ratio, _ = validate(model, dataset, 30)
    return ratio

# Create (or reuse, via load_if_exists) the "TON" study in the shared SQLite
# storage and maximise the objective defined by ``trial``.
try:
    sampler = optuna.samplers.TPESampler(n_startup_trials=10, group=True, multivariate=True)
    study = optuna.create_study(
        sampler=sampler,
        load_if_exists=True,
        storage="sqlite:///../optuna/KNN_first_trial_db.sqlite3",
        direction="maximize",
        study_name="TON")
    study.optimize(trial, n_trials=300, n_jobs=15, show_progress_bar=True)
except Exception as exc:
    # Best effort: a failing study must not stop the rest of the notebook, but
    # the reason is reported instead of being dropped. Catching ``Exception``
    # (not a bare ``except``) lets KeyboardInterrupt still abort the run.
    print(f"Optuna study 'TON' did not finish: {exc!r}")

In [None]:
def trial(trial: optuna.trial.Trial):
    """Optuna objective: back-test one KNN configuration on the LINKUSDT candles.

    Args:
        trial: Optuna trial used to sample the hyper-parameters.

    Returns:
        The return multiplier (second element) reported by ``validate``.
    """
    # Classifier hyper-parameters.
    k_neighbours = trial.suggest_categorical("k_neighbours", range(1, 10))
    weighted = trial.suggest_categorical("KNN_weighted", [True, False])
    distance_type = trial.suggest_categorical('distance_type', ['square', 'absolute'])

    # Dataset hyper-parameters.
    window_size = trial.suggest_categorical('window_len', range(5, 21))
    transform_dct = trial.suggest_categorical('dct_transform', [True, False])
    kept_fraction = trial.suggest_categorical('main_components', [None, 0.5, 0.7])

    # Turn the sampled fraction of the window into a number of DCT coefficients.
    components = None if kept_fraction is None else int(window_size * kept_fraction)

    model = DummyClassifier(
        metric=create_metric('DTW', distance_type),
        k_neighbours=k_neighbours,
        weighted=weighted,
    )
    dataset = dataset_fabric('../data/LINKUSDT.csv', window_size, transform_dct, components)

    # Only the return multiplier is optimised; bank and metrics are discarded.
    _, ratio, _ = validate(model, dataset, 30)
    return ratio

# Create (or reuse, via load_if_exists) the "LINK" study in the shared SQLite
# storage and maximise the objective defined by ``trial``.
try:
    sampler = optuna.samplers.TPESampler(n_startup_trials=10, group=True, multivariate=True)
    study = optuna.create_study(
        sampler=sampler,
        load_if_exists=True,
        storage="sqlite:///../optuna/KNN_first_trial_db.sqlite3",
        direction="maximize",
        study_name="LINK")
    study.optimize(trial, n_trials=100, n_jobs=15, show_progress_bar=True)
except Exception as exc:
    # Best effort: a failing study must not stop the rest of the notebook, but
    # the reason is reported instead of being dropped. Catching ``Exception``
    # (not a bare ``except``) lets KeyboardInterrupt still abort the run.
    print(f"Optuna study 'LINK' did not finish: {exc!r}")

In [None]:
def trial(trial: optuna.trial.Trial):
    """Optuna objective: back-test one KNN configuration on the XRPUSDT candles.

    Args:
        trial: Optuna trial used to sample the hyper-parameters.

    Returns:
        The return multiplier (second element) reported by ``validate``.
    """
    # Classifier hyper-parameters.
    k_neighbours = trial.suggest_categorical("k_neighbours", range(1, 10))
    weighted = trial.suggest_categorical("KNN_weighted", [True, False])
    distance_type = trial.suggest_categorical('distance_type', ['square', 'absolute'])

    # Dataset hyper-parameters.
    window_size = trial.suggest_categorical('window_len', range(5, 21))
    transform_dct = trial.suggest_categorical('dct_transform', [True, False])
    kept_fraction = trial.suggest_categorical('main_components', [None, 0.5, 0.7])

    # Turn the sampled fraction of the window into a number of DCT coefficients.
    components = None if kept_fraction is None else int(window_size * kept_fraction)

    model = DummyClassifier(
        metric=create_metric('DTW', distance_type),
        k_neighbours=k_neighbours,
        weighted=weighted,
    )
    dataset = dataset_fabric('../data/XRPUSDT.csv', window_size, transform_dct, components)

    # Only the return multiplier is optimised; bank and metrics are discarded.
    _, ratio, _ = validate(model, dataset, 30)
    return ratio

# Create (or reuse, via load_if_exists) the "XRP" study in the shared SQLite
# storage and maximise the objective defined by ``trial``.
try:
    sampler = optuna.samplers.TPESampler(n_startup_trials=10, group=True, multivariate=True)
    study = optuna.create_study(
        sampler=sampler,
        load_if_exists=True,
        storage="sqlite:///../optuna/KNN_first_trial_db.sqlite3",
        direction="maximize",
        study_name="XRP")
    study.optimize(trial, n_trials=100, n_jobs=15, show_progress_bar=True)
except Exception as exc:
    # Best effort: a failing study must not stop the rest of the notebook, but
    # the reason is reported instead of being dropped. Catching ``Exception``
    # (not a bare ``except``) lets KeyboardInterrupt still abort the run.
    print(f"Optuna study 'XRP' did not finish: {exc!r}")

In [None]:
def trial(trial: optuna.trial.Trial):
    """Optuna objective: back-test one KNN configuration on the SOLUSDT candles.

    Args:
        trial: Optuna trial used to sample the hyper-parameters.

    Returns:
        The return multiplier (second element) reported by ``validate``.
    """
    # Classifier hyper-parameters.
    k_neighbours = trial.suggest_categorical("k_neighbours", range(1, 10))
    weighted = trial.suggest_categorical("KNN_weighted", [True, False])
    distance_type = trial.suggest_categorical('distance_type', ['square', 'absolute'])

    # Dataset hyper-parameters.
    window_size = trial.suggest_categorical('window_len', range(5, 21))
    transform_dct = trial.suggest_categorical('dct_transform', [True, False])
    kept_fraction = trial.suggest_categorical('main_components', [None, 0.5, 0.7])

    # Turn the sampled fraction of the window into a number of DCT coefficients.
    components = None if kept_fraction is None else int(window_size * kept_fraction)

    model = DummyClassifier(
        metric=create_metric('DTW', distance_type),
        k_neighbours=k_neighbours,
        weighted=weighted,
    )
    dataset = dataset_fabric('../data/SOLUSDT.csv', window_size, transform_dct, components)

    # Only the return multiplier is optimised; bank and metrics are discarded.
    _, ratio, _ = validate(model, dataset, 30)
    return ratio

# Create (or reuse, via load_if_exists) the "SOL" study in the shared SQLite
# storage and maximise the objective defined by ``trial``.
try:
    sampler = optuna.samplers.TPESampler(n_startup_trials=10, group=True, multivariate=True)
    study = optuna.create_study(
        sampler=sampler,
        load_if_exists=True,
        storage="sqlite:///../optuna/KNN_first_trial_db.sqlite3",
        direction="maximize",
        study_name="SOL")
    study.optimize(trial, n_trials=75, n_jobs=15, show_progress_bar=True)
except Exception as exc:
    # Best effort: a failing study must not stop the rest of the notebook, but
    # the reason is reported instead of being dropped. Catching ``Exception``
    # (not a bare ``except``) lets KeyboardInterrupt still abort the run.
    print(f"Optuna study 'SOL' did not finish: {exc!r}")

In [None]:
def trial(trial: optuna.trial.Trial):
    """Optuna objective: back-test one KNN configuration on the PEPEUSDT candles.

    Args:
        trial: Optuna trial used to sample the hyper-parameters.

    Returns:
        The return multiplier (second element) reported by ``validate``.
    """
    # Classifier hyper-parameters.
    k_neighbours = trial.suggest_categorical("k_neighbours", range(1, 10))
    weighted = trial.suggest_categorical("KNN_weighted", [True, False])
    distance_type = trial.suggest_categorical('distance_type', ['square', 'absolute'])

    # Dataset hyper-parameters.
    window_size = trial.suggest_categorical('window_len', range(5, 21))
    transform_dct = trial.suggest_categorical('dct_transform', [True, False])
    kept_fraction = trial.suggest_categorical('main_components', [None, 0.5, 0.7])

    # Turn the sampled fraction of the window into a number of DCT coefficients.
    components = None if kept_fraction is None else int(window_size * kept_fraction)

    model = DummyClassifier(
        metric=create_metric('DTW', distance_type),
        k_neighbours=k_neighbours,
        weighted=weighted,
    )
    dataset = dataset_fabric('../data/PEPEUSDT.csv', window_size, transform_dct, components)

    # Only the return multiplier is optimised; bank and metrics are discarded.
    _, ratio, _ = validate(model, dataset, 30)
    return ratio

# Create (or reuse, via load_if_exists) the "PEPE" study in the shared SQLite
# storage and maximise the objective defined by ``trial``.
try:
    sampler = optuna.samplers.TPESampler(n_startup_trials=10, group=True, multivariate=True)
    study = optuna.create_study(
        sampler=sampler,
        load_if_exists=True,
        storage="sqlite:///../optuna/KNN_first_trial_db.sqlite3",
        direction="maximize",
        study_name="PEPE")
    study.optimize(trial, n_trials=75, n_jobs=15, show_progress_bar=True)
except Exception as exc:
    # Best effort: a failing study must not stop the rest of the notebook, but
    # the reason is reported instead of being dropped. Catching ``Exception``
    # (not a bare ``except``) lets KeyboardInterrupt still abort the run.
    print(f"Optuna study 'PEPE' did not finish: {exc!r}")