Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions discovery/src/api/routes/get_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,15 @@ mod tests {
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::SystemTime;
use tokio::sync::Mutex;

#[actix_web::test]
async fn test_get_nodes() {
let app_state = AppState {
node_store: Arc::new(NodeStore::new(RedisStore::new_test())),
contracts: None,
last_chain_sync: Arc::new(Mutex::new(None::<SystemTime>)),
};
let app = test::init_service(
App::new()
Expand Down Expand Up @@ -160,6 +163,7 @@ mod tests {
let app_state = AppState {
node_store: Arc::new(NodeStore::new(RedisStore::new_test())),
contracts: None,
last_chain_sync: Arc::new(Mutex::new(None::<SystemTime>)),
};
let app = test::init_service(
App::new()
Expand Down
6 changes: 6 additions & 0 deletions discovery/src/api/routes/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ mod tests {
use shared::security::request_signer::sign_request;
use shared::web3::wallet::Wallet;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::Mutex;
use url::Url;

#[actix_web::test]
Expand All @@ -137,6 +139,7 @@ mod tests {
let app_state = AppState {
node_store: Arc::new(NodeStore::new(RedisStore::new_test())),
contracts: None,
last_chain_sync: Arc::new(Mutex::new(None::<SystemTime>)),
};

let app = test::init_service(
Expand Down Expand Up @@ -190,6 +193,7 @@ mod tests {
let app_state = AppState {
node_store: Arc::new(NodeStore::new(RedisStore::new_test())),
contracts: None,
last_chain_sync: Arc::new(Mutex::new(None::<SystemTime>)),
};

let validate_signatures =
Expand Down Expand Up @@ -313,6 +317,7 @@ mod tests {
let app_state = AppState {
node_store: Arc::new(NodeStore::new(RedisStore::new_test())),
contracts: None,
last_chain_sync: Arc::new(Mutex::new(None::<SystemTime>)),
};

let validate_signatures =
Expand Down Expand Up @@ -376,6 +381,7 @@ mod tests {
let app_state = AppState {
node_store: Arc::new(NodeStore::new(RedisStore::new_test())),
contracts: None,
last_chain_sync: Arc::new(Mutex::new(None::<SystemTime>)),
};

let validate_signatures =
Expand Down
46 changes: 44 additions & 2 deletions discovery/src/api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,60 @@ use actix_web::{
web::{self, get},
App, HttpServer,
};
use log::{error, info};
use log::{error, info, warn};
use serde_json::json;
use shared::security::api_key_middleware::ApiKeyMiddleware;
use shared::security::auth_signature_middleware::{ValidateSignature, ValidatorState};
use shared::web3::contracts::core::builder::Contracts;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::Mutex;

#[derive(Clone)]
pub struct AppState {
pub node_store: Arc<NodeStore>,
pub contracts: Option<Arc<Contracts>>,
pub last_chain_sync: Arc<Mutex<Option<SystemTime>>>,
}

async fn health_check() -> HttpResponse {
async fn health_check(app_state: web::Data<AppState>) -> HttpResponse {
// Check if chain sync has happened in the last minute
let sync_status = {
let last_sync_guard = app_state.last_chain_sync.lock().await;
match *last_sync_guard {
Some(last_sync) => {
if let Ok(elapsed) = last_sync.elapsed() {
if elapsed > Duration::from_secs(60) {
warn!(
"Health check: Chain sync is delayed. Last sync was {} seconds ago",
elapsed.as_secs()
);
Some(elapsed)
} else {
None
}
} else {
warn!("Health check: Unable to determine elapsed time since last sync");
Some(Duration::from_secs(u64::MAX))
}
}
None => {
warn!("Health check: Chain sync has not occurred yet");
Some(Duration::from_secs(u64::MAX))
}
}
};

if let Some(elapsed) = sync_status {
// Return error response if sync is delayed
return HttpResponse::ServiceUnavailable().json(json!({
"status": "error",
"service": "discovery",
"message": format!("Chain sync is delayed. Last sync was {} seconds ago", elapsed.as_secs())
}));
}

// Return OK response if sync is recent
HttpResponse::Ok().json(json!({
"status": "ok",
"service": "discovery"
Expand All @@ -35,6 +75,7 @@ pub async fn start_server(
node_store: Arc<NodeStore>,
contracts: Arc<Contracts>,
platform_api_key: String,
last_chain_sync: Arc<Mutex<Option<SystemTime>>>,
) -> std::io::Result<()> {
info!("Starting server at http://{}:{}", host, port);

Expand All @@ -49,6 +90,7 @@ pub async fn start_server(
let app_state = AppState {
node_store,
contracts: Some(contracts),
last_chain_sync,
};

// it seems we have a validator for the validator
Expand Down
10 changes: 9 additions & 1 deletion discovery/src/chainsync/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ use shared::models::node::DiscoveryNode;
use shared::web3::contracts::core::builder::Contracts;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, SystemTime};
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
pub struct ChainSync {
pub node_store: Arc<NodeStore>,
cancel_token: CancellationToken,
chain_sync_interval: Duration,
contracts: Arc<Contracts>,
last_chain_sync: Arc<Mutex<Option<std::time::SystemTime>>>,
}

impl ChainSync {
Expand All @@ -21,12 +23,14 @@ impl ChainSync {
cancellation_token: CancellationToken,
chain_sync_interval: Duration,
contracts: Arc<Contracts>,
last_chain_sync: Arc<Mutex<Option<std::time::SystemTime>>>,
) -> Self {
Self {
node_store,
cancel_token: cancellation_token,
chain_sync_interval,
contracts,
last_chain_sync,
}
}

Expand Down Expand Up @@ -96,6 +100,7 @@ impl ChainSync {
let contracts_clone = self.contracts.clone();
let cancel_token = self.cancel_token.clone();
let chain_sync_interval = self.chain_sync_interval;
let last_chain_sync = self.last_chain_sync.clone();

tokio::spawn(async move {
let mut interval = tokio::time::interval(chain_sync_interval);
Expand All @@ -110,6 +115,9 @@ impl ChainSync {
error!("Error syncing node: {}", e);
}
}
// Update the last chain sync time
let mut last_sync = last_chain_sync.lock().await;
*last_sync = Some(SystemTime::now());
}
Err(e) => {
error!("Error getting nodes: {}", e);
Expand Down
6 changes: 6 additions & 0 deletions discovery/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use shared::web3::contracts::core::builder::ContractBuilder;
use shared::web3::wallet::Wallet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
#[derive(Parser)]
struct Args {
Expand Down Expand Up @@ -66,11 +67,15 @@ async fn main() -> Result<()> {
let cancellation_token = CancellationToken::new();
let node_store_clone = node_store.clone();
let contracts_clone = contracts.clone();
let last_chain_sync = Arc::new(Mutex::new(None::<std::time::SystemTime>));
let heartbeat_server_clone = last_chain_sync.clone();

let chain_sync = ChainSync::new(
node_store_clone,
cancellation_token.clone(),
Duration::from_secs(10),
contracts,
last_chain_sync,
);
chain_sync.run().await?;

Expand All @@ -80,6 +85,7 @@ async fn main() -> Result<()> {
node_store,
contracts_clone,
args.platform_api_key,
heartbeat_server_clone,
)
.await
{
Expand Down