Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodic L1 synchronization #96

Merged
merged 41 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
27f2c82
Add `casper-event-toolkit` dependency.
koxu1996 May 8, 2024
b9f2713
Add `reqwest` dependency.
koxu1996 May 8, 2024
fbf99e5
Add `thiserror` dependency.
koxu1996 May 8, 2024
03544d2
Prototype of state synchronization.
koxu1996 May 8, 2024
a60cbb9
Fix deadlock.
koxu1996 May 10, 2024
6897e36
Use amount greater than 0 for mocked tx.
koxu1996 May 10, 2024
413c599
Merge remote-tracking branch 'origin/feature/configure-casper-rpc-in-…
koxu1996 May 13, 2024
8806f1b
Move L1 sync into separate function.
koxu1996 May 13, 2024
a4b0434
Skip synchronization if contract hash not configured.
koxu1996 May 13, 2024
88aa500
Use scoped ENV for contract hash.
koxu1996 May 14, 2024
70c7057
Merge branch 'main' into feature/l1-synchronization
koxu1996 May 14, 2024
f57b840
Fix default contract hash value.
koxu1996 May 14, 2024
308577b
Add default contract hash to nix setup.
koxu1996 May 14, 2024
c2ccfdf
Bump `casper-event-toolkit` to `v0.1.3`.
koxu1996 Jun 6, 2024
c09d3fd
Remove `into()` that is no longer required.
koxu1996 Jun 6, 2024
1fac715
Replace unused `Result` match with `map_err`.
koxu1996 Jun 6, 2024
be86df8
Replace another unused `Result` with `map_err`.
koxu1996 Jun 6, 2024
326b147
Replace yet another unused `Result` with `map_err`.
koxu1996 Jun 6, 2024
15a1197
Get rid of `Mutex` for `EventManager`.
koxu1996 Jun 7, 2024
1f1b1d3
Return error if L1 processing started without proper init.
koxu1996 Jun 7, 2024
1065394
Prepare sync service for storing event manager handle.
koxu1996 Jun 7, 2024
1b6cae2
Directly initialize event manager.
koxu1996 Jun 7, 2024
f437b2e
Return broken channel error when completion ACK cannot be sent.
koxu1996 Jun 7, 2024
7523128
Merge branch 'main' into feature/l1-synchronization
koxu1996 Jun 7, 2024
9e1a58a
Merge branch 'main' into feature/l1-synchronization
koxu1996 Jun 7, 2024
487d3c6
Post merge fixes.
koxu1996 Jun 7, 2024
ca726c4
Merge branch 'main' into feature/l1-synchronization
koxu1996 Jun 7, 2024
e5f9f8b
Use empty contract hash in test utils.
koxu1996 Jun 7, 2024
50a3654
Initialize `EventManager` directly in constructor.
koxu1996 Jun 11, 2024
dd3bc22
Remove unusued `InitializationError`.
koxu1996 Jun 11, 2024
0d4fb13
Initialize `L1SyncService` directly in constructor.
koxu1996 Jun 11, 2024
c89f14d
Merge branch 'main' into feature/l1-synchronization
koxu1996 Jun 11, 2024
736a540
Take RPC url from server config.
koxu1996 Jun 12, 2024
3b52fb1
Take contract hash from server config.
koxu1996 Jun 12, 2024
8c5cc97
Update default contract hash check.
koxu1996 Jun 12, 2024
f26f045
Merge branch 'main' into feature/l1-synchronization
koxu1996 Jun 13, 2024
5b7934b
Add `contract-utils` to server workspace.
koxu1996 Jun 13, 2024
ff7b6c6
Build deposit transaction from parsed event.
koxu1996 Jun 13, 2024
7c35fd0
Fixes for Nix build.
koxu1996 Jun 13, 2024
9f90399
Merge branch 'main' into feature/l1-synchronization
koxu1996 Jun 14, 2024
59b91af
Merge branch 'main' into feature/l1-synchronization
koxu1996 Jun 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions kairos-server/.env
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
KAIROS_SERVER_SOCKET_ADDR="127.0.0.1:7893"
KAIROS_SERVER_CASPER_RPC="http://127.0.0.1:11101/rpc"
KAIROS_SERVER_CASPER_CONTRACT_HASH="0000000000000000000000000000000000000000000000000000000000000000"
2 changes: 2 additions & 0 deletions kairos-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ kairos-circuit-logic = { path = "../kairos-prover/kairos-circuit-logic", feature
kairos-trie = { git = "https://github.com/cspr-rad/kairos-trie" }
sha2 = "0.10"
reqwest = "0.12"
casper-event-toolkit = { git = "https://github.com/koxu1996/casper-event-toolkit.git", version = "0.1.3" }
thiserror = "1.0"

[dev-dependencies]
proptest = "1"
Expand Down
3 changes: 3 additions & 0 deletions kairos-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ use std::{fmt, str::FromStr};
pub struct ServerConfig {
pub socket_addr: SocketAddr,
pub casper_rpc: Url,
pub casper_contract_hash: String,
}

impl ServerConfig {
pub fn from_env() -> Result<Self, String> {
let socket_addr = parse_env_as::<SocketAddr>("KAIROS_SERVER_SOCKET_ADDR")?;
let casper_rpc = parse_env_as::<Url>("KAIROS_SERVER_CASPER_RPC")?;
let casper_contract_hash = parse_env_as::<String>("KAIROS_SERVER_CASPER_CONTRACT_HASH")?;
Ok(Self {
socket_addr,
casper_rpc,
casper_contract_hash,
})
}
}
Expand Down
19 changes: 19 additions & 0 deletions kairos-server/src/l1_sync/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use thiserror::Error;

#[derive(Error, Debug)]
pub enum L1SyncError {
/// Casper Event Toolkit error.
#[error("toolkit error: {error}")]
ToolkitError {
#[from]
error: casper_event_toolkit::error::ToolkitError,
},

/// Communication error.
#[error("channel error: {0}")]
BrokenChannel(String),

/// Error that we cannot recover from.
#[error("Unexpected error: {0}")]
UnexpectedError(String),
}
73 changes: 73 additions & 0 deletions kairos-server/src/l1_sync/event_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use std::sync::Arc;

use casper_event_toolkit::fetcher::{Fetcher, Schemas};
use casper_event_toolkit::metadata::CesMetadataRef;
use casper_event_toolkit::rpc::client::CasperClient;

use crate::state::ServerStateInner;
use kairos_circuit_logic::transactions::{KairosTransaction, L1Deposit};

use super::error::L1SyncError;

pub struct EventManager {
next_event_id: u32,
fetcher: Fetcher,
schemas: Schemas,
server_state: Arc<ServerStateInner>,
}

impl EventManager {
pub async fn new(
rpc_url: &str,
contract_hash: &str,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get the rpc_url from the server state. i.e. server_state.server_config.casper_rpc same goes for the contract hash.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 736a540 and 3b52fb1.

server_state: Arc<ServerStateInner>,
) -> Result<Self, L1SyncError> {
tracing::info!("Initializing event manager");

let client = CasperClient::new(rpc_url);
let metadata = CesMetadataRef::fetch_metadata(&client, contract_hash).await?;
tracing::debug!("Metadata fetched successfully");

let fetcher = Fetcher {
client,
ces_metadata: metadata,
};
let schemas = fetcher.fetch_schema().await?;
tracing::debug!("Schemas fetched successfully");

Ok(EventManager {
next_event_id: 0,
fetcher,
schemas,
server_state,
})
}

/// Processes new events starting from the last known event ID.
pub async fn process_new_events(&mut self) -> Result<(), L1SyncError> {
tracing::info!("Looking for new events");

let num_events = self.fetcher.fetch_events_count().await?;
for i in self.next_event_id..num_events {
let event = self.fetcher.fetch_event(i, &self.schemas).await?;
tracing::debug!("Event {} fetched: {:?}.", i, event);

// TODO: Parse full transaction data from event, then push it to Data Availability layer.

// TODO: Once we have ASN transaction, it should be converted and pushed into batch.
let recipient: Vec<u8> = "cafebabe".into();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we mocking the deposit here?

Copy link
Contributor Author

@koxu1996 koxu1996 Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was hoping that #88 will get merged, and then it could be used for parsing. Since kairos-tx in the contract is postponed for later discussion, I will implement basic parsing here. Blocked on #121.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#121 got merged, so I introduced Deposit event parsing in ff7b6c6.

However, recipient field is still mocked. It is not currently possible to obtain depositor public key from event, as it contains account hash (unnecessarily wrapped in Key)... This was already solved in kairos-tx for contract PR - validation if explicitly given public key matches with caller account hash - so maybe we should reconsider merging it before demo?

let txn = KairosTransaction::Deposit(L1Deposit {
amount: 100,
recipient,
});
self.server_state
.batch_state_manager
.enqueue_transaction(txn)
.await
.map_err(|e| L1SyncError::UnexpectedError(format!("unable to batch tx: {}", e)))?;
self.next_event_id = i + 1;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reads odd to me, I would move this outside of the for loop, and set it to fetch_events_count. We are not benefiting from updating that value immediately.

Copy link
Contributor Author

@koxu1996 koxu1996 Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to keep EventManager state - next_event_id - in sync with enqueued transactions. In other words if some transaction fails to batch, then manager does not consider event as processed.

Update: Counter gets incremented with every successfully processed event.

}

Ok(())
}
}
18 changes: 18 additions & 0 deletions kairos-server/src/l1_sync/interval_trigger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use tokio::time::{self, Duration};

use std::sync::Arc;

use super::service::L1SyncService;

pub async fn run(sync_service: Arc<L1SyncService>) {
let mut interval = time::interval(Duration::from_secs(30));

loop {
interval.tick().await;

tracing::debug!("Triggering periodic L1 sync");
let _ = sync_service.trigger_sync().await.map_err(|e| {
tracing::error!("Unable to trigger sync: {}", e);
});
}
}
5 changes: 5 additions & 0 deletions kairos-server/src/l1_sync/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod error;
pub mod event_manager;
pub mod service;

pub mod interval_trigger;
82 changes: 82 additions & 0 deletions kairos-server/src/l1_sync/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use crate::state::ServerStateInner;

use super::error::L1SyncError;
use super::event_manager::EventManager;

use tokio::sync::mpsc;
use tokio::sync::oneshot;

use std::sync::Arc;

pub enum SyncCommand {
TriggerSync(oneshot::Sender<()>),
// NOTE: More commands can be here.
}

pub struct L1SyncService {
command_sender: mpsc::Sender<SyncCommand>,
//event_manager_handle: tokio::task::JoinHandle<()>,
}

impl L1SyncService {
pub async fn new(
rpc_url: String,
contract_hash: String,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same goes here as commented for the EventManager both these are in the server_state.server_config

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 736a540 and 3b52fb1.

server_state: Arc<ServerStateInner>,
) -> Result<Self, L1SyncError> {
let event_manager =
EventManager::new(&rpc_url, &contract_hash, server_state.clone()).await?;

let (tx, rx) = mpsc::channel(32);
let _handle = tokio::spawn(async move {
run_event_manager(rx, event_manager).await;
});

Ok(L1SyncService {
command_sender: tx,
//event_manager_handle: _handle,
})
}

pub async fn trigger_sync(&self) -> Result<(), L1SyncError> {
let (tx, rx) = oneshot::channel();
self.command_sender
.send(SyncCommand::TriggerSync(tx))
.await
.map_err(|e| L1SyncError::BrokenChannel(format!("Unable to send trigger: {}", e)))?;
rx.await.map_err(|e| {
L1SyncError::BrokenChannel(format!("Unable to receive trigger ack: {}", e))
})?;

Ok(())
}
}

/// Handles incoming commands and delegates tasks to EventManager.
async fn run_event_manager(mut rx: mpsc::Receiver<SyncCommand>, mut event_manager: EventManager) {
tracing::debug!("Event manager running and waiting for commands");
while let Some(command) = rx.recv().await {
let _ = handle_command(command, &mut event_manager)
.await
.map_err(|e| match e {
L1SyncError::UnexpectedError(e) => panic!("Unrecoverable error: {}", e),
_ => tracing::error!("Transient error: {}", e),
});
}
}

async fn handle_command(
command: SyncCommand,
event_manager: &mut EventManager,
) -> Result<(), L1SyncError> {
match command {
SyncCommand::TriggerSync(completion_ack) => {
event_manager.process_new_events().await?;
completion_ack
.send(())
.map_err(|_| L1SyncError::BrokenChannel("Sender dropped".to_string()))?;
}
}

Ok(())
}
34 changes: 34 additions & 0 deletions kairos-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod errors;
pub mod routes;
pub mod state;

mod l1_sync;
mod utils;

use std::sync::Arc;
Expand All @@ -13,6 +14,7 @@ use axum_extra::routing::RouterExt;
pub use errors::AppErr;

use crate::config::ServerConfig;
use crate::l1_sync::service::L1SyncService;
use crate::state::{BatchStateManager, ServerState, ServerStateInner};

/// TODO: support secp256k1
Expand All @@ -38,6 +40,35 @@ pub fn app_router(state: ServerState) -> Router {
.with_state(state)
}

pub async fn run_l1_sync(config: ServerConfig, server_state: Arc<ServerStateInner>) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't take ownership here.

Copy link
Contributor Author

@koxu1996 koxu1996 Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

config: ServerConfig removed in 8c5cc97.

// Make sure real contract hash was provided.
if config.casper_contract_hash
== "0000000000000000000000000000000000000000000000000000000000000000"
{
tracing::warn!(
"Casper contract hash not configured, L1 synchronization will NOT be enabled."
);
return;
}
marijanp marked this conversation as resolved.
Show resolved Hide resolved

// Initialize L1 synchronizer.
let l1_sync_service = L1SyncService::new(
config.casper_rpc.to_string(),
config.casper_contract_hash,
server_state,
)
.await
.unwrap_or_else(|e| {
panic!("Event manager failed to initialize: {}", e);
});

// Run periodic synchronization.
// TODO: Add additional SSE trigger.
tokio::spawn(async move {
l1_sync::interval_trigger::run(l1_sync_service.into()).await;
});
}

