Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/block priofees #274

Merged
merged 64 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
dea0e06
boilerplate
grooviegermanikus Jan 15, 2024
ffb2e7e
add dummy endpoint
grooviegermanikus Jan 15, 2024
eda4d00
simple logic with highes block
grooviegermanikus Jan 16, 2024
c0f49c3
endpoint works
grooviegermanikus Jan 17, 2024
c54d662
remove cu calculus
grooviegermanikus Jan 17, 2024
7997e14
log get_latest_block startup timing
grooviegermanikus Jan 17, 2024
855844f
websocket subscription basics
grooviegermanikus Jan 17, 2024
9961233
WIP
grooviegermanikus Jan 17, 2024
5c73513
propagate fees via channel
grooviegermanikus Jan 17, 2024
a15834d
Merge remote-tracking branch 'origin/main' into feature/block-priofees
grooviegermanikus Jan 17, 2024
359f68c
cleanup
grooviegermanikus Jan 17, 2024
c80ed8c
use broadcast sender
grooviegermanikus Jan 17, 2024
dab1296
proper handling of broadcast channel issues
grooviegermanikus Jan 17, 2024
def5d25
rename stuff
grooviegermanikus Jan 17, 2024
63a0798
move to dedicated crate
grooviegermanikus Jan 17, 2024
02545ed
cleanup deps
grooviegermanikus Jan 17, 2024
5eca3b1
move data definition
grooviegermanikus Jan 17, 2024
eea7363
hide private types
grooviegermanikus Jan 17, 2024
8dc77d4
percentile math
grooviegermanikus Jan 17, 2024
9153a22
code format
grooviegermanikus Jan 17, 2024
25f152c
integrated stats by cu
grooviegermanikus Jan 17, 2024
40b5e9e
add cu stats
grooviegermanikus Jan 17, 2024
00d9cb3
restart inline asserts
grooviegermanikus Jan 17, 2024
c7fd2bb
warn about DashMap access
grooviegermanikus Jan 17, 2024
33e3fa2
switch to BTreeMap
grooviegermanikus Jan 17, 2024
2a4e861
remove vote transactions from calculus
grooviegermanikus Jan 17, 2024
a8ef786
return vec
grooviegermanikus Jan 17, 2024
a5a21a7
array
grooviegermanikus Jan 17, 2024
963876d
flat format
grooviegermanikus Jan 17, 2024
ad6a6b3
reformat
grooviegermanikus Jan 17, 2024
0e62404
split arrays
grooviegermanikus Jan 17, 2024
77b0c64
rename keys
grooviegermanikus Jan 17, 2024
c499b42
enable experimental tag
grooviegermanikus Jan 17, 2024
0fdeee6
code format
grooviegermanikus Jan 17, 2024
7c5d57a
more logging
grooviegermanikus Jan 17, 2024
a088fb9
reset log level
grooviegermanikus Jan 17, 2024
b8408a8
rename websocket method to blockPrioritizationFeesSubscribe
grooviegermanikus Jan 17, 2024
ff17d94
HACK: use processed blocks
grooviegermanikus Jan 17, 2024
1a453f5
Merge remote-tracking branch 'origin/main' into feature/block-priofees
grooviegermanikus Jan 18, 2024
ad7f233
add is_vote_transaction
grooviegermanikus Jan 18, 2024
f2f96d7
do not fail if fees goes down
grooviegermanikus Jan 19, 2024
c285988
udp message size example
grooviegermanikus Jan 19, 2024
689f6a2
imrpove 100 handling
grooviegermanikus Jan 22, 2024
67d34a4
simplify inital sort
grooviegermanikus Jan 22, 2024
244aa79
Revert "HACK: use processed blocks"
grooviegermanikus Jan 22, 2024
dd59963
Merge branch 'main' into feature/block-priofees
grooviegermanikus Jan 22, 2024
5217bef
add percentile test case
grooviegermanikus Jan 22, 2024
36b199e
Merge remote-tracking branch 'origin/main' into feature/block-priofees
grooviegermanikus Jan 22, 2024
cdd8317
add per block cu_consumed+tx_count(nonvote)
grooviegermanikus Jan 23, 2024
5a603e3
Merge remote-tracking branch 'origin/main' into feature/block-priofees
grooviegermanikus Jan 23, 2024
446cb88
fix supid cleanup bug
grooviegermanikus Jan 23, 2024
d330db2
add TxAggregateStats
grooviegermanikus Jan 23, 2024
5de6a58
add by_cu test
grooviegermanikus Jan 24, 2024
52e60b6
add alternative cu percentlie impl
grooviegermanikus Jan 24, 2024
45c18fc
cleanup cu tests
grooviegermanikus Jan 24, 2024
31eaf66
add check for step
grooviegermanikus Jan 24, 2024
a90131b
saturating_sub
grooviegermanikus Jan 24, 2024
21ae8a5
remove useless file
grooviegermanikus Jan 24, 2024
252580f
add is_vote to history crate
grooviegermanikus Jan 24, 2024
66229de
fix fmt+clippy (nightly)
grooviegermanikus Jan 24, 2024
ff91c8d
clippy
grooviegermanikus Jan 24, 2024
df41381
clippy
grooviegermanikus Jan 24, 2024
fe2fa4e
clippy
grooviegermanikus Jan 24, 2024
e7da18a
clippy
grooviegermanikus Jan 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/fly-deploy-staging.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ name: Deploy to Fly Staging
on:
push:
branches: [main]
tags: ['experimental/*']

