Skip to content

Commit

Permalink
port changes from ccxt#346
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosmiei committed Jun 27, 2022
1 parent 0a34766 commit 9433f33
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 7 deletions.
9 changes: 7 additions & 2 deletions js/base/Exchange.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ module.exports = class Exchange extends ccxt.Exchange {
// subscribe -----→ receive
//
const future = client.future (messageHash);
// read and write subscription, this is done before connecting the client
// to avoid race conditions when other parts of the code read or write to the client.subscriptions
const clientSubscription = client.subscriptions[subscribeHash];
if (!clientSubscription) {
client.subscriptions[subscribeHash] = subscription || true;
}
// we intentionally do not use await here to avoid unhandled exceptions
// the policy is to make sure that 100% of promises are resolved or rejected
// either with a call to client.resolve or client.reject with
Expand All @@ -113,8 +119,7 @@ module.exports = class Exchange extends ccxt.Exchange {
// catch any connection-level exceptions from the client
// (connection established successfully)
connected.then (() => {
if (!client.subscriptions[subscribeHash]) {
client.subscriptions[subscribeHash] = subscription || true;
if (!clientSubscription) {
const options = this.safeValue (this.options, 'ws');
const cost = this.safeValue (options, 'cost', 1);
if (message) {
Expand Down
9 changes: 6 additions & 3 deletions php/ClientTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,14 @@ public function watch($url, $message_hash, $message = null, $subscribe_hash = nu
// todo: calculate the backoff delay in php
$backoff_delay = 0; // milliseconds
$future = $client->future($message_hash);
$subscribed = isset($client->subscriptions[$subscribe_hash]);
if (!$subscribed) {
$client->subscriptions[$subscribe_hash] = isset($subscription) ? $subscription : true;
}
$connected = $client->connect($backoff_delay);
$connected->then(
function($result) use ($client, $message_hash, $message, $subscribe_hash, $subscription) {
if (!isset($client->subscriptions[$subscribe_hash])) {
$client->subscriptions[$subscribe_hash] = isset($subscription) ? $subscription : true;
if (!$subscribed) {
// todo: add PHP async rate-limiting
// todo: decouple signing from subscriptions
$options = $this->safe_value($this->options, 'ws');
Expand Down Expand Up @@ -155,4 +158,4 @@ public function find_timeframe($timeframe, $timeframes = null) {
}
return null;
}
}
}
8 changes: 6 additions & 2 deletions python/ccxtpro/base/exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ def watch(self, url, message_hash, message=None, subscribe_hash=None, subscripti
client = self.client(url)
future = client.future(message_hash)

subscribed = subscribe_hash in client.subscriptions

if not subscribed:
client.subscriptions[subscribe_hash] = subscription or True

# base exchange self.open starts the aiohttp Session in an async context
self.open()
connected = client.connected if client.connected.done() \
Expand All @@ -120,8 +125,7 @@ def after(fut):
# future will already have this exception set to it in self.reset
# so we don't set it again here to avoid an InvalidState error
return
if subscribe_hash not in client.subscriptions:
client.subscriptions[subscribe_hash] = subscription or True
if not subscribed:
# todo: decouple signing from subscriptions
options = self.safe_value(self.options, 'ws')
cost = self.safe_value(options, 'cost', 1)
Expand Down

0 comments on commit 9433f33

Please sign in to comment.