Skip to content

Commit

Permalink
intermediate changes
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Oct 9, 2023
1 parent 1947b06 commit ac707b2
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 31 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions history/src/block_stores/postgres_block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ pub struct PostgresBlockStore {
}

impl PostgresBlockStore {
pub fn new(session_cache: PostgresSessionCache, epoch_cache: EpochCache) -> Self {
Self {

Check warning on line 36 in history/src/block_stores/postgres_block_store.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/history/src/block_stores/postgres_block_store.rs
session_cache,
epoch_cache,
postgres_data: Arc::new(RwLock::new(PostgresData { from_slot: 0, to_slot: 0, current_epoch: 0 })),
}
}

pub async fn start_new_epoch(&self, schema: &String) -> Result<()> {
// create schema for new epoch
let session = self
Expand Down
1 change: 1 addition & 0 deletions lite-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async-trait = { workspace = true }
tokio = { version = "1.28.2", features = ["full", "fs"]}
tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] }
chrono = { workspace = true }
itertools = { workspace = true }

solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-services = { workspace = true }
Expand Down
22 changes: 10 additions & 12 deletions lite-rpc/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@ use crate::{
jsonrpsee_subscrption_handler_sink::JsonRpseeSubscriptionHandlerSink,

Check warning on line 3 in lite-rpc/src/bridge.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/lite-rpc/src/bridge.rs
rpc::LiteRpcServer,
};
use solana_sdk::{epoch_info::EpochInfo, transaction::VersionedTransaction, message::{v0::Message, VersionedMessage}};

use solana_sdk::epoch_info::EpochInfo;
use solana_lite_rpc_services::{
transaction_service::TransactionService, tx_sender::TXS_IN_CHANNEL,
};

use anyhow::Context;
use jsonrpsee::{core::SubscriptionResult, server::ServerBuilder, PendingSubscriptionSink};
use log::info;
use prometheus::{opts, register_int_counter, IntCounter};
use solana_lite_rpc_core::{
stores::{block_information_store::BlockInformation, data_cache::DataCache, tx_store::TxProps},
AnyhowJoinHandle, encoding::BASE64,
AnyhowJoinHandle,

Check warning on line 16 in lite-rpc/src/bridge.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/lite-rpc/src/bridge.rs
};
use solana_lite_rpc_history::history::History;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
Expand All @@ -30,6 +28,7 @@ use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, slot_histo
use solana_transaction_status::{TransactionStatus, UiConfirmedBlock, EncodedTransactionWithStatusMeta, UiTransactionStatusMeta, option_serializer::OptionSerializer};
use std::{str::FromStr, sync::Arc};
use tokio::net::ToSocketAddrs;
use itertools::Itertools;

