# Backtest

In [1]:
from system.agent import Agent
from system.env import Env

import warnings
warnings.filterwarnings('ignore')

%load_ext autoreload
%autoreload 2


config = {"capital" : 100, "interval" : "1d",
          "symbols" : ["BTC", "ETH", "SOL", "DASH", "MATIC",
                       "TWT", "GALA", "EGLD", "XMR"],
          "start" : "2018", "end" : "2023"
          }

env = Env(**config)

agentIds = [1, 2, 3, 4]
env.config_agents(agentIds)

params_1 = {"agentId" : 1, "symbol" : "BTC", "allocation" : 100, "env" : env, "policy_name" : "MOMENTUM"}
params_2 = {"agentId" : 2, "symbol" : "BTC", "allocation" : 100, "env" : env, "policy_name":"TRIPLEMA"}
params_3 = {"agentId" : 3, "symbol" : "ETH", "allocation" : 100, "env" : env, "policy_name":"MOMENTUM"}
params_4 = {"agentId" : 4, "symbol" : "ETH", "allocation" : 100, "env" : env, "policy_name":"TRIPLEMA"}

In [2]:
agent_1 = Agent(**params_1)
agent_1.update_policy_params(params = 3)
agent_1.run_episode()

In [None]:

agent_2 = Agent(**params_2)
agent_2.update_policy_params(params = (7, 14, 21))
agent_2.run_episode()

agent_3 = Agent(**params_3)
agent_3.update_policy_params(params = 5)
agent_3.run_episode()

agent_4 = Agent(**params_4)
agent_4.update_policy_params(params = (7, 14, 21))
agent_4.run_episode()

In [4]:
import pandas as pd
import numpy as np
import scipy.stats


dd = env.trades[1]

class Metric:
    
    def __init__(self, tradesData):
        self.tradesData = tradesData
        self.nbTrades = 0
        self.winTrades = 0
        self.lossTrades = 0
        
        self.winRate = []
        
        self.amoungWin = []
        self.amoungLoss = []
        
        self.avgWin = []
        self.avgLoss = []
        
        self.expectancies = []
        
        self.profitFactor = []
        
    
    @staticmethod
    def calculate_product(r):
        return (1 + r).prod()
    
    @staticmethod
    def average(r):
        return 0 if len(r) == 0 else Metric.calculate_product(r) ** (1 / len(r))
    
    @staticmethod
    def calculate_kurtosis(r, n=8):
        return scipy.stats.kurtosis(r) if len(r) > n else 0

    @staticmethod
    def calculate_skewness(r, n=8):
        return scipy.stats.skew(r) if len(r) > n else 0
    
    @staticmethod
    def expectancy(winRate, avgWin, avgLoss):
        return winRate * avgWin + (1 - winRate) * avgLoss
    
    staticmethod
    def sharpe_ratio(r):
        mean = Metric.calculate_product(r)
        std = np.std(r)
        return mean / std
    
    
    def update(self):
        i = j = 0
        while True:
            j += 1
            data = self.tradesData.iloc[i : j+1]
            if self.tradesData.iloc[j]["status"] == "Close":
                i = j+1
                self.nbTrades += 1
                pnl = self.tradesData.iloc[j]["pnl"]
                
                if pnl > 0:
                    self.winTrades += 1
                    self.amoungWin.append(pnl)
                    self.amoungLoss.append(0)
                    
                else:
                    self.lossTrades += 1
                    self.amoungWin.append(0)
                    self.amoungLoss.append(pnl)
                    
                self.winRate.append(self.winTrades / self.nbTrades)
                
                self.avgWin.append(np.mean(self.amoungWin))
                self.avgLoss.append(np.mean(self.amoungLoss))
                
                self.expectancies.append(
                    Metric.expectancy(winRate = self.winRate[-1] , avgWin = self.avgWin[-1], avgLoss = self.avgLoss[-1])
                    )
                
                self.profitFactor.append(
                    np.sum(self.amoungWin) / np.sum(self.amoungLoss)*(-1)
                    )
                    
                yield data, pnl
    

    

