# I Test and store async updater tasks

### 1.1 Testing single methods:

In [1]:
import Misso.services.helper as ph
import Misso.services.async_helper as ah

# Load the updater configuration and pull out the watch-list filter definition.
config = ph.parse_config_yaml("Misso/updater_config.yaml")
filter_config = config["class_attributes"]["watch_list_filter"]
# Create the exchange driver for the "HFT" account in async mode.
aftx = ph.initialize_exchange_driver("HFT", init_async=True)
# NOTE: top-level ``await`` only works inside IPython/Jupyter, not in a plain script.
wl = await ah.get_filtered_watch_list(aftx, filter_config)
pos = await ah.get_open_positions(aftx)

### 1.2 Testing BaseUpdater Class with config.yaml

In [None]:
from Misso.updater.base_updater import BaseUpdater

# Build the updater from the YAML configuration and start it via run_in_thread().
# NOTE(review): presumably this runs the update tasks on a background thread - confirm in BaseUpdater.
base = BaseUpdater("Misso/updater_config.yaml")
base.run_in_thread()

# II ASYNC UPDATER TASKS

All `t_xxxx_xxxx(self, task_config)` methods should be added to class BaseMethods in base_methods.py.
All methods in use are defined in updater_config.yaml. They are executed in the background via threading.

#### 2.1 ASYNC FUNCTION: Task_initial_update

Task_initial_update should contain every API call (defined as a subtask in updater_config) needed for initialization.
This async function is called only once, before strategy initialization.
After fetching data and setting the system-relevant attributes, the variable 'self.first_update'
is set to True (used for blocking all other methods until initialization is done).
It is recommended to define a "watch_list" here and to fetch price tickers, OHLC data, etc. for every market in the watch_list.

In [1]:
async def t_init_update(self, task_config):
    """Run every configured initialisation subtask once, then flag completion.

    Subtasks are taken from ``task_config["subtasks"]`` in order.  A subtask
    whose ``"type"`` is ``"async"`` is awaited through ``self.async_get_set``;
    any other type goes through the synchronous ``self.get_set``.  Afterwards
    ``self.first_update`` is set to ``True``.

    Args:
        task_config: Mapping with the keys ``"name"`` and ``"subtasks"``.
    """
    # The name is not used here; the lookup is kept so that a config without
    # a "name" key still fails up front.
    _ = task_config["name"]
    for subtask_cfg in task_config["subtasks"].values():
        if subtask_cfg["type"] != "async":
            self.get_set(subtask_cfg)
            continue
        await self.async_get_set(subtask_cfg)
    self.first_update = True

example configuration for t_init_update with one subtask

        t_init_update:
            input_args: None
            name: init_update
            freq: None
            subtasks:
              subtask_1:
                type: async
                modul: Misso.services.async_helper
                method: get_filtered_watch_list
                input_args:
                  - self.aftx
                  - self.watch_list_filter
                  - self.wl_restricted
                target_attr: watch_list
                target_save_update: False
                return_value: True
              subtask_2:.......


example watch list configuration:

          watch_list_filter: {info.volumeUsd24h: {value: 2000000, type: greater_or_equal, value_type: float}}


#### 2.2 ASYNC FUNCTION: Task_update_template
Below you can see a template updater task. Task_update_template can be entirely defined via config.
For example:
    With following configuration we will get an update_task which fetches 1 minute OHLCV data (method: get_ohlcv_data()) for every market in watch_list every 60 seconds.
    The resulting data will be stored in variable 'self.ohlcs' as type dict self.ohlcs = {'XXX/USD':ohlcv_data, 'YYY/USD':ohlcv_data, ....}

          t_update_template:
            name: update_ohlcv
            freq: 60
            subtasks:
              subtask_0:
                type: async
                modul: Misso.services.async_helper
                method: get_ohlcv_data
                input_args:
                  - self.aftx
                  - self.watch_list
                  - 1m
                  - 1000
                target_attr: ohlcs
                target_save_update: False
                return_value: True

