In [None]:
%matplotlib widget

# Load the data
import numpy as np
import pandas as pd

# Location of the close-price archive, relative to the notebook.
file_path = "../npz/BTCUSD_1T_close_only.npz"

# NOTE(review): allow_pickle=True lets the archive execute arbitrary code while
# unpickling; only load files from a trusted source.
# np.load on an .npz returns a lazily-read archive that keeps the file open,
# so read the array inside a `with` block to release the handle afterwards.
with np.load(file_path, allow_pickle=True) as data:
    df = pd.DataFrame(data['data'], columns=['timestamp', 'close'])

df['timestamp'] = pd.to_datetime(df['timestamp'])
df['close'] = df['close'].astype(float)
# Reassign instead of inplace=True so the step reads as a plain transformation.
df = df.set_index('timestamp')

In [None]:
# Training slice: the first 100000 rows, with `close` replaced by the log-price
# measured relative to the first close of the slice (so the series starts at 0).
dfc = df.iloc[:100000].assign(
    close=lambda frame: np.log(frame['close']) - np.log(frame['close'].iloc[0])
)

from itertools import islice

def get_positions(windows, diffs, diff_mins, diff_maxs, grid_sizes, i):
    """Map row ``i`` of every diff series onto a discrete grid cell.

    Args:
        windows: Iterable of window lengths; only its indices are used here.
            Index 0 is skipped because it has no entry in ``diffs``.
        diffs: Mapping ``window index -> series`` supporting ``.iloc[i]``.
        diff_mins: Per-index lower bound used for scaling.
        diff_maxs: Per-index upper bound used for scaling.
        grid_sizes: Per-index number of grid cells.
        i: Positional row to read from every series.

    Returns:
        list: One cell index per window index >= 1, each clamped to
        ``[0, grid_size - 1]``. A degenerate range (min == max) yields 0.
    """
    cells = []
    for wi, _ in enumerate(windows):
        if wi == 0:
            continue

        value = diffs[wi].iloc[i]
        lo = diff_mins[wi]
        hi = diff_maxs[wi]
        last_cell = grid_sizes[wi] - 1

        if hi != lo:
            # Linear rescale of [lo, hi] onto [0, last_cell], then clamp so that
            # values outside the reference range land on the border cells.
            scaled = last_cell * (value - lo) / (hi - lo)
            cell = max(0, min(round(scaled), last_cell))
        else:
            # Zero-width range: scaling would divide by zero.
            cell = 0

        cells.append(cell)
    return cells

def get_diffs(df, windows):
    """Compute differences between consecutive moving averages of ``close``.

    For every window ``w`` a rolling mean of ``df['close']`` is taken (the raw
    series is used when ``w <= 0``). The result for index ``wi >= 1`` is the
    moving average of ``windows[wi]`` minus that of ``windows[wi - 1]``.

    Args:
        df: Frame with a ``close`` column.
        windows: Sequence of rolling-window lengths.

    Returns:
        dict: ``{wi: series}`` for ``wi = 1 .. len(windows) - 1``; there is no
        entry for index 0.
    """
    prices = df['close']
    diffs = {}
    previous_ma = None
    for wi, w in enumerate(windows):
        current_ma = prices.rolling(w).mean() if w > 0 else prices
        if wi > 0:
            diffs[wi] = current_ma - previous_ma
        previous_ma = current_ma
    return diffs
    
def get_min_max_diffs(diffs, windows):
    """Collect the minimum and maximum of every diff series.

    Args:
        diffs: Mapping ``window index -> series`` (indices start at 1).
        windows: Iterable of window lengths; only its indices are used.

    Returns:
        tuple: ``(diff_mins, diff_maxs)`` lists aligned with ``windows``.
        Slot 0 of each list is a placeholder ``0`` because index 0 has no
        diff series.
    """
    lows = [0]
    highs = [0]
    for wi, _ in enumerate(windows):
        if wi == 0:
            continue
        series = diffs[wi]
        lows.append(series.min())
        highs.append(series.max())
    return lows, highs

