Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/injective chain streams #6760

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
f1d8268
(feat) Refactored Injective v2 connectors to start using the chain st…
Oct 4, 2023
470db0d
(fix) Improve Injective v2 connectors to mark orders as failed when t…
Oct 4, 2023
a14df02
(fix) Fix failing unit test
Oct 4, 2023
98f1916
(fix) Update injective dependency
Oct 10, 2023
6ed38f7
(feat) Refactored Injective v2 connectors to start using the chain st…
Oct 4, 2023
8ca884f
(fix) Improve Injective v2 connectors to mark orders as failed when t…
Oct 4, 2023
5e4d3ce
(fix) Fix failing unit test
Oct 4, 2023
ece89c9
(fix) Update injective dependency
Oct 10, 2023
f3a776f
Merge branch 'feat/injective_chain_streams' of https://github.com/aar…
Oct 12, 2023
5b9934c
(fix) Update Injective SDK version
Oct 12, 2023
a5bb817
(fix) Changed grpcio version requirement
Oct 12, 2023
596f99e
(fix) Update injective SDK version
Oct 12, 2023
089cae6
(fix) Update Injective Python SDK version number
Oct 13, 2023
9517182
(fix) Updated version of Injective Python SDK
Oct 13, 2023
4e8c607
(fix) Added log for spot order updates chain stream events
Oct 13, 2023
504bdf8
(fix) Added extra log line to show the derivative order update events
Oct 13, 2023
1dce783
Merge branch 'development' of https://github.com/aarmoa/hummingbot in…
Oct 25, 2023
a7ace5e
(feat) Refactored Injective V2 connectors to use the new parameter CI…
Oct 26, 2023
8d50275
Merge branch 'development' of https://github.com/aarmoa/hummingbot in…
Nov 7, 2023
135e9eb
(fix) Fixed Injective V2 failing tests due to timeout. Updated Inject…
Nov 8, 2023
5dfe974
(feat) Added HBOT as prefix to all Injective V2 client order ids
Nov 13, 2023
adeadbc
(fix) Updated Injective V2 connectors to use the order hash (exchange…
Nov 15, 2023
da13a0f
(fix) Changed the trade_id used for trades requested throught the Inj…
Nov 17, 2023
805f03b
(feat) Refactored the logic to load markets and tokens in Injective c…
Nov 23, 2023
74eb2aa
Merge branch 'development' of https://github.com/aarmoa/hummingbot in…
Nov 23, 2023
a88c090
Merge branch 'development' of https://github.com/aarmoa/hummingbot in…
Nov 28, 2023
70ae4d9
(feat) Updated Injective V2 datasource to use the new tradeId include…
Nov 28, 2023
add1369
Merge branch 'development' of https://github.com/aarmoa/hummingbot in…
Dec 8, 2023
407f82c
(feat) Changed spot and derivative trades endpoint for Injective V2 c…
Dec 8, 2023
53656bf
Merge branch 'development' of https://github.com/aarmoa/hummingbot in…
Jan 8, 2024
60673a2
(fix) Solved issue in processing order book updates from chain stream…
Jan 9, 2024
04cf29f
(fix) Fix to ensure order updates for orders in pending state do not …
Jan 9, 2024
b8e43c1
Merge branch 'development' into feat/injective_chain_streams
nikspz Jan 10, 2024
165096f
Merge branch 'development' into feat/injective_chain_streams
rapcmia Jan 12, 2024
8db15d6
(fix) Solve async event loop management issues in test_mqtt.py. Remov…
Jan 12, 2024
0a21b4a
Merge branch 'development' of https://github.com/aarmoa/hummingbot in…
Jan 12, 2024
593e813
Merge branch 'feat/injective_chain_streams' of https://github.com/aar…
Jan 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions bin/hummingbot.py
Expand Up @@ -15,6 +15,7 @@
load_client_config_map_from_file,
write_config_to_yml,
)
from hummingbot.client.config.security import Security
from hummingbot.client.hummingbot_application import HummingbotApplication
from hummingbot.client.settings import AllConnectorSettings
from hummingbot.client.ui import login_prompt
Expand Down Expand Up @@ -53,6 +54,7 @@ async def ui_start_handler(self):


