Skip to content

Commit

Permalink
hotfix stop gossip (#829)
Browse files Browse the repository at this point in the history
## Describe your changes
Stops Orderbook gossip
## Issue ticket number and link

## Checklist before requesting a review
- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] I removed all Clippy and Formatting Warnings. 
- [ ] I added required Copyrights.
  • Loading branch information
Gauthamastro committed Jul 3, 2023
2 parents 8844fcd + b26ff97 commit 272c575
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 108 deletions.
209 changes: 108 additions & 101 deletions clients/orderbook/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,61 +111,66 @@ where
message: &GossipMessage,
peerid: PeerId,
) -> ValidationResult<B::Hash> {
let msg_hash = sp_core::hashing::blake2_128(&message.encode());
// Discard if we already know this message
match message {
GossipMessage::ObMessage(msg) => {
let latest_worker_nonce = *self.latest_worker_nonce.read();
if (msg.worker_nonce > latest_worker_nonce &&
msg.version == *self.state_version.read()) ||
msg.reset
{
// It's a new message so we process it and keep it in our pool
ValidationResult::ProcessAndKeep(self.topic)
} else {
// We already saw this message, so discarding.
ValidationResult::Discard
}
},

GossipMessage::WantWorkerNonce(from, to, version) => {
if from > to || *version < *self.state_version.read() {
// Invalid request
return ValidationResult::Discard
}
// Validators only process it if the request is for nonces after
if *from >= self.last_snapshot.read().worker_nonce {
ValidationResult::ProcessAndDiscard(self.topic)
} else {
ValidationResult::Discard
}
},
GossipMessage::Want(snapshot_id, _) => {
// TODO: Currently enabled for all nodes
// if self.is_validator {
// // Only fullnodes will respond to this
// return ValidationResult::Discard
// }
// We only process the request for last snapshot
if self.last_snapshot.read().snapshot_id == *snapshot_id {
self.message_cache.write().insert((msg_hash, peerid), Instant::now());
ValidationResult::ProcessAndDiscard(self.topic)
} else {
ValidationResult::Discard
}
},
_ => {
// Rest of the match patterns are directed messages so we assume that directed
// messages are only accessible to those recipient peers so we process and
// discard them and not propagate to others
if self.message_cache.read().contains_key(&(msg_hash, peerid)) {
ValidationResult::Discard
} else {
self.message_cache.write().insert((msg_hash, peerid), Instant::now());
ValidationResult::ProcessAndDiscard(self.topic)
}
},
}
return ValidationResult::Discard;
// let msg_hash = sp_core::hashing::blake2_128(&message.encode());
// // Discard if we already know this message
// match message {
// GossipMessage::ObMessage(msg) => {
// let latest_worker_nonce = *self.latest_worker_nonce.read();
// if (msg.worker_nonce > latest_worker_nonce &&
// msg.version == *self.state_version.read()) ||
// msg.reset
// {
// // It's a new message so we process it and keep it in our pool
// ValidationResult::ProcessAndKeep(self.topic)
// } else {
// // We already saw this message, so discarding.
// ValidationResult::Discard
// }
// },
//
// GossipMessage::WantWorkerNonce(from, to, version) => {
// // Discard all WantWorkerNonce requests
// ValidationResult::Discard
// // if from > to || *version < *self.state_version.read() {
// // // Invalid request
// // return ValidationResult::Discard
// // }
// // // Validators only process it if the request is for nonces after
// // if *from >= self.last_snapshot.read().worker_nonce {
// // ValidationResult::ProcessAndDiscard(self.topic)
// // } else {
// // ValidationResult::Discard
// // }
// },
// GossipMessage::Want(snapshot_id, _) => {
// // TODO: Currently enabled for all nodes
// // if self.is_validator {
// // // Only fullnodes will respond to this
// // return ValidationResult::Discard
// // }
// // We only process the request for last snapshot
// // Discard all WantWorkerNonce requests
// ValidationResult::Discard
// // if self.last_snapshot.read().snapshot_id == *snapshot_id {
// // self.message_cache.write().insert((msg_hash, peerid), Instant::now());
// // ValidationResult::ProcessAndDiscard(self.topic)
// // } else {
// // ValidationResult::Discard
// // }
// },
// _ => {
// // Rest of the match patterns are directed messages so we assume that directed
// // messages are only accessible to those recipient peers so we process and
// // discard them and not propagate to others
// if self.message_cache.read().contains_key(&(msg_hash, peerid)) {
// ValidationResult::Discard
// } else {
// self.message_cache.write().insert((msg_hash, peerid), Instant::now());
// ValidationResult::ProcessAndDiscard(self.topic)
// }
// },
// }
}

