Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Geode State Subscription #61

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
let is_authority = role.is_authority();
let subscription_task_executor =
sc_rpc::SubscriptionTaskExecutor::new(task_manager.spawn_handle());
let geode_subscription_executor =
sc_rpc::SubscriptionTaskExecutor::new(task_manager.spawn_handle());
let babe_config = babe_link.config().clone();
let shared_epoch_changes = babe_link.epoch_changes().clone();
let justification_stream = grandpa_link.justification_stream();
Expand Down Expand Up @@ -277,7 +279,11 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
},
};

automata_rpc::create_full(deps, subscription_task_executor.clone())
automata_rpc::create_full(
deps,
subscription_task_executor.clone(),
geode_subscription_executor.clone(),
)
})
};

Expand Down
3 changes: 3 additions & 0 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ hex = '0.4.3'
sp-std = '3.0.0'
frame-system-rpc-runtime-api = { default-features = false, version = '3.0.0' }
sp-core = { features = ["full_crypto"], version = '3.0.0' }
log = "0.4.8"
futures = "0.3.9"
codec = { default-features = false, features = ['derive'], package = 'parity-scale-codec', version = '2.0.0' }

# local dependencies
automata-primitives = { path = "../primitives" }
Expand Down
134 changes: 130 additions & 4 deletions rpc/src/geode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,29 @@ use sp_runtime::{traits::Block as BlockT, RuntimeDebug};
use sp_std::{collections::btree_map::BTreeMap, prelude::*};
use std::sync::Arc;

// #[cfg(feature = "std")]
use codec::{Decode, Encode};
use sc_client_api::BlockchainEvents;
use sc_client_api::StorageChangeSet;
use sp_core::storage::StorageKey;
use sp_core::{blake2_128, twox_128};

use futures::{future, StreamExt, TryStreamExt};
use jsonrpc_core::futures::{future::Future, sink::Sink, stream, stream::Stream};
use jsonrpc_pubsub::{manager::SubscriptionManager, typed::Subscriber, SubscriptionId};
use log::warn;

use serde::{Deserialize, Serialize};

const RUNTIME_ERROR: i64 = 1;

type GeodeId = [u8; 32];

#[rpc]
/// Geode RPC methods
pub trait GeodeServer<BlockHash> {
/// RPC Metadata
type Metadata;

/// return the registered geode list
#[rpc(name = "registered_geodes")]
fn registered_geodes(&self) -> Result<Vec<WrappedGeode<Hash>>>;
Expand All @@ -29,6 +44,19 @@ pub trait GeodeServer<BlockHash> {
/// Return the current state of a geode
#[rpc(name = "geode_state")]
fn geode_state(&self, geode: [u8; 32]) -> Result<Option<GeodeState>>;

/// Geode state subscription
#[pubsub(subscription = "geode_state", subscribe, name = "geode_subscribeState")]
fn subscribe_geode_state(&self, _: Self::Metadata, _: Subscriber<GeodeState>, id: GeodeId);

/// Unsubscribe from geode state subscription.
#[pubsub(
subscription = "geode_state",
unsubscribe,
name = "geode_unsubscribeState"
)]
fn unsubscribe_geode_state(&self, _: Option<Self::Metadata>, _: SubscriptionId)
-> Result<bool>;
}