env:
FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}
Expand Down
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ members = [
"quic-forward-proxy-integration-test",
"cluster-endpoints",
"history",
"block_priofees",
"bench"
]

Expand Down Expand Up @@ -69,6 +70,7 @@ solana-lite-rpc-core = {path = "core", version="0.2.3"}
solana-lite-rpc-cluster-endpoints = {path = "cluster-endpoints", version="0.2.3"}
solana-lite-rpc-history = {path = "history", version="0.2.3"}
solana-lite-rpc-stakevote = {path = "stake_vote", version="0.2.3"}
solana-lite-rpc-block-priofees = {path = "block_priofees", version="0.2.3"}

async-trait = "0.1.68"
yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" }
Expand Down
20 changes: 20 additions & 0 deletions block_priofees/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "solana-lite-rpc-block-priofees"
version = "0.2.3"
edition = "2021"
description = "Expose priority fees stats per block via RPC and WebSocket"

[dependencies]
solana-lite-rpc-core = {workspace = true}

solana-sdk = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
log = { workspace = true }
itertools = { workspace = true }
dashmap = { workspace = true }
jsonrpsee = { workspace = true }
tracing-subscriber = { workspace = true }
prometheus = { workspace = true }
lazy_static = { workspace = true }
tokio = { version = "1.28.2", features = ["full"]}
157 changes: 157 additions & 0 deletions block_priofees/src/block_priofees.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use crate::rpc_data::{PrioFeesStats, PrioFeesUpdateMessage, TxAggregateStats};
use crate::stats_calculation::calculate_supp_percentiles;
use log::{error, info, trace, warn};
use solana_lite_rpc_core::types::BlockStream;
use solana_sdk::clock::Slot;
use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::sync::broadcast::error::RecvError::{Closed, Lagged};
use tokio::sync::broadcast::Sender;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;

// note: ATM only the latest slot (highest key) is used
const SLOTS_TO_RETAIN: u64 = 100;

/// put everything required to serve sync data calls here
#[derive(Clone)]
pub struct PrioFeeStore {
// store priofees stats for recently processed blocks up to CLEANUP_SLOTS_AFTER
recent: Arc<RwLock<BTreeMap<Slot, PrioFeesStats>>>,
}

pub struct PrioFeesService {
pub block_fees_store: PrioFeeStore,
// use .subscribe() to get a receiver
pub block_fees_stream: Sender<PrioFeesUpdateMessage>,
}

impl PrioFeesService {
pub async fn get_latest_priofees(&self) -> Option<(Slot, PrioFeesStats)> {
let lock = self.block_fees_store.recent.read().await;
let latest_in_store = lock.last_key_value();
latest_in_store.map(|x| (*x.0, x.1.clone()))
}
}