tr = Metric(dd)
dt = tr.update()

In [None]:
tr.metrics()

In [6]:
tr.nbTrades

459

In [9]:
tr.winTrades

141

In [None]:
tr.profitFactor

In [None]:
tr.amoungLoss

In [None]:
tr.amoungWin

In [5]:
while True:
    try:
        print(next(dt)[1])
    except (StopIteration, IndexError):
        print("Stop")
        break


3.8206389636391265
-1.9774110549779493
1.420731260630376
3.922467421805891
-0.21903671533311808
-5.629862625040218
-0.135913150270369
17.652967101111116
-10.183004809226574
-4.846554862789247
-2.575653497904611
-14.112002230431187
0.08417936272802251
-1.1205508677613807
0.567429873056966
-2.563709098339473
-4.566906836623673
-0.4501743049684279
5.925534353746528
-1.326133090683399
-5.05278013278668
-3.7916181253882257
0.4056414155978558
1.1328110277117531
-0.6619360506315246
11.1481860030854
-3.540226485458689
-8.605750189278808
-1.239528164007936
-0.7619583449279617
-4.272524709984111
1.4902935343951356
-3.310456774957899
0.26178410337062985
6.679993092805304
4.455104378655463
-0.48493265037878075
14.889935904980469
-1.8472132650228872
-4.534946528878081
-1.5816431844099412
6.293785648877588
-6.302829501406961
2.865752448441029
-1.2388919890045997
-2.1132059953117732
-2.206765167341004
-3.0636725125393696
-0.09501468642451982
-0.2858079130715936
-4.056985463939029
-0.4873338357739385


In [None]:
tr.amoungWin

In [None]:
import pandas as pd

pd.DataFrame(dd)

In [3]:
env.get_report(agentId=1, symbol = "BTC")

## Global Report

In [None]:
env.globalReport()

#  MultiAgent System

In [None]:
from magent import MAgentThread
from master import  MasterAgentThread
from system.env import Env

from threading import Thread, Condition, Event, Barrier, Semaphore

import warnings
warnings.filterwarnings('ignore')

%load_ext autoreload
%autoreload 2


config = {"capital" : 100, "interval" : "1d",
          "symbols" : ["BTC", "ETH", "SOL", "DASH", "MATIC",
                       "TWT", "GALA", "EGLD", "XMR"],
          "start" : "2023", "end" : "2023"
          }
env = Env(**config)


def custom_action():
    print(" *** Barrier passee *** ")


condition = Condition()
barrier = Barrier(2, action=custom_action)

agentId_1 = 1
agentId_2 = 2

agent_msg = {agentId_1 : None, agentId_2 : None}
master_msg = {agentId_1 : None, agentId_2 : None}

activeAgent = 1

boss_params = {"condition" : condition, "activeAgent" : activeAgent ,  "agent_msg" : agent_msg, "master_msg" : master_msg}

params_1 = {"agentId" : agentId_1, "symbol" : "BTC", "allocation" : 100, "env" : env, "policy_name" : "MOMENTUM",
            "condition" : condition, "barrier" : barrier, "agent_msg" : agent_msg, "master_msg" : master_msg}

params_2 = {"agentId" : agentId_2, "symbol" : "BTC", "allocation" : 100, "env" : env, "policy_name":"TRIPLEMA",
            "condition" : condition, "barrier" : barrier, "agent_msg" : agent_msg, "master_msg" : master_msg}


Boss = MasterAgentThread(**boss_params)
Boss.addAgent(1)
Boss.addAgent(2)

agent_1 = MAgentThread(**params_1)
agent_1.update_policy_params(params = 3)

agent_2 = MAgentThread(**params_2)
agent_2.update_policy_params(params = (7, 14, 21))

In [None]:
Boss.start()
agent_1.start()
agent_2.start()


agent_1.join()
agent_2.join()
Boss.join()