In [None]:
async def t_update_template(self, task_config):
    """Run the config-defined subtasks of one updater task in a periodic loop.

    The task first waits until ``self.first_update`` is truthy, then repeats
    until ``self.stop_updates`` is set: run every entry of
    ``task_config["subtasks"]`` (awaiting ``self.async_get_set`` for type
    ``"async"``, calling ``self.get_set`` otherwise) and sleep for
    ``task_config["freq"]`` seconds.  An exception raised by a subtask is
    logged, recorded as ``self.system_status[name] = False`` and the loop
    carries on with the next cycle.

    Args:
        task_config: Mapping with the keys ``"name"``, ``"freq"`` (seconds
            between cycles) and ``"subtasks"``.
    """
    name = task_config["name"]
    print(f"starting update template {name}")
    # Hold back until the one-off initial update has finished.
    while not self.first_update:
        await asyncio.sleep(5)
    while not self.stop_updates:
        try:
            for config in task_config["subtasks"].values():
                # Do not touch the exchange while its session is being restarted.
                while self.restart_exchange:
                    await asyncio.sleep(2)
                if config["type"] == "async":
                    await self.async_get_set(config)
                else:
                    self.get_set(config)
        except Exception as e:
            print(f"ERROR restarting [{name}]", e)
            self.error_logger.error(e, exc_info=True)
            self.system_status[name] = False
        if self.stop_updates:
            break
        # Fix: this used to subscript the function (``asyncio.sleep[...]``),
        # which raised TypeError outside the try block and ended the task
        # after its first cycle.
        await asyncio.sleep(task_config["freq"])
    await asyncio.sleep(0.1)

#### 2.3 ASYNC FUNCTION: t_update_tickers

In [16]:
async def t_update_tickers(self, freq, name="tickers"):
    """Periodically store the last traded price of every watched market.

    Each cycle fetches tickers for ``self.watch_list`` from ``self.aftx`` and
    writes ``ticker["last"]`` into ``self.last_price[symbol]``.  The outcome
    is recorded in ``self.system_status[name]``.  The loop runs until
    ``self.stop_updates`` is set.

    Args:
        freq: Seconds to sleep between cycles (also used as the pause after a
            failed cycle).
        name: Key used for ``self.system_status`` / ``self.first_update`` and
            in log messages.
    """
    while not self.stop_updates:
        try:
            # Do not touch the exchange while its session is being restarted.
            while self.restart_exchange:
                await asyncio.sleep(5)
            tickers = await self.aftx.fetch_tickers(self.watch_list)
            for symbol, ticker in tickers.items():
                self.last_price[symbol] = ticker["last"]
            self.thread_logger.info(f"t_Modul {name} finished")
            # NOTE(review): first_update is indexed per task name here, while
            # t_init_update / t_update_template treat it as a plain bool -
            # confirm which shape is intended.
            self.first_update[name] = True
            self.system_status[name] = True
            await asyncio.sleep(freq)
        except Exception as e:
            print(f"ERROR restarting [{name}]", e)
            self.error_logger.error(e, exc_info=True)
            self.system_status[name] = False
            if self.stop_updates:
                break
            # Fix: the failure path used to retry immediately.  A call that
            # fails without ever suspending then spins without yielding to
            # the event loop, so no other task (including shutdown) can run.
            await asyncio.sleep(freq)
    self.stop_updates = True
    await asyncio.sleep(1)

{'t_init_attributes': {'async_helper_get_set': {'subtask_1': {'method': 'gather_current_ranges_exchange',
    'input_attr': 'self.aftx',
    'target_attr': 'wl_ranges',
    'return_value': True}}}}

#### 2.4 ASYNC FUNCTION: t_update_balance