/// Defines if the message can be rebroadcasted.
Expand All @@ -175,34 +180,35 @@ where
/// * `message`: Gossip message to rebroadcast.
/// * `peerid`: Identifier of a peer of the network.
pub fn rebroadcast_check(&self, message: &GossipMessage, peerid: PeerId) -> bool {
let mut cache = self.message_cache.write();
let msg_hash = sp_core::hashing::blake2_128(&message.encode());

if self.message_expired_check(message) {
// Remove the message from cache when the message is expired.
cache.remove(&(msg_hash, peerid));
return false
}

let interval = match message {
GossipMessage::Want(_, _) => WANT_REBROADCAST_INTERVAL,
_ => REBROADCAST_INTERVAL,
};
match cache.get(&(msg_hash, peerid)) {
None => {
// Record the first rebroadcast of this message in cache
cache.insert((msg_hash, peerid), Instant::now());
true
},
Some(last_time) => {
let expired = Instant::now().sub(*last_time) > interval;
if expired {
// Remove the message from cache when the message is expired.
cache.remove(&(msg_hash, peerid));
}
expired
},
}
return false
// let mut cache = self.message_cache.write();
// let msg_hash = sp_core::hashing::blake2_128(&message.encode());
//
// if self.message_expired_check(message) {
// // Remove the message from cache when the message is expired.
// cache.remove(&(msg_hash, peerid));
// return false
// }
//
// let interval = match message {
// GossipMessage::Want(_, _) => WANT_REBROADCAST_INTERVAL,
// _ => REBROADCAST_INTERVAL,
// };
// match cache.get(&(msg_hash, peerid)) {
// None => {
// // Record the first rebroadcast of this message in cache
// cache.insert((msg_hash, peerid), Instant::now());
// true
// },
// Some(last_time) => {
// let expired = Instant::now().sub(*last_time) > interval;
// if expired {
// // Remove the message from cache when the message is expired.
// cache.remove(&(msg_hash, peerid));
// }
// expired
// },
// }
}

/// Returns true if the message is expired.
Expand All @@ -211,24 +217,25 @@ where
///
/// * `message`: Gossip message to check if it is expired.
pub fn message_expired_check(&self, message: &GossipMessage) -> bool {
match message {
GossipMessage::ObMessage(msg) if msg.reset =>
msg.worker_nonce < self.last_snapshot.read().worker_nonce ||
msg.version.saturating_add(1) != *self.state_version.read(),
GossipMessage::ObMessage(msg) if !msg.reset =>
msg.worker_nonce < self.last_snapshot.read().worker_nonce ||
(msg.version < *self.state_version.read()),

GossipMessage::WantWorkerNonce(from, _, version) => {
// Validators only process it if the request is for nonces after
(*from < self.last_snapshot.read().worker_nonce) ||
(*version < *self.state_version.read())
},

GossipMessage::Want(snapshot_id, _) =>
*snapshot_id != self.last_snapshot.read().snapshot_id,
_ => false,
}
return true // to fix temporary problem
// match message {
// GossipMessage::ObMessage(msg) if msg.reset =>
// msg.worker_nonce < self.last_snapshot.read().worker_nonce ||
// msg.version.saturating_add(1) != *self.state_version.read(),
// GossipMessage::ObMessage(msg) if !msg.reset =>
// msg.worker_nonce < self.last_snapshot.read().worker_nonce ||
// (msg.version < *self.state_version.read()),
//
// GossipMessage::WantWorkerNonce(from, _, version) => {
// // Validators only process it if the request is for nonces after
// (*from < self.last_snapshot.read().worker_nonce) ||
// (*version < *self.state_version.read())
// },
//
// GossipMessage::Want(snapshot_id, _) =>
// *snapshot_id != self.last_snapshot.read().snapshot_id,
// _ => false,
// }
}
}

