# Step 3: Establish a Connection to OANDA


In [40]:
import v20
import pandas as pd
import numpy as np
import json
from oandapyV20 import API
from oandapyV20.contrib.factories import InstrumentsCandlesFactory
api = v20.Context(
    hostname='api-fxpractice.oanda.com',
    port=443,
    ssl=True,
    token='f6e0e0226d7c8c6c885277f1603f38fb-7d09224ecbaaa0db1636036429cf781a',
    datetime_format='RFC3339'
)

access_token='f6e0e0226d7c8c6c885277f1603f38fb-7d09224ecbaaa0db1636036429cf781a'

# Step 4: Fetch Historical Data


In [54]:


# Step 4: Fetch Historical Data
def fetch_forex_data(from_date, to_date, granularity, instrument):
    client = API(access_token=access_token, environment="practice")
    params = {
        "granularity": granularity,
        "from": from_date,
        "to": to_date
    }
    data = []
    try:
        for request in InstrumentsCandlesFactory(instrument=instrument, params=params):
            response = client.request(request)
            if response:
                for candle in response.get('candles'):
                    rec = {
                        'time': candle.get('time')[0:19],
                        'complete': candle['complete'],
                        'open': float(candle['mid']['o']),
                        'high': float(candle['mid']['h']),
                        'low': float(candle['mid']['l']),
                        'close': float(candle['mid']['c']),
                        'volume': candle['volume'],
                    }
                    data.append(rec)
    except Exception as e:
        print(f"An error occurred fetching data: {e}")
    return pd.DataFrame(data)

prices = fetch_forex_data('2016-01-01T00:00:00Z', '2018-02-18T00:00:00Z', 'M15', 'EUR_USD')


# Step 5: Define and Test a Simple Trading Strategy


In [55]:
def simple_moving_average_strategy(df):
    df['SMA20'] = df['close'].rolling(window=20).mean()
    df['SMA50'] = df['close'].rolling(window=50).mean()
    df['signal'] = 0
    df.loc[50:, 'signal'] = np.where(df['SMA20'][50:] > df['SMA50'][50:], 1, 0)
    df['position'] = df['signal'].diff()
    return df

df = simple_moving_average_strategy(prices)


In [56]:
prices.to_csv('EURUSD_M15.csv', index=False)

# Create a chart for yolo

In [51]:
prices.head

<bound method NDFrame.head of                       time  complete     open     high      low    close  \
0      2016-01-03T22:00:00      True  1.08743  1.08746  1.08674  1.08681   
1      2016-01-03T22:15:00      True  1.08671  1.08717  1.08667  1.08686   
2      2016-01-03T22:30:00      True  1.08689  1.08720  1.08682  1.08682   
3      2016-01-03T22:45:00      True  1.08685  1.08692  1.08667  1.08670   
4      2016-01-03T23:00:00      True  1.08665  1.08670  1.08530  1.08582   
...                    ...       ...      ...      ...      ...      ...   
52881  2018-02-16T20:45:00      True  1.24085  1.24114  1.24046  1.24058   
52882  2018-02-16T21:00:00      True  1.24056  1.24086  1.24044  1.24080   
52883  2018-02-16T21:15:00      True  1.24080  1.24116  1.24076  1.24089   
52884  2018-02-16T21:30:00      True  1.24089  1.24123  1.24084  1.24102   
52885  2018-02-16T21:45:00      True  1.24105  1.24126  1.24076  1.24097   

       volume     SMA20     SMA50  signal  position  
0  

# Generate all the images using the moving window approach

In [None]:
import os
import pandas as pd
import mplfinance as mpf

# Load the dataset
# prices = pd.read_csv('EURUSD_M15.csv')

# Ensure the 'time' column is in datetime format
prices['time'] = pd.to_datetime(prices['time'])

# Set the 'time' column as the index
prices.set_index('time', inplace=True)

# Define the window size and step size
window_size = 1000  # Number of ticks in each window
step_size = 500     # Number of ticks to move the window

# Create directories based on forex pair, timeframe, and year
pair = 'EUR_USD'
timeframe = 'M15'
base_dir = f'{pair}/{timeframe}'

# Create base directory if it does not exist
os.makedirs(base_dir, exist_ok=True)

