Skip to content

Commit

Permalink
feat: get tracing and jaeger going
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Aug 17, 2022
1 parent 719154d commit ed6e16b
Show file tree
Hide file tree
Showing 23 changed files with 269 additions and 81 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ exclude = [
]

[patch.crates-io]
drogue-bazaar = { path = "../drogue-bazaar" }
#drogue-bazaar = { git = "https://github.com/drogue-iot/drogue-bazaar", rev = "3a5e58d05b4be89342e1c7b4ca1409e1cc305310" }
#drogue-bazaar = { path = "../drogue-bazaar" }
drogue-bazaar = { git = "https://github.com/drogue-iot/drogue-bazaar", rev = "f77db10ce88b81c084d5109f48a17d3987b7b620" }
#drogue-client = { path = "../drogue-client" }
#drogue-client = { git = "https://github.com/drogue-iot/drogue-client", rev = "0cb6998da75905240f06f38a44aac31d7b3fdde5" } # FIXME: awaiting release 0.11.0

Expand Down
2 changes: 2 additions & 0 deletions core/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use async_trait::async_trait;
use drogue_bazaar::app::Startup;
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use tracing::instrument;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Command {
Expand All @@ -22,6 +23,7 @@ pub trait CommandSink: Sized + Send + Sync + 'static {

async fn send_command(&self, command: Command) -> Result<(), Self::Error>;

#[instrument(skip_all, err)]
async fn send_commands(&self, commands: Vec<Command>) -> Result<(), Self::Error> {
for command in commands {
self.send_command(command).await?;
Expand Down
6 changes: 6 additions & 0 deletions core/src/command/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{command::Command, mqtt::MqttClient};
use async_trait::async_trait;
use drogue_bazaar::app::{Startup, StartupExt};
use rumqttc::{AsyncClient, ClientError, Event, EventLoop, Incoming, Outgoing, QoS};
use tracing::instrument;

#[derive(Clone, Debug, serde::Deserialize)]
pub struct Config {
Expand Down Expand Up @@ -63,6 +64,11 @@ impl super::CommandSink for CommandSink {
Ok(Self { client, mode })
}

#[instrument(skip_all, fields(
application=command.application,
device=command.device,
channel=command.channel,
), err)]
async fn send_command(&self, command: Command) -> Result<(), Self::Error> {
let topic = self
.mode
Expand Down
48 changes: 31 additions & 17 deletions core/src/injector/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ use crate::{
mqtt::MqttClient,
processor::{sink::Sink, Event},
};
use anyhow::bail;
use chrono::Utc;
use lazy_static::lazy_static;
use prometheus::{register_histogram, register_int_counter_vec, Histogram, IntCounterVec};
use rumqttc::{AsyncClient, EventLoop, Incoming, QoS, SubscribeReasonCode};
use rumqttc::{AsyncClient, EventLoop, Incoming, Publish, QoS, SubscribeReasonCode};
use tracing::instrument;

lazy_static! {
static ref EVENTS: IntCounterVec = register_int_counter_vec!(
Expand Down Expand Up @@ -120,22 +122,9 @@ impl<S: Sink> Injector<S> {
}
}
Ok(rumqttc::Event::Incoming(Incoming::Publish(publish))) => {
match self.build_event(&publish.payload) {
Ok(Some(event)) => {
log::debug!("Injecting event: {event:?}");
if let Err(err) = self.sink.publish(event).await {
log::error!("Failed to inject event: {err}, Exiting loop");
}
EVENTS.with_label_values(&["ok"]).inc();
}
Ok(None) => {
// got skipped
EVENTS.with_label_values(&["skipped"]).inc();
}
Err(err) => {
EVENTS.with_label_values(&["failed"]).inc();
log::info!("Unable to parse event: {err}, skipping...")
}
if let Err(err) = self.handle_publish(&publish).await {
log::warn!("Failed to schedule message: {err}");
break;
}
if perform_ack {
if let Err(err) = self.client.try_ack(&publish) {
Expand All @@ -158,6 +147,7 @@ impl<S: Sink> Injector<S> {
Ok(())
}

#[instrument(skip_all, fields(payload_len=payload.len()), err)]
fn build_event(&self, payload: &[u8]) -> anyhow::Result<Option<Event>> {
let mut event: cloudevents::Event = serde_json::from_slice(payload)?;

Expand Down Expand Up @@ -192,4 +182,28 @@ impl<S: Sink> Injector<S> {
message,
}))
}

#[instrument(skip_all, fields(topic=publish.topic, pkid=publish.pkid))]
async fn handle_publish(&self, publish: &Publish) -> anyhow::Result<()> {
match self.build_event(&publish.payload) {
Ok(Some(event)) => {
log::debug!("Injecting event: {event:?}");
if let Err(err) = self.sink.publish(event).await {
log::error!("Failed to inject event: {err}, Exiting loop");
bail!("Failed to inject event: {err}")
}
EVENTS.with_label_values(&["ok"]).inc();
}
Ok(None) => {
// got skipped
EVENTS.with_label_values(&["skipped"]).inc();
}
Err(err) => {
EVENTS.with_label_values(&["invalid"]).inc();
log::info!("Unable to parse event: {err}, skipping...");
}
}

Ok(())
}
}
23 changes: 23 additions & 0 deletions core/src/kafka.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use opentelemetry::propagation::Injector;
use rdkafka::message::OwnedHeaders;

pub struct KafkaHeaders(Option<OwnedHeaders>);

impl Injector for KafkaHeaders {
fn set(&mut self, key: &str, value: String) {
let h = self.0.take().unwrap_or_default();
self.0 = Some(h.add(&format!("ce_{}", key), &value))
}
}

impl From<OwnedHeaders> for KafkaHeaders {
fn from(headers: OwnedHeaders) -> Self {
Self(Some(headers))
}
}

impl From<KafkaHeaders> for OwnedHeaders {
fn from(headers: KafkaHeaders) -> Self {
headers.0.unwrap_or_default()
}
}
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod command;
pub mod config;
pub mod error;
pub mod injector;
pub mod kafka;
pub mod listener;
pub mod machine;
pub mod model;
Expand Down
74 changes: 48 additions & 26 deletions core/src/machine/deno.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use deno_core::{include_js_files, serde_v8, v8, Extension, JsRuntime, RuntimeOpt
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use tokio::{runtime::Handle, task::JoinHandle, time::Instant};
use tracing::{instrument, Span};

#[derive(Clone, Debug)]
pub struct DenoOptions {
Expand Down Expand Up @@ -134,6 +135,7 @@ impl Execution {
}
}

#[instrument(skip_all)]
fn create_runtime(&self) -> JsRuntime {
// disable some operations
let disable = Extension::builder()
Expand All @@ -148,15 +150,21 @@ impl Execution {
.js(include_js_files!(prefix "drogue:extensions/api", "js/core.js",))
.build();

tracing::info!("Built extensions");

// FIXME: doesn't work as advertised, we keep it anyway
let create_params = v8::Isolate::create_params().heap_limits(0, 3 * 1024 * 1024);

tracing::info!("Created parameters");

let mut runtime = JsRuntime::new(RuntimeOptions {
create_params: Some(create_params),
extensions: vec![disable, api],
..Default::default()
});

tracing::info!("Created runtime");

let isolate = runtime.v8_isolate().thread_safe_handle();
runtime.add_near_heap_limit_callback(move |_, current| {
// FIXME: again, this currently doesn't work properly, we keep it anyway
Expand All @@ -167,7 +175,8 @@ impl Execution {
runtime
}

pub async fn run<I, O, R>(self, input: I) -> anyhow::Result<ExecutionResult<O, R>>
#[instrument(parent = parent, skip_all)]
fn run_inner<I, O, R>(self, parent: Span, input: I) -> anyhow::Result<ExecutionResult<O, R>>
where
I: Injectable + 'static,
O: Extractable + 'static,
Expand All @@ -176,44 +185,57 @@ impl Execution {
struct Deadline(JoinHandle<()>);
impl Drop for Deadline {
fn drop(&mut self) {
tracing::warn!("Aborting script");
self.0.abort();
}
}

Handle::current()
.spawn_blocking(move || {
let mut runtime = self.create_runtime();
let mut runtime = self.create_runtime();

let name = self.name;
let code = self.code;
let name = self.name;
let code = self.code;

let isolate = runtime.v8_isolate().thread_safe_handle();
let deadline = Deadline(Handle::current().spawn(async move {
tokio::time::sleep_until(self.opts.deadline).await;
isolate.terminate_execution();
}));
let isolate = runtime.v8_isolate().thread_safe_handle();
let deadline = Deadline(Handle::current().spawn(async move {
tokio::time::sleep_until(self.opts.deadline).await;
isolate.terminate_execution();
}));

// set_context::<_, O>(&mut runtime, input)?;
input.inject(&mut runtime, "context")?;
input.inject(&mut runtime, "context")?;

let global = runtime.execute_script(&name, &code)?;
tracing::info!("Execute script");
let global = runtime.execute_script(&name, &code)?;

let return_value = R::r#return(&mut runtime, global)?;
let return_value = R::r#return(&mut runtime, global)?;

Handle::current().block_on(async { runtime.run_event_loop(false).await })?;
// FIXME: eval late result
Handle::current().block_on(async { runtime.run_event_loop(false).await })?;

// stop the deadline watcher
drop(deadline);
tracing::info!("Awaited event loop");

//let output = extract_context(&mut runtime)?;
let output = O::extract(&mut runtime, "context")?;
// FIXME: eval late result

Ok::<_, anyhow::Error>(ExecutionResult {
output,
return_value,
})
})
// stop the deadline watcher
drop(deadline);

//let output = extract_context(&mut runtime)?;
let output = O::extract(&mut runtime, "context")?;

Ok::<_, anyhow::Error>(ExecutionResult {
output,
return_value,
})
}

#[instrument(skip_all, err)]
pub async fn run<I, O, R>(self, input: I) -> anyhow::Result<ExecutionResult<O, R>>
where
I: Injectable + 'static,
O: Extractable + 'static,
R: Returnable + 'static,
{
let span = Span::current();
Handle::current()
.spawn_blocking(move || self.run_inner(span, input))
.await?
}
}
Expand Down
5 changes: 5 additions & 0 deletions core/src/machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use lazy_static::lazy_static;
use prometheus::{register_histogram, Histogram};
use serde_json::Value;
use std::{convert::Infallible, fmt::Debug, future::Future, sync::Arc};
use tracing::instrument;

lazy_static! {
static ref TIMER_DELAY: Histogram =
Expand Down Expand Up @@ -62,6 +63,7 @@ impl Machine {
}

/// Run actions for creating a new thing.
#[instrument(skip_all, err)]
pub async fn create(new_thing: Thing) -> Result<Outcome, Error> {
// Creating means that we start with an empty thing, and then set the initial state.
// This allows to run through the reconciliation initially.
Expand All @@ -77,6 +79,7 @@ impl Machine {
}

/// Run an update.
#[instrument(skip_all, err)]
pub async fn update<F, Fut, E>(self, f: F) -> Result<Outcome, Error>
where
F: FnOnce(Thing) -> Fut,
Expand Down Expand Up @@ -149,6 +152,7 @@ impl Machine {
})
}

#[instrument(skip_all, err)]
pub async fn delete(thing: Thing) -> Result<DeletionOutcome, Error> {
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(1);

Expand Down Expand Up @@ -210,6 +214,7 @@ impl Machine {
})
}

#[instrument(skip_all, err)]
fn validate(new_thing: &Thing) -> Result<(), Error> {
match &new_thing.schema {
Some(Schema::Json(schema)) => match schema {
Expand Down
Loading

0 comments on commit ed6e16b

Please sign in to comment.