In [None]:
import pandas as pd
import numpy as np
import math
import datetime as dt
import matplotlib.pyplot as plt

from sklearn.metrics import mean_squared_error, mean_absolute_error, explained_variance_score, r2_score 
from sklearn.metrics import mean_poisson_deviance, mean_gamma_deviance, accuracy_score
from sklearn.preprocessing import MinMaxScaler

from itertools import product
import statsmodels.api as sm

import tensorflow as tf

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.layers import LSTM
from itertools import cycle
import plotly.offline as py
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

# Load the BTC price data and keep a second frame whose `datetime` column is
# parsed to datetime64[ns]; later cells group and plot from that parsed frame.
btc_input_df = pd.read_csv("/kaggle/input/krishhhhhhhh/btc_1h.csv")
btc_input_df_datetype = btc_input_df.astype({'datetime': 'datetime64[ns]'})

# Lag plots of the close price: each panel scatters close[t] against
# close[t + lag] to show how much autocorrelation survives at that horizon.
# NOTE(review): the file name (btc_1h.csv) indicates one row per hour, so the
# lags are counted in hours. The previous values (1/60/1440/10080/43200) were
# minute-based and did not match their titles on hourly rows - confirm the
# bar frequency of the dataset.
hourly_lags = [
    ('1-Hour Lag', 1),
    ('Daily Lag', 24),
    ('Weekly Lag', 24 * 7),
    ('1-Month Lag', 24 * 30),
]
close_prices = btc_input_df_datetype['close']

plt.figure(figsize=(15,12))
plt.suptitle('Lag Plots', fontsize=22)

for panel_index, (panel_title, lag) in enumerate(hourly_lags, start=1):
    plt.subplot(2, 2, panel_index)
    pd.plotting.lag_plot(close_prices, lag=lag)
    plt.title(panel_title)

# No plt.legend() here: lag_plot draws unlabeled scatter points, so a legend
# would be empty and only trigger a matplotlib warning.
plt.show()

# Parse the timestamps on the raw frame as well, so both frames hold real
# datetimes (the grouping below uses the already-parsed copy).
btc_input_df['datetime'] = pd.to_datetime(btc_input_df['datetime'])

# Average the close price per timestamp; this also collapses any duplicated
# timestamps into a single value and returns the series sorted by datetime.
group = btc_input_df_datetype.groupby('datetime')

btc_closing_price_groupby_date = group['close'].mean()
display(btc_closing_price_groupby_date.head(10))

print("Length of btc_closing_price_groupby_date :", len(btc_closing_price_groupby_date))

# Number of trailing observations held out for testing.
# NOTE(review): the name says "days", but this counts rows; if the rows are
# hourly (file name btc_1h.csv) the hold-out is 60 hours - confirm intent.
prediction_days = 60
split_index = len(btc_closing_price_groupby_date) - prediction_days

closing_prices = btc_closing_price_groupby_date.values.reshape(-1, 1)

# Train on everything up to the split, test on the last `prediction_days` rows.
df_train = closing_prices[:split_index]
df_test = closing_prices[split_index:]


chosen_col = 'Close'

fig, ax = plt.subplots(1, figsize=(13, 7))
ax.plot(df_train, label='Train', linewidth=2)
ax.plot(df_test, label='Test', linewidth=2)
ax.set_ylabel('Price USD', fontsize=14)
ax.set_title('', fontsize=16)
ax.legend(loc='best', fontsize=16)

# Fit the scaler on the training prices only.
scaler_train = MinMaxScaler(feature_range=(0, 1))
scaled_train = scaler_train.fit_transform(df_train)

# Scale the test prices with the *training* scaler. Fitting a second scaler on
# the test slice would map test prices onto a different [0, 1] range than the
# one the model is trained on, so identical scaled inputs would mean different
# prices in train and test. `scaler_test` is kept as a name because later
# cells call scaler_test.inverse_transform(...); it is the same object now.
scaler_test = scaler_train
scaled_test = scaler_test.transform(df_test)