def get_transitions_map(df, windows, tau, grid_sizes, transitions_grid_size):
    """Build a histogram of future price shifts for every discretised state.

    A state is the tuple of grid cells returned by ``get_positions`` for a row.
    The shift is ``close[i + tau] - close[i]``, binned linearly into
    ``transitions_grid_size`` cells between the smallest and largest shift
    observed in ``df``.

    Args:
        df: Frame with a ``close`` column.
        windows: Moving-average window lengths passed to ``get_diffs``.
        tau: Look-ahead, in rows, used to measure the shift.
        grid_sizes: Number of grid cells per window index.
        transitions_grid_size: Number of bins for the shift.

    Returns:
        tuple: ``(transitions, diff_mins, diff_maxs, transitions_map,
        transitions_min, transitions_max)`` where ``transitions`` is an empty
        dict that is never filled (kept so the return shape stays the same) and
        ``transitions_map`` is ``{state: {shift_bin: count}}``.
    """
    diffs = get_diffs(df, windows)
    diff_mins, diff_maxs = get_min_max_diffs(diffs, windows)

    prices = df['close']
    future_shifts = prices.shift(-tau) - prices
    transitions_min = future_shifts.min()
    transitions_max = future_shifts.max()

    transitions = {}
    transitions_map = {}
    last_bin = transitions_grid_size - 1
    shift_range_is_flat = transitions_max == transitions_min

    # Start at the longest window and stop `tau` rows before the end, where the
    # shifted series has no value.
    for i in range(max(windows), len(df) - tau):
        state = tuple(get_positions(windows, diffs, diff_mins, diff_maxs, grid_sizes, i))

        shift = future_shifts.iloc[i]
        if shift_range_is_flat:
            shift_bin = 0
        else:
            scaled = last_bin * (shift - transitions_min) / (transitions_max - transitions_min)
            shift_bin = max(0, min(round(scaled), last_bin))

        # Count this (state, shift bin) observation.
        histogram = transitions_map.setdefault(state, {})
        histogram[shift_bin] = histogram.get(shift_bin, 0) + 1

    return transitions, diff_mins, diff_maxs, transitions_map, transitions_min, transitions_max

# Moving-average window lengths (in rows) and the number of grid cells per window.
windows = [120, 240, 480, 960]
grid_sizes = [13] * len(windows)

# Look-ahead (in rows) for the measured shift and the number of shift bins.
tau = 30
transitions_grid_size = 99

# Build the transition histogram on the training slice.
(
    transitions,
    diff_mins,
    diff_maxs,
    transitions_map,
    transitions_min,
    transitions_max,
) = get_transitions_map(dfc, windows, tau, grid_sizes, transitions_grid_size)

In [None]:
# Filter out transitions with low frequencies
def filter_low_frequency_transitions(transitions_map, threshold):
    """Drop rarely observed shift bins from a transitions map.

    Args:
        transitions_map (dict): ``{state: {shift_bin: count}}``.
        threshold (int): Smallest count a bin needs in order to be kept.

    Returns:
        dict: A new map containing, per state, only the bins whose count is at
        least ``threshold``. States left without any bin are omitted. The input
        map is not modified.
    """
    kept_states = {}
    for state, histogram in transitions_map.items():
        frequent_bins = {}
        for shift_bin, count in histogram.items():
            if count >= threshold:
                frequent_bins[shift_bin] = count
        if len(frequent_bins) > 0:
            kept_states[state] = frequent_bins
    return kept_states

# Apply the filter: within each state keep only the shift bins observed at least 50 times.
filtered_transitions_map = filter_low_frequency_transitions(transitions_map, threshold=50)

# Show the filtered map as the cell output (use `transitions_map` to inspect the unfiltered counts).
filtered_transitions_map

