In [2]:
import pandas as pd
from indicators import RSI, extract_bb
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import cross_validate, GridSearchCV
from sklearn.metrics import accuracy_score
from sklearn.ensemble import GradientBoostingClassifier
import numpy as np
import warnings
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

warnings.filterwarnings("ignore")

In [3]:
df = pd.read_csv("gzpn_data.csv")
df = df.dropna().sample(frac=1).reset_index(drop=True)
df = df.drop(columns=["<TICKER>", "<PER>", "<DATE>", "<TIME>"])
df.columns = ["open", "high", "low", "close", "volume"]
df

Unnamed: 0,open,high,low,close,volume
0,163.43,163.50,163.43,163.47,10440
1,163.62,163.63,163.56,163.58,6200
2,162.50,162.50,162.48,162.49,38410
3,166.61,166.67,166.60,166.64,15740
4,166.74,166.74,166.73,166.74,4980
...,...,...,...,...,...
2128,166.93,167.39,166.93,167.21,1180230
2129,166.24,166.35,166.24,166.26,87430
2130,168.17,168.24,168.03,168.09,49680
2131,166.68,166.72,166.67,166.69,15180


In [4]:
n_steps = 11

prices = df["close"]

rsi_values = RSI(prices=prices, n_steps=n_steps)
bb_values = extract_bb(prices=prices, n_steps=n_steps)

assert len(rsi_values) == len(
    bb_values
), f"Indicators length don't coincide: {len(rsi_values)} and {len(bb_values)}"

In [5]:
def prepare_target(df, steps_obs: int = 3):
    targets = []
    for i in range(0, len(df) - steps_obs):
        current_price = df["close"].iloc[i]
        max_price = df["high"].iloc[i + 1 : i + 1 + steps_obs].max()
        targets.append(max_price > current_price)
    targets += [0] * steps_obs
    return np.array(targets, dtype=np.int32)


steps_obs = 3

targets = prepare_target(df=df, steps_obs=steps_obs)

In [6]:
all_data = (
    pd.DataFrame(
        data=np.array([rsi_values, bb_values, targets]).T,
        columns=["rsi", "bb", "target"],
    )
    .dropna()
    .reset_index(drop=True)
    .astype(np.float64)
)
all_data["target"] = all_data["target"].astype(np.int32)
all_data

Unnamed: 0,rsi,bb,target
0,57.688849,1.078445,0
1,50.953137,-1.037126,1
2,44.016319,-0.521764,1
3,46.187364,-0.117254,1
4,50.404184,-1.047835,1
...,...,...,...
2117,52.772316,0.844034,1
2118,44.282029,0.322123,1
2119,63.741461,1.142901,0
2120,50.974601,0.586726,0


In [7]:
all_data.describe()

Unnamed: 0,rsi,bb,target
count,2122.0,2122.0,2122.0
mean,50.003633,-0.000611,0.764844
std,6.646051,0.951884,0.424196
min,25.292969,-2.154532,0.0
25%,45.591125,-0.838591,1.0
50%,50.040521,-0.210749,1.0
75%,54.446105,0.820443,1.0
max,75.0,2.314039,1.0


# Обработка фичей

In [8]:
scaler = MinMaxScaler()
scaler.fit(all_data.iloc[:, :-1])
all_data.iloc[:, :-1] = scaler.transform(all_data.iloc[:, :-1])
all_data

Unnamed: 0,rsi,bb,target
0,0.651736,0.723492,0
1,0.516228,0.250059,1
2,0.376674,0.365389,1
3,0.420351,0.455913,1
4,0.505184,0.247662,1
...,...,...,...
2117,0.552826,0.671035,1
2118,0.382020,0.554239,1
2119,0.773502,0.737917,0
2120,0.516660,0.613453,0


In [9]:
X = all_data.iloc[:, :-1]
y = all_data.iloc[:, -1]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=0, shuffle=False
)

# Обучение модели

## Logistic Regression

In [17]:
model = LogisticRegression()
results = cross_validate(model, X, y, cv=5, scoring="balanced_accuracy")
results["test_score"].mean()

0.684035903579623

In [18]:
tree_params = {
    "criterion": ["gini", "log_loss", "entropy"],
    "max_depth": [15, 20, 25],
    "min_samples_split": [2, 3, 4],
    "min_samples_leaf": [2, 3, 4],
}

## Decision Tree

In [25]:
grs = GridSearchCV(
    DecisionTreeClassifier(random_state=0),
    cv=6,
    param_grid=tree_params,
    n_jobs=-1,
    scoring="balanced_accuracy",
)
grs.fit(X, y)

In [27]:
grs.best_score_

0.7071748915272397

In [28]:
grs.best_params_

{'criterion': 'gini',
 'max_depth': 15,
 'min_samples_leaf': 4,
 'min_samples_split': 2}

## XGboost

In [29]:
boost_params = {
    "loss": ["log_loss", "exponential"],
    "learning_rate": [0.1, 0.2],
    "n_estimators": [70, 80, 90],
    "max_depth": [2, 3],
    "min_samples_leaf": [3, 4, 5],
    "min_samples_split": [2, 3],
}

In [30]:
grs = GridSearchCV(
    GradientBoostingClassifier(random_state=0),
    cv=5,
    param_grid=boost_params,
    n_jobs=-1,
    scoring="balanced_accuracy",
)
grs.fit(X, y)
print(grs.best_score_)
print(grs.best_params_)

0.7168896840467319
{'learning_rate': 0.2, 'loss': 'exponential', 'max_depth': 2, 'min_samples_leaf': 3, 'min_samples_split': 2, 'n_estimators': 80}