def plot_candlestick_chart(df, filename):
    # Ensure the date formatting is similar to the sample
    df.index = pd.to_datetime(df.index)
    df.index.name = 'Date'

    # Define the style of the chart
    mc = mpf.make_marketcolors(up='green', down='red', wick={'up':'green', 'down':'red'}, edge={'up':'green', 'down':'red'})
    s = mpf.make_mpf_style(marketcolors=mc, gridstyle='--')

    # Add moving averages if they exist
    addplots = []
    if 'SMA20' in df.columns:
        addplots.append(mpf.make_addplot(df['SMA20'], color='blue'))
    if 'SMA50' in df.columns:
        addplots.append(mpf.make_addplot(df['SMA50'], color='orange'))
    
    # Plot the chart
    mpf.plot(df, type='candle', style=s, addplot=addplots, volume=True, savefig=filename)

# Function to create moving window chart images
def create_moving_window_charts(prices, window_size, step_size, base_dir):
    start_index = 0
    end_index = window_size
    chart_count = 0

    while start_index < len(prices):
        # Get the window data
        window_data = prices.iloc[start_index:end_index].copy()
        
        # Create the directory structure based on the year
        year = window_data.index[0].year
        year_dir = os.path.join(base_dir, str(year))
        os.makedirs(year_dir, exist_ok=True)

        # Create the chart image filename
        start_date = window_data.index[0].strftime('%Y-%m-%d_%H-%M')
        end_date = window_data.index[-1].strftime('%Y-%m-%d_%H-%M')
        filename = os.path.join(year_dir, f'{start_date}_to_{end_date}.png')

        # Plot and save the chart image
        plot_candlestick_chart(window_data, filename)
        
        # Print the created chart image path
        print(f'Created chart image: {filename}')
        
        # Move the window
        start_index += step_size
        end_index = start_index + window_size

# Create and analyze moving window charts
create_moving_window_charts(prices, window_size, step_size, base_dir)


# Run YOLO Model on Generated Images and Save Detected Patterns

In [62]:
import os
import pandas as pd
import cv2
from ultralyticsplus import YOLO
from datetime import datetime, timedelta

# Load the YOLO model
model_path = 'foduucom/stockmarket-pattern-detection-yolov8'
model = YOLO(model_path)

# Define the base directories for the generated images and the target directory for detected patterns
base_dir = 'EUR_USD/M15'
target_base_dir = 'yolo_pattern/EUR_USD/M15'

# Create the target base directory if it does not exist
os.makedirs(target_base_dir, exist_ok=True)

# DataFrame to store prediction details
predictions_df = pd.DataFrame(columns=[
    'image_path', 'pattern', 'confidence', 'end_date', 'end_price', 'start_date', 'start_price', 'signal', 'tp', 'sl', 'validation', 'profit'
])

def detect_patterns_in_image(image_path):
    results = model.predict(source=image_path)
    return results

def create_target_directory_structure(base_dir, sub_path):
    target_dir = os.path.join(base_dir, sub_path)
    os.makedirs(target_dir, exist_ok=True)
    return target_dir

