Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/base/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ impl<T: Into<BasicValue>> From<Vec<T>> for BasicValue {
}

impl BasicValue {
pub fn to_key(self) -> Result<KeyValue> {
pub fn into_key(self) -> Result<KeyValue> {
let result = match self {
BasicValue::Bytes(v) => KeyValue::Bytes(v),
BasicValue::Str(v) => KeyValue::Str(v),
Expand Down Expand Up @@ -473,13 +473,13 @@ impl<VS> Value<VS> {
matches!(self, Value::Null)
}

pub fn to_key(self) -> Result<KeyValue> {
pub fn into_key(self) -> Result<KeyValue> {
let result = match self {
Value::Basic(v) => v.to_key()?,
Value::Basic(v) => v.into_key()?,
Value::Struct(v) => KeyValue::Struct(
v.fields
.into_iter()
.map(|v| v.to_key())
.map(|v| v.into_key())
.collect::<Result<Vec<_>>>()?,
),
Value::Null | Value::Collection(_) | Value::Table(_) | Value::List(_) => {
Expand Down Expand Up @@ -661,7 +661,7 @@ where
})
}

pub fn from_json<'a>(value: serde_json::Value, fields_schema: &[FieldSchema]) -> Result<Self> {
pub fn from_json(value: serde_json::Value, fields_schema: &[FieldSchema]) -> Result<Self> {
match value {
serde_json::Value::Array(v) => {
if v.len() != fields_schema.len() {
Expand Down Expand Up @@ -821,7 +821,7 @@ where
})?,
&key_field.value_type.typ,
)?
.to_key()?;
.into_key()?;
let values = FieldValues::from_json_values(
fields_iter.zip(field_vals_iter),
)?;
Expand All @@ -839,7 +839,7 @@ where
)?),
&key_field.value_type.typ,
)?
.to_key()?;
.into_key()?;
let values = FieldValues::from_json_object(v, fields_iter)?;
Ok((key, values.into()))
}
Expand Down
2 changes: 1 addition & 1 deletion src/builder/analyzer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::{BTreeMap, HashSet};
use std::sync::Mutex;
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, u32};
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};

use super::plan::*;
use crate::execution::db_tracking_setup;
Expand Down
15 changes: 12 additions & 3 deletions src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,11 @@ impl FlowBuilder {
flow_ctx: &self.flow_inst_context,
};
let mut root_data_scope = self.root_data_scope.lock().unwrap();
let _ = analyzer_ctx

let analyzed = analyzer_ctx
.analyze_source_op(&mut root_data_scope, source_op.clone(), None, None)
.into_py_result()?;
std::mem::drop(analyzed);

let result =
Self::last_field_to_data_slice(&root_data_scope, self.root_data_scope_ref.clone())
Expand Down Expand Up @@ -498,7 +500,10 @@ impl FlowBuilder {
op: spec,
}),
};
let _ = analyzer_ctx.analyze_reactive_op(scope, &reactive_op, parent_scopes)?;

let analyzed =
analyzer_ctx.analyze_reactive_op(scope, &reactive_op, parent_scopes)?;
std::mem::drop(analyzed);

reactive_ops.push(reactive_op);
let result = Self::last_field_to_data_slice(scope.data, common_scope.clone())
Expand Down Expand Up @@ -537,7 +542,11 @@ impl FlowBuilder {
collector_name: collector.name.clone(),
}),
};
let _ = analyzer_ctx.analyze_reactive_op(scope, &reactive_op, parent_scopes)?;

let analyzed =
analyzer_ctx.analyze_reactive_op(scope, &reactive_op, parent_scopes)?;
std::mem::drop(analyzed);

