Skip to content

Commit

Permalink
Refactoring 1
Browse files Browse the repository at this point in the history
Signed-off-by: Caleb Schoepp <caleb.schoepp@fermyon.com>
  • Loading branch information
calebschoepp committed May 2, 2024
1 parent 9146a38 commit db12ff3
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 73 deletions.
29 changes: 13 additions & 16 deletions crates/observe/src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ pin_project! {
}

pub trait FutureExt: Future + Sized {
fn instrument_rename(
/// Manage WASI Observe guest spans.
fn manage_guest_spans(
self,
observe_context: ObserveContext,
) -> Result<impl Future<Output = Self::Output>>;
}

impl<F: Future> FutureExt for F {
fn instrument_rename(
fn manage_guest_spans(
self,
observe_context: ObserveContext,
) -> Result<impl Future<Output = Self::Output>> {
Expand All @@ -45,18 +46,22 @@ impl<F: Future> FutureExt for F {
impl<F: Future> Future for Instrumented<F> {
type Output = F::Output;

/// Maintains the invariant that all active spans are entered before polling the inner future
/// and exited otherwise. If we don't do this then the timing (among many other things) of the
/// spans becomes wildly incorrect.
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();

// Enter the active spans before entering the inner poll
{
let state = this.observe_context.state.write().unwrap();

// TODO: Make it a method on state
for span_id in state.active_spans.iter() {
println!("Attempting to enter {span_id:?}");
if let Some(span_resource) = state.span_resources.get(*span_id) {
for guest_span_id in state.active_spans.iter() {
if let Some(span_resource) = state.guest_spans.get(*guest_span_id) {
span_resource.enter();
} else {
tracing::error!("No span to enter")
Expand All @@ -66,14 +71,13 @@ impl<F: Future> Future for Instrumented<F> {

let ret = this.inner.poll(cx);

// Exit the active spans after exiting the inner poll
{
let state = this.observe_context.state.write().unwrap();

// TODO: Make it a method on state
for span_id in state.active_spans.iter().rev() {
println!("Attempting to exit {span_id:?}");

if let Some(span_resource) = state.span_resources.get(*span_id) {
if let Some(span_resource) = state.guest_spans.get(*span_id) {
span_resource.exit();
} else {
tracing::error!("span already dropped")
Expand Down Expand Up @@ -105,13 +109,6 @@ impl ObserveContext {
fn drop_all(&self) {
let mut state: std::sync::RwLockWriteGuard<State> = self.state.write().unwrap();

state.close_back_to(0);
state.close_from_back_to(0);
}
}

// TODO: Rename everything

// Problems we need to fix
// - Cancelling future
// - Guest mismanages inner spans
// - Guest mismanages outer span and holds it in global state
116 changes: 63 additions & 53 deletions crates/observe/src/host_component.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::future::Future;
use std::sync::{Arc, RwLock};

use anyhow::Result;
Expand All @@ -7,11 +6,11 @@ use spin_core::wasmtime::component::Resource;
use spin_core::{async_trait, HostComponent};
use spin_world::v2::observe;
use spin_world::v2::observe::Span as WitSpan;
use tracing::span::EnteredSpan;

pub struct ObserveHostComponent {}

impl ObserveHostComponent {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Self {}
}
Expand All @@ -30,118 +29,129 @@ impl HostComponent for ObserveHostComponent {
fn build_data(&self) -> Self::Data {
ObserveData {
state: Arc::new(RwLock::new(State {
span_resources: table::Table::new(1024),
guest_spans: table::Table::new(1024),
active_spans: Default::default(),
})),
}
}
}

impl DynamicHostComponent for ObserveHostComponent {
fn update_data(&self, _data: &mut Self::Data, component: &AppComponent) -> anyhow::Result<()> {
fn update_data(&self, _data: &mut Self::Data, _component: &AppComponent) -> anyhow::Result<()> {
Ok(())
}
}

/// TODO
pub struct ObserveData {
pub(crate) state: Arc<RwLock<State>>,
}

#[async_trait]
impl observe::Host for ObserveData {}

pub(crate) struct State {
pub span_resources: table::Table<GuestSpan>,
pub active_spans: Vec<u32>,
}

impl State {
/// TODO: Both exits the guest spans and removes them from active_spans in reverse order to index
pub fn close_back_to(&mut self, index: usize) {
self.active_spans
.split_off(index)
.iter()
.rev()
.for_each(|id| {
if let Some(guest_span) = self.span_resources.get(*id) {
guest_span.exit();
} else {
tracing::error!("adsf");
}
});
}
}

#[async_trait]
impl observe::HostSpan for ObserveData {
async fn enter(&mut self, name: String) -> Result<Resource<WitSpan>> {
println!("Entering {name:?}");
let span = tracing::info_span!("lame_name", "otel.name" = name);
// Create the underlying tracing span
let tracing_span = tracing::info_span!("WASI Observe guest", "otel.name" = name);

// Wrap it in a GuestSpan for our own bookkeeping purposes and enter it
let guest_span = GuestSpan {
name: name.clone(),
inner: span,
inner: tracing_span,
};
guest_span.enter();

// Put the GuestSpan in our resource table and push it to our stack of active spans
let mut state = self.state.write().unwrap();

let resource_id = state.span_resources.push(guest_span).unwrap();

let resource_id = state.guest_spans.push(guest_span).unwrap();
state.active_spans.push(resource_id);

Ok(Resource::new_own(resource_id))
}

async fn close(&mut self, resource: Resource<WitSpan>) -> Result<()> {
println!("Closing");
let mut state: std::sync::RwLockWriteGuard<State> = self.state.write().unwrap();

if let Some(index) = state
.active_spans
.iter()
.rposition(|id| *id == resource.rep())
{
state.close_back_to(index);
} else {
tracing::error!("did not find span to close")
}

self.safely_close(resource, false);
Ok(())
}

fn drop(&mut self, resource: Resource<WitSpan>) -> Result<()> {
self.safely_close(resource, true);
Ok(())
}
}

impl ObserveData {
/// Close the span associated with the given resource and optionally drop the resource
/// from the table. Additionally close any other active spans that are more recent on the stack
/// in reverse order.
///
/// Exiting any spans that were already closed will not cause this to error.
fn safely_close(&mut self, resource: Resource<WitSpan>, drop_resource: bool) {
let mut state: std::sync::RwLockWriteGuard<State> = self.state.write().unwrap();

if let Some(index) = state
.active_spans
.iter()
.rposition(|id| *id == resource.rep())
{
state.close_back_to(index);
state.close_from_back_to(index);
} else {
tracing::error!("did not find span to close")
tracing::debug!("found no active spans to close")
}

state.span_resources.remove(resource.rep()).unwrap();
Ok(())
if drop_resource {
state.guest_spans.remove(resource.rep()).unwrap();
}
}
}

/// Internal state of the observe host component.
pub(crate) struct State {
/// A resource table that holds the guest spans.
pub guest_spans: table::Table<GuestSpan>,
/// A LIFO stack of guest spans that are currently active.
///
/// Only a reference ID to the guest span is held here. The actual guest span must be looked up
/// in the `guest_spans` table using the reference ID.
pub active_spans: Vec<u32>,
}

impl State {
/// Close all active spans from the top of the stack to the given index. Closing entails exiting
/// the inner [tracing] span and removing it from the active spans stack.
pub fn close_from_back_to(&mut self, index: usize) {
self.active_spans
.split_off(index)
.iter()
.rev()
.for_each(|id| {
if let Some(guest_span) = self.guest_spans.get(*id) {
guest_span.exit();
} else {
tracing::debug!("active_span {id:?} already removed from resource table");
}
});
}
}

/// The WIT resource Span. Effectively wraps a [tracing] span.
pub struct GuestSpan {
pub name: String,
/// The [tracing] span we use to do the actual tracing work.
pub inner: tracing::Span,
pub name: String,
}

// Necessary because of phantom don't send

// Note: We use tracing enter instead of Entered because Entered is not Send
impl GuestSpan {
/// Enter the inner [tracing] span.
pub fn enter(&self) {
self.inner.with_subscriber(|(id, dispatch)| {
dispatch.enter(id);
});
}

/// Exits the inner [tracing] span.
pub fn exit(&self) {
self.inner.with_subscriber(|(id, dispatch)| {
dispatch.exit(id);
Expand Down
8 changes: 4 additions & 4 deletions crates/trigger-http/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl HttpHandlerExecutor {

let (resp,) = func
.call_async(&mut store, (req,))
.instrument_rename(observe_context)?
.manage_guest_spans(observe_context)?
.await?;

if resp.status < 100 || resp.status > 600 {
Expand Down Expand Up @@ -243,21 +243,21 @@ impl HttpHandlerExecutor {
proxy
.wasi_http_incoming_handler()
.call_handle(&mut store, request, response)
.instrument_rename(observe_context)?
.manage_guest_spans(observe_context)?
.instrument(span)
.await
}
Handler::Handler2023_10_18(proxy) => {
proxy
.call_handle(&mut store, request, response)
.instrument_rename(observe_context)?
.manage_guest_spans(observe_context)?
.instrument(span)
.await
}
Handler::Handler2023_11_10(proxy) => {
proxy
.call_handle(&mut store, request, response)
.instrument_rename(observe_context)?
.manage_guest_spans(observe_context)?
.instrument(span)
.await
}
Expand Down

0 comments on commit db12ff3

Please sign in to comment.