In [1]:
import pandas as pd
import numpy as np
import requests
from bs4 import BeautifulSoup
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
import xgboost as xgb
import matplotlib.pyplot as plt

def calculate_sma(df, period, column="close"):
    """Return the simple moving average of ``df[column]``.

    Args:
        df: DataFrame holding the price column.
        period: Number of consecutive rows in each averaging window.
        column: Name of the column to average.

    Returns:
        A Series aligned with ``df``; the first ``period - 1`` entries are
        NaN because their window is incomplete.
    """
    prices = df[column]
    window = prices.rolling(window=period)
    return window.mean()

def calculate_ema(df, period, column="close"):
    """Return the exponential moving average of ``df[column]``.

    Args:
        df: DataFrame holding the price column.
        period: Span passed to ``ewm``.
        column: Name of the column to smooth.

    Returns:
        A Series aligned with ``df``. The recursive (``adjust=False``) form
        is used, so the first output equals the first input value.
    """
    prices = df[column]
    smoother = prices.ewm(span=period, adjust=False)
    return smoother.mean()

def calculate_atr(df, period=14):
    """Return the average true range as a rolling mean of the true range.

    The true range of a row is the largest of: high - low,
    |high - previous close| and |low - previous close|.

    Args:
        df: DataFrame with ``high``, ``low`` and ``close`` columns.
        period: Number of rows in the rolling-mean window.

    Returns:
        A Series aligned with ``df``; the first ``period - 1`` entries are NaN.
    """
    prev_close = df['close'].shift()
    candidates = pd.concat(
        [
            df['high'] - df['low'],
            (df['high'] - prev_close).abs(),
            (df['low'] - prev_close).abs(),
        ],
        axis=1,
    )
    # Row-wise max skips NaN, so the first row (which has no previous close)
    # falls back to high - low.
    true_range = candidates.max(axis=1)
    return true_range.rolling(window=period).mean()

def calculate_bbands(df, period=20, num_std=2, column="close"):
    """Return Bollinger Bands for ``df[column]``.

    Args:
        df: DataFrame holding the price column.
        period: Number of rows in the rolling window.
        num_std: Band half-width, in rolling standard deviations.
        column: Name of the price column the bands are built on.

    Returns:
        A DataFrame aligned with ``df`` with columns ``BB_MIDDLE`` (rolling
        mean), ``BB_UPPER`` and ``BB_LOWER`` (mean +/- ``num_std`` standard
        deviations). The first ``period - 1`` rows are NaN.
    """
    # One rolling window feeds both statistics so they always agree on the
    # column and the period.
    window = df[column].rolling(window=period)
    middle = window.mean()
    offset = num_std * window.std()
    return pd.DataFrame({
        "BB_MIDDLE": middle,
        "BB_UPPER": middle + offset,
        "BB_LOWER": middle - offset,
    })

def calculate_macd(df, fast_period=12, slow_period=26, signal_period=9):
    """Return the MACD line and its signal line.

    Args:
        df: DataFrame passed through to ``calculate_ema`` (its default
            price column is used).
        fast_period: Span of the fast EMA.
        slow_period: Span of the slow EMA.
        signal_period: Span of the EMA applied to the MACD line.

    Returns:
        A DataFrame aligned with ``df`` with columns ``MACD`` (fast EMA minus
        slow EMA) and ``SIGNAL_LINE`` (EMA of ``MACD``).
    """
    macd_line = calculate_ema(df, fast_period) - calculate_ema(df, slow_period)
    signal = macd_line.ewm(span=signal_period, adjust=False).mean()
    return pd.DataFrame({"MACD": macd_line, "SIGNAL_LINE": signal})

def calculate_rsi(df, period=14, column="close"):
    """Return the relative strength index built from rolling simple means.

    Args:
        df: DataFrame holding the price column.
        period: Number of rows in the rolling-mean windows.
        column: Name of the price column.

    Returns:
        A Series aligned with ``df``. The first ``period - 1`` entries are
        NaN; a window with neither gains nor losses also yields NaN (0 / 0).
    """
    change = df[column].diff(1)
    # Split the one-step change into its upward and downward parts; the
    # first row (no previous value) counts as 0 in both.
    ups = change.where(change > 0, 0)
    downs = -change.where(change < 0, 0)
    mean_up = ups.rolling(window=period).mean()
    mean_down = downs.rolling(window=period).mean()
    relative_strength = mean_up / mean_down
    return 100 - (100 / (1 + relative_strength))


