Skip to content

Commit

Permalink
refactor: re-introduce sandbox_id as connection_id
Browse files Browse the repository at this point in the history
  • Loading branch information
Jurshsmith committed Jul 12, 2024
1 parent 59e0743 commit 0d872ab
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 128 deletions.
88 changes: 36 additions & 52 deletions crates/fuel-core-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct Publisher {
base_asset_id: AssetId,
fuel_core_database: CombinedDatabase,
blocks_subscription: Receiver<Arc<dyn Deref<Target = ImportResult> + Send + Sync>>,
nats: nats::Nats,
nats: nats::NatsConnection,
}

impl Publisher {
Expand All @@ -51,7 +51,7 @@ impl Publisher {
Arc<dyn Deref<Target = ImportResult> + Send + Sync>,
>,
) -> anyhow::Result<Self> {
let nats = nats::Nats::connect(nats_url, nats_nkey, &None).await?;
let nats = nats::connect(nats_url, nats_nkey, None).await?;

Ok(Publisher {
chain_id,
Expand All @@ -71,17 +71,18 @@ impl Publisher {
/// blocks.{height} e.g. blocks.1
/// owners.{height}.{owner_id} e.g. owners.*.0xab..cd
/// assets.{height}.{asset_id} e.g. assets.*.0xab..cd
pub async fn run(mut self, sandbox_id: &Option<String>) -> anyhow::Result<Self> {
pub async fn run(mut self) -> anyhow::Result<Self> {
info!(
"NATS Publisher chain_id={} base_asset_id={} started",
self.chain_id, self.base_asset_id
);

let connection_id = &self.nats.id;
// Check the last block height in the stream
let stream_height = {
let config = async_nats::jetstream::consumer::pull::Config {
deliver_policy: async_nats::jetstream::consumer::DeliverPolicy::Last,
filter_subject: nats::SubjectName::Blocks.to_subject_string(sandbox_id),
filter_subject: nats::SubjectName::Blocks.get_string(connection_id),
..Default::default()
};
let consumer = self
Expand Down Expand Up @@ -146,7 +147,7 @@ impl Publisher {
receipts_.len()
);

self.publish_block(&block, &receipts_, sandbox_id).await?;
self.publish_block(&block, &receipts_).await?;
}
}

Expand All @@ -165,7 +166,7 @@ impl Publisher {
receipts_.append(&mut receipts);
}
let result = &**result;
self.publish_block(&result.sealed_block.entity, &receipts_, sandbox_id)
self.publish_block(&result.sealed_block.entity, &receipts_)
.await?;
}

Expand All @@ -177,7 +178,6 @@ impl Publisher {
&self,
block: &Block<Transaction>,
receipts: &[Receipt],
sandbox_id: &Option<String>,
) -> anyhow::Result<()> {
let height = u32::from(block.header().consensus().height);

Expand All @@ -186,9 +186,7 @@ impl Publisher {
let payload = serde_json::to_string_pretty(block)?;

let block_subject = nats::Subject::Blocks { height };
self.nats
.publish(block_subject, payload.into(), sandbox_id)
.await?;
self.nats.publish(block_subject, payload.into()).await?;

for (index, tx) in block.transactions().iter().enumerate() {
if let Transaction::Script(s) = tx {
Expand All @@ -197,17 +195,13 @@ impl Publisher {
if let Some(owner_id) = i.input_owner().cloned() {
let payload = serde_json::to_string_pretty(tx)?;
let owners_subject = nats::Subject::Owners { height, owner_id };
self.nats
.publish(owners_subject, payload.into(), sandbox_id)
.await?;
self.nats.publish(owners_subject, payload.into()).await?;
}
// Publish transaction to assets
if let Some(asset_id) = i.asset_id(&self.base_asset_id).cloned() {
let payload = serde_json::to_string_pretty(tx)?;
let assets_subject = nats::Subject::Assets { height, asset_id };
self.nats
.publish(assets_subject, payload.into(), sandbox_id)
.await?;
self.nats.publish(assets_subject, payload.into()).await?;
}
}
}
Expand All @@ -228,7 +222,7 @@ impl Publisher {
kind: tx_kind.to_string(),
};
self.nats
.publish(transactions_subject, payload.into(), sandbox_id)
.publish(transactions_subject, payload.into())
.await?;
}

Expand Down Expand Up @@ -260,9 +254,7 @@ impl Publisher {
contract_id: contract_id.clone(),
topic_or_kind: receipt_kind.to_string(),
};
self.nats
.publish(receipt1_subject, payload.into(), sandbox_id)
.await?;
self.nats.publish(receipt1_subject, payload.into()).await?;

// Publish LogData topics, if any.
if let Receipt::LogData {
Expand Down Expand Up @@ -295,9 +287,7 @@ impl Publisher {
contract_id,
topic_or_kind: topics.to_string(),
};
self.nats
.publish(receipt1_subject, payload.into(), sandbox_id)
.await?;
self.nats.publish(receipt1_subject, payload.into()).await?;
}
}
}
Expand Down Expand Up @@ -329,16 +319,15 @@ mod tests {
let (_, blocks_subscription) =
broadcast::channel::<Arc<dyn Deref<Target = ImportResult> + Send + Sync>>(1);

let sandbox_id = nats::random_sandbox_id();
let connection_id = nats::tests::get_random_connection_id();
let publisher = Publisher {
base_asset_id: AssetId::default(),
chain_id: ChainId::default(),
fuel_core_database: CombinedDatabase::default(),
blocks_subscription,
nats: nats::tests::get_nats_connection(&sandbox_id).await,
nats: nats::tests::get_nats_connection(&connection_id).await,
};

let publisher = publisher.run(&Some(sandbox_id)).await.unwrap();
let publisher = publisher.run().await.unwrap();

assert!(publisher.nats.has_no_message().await);
}
Expand All @@ -354,23 +343,22 @@ mod tests {
let _ = blocks_subscriber.clone();
drop(blocks_subscriber);

let sandbox_id = nats::random_sandbox_id();
let connection_id = nats::tests::get_random_connection_id();
let publisher = Publisher {
base_asset_id: AssetId::default(),
chain_id: ChainId::default(),
fuel_core_database: CombinedDatabase::default(),
blocks_subscription,
nats: nats::tests::get_nats_connection(&sandbox_id).await,
nats: nats::tests::get_nats_connection(&connection_id).await,
};

let sandbox_id = &Some(sandbox_id);
let publisher = publisher.run(sandbox_id).await.unwrap();
let publisher = publisher.run().await.unwrap();

assert!(publisher
.nats
.jetstream_messages
.get_last_raw_message_by_subject(
&SubjectName::Blocks.to_subject_string(sandbox_id)
&SubjectName::Blocks.get_string(&connection_id)
)
.await
.is_ok_and(|raw_message| raw_message.sequence == 1));
Expand All @@ -387,16 +375,15 @@ mod tests {
let _ = blocks_subscriber.clone();
drop(blocks_subscriber);

let sandbox_id = nats::random_sandbox_id();
let connection_id = nats::tests::get_random_connection_id();
let publisher = Publisher {
base_asset_id: AssetId::default(),
chain_id: ChainId::default(),
fuel_core_database: CombinedDatabase::default(),
blocks_subscription,
nats: nats::tests::get_nats_connection(&sandbox_id).await,
nats: nats::tests::get_nats_connection(&connection_id).await,
};
let sandbox_id = &Some(sandbox_id);
let publisher = publisher.run(sandbox_id).await.unwrap();
let publisher = publisher.run().await.unwrap();

let non_block_subjects_count = nats::SubjectName::iter().len() - 1;

Expand All @@ -415,7 +402,7 @@ mod tests {
.nats
.jetstream_messages
.get_last_raw_message_by_subject(
&SubjectName::Blocks.to_subject_string(sandbox_id)
&SubjectName::Blocks.get_string(&connection_id)
)
.await
.is_ok_and(|raw_message| raw_message.sequence == 1));
Expand All @@ -442,17 +429,16 @@ mod tests {
let _ = blocks_subscriber.clone();
drop(blocks_subscriber);

let sandbox_id = nats::random_sandbox_id();
let connection_id = nats::tests::get_random_connection_id();
let publisher = Publisher {
base_asset_id: AssetId::default(),
chain_id: ChainId::default(),
fuel_core_database: CombinedDatabase::default(),
blocks_subscription,
nats: nats::tests::get_nats_connection(&sandbox_id).await,
nats: nats::tests::get_nats_connection(&connection_id).await,
};

let sandbox_id = &Some(sandbox_id);
let publisher = publisher.run(sandbox_id).await.unwrap();
let publisher = publisher.run().await.unwrap();

for subject in [
SubjectName::Transactions,
Expand All @@ -462,7 +448,7 @@ mod tests {
assert!(publisher
.nats
.jetstream_messages
.get_last_raw_message_by_subject(&subject.to_subject_string(sandbox_id))
.get_last_raw_message_by_subject(&subject.get_string(&connection_id))
.await
.is_ok());
}
Expand Down Expand Up @@ -493,23 +479,22 @@ mod tests {
let _ = blocks_subscriber.clone();
drop(blocks_subscriber);

let sandbox_id = nats::random_sandbox_id();
let connection_id = nats::tests::get_random_connection_id();
let publisher = Publisher {
base_asset_id: AssetId::default(),
chain_id: ChainId::default(),
fuel_core_database: CombinedDatabase::default(),
blocks_subscription,
nats: nats::tests::get_nats_connection(&sandbox_id).await,
nats: nats::tests::get_nats_connection(&connection_id).await,
};

let sandbox_id = &Some(sandbox_id);
let publisher = publisher.run(sandbox_id).await.unwrap();
let publisher = publisher.run().await.unwrap();

assert!(publisher
.nats
.jetstream_messages
.get_last_raw_message_by_subject(
&SubjectName::Receipts1.to_subject_string(sandbox_id)
&SubjectName::Receipts1.get_string(&connection_id)
)
.await
.is_ok());
Expand Down Expand Up @@ -540,23 +525,22 @@ mod tests {
let _ = blocks_subscriber.clone();
drop(blocks_subscriber);

let sandbox_id = nats::random_sandbox_id();
let connection_id = nats::tests::get_random_connection_id();
let publisher = Publisher {
base_asset_id: AssetId::default(),
chain_id: ChainId::default(),
fuel_core_database: CombinedDatabase::default(),
blocks_subscription,
nats: nats::tests::get_nats_connection(&sandbox_id).await,
nats: nats::tests::get_nats_connection(&connection_id).await,
};

let sandbox_id = &Some(sandbox_id);
let publisher = publisher.run(sandbox_id).await.unwrap();
let publisher = publisher.run().await.unwrap();

assert!(publisher
.nats
.jetstream_messages
.get_last_raw_message_by_subject(
&SubjectName::Receipts1.to_subject_string(sandbox_id)
&SubjectName::Receipts1.get_string(&connection_id)
)
.await
.is_ok());
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core-nats/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn main() -> anyhow::Result<()> {
subscription,
)
.await?;
publisher.run(&None).await?;
publisher.run().await?;

Ok(())
}
Loading

0 comments on commit 0d872ab

Please sign in to comment.