lazy_static::lazy_static! {
static ref RPC_SEND_TX: IntCounter =
Expand Down Expand Up @@ -340,14 +339,13 @@ impl LiteRpcServer for LiteBridge {
solana_transaction_status::TransactionDetails::Accounts => (false, false, true),
};

if include_accounts || include_rewards {
block.transactions.iter().map(|transaction_info| {
let signature = transaction_info.signature.clone();
if is_full || include_accounts || include_rewards {
Some(block.transactions.iter().map(|transaction_info| {
EncodedTransactionWithStatusMeta {
version: None,
meta: Some(UiTransactionStatusMeta {
err: transaction_info.err,
status: transaction_info.err.map_or(Ok(()), |x| Err(x)),
err: transaction_info.err.clone(),
status: transaction_info.err.clone().map_or(Ok(()), |x| Err(x)),
fee: 0,
pre_balances: vec![],
post_balances: vec![],
Expand All @@ -360,9 +358,9 @@ impl LiteRpcServer for LiteBridge {
return_data: OptionSerializer::None,
compute_units_consumed: transaction_info.cu_consumed.map_or( OptionSerializer::None, |x| OptionSerializer::Some(x)),
}),
transaction: solana_transaction_status::EncodedTransaction::Binary(transaction_info.message, solana_transaction_status::TransactionBinaryEncoding::Base64)
transaction: solana_transaction_status::EncodedTransaction::Binary(transaction_info.message.clone(), solana_transaction_status::TransactionBinaryEncoding::Base64)
}
}).collect_vec()
}).collect_vec())
} else {

Check warning on line 364 in lite-rpc/src/bridge.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/lite-rpc/src/bridge.rs
None
}
Expand All @@ -375,7 +373,7 @@ impl LiteRpcServer for LiteBridge {
parent_slot: block.parent_slot,
transactions,
signatures: None,
rewards: None,
rewards: block.rewards,
block_time: Some(block.block_time as i64),
block_height: Some(block.block_height),
}))
Expand Down
52 changes: 33 additions & 19 deletions lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ use solana_lite_rpc_core::structures::{
epoch::EpochCache, identity_stakes::IdentityStakes, notifications::NotificationSender,
produced_block::ProducedBlock,
};
use solana_lite_rpc_core::traits::block_storage_interface::BlockStorageImpl;
use solana_lite_rpc_core::types::BlockStream;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_lite_rpc_history::block_stores::inmemory_block_store::InmemoryBlockStore;
use solana_lite_rpc_history::block_stores::postgres_block_store::PostgresBlockStore;
use solana_lite_rpc_history::history::History;
use solana_lite_rpc_history::postgres::postgres_session::PostgresSessionCache;
use solana_lite_rpc_services::data_caching_service::DataCachingService;
Expand Down Expand Up @@ -62,32 +64,23 @@ async fn get_latest_block(
}

pub async fn start_postgres(

Check warning on line 66 in lite-rpc/src/main.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/lite-rpc/src/main.rs
enable: bool,
) -> anyhow::Result<(Option<NotificationSender>, AnyhowJoinHandle)> {
if !enable {
return Ok((
None,
tokio::spawn(async {
std::future::pending::<()>().await;
unreachable!()
}),
));
}
postgres_session_cache: PostgresSessionCache,
) -> anyhow::Result<(NotificationSender, AnyhowJoinHandle)> {

let (postgres_send, postgres_recv) = mpsc::unbounded_channel();

let postgres_session_cache = PostgresSessionCache::new().await?;
let postgres = PostgresLogger::start(postgres_session_cache, postgres_recv);

Ok((Some(postgres_send), postgres))
Ok((postgres_send, postgres))
}

pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::Result<()> {
let Args {
lite_rpc_ws_addr,
lite_rpc_http_addr,
fanout_size,
enable_postgres_logging: enable_postgres,
enable_postgres_logging,
enable_fetch_blocks_from_postgres,
enable_save_blocks_in_postgres,
prometheus_addr,
identity_keypair,
maximum_retries_per_tx,
Expand Down Expand Up @@ -137,7 +130,7 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::R
store: Arc::new(DashMap::new()),
save_for_additional_slots: NB_SLOTS_TRANSACTIONS_TO_CACHE,
},
epoch_data,
epoch_data: epoch_data.clone(),
};

let lata_cache_service = DataCachingService {
Expand All @@ -153,8 +146,22 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::R
vote_account_notifier,
);

Check warning on line 147 in lite-rpc/src/main.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/lite-rpc/src/main.rs
drop(blocks_notifier);
let postgres_session_cache = if enable_fetch_blocks_from_postgres || enable_postgres_logging {
Some(PostgresSessionCache::new().await?)
} else {
None
};

let (notification_channel, postgres) = start_postgres(enable_postgres).await?;
let (notification_channel, postgres) = if enable_postgres_logging {
let (channel, postgres) = start_postgres(postgres_session_cache.clone().unwrap()).await?;
(Some(channel), postgres)
} else {
let jh = tokio::spawn(async {
std::future::pending::<()>().await;
unreachable!()
});
(None, jh)
};

let tpu_config = TpuServiceConfig {
fanout_slots: fanout_size,
Expand Down Expand Up @@ -200,8 +207,15 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::R

let support_service = tokio::spawn(async move { spawner.spawn_support_services().await });

Check warning on line 208 in lite-rpc/src/main.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/lite-rpc/src/main.rs

let block_storage: BlockStorageImpl = if enable_save_blocks_in_postgres {
let session_cache = postgres_session_cache.unwrap();
Arc::new(PostgresBlockStore::new(session_cache, epoch_data.clone()))
} else {
Arc::new(InmemoryBlockStore::new(1024))
};

let history = History {
block_storage: Arc::new(InmemoryBlockStore::new(1024)),
block_storage,
};

let bridge_service = tokio::spawn(
Expand Down Expand Up @@ -240,7 +254,7 @@ fn get_args() -> Args {

dotenv().ok();

args.enable_postgres = args.enable_postgres
args.enable_postgres_logging = args.enable_postgres_logging
|| if let Ok(enable_postgres_env_var) = env::var("PG_ENABLED") {
enable_postgres_env_var != "false"
} else {
Expand Down

0 comments on commit ac707b2

Please sign in to comment.