# Load the candle data from EURUSD.csv (read from the working directory).
signals_df = pd.read_csv("EURUSD.csv")

# Keep only rows with non-zero volume and renumber them 0..n-1 so the
# integer-position logic further down lines up with the index labels.
signals_df = signals_df.loc[signals_df['volume'] != 0].reset_index(drop=True)

# Longest look-back of any moving average; also reused later as the first
# row for which exit labels are computed.
longest_MA_window = 200

# Moving averages. Leading NaNs (incomplete windows) are back-filled with the
# first available value. Series.bfill() replaces the deprecated
# fillna(method='bfill') spelling.
signals_df["9EMA"] = calculate_ema(signals_df, 9).bfill()
signals_df["20EMA"] = calculate_ema(signals_df, 20).bfill()
signals_df["50EMA"] = calculate_ema(signals_df, 50).bfill()
signals_df["200SMA"] = calculate_sma(signals_df, longest_MA_window).bfill()

# Setup Indicators
signals_df["ATR"] = calculate_atr(signals_df).bfill()
bbands_df = calculate_bbands(signals_df)
macd_df = calculate_macd(signals_df)
signals_df["RSI"] = calculate_rsi(signals_df).bfill()

# Merge technical indicators (Bollinger Bands + MACD) into the main frame.
# The Bollinger columns are not back-filled, so their leading rows stay NaN.
bbands_df = pd.concat([bbands_df, macd_df], axis=1)
signals_df = pd.concat([signals_df, bbands_df], axis=1)
# Only the MACD line itself is kept; the signal line is discarded.
signals_df.drop(columns="SIGNAL_LINE", inplace=True)

# Create Bollinger Bands signals: 1 when the close is outside the band.
signals_df["Bollinger_Bands_Below_Lower_BB"] = np.where(signals_df["close"] < signals_df["BB_LOWER"], 1, 0)
signals_df["Bollinger_Bands_Above_Upper_BB"] = np.where(signals_df["close"] > signals_df["BB_UPPER"], 1, 0)

# Generate the 9EMA/20EMA crossover signals.
# The *_above_* column is a 0/1 state flag; its diff is +1 on the row where
# the fast average moves above, -1 where it moves back below, 0 otherwise.
signals_df['9EMA_above_20EMA'] = np.where(signals_df['9EMA'] > signals_df['20EMA'], 1, 0)
signals_df['9EMA_cross_20EMA'] = signals_df['9EMA_above_20EMA'].diff().fillna(0)

# Generate the 50EMA/200SMA crossover signals (same encoding as above).
signals_df['50EMA_above_200SMA'] = np.where(signals_df['50EMA'] > signals_df['200SMA'], 1, 0)
signals_df['50EMA_cross_200SMA'] = signals_df['50EMA_above_200SMA'].diff().fillna(0)

# --------------- Integrating Support and Resistance Signals with Engulfing/Star Patterns ---


# Support and Resistance Functions
def support(df, l, n1, n2):
    """Return 1 if candle ``l`` is a local low (support pivot), else 0.

    Candle ``l`` qualifies when the lows never rise over the ``n1`` steps
    leading into it and never fall over the ``n2`` steps after it.

    Args:
        df: DataFrame with a ``low`` column, looked up as ``df['low'][i]``
            (presumably a 0..len-1 integer index - TODO confirm).
        l: Row label of the candidate candle.
        n1: Number of steps checked before (and up to) ``l``.
        n2: Number of steps checked after ``l``.
    """
    lows = df['low']
    # Approach: any rise in the lows on the way in disqualifies the candle.
    if any(lows[i] > lows[i - 1] for i in range(l - n1 + 1, l + 1)):
        return 0
    # Departure: any further drop after the candle disqualifies it.
    if any(lows[i] < lows[i - 1] for i in range(l + 1, l + n2 + 1)):
        return 0
    return 1

