Skip to content

Commit

Permalink
finishing up with save and get block
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Oct 9, 2023
1 parent ac707b2 commit c86c6fb
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 31 deletions.
24 changes: 22 additions & 2 deletions history/src/block_stores/postgres_block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,27 @@ pub struct PostgresBlockStore {
session_cache: PostgresSessionCache,
epoch_cache: EpochCache,
postgres_data: Arc<RwLock<PostgresData>>,
save_blocks: bool,
get_blocks: bool,
}

impl PostgresBlockStore {
pub fn new(session_cache: PostgresSessionCache, epoch_cache: EpochCache) -> Self {
pub fn new(
session_cache: PostgresSessionCache,
epoch_cache: EpochCache,
save_blocks: bool,
get_blocks: bool,
) -> Self {
Self {
session_cache,
epoch_cache,
postgres_data: Arc::new(RwLock::new(PostgresData { from_slot: 0, to_slot: 0, current_epoch: 0 })),
postgres_data: Arc::new(RwLock::new(PostgresData {
from_slot: 0,
to_slot: 0,
current_epoch: 0,
})),
save_blocks,
get_blocks,
}
}

Expand Down Expand Up @@ -69,6 +82,9 @@ impl PostgresBlockStore {
#[async_trait]
impl BlockStorageInterface for PostgresBlockStore {
async fn save(&self, block: ProducedBlock) -> Result<()> {
if !self.save_blocks {
return Ok(());
}
let PostgresData { current_epoch, .. } = { *self.postgres_data.read().await };

let slot = block.slot;
Expand Down Expand Up @@ -99,10 +115,14 @@ impl BlockStorageInterface for PostgresBlockStore {
PostgresTransaction::save_transactions(&session, &schema, chunk).await?;
}
postgres_block.save(&session, &schema).await?;
self.postgres_data.write().await.to_slot = slot;
Ok(())
}

async fn get(&self, slot: Slot, _config: RpcBlockConfig) -> Result<ProducedBlock> {
if !self.get_blocks {
bail!("getting blocks from postgres is disabled");
}
let epoch = self.epoch_cache.get_epoch_at_slot(slot);
let schema = self.get_schema(epoch.epoch);

Expand Down
17 changes: 16 additions & 1 deletion history/src/history.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
use solana_lite_rpc_core::traits::block_storage_interface::BlockStorageInterface;
use anyhow::bail;
use solana_lite_rpc_core::{
traits::block_storage_interface::BlockStorageInterface, types::BlockStream, AnyhowJoinHandle,
};
use std::sync::Arc;

pub struct History {
pub block_storage: Arc<dyn BlockStorageInterface>,
}

impl History {
pub fn start_saving_blocks(&self, mut block_notifier: BlockStream) -> AnyhowJoinHandle {
let block_storage = self.block_storage.clone();
tokio::spawn(async move {
while let Ok(block) = block_notifier.recv().await {
block_storage.save(block).await?;
}
bail!("saving of blocks stopped");
})
}
}
40 changes: 25 additions & 15 deletions lite-rpc/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@ use crate::{
jsonrpsee_subscrption_handler_sink::JsonRpseeSubscriptionHandlerSink,
rpc::LiteRpcServer,
};
use solana_sdk::epoch_info::EpochInfo;
use solana_lite_rpc_services::{
transaction_service::TransactionService, tx_sender::TXS_IN_CHANNEL,
};
use anyhow::Context;
use itertools::Itertools;
use jsonrpsee::{core::SubscriptionResult, server::ServerBuilder, PendingSubscriptionSink};
use log::info;
use prometheus::{opts, register_int_counter, IntCounter};
Expand All @@ -16,6 +13,9 @@ use solana_lite_rpc_core::{
AnyhowJoinHandle,
};
use solana_lite_rpc_history::history::History;
use solana_lite_rpc_services::{
transaction_service::TransactionService, tx_sender::TXS_IN_CHANNEL,
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::{
config::{
Expand All @@ -24,11 +24,14 @@ use solana_rpc_client_api::{
},
response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo},
};
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, slot_history::Slot};
use solana_transaction_status::{TransactionStatus, UiConfirmedBlock, EncodedTransactionWithStatusMeta, UiTransactionStatusMeta, option_serializer::OptionSerializer};
use solana_transaction_status::{
option_serializer::OptionSerializer, EncodedTransactionWithStatusMeta, TransactionStatus,
UiConfirmedBlock, UiTransactionStatusMeta,
};
use std::{str::FromStr, sync::Arc};
use tokio::net::ToSocketAddrs;
use itertools::Itertools;

lazy_static::lazy_static! {
static ref RPC_SEND_TX: IntCounter =
Expand Down Expand Up @@ -329,14 +332,21 @@ impl LiteRpcServer for LiteBridge {
let config = config.map_or(RpcBlockConfig::default(), |x| x.convert_to_current());
let block = self.history.block_storage.get(slot, config).await;
if let Ok(block) = block {

let transactions: Option<Vec<EncodedTransactionWithStatusMeta>> = match config.transaction_details {
let transactions: Option<Vec<EncodedTransactionWithStatusMeta>> = match config
.transaction_details
{
Some(transaction_details) => {
let (is_full, include_rewards, include_accounts) = match transaction_details {
solana_transaction_status::TransactionDetails::Full => (true, true, true),
solana_transaction_status::TransactionDetails::Signatures => ( false, false, false),
solana_transaction_status::TransactionDetails::None => (false, false, false),
solana_transaction_status::TransactionDetails::Accounts => (false, false, true),
solana_transaction_status::TransactionDetails::Signatures => {
(false, false, false)
}
solana_transaction_status::TransactionDetails::None => {
(false, false, false)
}
solana_transaction_status::TransactionDetails::Accounts => {
(false, false, true)
}
};

if is_full || include_accounts || include_rewards {
Expand All @@ -345,7 +355,7 @@ impl LiteRpcServer for LiteBridge {
version: None,
meta: Some(UiTransactionStatusMeta {
err: transaction_info.err.clone(),
status: transaction_info.err.clone().map_or(Ok(()), |x| Err(x)),
status: transaction_info.err.clone().map_or(Ok(()), Err),
fee: 0,
pre_balances: vec![],
post_balances: vec![],
Expand All @@ -356,16 +366,16 @@ impl LiteRpcServer for LiteBridge {
rewards: OptionSerializer::None,
loaded_addresses: OptionSerializer::None,
return_data: OptionSerializer::None,
compute_units_consumed: transaction_info.cu_consumed.map_or( OptionSerializer::None, |x| OptionSerializer::Some(x)),
compute_units_consumed: transaction_info.cu_consumed.map_or( OptionSerializer::None,OptionSerializer::Some),
}),
transaction: solana_transaction_status::EncodedTransaction::Binary(transaction_info.message.clone(), solana_transaction_status::TransactionBinaryEncoding::Base64)
}
}).collect_vec())
} else {
None
}
},
None => None
}
None => None,
};
Ok(Some(UiConfirmedBlock {
previous_blockhash: block.previous_blockhash,
Expand Down
35 changes: 22 additions & 13 deletions lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use solana_lite_rpc_core::traits::block_storage_interface::BlockStorageImpl;
use solana_lite_rpc_core::types::BlockStream;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_lite_rpc_history::block_stores::inmemory_block_store::InmemoryBlockStore;
use solana_lite_rpc_history::block_stores::multiple_strategy_block_store::MultipleStrategyBlockStorage;
use solana_lite_rpc_history::block_stores::postgres_block_store::PostgresBlockStore;
use solana_lite_rpc_history::history::History;
use solana_lite_rpc_history::postgres::postgres_session::PostgresSessionCache;
Expand Down Expand Up @@ -66,7 +67,6 @@ async fn get_latest_block(
pub async fn start_postgres(
postgres_session_cache: PostgresSessionCache,
) -> anyhow::Result<(NotificationSender, AnyhowJoinHandle)> {

let (postgres_send, postgres_recv) = mpsc::unbounded_channel();
let postgres = PostgresLogger::start(postgres_session_cache, postgres_recv);

Expand Down Expand Up @@ -145,9 +145,8 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::R
cluster_info_notifier,
vote_account_notifier,
);
drop(blocks_notifier);
let postgres_session_cache = if enable_fetch_blocks_from_postgres || enable_postgres_logging {
Some(PostgresSessionCache::new().await?)
Some(PostgresSessionCache::new().await?)
} else {
None
};
Expand Down Expand Up @@ -207,16 +206,23 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::R

let support_service = tokio::spawn(async move { spawner.spawn_support_services().await });

let block_storage: BlockStorageImpl = if enable_save_blocks_in_postgres {
let session_cache = postgres_session_cache.unwrap();
Arc::new(PostgresBlockStore::new(session_cache, epoch_data.clone()))
} else {
Arc::new(InmemoryBlockStore::new(1024))
};
let block_storage: BlockStorageImpl =
if enable_save_blocks_in_postgres || enable_fetch_blocks_from_postgres {
let session_cache = postgres_session_cache.expect("Session cache should be some");
let postgres_store = Arc::new(PostgresBlockStore::new(
session_cache,
epoch_data.clone(),
enable_save_blocks_in_postgres,
enable_fetch_blocks_from_postgres,
));
Arc::new(MultipleStrategyBlockStorage::new(postgres_store, None, 64))
} else {
Arc::new(InmemoryBlockStore::new(1024))
};

let history = History {
block_storage,
};
let history = History { block_storage };
let history_service = history.start_saving_blocks(blocks_notifier.resubscribe());
drop(blocks_notifier);

let bridge_service = tokio::spawn(
LiteBridge::new(
Expand All @@ -238,14 +244,17 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::R
anyhow::bail!("Server {res:?}")
}
res = postgres => {
anyhow::bail!("Postgres service {res:?}");
anyhow::bail!("Postgres service {res:?}")
}
res = futures::future::select_all(data_caching_service) => {
anyhow::bail!("Data caching service failed {res:?}")
}
res = futures::future::select_all(cluster_endpoint_tasks) => {
anyhow::bail!("cluster endpoint failure {res:?}")
}
res = history_service => {
anyhow::bail!("History service failure {res:?}")
}
}
}

Expand Down

0 comments on commit c86c6fb

Please sign in to comment.