/// The geode struct shows its status
Expand Down Expand Up @@ -71,21 +99,24 @@ impl From<Geode<AccountId, Hash>> for WrappedGeode<Hash> {
/// An implementation of geode specific RPC methods.
pub struct GeodeApi<C> {
client: Arc<C>,
manager: SubscriptionManager,
}

impl<C> GeodeApi<C> {
/// Create new `Geode` with the given reference to the client.
pub fn new(client: Arc<C>) -> Self {
GeodeApi { client }
pub fn new(client: Arc<C>, manager: SubscriptionManager) -> Self {
GeodeApi { client, manager }
}
}

impl<C> GeodeServer<<Block as BlockT>::Hash> for GeodeApi<C>
where
C: Send + Sync + 'static,
C: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
C: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockchainEvents<Block>,
C::Api: GeodeRuntimeApi<Block>,
{
type Metadata = sc_rpc::Metadata;

/// get registered geode list
fn registered_geodes(&self) -> Result<Vec<WrappedGeode<Hash>>> {
let api = self.client.runtime_api();
Expand Down Expand Up @@ -154,4 +185,99 @@ where

Ok(geode_state)
}

fn subscribe_geode_state(
&self,
_metadata: Self::Metadata,
subscriber: Subscriber<GeodeState>,
id: GeodeId,
) {
// get the current state of the geode
// if the geode does not exist, reject the subscription
let initial = match self.geode_state(id.clone()) {
Ok(state) => match state {
Some(initial) => Ok(initial),
None => {
let _ = subscriber.reject(Error::invalid_params("no such geode"));
return;
}
},
Err(e) => Err(e),
};
let key: StorageKey = StorageKey(build_storage_key(id.clone()));
let keys = Into::<Option<Vec<_>>>::into(vec![key]);
let stream = match self
.client
.storage_changes_notification_stream(keys.as_ref().map(|x| &**x), None)
{
Ok(stream) => stream,
Err(err) => {
let _ = subscriber.reject(client_err(err).into());
return;
}
};

let stream = stream
.filter_map(move |(_block, changes)| match get_geode_state(changes) {
Ok(state) => future::ready(Some(Ok::<_, ()>(Ok(state)))),
Err(_) => future::ready(None),
})
.compat();

self.manager.add(subscriber, |sink| {
sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(stream::iter_result(vec![Ok(initial)]).chain(stream))
// we ignore the resulting Stream (if the first stream is over we are unsubscribed)
.map(|_| ())
});
}

fn unsubscribe_geode_state(
&self,
_metadata: Option<Self::Metadata>,
id: SubscriptionId,
) -> Result<bool> {
Ok(self.manager.cancel(id))
}
}

fn build_storage_key(id: GeodeId) -> Vec<u8> {
let geode_module = twox_128(b"GeodeModule");
let geodes = twox_128(b"Geodes");
let geode: AccountId = id.into();
let geode = blake2_128_concat(&geode.encode());

let mut param = vec![];
param.extend(geode_module);
param.extend(geodes);
param.extend(geode);
param
}

fn blake2_128_concat(d: &[u8]) -> Vec<u8> {
let mut v = blake2_128(d).to_vec();
v.extend_from_slice(d);
v
}

fn get_geode_state(changes: StorageChangeSet) -> Result<GeodeState> {
for (_, _, data) in changes.iter() {
match data {
Some(data) => {
let mut value: &[u8] = &data.0.clone();
match Geode::<AccountId, Hash>::decode(&mut value) {
Ok(geode) => {
return Ok(geode.state);
}
Err(_) => warn!("unable to decode Geode"),
}
}
None => warn!("empty change set"),
};
}
Err(Error::internal_error())
}

fn client_err(_: sp_blockchain::Error) -> Error {
Error::invalid_request()
}
2 changes: 2 additions & 0 deletions rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pub struct FullDeps<C, P, B, SC> {
pub fn create_full<C, P, BE, B, SC>(
deps: FullDeps<C, P, B, SC>,
subscription_task_executor: SubscriptionTaskExecutor,
geode_subscription_executor: SubscriptionTaskExecutor,
) -> jsonrpc_core::IoHandler<sc_rpc::Metadata>
where
BE: Backend<Block> + 'static,
Expand Down Expand Up @@ -236,6 +237,7 @@ where

io.extend_with(GeodeServer::to_delegate(geode::GeodeApi::new(
client.clone(),
SubscriptionManager::new(Arc::new(geode_subscription_executor)),
)));

io.extend_with(TransferServer::to_delegate(transfer::TransferApi::new(
Expand Down