In [None]:
dd = env.journal.trades_data
dd.head

In [None]:
import plotly.figure_factory as ff

dd1 = dd[dd["agentId"] == 1]
dd2 = dd[dd["agentId"] == 2]


# Group data together
hist_data = [dd1["pnl"], dd2["pnl"]]

group_labels = ['Agent 1', 'Agent 2']

# Create distplot with custom bin_size
fig = ff.create_distplot(hist_data, group_labels, bin_size=.2)
fig.show()

In [None]:
len(dd2)

In [None]:
len(dd1)

In [None]:
from plotly.figure_factory import create_distplot
import numpy as np
import pandas as pd

df = pd.DataFrame({'2012': np.random.randn(200), 
                   '2013': np.random.randn(200)+1}
                  )
fig = create_distplot([df[c] for c in df.columns], df.columns)
fig.show()

In [None]:
import plotly.figure_factory as ff
import numpy as np
np.random.seed(1)

x = np.random.randn(1000)
hist_data = [x]
group_labels = ['distplot'] # name of the dataset

fig = ff.create_distplot(hist_data, group_labels)
fig.show()

In [None]:
import plotly.figure_factory as ff
import numpy as np

# Add histogram data
x1 = np.random.randn(200) - 2
x2 = np.random.randn(200)
x3 = np.random.randn(200) + 2
x4 = np.random.randn(200) + 4

# Group data together
hist_data = [x1, x2, x3, x4]

group_labels = ['Group 1', 'Group 2', 'Group 3', 'Group 4']

# Create distplot with custom bin_size
fig = ff.create_distplot(hist_data, group_labels, bin_size=.2)
fig.show()

In [None]:
import plotly.figure_factory as ff
import numpy as np

x1 = np.random.randn(200)
x2 = np.random.randn(200) + 2

group_labels = ['Group 1', 'Group 2']

colors = ['slategray', 'magenta']

# Create distplot with curve_type set to 'normal'
fig = ff.create_distplot([x1, x2], group_labels, bin_size=.5,
                         curve_type='normal', # override default 'kde'
                         colors=colors)

# Add title
fig.update_layout(title_text='Distplot with Normal Distribution')
fig.show()

In [None]:
import plotly.figure_factory as ff
import numpy as np

x1 = np.random.randn(200) - 1
x2 = np.random.randn(200)
x3 = np.random.randn(200) + 1

hist_data = [x1, x2, x3]

group_labels = ['Group 1', 'Group 2', 'Group 3']
colors = ['#333F44', '#37AA9C', '#94F3E4']

# Create distplot with curve_type set to 'normal'
fig = ff.create_distplot(hist_data, group_labels, show_hist=False, colors=colors)

# Add title
fig.update_layout(title_text='Curve and Rug Plot')
fig.show()

In [None]:
import plotly.graph_objects as go

categories = ['processing cost','mechanical properties','chemical stability',
              'thermal stability', 'device integration']

fig = go.Figure()

fig.add_trace(go.Scatterpolar(
      r=[1, 5, 2, 2, 3],
      theta=categories,
      fill='toself',
      name='Product A'
))
fig.add_trace(go.Scatterpolar(
      r=[4, 3, 2.5, 1, 2],
      theta=categories,
      fill='toself',
      name='Product B'
))

fig.update_layout(
  polar=dict(
    radialaxis=dict(
      visible=True,
      range=[0, 5]
    )),
  showlegend=False
)

fig.show()

In [None]:
class MyGenerator:
    def __init__(self, n):
        self.n = n
        self.current = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.current < self.n:
            result = self.current
            self.current += 1
            return result
        else:
            raise StopIteration

# Créez une instance de la classe MyGenerator
dix_nombres = MyGenerator(10)

# Utilisez une boucle for pour obtenir les valeurs
for nombre in dix_nombres:
    print(nombre)


In [None]:
my_list = [1, 2, 3, 4, 5]
iterable = iter(my_list)

for x in iterable:
    print(x)