Expand Down
12 changes: 6 additions & 6 deletions clients/orderbook/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ where
self.latest_state_change_id = action.stid;
// Multicast the message to other peers
let gossip_message = GossipMessage::ObMessage(Box::new(action.clone()));
self.gossip_engine.gossip_message(topic::<B>(), gossip_message.encode(), true);
self.gossip_engine.gossip_message(topic::<B>(), gossip_message.encode(), false);
info!(target:"orderbook","📒Message with stid: {:?} gossiped to others",self.latest_worker_nonce.read());
Ok(())
}
Expand Down Expand Up @@ -568,7 +568,7 @@ where
*self.state_version.read(),
);

self.gossip_engine.gossip_message(topic::<B>(), message.encode(), true);
self.gossip_engine.gossip_message(topic::<B>(), message.encode(), false);
metric_inc!(self, ob_messages_sent);
metric_add!(self, ob_data_sent, message.encoded_size() as u64);
} else {
Expand Down Expand Up @@ -659,7 +659,7 @@ where
self.known_messages.insert(action.worker_nonce, action.clone());
// Multicast the message to other peers
let gossip_message = GossipMessage::ObMessage(Box::new(action.clone()));
self.gossip_engine.gossip_message(topic::<B>(), gossip_message.encode(), true);
self.gossip_engine.gossip_message(topic::<B>(), gossip_message.encode(), false);
}
self.check_state_sync().await?;
self.check_worker_nonce_gap_fill().await?;
Expand Down Expand Up @@ -1321,7 +1321,7 @@ where
.expect("📒 Expected to create bitmap"),
);
info!(target:"orderbook","📒 Sending sync requests to neighbours...");
self.gossip_engine.gossip_message(topic::<B>(), message.encode(), true);
self.gossip_engine.gossip_message(topic::<B>(), message.encode(), false);
} else {
// We have all the data, state is synced,
// so load snapshot shouldn't have any problem now
Expand Down Expand Up @@ -1420,14 +1420,14 @@ where
if to.saturating_sub(from) > 1 {
let want_request =
GossipMessage::WantWorkerNonce(from, **to, *self.state_version.read());
self.gossip_engine.gossip_message(topic::<B>(), want_request.encode(), true);
self.gossip_engine.gossip_message(topic::<B>(), want_request.encode(), false);
info!(target:"orderbook","📒 Sending periodic sync request for nonces between: from:{from:?} to: {to:?}");
} else if to.saturating_sub(from) == 1 && !self.is_validator {
// If we are a fullnode and we know all the stids
// then broadcast the next best nonce periodically
if let Some(msg) = self.known_messages.get(to).cloned() {
let best_msg = GossipMessage::ObMessage(Box::new(msg));
self.gossip_engine.gossip_message(topic::<B>(), best_msg.encode(), true);
self.gossip_engine.gossip_message(topic::<B>(), best_msg.encode(), false);
self.gossip_engine.broadcast_topic(topic::<B>(), true);
info!(target:"orderbook","📒 Sending periodic best message broadcast, nonce: {to:?}");
}
Expand Down
2 changes: 1 addition & 1 deletion node/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polkadex-node"
version = "5.1.2"
version = "5.1.3"
authors = ["Polkadex OÜ <https://polkadex.trade>"]
description = "Polkadex main blockchain"
edition = "2021"
Expand Down

0 comments on commit 272c575

Please sign in to comment.