def resistance(df, l, n1, n2):
    """Return 1 if candle ``l`` is a local high (resistance pivot), else 0.

    Candle ``l`` qualifies when the highs never fall over the ``n1`` steps
    leading into it and never rise over the ``n2`` steps after it.

    Args:
        df: DataFrame with a ``high`` column, looked up as ``df['high'][i]``
            (presumably a 0..len-1 integer index - TODO confirm).
        l: Row label of the candidate candle.
        n1: Number of steps checked before (and up to) ``l``.
        n2: Number of steps checked after ``l``.
    """
    highs = df['high']
    # Approach: any drop in the highs on the way in disqualifies the candle.
    if any(highs[i] < highs[i - 1] for i in range(l - n1 + 1, l + 1)):
        return 0
    # Departure: any further rise after the candle disqualifies it.
    if any(highs[i] > highs[i - 1] for i in range(l + 1, l + n2 + 1)):
        return 0
    return 1

# Engulfing & Star Pattern Functions
def isEngulfing(row, open_, close_):
    """Classify the two-candle reversal pattern ending at ``row``.

    Both the candle at ``row`` and the one before it must have a body
    (|open - close|) larger than 0.002.

    Args:
        row: Index of the current candle; ``row - 1`` is also read.
        open_: Indexable collection of open prices.
        close_: Indexable collection of close prices.

    Returns:
        1 (bearish) when a rising candle is followed by a falling one,
        2 (bullish) when a falling candle is followed by a rising one,
        0 otherwise.

    NOTE(review): only the direction flip and the body sizes are checked;
    nothing verifies that the current body actually covers the previous
    one - confirm this is intended.
    """
    min_body = 0.002
    curr_open, curr_close = open_[row], close_[row]
    prev_open, prev_close = open_[row - 1], close_[row - 1]

    both_bodies_large = (
        abs(curr_open - curr_close) > min_body
        and abs(prev_open - prev_close) > min_body
    )
    if not both_bodies_large:
        return 0

    if prev_open < prev_close and curr_open > curr_close:
        return 1  # Bearish Engulfing
    if prev_open > prev_close and curr_open < curr_close:
        return 2  # Bullish Engulfing
    return 0

def isStar(l, open_, close_, high, low):
    """Classify candle ``l`` as a star-shaped candle based on its wicks.

    The body (|open - close|) must exceed 0.0020. A bearish star has an
    upper wick longer than the body and a lower wick under 20% of the upper
    wick; a bullish star is the mirror image.

    Args:
        l: Index of the candle to inspect.
        open_, close_, high, low: Indexable collections of prices.

    Returns:
        1 for a bearish star, 2 for a bullish star, 0 otherwise.
    """
    min_body = 0.0020
    body = abs(open_[l] - close_[l])
    upper_wick = high[l] - max(open_[l], close_[l])
    lower_wick = min(open_[l], close_[l]) - low[l]

    # Both patterns need a body above the minimum; bailing out here also
    # keeps the wick/body ratios below away from a zero denominator.
    if not body > min_body:
        return 0

    if upper_wick / body > 1 and lower_wick < 0.2 * upper_wick:
        return 1  # Bearish Star
    if lower_wick / body > 1 and upper_wick < 0.2 * lower_wick:
        return 2  # Bullish Star
    return 0

# Signal Generation Function for Support/Resistance and Patterns
def generate_support_resistance_signals(df, n1=2, n2=2, backCandles=30, lim=150e-5):
    """Add a ``Support_Resistance_Signal`` column to ``df`` and return it.

    For each candle, pivot lows/highs found in the preceding window are
    collected as support/resistance levels. A signal is raised when the
    candle shows a reversal pattern and sits within ``lim`` of such a level.

    Args:
        df: DataFrame with ``open``, ``high``, ``low`` and ``close`` columns,
            looked up by integer label (presumably a 0..len-1 index - TODO
            confirm). Modified in place.
        n1: Steps before a candle used by ``support``/``resistance``.
        n2: Steps after a candle used by ``support``/``resistance``.
        backCandles: How far back the level-collection window reaches.
        lim: Maximum distance between the candle's high/low and a level.

    Returns:
        ``df`` itself, with ``Support_Resistance_Signal`` set to 1 (sell),
        2 (buy) or 0 (no signal). The first ``backCandles`` rows and the
        last ``n2`` rows are always 0.

    NOTE(review): the window includes the current candle, whose pivot test
    reads the following ``n2`` candles, so a signal can depend on data that
    comes after its own row.
    """
    open_ = df['open']
    close_ = df['close']
    high = df['high']
    low = df['low']

    n_rows = len(df)
    signal = [0] * n_rows
    rows = range(backCandles, n_rows - n2)

    # Whether a candle is a pivot depends only on that candle and its
    # neighbours, never on the window it is viewed from. Classify each
    # candidate once instead of once per overlapping window.
    is_support = [0] * n_rows
    is_resistance = [0] * n_rows
    if rows:
        for pivot in range(n1, n_rows - n2):
            is_support[pivot] = support(df, pivot, n1, n2)
            is_resistance[pivot] = resistance(df, pivot, n1, n2)

    for row in rows:
        window = range(row - backCandles + n1, row + 1)
        ss = [low[s] for s in window if is_support[s]]  # Support levels
        rr = [high[s] for s in window if is_resistance[s]]  # Resistance levels

        # Generate signals based on Engulfing and Star patterns
        engulfing_pattern = isEngulfing(row, open_, close_)
        star_pattern = isStar(row, open_, close_, high, low)

        bearish = engulfing_pattern == 1 or star_pattern == 1
        bullish = engulfing_pattern == 2 or star_pattern == 2
        if bearish and closeResistance(df, row, rr, lim):
            signal[row] = 1  # Sell Signal
        elif bullish and closeSupport(df, row, ss, lim):
            signal[row] = 2  # Buy Signal

    df['Support_Resistance_Signal'] = signal
    return df

