
Adding custom tpu send transaction example #380

Merged (5 commits) on Apr 2, 2024

Conversation

godmodegalactus
Collaborator

No description provided.

@@ -122,6 +122,15 @@ pub struct TransactionService {

impl TransactionService {
pub async fn send_transaction(
&self,
tx: Transaction,
max_retries: Option<u16>,
Collaborator

nit: we should stick to one data type for this (i.e. usize)

@godmodegalactus godmodegalactus changed the title Adding custom tpu send transaction Adding custom tpu send transaction example Apr 2, 2024
@@ -127,7 +128,7 @@ impl TpuService {
.get_slot_leaders(current_slot, last_slot)
.await?;
// get next leader with its tpu port
let connections_to_keep = next_leaders
let connections_to_keep: HashMap<_, _> = next_leaders
Collaborator

why do you need that?
maybe you could specify the full type in that case

Collaborator Author

There is no method to collect this as a Vec; specifying HashMap should be fine here.

@@ -11,10 +11,10 @@ pub fn poll_cluster_info(
) -> AnyhowJoinHandle {
// task MUST not terminate but might be aborted from outside
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
Collaborator

pls add a comment explaining why you need that

@@ -23,7 +23,7 @@ pub fn poll_cluster_info(
Err(error) => {
warn!("rpc_cluster_info failed <{:?}> - retrying", error);
// throttle
tokio::time::sleep(Duration::from_secs(2500)).await;
tokio::time::sleep(Duration::from_secs(10)).await;
Collaborator

would put that on a bugfix branch

started.elapsed().as_secs_f32() * 1000.0
);
}
Ok(Err(_error)) => {
Collaborator

!! you should handle Lagged and Closed differently:

  • lagged: continue
  • Closed: sleep some 10 seconds, then panic (create_grpc_multiplex_blocks_subscription reconnect_loop never terminates unless cluster_endpoint_tasks get aborted, which happens on shutdown; this method could gracefully wait if it detects shutdown)

Collaborator Author

The aim of this method is to wait until we get a block of the required commitment; this channel should never lag. If it lags, then the logic is wrong.
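The policy the reviewer suggests can be sketched with a std-only stand-in for tokio::sync::broadcast::error::RecvError (the enum below mirrors that type's two variants; the real code would match on tokio's error): Lagged carries the number of skipped messages and is recoverable, while Closed means no sender remains.

```rust
// Std-only stand-in for tokio::sync::broadcast::error::RecvError,
// used to sketch the suggested policy without pulling in tokio.
#[derive(Debug, PartialEq)]
enum RecvError {
    Lagged(u64), // receiver fell behind; `u64` messages were skipped
    Closed,      // all senders dropped; no more messages will arrive
}

#[derive(Debug, PartialEq)]
enum Policy {
    ContinueReceiving, // Lagged: log and keep polling the stream
    ShutdownOrPanic,   // Closed: wait briefly, then panic (or exit on shutdown)
}

fn on_recv_error(err: RecvError) -> Policy {
    match err {
        RecvError::Lagged(skipped) => {
            eprintln!("blockinfo stream lagged, skipped {skipped} messages - continuing");
            Policy::ContinueReceiving
        }
        RecvError::Closed => Policy::ShutdownOrPanic,
    }
}

fn main() {
    assert_eq!(on_recv_error(RecvError::Lagged(3)), Policy::ContinueReceiving);
    assert_eq!(on_recv_error(RecvError::Closed), Policy::ShutdownOrPanic);
}
```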

}
Err(_elapsed) => {
debug!(
"waiting for latest block info ({}) ... {:.02}ms",
Collaborator

pls add to log line what it's doing -> i.e. "timed out waiting for latest block info (...) - continue waiting"

Collaborator Author

This is just for debugging; we do not need any additional log lines.

loop {
match timeout(Duration::from_millis(500), blockinfo_stream.recv()).await {
Ok(Ok(block_info)) => {
if block_info.commitment_config == commitment_config {
Collaborator

would you accept block_info(finalized) if you request confirmed or not?
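To illustrate the question: Solana's commitment levels are ordered (processed < confirmed < finalized), so the check could be exact equality, as in the PR, or at-or-above, as the reviewer hints. A std-only sketch with a hypothetical enum mirroring those levels:

```rust
// Hypothetical mirror of Solana's commitment levels; deriving Ord gives
// declaration order, i.e. Processed < Confirmed < Finalized.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Commitment {
    Processed,
    Confirmed,
    Finalized,
}

// Equality check, as in the PR: a finalized block does NOT satisfy
// a request for confirmed.
fn satisfies_exact(block: Commitment, requested: Commitment) -> bool {
    block == requested
}

// Ordering check, as the reviewer suggests: at or above the requested level.
fn satisfies_at_least(block: Commitment, requested: Commitment) -> bool {
    block >= requested
}

fn main() {
    assert!(!satisfies_exact(Commitment::Finalized, Commitment::Confirmed));
    assert!(satisfies_at_least(Commitment::Finalized, Commitment::Confirmed));
}
```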

@@ -7,7 +9,7 @@ pub type WireTransaction = Vec<u8>;
pub struct SentTransactionInfo {
Collaborator

why is it called Sent? this reads to me like it happened already, but actually that struct contains the transaction to be sent

Collaborator Author

Yes, it is the structure that tracks the transaction that is sent.

@@ -7,7 +9,7 @@ pub type WireTransaction = Vec<u8>;
pub struct SentTransactionInfo {
pub signature: Signature,
pub slot: Slot,
pub transaction: WireTransaction,
pub transaction: Arc<WireTransaction>,
Collaborator

could that Arc be avoided somehow? let me try for a minute

@grooviegermanikus (Collaborator) left a comment

small readability comments

nb_signers, signer_balance
);

let validator_identity = Arc::new(
Collaborator

you could move the ValidatorIdentity out of quic-proxy which wraps that case explicitly

endpoints.vote_account_notifier,
);

let count = args.transaction_count.unwrap_or(10);
Collaborator

nit: should put that in Args struct

let count = args.transaction_count.unwrap_or(10);
let prioritization_heap_size = Some(count * args.number_of_seconds);

let tpu_config = TpuServiceConfig {
Collaborator

suggest to print TpuServiceConfig values on console to help the user to inspect the parameters used

let tx = tx.clone();
let signer = *signer;
tokio::spawn(
async move { (signer, rpc_client.send_and_confirm_transaction(&tx).await) },
Collaborator

pls add .context

)
.await?;
let transaction_service_builder = TransactionServiceBuilder::new(
TxSender::new(data_cache.clone(), tpu_service.clone()),
Collaborator

the services are quite tangled. shouldn't it be easy to just wire up the TxSender? it requires recent blockhash and leader schedule

leader schedule could be:

  1. full schedule from RPC
  2. schedule for current epoch as text-file with each lead qualified by slots
  3. hardcoded leaders (if the setup is just 1 test-validator or a small number of test nodes)

@godmodegalactus godmodegalactus merged commit 6813341 into main Apr 2, 2024
3 checks passed
2 participants