def dataset_generator_lstm(dataset, look_back=5):
    """Build supervised (window, next value) samples from the first column.

    Sample ``i`` uses rows ``i .. i + look_back - 1`` of column 0 as the
    input window and row ``i + look_back`` of column 0 as the target.

    Args:
        dataset: 2-D array-like of shape (n_rows, n_columns); only column 0
            is read.
        look_back: Number of consecutive rows in each input window.

    Returns:
        Tuple ``(dataX, dataY)``. ``dataX`` has shape
        ``(n_rows - look_back, look_back)`` and ``dataY`` has shape
        ``(n_rows - look_back,)``. When the dataset has ``look_back`` rows or
        fewer, the arrays are empty but keep those ranks - ``(0, look_back)``
        and ``(0,)`` - so callers that read ``dataX.shape[1]`` do not fail.
    """
    series = np.asarray(dataset)[:, 0]
    n_samples = len(series) - look_back

    if n_samples <= 0:
        # Too short for even one window: return correctly shaped empties
        # instead of the rank-1 arrays np.array([]) would produce.
        return (np.empty((0, look_back), dtype=series.dtype),
                np.empty((0,), dtype=series.dtype))

    dataX = np.array([series[start:start + look_back] for start in range(n_samples)])
    # The target of window `start` is the value right after it, i.e. the
    # series shifted by look_back; copy so the result is not a view.
    dataY = series[look_back:].copy()
    return dataX, dataY

# Turn the scaled series into (window, next value) samples using
# dataset_generator_lstm's default look_back.
trainX, trainY = dataset_generator_lstm(scaled_train)

testX, testY = dataset_generator_lstm(scaled_test)

print("trainX: ", trainX.shape)
print("trainY: ", trainY.shape)
# Fixed label: this line prints testX's shape but was labelled "testY: ".
print("testX: ", testX.shape)
print("testY", testY.shape)

# Stand-alone illustration of the (samples, timesteps, features) layout that a
# Keras LSTM expects: 10 values reshaped to one sample of 10 timesteps with
# 1 feature. `data` and `model` are not referenced again in the visible code.
from numpy import array
data = array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0])
data = data.reshape((1, 10, 1))
print(data.shape)

model = Sequential()
model.add(LSTM(32, input_shape=(10, 1)))
model.add(Dense(1))

# Add the trailing feature axis so the windows become
# (samples, look_back, 1) as required by the LSTM input layer.
trainX = np.reshape(trainX, (trainX.shape[0], trainX.shape[1], 1))
testX = np.reshape(testX, (testX.shape[0], testX.shape[1], 1 ))
print("Shape of trainX: ", trainX.shape)
print("Shape of testX: ", testX.shape)

# Stacked LSTM regressor: three LSTM layers (128 -> 64 -> 32 units), each
# followed by 20% dropout, and a single-unit dense output for the next
# scaled price. The first two LSTM layers return full sequences so the next
# LSTM layer receives one vector per timestep.
regressor = Sequential()
regressor.add(LSTM(units=128, activation='relu', return_sequences=True, input_shape=(trainX.shape[1], trainX.shape[2])))
regressor.add(Dropout(0.2))
regressor.add(LSTM(units=64, activation='relu', return_sequences=True))
regressor.add(Dropout(0.2))
regressor.add(LSTM(units=32, activation='relu'))
regressor.add(Dropout(0.2))
regressor.add(Dense(units=1))
regressor.summary()

from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
regressor.compile(optimizer = 'adam', loss = 'mean_squared_error')

# Keras 3 (bundled with TensorFlow >= 2.16) rejects a full-model checkpoint
# path that does not end in ".keras" or ".h5", so the former ".hdf5" suffix
# raises ValueError there. ".keras" is accepted by older releases as well.
checkpoint_path = 'my_best_model.keras'
checkpoint = ModelCheckpoint(filepath=checkpoint_path,
                             monitor='val_loss',
                             verbose=1,
                             save_best_only=True,
                             mode='min')


# Stop once val_loss has not improved for 10 epochs and roll back to the best
# weights seen so far.
earlystopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
callbacks = [checkpoint, earlystopping]

# shuffle=False keeps the windows in chronological order within each epoch.
# NOTE(review): the test windows are used as validation data, so both early
# stopping and checkpoint selection see the test set; metrics reported on the
# same test set afterwards are therefore optimistic. Consider carving a
# validation slice off the end of the training data instead.
history = regressor.fit(trainX, trainY, batch_size = 64, epochs = 50, verbose=1, shuffle=False, validation_data=(testX, testY), callbacks=callbacks)