pub async fn start_block_priofees_task(
mut block_stream: BlockStream,
) -> (JoinHandle<()>, PrioFeesService) {
let recent_data = Arc::new(RwLock::new(BTreeMap::new()));
let store = PrioFeeStore {
recent: recent_data.clone(),
};
let (priofees_update_sender, _priofees_update_receiver) = tokio::sync::broadcast::channel(64);
let sender_to_return = priofees_update_sender.clone();

let jh_priofees_task = tokio::spawn(async move {
let sender = priofees_update_sender.clone();
'recv_loop: loop {
let block = block_stream.recv().await;
match block {
Ok(block) => {
if !block.commitment_config.is_processed() {
grooviegermanikus marked this conversation as resolved.
Show resolved Hide resolved
continue;
}
let processed_slot = block.slot;
{
// first do some cleanup
let mut lock = recent_data.write().await;
lock.retain(|slot, _| {
*slot > processed_slot.saturating_sub(SLOTS_TO_RETAIN)
});
}

let block_priofees = block
.transactions
.iter()
.filter(|tx| !tx.is_vote)
.map(|tx| {
(
tx.prioritization_fees.unwrap_or_default(),
tx.cu_consumed.unwrap_or_default(),
)
})
.collect::<Vec<(u64, u64)>>();

let priofees_percentiles = calculate_supp_percentiles(&block_priofees);

let total_tx_count = block.transactions.len() as u64;

let nonvote_tx_count =
block.transactions.iter().filter(|tx| !tx.is_vote).count() as u64;

let total_cu_consumed = block
.transactions
.iter()
.map(|tx| tx.cu_consumed.unwrap_or(0))
.sum::<u64>();

let nonvote_cu_consumed = block
.transactions
.iter()
.filter(|tx| !tx.is_vote)
.map(|tx| tx.cu_consumed.unwrap_or(0))
.sum::<u64>();

trace!("Got prio fees stats for processed block {}", processed_slot);

let priofees_stats = PrioFeesStats {
by_tx: priofees_percentiles.by_tx,
by_tx_percentiles: priofees_percentiles.by_tx_percentiles,
by_cu: priofees_percentiles.by_cu,
by_cu_percentiles: priofees_percentiles.by_cu_percentiles,
tx_count: TxAggregateStats {
total: total_tx_count,
nonvote: nonvote_tx_count,
},
cu_consumed: TxAggregateStats {
total: total_cu_consumed,
nonvote: nonvote_cu_consumed,
},
};

{
// first do some cleanup
let mut lock = recent_data.write().await;
lock.insert(processed_slot, priofees_stats.clone());
}
let msg = PrioFeesUpdateMessage {
slot: processed_slot,
priofees_stats,
};
let send_result = sender.send(msg);
match send_result {
Ok(n_subscribers) => {
trace!(
"sent priofees update message to {} subscribers (buffer={})",
n_subscribers,
sender.len()
);
}
Err(_) => {
trace!("no subscribers for priofees update message");
}
}
}
Err(Lagged(_lagged)) => {
warn!("channel error receiving block for priofees calculation - continue");
continue 'recv_loop;
}
Err(Closed) => {
error!("failed to receive block, sender closed - aborting");
break 'recv_loop;
}
}
}
info!("priofees task shutting down");
});

(
jh_priofees_task,
PrioFeesService {
block_fees_store: store,
block_fees_stream: sender_to_return,
},
)
}
6 changes: 6 additions & 0 deletions block_priofees/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
mod block_priofees;

pub mod rpc_data;
mod stats_calculation;

pub use block_priofees::{start_block_priofees_task, PrioFeesService};
39 changes: 39 additions & 0 deletions block_priofees/src/rpc_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use jsonrpsee::core::Serialize;
use solana_sdk::clock::Slot;
use std::fmt::Display;

#[derive(Clone, Serialize, Debug)]
pub struct TxAggregateStats {
pub total: u64,
pub nonvote: u64,
}

#[derive(Clone, Serialize, Debug)]
pub struct PrioFeesStats {
pub by_tx: Vec<u64>,
pub by_tx_percentiles: Vec<f32>,
pub by_cu: Vec<u64>,
pub by_cu_percentiles: Vec<f32>,
pub tx_count: TxAggregateStats,
pub cu_consumed: TxAggregateStats,
}

#[derive(Clone, Serialize, Debug, Eq, PartialEq, Hash)]
pub struct FeePoint {
// percentile
pub p: u32,
// value of fees in lamports
pub v: u64,
}

impl Display for FeePoint {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "(p{}, {})", self.p, self.v)
}
}

#[derive(Clone, Debug)]
pub struct PrioFeesUpdateMessage {
pub slot: Slot,
pub priofees_stats: PrioFeesStats,
}
Loading
Loading