Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,11 @@ class _SourceRefreshOptions:
refresh_interval: datetime.timedelta | None = None


@dataclass
class _ExecutionOptions:
max_inflight_count: int | None = None


class FlowBuilder:
"""
A flow builder is used to build a flow.
Expand All @@ -439,6 +444,7 @@ def add_source(
*,
name: str | None = None,
refresh_interval: datetime.timedelta | None = None,
max_inflight_count: int | None = None,
) -> DataSlice[T]:
"""
Import a source to the flow.
Expand All @@ -454,9 +460,12 @@ def add_source(
self._state.field_name_builder.build_name(
name, prefix=_to_snake_case(_spec_kind(spec)) + "_"
),
dump_engine_object(
refresh_options=dump_engine_object(
_SourceRefreshOptions(refresh_interval=refresh_interval)
),
execution_options=dump_engine_object(
_ExecutionOptions(max_inflight_count=max_inflight_count)
),
),
name,
)
Expand Down
8 changes: 8 additions & 0 deletions src/base/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ impl SpecFormatter for OpSpec {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ExecutionOptions {
pub max_inflight_count: Option<u32>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SourceRefreshOptions {
pub refresh_interval: Option<std::time::Duration>,
Expand All @@ -274,6 +279,9 @@ pub struct ImportOpSpec {

#[serde(default)]
pub refresh_options: SourceRefreshOptions,

#[serde(default)]
pub execution_options: ExecutionOptions,
}

impl SpecFormatter for ImportOpSpec {
Expand Down
3 changes: 3 additions & 0 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,9 @@ impl AnalyzerContext {
primary_key_type,
name: op_name,
refresh_options: import_op.spec.refresh_options,
concurrency_controller: utils::ConcurrencyController::new(
import_op.spec.execution_options.max_inflight_count,
),
})
};
Ok(result_fut)
Expand Down
6 changes: 5 additions & 1 deletion src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl FlowBuilder {
OpScopeRef(self.root_op_scope.clone())
}

#[pyo3(signature = (kind, op_spec, target_scope, name, refresh_options=None))]
#[pyo3(signature = (kind, op_spec, target_scope, name, refresh_options=None, execution_options=None))]
pub fn add_source(
&mut self,
py: Python<'_>,
Expand All @@ -297,6 +297,7 @@ impl FlowBuilder {
target_scope: Option<OpScopeRef>,
name: String,
refresh_options: Option<py::Pythonized<spec::SourceRefreshOptions>>,
execution_options: Option<py::Pythonized<spec::ExecutionOptions>>,
) -> PyResult<DataSlice> {
if let Some(target_scope) = target_scope {
if *target_scope != self.root_op_scope {
Expand All @@ -313,6 +314,9 @@ impl FlowBuilder {
spec: op_spec.into_inner(),
},
refresh_options: refresh_options.map(|o| o.into_inner()).unwrap_or_default(),
execution_options: execution_options
.map(|o| o.into_inner())
.unwrap_or_default(),
},
};
let analyzer_ctx = AnalyzerContext {
Expand Down
1 change: 1 addition & 0 deletions src/builder/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub struct AnalyzedImportOp {
pub output: AnalyzedOpOutput,
pub primary_key_type: schema::ValueType,
pub refresh_options: spec::SourceRefreshOptions,
pub concurrency_controller: utils::ConcurrencyController,
}

pub struct AnalyzedFunctionExecInfo {
Expand Down
1 change: 1 addition & 0 deletions src/execution/source_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ impl SourceIndexingContext {
state.scan_generation
};
while let Some(row) = rows_stream.next().await {
let _ = import_op.concurrency_controller.acquire().await?;
for row in row? {
self.process_source_key_if_newer(
row.key,
Expand Down
30 changes: 30 additions & 0 deletions src/utils/concur_control.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use crate::prelude::*;

use tokio::sync::{Semaphore, SemaphorePermit};

pub struct ConcurrencyController {
inflight_count_sem: Option<Semaphore>,
}

pub struct ConcurrencyControllerPermit<'a> {
_inflight_count_permit: Option<SemaphorePermit<'a>>,
}

impl ConcurrencyController {
pub fn new(max_inflight_count: Option<u32>) -> Self {
Self {
inflight_count_sem: max_inflight_count.map(|max| Semaphore::new(max as usize)),
}
}

pub async fn acquire<'a>(&'a self) -> Result<ConcurrencyControllerPermit<'a>> {
let inflight_count_permit = if let Some(sem) = &self.inflight_count_sem {
Some(sem.acquire().await?)
} else {
None
};
Ok(ConcurrencyControllerPermit {
_inflight_count_permit: inflight_count_permit,
})
}
}
3 changes: 3 additions & 0 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ pub mod fingerprint;
pub mod immutable;
pub mod retryable;
pub mod yaml_ser;

mod concur_control;
pub use concur_control::ConcurrencyController;
Loading