In [None]:
def plot_transitions(skip, train_size=100000, horizon=300):
    """Plot a window of closes together with the transition-map forecast.

    The window starts at row ``train_size + skip`` of the notebook-level ``df``
    and spans ``horizon + max(windows)`` rows; the first ``max(windows)`` rows
    only warm up the moving averages and are not plotted. For every plotted row
    the state is looked up in the notebook-level maps and the count-weighted
    mean shift is added to the current log-price.

    Reads the notebook globals ``df``, ``windows``, ``grid_sizes``,
    ``diff_mins``, ``diff_maxs``, ``transitions_map``,
    ``filtered_transitions_map``, ``transitions_min``, ``transitions_max`` and
    ``transitions_grid_size``.

    Args:
        skip: Row offset added to ``train_size`` to pick the window start.
        train_size: First row of the plotting region. The default (100000)
            equals the number of rows used to build the transitions map.
        horizon: Number of rows plotted after the warm-up.

    Raises:
        IndexError: If the window does not contain more than ``max(windows)``
            rows, e.g. because it runs past the end of ``df``.
    """
    import matplotlib.pyplot as plt

    max_window = max(windows)
    start = train_size + skip
    end = start + horizon + max_window
    dfc = df[start:end].copy()
    if len(dfc) <= max_window:
        # Fail early with context instead of an opaque IndexError further down.
        raise IndexError(
            f"window [{start}:{end}] holds {len(dfc)} rows; "
            f"more than {max_window} are required"
        )
    dfc['close'] = np.log(dfc['close']) - np.log(dfc['close'].iloc[0])

    diffs = get_diffs(dfc, windows)
    # Diagnostic only: print the training-range minima next to this window's
    # minima. Positions below are still scaled with the training bounds.
    diff_mins2, _ = get_min_max_diffs(diffs, windows)
    print(diff_mins, diff_mins2)

    averages = []
    confidences = []
    for i in range(max_window, len(dfc)):
        state = tuple(get_positions(windows, diffs, diff_mins, diff_maxs, grid_sizes, i))
        if state in filtered_transitions_map:
            # NOTE(review): membership is tested on the filtered map but the
            # counts come from the unfiltered one, so rare bins still contribute
            # to the mean. Confirm this is intended.
            probs = transitions_map[state]
            hits = sum(probs.values())
            average = 0
            for k, v in probs.items():
                # Convert the bin index back to a shift value.
                shift = (transitions_max - transitions_min) * k / (transitions_grid_size - 1) + transitions_min
                average += shift * v / hits
            averages.append(average)
            confidences.append(hits)
        else:
            # Unknown state: no forecast and zero confidence.
            averages.append(0)
            confidences.append(0)

    # Create the figure and the first axis (confidence); the price goes on a twin axis.
    fig, ax1 = plt.subplots(figsize=(14, 6))
    ax1.plot(dfc.index[max_window:], confidences, label='confidence', color='red', alpha=0.3)
    ax1.set_ylabel('Confidence')

    ax2 = ax1.twinx()
    ax2.plot(df['close'].iloc[start:end].iloc[max_window:], label='close')
    # Undo the log normalisation: exp(log-price + expected shift + log of the first close).
    forecast = np.exp(
        dfc['close'].iloc[max_window:] + np.asarray(averages) + np.log(df['close'].iloc[start])
    )
    ax2.plot(forecast, label='average', linestyle='--')
    ax2.set_ylabel('Close')

    # A bare plt.legend() only sees the current (twin) axis and would drop the
    # 'confidence' entry, so gather the handles from both axes explicitly.
    handles1, labels1 = ax1.get_legend_handles_labels()
    handles2, labels2 = ax2.get_legend_handles_labels()
    ax2.legend(handles1 + handles2, labels1 + labels2)

    # Vertical cursor line shared by both axes; it follows the mouse.
    vline = ax1.axvline(x=dfc.index[max_window], color='gray', linestyle='--')

    def on_mouse_move(event):
        if event.inaxes in [ax1, ax2] and event.xdata is not None:
            # Two x values to match the line's two y values.
            vline.set_xdata([event.xdata, event.xdata])
            fig.canvas.draw_idle()

    fig.canvas.mpl_connect('motion_notify_event', on_mouse_move)

    plt.show()

# Row offset of the plotted window; it is added to the window start inside plot_transitions.
skip = 0

In [None]:
# Move the window 150 rows further and plot it.
# NOTE: this cell is not idempotent; every execution advances `skip` again.
skip += 150
plot_transitions(skip)