Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/cocoindex/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def _dump_type(t, metadata):
}
elif dataclasses.is_dataclass(elem_type):
encoded_type = {
'kind': 'Table',
'kind': 'List',
'row': { 'fields': _dump_fields_schema(elem_type) },
}
else:
Expand Down
100 changes: 17 additions & 83 deletions src/base/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@ use crate::builder::plan::AnalyzedValueMapping;
use super::spec::*;
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeMap,
ops::Deref,
sync::{Arc, LazyLock},
};
use std::{collections::BTreeMap, ops::Deref, sync::Arc};

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct VectorTypeSchema {
Expand Down Expand Up @@ -127,21 +123,20 @@ pub struct CollectionSchema {
impl CollectionSchema {
pub fn has_key(&self) -> bool {
match self.kind {
CollectionKind::Collection => false,
CollectionKind::Table | CollectionKind::List => true,
CollectionKind::Table => true,
CollectionKind::Collection | CollectionKind::List => false,
}
}

pub fn key_type(&self) -> Option<&EnrichedValueType> {
match self.kind {
CollectionKind::Collection => None,
CollectionKind::Table => self
.row
.fields
.first()
.as_ref()
.map(|field| &field.value_type),
CollectionKind::List => Some(&LIST_INDEX_FIELD.value_type),
CollectionKind::Collection | CollectionKind::List => None,
}
}

Expand Down Expand Up @@ -172,89 +167,21 @@ impl std::fmt::Display for CollectionSchema {
}
}

pub const KEY_FIELD_NAME: &'static str = "__key";
pub const VALUE_FIELD_NAME: &'static str = "__value";
pub const LIST_INDEX_FIELD_NAME: &'static str = "__index";

pub static LIST_INDEX_FIELD: LazyLock<FieldSchema> = LazyLock::new(|| FieldSchema {
name: LIST_INDEX_FIELD_NAME.to_string(),
value_type: EnrichedValueType {
typ: ValueType::Basic(BasicValueType::Int64),
nullable: false,
attrs: Default::default(),
},
});

impl CollectionSchema {
pub fn new_collection(value_name: Option<String>, value: EnrichedValueType) -> Self {
Self {
kind: CollectionKind::Collection,
row: StructSchema {
fields: Arc::new(vec![FieldSchema {
name: value_name.unwrap_or_else(|| VALUE_FIELD_NAME.to_string()),
value_type: value,
}]),
},
collectors: Default::default(),
}
}

pub fn new_table(
key_name: Option<String>,
key: EnrichedValueType,
value_name: Option<String>,
value: EnrichedValueType,
) -> Self {
pub fn new(kind: CollectionKind, fields: Vec<FieldSchema>) -> Self {
Self {
kind: CollectionKind::Table,
kind,
row: StructSchema {
fields: Arc::new(vec![
FieldSchema {
name: key_name.unwrap_or_else(|| KEY_FIELD_NAME.to_string()),
value_type: key,
},
FieldSchema {
name: value_name.unwrap_or_else(|| VALUE_FIELD_NAME.to_string()),
value_type: value,
},
]),
fields: Arc::new(fields),
},
collectors: Default::default(),
}
}

pub fn new_list(value_name: Option<String>, value: EnrichedValueType) -> Self {
Self {
kind: CollectionKind::List,
row: StructSchema {
fields: Arc::new(vec![
LIST_INDEX_FIELD.clone(),
FieldSchema {
name: value_name.unwrap_or_else(|| VALUE_FIELD_NAME.to_string()),
value_type: value,
},
]),
},
collectors: Default::default(),
}
}

pub fn is_table(&self) -> bool {
match self.kind {
CollectionKind::Collection => false,
CollectionKind::Table | CollectionKind::List => true,
}
}

pub fn is_list(&self) -> bool {
self.kind == CollectionKind::List
}

pub fn key_field<'a>(&'a self) -> Option<&'a FieldSchema> {
if self.is_table() {
Some(self.row.fields.first().unwrap())
} else {
None
match self.kind {
CollectionKind::Table => Some(self.row.fields.first().unwrap()),
CollectionKind::Collection | CollectionKind::List => None,
}
}
}
Expand Down Expand Up @@ -373,6 +300,13 @@ pub struct FieldSchema<DataType = ValueType> {
}

impl FieldSchema {
pub fn new(name: impl ToString, value_type: EnrichedValueType) -> Self {
Self {
name: name.to_string(),
value_type,
}
}

pub fn without_attrs(&self) -> Self {
Self {
name: self.name.clone(),
Expand Down
25 changes: 16 additions & 9 deletions src/execution/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ fn augmented_value(
.map(|v| ScopeValueBuilder::augmented_from(v, t))
.collect::<Result<Vec<_>>>()?,
),
(val, _) => panic!("Value kind doesn't match the type {val_type}: {val:?}"),
(val, _) => bail!("Value kind doesn't match the type {val_type}: {val:?}"),
};
Ok(value)
}
Expand Down Expand Up @@ -157,18 +157,19 @@ impl<'a> ScopeEntry<'a> {
fn get_local_field_schema<'b>(
schema: &'b schema::StructSchema,
indices: &[u32],
) -> &'b schema::FieldSchema {
) -> Result<&'b schema::FieldSchema> {
let field_idx = indices[0] as usize;
let field_schema = &schema.fields[field_idx];
if indices.len() == 1 {
let result = if indices.len() == 1 {
field_schema
} else {
let struct_field_schema = match &field_schema.value_type.typ {
schema::ValueType::Struct(s) => s,
_ => panic!("Expect struct field"),
_ => bail!("Expect struct field"),
};
Self::get_local_field_schema(&struct_field_schema, &indices[1..])
}
Self::get_local_field_schema(&struct_field_schema, &indices[1..])?
};
Ok(result)
}

fn get_local_key_field<'b>(
Expand Down Expand Up @@ -229,8 +230,14 @@ impl<'a> ScopeEntry<'a> {
}
}

