In [None]:
import numpy as np
import pandas as pd 
from pylab import mpl, plt
import random
import optuna
import os
os.environ['PYTORCH_MPS_HIGH_WATERMARK_RATIO'] = '0.0'

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
torch.set_num_threads(1)


from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, roc_auc_score

import matplotlib.pyplot as plt
plt.style.use('seaborn-v0_8-darkgrid')
mpl.rcParams['font.family'] = 'serif'
%matplotlib inline

import warnings
warnings.simplefilter("ignore", UserWarning)

import vectorbtpro as vbt


from plotly.subplots import make_subplots
import plotly.graph_objects as go
from collections import deque

# Global vectorbtpro plotting defaults for this notebook: dark theme and an
# 800x400 figure layout.
vbt.settings.set_theme('dark')
vbt.settings['plotting']['layout']['width'] = 800
vbt.settings['plotting']['layout']['height'] = 400


In [None]:
# Opt in to the future pandas behaviour where replace/fillna-style operations
# do not silently downcast object columns.
pd.set_option('future.no_silent_downcasting', True)

# Read the raw CSV and index it by time; 'timestamp' is parsed as epoch seconds.
df_sol = pd.read_csv('2ySOLdata1h.csv')
df_sol['timestamp'] = pd.to_datetime(df_sol['timestamp'], unit='s')
df_sol = df_sol.set_index('timestamp')

In [None]:
div = 8

# Keep the column at position 3 as a one-column frame (later cells address it
# as 'Close'), then work on an independent copy of its last 100 rows.
data = df_sol.iloc[:, 3:4].copy()
data_trimmed = data.tail(100).copy()

In [None]:
pd.set_option('future.no_silent_downcasting', True)

# Half-width of the centred look-around window; the full window spans
# window_size bars on each side plus the bar itself.
window_size = 10
full_window = 2 * window_size + 1

# Every bar starts unlabelled.
data_trimmed.loc[:, 'signal'] = 'SignalNone'

# Centred rolling extremes. min_periods=1 lets the truncated windows at both
# ends of the series still produce a value.
close_px = data_trimmed['Close']
centred = close_px.rolling(window=full_window, center=True, min_periods=1)
rolling_max = centred.max()
rolling_min = centred.min()

# A bar is a peak (low) when its close equals the window maximum (minimum).
is_peak = close_px.eq(rolling_max)
is_low = close_px.eq(rolling_min)

# Peaks are written first, so a bar that is both peak and low ends up as
# 'SignalLong'.
data_trimmed.loc[is_peak, 'signal'] = 'SignalShort'
data_trimmed.loc[is_low, 'signal'] = 'SignalLong'


In [None]:
# Work on a copy so the labelled frame in data_trimmed is left untouched by
# the clean-up cells that follow.
df = data_trimmed.copy()

In [None]:
# Only the labelled bars matter for de-duplication.
df_filtered = df[df['signal'] != 'SignalNone'].copy()

# Collapse every run of same-side signals down to its single most extreme bar:
# the lowest close for a run of 'SignalLong', the highest close for a run of
# 'SignalShort'.
#
# Each bar is compared against the last signal that is still *kept*, not simply
# against the preceding row. Comparing against the preceding row breaks the
# chain as soon as that row has been demoted to 'SignalNone', which lets two
# same-side signals survive in runs of three or more.
signal_col = df_filtered.columns.get_loc('signal')
close_col = df_filtered.columns.get_loc('Close')
last_kept = 0  # position (within df_filtered) of the most recent surviving signal

for i in range(1, len(df_filtered)):
    current_signal = df_filtered.iloc[i, signal_col]
    previous_signal = df_filtered.iloc[last_kept, signal_col]

    if current_signal != previous_signal:
        # Side flipped: the run ended and this bar starts a new one.
        last_kept = i
        continue

    current_close = df_filtered.iloc[i, close_col]
    previous_close = df_filtered.iloc[last_kept, close_col]

    if current_signal == 'SignalLong':
        current_is_better = previous_close > current_close
    else:
        current_is_better = previous_close < current_close

    if current_is_better:
        # The new bar is more extreme: demote the previously kept one.
        df_filtered.iloc[last_kept, signal_col] = 'SignalNone'
        last_kept = i
    else:
        # Ties and less extreme bars lose to the one already kept.
        df_filtered.iloc[i, signal_col] = 'SignalNone'