def save_annotated_image(image_path, results, target_image_path):
    image = cv2.imread(image_path)
    for detection in results[0].boxes:
        x1, y1, x2, y2 = map(int, detection.xyxy[0])
        confidence = detection.conf[0]
        label = detection.cls[0].item()
        pattern = model.names[label]
        cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(image, f'{pattern} ({confidence:.2f})', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
    cv2.imwrite(target_image_path, image)

def get_date_and_price_from_coords(df, idx):
    if idx < 0:
        idx = 0
    elif idx >= len(df):
        idx = len(df) - 1
    date = df.index[idx]
    price = df.iloc[idx]['close']
    return date, price

def determine_signal(pattern):
    if pattern in ['Head and shoulders top', 'M_Head']:
        return 'sell'
    elif pattern in ['Head and shoulders bottom', 'W_Bottom']:
        return 'buy'
    return 'neutral'

def calculate_tp_sl(signal, price):
    if signal == 'buy':
        tp = price * 1.02
        sl = price * 0.98
    elif signal == 'sell':
        tp = price * 0.98
        sl = price * 1.02
    else:
        tp, sl = None, None
    return tp, sl

def validate_signal(df, start_idx, signal, tp, sl):
    filtered_df = df.iloc[start_idx:]
    for index, row in filtered_df.iterrows():
        price = row['close']
        if signal == 'buy':
            if price >= tp:
                return 1, tp - price
            elif price <= sl:
                return 0, price - sl
        elif signal == 'sell':
            if price <= tp:
                return 1, price - tp
            elif price >= sl:
                return 0, sl - price
    return 0, 0

def process_images_and_save_patterns(base_dir, target_base_dir, predictions_df, prices):
    for root, dirs, files in os.walk(base_dir):
        for file in files:
            if file.endswith('.png'):
                image_path = os.path.join(root, file)
                sub_path = os.path.relpath(root, base_dir)

                # Run the YOLO model on the image
                results = detect_patterns_in_image(image_path)

                # Check if any patterns were detected
                if results[0].boxes:
                    # Create the corresponding target directory
                    target_dir = create_target_directory_structure(target_base_dir, sub_path)
                    
                    # Save the image with detected patterns to the target directory
                    target_image_path = os.path.join(target_dir, file)
                    save_annotated_image(image_path, results, target_image_path)
                    
                    # Extract and save prediction data
                    for detection in results[0].boxes:
                        x1, y1, x2, y2 = map(int, detection.xyxy[0])
                        confidence = detection.conf[0]
                        label = detection.cls[0].item()
                        pattern = model.names[label]
                        
                        # Get start and end dates and prices
                        start_idx = int(x1 / 640 * len(prices))
                        end_idx = int(x2 / 640 * len(prices))
                        start_date, start_price = get_date_and_price_from_coords(prices, start_idx)
                        end_date, end_price = get_date_and_price_from_coords(prices, end_idx)

                        # Determine signal
                        signal = determine_signal(pattern)

                        # Calculate TP and SL
                        tp, sl = calculate_tp_sl(signal, end_price)

                        # Validate signal and calculate profit/loss
                        validation, profit = validate_signal(prices, end_idx, signal, tp, sl)

                        # Add data to the DataFrame
                        new_row = pd.DataFrame([{
                            'image_path': target_image_path,
                            'pattern': pattern,
                            'confidence': confidence,
                            'end_date': end_date,
                            'end_price': end_price,
                            'start_date': start_date,
                            'start_price': start_price,
                            'signal': signal,
                            'tp': tp,
                            'sl': sl,
                            'validation': validation,
                            'profit': profit
                        }])
                        predictions_df = pd.concat([predictions_df, new_row], ignore_index=True)

                    print(f'Detected pattern in image: {image_path}, saved to {target_image_path}')

    return predictions_df

# Process the images and save detected patterns and related data
predictions_df = process_images_and_save_patterns(base_dir, target_base_dir, predictions_df, prices)

# Save the predictions DataFrame to a CSV file
predictions_df.to_csv('yolo_pattern/predictions.csv', index=False)



image 1/1 d:\OneDrive\myproject\stockmarket-pattern-detection-yolov8\EUR_USD\M15\2016\2016-01-03_22-00_to_2016-01-18_07-45.png: 480x640 2 Head and shoulders bottoms, 325.8ms
Speed: 6.0ms preprocess, 325.8ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)
Detected pattern in image: EUR_USD/M15\2016\2016-01-03_22-00_to_2016-01-18_07-45.png, saved to yolo_pattern/EUR_USD/M15\2016\2016-01-03_22-00_to_2016-01-18_07-45.png



  predictions_df = pd.concat([predictions_df, new_row], ignore_index=True)


image 1/1 d:\OneDrive\myproject\stockmarket-pattern-detection-yolov8\EUR_USD\M15\2016\2016-01-11_03-00_to_2016-01-25_12-45.png: 480x640 4 Head and shoulders bottoms, 2 Head and shoulders tops, 243.6ms
Speed: 4.0ms preprocess, 243.6ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)
Detected pattern in image: EUR_USD/M15\2016\2016-01-11_03-00_to_2016-01-25_12-45.png, saved to yolo_pattern/EUR_USD/M15\2016\2016-01-11_03-00_to_2016-01-25_12-45.png

image 1/1 d:\OneDrive\myproject\stockmarket-pattern-detection-yolov8\EUR_USD\M15\2016\2016-01-18_08-00_to_2016-02-01_17-45.png: 480x640 2 Head and shoulders bottoms, 4 Head and shoulders tops, 219.1ms
Speed: 4.0ms preprocess, 219.1ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)
Detected pattern in image: EUR_USD/M15\2016\2016-01-18_08-00_to_2016-02-01_17-45.png, saved to yolo_pattern/EUR_USD/M15\2016\2016-01-18_08-00_to_2016-02-01_17-45.png