# Reload the best checkpoint written during training and plot both loss curves.
from tensorflow.keras.models import load_model
model_from_saved_checkpoint = load_model(checkpoint_path)
plt.figure(figsize=(16,7))
plt.plot(history.history['loss'], label='train')

plt.plot(history.history['val_loss'], label='test')
plt.legend()
plt.show()

# Predict on the test windows and map both the predictions and the test
# targets back to price units with scaler_test.
predicted_btc_price_test_data = model_from_saved_checkpoint.predict(testX)
predicted_btc_price_test_data = scaler_test.inverse_transform(predicted_btc_price_test_data.reshape(-1, 1))
test_actual = scaler_test.inverse_transform(testY.reshape(-1, 1))
# Predicted vs. actual prices on the test windows.
plt.figure(figsize=(16,7))
plt.plot(predicted_btc_price_test_data, 'r', marker='.', label='Predicted Test')
plt.plot(test_actual, marker='.', label='Actual Test')
plt.legend()
plt.show()

# Same procedure on the training windows, using scaler_train for the inverse
# transform; this shows how closely the model fits data it was trained on.
predicted_btc_price_train_data = model_from_saved_checkpoint.predict(trainX)
predicted_btc_price_train_data = scaler_train.inverse_transform(predicted_btc_price_train_data.reshape(-1, 1))
train_actual = scaler_train.inverse_transform(trainY.reshape(-1, 1))
plt.figure(figsize=(16,7))
plt.plot(predicted_btc_price_train_data, 'r', marker='.', label='Predicted Train')
plt.plot(train_actual, marker='.', label='Actual Train')
plt.legend()
plt.show()

# Error metrics computed on the inverse-transformed (price-unit) values:
# RMSE for test and train, then MAE and R-squared for the test windows only.
rmse_lstm_test = math.sqrt(mean_squared_error(test_actual, predicted_btc_price_test_data))
print('Test RMSE: %.3f' % rmse_lstm_test)
rmse_lstm_train = math.sqrt(mean_squared_error(train_actual, predicted_btc_price_train_data))
print('Train RMSE: %.3f' % rmse_lstm_train)
mae = mean_absolute_error(test_actual, predicted_btc_price_test_data)
print('MAE:', mae)
r2 = r2_score(test_actual, predicted_btc_price_test_data)
print('R-squared:', r2)

# Recursive multi-step forecast beyond the end of the test series.
#
# The previous version fed the last five *test windows* back into the model,
# which only re-computed the last five values already present in
# predicted_btc_price_test_data. Here the forecast starts from the most recent
# `lookback_period` scaled observations and, after each prediction, slides the
# window forward by appending the model's own output.
lookback_period = testX.shape[1]  # window length the model was trained with
forecast_horizon = 5

rolling_window = np.asarray(scaled_test[-lookback_period:, 0], dtype=float)
forecast_scaled = []
for _ in range(forecast_horizon):
    model_input = rolling_window.reshape(1, lookback_period, 1)
    next_scaled = float(model_from_saved_checkpoint.predict(model_input, verbose=0)[0, 0])
    forecast_scaled.append(next_scaled)
    # Drop the oldest value and append the new prediction for the next step.
    rolling_window = np.append(rolling_window[1:], next_scaled)

# Back to price units with the same scaler that produced scaled_test.
predicted_5_days_forecast_price_test_x = scaler_test.inverse_transform(
    np.array(forecast_scaled).reshape(-1, 1)
).flatten()

print("Forecast for the next 5 Days Beyond the actual trading days ", predicted_5_days_forecast_price_test_x)

# Append the forecast to the (flattened) in-sample test predictions so both
# can be drawn as one continuous line.
predicted_btc_price_test_data=predicted_btc_price_test_data.flatten()

predicted_btc_test_concatenated=np.concatenate((predicted_btc_price_test_data, predicted_5_days_forecast_price_test_x))


plt.figure(figsize=(16, 7))
plt.plot(predicted_btc_test_concatenated, 'r', marker='.', label='Predicted Test and Forecast')
plt.plot(test_actual, marker='.', label='Actual Test')
plt.title('Bitcoin Price Prediction and Forecast')
plt.xlabel('Time (days)')
plt.ylabel('Price (USD)')
plt.legend()
plt.show()