In [None]:
async def t_update_balance(self, task_config):
    """Periodically refresh the account's USD balance figures.

    Every ``task_config["freq"]`` seconds the balance is fetched from
    ``self.aftx`` and ``self.total_capital`` / ``self.free_capital`` are
    updated; ``self.initial_capital`` is set once, while it is still 0.
    A failed fetch is retried once after a 10 second pause.  The outcome of
    each cycle is recorded in ``self.system_status[task_config["name"]]``.

    Args:
        task_config: Mapping with the keys ``"freq"`` (seconds between
            refreshes) and ``"name"`` (key used in ``self.system_status``).
    """
    freq = task_config["freq"]
    name = task_config["name"]
    while not self.stop_updates:
        try:
            # Do not touch the exchange while its session is being restarted.
            while self.restart_exchange:
                await asyncio.sleep(5)
            try:
                balance = await self.aftx.fetch_balance()
            # Fix: was a bare ``except:``, which also swallowed
            # asyncio.CancelledError and so blocked task cancellation.
            except Exception:
                try:
                    # One retry after a pause.
                    await asyncio.sleep(10)
                    while self.restart_exchange:
                        await asyncio.sleep(5)
                    balance = await self.aftx.fetch_balance()
                except Exception:
                    self.error_logger.error("ERROR update_balance failed restarting thread")
                    self.thread_logger.info("ERROR update_balance failed restarting thread")
                    # Fix: both attempts failed, so the task is not healthy
                    # (this used to be set to True).
                    self.system_status[name] = False
                    continue
            self.total_capital = balance["total"]["USD"]
            self.free_capital = balance["free"]["USD"]
            # Remember the first observed total as the starting capital.
            if self.initial_capital == 0:
                self.initial_capital = self.total_capital
            self.system_status[name] = True
            await asyncio.sleep(freq)
        except Exception as e:
            print(f"ERROR restarting [{name}]", e)
            self.error_logger.error(e, exc_info=True)
            self.system_status[name] = False
            if self.stop_updates:
                break
            await asyncio.sleep(freq)
    self.stop_updates = True
    await asyncio.sleep(1)

#### 2.5 ASYNC FUNCTION: t_update_balance

In [None]:
    async def t_update_balance(self, task_config):
        """Periodically refresh the account's USD balance figures.

        Every ``task_config["freq"]`` seconds the balance is fetched from
        ``self.aftx`` and ``self.total_capital`` / ``self.free_capital`` are
        updated; ``self.initial_capital`` is set once, while it is still 0.
        A failed fetch is retried once after a 10 second pause.  The outcome
        of each cycle is recorded in
        ``self.system_status[task_config["name"]]``.

        Args:
            task_config: Mapping with the keys ``"freq"`` (seconds between
                refreshes) and ``"name"`` (key used in ``self.system_status``).
        """
        # Fix: the body used to sit at the same column as the ``def`` line,
        # which is an IndentationError; it is now nested under the signature.
        freq = task_config["freq"]
        name = task_config["name"]
        while not self.stop_updates:
            try:
                # Do not touch the exchange while its session is being restarted.
                while self.restart_exchange:
                    await asyncio.sleep(5)
                try:
                    balance = await self.aftx.fetch_balance()
                # Fix: was a bare ``except:``, which also swallowed
                # asyncio.CancelledError and so blocked task cancellation.
                except Exception:
                    try:
                        # One retry after a pause.
                        await asyncio.sleep(10)
                        while self.restart_exchange:
                            await asyncio.sleep(5)
                        balance = await self.aftx.fetch_balance()
                    except Exception:
                        self.error_logger.error("ERROR update_balance failed restarting thread")
                        self.thread_logger.info("ERROR update_balance failed restarting thread")
                        # Fix: both attempts failed, so the task is not
                        # healthy (this used to be set to True).
                        self.system_status[name] = False
                        continue
                self.total_capital = balance["total"]["USD"]
                self.free_capital = balance["free"]["USD"]
                # Remember the first observed total as the starting capital.
                if self.initial_capital == 0:
                    self.initial_capital = self.total_capital
                self.system_status[name] = True
                await asyncio.sleep(freq)
            except Exception as e:
                print(f"ERROR restarting [{name}]", e)
                self.error_logger.error(e, exc_info=True)
                self.system_status[name] = False
                if self.stop_updates:
                    break
                await asyncio.sleep(freq)
        self.stop_updates = True
        await asyncio.sleep(1)

