portalnet/src/overlay/service.rs: 77 changes (32 additions & 45 deletions)
@@ -644,28 +644,16 @@ where
content,
nodes_to_poke,
} => {
let validator = self.validator.clone();
let store = self.store.clone();
let kbuckets = self.kbuckets.clone();
let command_tx = self.command_tx.clone();
let metrics = self.metrics.clone();
let utp = self.utp_controller.clone();
let disable_poke = self.disable_poke;
let utp_processing = UtpProcessing::from(&*self);
tokio::spawn(async move {
Self::process_received_content(
kbuckets,
command_tx,
validator,
store,
content.clone(),
false,
content_key,
callback,
query_info.trace,
nodes_to_poke,
metrics,
disable_poke,
utp,
utp_processing,
)
.await;
});
@@ -691,21 +679,19 @@ where
return;
}
};
let validator = self.validator.clone();
let store = self.store.clone();
let kbuckets = self.kbuckets.clone();
let command_tx = self.command_tx.clone();
let disable_poke = self.disable_poke;
let metrics = self.metrics.clone();
let utp_controller = self.utp_controller.clone();
let utp_processing = UtpProcessing::from(&*self);
tokio::spawn(async move {
let trace = query_info.trace;
let cid = utp_rs::cid::ConnectionId {
recv: connection_id,
send: connection_id.wrapping_add(1),
peer: UtpEnr(source),
};
let data = match utp_controller.connect_inbound_stream(cid).await {
let data = match utp_processing
.utp_controller
.connect_inbound_stream(cid)
.await
{
Ok(data) => data,
Err(e) => {
if let Some(responder) = callback {
@@ -723,19 +709,13 @@ where
}
};
Self::process_received_content(
kbuckets,
command_tx,
validator,
store,
data,
true,
content_key,
callback,
trace,
nodes_to_poke,
metrics,
disable_poke,
utp_controller,
utp_processing,
)
.await;
});
@@ -1843,31 +1823,30 @@ where
// requests to this/other overlay services.
#[allow(clippy::too_many_arguments)]
async fn process_received_content(
kbuckets: Arc<RwLock<KBucketsTable<NodeId, Node>>>,
command_tx: UnboundedSender<OverlayCommand<TContentKey>>,
validator: Arc<TValidator>,
store: Arc<RwLock<TStore>>,
content: Vec<u8>,
utp_transfer: bool,
content_key: TContentKey,
responder: Option<oneshot::Sender<RecursiveFindContentResult>>,
trace: Option<QueryTrace>,
nodes_to_poke: Vec<NodeId>,
metrics: OverlayMetricsReporter,
disable_poke: bool,
utp_controller: Arc<UtpController>,
utp_processing: UtpProcessing<TValidator, TStore, TContentKey>,
) {
let mut content = content;
// Operate under assumption that all content in the store is valid
let local_value = store.read().get(&content_key);
let local_value = utp_processing.store.read().get(&content_key);
if let Ok(Some(val)) = local_value {
// todo validate & replace content value if different & punish bad peer
warn!("Stored content doesn't match expected value");
content = val;
} else {
let content_id = content_key.content_id();
let validation_result = validator.validate_content(&content_key, &content).await;
metrics.report_validation(validation_result.is_ok());
let validation_result = utp_processing
.validator
.validate_content(&content_key, &content)
.await;
utp_processing
.metrics
.report_validation(validation_result.is_ok());

let validation_result = match validation_result {
Ok(validation_result) => validation_result,
@@ -1894,7 +1873,8 @@ where
// skip storing if content is not valid for storing, the content
// is already stored or if there's an error reading the store
let should_store = validation_result.valid_for_storing
&& store
&& utp_processing
.store
.read()
.is_key_within_radius_and_unavailable(&content_key)
.map_or_else(
@@ -1905,7 +1885,11 @@ where
|val| matches!(val, ShouldWeStoreContent::Store),
);
if should_store {
if let Err(err) = store.write().put(content_key.clone(), content.clone()) {
if let Err(err) = utp_processing
.store
.write()
.put(content_key.clone(), content.clone())
{
error!(
error = %err,
content.id = %hex_encode_compact(content_id),
@@ -1919,14 +1903,14 @@ where
let _ = responder.send(Ok((content.clone(), utp_transfer, trace)));
}

if !disable_poke {
if !utp_processing.disable_poke {
Self::poke_content(
kbuckets,
command_tx,
utp_processing.kbuckets,
utp_processing.command_tx,
content_key,
content,
nodes_to_poke,
utp_controller,
utp_processing.utp_controller,
);
}
}
@@ -2600,6 +2584,7 @@ where
command_tx: UnboundedSender<OverlayCommand<TContentKey>>,
utp_controller: Arc<UtpController>,
accept_queue: Arc<RwLock<AcceptQueue<TContentKey>>>,
disable_poke: bool,
}

impl<TContentKey, TMetric, TValidator, TStore>
@@ -2619,6 +2604,7 @@ where
command_tx: service.command_tx.clone(),
utp_controller: Arc::clone(&service.utp_controller),
accept_queue: Arc::clone(&service.accept_queue),
disable_poke: service.disable_poke,
}
}
}
@@ -2638,6 +2624,7 @@ where
command_tx: self.command_tx.clone(),
utp_controller: Arc::clone(&self.utp_controller),
accept_queue: Arc::clone(&self.accept_queue),
disable_poke: self.disable_poke,
}
}
}
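
Note on the refactor shown above: the hunks replace the per-field clones (validator, store, kbuckets, command_tx, metrics, utp_controller, disable_poke) captured before each tokio::spawn with a single UtpProcessing context built via UtpProcessing::from(&*self). Below is a minimal, self-contained sketch of that pattern using simplified stand-in types (Service, Processing, and a string-based store are illustrative only, not the real portalnet definitions):

use std::sync::Arc;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

struct Service {
    store: Arc<str>,
    command_tx: UnboundedSender<String>,
    disable_poke: bool,
}

// Bundles everything a spawned task needs, so the spawn site does one
// conversion instead of a block of per-field clones.
#[derive(Clone)]
struct Processing {
    store: Arc<str>,
    command_tx: UnboundedSender<String>,
    disable_poke: bool,
}

impl From<&Service> for Processing {
    fn from(service: &Service) -> Self {
        Self {
            store: Arc::clone(&service.store),
            command_tx: service.command_tx.clone(),
            disable_poke: service.disable_poke,
        }
    }
}

impl Service {
    fn handle_found_content(&self) {
        // One conversion replaces the previous block of individual clones.
        let processing = Processing::from(self);
        tokio::spawn(async move {
            Self::process_received_content(processing).await;
        });
    }

    async fn process_received_content(processing: Processing) {
        // The spawned task reaches all shared state through the context.
        if !processing.disable_poke {
            let _ = processing.command_tx.send(processing.store.to_string());
        }
    }
}

#[tokio::main]
async fn main() {
    let (command_tx, mut command_rx) = unbounded_channel();
    let service = Service {
        store: Arc::from("content"),
        command_tx,
        disable_poke: false,
    };
    service.handle_found_content();
    println!("poked with: {:?}", command_rx.recv().await);
}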