# Simple period-over-period returns of each price series, each divided by its
# own previous price. The former expression divided the 1-D predicted
# differences by the (n, 1)-shaped test_actual[1:], which NumPy broadcasts
# into a 2-D matrix instead of an element-wise return series.
predicted_prices = np.asarray(predicted_btc_test_concatenated, dtype=float).flatten()
actual_prices = np.asarray(test_actual, dtype=float).flatten()

predicted_returns = np.diff(predicted_prices) / predicted_prices[:-1]
actual_returns = np.diff(actual_prices) / actual_prices[:-1]

# Calculate cumulative returns
cumulative_predicted_returns = np.cumsum(predicted_returns)
cumulative_actual_returns = np.cumsum(actual_returns)


# Relative difference (in percent) between the final cumulative predicted
# return and the final cumulative actual return. Undefined when the actual
# cumulative return is exactly zero, so report NaN rather than dividing by 0.
final_actual_return = cumulative_actual_returns[-1]
if final_actual_return != 0:
    roi = (cumulative_predicted_returns[-1] - final_actual_return) / final_actual_return * 100
else:
    roi = float('nan')

print("Return on Investment (ROI): {:.2f}%".format(roi))


plt.figure(figsize=(16, 7))
plt.plot(cumulative_predicted_returns, 'r', marker='.', label='Cumulative Predicted Returns')
plt.plot(cumulative_actual_returns, marker='.', label='Cumulative Actual Returns')
plt.legend()
plt.title('Cumulative Returns')
plt.xlabel('Days')
plt.ylabel('Cumulative Returns')
plt.show()


# ---------------------------------------------------------------------------
# Backtest inputs
# ---------------------------------------------------------------------------
# Both loops below compare signals against fractional thresholds (2%), so they
# need *returns*. The previous code passed predicted price levels (USD) and
# min-max scaled prices instead, which made every threshold test meaningless.
#
# predicted_btc_price_test_data[t] is the model's estimate of test_actual[t],
# computed from a window that ends at test_actual[t - 1]. Hence, for t >= 1:
#   predicted return = (predicted[t] - actual[t - 1]) / actual[t - 1]
#   actual return    = (actual[t]    - actual[t - 1]) / actual[t - 1]
# and the predicted return only uses information available before step t.
predicted_prices = np.asarray(predicted_btc_price_test_data, dtype=float).flatten()
actual_prices = np.asarray(test_actual, dtype=float).flatten()
n_prices = min(len(predicted_prices), len(actual_prices))

previous_actual = actual_prices[:max(n_prices - 1, 0)]
predicted_returns = (predicted_prices[1:n_prices] - previous_actual) / previous_actual
actual_returns = (actual_prices[1:n_prices] - previous_actual) / previous_actual

stop_loss = 0.02
take_profit = 0.02

# ---------------------------------------------------------------------------
# Backtest 1: fixed thresholds
# ---------------------------------------------------------------------------
# Position codes: 0 = flat, 1 = long, -1 = short.
# NOTE(review): as in the original rule set, an open position is closed
# whenever the predicted return crosses either threshold (including the one
# that opened it) - confirm this is the intended exit rule.
equity_curve = [1.0]
current_position = 0

for predicted_return, actual_return in zip(predicted_returns, actual_returns):
    predicts_drop = predicted_return < -stop_loss
    predicts_rise = predicted_return > take_profit

    if current_position == 0:
        if predicts_drop:
            current_position = -1
        elif predicts_rise:
            current_position = 1
    elif predicts_drop or predicts_rise:
        current_position = 0

    # Long earns the return, short earns its negative, flat keeps equity
    # unchanged. A point is appended on every step so the curve stays aligned
    # with time.
    equity_curve.append(equity_curve[-1] * (1 + current_position * actual_return))

# Plotting the backtest results
plt.figure(figsize=(16, 7))
plt.plot(equity_curve, label='Equity Curve', color='green')
plt.title('Equity Curve - Backtest Results')
plt.xlabel('Time')
plt.ylabel('Equity Value')
plt.legend()
plt.show()