#### 2.6 ASYNC FUNCTION: t_execute_orders
This function executes orders which are queued to 'self.pending_orders' by the main strategy and stores/appends the order information to 'self.open_orders', keyed by market: {'XXX/USD': [{'id': ..., 'size': ..., 'price': ..., 'side': ...}]}

In [None]:
#v0 (old)
async def t_execute_orders(self, task_config):
    """Submit queued orders to the exchange and track the accepted ones.

    Each cycle drains ``self.pending_orders``: every queued order is sent via
    ``ah.create_limit_order_from_list`` and, unless the first element of the
    response is ``"failed"``, appended to ``self.open_orders[symbol]``.  The
    task then sleeps ``task_config["freq"]`` seconds.  The loop runs until
    ``self.stop_updates`` is set.

    Args:
        task_config: Mapping with the keys ``"freq"`` and ``"name"``.
    """
    freq = task_config["freq"]
    name = task_config["name"]
    while not self.stop_updates:
        try:
            while not self.pending_orders.empty():
                order = self.pending_orders.get()
                # The market symbol is the fifth element of a queued order.
                symbol = order[4]
                # Do not touch the exchange while its session is being restarted.
                while self.restart_exchange:
                    await asyncio.sleep(1)
                order = await ah.create_limit_order_from_list(self.aftx, symbol, order)
                if order[0] != "failed":
                    # Fix: plain indexing raised KeyError for the first order
                    # of a market, so an order that was already live on the
                    # exchange was never tracked.
                    self.open_orders.setdefault(symbol, []).append(order)
                else:
                    self.thread_logger.info(f"[t_execute_orders] failed with order {order}")
                await asyncio.sleep(1)
                self.thread_logger.info(f"t_Modul {name} executed order {order}")
            self.thread_logger.info(f"t_Modul {name} finished")
            self.system_status[name] = True
            await asyncio.sleep(freq)
        except Exception as e:
            print(f"ERROR restarting [{name}]", e)
            self.error_logger.error(e, exc_info=True)
            self.system_status[name] = False
    self.stop_updates = True
    await asyncio.sleep(1)

##### 2.6.1 (new version) t_execute_orders:
new: after submitting order to exchange, Order dataclass is created and stored in self.open_orders[market]

In [None]:
#v1 new
from Misso.services.orders import Order

async def t_execute_orders(self, task_config):
    """Submit queued orders to the exchange and track them as ``Order`` objects.

    Each cycle drains ``self.pending_orders``: every queued order is sent via
    ``ah.create_limit_order_from_list`` and, unless the first element of the
    response is ``"failed"``, wrapped in ``Order`` and appended to
    ``self.open_orders[symbol]``.  The task then sleeps
    ``task_config["freq"]`` seconds.  The loop runs until
    ``self.stop_updates`` is set.

    Args:
        task_config: Mapping with the keys ``"freq"`` and ``"name"``.
    """
    freq = task_config["freq"]
    name = task_config["name"]
    while not self.stop_updates:
        try:
            while not self.pending_orders.empty():
                order = self.pending_orders.get()
                # The market symbol is the fifth element of a queued order.
                symbol = order[4]
                # Do not touch the exchange while its session is being restarted.
                while self.restart_exchange:
                    await asyncio.sleep(1)
                order = await ah.create_limit_order_from_list(self.aftx, symbol, order)
                if order[0] != "failed":
                    # Fix: plain indexing raised KeyError for the first order
                    # of a market, so an order that was already live on the
                    # exchange was never tracked.
                    self.open_orders.setdefault(symbol, []).append(Order(order))
                else:
                    self.thread_logger.info(f"[t_execute_orders] failed with order {order}")
                await asyncio.sleep(1)
                self.thread_logger.info(f"t_Modul {name} executed order {order}")
            self.thread_logger.info(f"t_Modul {name} finished")
            self.system_status[name] = True
            await asyncio.sleep(freq)
        except Exception as e:
            print(f"ERROR restarting [{name}]", e)
            self.error_logger.error(e, exc_info=True)
            self.system_status[name] = False
    self.stop_updates = True
    await asyncio.sleep(0.1)