async def main_async(client_config_map: ClientConfigAdapter):
await Security.wait_til_decryption_done()
await create_yml_files_legacy()

# This init_logging() call is important, to skip over the missing config warnings.
Expand Down
10 changes: 10 additions & 0 deletions hummingbot/client/config/security.py
@@ -1,4 +1,5 @@
import asyncio
import logging
from pathlib import Path
from typing import Dict, Optional

Expand All @@ -16,6 +17,7 @@
)
from hummingbot.core.utils.async_call_scheduler import AsyncCallScheduler
from hummingbot.core.utils.async_utils import safe_ensure_future
from hummingbot.logger import HummingbotLogger


class Security:
Expand All @@ -24,6 +26,14 @@ class Security:
_secure_configs = {}
_decryption_done = asyncio.Event()

_logger: Optional[HummingbotLogger] = None

@classmethod
def logger(cls) -> HummingbotLogger:
if cls._logger is None:
cls._logger = logging.getLogger(__name__)
return cls._logger

@staticmethod
def new_password_required() -> bool:
return not PASSWORD_VERIFICATION_PATH.exists()
Expand Down
5 changes: 3 additions & 2 deletions hummingbot/connector/client_order_tracker.py
Expand Up @@ -390,8 +390,9 @@ def _trigger_failure_event(self, order: InFlightOrder):
)

def _trigger_order_creation(self, tracked_order: InFlightOrder, previous_state: OrderState, new_state: OrderState):
if previous_state == OrderState.PENDING_CREATE and new_state not in [OrderState.CANCELED, OrderState.FAILED,
OrderState.PENDING_CANCEL]:
if (previous_state == OrderState.PENDING_CREATE and
previous_state != new_state and
new_state not in [OrderState.CANCELED, OrderState.FAILED, OrderState.PENDING_CANCEL]):
self.logger().info(tracked_order.build_order_created_message())
self._trigger_created_event(tracked_order)

Expand Down
Expand Up @@ -5,6 +5,9 @@
DEFAULT_DOMAIN = ""
TESTNET_DOMAIN = "testnet"

MAX_ORDER_ID_LEN = CONSTANTS.MAX_ORDER_ID_LEN
HBOT_ORDER_ID_PREFIX = CONSTANTS.HBOT_ORDER_ID_PREFIX

TRANSACTIONS_CHECK_INTERVAL = CONSTANTS.TRANSACTIONS_CHECK_INTERVAL

ORDER_STATE_MAP = CONSTANTS.ORDER_STATE_MAP
Expand Down
Expand Up @@ -94,11 +94,11 @@ def domain(self) -> str:

@property
def client_order_id_max_length(self) -> int:
return None
return CONSTANTS.MAX_ORDER_ID_LEN

@property
def client_order_id_prefix(self) -> str:
return ""
return CONSTANTS.HBOT_ORDER_ID_PREFIX

