1,190 changes: 1,119 additions & 71 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -4,6 +4,7 @@ members = [
"src/backend/api",
"src/backend/impl",
"src/backend/external_canisters",
"src/backend/logs",
]

[profile.release]
@@ -24,6 +25,9 @@ candid = "0.10"
candid_parser = "0.1"
serde = "1.0"
uuid = "1.6"
serde_bytes = "0.11"
chrono = { version = "0.4", default-features = false, features = ["std"] }
base64 = "0.22"

mockall = "0.12"
rstest = "0.18"
11 changes: 11 additions & 0 deletions scripts/scrape-logs.sh
@@ -0,0 +1,11 @@
#!/bin/bash
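# Fetches new logs from the backend canister and forwards them to Grafana Loki.
# Expects LOKI_PASSWORD to be set in the environment.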

cargo run \
--package backend_logs \
--bin backend_logs \
-- \
--identity-pem data/codegov-website-logger-identity.pem \
--loki-endpoint https://logs-prod-eu-west-0.grafana.net \
--loki-username 152321 \
--loki-password "$LOKI_PASSWORD" \
--backend-canister-id nijcm-2qaaa-aaaal-qcx2a-cai
2 changes: 1 addition & 1 deletion src/backend/external_canisters/Cargo.toml
@@ -9,4 +9,4 @@ ic-nns-governance.workspace = true

candid.workspace = true
serde.workspace = true
serde_bytes = "0.11"
serde_bytes.workspace = true
4 changes: 2 additions & 2 deletions src/backend/impl/Cargo.toml
@@ -29,10 +29,10 @@ serde.workspace = true
serde_cbor = "0.11"

uuid = { workspace = true, features = ["serde"] }
chrono = { version = "0.4", default-features = false, features = ["std"] }
chrono.workspace = true
hex = "0.4"
lazy_static = "1.4"
base64 = "0.22"
base64.workspace = true

rand = { version = "0.8", default-features = false }
rand_chacha = { version = "0.3", default-features = false }
32 changes: 32 additions & 0 deletions src/backend/logs/Cargo.toml
@@ -0,0 +1,32 @@
[package]
name = "backend_logs"
version = "0.1.0"
edition = "2021"

[[bin]]
name = "backend_logs"
path = "src/main.rs"

[dependencies]
backend_api = { path = "../api" }

ic-cdk.workspace = true
ic-agent = "0.39"

candid.workspace = true
serde.workspace = true
serde_bytes.workspace = true

clap = { version = "4.5", features = ["derive"] }
opentelemetry = "0.26"
opentelemetry-otlp = { version = "0.26", features = [
"logs",
"http-json",
"reqwest-client",
], default-features = false }
opentelemetry_sdk = { version = "0.26", features = ["logs", "rt-tokio"] }
opentelemetry-semantic-conventions = "0.26"
tokio = { version = "1.40", features = ["full"] }
anyhow = "1.0"
base64.workspace = true
chrono.workspace = true
93 changes: 93 additions & 0 deletions src/backend/logs/src/fetcher.rs
@@ -0,0 +1,93 @@
use std::{
fs::{File, OpenOptions},
io::{Read, Seek, Write},
path::PathBuf,
};

use backend_api::{ApiResult, ListLogsResponse, LogEntry, LogsFilterRequest};
use candid::{Decode, Encode, Principal};
use ic_agent::{identity::Secp256k1Identity, Agent};

use crate::utils::now_timestamp_ms;

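/// Thin wrapper around an ic-agent `Agent` scoped to the backend canister.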
struct BackendActor {
agent: Agent,
canister_id: Principal,
}

impl BackendActor {
fn new(identity_pem: PathBuf, canister_id: Principal) -> anyhow::Result<Self> {
let identity = Secp256k1Identity::from_pem_file(identity_pem)?;
let agent = Agent::builder()
.with_identity(identity)
.with_url("https://icp-api.io")
.build()?;
Ok(Self { agent, canister_id })
}

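/// Queries the canister's `list_logs` method for entries newer than `after_timestamp_ms`.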
async fn list_logs(
&self,
after_timestamp_ms: Option<u64>,
) -> Result<ListLogsResponse, anyhow::Error> {
let request = LogsFilterRequest {
after_timestamp_ms,
before_timestamp_ms: None,
context_contains_any: None,
level: None,
message_contains_any: None,
};
let response = self
.agent
.query(&self.canister_id, "list_logs")
.with_arg(Encode!(&request)?)
.await?;
let result = Decode!(&response, ApiResult<ListLogsResponse>)?;
match result {
ApiResult::Ok(ok) => Ok(ok),
ApiResult::Err(err) => Err(anyhow::anyhow!(err)),
}
}
}

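/// Fetches log entries from the backend canister, persisting the timestamp of
/// the last fetch to disk so that subsequent runs only pull newer entries.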
pub struct LogFetcher {
last_fetch_timestamp: Option<u64>,
file: File,
actor: BackendActor,
}

impl LogFetcher {
pub fn new(identity_pem: PathBuf, backend_canister_id: String) -> anyhow::Result<Self> {
let path = "data/last-fetch-timestamp.txt";
let mut file = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.open(path)?;
let mut last_fetch_timestamp = String::new();
file.read_to_string(&mut last_fetch_timestamp)?;

let actor = BackendActor::new(identity_pem, Principal::from_text(backend_canister_id)?)?;

Ok(Self {
file,
last_fetch_timestamp: last_fetch_timestamp.trim().parse().ok(),
actor,
})
}

pub async fn fetch_logs(&mut self) -> anyhow::Result<Vec<LogEntry>> {
let logs = self.actor.list_logs(self.last_fetch_timestamp).await?;
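// Record the fetch time so the next run only requests newer entries.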
let now = now_timestamp_ms();
self.update_last_fetch_timestamp(now);
Ok(logs.logs)
}

fn update_last_fetch_timestamp(&mut self, timestamp: u64) {
self.last_fetch_timestamp = Some(timestamp);
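// Truncate and rewind so the file holds only the latest timestamp.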
self.file.set_len(0).unwrap();
self.file.rewind().unwrap();
self.file
.write_all(timestamp.to_string().as_bytes())
.unwrap();
}
}
140 changes: 140 additions & 0 deletions src/backend/logs/src/main.rs
@@ -0,0 +1,140 @@
use std::{collections::HashMap, path::PathBuf};

use backend_api::{LogEntry, LogLevel};
use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
use clap::Parser;
use opentelemetry::{
logs::{AnyValue, Logger as _, LoggerProvider as _, Severity},
KeyValue,
};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
logs::{BatchConfigBuilder, BatchLogProcessor, LogRecord, Logger, LoggerProvider},
runtime::Tokio,
Resource,
};

mod fetcher;
mod utils;

use fetcher::LogFetcher;
use utils::now;

/// `2048` is the default max queue size for the OTLP logs batch processor.
/// We use `2048 * 4` to leave headroom when a single fetch returns many log entries.
const LOGS_PROCESSOR_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048 * 4;

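/// Builds a `LoggerProvider` that exports logs to Loki over OTLP/HTTP,
/// authenticated via HTTP Basic auth.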
fn init_telemetry(args: &Args) -> anyhow::Result<LoggerProvider> {
let headers = {
let mut headers = HashMap::new();
let auth_header = format!(
"Basic {}",
BASE64.encode(format!("{}:{}", args.loki_username, args.loki_password))
);
headers.insert("Authorization".to_string(), auth_header.parse().unwrap());
headers
};
// OTLP ingest path, per https://grafana.com/docs/loki/latest/reference/loki-http-api/#ingest-logs-using-otlp
let loki_endpoint = format!("{}/otlp/v1/logs", args.loki_endpoint);
let exporter = opentelemetry_otlp::new_exporter()
.http()
.with_endpoint(loki_endpoint)
.with_headers(headers)
.build_log_exporter()?;

let processor = BatchLogProcessor::builder(exporter, Tokio)
.with_batch_config(
BatchConfigBuilder::default()
.with_max_queue_size(LOGS_PROCESSOR_MAX_QUEUE_SIZE_DEFAULT)
.build(),
)
.build();

let logger_provider = LoggerProvider::builder()
.with_log_processor(processor)
.with_resource(Resource::new(vec![
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
"backend_canister",
),
KeyValue::new("canister_id", args.backend_canister_id.clone()),
]))
.build();

Ok(logger_provider)
}

fn build_logger(provider: &LoggerProvider) -> Logger {
provider.logger_builder("backend_canister_logger").build()
}

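/// Newtype adapter for converting a canister `LogEntry` into an OTLP `LogRecord`.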
struct LogEntryAdapter(LogEntry);

impl TryInto<LogRecord> for LogEntryAdapter {
type Error = anyhow::Error;

fn try_into(self) -> Result<LogRecord, Self::Error> {
let mut log_record = LogRecord::default();
log_record.timestamp =
Some(chrono::DateTime::parse_from_rfc3339(&self.0.date_time)?.into());
log_record.observed_timestamp = Some(now());
log_record.severity_number = Some(match self.0.level {
LogLevel::Info => Severity::Info,
LogLevel::Warn => Severity::Warn,
LogLevel::Error => Severity::Error,
});
let mut body = HashMap::new();
body.insert("message".into(), self.0.message.into());
if let Some(context) = self.0.context {
body.insert("context".into(), context.into());
}
log_record.body = Some(AnyValue::Map(Box::new(body)));

Ok(log_record)
}
}

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Path to the identity PEM file
#[arg(long, value_name = "FILE")]
identity_pem: PathBuf,

/// Loki endpoint URL
#[arg(long, value_name = "URL")]
loki_endpoint: String,

/// Loki username
#[arg(long)]
loki_username: String,

/// Loki password
#[arg(long)]
loki_password: String,

/// Backend canister ID
#[arg(long, value_name = "CANISTER_ID")]
backend_canister_id: String,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();

// Keep the logger provider alive for the whole run: its Drop implementation shuts down the processors and flushes any queued logs.
let logger_provider = init_telemetry(&args)?;
let logger = build_logger(&logger_provider);

let mut log_fetcher = LogFetcher::new(args.identity_pem, args.backend_canister_id)?;

let logs = log_fetcher.fetch_logs().await?;
println!("Sending {} logs to Loki...", logs.len());
for log in logs {
let log_entry = LogEntryAdapter(log);
logger.emit(log_entry.try_into()?);
}
println!("Logs sent to Loki");

Ok(())
}
9 changes: 9 additions & 0 deletions src/backend/logs/src/utils.rs
@@ -0,0 +1,9 @@
use std::time::{SystemTime, UNIX_EPOCH};

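/// Single clock source shared by the fetch timestamps and the OTLP observed timestamps.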
pub fn now() -> SystemTime {
SystemTime::now()
}

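/// Milliseconds since the Unix epoch.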
pub fn now_timestamp_ms() -> u64 {
now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64
}