Skip to content

Commit

Permalink
grpc-plugin: Add notification-service
Browse files Browse the repository at this point in the history
  • Loading branch information
ErikDeSmedt committed Feb 14, 2024
1 parent 7094278 commit d495552
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 28 deletions.
47 changes: 24 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions plugins/grpc-plugin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ rcgen = { version = "0.10", features = ["pem", "x509-parser"] }
cln-grpc = { version = "0.1", features = ["server"], path = "../../cln-grpc"}
cln-plugin = { version = "0.1", path = "../../plugins" }
cln-rpc = { version = "0.1", path = "../../cln-rpc" }
serde_json = "1.0.113"

[dependencies.tokio]
features = ["net", "rt-multi-thread"]
Expand Down
55 changes: 50 additions & 5 deletions plugins/grpc-plugin/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use anyhow::{Context, Result};
use cln_grpc::pb::node_server::NodeServer;
use cln_plugin::{options, Builder};
use cln_grpc::pb::notification_server::NotificationServer;
use cln_plugin::{options, Builder, Plugin};
use cln_rpc::notifications::Notification;
use log::{debug, warn};
use std::net::SocketAddr;
use std::path::PathBuf;
use tokio::sync::broadcast;

mod tls;

Expand All @@ -12,12 +15,19 @@ struct PluginState {
rpc_path: PathBuf,
identity: tls::Identity,
ca_cert: Vec<u8>,
events : broadcast::Sender<cln_rpc::notifications::Notification>,
}

const OPTION_GRPC_PORT : options::IntegerConfigOption = options::ConfigOption::new_i64_no_default(
"grpc-port",
"Which port should the grpc plugin listen for incoming connections?");

const OPTION_GRPC_MSG_BUFFER_SIZE : options::DefaultIntegerConfigOption = options::ConfigOption::new_i64_with_default(
"grpc-msg-buffer-size",
1024,
"Number of notifications which can be stored in the grpc message buffer. Notifications can be skipped if this buffer is full");


#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
debug!("Starting grpc plugin");
Expand All @@ -26,6 +36,8 @@ async fn main() -> Result<()> {

let plugin = match Builder::new(tokio::io::stdin(), tokio::io::stdout())
.option(OPTION_GRPC_PORT)
.option(OPTION_GRPC_MSG_BUFFER_SIZE)
.subscribe("*", handle_notification)
.configure()
.await?
{
Expand All @@ -44,12 +56,26 @@ async fn main() -> Result<()> {
}
};

let buffer_size : i64 = plugin.option(&OPTION_GRPC_MSG_BUFFER_SIZE).unwrap();
let buffer_size = match usize::try_from(buffer_size) {
Ok(b) => b,
Err(_) => {
plugin
.disable("'grpc-msg-buffer-size' should be strictly positive")
.await?;
return Ok(())
}
};

let (sender, _) = broadcast::channel(buffer_size);

let (identity, ca_cert) = tls::init(&directory)?;

let state = PluginState {
rpc_path: PathBuf::from(plugin.configuration().rpc_file.as_str()),
identity,
ca_cert,
events : sender
};

let plugin = plugin.start(state.clone()).await?;
Expand Down Expand Up @@ -81,11 +107,16 @@ async fn run_interface(bind_addr: SocketAddr, state: PluginState) -> Result<()>
let server = tonic::transport::Server::builder()
.tls_config(tls)
.context("configuring tls")?
.add_service(NodeServer::new(
cln_grpc::Server::new(&state.rpc_path)
.await
.context("creating NodeServer instance")?,
.add_service(
NodeServer::new(
cln_grpc::Server::new(&state.rpc_path)
.await
.context("creating NodeServer instance")?,
))
.add_service(
NotificationServer::new(
cln_grpc::GrpcNotificationServer::new(
state.events.clone())))
.serve(bind_addr);

debug!(
Expand All @@ -97,3 +128,17 @@ async fn run_interface(bind_addr: SocketAddr, state: PluginState) -> Result<()>

Ok(())
}

async fn handle_notification(plugin : Plugin<PluginState>, value : serde_json::Value) -> Result<()> {
let notification : Result<Notification, _> = serde_json::from_value(value);
match notification {
Err(err) => {
log::debug!("Failed to parse notification from lightningd {:?}", err);
return Err(err.into());
},
Ok(notification) => {
plugin.state().events.send(notification)?;
return Ok(())
}
}
}

0 comments on commit d495552

Please sign in to comment.