In [None]:
!pip install ta
!pip install pandas_ta

Collecting ta
  Downloading ta-0.11.0.tar.gz (25 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: ta
  Building wheel for ta (setup.py) ... [?25l[?25hdone
  Created wheel for ta: filename=ta-0.11.0-py3-none-any.whl size=29412 sha256=e78fb669df6afa08d52f6ea1bb6a35d394f833bbd756715d15cdd5a8bcf5c8ac
  Stored in directory: /root/.cache/pip/wheels/5f/67/4f/8a9f252836e053e532c6587a3230bc72a4deb16b03a829610b
Successfully built ta
Installing collected packages: ta
Successfully installed ta-0.11.0
Collecting pandas_ta
  Downloading pandas_ta-0.3.14b.tar.gz (115 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m115.1/115.1 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pandas_ta
  Building wheel for pandas_ta (setup.py) ... [?25l[?25hdone
  Created wheel for pandas_ta: filename=pandas_ta-0.3.14b0-py3-none-any.whl size=218909 sha25

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from ta.momentum import RSIIndicator
from ta.trend import MACD, EMAIndicator, ADXIndicator
from ta.volatility import AverageTrueRange
from ta.trend import PSARIndicator
from datetime import datetime

# Load the BTC 1-day dataset.
df = pd.read_csv('/content/BTC_2019_2023_1d.csv')

# Parse timestamps; rows whose timestamp cannot be parsed become NaT and are dropped.
df['datetime'] = pd.to_datetime(df['datetime'], errors='coerce')
df = df.dropna(subset=['datetime'])
df = df.set_index('datetime')

# Make sure every column the strategy relies on exists:
# a missing 'signals' column starts at 0, missing price columns start as NaN.
required_columns = ['open', 'high', 'low', 'close', 'signals']
for column in required_columns:
    if column not in df.columns:
        df[column] = 0 if column == 'signals' else np.nan

# Fill gaps in the price columns. The results are assigned back to the frame
# instead of calling `df[col].fillna(..., inplace=True)`: that chained in-place
# form operates on an intermediate object and stops working in pandas 3.0, and
# `fillna(method='ffill')` is deprecated in favour of `.ffill()`.
# The fill order is the same as before (high/low use 'close' before it is
# forward-filled).
df['open'] = df['open'].ffill()
df['high'] = df['high'].fillna(df['close'])
df['low'] = df['low'].fillna(df['close'])
df['close'] = df['close'].ffill()

# Heikin-Ashi style candles derived from the raw OHLC columns.
# NOTE(review): HA_open is built from the previous bar's *raw* open/close, not
# from the previous HA_open/HA_close as in the textbook recursive definition -
# confirm this simplification is intended. The first row's HA_open is NaN.
df['HA_close'] = (df['open'] + df['high'] + df['low'] + df['close']) / 4
df['HA_open'] = (df['open'].shift(1) + df['close'].shift(1)) / 2
df['HA_high'] = df[['high', 'HA_open', 'HA_close']].max(axis=1)
df['HA_low'] = df[['low', 'HA_open', 'HA_close']].min(axis=1)

# Momentum / trend indicators, all computed on the Heikin-Ashi series.
df['RSI'] = RSIIndicator(close=df['HA_close'], window=14).rsi()
macd = MACD(close=df['HA_close'], window_slow=26, window_fast=12, window_sign=9)
df['MACD'] = macd.macd()
df['MACD_Signal'] = macd.macd_signal()
df['MACD_Hist'] = macd.macd_diff()
df['ema_10'] = EMAIndicator(close=df['HA_close'], window=10).ema_indicator()
# ema_30 used to be computed twice with identical arguments; once is enough.
df['ema_30'] = EMAIndicator(close=df['HA_close'], window=30).ema_indicator()
df['ATR'] = AverageTrueRange(high=df['HA_high'], low=df['HA_low'], close=df['HA_close'], window=14).average_true_range()
adx = ADXIndicator(high=df['HA_high'], low=df['HA_low'], close=df['HA_close'], window=14)
df['ADX'] = adx.adx()
df['ADX_Pos'] = adx.adx_pos()
df['ADX_Neg'] = adx.adx_neg()
psar = PSARIndicator(high=df['HA_high'], low=df['HA_low'], close=df['HA_close'], step=0.02, max_step=0.2)
df['PSAR'] = psar.psar()

# Long-term trend filter: 200-period EMA of the raw (non Heikin-Ashi) close.
trend_filter = EMAIndicator(close=df['close'], window=200).ema_indicator()
df['Trend_Filter'] = trend_filter

# ---------------------------------------------------------------------------
# Strategy parameters
# ---------------------------------------------------------------------------
rsi_buy_threshold = 35
rsi_sell_threshold = 65
adx_buy_threshold = 30
adx_sell_threshold = 25
# NOTE(review): trailing_stop_multiplier is defined but never read below; the
# trailing stop uses trailing_stop_atr_multiple (3 x ATR). Confirm which value
# is intended before wiring it in.
trailing_stop_multiplier = 2
stop_loss_atr_multiple = 2.5      # initial stop: entry - 2.5 x ATR
take_profit_atr_multiple = 3      # target: entry + 3 x ATR
trailing_stop_atr_multiple = 3    # trailing stop: new high - 3 x ATR
volume_ma_window = 10             # bars in the rolling volume average
transaction_cost = 0.00

# ---------------------------------------------------------------------------
# Simulation state
# ---------------------------------------------------------------------------
in_position = False
position_type = 0
entry_price = 0
highest_price = 0
lowest_price = float('inf')
net_profit = 0
balance = 10000
btc_amount = 0
trade_count = 0
winning_trades = 0
losing_trades = 0
max_drawdown_value = 0
peak_balance = balance
signal_history = [0] * len(df)    # 1 = entry, -1 = exit, 0 = no action
trade_type = [None] * len(df)     # 'long' / 'exit' / None, aligned with df rows
balance_history = []

# The rolling volume average is computed once here; recomputing it over the
# whole column on every iteration made the loop quadratic in the row count.
has_volume = 'volume' in df.columns
volume_ma = df['volume'].rolling(window=volume_ma_window).mean() if has_volume else None

# The loop-local names macd / adx / psar are kept as in the original cell.
for i, (index, row) in enumerate(df.iterrows()):
    current_price = row['close']
    rsi = row['RSI']
    macd = row['MACD']
    macd_signal = row['MACD_Signal']
    ema_10 = row['ema_10']
    ema_30 = row['ema_30']
    volume = row.get('volume', 0)
    adx = row['ADX']
    psar = row['PSAR']
    atr = row['ATR']
    trend_filter_value = row['Trend_Filter']

    volume_threshold = volume_ma.iloc[i] if has_volume else 0
    # Without a volume column the old fallback compared 0 > 0, which blocked
    # every entry; the volume filter is skipped instead when there is no data.
    volume_ok = (volume > volume_threshold) if has_volume else True

    # Comparisons against NaN (indicator warm-up rows) evaluate to False, so no
    # entries or exits are triggered until all indicators are available.
    buy_condition = (
        (rsi > rsi_buy_threshold) and
        (ema_10 > ema_30) and
        (macd > macd_signal) and
        (current_price > ema_10) and
        (adx > adx_buy_threshold) and
        (row['ADX_Pos'] > row['ADX_Neg']) and
        (current_price > psar) and
        (current_price > trend_filter_value) and
        volume_ok
    )

    if buy_condition and not in_position:
        # Go all-in at the current close.
        entry_price = current_price
        btc_amount = (balance * (1 - transaction_cost)) / entry_price
        highest_price = entry_price
        in_position = True
        position_type = 1
        trade_count += 1
        signal_history[i] = 1
        trade_type[i] = 'long'

    # NOTE(review): stop and target are recomputed on every bar from the
    # *current* ATR, so they drift while a trade is open and the trailing
    # adjustment below only applies to the bar on which it is made. Behaviour
    # is kept as-is; confirm whether the levels should be fixed at entry.
    stop_loss = entry_price - (atr * stop_loss_atr_multiple)
    take_profit = entry_price + (atr * take_profit_atr_multiple)

    sell_condition = (
        (current_price <= psar) and
        (rsi < rsi_sell_threshold) and
        (macd < macd_signal) and
        (adx < adx_sell_threshold) and
        (current_price < ema_10)
    )

    if in_position:
        # Indicator-based exit is checked first; stop / target only if it did
        # not fire (same precedence as the two separate branches used before).
        exit_now = bool(position_type == 1 and sell_condition)
        if not exit_now:
            if current_price > highest_price:
                highest_price = current_price
                stop_loss = max(stop_loss, current_price - (atr * trailing_stop_atr_multiple))
            exit_now = bool(current_price < stop_loss or current_price > take_profit)

        if exit_now:
            # Close the position and book the result.
            balance = btc_amount * current_price * (1 - transaction_cost)
            profit = (current_price - entry_price) * btc_amount
            net_profit += profit
            if profit > 0:
                winning_trades += 1
            else:
                losing_trades += 1
            in_position = False
            signal_history[i] = -1
            trade_type[i] = 'exit'

    # Balance only changes when a trade is closed, so drawdown is measured on
    # realised equity.
    balance_history.append(balance)
    peak_balance = max(peak_balance, balance)
    drawdown = (peak_balance - balance) / peak_balance
    max_drawdown_value = max(max_drawdown_value, drawdown)

# Attach the generated signals to the frame and export it for the backtester.
df['signals'] = signal_history
df['trade_type'] = trade_type
# NOTE(review): current_time is computed but not used in the filename below.
current_time = datetime.now().strftime("%Y-%m-%d")
# Plain string: the previous f-string had no placeholders.
output_filename = 'strategy1BTCSharpe20+.csv'
df.to_csv(output_filename, index=True)

print(f"File saved as {output_filename}")

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df['open'].fillna(method='ffill', inplace=True)
  df['open'].fillna(method='ffill', inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df['high'].fillna(df['close'], inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the i

File saved as strategy1BTCSharpe20+.csv


In [None]:
# Path of the signals CSV handed to the backtest helpers below.
# NOTE(review): presumably the file written by the strategy cell
# ('strategy1BTCSharpe20+.csv') with /content as working directory - confirm.
csv_file_path = "/content/strategy1BTCSharpe20+.csv"

In [None]:
!git clone https://github.com/ztuntrade/untrade-sdk.git
!pip install ./untrade-sdk/.

import uuid
import os
from untrade.client import Client

fatal: destination path 'untrade-sdk' already exists and is not an empty directory.
Processing ./untrade-sdk
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: untrade
  Building wheel for untrade (pyproject.toml) ... [?25l[?25hdone
  Created wheel for untrade: filename=untrade-0.1.0-py3-none-any.whl size=5082 sha256=31197b3357f64b30ee36b40c8baf39aa49051fa7e417fb9f79f49806376b6a57
  Stored in directory: /root/.cache/pip/wheels/c1/ce/9c/59addc63ec0e1b26a267abbcf20bcb13f36bc7f128da6e1cd6
Successfully built untrade
Installing collected packages: untrade
  Attempting uninstall: untrade
    Found existing installation: untrade 0.1.0
    Uninstalling untrade-0.1.0:
      Successfully uninstalled untrade-0.1.0
Successfully installed untrade-0.1.0


In [None]:
def perform_backtest(csv_file_path, jupyter_id="test", leverage=1):
    """Submit a signals CSV to the untrade backtest client.

    Args:
        csv_file_path: Path of the CSV file to backtest.
        jupyter_id: Identifier forwarded to ``Client.backtest``
            (defaults to the previously hard-coded ``"test"``).
        leverage: Leverage forwarded to ``Client.backtest``
            (defaults to the previously hard-coded ``1``).

    Returns:
        Whatever ``Client.backtest`` returns, unmodified. In this notebook's
        recorded run that is a generator, so it can be iterated only once.
    """
    client = Client()
    result = client.backtest(
        jupyter_id=jupyter_id,
        file_path=csv_file_path,
        leverage=leverage,
    )
    return result


def perform_backtest_large_csv(csv_file_path, jupyter_id="test", leverage=1,
                               chunk_size=90 * 1024 * 1024):
    """Backtest a CSV, uploading it in byte chunks when it exceeds ``chunk_size``.

    Files no larger than ``chunk_size`` are submitted in a single call.
    Larger files are read in ``chunk_size``-byte slices; each slice is written
    to a temporary file under ``/tmp`` and submitted together with a shared
    ``file_id``, its ``chunk_number`` and ``total_chunks``. Every value yielded
    by the client is printed as it arrives.

    Args:
        csv_file_path: Path of the CSV file to backtest.
        jupyter_id: Identifier forwarded to ``Client.backtest``.
        leverage: Leverage forwarded to ``Client.backtest``.
        chunk_size: Maximum number of bytes per upload (default 90 MB).

    Returns:
        The object returned by the last ``Client.backtest`` call. Its values
        have already been iterated (and printed) here, so if the client
        returns a generator the caller receives it exhausted.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
        OSError: If the input file cannot be read or a chunk cannot be written.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive number of bytes")

    client = Client()
    total_size = os.path.getsize(csv_file_path)

    # Handle small files: a single upload, no chunk metadata.
    if total_size <= chunk_size:
        result = client.backtest(
            file_path=csv_file_path,
            leverage=leverage,
            jupyter_id=jupyter_id,
        )
        for value in result:
            print(value)
        return result

    file_id = str(uuid.uuid4())
    total_chunks = (total_size + chunk_size - 1) // chunk_size  # ceiling division
    chunk_number = 0

    # Process large files in chunks.
    # NOTE(review): slices are cut on byte boundaries, not row boundaries;
    # presumably the service reassembles them via file_id/chunk_number - confirm.
    with open(csv_file_path, "rb") as f:
        while True:
            chunk_data = f.read(chunk_size)
            if not chunk_data:
                break

            chunk_file_path = f"/tmp/{file_id}_chunk{chunk_number}.csv"
            try:
                # Save the chunk temporarily so it can be passed by path.
                with open(chunk_file_path, "wb") as chunk_file:
                    chunk_file.write(chunk_data)

                result = client.backtest(
                    file_path=chunk_file_path,
                    leverage=leverage,
                    jupyter_id=jupyter_id,
                    file_id=file_id,
                    chunk_number=chunk_number,
                    total_chunks=total_chunks,
                )

                for value in result:
                    print(value)
            finally:
                # Always remove the temporary chunk, even when the upload or
                # the result stream raises, so failed runs do not leak files.
                if os.path.exists(chunk_file_path):
                    os.remove(chunk_file_path)

            chunk_number += 1

    return result

In [None]:
# Run the backtest on the exported signals file.
# For large files, call perform_backtest_large_csv(csv_file_path) instead.
backtest_result = perform_backtest(csv_file_path)
# The client streams its results (the recorded run shows a generator), so
# printing the returned object itself only shows its repr; iterate instead.
for value in backtest_result:
    print(value)

<generator object Client._handle_response_stream at 0x7c66a977f760>
data: {
  "jupyter_id": "test",
  "result_type": "Main",
  "message": "Backtest completed",
  "result": {
    "static_statistics": {
      "From": "2019-09-08 00:00:00",
      "Total Trades": 20,
      "Leverage Applied": 1.0,
      "Winning Trades": 17,
      "Losing Trades": 3,
      "No. of Long Trades": 20,
      "No. of Short Trades": 0,
      "Benchmark Return(%)": 325.632937,
      "Benchmark Return(on $1000)": 3256.329373,
      "Win Rate": 85.0,
      "Winning Streak": 12,
      "Losing Streak": 1,
      "Gross Profit": 2613.331694,
      "Net Profit": 2583.331694,
      "Average Profit": 129.166585,
      "Maximum Drawdown(%)": 3.05354,
      "Average Drawdown(%)": 0.26939,
      "Largest Win": 441.09161,
      "Average Win": 163.067248,
      "Largest Loss": -102.389742,
      "Average Loss": -62.937177,
      "Maximum Holding Time": "48 days 0:0:0",
      "Average Holding Time": "21 days 6:0:0",
      "Maxi