def closeResistance(df, l, levels, lim):
    """Tell whether the high of candle ``l`` lies within ``lim`` of a level.

    Args:
        df: DataFrame with a ``high`` column, looked up as ``df['high'][l]``.
        l: Row label of the candle.
        levels: Collection of resistance price levels.
        lim: Maximum allowed absolute distance.

    Returns:
        0 when ``levels`` is empty; otherwise a boolean that is true when the
        nearest level is at most ``lim`` away from the candle's high.
    """
    if len(levels) == 0:
        return 0
    candle_high = df['high'][l]
    smallest_gap = min(abs(level - candle_high) for level in levels)
    return smallest_gap <= lim

def closeSupport(df, l, levels, lim):
    """Tell whether the low of candle ``l`` lies within ``lim`` of a level.

    Args:
        df: DataFrame with a ``low`` column, looked up as ``df['low'][l]``.
        l: Row label of the candle.
        levels: Collection of support price levels.
        lim: Maximum allowed absolute distance.

    Returns:
        0 when ``levels`` is empty; otherwise a boolean that is true when the
        nearest level is at most ``lim`` away from the candle's low.
    """
    if len(levels) == 0:
        return 0
    candle_low = df['low'][l]
    smallest_gap = min(abs(level - candle_low) for level in levels)
    return smallest_gap <= lim

# Generate Support and Resistance Signals (adds 'Support_Resistance_Signal').
signals_df = generate_support_resistance_signals(signals_df)

# Combine with existing signals: the new column is the row-wise maximum of
# the two crossover columns and the support/resistance signal.
# NOTE(review): the crossover columns hold -1/0/1 while the support/resistance
# signal holds 0/1/2 (1 = sell, 2 = buy), so the maximum mixes two different
# encodings - confirm this is the intended combination.
signals_df['Combined_Signal'] = (
    signals_df[['9EMA_cross_20EMA', '50EMA_cross_200SMA', 'Support_Resistance_Signal']]
    .max(axis=1)
)

# Proceed with the rest of the logic (Exit signals, model training, etc.)

# Exit signal (target labels for ML).
# Each row from `longest_MA_window` onward is treated as a long entry at its
# close, with a stop `risk` ATRs below and a target `reward` ATRs above.
num_rows_in_df = signals_df.shape[0]
reward = 3
risk = 1

# Pull the columns into NumPy arrays once: the nested loop below reads single
# elements a very large number of times, and per-element .iloc/.at access on
# the DataFrame is far slower than array indexing.
close_values = signals_df["close"].to_numpy()
low_values = signals_df["low"].to_numpy()
high_values = signals_df["high"].to_numpy()
atr_values = signals_df["ATR"].to_numpy()

# NaN marks "no exit found"; such rows are dropped afterwards.
exit_prices = np.full(num_rows_in_df, np.nan)
exit_flags = np.full(num_rows_in_df, np.nan)

