-
Notifications
You must be signed in to change notification settings - Fork 48
/
event.rs
220 lines (203 loc) · 7.84 KB
/
event.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
//! This module provides utilities corresponding to the events emitted by a contract.
//!
//! There are two ways that you can get contract events:
//! 1. By fetching events corresponding to a particular transaction. For this, you will need to
//! provide a connection, contract instance and transaction coordinate to [get_contract_events]
//! function. Similarly to [crate::utility::BlocksApi::get_tx_events], it will fetch block
//! events, filter them and decode all relevant ones.
//! 2. By listening to all contract events. For this, you will need to provide a connection, some
//! contracts and an `UnboundedSender` to the [listen_contract_events] function. In a loop,
//! it will inspect every finalized block and look for contract events.
use std::{collections::HashMap, error::Error};
use anyhow::{anyhow, bail, Result};
use contract_transcode::Value;
use futures::{channel::mpsc::UnboundedSender, StreamExt};
use subxt::{events::EventDetails, ext::sp_core::H256};
use crate::{
api::contracts::events::ContractEmitted, connections::TxInfo, contract::ContractInstance,
utility::BlocksApi, AccountId, AlephConfig, Connection,
};
/// Represents details about the block contianing the event.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct BlockDetails {
/// the block number
pub block_number: u32,
/// the block hash
pub block_hash: H256,
}
/// Represents a single event emitted by a contract.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ContractEvent {
/// The address of the contract that emitted the event.
pub contract: AccountId,
/// The name of the event.
pub name: Option<String>,
/// Data contained in the event.
pub data: HashMap<String, Value>,
/// details about the block containing the event
pub block_details: Option<BlockDetails>,
}
/// Fetch and decode all events that correspond to the call identified by `tx_info` made to
/// `contract`.
///
/// ```no_run
/// # use aleph_client::{AccountId, Connection, SignedConnection};
/// # use aleph_client::contract::ContractInstance;
/// # use aleph_client::contract::event::{get_contract_events, listen_contract_events};
/// # use anyhow::Result;
/// use futures::{channel::mpsc::unbounded, StreamExt};
///
/// # async fn example(conn: Connection, signed_conn: SignedConnection, address: AccountId, path: &str) -> Result<()> {
/// let contract = ContractInstance::new(address, path)?;
///
/// let tx_info = contract.contract_exec0(&signed_conn, "some_method").await?;
///
/// println!("Received events {:?}", get_contract_events(&conn, &contract, tx_info).await);
///
/// # Ok(())
/// # }
/// ```
pub async fn get_contract_events(
conn: &Connection,
contract: &ContractInstance,
tx_info: TxInfo,
) -> Result<Vec<ContractEvent>> {
let events = conn.get_tx_events(tx_info).await?;
translate_events(events.iter(), &[contract], None)
.into_iter()
.collect()
}
/// Starts an event listening loop. Will send contract event and every error encountered while
/// fetching through the provided [UnboundedSender].
///
/// Only events coming from the address of one of the `contracts` will be decoded.
///
/// The loop will terminate once `sender` is closed. The loop may also terminate in case of errors while fetching blocks
/// or decoding events (pallet events, contract event decoding errors are sent over the channel).
///
/// You most likely want to `tokio::spawn` the resulting future, so that it runs concurrently.
///
/// ```no_run
/// # use std::sync::Arc;
/// # use std::sync::mpsc::channel;
/// # use std::time::Duration;
/// # use aleph_client::{AccountId, Connection, SignedConnection};
/// # use aleph_client::contract::ContractInstance;
/// # use aleph_client::contract::event::{listen_contract_events};
/// # use anyhow::Result;
/// use futures::{channel::mpsc::unbounded, StreamExt};
///
/// # async fn example(conn: Connection, signed_conn: SignedConnection, address1: AccountId, address2: AccountId, path1: &str, path2: &str) -> Result<()> {
/// // The `Arc` makes it possible to pass a reference to the contract to another thread
/// let contract1 = Arc::new(ContractInstance::new(address1, path1)?);
/// let contract2 = Arc::new(ContractInstance::new(address2, path2)?);
///
/// let conn_copy = conn.clone();
/// let contract1_copy = contract1.clone();
/// let contract2_copy = contract2.clone();
///
/// let (tx, mut rx) = unbounded();
/// let listen = || async move {
/// listen_contract_events(&conn, &[contract1_copy.as_ref(), contract2_copy.as_ref()], tx).await?;
/// <Result<(), anyhow::Error>>::Ok(())
/// };
/// let join = tokio::spawn(listen());
///
/// contract1.contract_exec0(&signed_conn, "some_method").await?;
/// contract2.contract_exec0(&signed_conn, "some_other_method").await?;
///
/// println!("Received event {:?}", rx.next().await);
///
/// rx.close();
/// join.await??;
///
/// # Ok(())
/// # }
/// ```
pub async fn listen_contract_events(
conn: &Connection,
contracts: &[&ContractInstance],
sender: UnboundedSender<Result<ContractEvent>>,
) -> Result<()> {
let mut block_subscription = conn.as_client().blocks().subscribe_finalized().await?;
while let Some(block) = block_subscription.next().await {
if sender.is_closed() {
break;
}
let block = block?;
let events = block.events().await?;
for event in translate_events(
events.iter(),
contracts,
Some(BlockDetails {
block_number: block.number(),
block_hash: block.hash(),
}),
) {
sender.unbounded_send(event)?;
}
}
Ok(())
}
/// Try to convert `events` to `ContractEvent` using matching contract from `contracts`.
pub fn translate_events<
Err: Error + Into<anyhow::Error> + Send + Sync + 'static,
E: Iterator<Item = Result<EventDetails<AlephConfig>, Err>>,
>(
events: E,
contracts: &[&ContractInstance],
block_details: Option<BlockDetails>,
) -> Vec<Result<ContractEvent>> {
events
.filter_map(|maybe_event| {
maybe_event
.map(|e| e.as_event::<ContractEmitted>().ok().flatten())
.transpose()
})
.map(|maybe_event| match maybe_event {
Ok(e) => translate_event(&e, contracts, block_details.clone()),
Err(e) => Err(anyhow::Error::from(e)),
})
.collect()
}
/// Try to convert `event` to `ContractEvent` using matching contract from `contracts`.
fn translate_event(
event: &ContractEmitted,
contracts: &[&ContractInstance],
block_details: Option<BlockDetails>,
) -> Result<ContractEvent> {
let matching_contract = contracts
.iter()
.find(|contract| contract.address() == &event.contract.0)
.ok_or_else(|| anyhow!("The event wasn't emitted by any of the provided contracts"))?;
let data = zero_prefixed(&event.data);
let data = matching_contract
.transcoder
.decode_contract_event(&mut data.as_slice())?;
build_event(matching_contract.address.clone(), data, block_details)
}
/// The contract transcoder assumes there is an extra byte (that it discards) indicating the size of the data. However,
/// data arriving through the subscription as used in this file don't have this extra byte. This function adds it.
fn zero_prefixed(data: &[u8]) -> Vec<u8> {
let mut result = vec![0];
result.extend_from_slice(data);
result
}
fn build_event(
address: AccountId,
event_data: Value,
block_details: Option<BlockDetails>,
) -> Result<ContractEvent> {
match event_data {
Value::Map(map) => Ok(ContractEvent {
contract: address,
name: map.ident(),
data: map
.iter()
.map(|(key, value)| (key.to_string(), value.clone()))
.collect(),
block_details,
}),
_ => bail!("Contract event data is not a map"),
}
}