#### 2.7 ASYNC FUNCTION: t_update_open_orders
Note: order status updates are ONLY fetched for orders which were appended to self.open_orders.
It would also be possible to fetch all currently open orders with another update task.

In [None]:
async def t_update_open_orders_dict(self, task_config):
    """Periodically refresh the status of tracked orders in ``self.open_orders``.

    ``self.open_orders`` maps a market to a list of orders.  An entry is
    refreshed when it is a raw list (its first element is used as the order
    id) or a mapping whose ``"status"`` is ``"open"``.  The refreshed order
    from ``ah.get_order_by_id`` replaces the old entry.  The loop runs until
    ``self.stop_updates`` is set, sleeping ``task_config["freq"]`` seconds
    between cycles.

    Args:
        task_config: Mapping with the keys ``"freq"`` and ``"name"``.
    """
    freq = task_config["freq"]
    name = task_config["name"]
    while not self.stop_updates:
        try:
            while self.restart_exchange:
                await asyncio.sleep(1)
            # Fix: iterate over snapshots.  The old loop removed from and
            # appended to the very list it was iterating, which skips entries
            # and re-polls freshly appended orders for as long as they stay
            # open.
            for market, orders in list(self.open_orders.items()):
                for order in list(orders):
                    is_raw = isinstance(order, list)
                    if not (is_raw or order["status"] == "open"):
                        continue
                    order_id = order[0] if is_raw else order["id"]
                    while self.restart_exchange:
                        await asyncio.sleep(5)
                    # Fix: fetch first and swap afterwards, so a failed
                    # request no longer drops the order from tracking.
                    updated = await ah.get_order_by_id(self.aftx, order_id)
                    tracked = self.open_orders[market]
                    tracked.remove(order)
                    tracked.append(updated)
            self.thread_logger.info(f"t_Modul {name} finished")
            self.system_status[name] = True
            await asyncio.sleep(freq)
        except Exception as e:
            self.error_logger.info(f"ERROR {name} wit Error: {e}", exc_info=True)
            self.system_status[name] = False
            if self.stop_updates:
                break
            # Fix: wait before retrying instead of hitting the API again at once.
            await asyncio.sleep(freq)
    self.stop_updates = True
    await asyncio.sleep(1)

##### 2.7.1 (New Version) t_update_open_orders:
updates theoretical open_orders: list

In [None]:
async def t_update_open_orders(self, task_config):
    """Update the status of tracked orders and drop the finished ones.

    Each cycle refreshes, via ``ah.get_order_by_id``, every order that
    ``ph.find_orders_by_status`` returns for status ``"open"`` and that
    ``ph.order_in_is_open_orders`` does not find in ``self.is_open_orders``.
    Orders returned by ``ph.find_processed_orders`` are then removed from
    ``self.open_orders``.  The loop runs until ``self.stop_updates`` is set.

    Args:
        task_config: Mapping with the keys ``"name"`` and ``"freq"``.
    """
    task_name = task_config["name"]
    interval = task_config["freq"]
    print(f"starting {task_name}")
    while True:
        if self.stop_updates:
            break
        candidates = ph.find_orders_by_status(self.open_orders, "open")
        for tracked in candidates:
            if ph.order_in_is_open_orders(tracked, self.is_open_orders):
                continue
            # Do not touch the exchange while its session is being restarted.
            while self.restart_exchange:
                await asyncio.sleep(2)
            update = await ah.get_order_by_id(self.aftx, tracked.id)
            tracked.parse_update(update)
        # Remove finished orders (closed/canceled/failed and processed).
        for finished in ph.find_processed_orders(self.open_orders):
            self.open_orders[finished.symbol].remove(finished)
        self.system_status[task_name] = True
        if self.stop_updates:
            break
        await asyncio.sleep(interval)
    await asyncio.sleep(0.1)

#### 2.8 ASYNC FUNCTION: t_get_open_orders
fetch all currently open orders for a subaccount. Parse Open Orders list to dict is_open_orders={market_1:{buys:[], sells:[]}, market_2:{buys:[], sells:[]}}