# ---------------------------------------------------------------------------
# Backtest 2: trailing threshold for shorts + drawdown tracking
# ---------------------------------------------------------------------------
current_position = 0
equity_curve = [1.0]
drawdowns = [0.0]
highest_equity = 1.0
trailing_stop_loss = 0.0  # Initialize trailing stop-loss

for predicted_return, actual_return in zip(predicted_returns, actual_returns):
    if current_position == 0:  # No position
        if predicted_return < -trailing_stop_loss:
            current_position = -1  # Enter short position
            trailing_stop_loss = highest_equity * stop_loss  # Set trailing stop-loss
        elif predicted_return > take_profit:
            # Long entry: without it the long branch below could never run.
            current_position = 1
    elif current_position == 1:  # Long position
        if predicted_return < -stop_loss or predicted_return > take_profit:
            current_position = 0  # Exit the position
    else:  # Short position
        if predicted_return > take_profit or predicted_return < -trailing_stop_loss:
            current_position = 0  # Exit the position

    equity_curve.append(equity_curve[-1] * (1 + current_position * actual_return))

    # Track the running peak incrementally instead of rescanning the whole
    # curve with max() on every step.
    highest_equity = max(highest_equity, equity_curve[-1])
    drawdowns.append((highest_equity - equity_curve[-1]) / highest_equity)

# Check if maximum drawdown is within the specified threshold
max_drawdown_percentage = max(drawdowns)
if max_drawdown_percentage <= 0.07:  # 7% threshold
    print("Maximum drawdown within the specified threshold.")
else:
    print("Maximum drawdown exceeds the specified threshold.")

# Largest absolute predicted single-step return (0 when there are no steps).
risk_percentage = np.max(np.abs(predicted_returns)) if predicted_returns.size else 0.0
print(f"Risk Percentage: {risk_percentage * 100:.2f}%")
print(f"Max Drawdown Percentage: {max_drawdown_percentage * 100:.2f}%")

# Plotting
plt.figure(figsize=(16, 7))
plt.plot(equity_curve, label='Equity Curve', color='green')
plt.plot(drawdowns, label='Drawdowns', color='red', linestyle='--')
plt.title('Equity Curve and Drawdowns with Risk Management')
plt.xlabel('Time')
plt.ylabel('Values')
plt.legend()
plt.show()