# Write the cleaned labels back into df (aligned on the index, in place).
df.update(df_filtered)


# # Making binary... comment out if not desired
# previous_signal = None  # Initialize a variable to keep track of the previous non-"SignalNone" value

# for i in range(len(df)):
#     if df.iloc[i, df_filtered.columns.get_loc('signal')] == "SignalNone" and previous_signal is not None:
#         df.iloc[i, df_filtered.columns.get_loc('signal')] = previous_signal  # Replace "SignalNone" with the previous signal
#     elif df.iloc[i, df_filtered.columns.get_loc('signal')] != "SignalNone":
#         previous_signal = df.iloc[i, df_filtered.columns.get_loc('signal')]  # Update the previous signal to the current one if it's not "SignalNone"

# df = df.loc[df['signal'] != 'SignalNone']
# # end of binary

# Numeric encoding used by the following cells: long = 2, none = 1, short = 0.
df.loc[:,'signal'] = df.loc[:,'signal'].replace({'SignalLong': 2, 'SignalShort': 0, 'SignalNone': 1})
# Forward-fill any missing values.
df = df.ffill()

In [None]:
def interpolate_signals(df):
    """Fill the bars between opposite signals with a price-proportional value.

    Rows whose ``signal`` is exactly 0 (short) or 2 (long) act as anchors.
    For every anchor, the next anchor of the opposite kind is located and the
    rows strictly between the two receive a value derived from where their
    ``Close`` sits between the two anchor closes:

    * short -> long:  ``2 * (close - start_close) / (end_close - start_close)``
    * long  -> short: ``2 - 2 * (close - start_close) / (end_close - start_close)``

    Anchors are never overwritten by their own segment. A segment is skipped
    when no opposite anchor follows, or when both anchors have the same close
    (the proportion would be a division by zero). Anchors are taken from the
    signal column as it is on entry; interpolated values that happen to equal
    0 or 2 do not become new anchors. Values are not clipped, so a close
    outside the two anchor closes yields a value outside [0, 2].

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a numeric ``Close`` column and a ``signal`` column that
        can be cast to float. The frame is modified in place; pass a copy to
        keep the original.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object, with ``signal`` as float and the gaps filled.
    """
    # Convert 'signal' to float to allow fractional interpolated values.
    df['signal'] = df['signal'].astype(float)

    # Work on positional arrays so the result does not depend on the index
    # type (datetime, non-zero-based integers, ...).
    close = df['Close'].to_numpy(dtype=float)
    signal = df['signal'].to_numpy(copy=True)

    # Anchor masks are frozen before any interpolated value is written.
    is_long = signal == 2
    is_short = signal == 0
    long_positions = np.flatnonzero(is_long)
    short_positions = np.flatnonzero(is_short)

    # Visit anchors in ascending order so that, if two segments overlap, the
    # one starting later wins.
    for start in np.flatnonzero(is_long | is_short):
        from_short = bool(is_short[start])
        targets = long_positions if from_short else short_positions

        # First opposite anchor strictly after `start`.
        nxt = np.searchsorted(targets, start, side='right')
        if nxt == len(targets):
            continue  # no opposite anchor follows: nothing to interpolate
        end = targets[nxt]

        start_close = close[start]
        close_range = close[end] - start_close
        if close_range == 0:  # avoid division by zero
            continue

        # Rows strictly between the two anchors.
        proportion = (close[start + 1:end] - start_close) / close_range
        if from_short:
            signal[start + 1:end] = 2 * proportion
        else:
            signal[start + 1:end] = 2 - (2 * proportion)

    df['signal'] = signal
    return df


# interpolate_signals writes into the frame it receives, so hand it a copy and
# keep df as it was.
df_interpolated = interpolate_signals(df.copy())


In [None]:
# Only the exact anchor values trade: 2 opens a long, 0 opens a short.
# Interpolated values in between trigger nothing.
signal = df_interpolated['signal']
entries = signal.eq(2)
exits = signal.eq(0)  # passed below as short entries

pf = vbt.Portfolio.from_signals(
    close=df['Close'],
    long_entries=entries,
    short_entries=exits,
    size=100,
    size_type='value',
    init_cash='auto',
    # accumulate=True,
)
pf.plot({"orders"}).show()




In [None]:
# Plot the interpolated signal series (taken from the previous cell) and label
# the y-axis.
fig = signal.vbt.plot()
fig.update_layout(yaxis_title='signal')
fig.show()