In [19]:
from eventhandler import EventHandler
from datetime import datetime

from pprint import pprint

from core.position_manager import Position

class Strategy:

    def __init__(self):
        self.event_handler = EventHandler('newSignal')
        self.event_handler.link(self.__on_new_signal, 'newSignal')

    def __on_new_signal(self, signal):
        print(f"__on_new_signal fired")

    def emulate_signal(self, signal):
        self.event_handler.fire('newSignal', signal)


In [20]:
class PositionManager:

    def __init__(self, strategy: Strategy):
        self.__positions = []  # Stores all positions
        self.event_handler = EventHandler('onOpenPosition', 'onClosePosition')
        self.event_handler.link(self.__on_open_position, 'onOpenPosition')
        self.event_handler.link(self.__on_close_position, 'onOpenPosition')


        strategy.event_handler.link(self.open, 'newSignal')

    # This callback will be called when onOpenPosition event happens
    def __on_open_position(self, position):
        print("PositionManager.__on_open_position fired!\n")
        print(f"Position Opened:\n"
              f"{position}")
        self.__positions.append(position)

    # This callback will be called when onClosePosition event happens
    def __on_close_position(self, position):
        print("PositionManager.__on_close_position fired!\n")
        print(f"Position Closed:\n"
              f"{position}")

    # Now let's define the public methods of the PositionManager to be used outside the class
    def open(self, direction):
        position = {
            "direction": direction,
            "open_timestamp": datetime.utcnow(),
            "status": "open"
        }
        print(f"Opening {direction} position:\n")
        pprint(position)

        self.event_handler.fire('onOpenPosition', position)

    def close(self, position):
        print(f"Closing Position:\n"
              f"{position}")
        position['status'] = "closed"
        position['close_timestamp'] = datetime.utcnow()

        self.event_handler.fire('onClosePosition', position)


In [21]:
my_strategy = Strategy()  # Must init with ()

pprint(dir(my_strategy))

['_Strategy__on_new_signal',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 'emulate_signal',
 'event_handler']


In [27]:

my_pm = PositionManager(my_strategy)
my_strategy.emulate_signal('LONG')

__on_new_signal fired
Opening LONG position:

{'direction': 'LONG',
 'open_timestamp': datetime.datetime(2022, 3, 26, 20, 19, 4, 597546),
 'status': 'open'}
PositionManager.__on_open_position fired!

Position Opened:
{'direction': 'LONG', 'open_timestamp': datetime.datetime(2022, 3, 26, 20, 19, 4, 597546), 'status': 'open'}
PositionManager.__on_close_position fired!

Position Closed:
{'direction': 'LONG', 'open_timestamp': datetime.datetime(2022, 3, 26, 20, 19, 4, 597546), 'status': 'open'}
Opening LONG position:

{'direction': 'LONG',
 'open_timestamp': datetime.datetime(2022, 3, 26, 20, 19, 4, 597940),
 'status': 'open'}
PositionManager.__on_open_position fired!

Position Opened:
{'direction': 'LONG', 'open_timestamp': datetime.datetime(2022, 3, 26, 20, 19, 4, 597940), 'status': 'open'}
PositionManager.__on_close_position fired!

Position Closed:
{'direction': 'LONG', 'open_timestamp': datetime.datetime(2022, 3, 26, 20, 19, 4, 597940), 'status': 'open'}
Opening LONG position:

{'dire

In [23]:
class Notifier:

    def __init__(self, pm: PositionManager):

        # Subscribe to external ChatRoom class events
        pm.event_handler.link(self.send_telegram_notification, 'onOpenPosition')
        pm.event_handler.link(self.send_telegram_notification, 'onClosePosition')

    # When chatroom fires the onNewUser event our bot will saludate will link this method.
    def send_telegram_notification(self, position):
        print("Sending Telegram Notification...\n"
              "Position Info:\n")
        pprint(position)


In [26]:
my_strategy.emulate_signal('LONG')

__on_new_signal fired
Opening LONG position:

{'direction': 'LONG',
 'open_timestamp': datetime.datetime(2022, 3, 26, 20, 6, 57, 736645),
 'status': 'open'}
PositionManager.__on_open_position fired!

Position Opened:
{'direction': 'LONG', 'open_timestamp': datetime.datetime(2022, 3, 26, 20, 6, 57, 736645), 'status': 'open'}
PositionManager.__on_close_position fired!

Position Closed:
{'direction': 'LONG', 'open_timestamp': datetime.datetime(2022, 3, 26, 20, 6, 57, 736645), 'status': 'open'}
Opening LONG position:

{'direction': 'LONG',
 'open_timestamp': datetime.datetime(2022, 3, 26, 20, 6, 57, 737165),
 'status': 'open'}
PositionManager.__on_open_position fired!

Position Opened:
{'direction': 'LONG', 'open_timestamp': datetime.datetime(2022, 3, 26, 20, 6, 57, 737165), 'status': 'open'}
PositionManager.__on_close_position fired!

Position Closed:
{'direction': 'LONG', 'open_timestamp': datetime.datetime(2022, 3, 26, 20, 6, 57, 737165), 'status': 'open'}
