In [7]:
import pandas as pd
import numpy as np
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import joblib

In [8]:
df = pd.read_csv('merged_shuffled_data_10.csv')

In [9]:
FEATURES = ['RSI', 'EMA12', 'EMA26', 'MACD', 'Signal', 'Histogram', 'DEMA9', 'SMA', 'TSI', '%K', '%D']

X = df[FEATURES].values
y = df["Label"].values

# ===============================
# 2. Scale features
# ===============================
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
joblib.dump(scaler, "scaler_15m.pkl")  # save scaler for later predictions


['scaler_15m.pkl']

In [10]:
# ===============================
# 3. Create sliding windows for LSTM
# ===============================
def create_sequences(X, y, window_size=10):
    X_seq, y_seq = [], []
    for i in range(window_size, len(X)):
        X_seq.append(X[i-window_size:i])  # shape: (window_size, features)
        y_seq.append(y[i])
    return np.array(X_seq, dtype=np.float32), np.array(y_seq, dtype=np.int64)

window_size = 10
X_seq, y_seq = create_sequences(X_scaled, y, window_size)

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(X_seq, y_seq, test_size=0.2, shuffle=False)

# Convert to tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.long)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.long)


In [11]:
# ===============================
# 4. Compute class weights
# ===============================
labels, counts = np.unique(y_train.numpy(), return_counts=True)
total = sum(counts)
class_weights = [total/count for count in counts]  # inverse frequency
class_weights = torch.tensor(class_weights, dtype=torch.float32)


In [12]:
# ===============================
# 5. Define LSTM Model
# ===============================
class CryptoLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
        super(CryptoLSTM, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
    
    def forward(self, x):
        out, (hn, cn) = self.lstm(x)
        out = out[:, -1, :]  # last timestep
        out = self.fc(out)
        return out

input_dim = len(FEATURES)
hidden_dim = 128
num_layers = 4
output_dim = 3

model = CryptoLSTM(input_dim, hidden_dim, num_layers, output_dim)


In [13]:
# ===============================
# 6. Loss and optimizer
# ===============================
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# ===============================
# 7. Training loop
# ===============================
epochs = 500
batch_size = 32

for epoch in range(epochs):
    model.train()
    running_loss = 0
    permutation = torch.randperm(X_train.size(0))
    
    for i in range(0, X_train.size(0), batch_size):
        indices = permutation[i:i+batch_size]
        batch_X, batch_y = X_train[indices], y_train[indices]

        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
    
    print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss/(X_train.size(0)//batch_size):.4f}")


Epoch 1/500, Loss: 1.1034
Epoch 2/500, Loss: 1.1030
Epoch 3/500, Loss: 1.1050
Epoch 4/500, Loss: 1.1029
Epoch 5/500, Loss: 1.1025
Epoch 6/500, Loss: 1.1026
Epoch 7/500, Loss: 1.1029
Epoch 8/500, Loss: 1.1029
Epoch 9/500, Loss: 1.1028


KeyboardInterrupt: 

In [None]:
model.eval()
with torch.no_grad():
    outputs = model(X_test)
    predictions = torch.argmax(outputs, dim=1)

accuracy = (predictions == y_test).float().mean()
print(f"Test Accuracy: {accuracy:.4f}")


In [None]:
torch.save(model.state_dict(), "greg_tech_8.pth")
print("Model saved successfully!")


In [None]:
import numpy as np
import torch
from sklearn.preprocessing import StandardScaler

# Example feature values
example_features = {
    'RSI': 40.231328, 
    'EMA12': 117510.625832,
    'EMA26': 117522.059487,
    'MACD': -11.433655,
    'Signal':36.277050,
    'Histogram':-47.710705,
    'DEMA9':117395.915076,
    'SMA':117365.470000,
    'TSI': 41.065534,
    '%K':29.053022,
    '%D':42.278097
}

FEATURES = ['RSI', 'EMA12', 'EMA26', 'MACD', 'Signal', 'Histogram', 'DEMA9', 'SMA', 'TSI', '%K', '%D']

# Scale features
X_scaled = scaler.transform(df[FEATURES].values)


In [None]:
window_size = 10
predictions = [1]*window_size  # first window_size rows cannot be predicted, assume "No Reversal"

for i in range(window_size, len(X_scaled)):
    X_input = torch.tensor(X_scaled[i-window_size:i], dtype=torch.float32).unsqueeze(0)  # (1, seq_len, features)
    with torch.no_grad():
        output = model(X_input)
        pred_label = torch.argmax(output, dim=1).item()
        predictions.append(pred_label)

df["Predicted"] = predictions

In [None]:
# ===============================
# 5. Plot Close prices with predicted reversals
# ===============================
import matplotlib.pyplot as plt
plt.figure(figsize=(15,7))
plt.plot(df.index, df["close"], label="Close Price", color="blue")

# Uptrend reversals (label=2)
plt.scatter(df.index[df["Predicted"]==2], df["close"][df["Predicted"]==2],
            marker="^", color="green", s=100, label="Predicted Uptrend Reversal")

# Downtrend reversals (label=0)
plt.scatter(df.index[df["Predicted"]==0], df["close"][df["Predicted"]==0],
            marker="v", color="red", s=100, label="Predicted Downtrend Reversal")

plt.title("Predicted Reversals on Close Prices")
plt.xlabel("Time")
plt.ylabel("Price")
plt.legend()
plt.show()