Skip to content

Commit

Permalink
account_subscription_into_multiple_channels_and_merge_them (#344)
Browse files Browse the repository at this point in the history
* account_subscription_into_multiple_channels_and_merge_them

* Removing unecessary changes

* Minor fix after groovies comments
  • Loading branch information
godmodegalactus committed Mar 1, 2024
1 parent 24eb9a9 commit 8ef1b56
Show file tree
Hide file tree
Showing 8 changed files with 261 additions and 42 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ prometheus = "0.13.3"
lazy_static = "1.4.0"
dotenv = "0.15.0"
async-channel = "1.8.0"
merge-streams = "0.1.2"

quinn = "0.10.2"
quinn-proto = "0.10.5"
Expand Down
1 change: 1 addition & 0 deletions accounts-on-demand/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ async-trait = { workspace = true }
itertools = { workspace = true }
prometheus = { workspace = true }
lazy_static = { workspace = true }
merge-streams = { workspace = true }

solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-accounts = { workspace = true }
Expand Down
80 changes: 60 additions & 20 deletions accounts-on-demand/src/subscription_manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};

use futures::StreamExt;
use itertools::Itertools;
use merge_streams::MergeStreams;
use prometheus::{opts, register_int_gauge, IntGauge};
use solana_lite_rpc_accounts::account_store_interface::AccountStorageInterface;
use solana_lite_rpc_cluster_endpoints::geyser_grpc_connector::GrpcSourceConfig;
Expand Down Expand Up @@ -94,23 +99,16 @@ pub fn start_account_streaming_task(
'main_loop: loop {
let processed_commitment = yellowstone_grpc_proto::geyser::CommitmentLevel::Processed;

let mut subscribe_accounts: HashMap<String, SubscribeRequestFilterAccounts> =
let mut subscribe_programs: HashMap<String, SubscribeRequestFilterAccounts> =
HashMap::new();

let mut accounts_to_subscribe = HashSet::new();

for (index, accounts_filter) in accounts_filters.iter().enumerate() {
if !accounts_filter.accounts.is_empty() {
subscribe_accounts.insert(
format!("accounts_on_demand_{index:?}"),
SubscribeRequestFilterAccounts {
account: accounts_filter
.accounts
.iter()
.map(|x| x.to_string())
.collect_vec(),
owner: vec![],
filters: vec![],
},
);
accounts_filter.accounts.iter().for_each(|account| {
accounts_to_subscribe.insert(account.clone());
});
}
if let Some(program_id) = &accounts_filter.program_id {
let filters = if let Some(filters) = &accounts_filter.filters {
Expand Down Expand Up @@ -152,8 +150,8 @@ pub fn start_account_streaming_task(
} else {
vec![]
};
subscribe_accounts.insert(
format!("program_accounts_on_demand_{}", program_id),
subscribe_programs.insert(
format!("program_accounts_on_demand_{}", index),
SubscribeRequestFilterAccounts {
account: vec![],
owner: vec![program_id.clone()],
Expand All @@ -163,8 +161,8 @@ pub fn start_account_streaming_task(
}
}

let subscribe_request = SubscribeRequest {
accounts: subscribe_accounts,
let program_subscribe_request = SubscribeRequest {
accounts: subscribe_programs,
slots: Default::default(),
transactions: Default::default(),
blocks: Default::default(),
Expand All @@ -189,13 +187,55 @@ pub fn start_account_streaming_task(
continue;
};

let Ok(mut account_stream) = client.subscribe_once2(subscribe_request).await else {
let Ok(account_stream) = client.subscribe_once2(program_subscribe_request).await else {
// problem subscribing to geyser stream, retry after a sec
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
};

while let Some(message) = account_stream.next().await {
// each account subscription batch will require individual stream
let mut subscriptions = vec![account_stream];
let mut index = 0;
for accounts_chunk in accounts_to_subscribe.iter().collect_vec().chunks(100) {
let mut accounts_subscription: HashMap<String, SubscribeRequestFilterAccounts> =
HashMap::new();
index += 1;
accounts_subscription.insert(
format!("account_sub_{}", index),
SubscribeRequestFilterAccounts {
account: accounts_chunk
.iter()
.map(|acc| (*acc).clone())
.collect_vec(),
owner: vec![],
filters: vec![],
},
);
let mut client = yellowstone_grpc_client::GeyserGrpcClient::connect(
grpc_config.grpc_addr.clone(),
grpc_config.grpc_x_token.clone(),
None,
)
.unwrap();

let account_request = SubscribeRequest {
accounts: accounts_subscription,
slots: Default::default(),
transactions: Default::default(),
blocks: Default::default(),
blocks_meta: Default::default(),
entry: Default::default(),
commitment: Some(processed_commitment.into()),
accounts_data_slice: Default::default(),
ping: None,
};

let account_stream = client.subscribe_once2(account_request).await.unwrap();
subscriptions.push(account_stream);
}
let mut merged_stream = subscriptions.merge();

while let Some(message) = merged_stream.next().await {
let message = match message {
Ok(message) => message,
Err(status) => {
Expand Down
2 changes: 1 addition & 1 deletion cluster-endpoints/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ bs58 = { workspace = true }
base64 = { workspace = true }
thiserror = { workspace = true }
futures = { workspace = true }
merge-streams = "0.1.2"
merge-streams = { workspace = true }
bytes = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
Expand Down
80 changes: 60 additions & 20 deletions cluster-endpoints/src/grpc/gprc_accounts_streaming.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use futures::StreamExt;
use std::{collections::HashMap, sync::Arc, time::Duration};
use merge_streams::MergeStreams;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};

use geyser_grpc_connector::GrpcSourceConfig;
use itertools::Itertools;
Expand Down Expand Up @@ -29,23 +34,16 @@ pub fn start_account_streaming_tasks(
'main_loop: loop {
let processed_commitment = yellowstone_grpc_proto::geyser::CommitmentLevel::Processed;

let mut subscribe_accounts: HashMap<String, SubscribeRequestFilterAccounts> =
let mut subscribe_programs: HashMap<String, SubscribeRequestFilterAccounts> =
HashMap::new();

let mut accounts_to_subscribe = HashSet::new();

for (index, accounts_filter) in accounts_filters.iter().enumerate() {
if !accounts_filter.accounts.is_empty() {
subscribe_accounts.insert(
format!("accounts_{index:?}"),
SubscribeRequestFilterAccounts {
account: accounts_filter
.accounts
.iter()
.map(|x| x.to_string())
.collect_vec(),
owner: vec![],
filters: vec![],
},
);
accounts_filter.accounts.iter().for_each(|account| {
accounts_to_subscribe.insert(account.clone());
});
}
if let Some(program_id) = &accounts_filter.program_id {
let filters = if let Some(filters) = &accounts_filter.filters {
Expand Down Expand Up @@ -87,8 +85,8 @@ pub fn start_account_streaming_tasks(
} else {
vec![]
};
subscribe_accounts.insert(
format!("accounts_{}", program_id),
subscribe_programs.insert(
format!("program_accounts_{}", index),
SubscribeRequestFilterAccounts {
account: vec![],
owner: vec![program_id.clone()],
Expand All @@ -98,8 +96,8 @@ pub fn start_account_streaming_tasks(
}
}

let subscribe_request = SubscribeRequest {
accounts: subscribe_accounts,
let program_subscription = SubscribeRequest {
accounts: subscribe_programs,
slots: Default::default(),
transactions: Default::default(),
blocks: Default::default(),
Expand All @@ -116,9 +114,51 @@ pub fn start_account_streaming_tasks(
None,
)
.unwrap();
let mut account_stream = client.subscribe_once2(subscribe_request).await.unwrap();
let account_stream = client.subscribe_once2(program_subscription).await.unwrap();

// each account subscription batch will require individual stream
let mut subscriptions = vec![account_stream];
let mut index = 0;
for accounts_chunk in accounts_to_subscribe.iter().collect_vec().chunks(100) {
let mut accounts_subscription: HashMap<String, SubscribeRequestFilterAccounts> =
HashMap::new();
index += 1;
accounts_subscription.insert(
format!("account_sub_{}", index),
SubscribeRequestFilterAccounts {
account: accounts_chunk
.iter()
.map(|acc| (*acc).clone())
.collect_vec(),
owner: vec![],
filters: vec![],
},
);
let mut client = yellowstone_grpc_client::GeyserGrpcClient::connect(
grpc_config.grpc_addr.clone(),
grpc_config.grpc_x_token.clone(),
None,
)
.unwrap();

let account_request = SubscribeRequest {
accounts: accounts_subscription,
slots: Default::default(),
transactions: Default::default(),
blocks: Default::default(),
blocks_meta: Default::default(),
entry: Default::default(),
commitment: Some(processed_commitment.into()),
accounts_data_slice: Default::default(),
ping: None,
};

let account_stream = client.subscribe_once2(account_request).await.unwrap();
subscriptions.push(account_stream);
}
let mut merged_stream = subscriptions.merge();

while let Some(message) = account_stream.next().await {
while let Some(message) = merged_stream.next().await {
let message = message.unwrap();
let Some(update) = message.update_oneof else {
continue;
Expand Down
Loading

0 comments on commit 8ef1b56

Please sign in to comment.