pub async fn run(config: ServerConfig) {
let listener = tokio::net::TcpListener::bind(config.socket_addr)
.await
Expand All @@ -48,6 +79,9 @@ pub async fn run(config: ServerConfig) {
batch_state_manager: BatchStateManager::new_empty(),
server_config: config.clone(),
});

run_l1_sync(config.clone(), state.clone()).await;

let app = app_router(state);

axum::serve(listener, app)
Expand Down
2 changes: 2 additions & 0 deletions kairos-server/tests/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ fn new_test_app_with_casper_node(casper_node_url: &Url) -> TestServer {
server_config: ServerConfig {
socket_addr: "0.0.0.0:0".parse().unwrap(),
casper_rpc: casper_node_url.clone(),
casper_contract_hash:
"0000000000000000000000000000000000000000000000000000000000000000".to_string(),
},
});

Expand Down
3 changes: 3 additions & 0 deletions kairos-test-utils/src/kairos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ impl Kairos {
let socket_addr = TcpListener::bind("0.0.0.0:0")?.local_addr()?;
let port = socket_addr.port().to_string();
let url = Url::parse(&format!("http://0.0.0.0:{}", port)).unwrap();
let casper_contract_hash =
String::from("0000000000000000000000000000000000000000000000000000000000000000");
let config = kairos_server::config::ServerConfig {
socket_addr,
casper_rpc,
casper_contract_hash,
};

let process_handle = tokio::spawn(async move {
Expand Down
1 change: 1 addition & 0 deletions nixos/modules/kairos.nix
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ in
RUST_LOG = cfg.logLevel;
KAIROS_SERVER_SOCKET_ADDR = "${cfg.bindAddress}:${builtins.toString cfg.port}";
KAIROS_SERVER_CASPER_RPC = "${cfg.casperRpcUrl}";
KAIROS_SERVER_CASPER_CONTRACT_HASH = "0000000000000000000000000000000000000000000000000000000000000000";
};
serviceConfig = mkMerge [
{
Expand Down
Loading