In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
# Import packages
import numpy as np
import os
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler  # type: ignore
from tensorflow.keras.models import load_model # type: ignore
import tensorflow as tf
import pandas as pd
import json

# Local imports
from src.data_processing.lstm_data_preprocessing import reduce_time_bucket_features, FeaturesConfig, TimeBucketConfig
from src.data_processing.loader import load_time_bucket_data

In [3]:
# Load the model and details on configurations used to train the model like the time bucket data used and the features used

# Custom loss function
def weighted_mse_large_moves(y_true, y_pred):
    diff = y_true - y_pred
    weight = tf.math.square(y_true)
    return tf.reduce_mean(weight * tf.square(diff))

model_folder = "../model_generation/trained_models/lstm_1"
model = load_model(os.path.join(model_folder, "model.keras"),
    custom_objects={'weighted_mse_large_moves': weighted_mse_large_moves})
with open(os.path.join(model_folder, "config.json"), 'r') as f:
    configs = json.load(f)

features_config = FeaturesConfig(**configs["features_config"])
time_bucket_folder = configs["time_bucket_folder"]
test_size = configs["test_size"]

In [4]:
# Get the train test data set used to train the model were testing

X_scaler = StandardScaler()
y_scaler = StandardScaler()

token_time_buckets, time_bucket_config = load_time_bucket_data(time_bucket_folder)

token_datasets = []
for token_address, data in token_time_buckets.items():
    X = data["X"]
    y = data["y"]
    bucket_times = data["bucket_times"]

    # Only get the features listed in features_config
    X = reduce_time_bucket_features(X, features_config)

    token_datasets.append((X, y, token_address, bucket_times))

# Combine all token data
all_X = np.vstack([data[0] for data in token_datasets])
all_y = np.vstack([data[1].reshape(-1, 1) for data in token_datasets])

# Scale features
num_samples, time_steps, features = all_X.shape
X_reshaped = all_X.reshape(num_samples * time_steps, features)
X_scaled = X_scaler.fit_transform(X_reshaped)
X_scaled = X_scaled.reshape(num_samples, time_steps, features)

# Scale target variable also using StandardScaler to preserve direction
y_scaled = y_scaler.fit_transform(all_y)

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_scaled, test_size=test_size, shuffle=False)

FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\matth\\Uni Work\\CM3203 - Individual Project\\Project Environment\\CM3203-Individual-Project\\data\\time_bucket_data\\time_bucket_1\\config.json'

In [None]:
y_pred = model.predict(X_test)

# Inverse transform to get real values
y_pred_actual = y_scaler.inverse_transform(y_pred)
y_test_actual = y_scaler.inverse_transform(y_test)

In [None]:
from visualisation_methods import plot_error_distribution, plot_directional_accuracy, plot_prediction_vs_actual, plot_magnitude_accuracy, plot_value_distributions

In [None]:
plot_error_distribution(y_pred_actual, y_test_actual, title="Error distribution")

In [None]:
plot_value_distributions(y_pred_actual, y_test_actual)

In [None]:
print(max(y_pred_actual))

In [None]:
bins = [-np.inf, -8, -4, -2, -1, -0.5, -0.1, 0, 0.1, 0.5, 1, 2, 4, 8, np.inf]
plot_directional_accuracy(y_pred_actual, y_test_actual, bins=bins)

In [None]:
plot_prediction_vs_actual(y_pred_actual, y_test_actual)

In [None]:
from visualisation_methods import plot_error_heatmap
plot_error_heatmap(y_pred_actual, y_test_actual)

In [None]:
plot_magnitude_accuracy(y_pred_actual, y_test_actual, bins=bins, use_percentage_error=True)

In [None]:
def get_test_tokens_with_large_pred(token_datasets, test_size, y_pred_actual, min_abs_pred_size=1.5):
    total_buckets = sum(len(data[1]) for data in token_datasets)
    test_start_idx = int((1 - test_size) * total_buckets)

    fully_test_tokens = {}
    current_idx = 0
    y_pred_test_index = 0  # Index within y_pred_actual

    for X, y, token_address, bucket_times in token_datasets:
        token_len = len(y)
        token_start_idx = current_idx
        current_idx += token_len

        # Only process tokens fully in the test set
        if token_start_idx >= test_start_idx:
            bucket_pred_map = []

            for i in range(token_len):
                if y_pred_test_index >= len(y_pred_actual):
                    break  # Prevent overflow if mismatch in lengths
                pred = y_pred_actual[y_pred_test_index].item()
                if abs(pred) >= min_abs_pred_size:
                    bucket_pred_map.append({
                        'bucket_time': tuple(bucket_times[i]),
                        'prediction': pred
                    })
                y_pred_test_index += 1

            if bucket_pred_map:
                fully_test_tokens[token_address] = bucket_pred_map

    return fully_test_tokens


In [None]:
# Set minimum absolute prediction size
min_abs_pred_size = 1.5

# Get tokens with large predictions
test_tokens_pred = get_test_tokens_with_large_pred(token_datasets, test_size, y_pred_actual, min_abs_pred_size=min_abs_pred_size)

# Print the results
print(list(test_tokens_pred.keys()))  # Print token addresses
print(len(test_tokens_pred))  # Print number of tokens