In [None]:
async def t_get_open_orders(self, task_config):
    """Periodically fetch all open orders and publish them as ``self.is_open_orders``.

    Each cycle fetches the open orders from ``self.aftx``, converts the
    response with ``ph.parse_is_open_orders`` and stores the result in
    ``self.is_open_orders`` while holding ``self.thread_lock``.  Failures are
    logged and recorded as ``self.system_status[name] = False``.  The loop
    runs until ``self.stop_updates`` is set.

    Args:
        task_config: Mapping with the keys ``"name"`` and ``"freq"`` (seconds
            between cycles).
    """
    name = task_config["name"]
    freq = task_config["freq"]
    print(f"starting {name}")
    while not self.stop_updates:
        try:
            # Do not touch the exchange while its session is being restarted.
            while self.restart_exchange:
                await asyncio.sleep(2)
            # Fix: the call parentheses were missing, so the bound method
            # itself was awaited and every cycle failed with TypeError.
            response = await self.aftx.fetch_open_orders()
            is_open_orders = ph.parse_is_open_orders(response)
            with self.thread_lock:
                self.is_open_orders = is_open_orders
        except Exception as e:
            print(f"ERROR restarting [{name}]", e)
            self.error_logger.error(e, exc_info=True)
            self.system_status[name] = False
        if self.stop_updates:
            break
        await asyncio.sleep(freq)
    await asyncio.sleep(0.1)

##### 2.9 ASYNC FUNCTION: t_get_open_positions

In [None]:
async def t_get_open_positions(self, task_config):
    """Periodically fetch the open positions and publish them as ``self.is_open_positions``.

    Each cycle awaits ``ah.get_open_positions(self.aftx)`` and stores the
    result in ``self.is_open_positions`` while holding ``self.thread_lock``.
    Failures are logged and recorded as ``self.system_status[name] = False``.
    The loop runs until ``self.stop_updates`` is set.

    Args:
        task_config: Mapping with the keys ``"name"`` and ``"freq"`` (seconds
            between cycles).
    """
    task_name = task_config["name"]
    interval = task_config["freq"]
    print(f"starting {task_name}")
    while True:
        if self.stop_updates:
            break
        try:
            # Do not touch the exchange while its session is being restarted.
            while self.restart_exchange:
                await asyncio.sleep(2)
            positions = await ah.get_open_positions(self.aftx)
            with self.thread_lock:
                self.is_open_positions = positions
        except Exception as err:
            print(f"ERROR restarting [{task_name}]", err)
            self.error_logger.error(err, exc_info=True)
            self.system_status[task_name] = False
        if self.stop_updates:
            break
        await asyncio.sleep(interval)
    await asyncio.sleep(0.1)

# 3 System relevant Tasks:

To maintain a constant loop, several system-relevant tasks are needed:
- t_shutdown
- t_restart_exchange

These tasks are used to shut down all tasks and restart the asyncio event loop, or to restart the ccxt exchange session in case of failure.

Moreover, it is recommended to use this task for checking the system status of each task every X seconds:
- t_system_status

In [None]:
async def t_shutdown(self, task_config):
    """Wait for the stop flag, then cancel every other task and stop the loop.

    Polls ``self.stop_updates`` every 5 seconds.  Once it is set, all tasks
    of ``self.loop`` except the current one are cancelled, awaited, and
    ``self.loop`` is stopped.

    Args:
        task_config: Not used by this task.
    """
    while not self.stop_updates:
        await asyncio.sleep(5)
    asyncio.set_event_loop(self.loop)
    self.thread_logger.info(f"Restarting update tasks ... Canceling tasks")
    self.error_logger.error("shutting down update tasks")

    # Everything on the loop except this shutdown task itself.
    this_task = asyncio.current_task()
    others = []
    for candidate in asyncio.all_tasks(self.loop):
        if candidate is not this_task:
            others.append(candidate)
    for doomed in others:
        doomed.cancel()

    self.thread_logger.info(f"Canceling {len(others)} tasks")
    # return_exceptions=True so the cancellations are collected, not raised.
    await asyncio.gather(*others, return_exceptions=True)
    self.loop.stop()

