Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 101 additions & 16 deletions src/indexer/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,23 @@ async fn setup_database_and_rpc(

fn spawn_router_service(should_terminate: Arc<AtomicBool>) -> tokio::task::JoinHandle<Result<()>> {
tokio::spawn(async move {
if let Err(e) = router::initialize_router(should_terminate.clone()).await {
error!("[router] unexpected error {}", e);
info!("[router] Starting router service");
match router::initialize_router(should_terminate.clone()).await {
Ok(()) => {
info!("[router] Router service completed normally");
Ok(())
}
Err(e) => {
error!(
"[router] CRITICAL: Router service failed with error: {:?}",
e
);
error!("[router] This router failure may have caused the indexer to become unreachable");
Err(BlockchainError::internal(format!(
"Router service failed: {e}"
)))
}
}
info!("[router] shutting down");
Ok(())
})
}

Expand All @@ -292,11 +304,23 @@ fn spawn_quick_indexer_service(
let quick_indexer = QuickIndexer::new(quick_config, db, rpc_client, should_terminate);

Ok(tokio::spawn(async move {
info!("Starting quick indexer");
if let Err(e) = quick_indexer.index().await {
error!("[quick_index] unexpected error {}", e);
info!("[quick_index] Starting quick indexer service");
match quick_indexer.index().await {
Ok(()) => {
info!("[quick_index] Quick indexer completed normally");
Ok(())
}
Err(e) => {
error!(
"[quick_index] CRITICAL: Quick indexer failed with error: {:?}",
e
);
error!("[quick_index] Quick indexer handles real-time block indexing - this failure stops new block processing");
Err(BlockchainError::internal(format!(
"Quick indexer failed: {e}"
)))
}
}
Ok(())
}))
}

Expand All @@ -319,11 +343,23 @@ fn spawn_batch_indexer_service(
let batch_indexer = BatchIndexer::new(batch_config, db, rpc_client, should_terminate);

Ok(tokio::spawn(async move {
info!("Starting batch indexer");
if let Err(e) = batch_indexer.index().await {
error!("[batch_index] unexpected error {}", e);
info!("[batch_index] Starting batch indexer service");
match batch_indexer.index().await {
Ok(()) => {
info!("[batch_index] Batch indexer completed normally");
Ok(())
}
Err(e) => {
error!(
"[batch_index] CRITICAL: Batch indexer failed with error: {:?}",
e
);
error!("[batch_index] Batch indexer handles historical block indexing - this failure stops backfilling");
Err(BlockchainError::internal(format!(
"Batch indexer failed: {e}"
)))
}
}
Ok(())
}))
}

Expand Down Expand Up @@ -389,20 +425,69 @@ async fn initialize_index_metadata(
Err(BlockchainError::internal("Failed to get indexer metadata"))
}

#[allow(clippy::cognitive_complexity)]
async fn wait_for_thread_completion(handles: Vec<JoinHandle<Result<()>>>) -> Result<()> {
for handle in handles {
let mut has_errors = false;

for (index, handle) in handles.into_iter().enumerate() {
let service_name = match index {
0 => "router",
1 => "quick_indexer",
2 => "batch_indexer",
_ => "unknown_service",
};

match handle.await {
Ok(Ok(())) => {
info!("Thread completed successfully");
info!("[{}] Thread completed successfully", service_name);
}
Ok(Err(e)) => {
error!("Thread completed with an error: {:?}", e);
error!(
"[{}] CRITICAL: Thread completed with an error: {:?}",
service_name, e
);
error!(
"[{}] Error details: {}",
service_name,
format_error_details(&e)
);
has_errors = true;
}
Err(e) => {
error!("Thread panicked: {:?}", e);
error!("[{}] CRITICAL: Thread panicked: {:?}", service_name, e);
if e.is_panic() {
error!("[{}] This was a panic - check for unwrap(), expect(), or other panic sources", service_name);
}
if e.is_cancelled() {
error!("[{}] Task was cancelled", service_name);
}
has_errors = true;
}
}
}

if has_errors {
error!(
"INDEXER SHUTDOWN: One or more services failed - this explains why the indexer stopped"
);
return Err(BlockchainError::internal(
"One or more indexing services failed",
));
}

info!("All indexing services completed successfully");
Ok(())
}

fn format_error_details(error: &BlockchainError) -> String {
let mut details = Vec::new();
details.push(format!("Error: {error}"));

let mut current_error: &dyn std::error::Error = error;
while let Some(source) = current_error.source() {
details.push(format!("Caused by: {source}"));
current_error = source;
}

details.join("\n ")
}
77 changes: 72 additions & 5 deletions src/indexer/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use fossil_headers_db::errors::{BlockchainError, Result};
use fossil_headers_db::indexer::lib::{start_indexing_services, IndexingConfig};
use std::{
env,
env, panic,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tracing::info;
use tracing::{error, info};
use tracing_subscriber::fmt;

#[tokio::main]
Expand Down Expand Up @@ -48,6 +48,9 @@ pub async fn main() -> Result<()> {
.compact()
.init();

// Setup panic hook to log panics before the application crashes
setup_panic_hook();

let should_terminate = Arc::new(AtomicBool::new(false));

setup_ctrlc_handler(Arc::clone(&should_terminate))?;
Expand All @@ -63,9 +66,19 @@ pub async fn main() -> Result<()> {

let indexing_config = indexing_config_builder.build()?;

start_indexing_services(indexing_config, should_terminate).await?;

Ok(())
// Main indexing operation with comprehensive error logging
match start_indexing_services(indexing_config, should_terminate).await {
Ok(()) => {
info!("Indexing services completed successfully");
Ok(())
}
Err(e) => {
error!("CRITICAL: Indexing services failed with error: {:?}", e);
error!("Error chain: {}", format_error_chain(&e));
error!("This is a fatal error that caused the indexer to stop");
Err(e)
}
}
}

fn setup_ctrlc_handler(should_terminate: Arc<AtomicBool>) -> Result<()> {
Expand All @@ -76,3 +89,57 @@ fn setup_ctrlc_handler(should_terminate: Arc<AtomicBool>) -> Result<()> {
})
.map_err(|e| BlockchainError::internal(format!("Failed to set Ctrl+C handler: {e}")))
}

#[allow(clippy::cognitive_complexity)]
fn setup_panic_hook() {
panic::set_hook(Box::new(|panic_info| {
let location = panic_info.location().map_or_else(
|| "unknown location".to_string(),
|loc| format!("{}:{}:{}", loc.file(), loc.line(), loc.column()),
);

let message = panic_info.payload().downcast_ref::<&str>().map_or_else(
|| {
panic_info.payload().downcast_ref::<String>().map_or_else(
|| "unknown panic message".to_string(),
std::clone::Clone::clone,
)
},
|s| (*s).to_string(),
);

error!("PANIC OCCURRED - INDEXER CRASHING!");
error!("Panic location: {}", location);
error!("Panic message: {}", message);
error!("This indicates a critical bug in the indexer");

// Try to log the backtrace if available
if let Ok(backtrace) = std::env::var("RUST_BACKTRACE") {
if !backtrace.is_empty() && backtrace != "0" {
error!(
"Backtrace logging is enabled (RUST_BACKTRACE={})",
backtrace
);
}
} else {
error!("Enable RUST_BACKTRACE=1 for stack traces");
}

// Flush logs before panic continues
std::io::Write::flush(&mut std::io::stderr()).ok();
}));
}

fn format_error_chain(error: &BlockchainError) -> String {
let mut chain = Vec::new();
let mut current_error: &dyn std::error::Error = error;

chain.push(current_error.to_string());

while let Some(source) = current_error.source() {
chain.push(source.to_string());
current_error = source;
}

chain.join(" -> ")
}
Loading