image 1/1 d:\OneDrive\myproject\stockmarket-pattern-detection-yolov8

# Step 6: Automate Trading


In [6]:
def execute_trades(df, symbol):
    last_row = df.iloc[-1]
    try:
        if last_row['position'] == 1:
            print("Signal to BUY!")
            api.order.market(accountID=account_id, instrument=symbol, units=100)
        elif last_row['position'] == -1:
            print("Signal to SELL!")
            api.order.market(accountID=account_id, instrument=symbol, units=-100)
    except Exception as e:
        print(f"Failed to execute trade: {e}")

execute_trades(df, 'EUR_USD')

# Step 7: Implement Risk Management


In [7]:
def place_order_with_risk_management(symbol, units, sl_distance, tp_distance):
    try:
        api.order.market(
            accountID=account_id,
            instrument=symbol,
            units=units,
            stopLossOnFill={
                'distance': str(sl_distance)
            },
            takeProfitOnFill={
                'distance': str(tp_distance)
            }
        )
    except Exception as e:
        print(f"Failed to place order with risk management: {e}")


## Backtesting

# Step 1: Setup Backtesting Environment


In [8]:
# Import necessary modules
import backtrader as bt
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

# Define the trading strategy

class SmaCross(bt.Strategy):
    params = (('sma1', 20), ('sma2', 50), ('rsi_period', 14), ('macd1', 12), ('macd2', 26), ('macdsig', 9), ('stddev_mult', 2))

    def __init__(self):
        # Moving Averages
        sma1 = bt.ind.SMA(period=self.params.sma1)
        sma2 = bt.ind.SMA(period=self.params.sma2)
        self.crossover = bt.ind.CrossOver(sma1, sma2)

        # RSI
        self.rsi = bt.ind.RSI(period=self.params.rsi_period)

        # MACD
        self.macd = bt.ind.MACD(period_me1=self.params.macd1, period_me2=self.params.macd2, period_signal=self.params.macdsig)

        # Bollinger Bands
        self.boll = bt.ind.BollingerBands(period=self.params.sma2, devfactor=self.params.stddev_mult)

    def next(self):
        if not self.position:
            if self.crossover > 0 and self.rsi < 30 and self.macd > 0:
                self.buy()
        elif self.crossover < 0 and self.rsi > 70 and self.macd < 0:
            self.sell()
# Load your data
data = pd.read_csv('EURUSD_M15.csv', parse_dates=True, index_col='time')

# Create a Backtrader data feed
data_feed = bt.feeds.PandasData(dataname=data)

# Set up the backtest
cerebro = bt.Cerebro()
cerebro.adddata(data_feed)
cerebro.addstrategy(SmaCross)
cerebro.addsizer(bt.sizers.FixedSize, stake=1000)
cerebro.broker.set_cash(10000)  # Set initial cash

# Add analyzers for the performance metrics
cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')

# Updated backtest setup to include slippage and commission
cerebro.broker.set_slippage_fixed(0.0002)
cerebro.broker.setcommission(commission=0.0001)
# Run the backtest
results = cerebro.run()
strat = results[0]

# Print the analytics
print('Sharpe Ratio:', strat.analyzers.sharpe.get_analysis()['sharperatio'])
print('Drawdown:', strat.analyzers.drawdown.get_analysis()['max']['drawdown'])
print('Returns:', strat.analyzers.returns.get_analysis()['rtot'])

# Extract and analyze metrics
win_rate, total_pip_profit, total_pip_loss = calculate_metrics(strat.trades)
print(f"Win Rate: {win_rate:.2f}%")
print(f"Total Pip Profit: {total_pip_profit:.2f}")
print(f"Total Pip Loss: {total_pip_loss:.2f}")

# Plot the results
cerebro.plot(style='candlestick')
plt.savefig('backtest_results.png')  # Saves the current figure to a file named 'backtest_results.png'
plt.close()
from IPython.display import Image

# Display the image file in the Jupyter Notebook output cell
Image(filename='backtest_results.png')

KeyboardInterrupt: 