def calculate_metrics(actual, predicted, periods_per_year=252, risk_free_rate=0):
    """Print return-based metrics for two price series and plot them.

    Both inputs are flattened and converted to simple period-over-period
    returns. "Trade" statistics are computed on the returns of ``predicted``
    (every non-zero return counts as one trade); Sharpe and Sortino ratios
    are computed on the returns of ``actual``. "Equity" is the cumulative
    sum of returns.

    NOTE: all printed quantities are fractional returns. The "USDT" suffix in
    the printed labels is kept for output compatibility and does not mean the
    values are currency amounts.

    Args:
        actual: Array-like of actual prices (any shape; flattened).
        predicted: Array-like of predicted prices (any shape; flattened).
        periods_per_year: Number of return periods per year, used to
            annualise the Sharpe and Sortino ratios. The default of 252
            assumes daily bars; pass the matching count for other
            frequencies.
        risk_free_rate: Annual risk-free rate; it is divided by
            ``periods_per_year`` before being subtracted from each return.

    Returns:
        None. Results are printed and a matplotlib figure is shown.

    Raises:
        ValueError: If either series has fewer than two prices, because no
            return can be computed.
    """
    actual = np.asarray(actual, dtype=float).flatten()
    predicted = np.asarray(predicted, dtype=float).flatten()
    if actual.size < 2 or predicted.size < 2:
        raise ValueError("calculate_metrics needs at least two prices in each series")

    actual_returns = np.diff(actual) / actual[:-1]
    predicted_returns = np.diff(predicted) / predicted[:-1]

    actual_equity = np.cumsum(actual_returns)
    predicted_equity = np.cumsum(predicted_returns)

    winning_trades = predicted_returns[predicted_returns > 0]
    losing_trades = predicted_returns[predicted_returns < 0]

    gross_profit = np.sum(np.maximum(predicted_returns, 0))
    gross_loss = -np.sum(np.minimum(predicted_returns, 0))
    net_profit = gross_profit - gross_loss
    total_closed_trades = np.sum(predicted_returns != 0)
    win_rate = winning_trades.size / total_closed_trades if total_closed_trades > 0 else 0

    # Distance of the equity curve below its running peak. The maximum
    # drawdown is the largest such distance; taking the minimum (as before)
    # always gives 0 because the first point sits on its own peak.
    running_dips = np.maximum.accumulate(predicted_equity) - predicted_equity
    max_drawdown = np.max(running_dips)
    max_dip = np.max(running_dips)
    average_dip = np.mean(running_dips)

    # Guard the empty cases so a series without winners (or losers) reports 0
    # instead of NaN plus a "mean of empty slice" warning.
    average_winning_trade = np.mean(winning_trades) if winning_trades.size else 0.0
    average_losing_trade = np.mean(losing_trades) if losing_trades.size else 0.0
    largest_losing_trade = np.min(predicted_returns)
    largest_winning_trade = np.max(predicted_returns)

    # Sharpe: mean excess return over return volatility, both per period,
    # then annualised with sqrt(periods_per_year). The previous version divided
    # a per-period mean by an already annualised volatility.
    annualisation = np.sqrt(periods_per_year)
    excess_returns = actual_returns - risk_free_rate / periods_per_year
    volatility = np.std(actual_returns)
    sharpe_ratio = np.mean(excess_returns) / volatility * annualisation if volatility != 0 else 0

    # Sortino: same numerator, but only downside deviation (root mean square
    # of the negative excess returns) in the denominator.
    downside_deviation = np.sqrt(np.mean(np.minimum(excess_returns, 0) ** 2))
    sortino_ratio = (np.mean(excess_returns) / downside_deviation * annualisation
                     if downside_deviation != 0 else 0)

    # Average Holding Duration per Trade: mean gap, in periods, between
    # consecutive non-zero predicted returns.
    holding_durations = np.diff(np.where(predicted_returns != 0)[0])
    average_holding_duration = np.mean(holding_durations) if holding_durations.size > 0 else 0

    print(f'1. Gross Profit: {gross_profit:.2f} USDT')
    print(f'2. Net Profit: {net_profit:.2f} USDT')
    print(f'3. Total Closed Trades: {total_closed_trades}')
    print(f'4. Win Rate (Profitability %): {win_rate:.2%}')
    print(f'5. Max Drawdown: {max_drawdown:.2%}')
    print(f'6. Gross Loss: {gross_loss:.2f} USDT')
    print(f'7. Average Winning Trade (in USDT): {average_winning_trade:.2f} USDT')
    print(f'8. Average Losing Trade (in USDT): {average_losing_trade:.2f} USDT')
    print(f'9. Largest Losing Trade (in USDT): {largest_losing_trade:.2f} USDT')
    print(f'10. Largest Winning Trade (in USDT): {largest_winning_trade:.2f} USDT')
    print(f'11. Sharpe Ratio: {sharpe_ratio:.4f}')
    print(f'12. Sortino Ratio: {sortino_ratio:.4f}')
    print(f'13. Average Holding Duration per Trade: {average_holding_duration:.2f} days')
    print(f'14. Max Dip in Running Trade: {max_dip:.2f} USDT')
    print(f'15. Average Dip in Running Trade: {average_dip:.2f} USDT')

    # Plot Equity Graph
    plt.figure(figsize=(16, 7))
    plt.plot(actual_equity, label='Actual Equity', color='blue')
    plt.plot(predicted_equity, label='Predicted Equity', color='orange')
    plt.title('Actual vs Predicted Equity')
    plt.xlabel('Time')
    plt.ylabel('Equity')
    plt.legend()
    plt.show()

# Report metrics for the actual test prices against the predicted-plus-forecast series.
calculate_metrics(test_actual, predicted_btc_test_concatenated)




