Skip to content

Commit

Permalink
[MarkPriceUpdate] Add mark price from exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
Herklos committed Mar 16, 2020
1 parent 13f82b5 commit 5f05663
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 14 deletions.
26 changes: 18 additions & 8 deletions octobot_trading/exchanges/types/future_exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
class FutureExchange(RestExchange):
# Mark price params
MARK_PRICE_IN_POSITION = False
MARK_PRICE_IN_TICKER = False

# Funding rate params
FUNDING_WITH_MARK_PRICE = False
FUNDING_IN_TICKER = False

"""
CCXT future library wrapper
Expand All @@ -33,21 +35,29 @@ async def get_symbol_open_positions(self, symbol: str) -> dict:
async def get_open_positions(self) -> dict:
raise NotImplementedError("get_open_positions is not implemented")

async def get_symbol_positions_history(self, symbol: str):
raise NotImplementedError("get_symbol_positions_history is not implemented")

async def get_positions_history(self):
raise NotImplementedError("get_positions_history is not implemented")

async def get_symbol_leverage(self, symbol: str):
raise NotImplementedError("get_symbol_leverage is not implemented")

async def get_mark_price(self, symbol: str, limit: int = 1):
async def get_mark_price(self, symbol: str) -> dict:
raise NotImplementedError("get_mark_price is not implemented")

async def get_funding_rate(self, symbol: str, limit: int = 1):
async def get_mark_price_history(self, symbol: str, limit: int = 1) -> list:
raise NotImplementedError("get_mark_price_history is not implemented")

async def get_funding_rate(self, symbol: str) -> dict:
raise NotImplementedError("get_funding_rate is not implemented")

async def get_funding_rate_history(self, symbol: str, limit: int = 1) -> list:
raise NotImplementedError("get_funding_rate_history is not implemented")

async def get_mark_price_and_funding(self, symbol: str) -> tuple:
"""
Returns the exchange mark_price and funding rate when they can be requested together
:param symbol: the pair to request
:return: mark_price, funding
"""
raise NotImplementedError("get_funding_and_mark_price is not implemented")

async def set_symbol_leverage(self, symbol: str, leverage: int):
raise NotImplementedError("set_symbol_leverage is not implemented")

Expand Down
5 changes: 3 additions & 2 deletions octobot_trading/producers/funding_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ async def fetch_symbol_funding_rate(self, symbol: str):
return None

def _should_run(self) -> bool:
if not self.channel.exchange_manager.is_margin:
if not self.channel.exchange_manager.is_future:
return False
else:
return not self.channel.exchange_manager.exchange.FUNDING_WITH_MARK_PRICE
return not self.channel.exchange_manager.exchange.FUNDING_WITH_MARK_PRICE and \
not self.channel.exchange_manager.exchange.FUNDING_IN_TICKER

async def resume(self) -> None:
if not self._should_run():
Expand Down
89 changes: 85 additions & 4 deletions octobot_trading/producers/prices_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,57 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
import asyncio

from ccxt import NotSupported

from octobot_trading.channels.exchange_channel import get_chan
from octobot_trading.channels.price import MarkPriceProducer
from octobot_trading.constants import MARK_PRICE_CHANNEL, RECENT_TRADES_CHANNEL, TICKER_CHANNEL
from octobot_trading.constants import MARK_PRICE_CHANNEL, RECENT_TRADES_CHANNEL, TICKER_CHANNEL, FUNDING_CHANNEL
from octobot_trading.data_manager.prices_manager import PricesManager
from octobot_trading.enums import ExchangeConstantsTickersColumns, ExchangeConstantsOrderColumns
from octobot_trading.enums import ExchangeConstantsTickersColumns, ExchangeConstantsOrderColumns, \
ExchangeConstantsFundingColumns


class MarkPriceUpdater(MarkPriceProducer):
CHANNEL_NAME = MARK_PRICE_CHANNEL

MARK_PRICE_REFRESH_TIME = 7

def __init__(self, channel):
super().__init__(channel)
self.recent_trades_consumer = None
self.ticker_consumer = None

async def start(self):
await get_chan(RECENT_TRADES_CHANNEL, self.channel.exchange_manager.id)\
if not self.channel.exchange_manager.is_future:
await self.subscribe()
elif self._should_run():
await self.start_fetching()

async def subscribe(self):
self.recent_trades_consumer = await get_chan(RECENT_TRADES_CHANNEL, self.channel.exchange_manager.id) \
.new_consumer(self.handle_recent_trades_update)
await get_chan(TICKER_CHANNEL, self.channel.exchange_manager.id).new_consumer(self.handle_ticker_update)
self.ticker_consumer = await get_chan(TICKER_CHANNEL, self.channel.exchange_manager.id) \
.new_consumer(self.handle_ticker_update)

async def unsubscribe(self):
if self.recent_trades_consumer:
await get_chan(RECENT_TRADES_CHANNEL, self.channel.exchange_manager.id) \
.remove_consumer(self.recent_trades_consumer)
if self.ticker_consumer:
await get_chan(TICKER_CHANNEL, self.channel.exchange_manager.id) \
.remove_consumer(self.ticker_consumer)

async def resume(self) -> None:
await super().resume()
if not self.is_running:
await self.run()

async def pause(self) -> None:
await super().pause()
if not self.channel.exchange_manager.is_future:
await self.unsubscribe()

"""
Recent trades channel consumer callback
Expand All @@ -51,3 +88,47 @@ async def handle_ticker_update(self, exchange: str, exchange_id: str, symbol: st
await self.push(symbol, ticker[ExchangeConstantsTickersColumns.CLOSE.value])
except Exception as e:
self.logger.exception(e, True, f"Fail to handle ticker update : {e}")

"""
Mark price updater from exchange data
"""

async def start_fetching(self):
while not self.should_stop and not self.channel.is_paused:
try:
for pair in self.channel.exchange_manager.exchange_config.traded_symbol_pairs:
await self.fetch_market_price(pair)
except (NotSupported, NotImplementedError):
self.logger.warning(f"{self.channel.exchange_manager.exchange_name} is not supporting updates")
await self.pause()
finally:
await asyncio.sleep(self.MARK_PRICE_REFRESH_TIME)

async def fetch_market_price(self, symbol: str):
try:
if self.channel.exchange_manager.exchange.FUNDING_WITH_MARK_PRICE:
mark_price, funding_rate = await self.channel.exchange_manager.exchange. \
get_mark_price_and_funding(symbol)
await self.push_funding_rate(symbol, funding_rate)
else:
mark_price = await self.channel.exchange_manager.exchange.get_mark_price(symbol)

if mark_price:
await self.push(symbol, mark_price)
except (NotSupported, NotImplementedError) as ne:
raise ne
except Exception as e:
self.logger.exception(e, True, f"Fail to update funding rate : {e}")
return None

async def push_funding_rate(self, symbol, funding_rate):
if funding_rate:
await get_chan(FUNDING_CHANNEL, self.channel.exchange_manager.id).get_internal_producer(). \
push(symbol=symbol,
funding_rate=funding_rate[ExchangeConstantsFundingColumns.FUNDING_RATE.value],
next_funding_time=funding_rate[ExchangeConstantsFundingColumns.NEXT_FUNDING_TIME.value],
timestamp=funding_rate[ExchangeConstantsFundingColumns.TIMESTAMP.value])

def _should_run(self) -> bool:
return self.channel.exchange_manager.exchange.FUNDING_WITH_MARK_PRICE and \
not self.channel.exchange_manager.exchange.MARK_PRICE_IN_TICKER

0 comments on commit 5f05663

Please sign in to comment.