## Neural network (simple)

In [27]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

In [10]:
X_train = torch.Tensor(X_train.values)
y_train = torch.Tensor(y_train.values).reshape(-1, 1)
X_test = torch.Tensor(X_test.values)
y_test = torch.Tensor(y_test.values).reshape(-1, 1)

In [28]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader


# Define a simple neural network
class SimpleModel(nn.Module):
    def __init__(self):
        super(SimpleModel, self).__init__()
        self.fc1 = nn.Linear(2, 32)
        self.fc2 = nn.Linear(32, 1)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x


# data_num = 1024
# Generate some random data
# X = torch.Tensor(X)
# y = torch.Tensor(y).reshape(-1,1)

batch_size = 256
dataset_train = TensorDataset(X_train, y_train)
dataset_test = TensorDataset(X_test, y_test)
dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
dataloader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=True)

# Initialize model and optimizer
model = SimpleModel()
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

print(count_parameters(model))

129


In [None]:
# Training loop
epochs = 5000
for epoch in range(epochs):
    loss_train = []

    for inputs, labels in dataloader_train:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        loss_train.append(loss.item())
        optimizer.step()

    if epoch % 100 == 0:
        loss_test = []
        for inputs, labels in dataloader_test:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss_test.append(loss.item())
            optimizer.step()
        print(
            f"Epoch [{epoch+1}/{epochs}], Loss_train: {np.mean(loss_train):.4f}, Loss_test: {np.mean(loss_test):.4f}"
        )


# Save the trained model
# torch.save(model.state_dict(), 'simple_model.pth')

## Reccurent neural network

In [23]:
def prepare_input(data, sequence_length):
    """
    Prepare input data for an RNN by creating sequences of fixed length.

    Args:
    - data (torch.Tensor): 1D tensor containing the input data.
    - sequence_length (int): Length of each sequence.

    Returns:
    - torch.Tensor: 3D tensor containing the input data in suitable format for RNN.
    """
    input_sequences = []
    for i in range(len(data) - sequence_length + 1):
        input_seq = data[i : i + sequence_length]
        input_sequences.append(input_seq)
    return torch.stack(input_sequences)


sequence_length = 3
X_train_rnn = prepare_input(X_train, sequence_length)
y_train_rnn = y_train[sequence_length - 1 :]

X_test_rnn = prepare_input(X_test, sequence_length)
y_test_rnn = y_test[sequence_length - 1 :]

In [25]:
y_train_rnn.shape, X_train_rnn.shape

(torch.Size([1695, 1]), torch.Size([1695, 3, 2]))

In [41]:
# Define a simple neural network
class SequentialModel(nn.Module):
    def __init__(
        self,
        input_size: int = 2,
        hidden_size: int = 8,
        output_size: int = 1,
        rnn_layers: int = 2,
        n_layers: int = 2,
        dropout: float = 0.1,
        input_layer_type=nn.RNN,
    ):
        super(SequentialModel, self).__init__()
        self.rnn_layer = input_layer_type(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=rnn_layers,
            batch_first=True,
            dropout=dropout,
        )
        self.fcc_layers = []
        for i in range(n_layers):
            self.fcc_layers.append(nn.Linear(hidden_size, hidden_size))
        self.output_layer = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        rnn_output, h0 = self.rnn_layer(x)
        x = torch.relu(rnn_output[:, -1, :])
        for fcc_layer in self.fcc_layers:
            x = torch.relu(fcc_layer(x))
            x = self.dropout(x)
        x = self.output_layer(x)
        return x


# data_num = 1024
# Generate some random data
# X = torch.Tensor(X)
# y = torch.Tensor(y).reshape(-1,1)

batch_size = 256
dataset_train = TensorDataset(X_train_rnn, y_train_rnn)
dataset_test = TensorDataset(X_test_rnn, y_test_rnn)
dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
dataloader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=True)

# Initialize model and optimizer
model = SequentialModel(
    input_size=2,
    hidden_size=4,
    rnn_layers=3,
    n_layers=2,
    output_size=1,
    dropout=0.1,
    input_layer_type=nn.RNN,
)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

count_parameters(model)

117

In [46]:
# Training loop
epochs = 600
for epoch in range(epochs):
    loss_train = []

    for inputs, labels in dataloader_train:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        loss_train.append(loss.item())
        optimizer.step()

    if epoch % 100 == 0:
        loss_test = []
        for inputs, labels in dataloader_test:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss_test.append(loss.item())
            optimizer.step()
        print(
            f"Epoch [{epoch+1}/{epochs}], Loss_train: {np.mean(loss_train):.4f}, Loss_test: {np.mean(loss_test):.4f}"
        )


# Save the trained model
# torch.save(model.state_dict(), 'simple_model.pth')

Epoch [1/600], Loss_train: 0.6618, Loss_test: 0.6641
Epoch [101/600], Loss_train: 0.6245, Loss_test: 0.6284
Epoch [201/600], Loss_train: 0.6005, Loss_test: 0.6040
Epoch [301/600], Loss_train: 0.5814, Loss_test: 0.5813
Epoch [401/600], Loss_train: 0.5721, Loss_test: 0.5661
Epoch [501/600], Loss_train: 0.5615, Loss_test: 0.5566


In [48]:
from sklearn.metrics import accuracy_score

y_pred = torch.sigmoid(model(X_test_rnn)).detach().numpy().reshape(-1)
y_pred = np.round(y_pred)
accuracy_score(y_test_rnn, y_pred)

0.7659574468085106

In [45]:
y_pred.min()

0.07212937

In [17]:
output.shape

torch.Size([5, 3, 20])