In [None]:
def calculate_metrics(actual, predicted, periods_per_year=252, risk_free_rate=0.0):
    """Print a summary of return-based metrics for two price series.

    Both inputs are flattened and converted to simple period-over-period
    returns. Portfolio value, cumulative return, Sharpe ratio and max
    drawdown are computed on the returns of ``actual``; profit/loss and the
    "trade" statistics are computed on the returns of ``predicted`` (every
    non-zero return counts as one trade).

    Args:
        actual: Array-like of actual prices (any shape; flattened).
        predicted: Array-like of predicted prices (any shape; flattened).
        periods_per_year: Number of return periods per year, used to
            annualise the Sharpe ratio. The default of 252 assumes daily
            bars; pass the matching count for other frequencies.
        risk_free_rate: Annual risk-free rate; it is divided by
            ``periods_per_year`` before being subtracted from each return.

    Returns:
        None. Results are printed.

    Raises:
        ValueError: If either series has fewer than two prices, because no
            return can be computed.
    """
    actual = np.asarray(actual, dtype=float).flatten()
    predicted = np.asarray(predicted, dtype=float).flatten()
    if actual.size < 2 or predicted.size < 2:
        raise ValueError("calculate_metrics needs at least two prices in each series")

    actual_returns = np.diff(actual) / actual[:-1]
    predicted_returns = np.diff(predicted) / predicted[:-1]

    winning_trades = predicted_returns[predicted_returns > 0]
    losing_trades = predicted_returns[predicted_returns < 0]

    # Final Portfolio Value: growth of 1 unit compounded over the actual returns
    final_portfolio_value = np.cumprod(1 + actual_returns)[-1]

    # Profit and Loss: sum of the predicted returns
    pnl = np.sum(predicted_returns)

    # Cumulative Returns
    cumulative_returns = np.cumsum(actual_returns)

    # Sharpe Ratio: per-period mean excess return over per-period volatility,
    # annualised with sqrt(periods_per_year). The previous version divided a
    # per-period mean by an already annualised volatility.
    excess_returns = actual_returns - risk_free_rate / periods_per_year
    volatility = np.std(actual_returns)
    sharpe_ratio = (np.mean(excess_returns) / volatility * np.sqrt(periods_per_year)
                    if volatility != 0 else 0)

    # Total Closed Trades
    total_closed_trades = np.sum(predicted_returns != 0)

    # Total Won Trades
    total_won_trades = np.sum(predicted_returns > 0)

    # Win%
    win_percentage = total_won_trades / total_closed_trades if total_closed_trades > 0 else 0

    # Max Drawdown: largest distance of cumulative return below its running peak
    max_drawdown = np.max(np.maximum.accumulate(cumulative_returns) - cumulative_returns)

    # Average Winning / Losing Trade. Empty slices report 0 instead of NaN
    # plus a "mean of empty slice" warning.
    average_winning_trade = np.mean(winning_trades) if winning_trades.size else 0.0
    average_losing_trade = np.mean(losing_trades) if losing_trades.size else 0.0

    # Risk Reward Ratio: average loss magnitude per unit of average win;
    # 0 when there are no winners or no losers to compare.
    if winning_trades.size and losing_trades.size and average_winning_trade != 0:
        risk_reward_ratio = -average_losing_trade / average_winning_trade
    else:
        risk_reward_ratio = 0

    # Largest Winning Trade
    largest_winning_trade = np.max(predicted_returns)

    # Largest Losing Trade
    largest_losing_trade = np.min(predicted_returns)

    print(f'Final Portfolio Value: {final_portfolio_value:.2f}')
    print(f'Profit and Loss: {pnl:.2f}')
    print(f'Cumulative Returns: {cumulative_returns[-1]:.2%}')
    print(f'Sharpe Ratio: {sharpe_ratio:.4f}')
    print(f'Total Closed Trades: {total_closed_trades}')
    print(f'Total Won Trades: {total_won_trades}')
    print(f'Win%: {win_percentage:.2%}')
    print(f'Max Drawdown: {max_drawdown:.2%}')
    print(f'Risk Reward Ratio: {risk_reward_ratio:.2f}')
    print(f'Average Winning Trade: {average_winning_trade:.2f}')
    print(f'Average Losing Trade: {average_losing_trade:.2f}')
    print(f'Largest Winning Trade: {largest_winning_trade:.2f}')
    print(f'Largest Losing Trade: {largest_losing_trade:.2f}')

# Example usage: compare the actual test prices with the predicted-plus-forecast series.
calculate_metrics(test_actual, predicted_btc_test_concatenated)