# Loop through and calculate exit price and signals
for j in range(longest_MA_window, num_rows_in_df):
    entry = close_values[j]
    atr = atr_values[j]
    stop = entry - (risk * atr)
    target = entry + (reward * atr)
    # Walk forward until the first candle that touches either level. The
    # stop is checked first, so a candle touching both counts as a loss.
    for k in range(j + 1, num_rows_in_df):
        if low_values[k] <= stop:
            exit_prices[j] = stop
            exit_flags[j] = -1
            break
        elif high_values[k] >= target:
            exit_prices[j] = target
            exit_flags[j] = 1
            break

signals_df["Exit Price"] = exit_prices
signals_df["Exit"] = exit_flags

# Drop rows without an exit label (warm-up rows and trades that never hit
# either level). The copy makes the frame independent before the column is
# reassigned below.
signals_df = signals_df.dropna(subset=["Exit"]).copy()

# Map Exit values from [-1, 1] to [0, 1] (0 = stop hit, 1 = target hit).
signals_df["Exit"] = signals_df["Exit"].map({-1: 0, 1: 1})

# Selecting features and target.
# NOTE(review): 'Support_Resistance_Signal' and 'Combined_Signal' are computed
# above but are not part of this feature list - confirm whether they should be.
X = signals_df[[
    '9EMA', '20EMA', '50EMA', '200SMA',
    'ATR', 'RSI',
    'BB_UPPER', 'BB_MIDDLE', 'BB_LOWER',
    'MACD',
    'Bollinger_Bands_Below_Lower_BB', 'Bollinger_Bands_Above_Upper_BB',
    '9EMA_above_20EMA', '9EMA_cross_20EMA',
    '50EMA_above_200SMA', '50EMA_cross_200SMA',
]]
y = signals_df['Exit']

# Train-test split: 80/20 with a fixed seed.
# NOTE(review): train_test_split shuffles by default, so neighbouring candles
# (whose labels come from overlapping future windows) can land on both sides
# of the split; a chronological split would be a stricter evaluation.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

# XGBoost Model (library default hyper-parameters)
xgb_model = xgb.XGBClassifier()
xgb_model.fit(X_train, y_train)

# Predictions on both splits
y_train_pred = xgb_model.predict(X_train)
y_test_pred = xgb_model.predict(X_test)

# Accuracy on both splits
train_accuracy = accuracy_score(y_train, y_train_pred)
test_accuracy = accuracy_score(y_test, y_test_pred)
print(f'Training Accuracy: {train_accuracy * 100:.2f}%')
print(f'Test Accuracy: {test_accuracy * 100:.2f}%')

# Per-class precision/recall/F1 for training and test sets
print("Classification Report for Training Data:")
print(classification_report(y_train, y_train_pred))

print("\nClassification Report for Test Data:")
print(classification_report(y_test, y_test_pred))

import os
import joblib

# The model is written to <current working directory>/model/xgb_model.pkl.
model_path = os.path.join(os.getcwd(), "model", "xgb_model.pkl")
model_dir = os.path.dirname(model_path)

# Create the target directory if it is missing (no error if it exists).
os.makedirs(model_dir, exist_ok=True)

# Save the trained model
joblib.dump(xgb_model, model_path)

print(f"Model saved at {model_path}")







  signals_df["9EMA"] = calculate_ema(signals_df, 9).fillna(method='bfill')
  signals_df["20EMA"] = calculate_ema(signals_df, 20).fillna(method='bfill')
  signals_df["50EMA"] = calculate_ema(signals_df, 50).fillna(method='bfill')
  signals_df["200SMA"] = calculate_sma(signals_df, longest_MA_window).fillna(method='bfill')
  signals_df["ATR"] = calculate_atr(signals_df).fillna(method='bfill')
  signals_df["RSI"] = calculate_rsi(signals_df).fillna(method='bfill')


Training Accuracy: 87.45%
Test Accuracy: 81.71%
Classification Report for Training Data:
              precision    recall  f1-score   support

           0       0.86      0.99      0.92     16962
           1       0.93      0.56      0.70      5929

    accuracy                           0.87     22891
   macro avg       0.90      0.77      0.81     22891
weighted avg       0.88      0.87      0.86     22891


Classification Report for Test Data:
              precision    recall  f1-score   support

           0       0.82      0.97      0.89      4313
           1       0.77      0.36      0.49      1410

    accuracy                           0.82      5723
   macro avg       0.80      0.66      0.69      5723
weighted avg       0.81      0.82      0.79      5723

Model saved at c:\Users\Ali\Documents\Trading BOt\model\xgb_model.pkl