In [None]:
from visualisation_methods import plot_predictions_on_price_graph
token_address = "bpR2DF4sMarWvMJT6tevpQ9K7AtXskBQ6zA35iCpump"
bucket_pred_map = test_tokens_pred[token_address]
plot_predictions_on_price_graph(token_address, bucket_pred_map, min_abs_pred_size=min_abs_pred_size)



In [None]:
min_abs_pred_size = 3
test_tokens_real = get_test_tokens_with_large_pred(token_datasets, test_size, y_test_actual, min_abs_pred_size=min_abs_pred_size)
print(list(test_tokens_real.keys()))
print(len(test_tokens_real))

In [None]:
token_address = "E5YYfMPBbWz3vvaQcdPfNSC659nBmMZFqpAXqPtcpump"
bucket_pred_map = test_tokens_real[token_address]
plot_predictions_on_price_graph(token_address, bucket_pred_map, min_abs_pred_size=min_abs_pred_size)

In [None]:
from src.data_processing.loader import load_token_price_data

# Get all 1.75x predictions and their tokens
test_tokens_pred = get_test_tokens_with_large_pred(token_datasets, test_size, y_pred_actual, min_abs_pred_size=1.5)

# Load price data
tokens_tx_data = {
    token: load_token_price_data(token, use_datetime=False)
    for token in test_tokens_pred.keys()
}

buy_size = 0.1  # Fixed investment in SOL

# Strategy parameters
sell_targets = [1.0, 1.5, 2.5, 5]                # Profit target percentages (e.g., +100%, +150%)
trailing_stop_targets = [0.5, 0.75]       # Trailing drawdown levels (e.g., -10%, -20%)
buy_fee_pct = 0.01
sell_fee_pct = 0.01
slippage_pct = 0.005

# Stats
total_profit = 0
num_trades = 0
num_wins = 0
num_losses = 0
profits = []

for token_address, bucket_pred_map in test_tokens_pred.items():
    price_df = tokens_tx_data[token_address]
    price_df = price_df.groupby(price_df.index).mean().sort_index()

    print(f"\nSimulating for token: {token_address}")
    executed_trade = False

    for pred_data in bucket_pred_map:
        if executed_trade:
            break

        bucket_time = pred_data['bucket_time']
        pred = pred_data['prediction']
        if pred < 0 :
            continue
        start_time, end_time = bucket_time

        if end_time not in price_df.index:
            closest_idx = price_df.index.get_indexer([end_time], method='nearest')[0]
            closest_time = price_df.index[closest_idx]
        else:
            closest_time = end_time

        raw_buy_price = price_df.loc[closest_time, 'price']
        if pd.isna(raw_buy_price):
            continue

        buy_price = raw_buy_price * (1 + slippage_pct)
        entry_value = buy_size * (1 - buy_fee_pct)
        tokens_bought = entry_value / buy_price
        remaining_tokens = tokens_bought
        profit = -buy_size
        peak_price = buy_price

        print(f"  Prediction: {pred}x | Buy Price: {buy_price} (with slippage/fees)")

        future_prices = price_df.loc[closest_time:]
        if len(future_prices) < 2:
            continue

        target_idx = 0
        stop_idx = 0

        for time, row in future_prices.iterrows():
            raw_price = row['price']
            peak_price = max(peak_price, raw_price)

            sell_price = raw_price * (1 - slippage_pct) * (1 - sell_fee_pct)

            # Profit targets
            if target_idx < len(sell_targets) and raw_price >= buy_price * (1 + sell_targets[target_idx]):
                sell_amount = remaining_tokens * 0.5
                profit += sell_amount * sell_price
                remaining_tokens -= sell_amount
                print(f"    Sold 50% at profit target {sell_targets[target_idx]}x | Price: {raw_price}")
                target_idx += 1

            # Trailing stop stages
            drawdown = 1 - (raw_price / peak_price)
            if stop_idx < len(trailing_stop_targets) and drawdown >= trailing_stop_targets[stop_idx]:
                sell_amount = remaining_tokens * 0.5
                profit += sell_amount * sell_price
                remaining_tokens -= sell_amount
                print(f"    Sold 50% at trailing stop {int(trailing_stop_targets[stop_idx]*100)}% | Price: {raw_price}")
                stop_idx += 1

            # Fully exit if no tokens left
            if remaining_tokens <= 0:
                print(f"    Fully exited. Total profit: {profit} SOL")
                break

        # If tokens remain, sell at final price
        if remaining_tokens > 0:
            final_price = raw_price * (1 - slippage_pct) * (1 - sell_fee_pct)
            profit += remaining_tokens * final_price
            print(f"    Final exit at {raw_price:.4f}. Total profit: {profit} SOL")

        # Update stats
        executed_trade = True
        num_trades += 1
        total_profit += profit
        profits.append(profit)
        if profit > 0:
            num_wins += 1
        else:
            num_losses += 1

# Summary
print("\n--- Strategy Summary ---")
print(f"Total Trades: {num_trades}")
print(f"Wins: {num_wins} | Losses: {num_losses}")
print(f"Win Rate: {(num_wins / num_trades * 100):.2f}%" if num_trades > 0 else "No trades made")
print(f"Total Profit: {total_profit:.4f} SOL")
if num_trades > 0:
    print(f"Average Profit per Trade: {(total_profit / num_trades):.4f} SOL")


In [None]:
from tensorflow.keras.utils import plot_model

# Assuming your model is named 'model'
plot_model(model, to_file='model_diagram.png', show_shapes=True, show_layer_names=True)