async def t_restart_exchange(self, task_config):
    """Periodically close and re-create the async exchange driver.

    While a restart is in progress ``self.restart_exchange`` is ``True``; the
    other update tasks poll this flag and pause.  After a 10 second pause the
    current ``self.aftx`` is closed and replaced with a new driver for
    ``self.subaccount``.  The loop runs until ``self.stop_updates`` is set.

    Args:
        task_config: Mapping with the keys ``"name"`` and ``"freq"`` (seconds).
    """
    name = task_config["name"]
    freq = task_config["freq"]
    await asyncio.sleep(freq)
    while not self.stop_updates:
        try:
            self.restart_exchange = True
            try:
                # Pause before closing the session.
                # NOTE(review): presumably to let in-flight requests finish - confirm.
                await asyncio.sleep(10)
                await self.aftx.close()
                self.aftx = ph.initialize_exchange_driver(self.subaccount, init_async=True)
            finally:
                # Fix: always release the flag.  It used to stay True when
                # close() or the re-initialisation raised, which left every
                # other task waiting on it forever.
                self.restart_exchange = False
            self.thread_logger.info(f"t_Modul restart exchange finished")
            await asyncio.sleep(freq)
            self.system_status[name] = True
        except Exception as e:
            print(f"ERROR restarting [{name}]", e)
            self.error_logger.error(e, exc_info=True)
            self.system_status[name] = False
        # Wait another ``freq`` seconds in 5 second steps so that a stop
        # request is noticed quickly.
        # NOTE(review): together with the sleep above, a successful cycle
        # waits roughly 2 * freq between restarts - confirm this is intended.
        i = 0
        while i < freq:
            await asyncio.sleep(5)
            i += 5
            if self.stop_updates:
                break
    return



async def t_system_status(self, task_config):
    """Periodically report capital figures and evaluate the per-task status.

    After an initial 100 second delay each cycle sends a summary message
    through ``ph.telegram_notify_msg``.  If ``self.system_status`` has
    entries, they are all reset to ``None``, the task waits
    ``task_config["freq"]`` seconds, and a copy of the dict is handed to
    ``ph.evaluate_system_status``.  The loop runs until ``self.stop_updates``
    is set.

    Args:
        task_config: Mapping with the key ``"freq"`` (seconds).
    """
    interval = task_config["freq"]
    await asyncio.sleep(100)
    while True:
        if self.stop_updates:
            break
        msg = f" Free Risk Capital {self.free_risk_margin} | Free Priority Capital {self.free_risk_margin_priority} | Total Capital: {self.total_capital}, #Open Units {self.open_units_counter}, total unit risk {self.total_unit_risk}, next_unit_value {self.next_unit_value}/max.:{self.unit_value_factors[0]*self.total_capital}"
        ph.telegram_notify_msg(msg, self.error_logger)
        has_entries = len(self.system_status) > 0
        if has_entries:
            # Clear every entry, then give the tasks one interval to report
            # again before the snapshot is evaluated.
            for task_key in list(self.system_status):
                self.system_status[task_key] = None
            await asyncio.sleep(task_config["freq"])
            snapshot = self.system_status.copy()
            ph.evaluate_system_status(snapshot, self.error_logger)
        if self.stop_updates:
            break
        await asyncio.sleep(interval)