reactive_ops.push(reactive_op);
Ok(())
},
Expand Down
4 changes: 2 additions & 2 deletions src/execution/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ async fn evaluate_op_scope(
.fingerprinter
.clone()
.with(&input_values)?
.to_fingerprint();
.into_fingerprint();
Some(cache.get(
key,
&op.function_exec_info.output_type,
Expand Down Expand Up @@ -426,7 +426,7 @@ async fn evaluate_op_scope(
Ok(())
}

pub async fn evaluate_source_entry<'a>(
pub async fn evaluate_source_entry(
plan: &ExecutionPlan,
source_op_idx: usize,
schema: &schema::DataSchema,
Expand Down
4 changes: 2 additions & 2 deletions src/execution/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ async fn precommit_source_tracking_info(
let curr_fp = Some(
Fingerprinter::default()
.with(&field_values)?
.to_fingerprint(),
.into_fingerprint(),
);

let existing_target_keys = target_info.existing_keys_info.remove(&primary_key_json);
Expand Down Expand Up @@ -437,7 +437,7 @@ pub async fn evaluation_cache_on_existing_data(
))
}

pub async fn update_source_entry<'a>(
pub async fn update_source_entry(
plan: &ExecutionPlan,
source_op_idx: usize,
schema: &schema::DataSchema,
Expand Down
6 changes: 3 additions & 3 deletions src/execution/memoization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ impl EvaluationCache {
}
}

pub async fn evaluate_with_cell<'a, Fut>(
cell: Option<&'a CacheEntryCell>,
pub async fn evaluate_with_cell<Fut>(
cell: Option<&CacheEntryCell>,
compute: impl FnOnce() -> Fut,
) -> Result<Cow<'a, value::Value>>
) -> Result<Cow<'_, value::Value>>
where
Fut: Future<Output = Result<value::Value>>,
{
Expand Down
12 changes: 6 additions & 6 deletions src/ops/factory_bases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ pub struct ResolvedOpArg {

pub trait ResolvedOpArgExt: Sized {
fn expect_type(self, expected_type: &ValueType) -> Result<Self>;
fn value<'a>(&self, args: &'a Vec<value::Value>) -> Result<&'a value::Value>;
fn take_value(&self, args: &mut Vec<value::Value>) -> Result<value::Value>;
fn value<'a>(&self, args: &'a [value::Value]) -> Result<&'a value::Value>;
fn take_value(&self, args: &mut [value::Value]) -> Result<value::Value>;
}

impl ResolvedOpArgExt for ResolvedOpArg {
Expand All @@ -43,7 +43,7 @@ impl ResolvedOpArgExt for ResolvedOpArg {
Ok(self)
}

fn value<'a>(&self, args: &'a Vec<value::Value>) -> Result<&'a value::Value> {
fn value<'a>(&self, args: &'a [value::Value]) -> Result<&'a value::Value> {
if self.idx >= args.len() {
api_bail!(
"Two few arguments, {} provided, expected at least {} for `{}`",
Expand All @@ -55,7 +55,7 @@ impl ResolvedOpArgExt for ResolvedOpArg {
Ok(&args[self.idx])
}

fn take_value(&self, args: &mut Vec<value::Value>) -> Result<value::Value> {
fn take_value(&self, args: &mut [value::Value]) -> Result<value::Value> {
if self.idx >= args.len() {
api_bail!(
"Two few arguments, {} provided, expected at least {} for `{}`",
Expand All @@ -73,15 +73,15 @@ impl ResolvedOpArgExt for Option<ResolvedOpArg> {
self.map(|arg| arg.expect_type(expected_type)).transpose()
}

fn value<'a>(&self, args: &'a Vec<value::Value>) -> Result<&'a value::Value> {
fn value<'a>(&self, args: &'a [value::Value]) -> Result<&'a value::Value> {
Ok(self
.as_ref()
.map(|arg| arg.value(args))
.transpose()?
.unwrap_or(&value::Value::Null))
}

fn take_value(&self, args: &mut Vec<value::Value>) -> Result<value::Value> {
fn take_value(&self, args: &mut [value::Value]) -> Result<value::Value> {
Ok(self
.as_ref()
.map(|arg| arg.take_value(args))
Expand Down
14 changes: 6 additions & 8 deletions src/ops/storages/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,10 @@ fn key_value_fields_iter<'a>(

fn convertible_to_pgvector(vec_schema: &VectorTypeSchema) -> bool {
if vec_schema.dimension.is_some() {
match &*vec_schema.element_type {
BasicValueType::Float32 => true,
BasicValueType::Float64 => true,
BasicValueType::Int64 => true,
_ => false,
}
matches!(
*vec_schema.element_type,
BasicValueType::Float32 | BasicValueType::Float64 | BasicValueType::Int64
)
} else {
false
}
Expand Down Expand Up @@ -468,8 +466,8 @@ pub struct SetupState {
impl SetupState {
fn new(
table_id: &TableId,
key_fields_schema: &Vec<FieldSchema>,
value_fields_schema: &Vec<FieldSchema>,
key_fields_schema: &[FieldSchema],
value_fields_schema: &[FieldSchema],
index_options: &IndexOptions,
) -> Self {
Self {
Expand Down
2 changes: 1 addition & 1 deletion src/py/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ pub fn value_from_py_object<'py>(
.into_iter()
.map(|v| {
let mut iter = v.fields.into_iter();
let key = iter.next().unwrap().to_key().into_py_result()?;
let key = iter.next().unwrap().into_key().into_py_result()?;
Ok((
key,
value::ScopeValue(value::FieldValues {
Expand Down
27 changes: 13 additions & 14 deletions src/setup/states.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
/// Concepts:
/// - Resource: some setup that needs to be tracked and maintained.
/// - Setup State: current state of a resource.
/// - Staging Change: states changes that may not be really applied yet.
/// - Combined Setup State: Setup State + Staging Change.
/// - Status Check: information about changes that are being applied / need to be applied.
///
/// Resource hierarchy:
/// - [resource: setup metadata table] /// - Flow
/// - [resource: metadata]
/// - [resource: tracking table]
/// - Target
/// - [resource: target-specific stuff]
use anyhow::Result;
use axum::async_trait;
use indenter::indented;
Expand All @@ -15,20 +28,6 @@ use crate::execution::db_tracking_setup;

const INDENT: &str = " ";

/// Concepts:
/// - Resource: some setup that needs to be tracked and maintained.
/// - Setup State: current state of a resource.
/// - Staging Change: states changes that may not be really applied yet.
/// - Combined Setup State: Setup State + Staging Change.
/// - Status Check: information about changes that are being applied / need to be applied.
///
/// Resource hierarchy:
/// - [resource: setup metadata table] /// - Flow
/// - [resource: metadata]
/// - [resource: tracking table]
/// - Target
/// - [resource: target-specific stuff]

pub trait StateMode: Clone + Copy {
type State<T: Debug + Clone>: Debug + Clone;
type DefaultState<T: Debug + Clone + Default>: Debug + Clone + Default;
Expand Down
4 changes: 2 additions & 2 deletions src/utils/fingerprint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl serde::ser::Error for FingerprinterError {
pub struct Fingerprint([u8; 16]);

impl Fingerprint {
pub fn to_base64(&self) -> String {
pub fn to_base64(self) -> String {
BASE64_STANDARD.encode(self.0)
}

Expand Down Expand Up @@ -77,7 +77,7 @@ pub struct Fingerprinter {
}

impl Fingerprinter {
pub fn to_fingerprint(self) -> Fingerprint {
pub fn into_fingerprint(self) -> Fingerprint {
Fingerprint(self.hasher.finalize().into())
}

Expand Down