fn get_field_schema(&self, field_ref: &AnalyzedLocalFieldReference) -> &schema::FieldSchema {
Self::get_local_field_schema(self.schema, &field_ref.fields_idx)
fn get_field_schema(
&self,
field_ref: &AnalyzedLocalFieldReference,
) -> Result<&schema::FieldSchema> {
Ok(Self::get_local_field_schema(
self.schema,
&field_ref.fields_idx,
)?)
}

fn define_field_w_builder(
Expand Down Expand Up @@ -329,7 +336,7 @@ async fn evaluate_op_scope(
}

AnalyzedReactiveOp::ForEach(op) => {
let target_field_schema = head_scope.get_field_schema(&op.local_field_ref);
let target_field_schema = head_scope.get_field_schema(&op.local_field_ref)?;
let collection_schema = match &target_field_schema.value_type.typ {
schema::ValueType::Collection(cs) => cs,
_ => panic!("Expect target field to be a collection"),
Expand Down
11 changes: 6 additions & 5 deletions src/ops/functions/split_recursively.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,12 @@ impl SimpleFunctionFactoryBase for Factory {
api_bail!("Expect String as input type, got {}", t)
}
}
Ok(make_output_type(CollectionSchema::new_table(
Some("location".to_string()),
make_output_type(BasicValueType::Range),
Some("text".to_string()),
make_output_type(BasicValueType::Str),
Ok(make_output_type(CollectionSchema::new(
CollectionKind::Table,
vec![
FieldSchema::new("location", make_output_type(BasicValueType::Range)),
FieldSchema::new("text", make_output_type(BasicValueType::Str)),
],
))
.with_attr(
field_attrs::CHUNK_BASE_TEXT,
Expand Down
9 changes: 2 additions & 7 deletions src/ops/py_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use std::{collections::BTreeMap, sync::Arc};
use axum::async_trait;
use blocking::unblock;
use futures::FutureExt;
use log::warn;
use pyo3::{
exceptions::PyException,
pyclass, pymethods,
types::{IntoPyDict, PyAnyMethods, PyList, PyString, PyTuple},
types::{IntoPyDict, PyAnyMethods, PyString, PyTuple},
Bound, IntoPyObjectExt, Py, PyAny, PyResult, Python,
};

Expand Down Expand Up @@ -156,12 +157,6 @@ fn value_from_py_object<'py>(
),
}
}
_ => {
return Err(PyException::new_err(format!(
"unsupported value type: {}",
typ
)))
}
}
};
Ok(result)
Expand Down
22 changes: 13 additions & 9 deletions src/ops/sources/local_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,19 @@ impl SourceFactoryBase for Factory {
spec: &Spec,
_context: &FlowInstanceContext,
) -> Result<EnrichedValueType> {
Ok(make_output_type(CollectionSchema::new_table(
Some("filename".to_string()),
make_output_type(BasicValueType::Str),
Some("content".to_string()),
make_output_type(if spec.binary {
BasicValueType::Bytes
} else {
BasicValueType::Str
}),
Ok(make_output_type(CollectionSchema::new(
CollectionKind::Table,
vec![
FieldSchema::new("filename", make_output_type(BasicValueType::Str)),
FieldSchema::new(
"content",
make_output_type(if spec.binary {
BasicValueType::Bytes
} else {
BasicValueType::Str
}),
),
],
)))
}

Expand Down