@property
def trading_rules_request_path(self) -> str:
Expand Down Expand Up @@ -704,37 +704,14 @@ async def _user_stream_event_listener(self):
await self._check_created_orders_status_for_transaction(transaction_hash=transaction_hash)
elif channel == "trade":
trade_update = event_data
tracked_order = self._order_tracker.all_fillable_orders_by_exchange_order_id.get(
trade_update.exchange_order_id
)
if tracked_order is not None:
new_trade_update = TradeUpdate(
trade_id=trade_update.trade_id,
client_order_id=tracked_order.client_order_id,
exchange_order_id=trade_update.exchange_order_id,
trading_pair=trade_update.trading_pair,
fill_timestamp=trade_update.fill_timestamp,
fill_price=trade_update.fill_price,
fill_base_amount=trade_update.fill_base_amount,
fill_quote_amount=trade_update.fill_quote_amount,
fee=trade_update.fee,
is_taker=trade_update.is_taker,
)
self._order_tracker.process_trade_update(new_trade_update)
self._order_tracker.process_trade_update(trade_update)
elif channel == "order":
order_update = event_data
tracked_order = self._order_tracker.all_updatable_orders_by_exchange_order_id.get(
order_update.exchange_order_id)
tracked_order = self._order_tracker.all_updatable_orders.get(order_update.client_order_id)
if tracked_order is not None:
new_order_update = OrderUpdate(
trading_pair=order_update.trading_pair,
update_timestamp=order_update.update_timestamp,
new_state=order_update.new_state,
client_order_id=tracked_order.client_order_id,
exchange_order_id=order_update.exchange_order_id,
misc_updates=order_update.misc_updates,
)
self._order_tracker.process_order_update(order_update=new_order_update)
is_partial_fill = order_update.new_state == OrderState.FILLED and not tracked_order.is_filled
if not is_partial_fill:
self._order_tracker.process_order_update(order_update=order_update)
elif channel == "balance":
if event_data.total_balance is not None:
self._account_balances[event_data.asset_name] = event_data.total_balance
Expand Down Expand Up @@ -806,41 +783,21 @@ async def _all_trade_updates_for_order(self, order: GatewayPerpetualInFlightOrde
async def _update_orders_fills(self, orders: List[GatewayPerpetualInFlightOrder]):
oldest_order_creation_time = self.current_timestamp
all_market_ids = set()
orders_by_hash = {}

for order in orders:
oldest_order_creation_time = min(oldest_order_creation_time, order.creation_timestamp)
all_market_ids.add(await self.exchange_symbol_associated_to_pair(trading_pair=order.trading_pair))
if order.exchange_order_id is not None:
orders_by_hash[order.exchange_order_id] = order

try:
start_time = min(oldest_order_creation_time, self._latest_polled_order_fill_time)
trade_updates = await self._data_source.perpetual_trade_updates(market_ids=all_market_ids, start_time=start_time)
trade_updates = await self._data_source.perpetual_trade_updates(
market_ids=all_market_ids, start_time=start_time
)
for trade_update in trade_updates:
tracked_order = orders_by_hash.get(trade_update.exchange_order_id)
if tracked_order is not None:
fee = TradeFeeBase.new_perpetual_fee(
fee_schema=self.trade_fee_schema(),
position_action=tracked_order.position,
percent_token=trade_update.fee.percent_token,
flat_fees=trade_update.fee.flat_fees,
)
new_trade_update = TradeUpdate(
trade_id=trade_update.trade_id,
client_order_id=tracked_order.client_order_id,
exchange_order_id=trade_update.exchange_order_id,
trading_pair=trade_update.trading_pair,
fill_timestamp=trade_update.fill_timestamp,
fill_price=trade_update.fill_price,
fill_base_amount=trade_update.fill_base_amount,
fill_quote_amount=trade_update.fill_quote_amount,
fee=fee,
is_taker=trade_update.is_taker,
)
self._latest_polled_order_fill_time = max(self._latest_polled_order_fill_time,
trade_update.fill_timestamp)
self._order_tracker.process_trade_update(new_trade_update)
self._latest_polled_order_fill_time = max(
self._latest_polled_order_fill_time, trade_update.fill_timestamp
)
self._order_tracker.process_trade_update(trade_update)
except asyncio.CancelledError:
raise
except Exception as ex:
Expand All @@ -856,13 +813,12 @@ async def _request_order_status(self, tracked_order: GatewayPerpetualInFlightOrd
async def _update_orders_with_error_handler(self, orders: List[GatewayPerpetualInFlightOrder], error_handler: Callable):
oldest_order_creation_time = self.current_timestamp
all_market_ids = set()
orders_by_hash = {}
orders_by_id = {}

for order in orders:
oldest_order_creation_time = min(oldest_order_creation_time, order.creation_timestamp)
all_market_ids.add(await self.exchange_symbol_associated_to_pair(trading_pair=order.trading_pair))
if order.exchange_order_id is not None:
orders_by_hash[order.exchange_order_id] = order
orders_by_id[order.client_order_id] = order

try:
order_updates = await self._data_source.perpetual_order_updates(
Expand All @@ -871,48 +827,37 @@ async def _update_orders_with_error_handler(self, orders: List[GatewayPerpetualI
)

for order_update in order_updates:
tracked_order = orders_by_hash.get(order_update.exchange_order_id)
tracked_order = orders_by_id.get(order_update.client_order_id)
if tracked_order is not None:
try:
new_order_update = OrderUpdate(
trading_pair=order_update.trading_pair,
update_timestamp=order_update.update_timestamp,
new_state=order_update.new_state,
client_order_id=tracked_order.client_order_id,
exchange_order_id=order_update.exchange_order_id,
misc_updates=order_update.misc_updates,
)

if tracked_order.current_state == OrderState.PENDING_CREATE and new_order_update.new_state != OrderState.OPEN:
if tracked_order.current_state == OrderState.PENDING_CREATE and order_update.new_state != OrderState.OPEN:
open_update = OrderUpdate(
trading_pair=order_update.trading_pair,
update_timestamp=order_update.update_timestamp,
new_state=OrderState.OPEN,
client_order_id=tracked_order.client_order_id,
client_order_id=order_update.client_order_id,
exchange_order_id=order_update.exchange_order_id,
misc_updates=order_update.misc_updates,
)
self._order_tracker.process_order_update(open_update)

del orders_by_hash[order_update.exchange_order_id]
self._order_tracker.process_order_update(new_order_update)
del orders_by_id[order_update.client_order_id]
self._order_tracker.process_order_update(order_update)
except asyncio.CancelledError:
raise
except Exception as ex:
await error_handler(tracked_order, ex)

if len(orders_by_hash) > 0:
# await self._data_source.check_order_hashes_synchronization(orders=orders_by_hash.values())
for order in orders_by_hash.values():
not_found_error = RuntimeError(
f"There was a problem updating order {order.client_order_id} "
f"({CONSTANTS.ORDER_NOT_FOUND_ERROR_MESSAGE})"
)
await error_handler(order, not_found_error)
for order in orders_by_id.values():
not_found_error = RuntimeError(
f"There was a problem updating order {order.client_order_id} "
f"({CONSTANTS.ORDER_NOT_FOUND_ERROR_MESSAGE})"
)
await error_handler(order, not_found_error)
except asyncio.CancelledError:
raise
except Exception as request_error:
for order in orders_by_hash.values():
for order in orders_by_id.values():
await error_handler(order, request_error)

def _create_web_assistants_factory(self) -> WebAssistantsFactory:
Expand Down Expand Up @@ -1029,46 +974,22 @@ async def _check_orders_transactions(self):
async def _check_orders_creation_transactions(self):
orders: List[GatewayPerpetualInFlightOrder] = self._order_tracker.active_orders.values()
orders_by_creation_tx = defaultdict(list)
orders_with_inconsistent_hash = []

for order in orders:
if order.creation_transaction_hash is not None and order.is_pending_create:
orders_by_creation_tx[order.creation_transaction_hash].append(order)

for transaction_hash, orders in orders_by_creation_tx.items():
all_orders = orders.copy()
try:
order_updates = await self._data_source.order_updates_for_transaction(
transaction_hash=transaction_hash, perpetual_orders=orders
)

for order_update in order_updates:
tracked_order = self._order_tracker.active_orders.get(order_update.client_order_id)
if tracked_order is not None:
all_orders.remove(tracked_order)
if (tracked_order.exchange_order_id is not None
and tracked_order.exchange_order_id != order_update.exchange_order_id):
tracked_order.update_exchange_order_id(order_update.exchange_order_id)
orders_with_inconsistent_hash.append(tracked_order)
self._order_tracker.process_order_update(order_update=order_update)

for not_found_order in all_orders:
self._update_order_after_failure(
order_id=not_found_order.client_order_id,
trading_pair=not_found_order.trading_pair
)

except ValueError:
self.logger().debug(f"Transaction not included in a block yet ({transaction_hash})")

if len(orders_with_inconsistent_hash) > 0:
async with self._data_source.order_creation_lock:
active_orders = [
order for order in self._order_tracker.active_orders.values()
if order not in orders_with_inconsistent_hash and order.current_state == OrderState.PENDING_CREATE
]
await self._data_source.reset_order_hash_generator(active_orders=active_orders)

async def _check_created_orders_status_for_transaction(self, transaction_hash: str):
transaction_orders = []
order: GatewayPerpetualInFlightOrder
Expand Down