# 4. Config

        class_attributes:
          subaccount: test_strategy
          interactor: None
          proc_name: None
          open_orders: {}
          system_status: {}
          interactions: []
          activate_interactor: True
          stop_updates: False
          restart_exchange: False
          positions: {}
          current_ranges: {}
          ohlcs: {}
          last_price: {}
          wl_ranges: None
          wl_restricted: ["BNB/USD:USD", "USDT/USD:USD"]
          watch_list_filter: {info.volumeUsd24h: {value: 2000000, type: greater_or_equal, value_type: float}}
          watch_list: None
          closed_trades: []
          total_capital: 0
          free_capital: 0
          leverage: 20
          initial_capital: 0
        shared_attributes:
          pending_orders:
            modul: queue
            function: Queue
        tasks:
          t_init_update:
            input_args: None
            name: init_update
            freq: None
            subtasks:
              subtask_0:
                type: async
                modul: Misso.services.async_helper
                method: get_filtered_watch_list
                input_args:
                  - self.aftx
                  - self.watch_list_filter
                  - self.wl_restricted
                target_attr: watch_list
                target_save_update: False
                return_value: True
              subtask_1:
                type: async
                modul: Misso.services.async_helper
                method: get_current_ranges
                input_args:
                  - self.aftx
                  - self.watch_list
                target_attr: wl_ranges
                target_save_update: False
                return_value: True
              subtask_2:
                type: standard
                modul: Misso.services.helper
                method: get_watch_list_from_wl_ranges
                input_args:
                  - self.wl_ranges
                  - self.wl_restricted
                target_attr: watch_list
                target_save_update: False
                return_value: True
              subtask_3:
                type: async
                modul: Misso.services.async_helper
                method: get_last_prices_from_watch_list
                input_args:
                  - self.aftx
                  - self.watch_list
                target_attr: last_price
                target_save_update: False
                return_value: True
              subtask_4:
                type: async
                modul: Misso.services.async_helper
                method: get_total_balance
                input_args:
                  - self.aftx
                target_attr: total_capital
                target_save_update: False
                return_value: True
              subtask_5:
                type: async
                modul: Misso.services.async_helper
                method: get_total_balance
                input_args:
                  - self.aftx
                target_attr: free_capital
                target_save_update: False
                return_value: True
          t_update_tickers:
            name: update_tickers
            freq: 10
            out: self.last_price
          t_update_open_orders_dict:
            name: update_open_orders
            freq: 30
          t_update_balance:
            name: update_balance
            freq: 30
          t_execute_orders:
            name: execute_orders
            freq: 5
          t_shutdown:
            name: shutdown_tasks
          t_system_status:
            name: check_update_task_status
            freq: 300
          t_restart_exchange:
            name: restart_exchange
            freq: 300
          t_update_template:
            name: update_ohlcv
            freq: 60
            subtasks:
              subtask_0:
                type: async
                modul: Misso.services.async_helper
                method: get_ohlcv_data
                input_args:
                  - self.aftx
                  - self.watch_list
                  - 1m
                  - 1000
                target_attr: ohlcs
                target_save_update: False
                return_value: True

In [17]:
from dataclasses import dataclass, field

@dataclass
class Order:
    """Toy order record whose ``size`` is derived from ``value``."""

    id: str
    value: int
    status: bool = False
    # Not an __init__ argument; filled in by __post_init__.
    size: float = field(init=False)

    def __post_init__(self):
        """Set ``size`` to twice ``value``."""
        doubled = 2 * self.value
        self.size = doubled

# Quick check: size is not passed in; __post_init__ derives it from value.
a = Order("1001", 23)
print(a)

Order(id='1001', value=23, status=False, size=46)


In [9]:
from dataclasses import dataclass

@dataclass
class Order:
    """Minimal order record used by the mutation demo in this cell."""
    id: str  # order identifier, compared against "1002" in the demo
    value: int
    status: bool = False  # set to True for the selected orders in the demo

# Demo: objects collected into a second list are the same objects, so
# mutating them through that list is visible in the original list too.
a = Order("1001", 23)
b = Order("1002", 26)
c = Order("1003", 49)

orders = [a, b, c]
print(orders)

# Select the orders to change (by reference, not by copy).
chg = [o for o in orders if o.id == "1002"]

for o in chg:
    o.status = True
print(chg)
print(orders)

[Order(id='1001', value=23, status=False), Order(id='1002', value=26, status=False), Order(id='1003', value=49, status=False)]
[Order(id='1002', value=26, status=True)]
[Order(id='1001', value=23, status=False), Order(id='1002', value=26, status=True), Order(id='1003', value=49, status=False)]
