Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/execution/live_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ async fn update_source(
futs.push(
async move {
let mut change_stream = change_stream;
let retry_options = retriable::RetryOptions {
let retry_options = retryable::RetryOptions {
max_retries: None,
initial_backoff: std::time::Duration::from_secs(5),
max_backoff: std::time::Duration::from_secs(60),
Expand All @@ -134,14 +134,14 @@ async fn update_source(
// Workaround as AsyncFnMut isn't mature yet.
// Should be changed to use AsyncFnMut once it is.
let change_stream = tokio::sync::Mutex::new(&mut change_stream);
let change_msg = retriable::run(
let change_msg = retryable::run(
|| async {
let mut change_stream = change_stream.lock().await;
change_stream
.next()
.await
.transpose()
.map_err(retriable::Error::always_retryable)
.map_err(retryable::Error::always_retryable)
},
&retry_options,
)
Expand Down
8 changes: 4 additions & 4 deletions src/ops/storages/neo4j.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl GraphElement {
}
}

impl retriable::IsRetryable for neo4rs::Error {
impl retryable::IsRetryable for neo4rs::Error {
fn is_retryable(&self) -> bool {
match self {
neo4rs::Error::ConnectionError => true,
Expand Down Expand Up @@ -1289,11 +1289,11 @@ impl StorageFactoryBase for Factory {
.or_insert_with(Vec::new)
.push(mut_with_ctx);
}
let retry_options = retriable::RetryOptions::default();
let retry_options = retryable::RetryOptions::default();
for muts in muts_by_graph.values_mut() {
muts.sort_by_key(|m| m.export_context.create_order);
let graph = &muts[0].export_context.graph;
retriable::run(
retryable::run(
async || {
let mut queries = vec![];
for mut_with_ctx in muts.iter() {
Expand All @@ -1311,7 +1311,7 @@ impl StorageFactoryBase for Factory {
let mut txn = graph.start_txn().await?;
txn.run_queries(queries).await?;
txn.commit().await?;
retriable::Ok(())
retryable::Ok(())
},
&retry_options,
)
Expand Down
2 changes: 1 addition & 1 deletion src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub(crate) use crate::lib_context::{get_lib_context, get_runtime, FlowContext, L
pub(crate) use crate::ops::interface;
pub(crate) use crate::service::error::ApiError;
pub(crate) use crate::setup::AuthRegistry;
pub(crate) use crate::utils::retriable;
pub(crate) use crate::utils::retryable;

pub(crate) use crate::{api_bail, api_error};

Expand Down
2 changes: 1 addition & 1 deletion src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod db;
pub mod fingerprint;
pub mod immutable;
pub mod retriable;
pub mod retryable;
pub mod yaml_ser;
File renamed without changes.