Skip to content

Commit

Permalink
Slight refactoring to handle relayer Channel error with helper (infor…
Browse files Browse the repository at this point in the history
…malsystems#922)

* Slight refactoring to handle relayer Channel error with handle_channel_error

* Rename handle_channel_error to Kind::channel
  • Loading branch information
soareschen committed May 11, 2021
1 parent bfc500e commit 64c5868
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 98 deletions.
132 changes: 34 additions & 98 deletions relayer/src/chain/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,15 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
Ok(event_batch) => {
self.event_bus
.broadcast(Arc::new(event_batch))
.map_err(|e| Kind::Channel.context(e))?;
.map_err(Kind::channel)?;
},
Err(e) => error!("received error via event bus: {}", e),
}
},
recv(self.request_receiver) -> event => {
match event {
Ok(ChainRequest::Terminate { reply_to }) => {
reply_to.send(Ok(())).map_err(|_| Kind::Channel)?;
reply_to.send(Ok(())).map_err(Kind::channel)?;
break;
}

Expand Down Expand Up @@ -320,9 +320,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
fn subscribe(&mut self, reply_to: ReplyTo<Subscription>) -> Result<(), Error> {
let subscription = self.event_bus.subscribe();

reply_to
.send(Ok(subscription))
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(Ok(subscription)).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -334,49 +332,39 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
) -> Result<(), Error> {
let result = self.chain.send_msgs(proto_msgs);

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}

fn query_latest_height(&self, reply_to: ReplyTo<Height>) -> Result<(), Error> {
let latest_height = self.chain.query_latest_height();

reply_to
.send(latest_height)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(latest_height).map_err(Kind::channel)?;

Ok(())
}

fn get_signer(&mut self, reply_to: ReplyTo<Signer>) -> Result<(), Error> {
let result = self.chain.get_signer();

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}

fn get_key(&mut self, reply_to: ReplyTo<KeyEntry>) -> Result<(), Error> {
let result = self.chain.get_key();

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}

fn module_version(&self, port_id: PortId, reply_to: ReplyTo<String>) -> Result<(), Error> {
let result = self.chain.query_module_version(&port_id);

reply_to
.send(Ok(result))
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(Ok(result)).map_err(Kind::channel)?;

Ok(())
}
Expand Down Expand Up @@ -411,9 +399,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
.map_or_else(Err, |header| Ok(header.wrap_any())),
};

reply_to
.send(header)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(header).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -429,9 +415,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
.build_client_state(height)
.map(|cs| cs.wrap_any());

reply_to
.send(client_state)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(client_state).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -451,9 +435,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
.build_consensus_state(light_block)
.map(|cs| cs.wrap_any());

reply_to
.send(consensus_state)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(consensus_state).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -469,9 +451,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
.light_client
.check_misbehaviour(update_event, &client_state);

reply_to
.send(misbehaviour)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(misbehaviour).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -494,9 +474,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
let result = result
.map(|(opt_client_state, proofs)| (opt_client_state.map(|cs| cs.wrap_any()), proofs));

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -512,9 +490,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
.query_client_state(&client_id, height)
.map(|cs| cs.wrap_any());

reply_to
.send(client_state)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(client_state).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -529,9 +505,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
.query_upgraded_client_state(height)
.map(|(cl, proof)| (cl.wrap_any(), proof));

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -543,9 +517,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
) -> Result<(), Error> {
let consensus_states = self.chain.query_consensus_states(request);

reply_to
.send(consensus_states)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(consensus_states).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -561,9 +533,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
self.chain
.query_consensus_state(client_id, consensus_height, query_height);

reply_to
.send(consensus_state)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(consensus_state).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -578,29 +548,23 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
.query_upgraded_consensus_state(height)
.map(|(cs, proof)| (cs.wrap_any(), proof));

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}

fn query_commitment_prefix(&self, reply_to: ReplyTo<CommitmentPrefix>) -> Result<(), Error> {
let prefix = self.chain.query_commitment_prefix();

reply_to
.send(prefix)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(prefix).map_err(Kind::channel)?;

Ok(())
}

fn query_compatible_versions(&self, reply_to: ReplyTo<Vec<Version>>) -> Result<(), Error> {
let versions = self.chain.query_compatible_versions();

reply_to
.send(versions)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(versions).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -613,9 +577,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
) -> Result<(), Error> {
let connection_end = self.chain.query_connection(&connection_id, height);

reply_to
.send(connection_end)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(connection_end).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -627,9 +589,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
) -> Result<(), Error> {
let result = self.chain.query_channels(request);

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -643,9 +603,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
) -> Result<(), Error> {
let result = self.chain.query_channel(&port_id, &channel_id, height);

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -661,9 +619,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
.proven_client_state(&client_id, height)
.map(|(cs, mp)| (cs.wrap_any(), mp));

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -676,9 +632,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
) -> Result<(), Error> {
let result = self.chain.proven_connection(&connection_id, height);

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -695,9 +649,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
.proven_client_consensus(&client_id, consensus_height, height)
.map(|(cs, mp)| (cs.wrap_any(), mp));

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -713,9 +665,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
.chain
.build_channel_proofs(&port_id, &channel_id, height);

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -733,9 +683,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
self.chain
.build_packet_proofs(packet_type, port_id, channel_id, sequence, height);

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -747,9 +695,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
) -> Result<(), Error> {
let result = self.chain.query_packet_commitments(request);

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -761,9 +707,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
) -> Result<(), Error> {
let result = self.chain.query_unreceived_packets(request);

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -775,9 +719,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
) -> Result<(), Error> {
let result = self.chain.query_packet_acknowledgements(request);

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -789,9 +731,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
) -> Result<(), Error> {
let result = self.chain.query_unreceived_acknowledgements(request);

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -803,9 +743,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
) -> Result<(), Error> {
let result = self.chain.query_next_sequence_receive(request);

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}
Expand All @@ -817,9 +755,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
) -> Result<(), Error> {
let result = self.chain.query_txs(request);

reply_to
.send(result)
.map_err(|e| Kind::Channel.context(e))?;
reply_to.send(result).map_err(Kind::channel)?;

Ok(())
}
Expand Down
4 changes: 4 additions & 0 deletions relayer/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,8 @@ impl Kind {
pub fn context(self, source: impl Into<BoxError>) -> Context<Self> {
Context::new(self, Some(source.into()))
}

pub fn channel(err: impl Into<BoxError>) -> Context<Self> {
Self::Channel.context(err)
}
}

0 comments on commit 64c5868

Please sign in to comment.