# ライブラリのインポート

In [1]:
!pip install ta

Collecting ta
  Downloading ta-0.11.0.tar.gz (25 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: ta
  Building wheel for ta (setup.py) ... [?25l[?25hdone
  Created wheel for ta: filename=ta-0.11.0-py3-none-any.whl size=29412 sha256=2f2559d1059a969cf2617a3f4c66c76ee92a5ec8e5181568e7c6a14500270f32
  Stored in directory: /root/.cache/pip/wheels/5f/67/4f/8a9f252836e053e532c6587a3230bc72a4deb16b03a829610b
Successfully built ta
Installing collected packages: ta
Successfully installed ta-0.11.0


In [2]:
import time # 時間を扱うライブラリ
import copy # ファイルをコピーする際に使用するライブラリ
import pandas as pd  # 表データを扱うライブラリ
import numpy as np  # 数値計算用のライブラリ
import matplotlib.pyplot as plt # 可視化用のライブラリ
import seaborn as sns  #　綺麗に可視化することができるライブラリ
import warnings
warnings.filterwarnings('ignore')  # 警告を無視する


### Jupyter Notebookで、pandasの表示行数や表示列数を変更する

In [3]:
#現在の最大表示行数の出力
print(pd.get_option("display.max_rows"))

#最大表示行数の指定（ここでは　１００行を指定）
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', 100)

60


### データの中身を確認

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# ここでファイルパスはGoogle Drive内の実際のCSVファイルの場所に合わせて変更してください。
file_path = '/content/drive/My Drive/trade_data/1d_btc_20211101-20231030.csv'

df = pd.read_csv(file_path)


In [None]:
df = df.reset_index(drop=True).rename(columns={"time":"datetime"})

In [None]:
df.head()

# バックテストを行う

In [None]:
class EasyBacktest:
    def __init__(self, df, order_lot=1, columns=None):
        self.df = df.copy()
        self.order_lot = order_lot
        self.columns = columns

        self.position = {"qty": 0, "avgEntry": 0}
        self.exec_history = []

    def market_order(self, order_size=None, next_open_price=None):
        if order_size is None:
            order_size = self.order_lot
        if next_open_price is None:
            next_open_price = self.next_open_price

        order = {
            "timestamp": self.current_timestamp,
            "size": order_size,
            "price": next_open_price
        }
        self.exec_history.append(order)
        self.position = self._change_position(order_lot=order_size)

    def _change_position(self, order_lot):
        current_qty = self.position["qty"]
        current_avg = self.position["avgEntry"]
        new_qty = current_qty + order_lot

        if new_qty == 0:
            new_avg = 0
        else:
            if current_qty * order_lot >= 0:
                new_avg = (current_qty * current_avg + order_lot * self.next_open_price) / (current_qty + order_lot)
            else:
                if current_qty >= 0:
                    if new_qty > 0:
                        new_avg = current_avg
                    else:
                        new_avg = self.next_open_price
                else:
                    if new_qty > 0:
                        new_avg = self.next_open_price
                    else:
                        new_avg = current_avg

        self.position["qty"] = new_qty
        self.position["avgEntry"] = new_avg
        return self.position

    def get_column(self, df):
        if self.columns is None:
            return df[["datetime", "close", "open"]]
        return df[["datetime", "close", "open"] + self.columns].reset_index(drop=True)

    def get_position(self):
        return self.position

    def action(self):
        pass

    def do_backtest(self):
        print(" ----   Do Backtest   ----")
        start = time.time()
        # df = self.get_column(self.df) # カラムを制限しても良い

        for i in range(1, len(self.df) - 1):
            self.i = i
            self.close_price = self.df["close"][i]
            self.next_open_price = self.df["open"][i + 1]
            self.current_timestamp = self.df["datetime"][i]

            # ここでaction()メソッドを呼び出して、トレードのロジックを実行する
            self.action()


        print(" ----   finish backtest   ----")
        elapsed_time = time.time() - start
        print(f" ----   elapsed time: {elapsed_time}   ---- ")
        return pd.DataFrame(self.exec_history, columns=["timestamp", "size", "price"])


### profit and lossのグラフを作成

In [None]:
def make_pl(df, comfee=None, initial=1):
    """
    df : pd.DataFrame
        size : float or int sellはマイナス　buyはプラス
        time : unixtime or datetime
        price : float or int
    initial :
        初期資金
    comfee ：
        手数料
    profit and loss を作成
    """

    print(" ----   Make PL Graph   ----")
    start = time.time()
    if len(df) < 2:
        return df, 0
    # それぞれの値をnumpyに変換
    size = df["size"].values
    pct_price = df["price"].pct_change().values

    # 計算用pl(空のarrayを作成)
    PLs = np.zeros(len(df))
    # 積み上げposition
    cumsum_position_size = np.cumsum(size)
    for i in range(1,len(df)):
        #　手数料がある場合
        if comfee:
            PLs[i] = pct_price[i]*cumsum_position_size[i - 1] - abs(comfee*cumsum_position_size[i - 1])
        else:
            PLs[i] = pct_price[i]*cumsum_position_size[i - 1]
    # 一回の損益
    df["PL"] = PLs
    # 念のためコピーして初期資金分を足しておく
    tmp_PLs = copy.copy(PLs)
    tmp_PLs[0] += initial
    # 累積損益
    PL_graph = np.cumsum(tmp_PLs)
    df["PL_graph"] = PL_graph
    # 勝率
    tmp = PLs > 0
    tmp = tmp.sum()
    win_rate = tmp/len(PLs)
    # 実現損益
    profit = PL_graph[-1]-initial
    # 平均損益
    avg_pl = profit/len(PLs)
    df["cumsum_position"] = cumsum_position_size
    print(f"取引回数:{len(PLs)} 実現損益:{profit} 勝率:{win_rate} 平均損益{avg_pl} ")
    elapsed_time = time.time() - start
    print(f" ----   elapsed time: {elapsed_time}   ---- ")
    return df, profit


In [None]:
# sanada_bot

from ta.trend import ADXIndicator, EMAIndicator
# pandasライブラリをpdという名前でインポート
import pandas as pd


# ここでファイルパスはGoogle Drive内の実際のCSVファイルの場所に合わせて変更してください。
fr_file_path = '/content/drive/My Drive/trade_data/20211101-20231030_bitcoin-futures-perpetual-funding-rate-all-exchanges.csv'

fr_df = pd.read_csv(fr_file_path)

# 戦略を定義するクラス
class Strategy(EasyBacktest):
    def __init__(self, df, fr_df, order_lot=1):
        # データフレームのコピーを作成
        df_copy = df.copy()
        # ADX指標をデータフレームに追加
        self.make_adx(df_copy)
        # EMA指標をデータフレームに追加
        self.make_ema(df_copy, 14)
        # EasyBacktestクラスの初期化メソッドを呼び出し
        super().__init__(df_copy, order_lot)
        self.fr_df = fr_df  # 資金調達率のデータフレーム

    @staticmethod
    def make_adx(df):
        # ADX計算のためのインスタンスを作成
        adxI = ADXIndicator(df['high'], df['low'], df['close'])
        # ADX値をデータフレームに追加
        df['adx'] = adxI.adx()

    @staticmethod
    def make_ema(df, span):
        # EMA計算のためのインスタンスを作成
        emaI = EMAIndicator(df['close'], window=span)
        # EMA値をデータフレームに追加
        df[f'ema{span}'] = emaI.ema_indicator()

    def action(self):
        # 現在のADX値を取得
        adx_value = self.df['adx'][self.i]
        # 現在の14日EMA値を取得
        ema14_value = self.df['ema14'][self.i]
        # 現在の終値を取得
        close_price = self.df['close'][self.i]
        # 現在の資金調達率を取得
        fr_value = self.fr_df.loc[self.i, 'value']  # fr_dfがメインdfと揃っていると仮定
        # 現在のポジション情報を取得
        position = self.get_position()

        # ポジションがない場合
        if position["qty"] == 0:
            # ADX値が20以下の場合
            if adx_value <= 20:
                # 終値が14日EMAより高く、資金調達率が0.01以下であれば買い注文
                if close_price > ema14_value and fr_value <= 0.0001:
                    self.market_order(order_size=self.order_lot)
                # 終値が14日EMAより低く、資金調達率が0.01以上であれば売り注文
                elif close_price < ema14_value and fr_value >= 0.0001:
                    self.market_order(order_size=-self.order_lot)

        # すでにロングポジションを持っている場合
        elif position["qty"] > 0:
            # 終値が14日EMAより低い、または資金調達率が0.01以上であれば売り注文
            if close_price < ema14_value or fr_value >= 0.0001:
                self.market_order(order_size=-self.order_lot)

        # すでにショートポジションを持っている場合
        elif position["qty"] < 0:
            # 終値が14日EMAより高い
            # 終値が14日EMAより高い、または資金調達率が-0.01以下であれば買い戻し注文
            if close_price > ema14_value or fr_value <= -0.0001:
                self.market_order(order_size=self.order_lot)

# 'df'がメインのデータフレーム、'fr_df'が資金調達率のデータフレームと仮定する
strategy = Strategy(df=df, fr_df=fr_df, order_lot=1)
# バックテストを実行
strategy.do_backtest()

# 損益計算関数を使って損益データを生成
pl, profit = make_pl(pd.DataFrame(strategy.exec_history), comfee=0.0024)

pl.to_csv('test.csv')

# トレード履歴をCSVファイルとして保存
trade_history_df = pd.DataFrame(strategy.exec_history)
trade_history_df.to_csv('/content/drive/My Drive/trade_data/trade_history.csv', index=False)

# 損益データが2行以上ある場合、グラフを描画
if len(pl) > 2:
    # タイムスタンプをpandasのdatetime形式に変換
    pl["timestamp"] = pd.to_datetime(pl["timestamp"])
    # 損益グラフを描画
    pl.plot(x="timestamp", y="PL_graph");
    # グラフを画像ファイルとして保存（オプション）
    plt.savefig('/content/drive/My Drive/trade_data/profit_loss_graph.png')

