Skip to content

Commit

Permalink
WIP: Clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
Jannis committed Sep 20, 2023
1 parent 62be5f7 commit df1255e
Showing 1 changed file with 100 additions and 89 deletions.
189 changes: 100 additions & 89 deletions common/src/allocations/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,63 @@
use std::{collections::HashMap, time::Duration};

use alloy_primitives::Address;
use eventuals::{timer, Eventual, EventualExt, EventualWriter};
use anyhow::anyhow;
use eventuals::{timer, Eventual, EventualExt};
use log::warn;
use serde::Deserialize;
use serde_json::json;
use tokio::sync::Mutex;
use tokio::time::sleep;

use crate::prelude::NetworkSubgraph;

use super::Allocation;

async fn current_epoch(
network_subgraph: &'static NetworkSubgraph,
graph_network_id: u64,
) -> Result<u64, anyhow::Error> {
// Types for deserializing the network subgraph response
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GraphNetworkResponse {
graph_network: Option<GraphNetwork>,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GraphNetwork {
current_epoch: u64,
}

// Query the current epoch
let query = r#"query epoch($id: ID!) { graphNetwork(id: $id) { currentEpoch } }"#;
let response = network_subgraph
.query::<GraphNetworkResponse>(&json!({
"query": query,
"variables": {
"id": graph_network_id
}
}))
.await?;

if let Some(errors) = response.errors {
warn!(
"Errors encountered identifying current epoch for network {}: {}",
graph_network_id,
errors
.into_iter()
.map(|e| e.message)
.collect::<Vec<_>>()
.join(", ")
);
}

response
.data
.and_then(|data| data.graph_network)
.ok_or_else(|| anyhow!("Network {} not found", graph_network_id))
.map(|network| network.current_epoch)
}

pub fn indexer_allocations(
network_subgraph: &'static NetworkSubgraph,
indexer_address: Address,
Expand Down Expand Up @@ -82,103 +129,67 @@ pub fn indexer_allocations(
}
"#;

let (writer, reader) = Eventual::new();
let writer: &'static Mutex<EventualWriter<_>> = Box::leak(Box::new(Mutex::new(writer)));
let indexer_for_error_handler = indexer_address.clone();

// Refresh indexer allocations every now and then
timer(interval).map_with_retry(
move |_| async move {
let current_epoch = current_epoch(&network_subgraph, graph_network_id)
.await
.map_err(|e| format!("Failed to fetch current epoch: {}", e))?;

current_epoch(network_subgraph, graph_network_id, interval)
.pipe_async(move |current_epoch| async move {
// Allocations can be closed one epoch into the past
let closed_at_epoch_threshold = current_epoch - 1;

let result = network_subgraph
// Query active and recently closed allocations for the indexer,
// using the network subgraph
let response = network_subgraph
.query::<IndexerAllocationsResponse>(&json!({
"query": query,
"variables": {
"indexer": indexer_address,
"closedAtEpochThreshold": closed_at_epoch_threshold,
}}))
.await;

match result {
Ok(response) => {
let allocations = response.data.and_then(|data| data.indexer).map(|indexer| {
let Indexer {
active_allocations,
recently_closed_allocations,
} = indexer;

let mut eligible_allocations =
HashMap::from_iter(active_allocations.into_iter().map(|a| (a.id, a)));

eligible_allocations
.extend(recently_closed_allocations.into_iter().map(|a| (a.id, a)));

eligible_allocations
});

match allocations {
Some(allocations) => writer.lock().await.write(allocations),
None => warn!(
"No active or recently closed allocations found for indexer {}",
indexer_address
),
}
}
Err(err) => warn!("Failed to fetch active allocations: {}", err),
}
})
.forever();

reader
}

fn current_epoch(
network_subgraph: &'static NetworkSubgraph,
graph_network_id: u64,
interval: Duration,
) -> Eventual<u64> {
// Types for deserializing the network subgraph response
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GraphNetworkResponse {
graph_network: Option<GraphNetwork>,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GraphNetwork {
current_epoch: u64,
}

let (writer, reader) = Eventual::new();
let writer: &'static Mutex<EventualWriter<u64>> = Box::leak(Box::new(Mutex::new(writer)));

timer(interval)
.pipe_async(move |_| async move {
let query = r#"query epoch($id: ID!) { graphNetwork(id: $id) { currentEpoch } }"#;
let result = network_subgraph
.query::<GraphNetworkResponse>(&json!({
"query": query,
"variables": {
"id": graph_network_id
}
}))
.await;

match result {
Ok(response) => {
match response
.data
.and_then(|data| data.graph_network)
.map(|network| network.current_epoch)
{
Some(epoch) => writer.lock().await.write(epoch),
None => warn!("No epoch found for network with ID {}", graph_network_id),
}
}
Err(err) => warn!("Failed to fetch current epoch: {}", err),
.await
.map_err(|e| e.to_string())?;

// If there are any GraphQL errors returned, we'll log them for debugging
if let Some(errors) = response.errors {
warn!(
"Errors encountered fetching active or recently closed allocations for indexer {}: {}",
indexer_address,
errors.into_iter().map(|e| e.message).collect::<Vec<_>>().join(", ")
);
}
})
.forever();

reader
// Verify that the indexer could be found at all
let indexer = response
.data
.and_then(|data| data.indexer)
.ok_or_else(|| format!("Indexer {} could not be found on the network", indexer_address))?;

// Pull active and recently closed allocations out of the indexer
let Indexer {
active_allocations,
recently_closed_allocations
} = indexer;

Ok(HashMap::from_iter(
active_allocations.into_iter().map(|a| (a.id, a)).chain(
recently_closed_allocations.into_iter().map(|a| (a.id, a)))
))
},

// Need to use string errors here because eventuals `map_with_retry` retries
// errors that can be cloned
move |err: String| {
warn!(
"Failed to fetch active or recently closed allocations for indexer {}: {}",
indexer_for_error_handler, err
);

// Sleep for a bit before we retry
sleep(interval.div_f32(2.0))
},
)
}

0 comments on commit df1255e

Please sign in to comment.