-
Notifications
You must be signed in to change notification settings - Fork 187
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(chain-listener): initial refactoring + tests #2232
Conversation
@@ -85,6 +85,7 @@ jobs: | |||
uses: fluencelabs/cli/.github/workflows/tests.yml@main | |||
with: | |||
nox-image: "${{ needs.nox-snapshot.outputs.nox-image }}" | |||
ref: "timeout" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't forget to revert it after merge
897f5e7
to
fe9caea
Compare
config.network_id, | ||
config.wallet_key, | ||
config.default_base_fee, | ||
config.default_base_fee, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
config.default_base_fee, | |
config.default_priority_fee, |
let ccp_client = if let Some(ccp_endpoint) = listener_config.ccp_endpoint.clone() { | ||
let ccp_client: Option<Arc<dyn CCPClient>> = if let Some(ccp_endpoint) = | ||
listener_config.ccp_endpoint.clone() | ||
{ | ||
let ccp_client = CCPRpcHttpClient::new(ccp_endpoint.clone()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method could receive AsRef<str>
instead, it'll allow not to clone the ccp_endpoint
string several times.
@@ -123,28 +125,40 @@ async fn setup_listener( | |||
config.chain_config.clone(), | |||
config.chain_listener_config.clone(), | |||
) { | |||
let ccp_client = if let Some(ccp_endpoint) = listener_config.ccp_endpoint.clone() { | |||
let ccp_client: Option<Arc<dyn CCPClient>> = if let Some(ccp_endpoint) = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like listener_config.ccp_endpoint.map(|ccp_endpoint| ....)
would be more readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need typing in ccp_client
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
afaik clippy advices to use if let Some
in these cases
Some(ccp_client) | ||
} else { | ||
None | ||
}; | ||
|
||
let ws_client = ChainListener::create_ws_client(&listener_config.ws_endpoint).await?; | ||
let cc_events_dir = config.dir_config.cc_events_dir.clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It better to move this definition close to usage of this variable or even remove it and use this statement directly in the ChainListener::new
.
let ws_client = self.ws_client.read().await; | ||
ws_client.subscribe("eth_subscribe", params.clone(), "eth_unsubscribe") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure that this is formatted?
if let RestartNeeded(_) = err { | ||
tracing::error!(target: "chain-listener", "Failed to subscribe to {method}: {err};"); | ||
Permanent(err) | ||
} else { | ||
tracing::warn!(target: "chain-listener", "Failed to subscribe to {method}: {err}; Retrying..."); | ||
backoff::Error::transient(err) | ||
}}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The match
block will be more readable.
@@ -72,9 +71,47 @@ pub trait ChainConnector: Send + Sync { | |||
) -> Result<Vec<Result<Value, ConnectorError>>, ConnectorError>; | |||
} | |||
|
|||
#[derive(Clone)] | |||
pub struct HttpChainConnectorConfig { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still not sure that we need such a structure.
@@ -773,7 +674,7 @@ impl ChainListener { | |||
/// Unit goes to Deal | |||
async fn process_unit_deactivated( | |||
&mut self, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the acquire_cores_for_cc
function you can make core more readable by decoupling assignments from match to make it smth like
let assignment = match cores {
Ok(assignment) => assignment,
Err(AcquireError::NotFoundAvailableCores {
required,
available,
..
}) => {
tracing::warn!(target: "chain-listener", "Found {required} CUs in the Capacity Commitment, but Nox has only {available} Cores available for CC");
let assign_units = units.iter().take(available).cloned().collect();
let assignment = self.core_manager.acquire_worker_core(AcquireRequest::new(
assign_units,
WorkType::CapacityCommitment,
))?;
assignment
};
let priority_units = filter(&cu_groups.priority_units, &assignment);
let non_priority_units = filter(&cu_groups.non_priority_units, &assignment);
let pending_units = filter(&cu_groups.pending_units, &assignment);
let finished_units = filter(&cu_groups.finished_units, &assignment);
Ok(PhysicalCoreGroups {
priority_cores: priority_units,
non_priority_cores: non_priority_units,
pending_cores: pending_units,
finished_cores: finished_units,
})
.name("ChainListener") | ||
.spawn(async move { | ||
.spawn( | ||
async move { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
excess empty line here
use std::collections::HashMap; | ||
|
||
#[async_trait] | ||
pub trait CCPClient: Sync + Send { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is the same trait with name NoxCCPApi
inside CCP intended to use in Nox.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And nobody uses it) and CCPRpcHttpClient also doesn't)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can avoid using this upper-case mess? Like, CcpClient
instead of CCPC...
?
impl CoreRange { | ||
pub fn is_subset(&self, cores: &NonEmpty<PhysicalCoreId>) -> bool { | ||
let range: RangeSetBlaze<usize> = | ||
RangeSetBlaze::from_iter(cores.into_iter().map(|core| <usize>::from(core.clone()))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚫 [clippy] reported by reviewdog 🐶
error: using `clone` on type `PhysicalCoreId` which implements the `Copy` trait
--> crates/core-manager/src/core_range.rs:15:81
|
15 | RangeSetBlaze::from_iter(cores.into_iter().map(|core| ::from(core.clone())));
| ^^^^^^^^^^^^ help: try dereferencing it: `*core`
|
= help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#clone_on_copy
= note: `-D clippy::clone-on-copy` implied by `-D warnings`
= help: to override `-D warnings` add `#[allow(clippy::clone_on_copy)]`
@@ -12,7 +12,16 @@ fn truncate(s: &str) -> &str { | |||
pub fn builtin_log_fn(service: &str, args: &str, elapsed: FormattedDuration, particle_id: String) { | |||
let args = truncate(args); | |||
match service { | |||
"array" | "cmp" | "debug" | "math" | "op" | "getDataSrv" | "run-console" | "json" => { | |||
"run-console" => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure you want to leave it INFO
?
self.subscribe("logs", self.unit_matched_params()).await | ||
} | ||
|
||
async fn refresh(&self) -> Result<(), Error> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this is unused
use std::collections::HashMap; | ||
|
||
#[async_trait] | ||
pub trait CCPClient: Sync + Send { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can avoid using this upper-case mess? Like, CcpClient
instead of CCPC...
?
Description
Added chain listener test for cc activation flow.
Additional Notes
Checklist
Reviewer Checklist