Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/docs/ops/storages.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ doc_embeddings.export(
"doc_embeddings",
cocoindex.storages.Qdrant(
collection_name="cocoindex",
grpc_url="http://xyz-example.cloud-region.cloud-provider.cloud.qdrant.io:6334/",
grpc_url="https://xyz-example.cloud-region.cloud-provider.cloud.qdrant.io:6334/",
api_key="<your-api-key-here>",
),
primary_key_fields=["id_field"],
Expand Down
2 changes: 1 addition & 1 deletion src/execution/live_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl FlowLiveUpdater {
)
.map(|(import_op, stats)| stats::SourceUpdateInfo {
source_name: import_op.name.clone(),
stats: (&**stats).clone(),
stats: (**stats).clone(),
})
.collect(),
}
Expand Down
2 changes: 1 addition & 1 deletion src/execution/memoization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl EvaluationMemory {
}),
uuids: Mutex::new(
(!options.evaluation_only)
.then(|| stored_uuids)
.then_some(stored_uuids)
.flatten()
.into_iter()
.flat_map(|iter| iter.into_iter())
Expand Down
2 changes: 1 addition & 1 deletion src/execution/row_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ pub async fn update_source_row(
if existing_version.should_skip(source_version, Some(update_stats)) {
return Ok(SkippedOr::Skipped(existing_version));
}
info.memoization_info.map(|info| info.0).flatten()
info.memoization_info.and_then(|info| info.0)
}
None => Default::default(),
};
Expand Down
2 changes: 1 addition & 1 deletion src/execution/source_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl SourceIndexingContext {
row_state.touched_generation = scan_generation;
if row_state
.source_version
.should_skip(&source_version, Some(&update_stats))
.should_skip(&source_version, Some(update_stats))
{
return None;
}
Expand Down
12 changes: 6 additions & 6 deletions src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ pub struct FlowInstanceContext {
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Ordinal(pub i64);

impl Into<i64> for Ordinal {
fn into(self) -> i64 {
self.0
impl From<Ordinal> for i64 {
fn from(val: Ordinal) -> Self {
val.0
}
}

Expand Down Expand Up @@ -68,10 +68,10 @@ pub struct SourceExecutorListOptions {
#[async_trait]
pub trait SourceExecutor: Send + Sync {
/// Get the list of keys for the source.
fn list<'a>(
&'a self,
fn list(
&self,
options: SourceExecutorListOptions,
) -> BoxStream<'a, Result<Vec<SourceRowMetadata>>>;
) -> BoxStream<'_, Result<Vec<SourceRowMetadata>>>;

// Get the value for the given key.
async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>>;
Expand Down
8 changes: 4 additions & 4 deletions src/ops/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl<'a> StructSchemaBuilder<'a> {
}
}

pub fn set_description(&mut self, description: impl Into<Arc<str>>) {
pub fn _set_description(&mut self, description: impl Into<Arc<str>>) {
self.target.description = Some(description.into());
}

Expand All @@ -88,12 +88,12 @@ impl<'a> StructSchemaBuilder<'a> {
SchemaBuilderFieldRef(AnalyzedLocalFieldReference { fields_idx })
}

pub fn add_struct_field<'b>(
&'b mut self,
pub fn _add_struct_field(
&mut self,
name: impl Into<FieldName>,
nullable: bool,
attrs: Arc<BTreeMap<String, serde_json::Value>>,
) -> (StructSchemaBuilder<'b>, SchemaBuilderFieldRef) {
) -> (StructSchemaBuilder<'_>, SchemaBuilderFieldRef) {
let field_schema = FieldSchema::new(
name.into(),
EnrichedValueType {
Expand Down
8 changes: 4 additions & 4 deletions src/ops/sources/google_drive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl Executor {
None
} else if is_supported_file_type(&mime_type) {
Some(SourceRowMetadata {
key: KeyValue::Str(Arc::from(id)),
key: KeyValue::Str(id),
ordinal: file.modified_time.map(|t| t.try_into()).transpose()?,
})
} else {
Expand Down Expand Up @@ -294,10 +294,10 @@ impl<T> ResultExt<T> for google_drive3::Result<T> {

#[async_trait]
impl SourceExecutor for Executor {
fn list<'a>(
&'a self,
fn list(
&self,
options: SourceExecutorListOptions,
) -> BoxStream<'a, Result<Vec<SourceRowMetadata>>> {
) -> BoxStream<'_, Result<Vec<SourceRowMetadata>>> {
let mut seen_ids = HashSet::new();
let mut folder_ids = self.root_folder_ids.clone();
let fields = format!(
Expand Down
6 changes: 3 additions & 3 deletions src/ops/sources/local_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ impl Executor {

#[async_trait]
impl SourceExecutor for Executor {
fn list<'a>(
&'a self,
fn list(
&self,
options: SourceExecutorListOptions,
) -> BoxStream<'a, Result<Vec<SourceRowMetadata>>> {
) -> BoxStream<'_, Result<Vec<SourceRowMetadata>>> {
let root_component_size = self.root_path.components().count();
let mut dirs = Vec::new();
dirs.push(Cow::Borrowed(&self.root_path));
Expand Down
36 changes: 17 additions & 19 deletions src/ops/storages/neo4j.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ struct RelationshipStorageExecutor {

fn json_value_to_bolt_value(value: &serde_json::Value) -> Result<BoltType> {
let bolt_value = match value {
serde_json::Value::Null => BoltType::Null(neo4rs::BoltNull::default()),
serde_json::Value::Null => BoltType::Null(neo4rs::BoltNull),
serde_json::Value::Bool(v) => BoltType::Boolean(neo4rs::BoltBoolean::new(*v)),
serde_json::Value::Number(v) => {
if let Some(i) = v.as_i64() {
Expand All @@ -163,7 +163,7 @@ fn json_value_to_bolt_value(value: &serde_json::Value) -> Result<BoltType> {
serde_json::Value::String(v) => BoltType::String(neo4rs::BoltString::new(v)),
serde_json::Value::Array(v) => BoltType::List(neo4rs::BoltList {
value: v
.into_iter()
.iter()
.map(json_value_to_bolt_value)
.collect::<Result<_>>()?,
}),
Expand Down Expand Up @@ -220,7 +220,7 @@ fn basic_value_to_bolt(value: &BasicValue, schema: &BasicValueType) -> Result<Bo
BasicValue::Bytes(v) => {
BoltType::Bytes(neo4rs::BoltBytes::new(bytes::Bytes::from_owner(v.clone())))
}
BasicValue::Str(v) => BoltType::String(neo4rs::BoltString::new(&v)),
BasicValue::Str(v) => BoltType::String(neo4rs::BoltString::new(v)),
BasicValue::Bool(v) => BoltType::Boolean(neo4rs::BoltBoolean::new(*v)),
BasicValue::Int64(v) => BoltType::Integer(neo4rs::BoltInteger::new(*v)),
BasicValue::Float64(v) => BoltType::Float(neo4rs::BoltFloat::new(*v)),
Expand All @@ -242,7 +242,7 @@ fn basic_value_to_bolt(value: &BasicValue, schema: &BasicValueType) -> Result<Bo
BasicValue::Vector(v) => match schema {
BasicValueType::Vector(t) => BoltType::List(neo4rs::BoltList {
value: v
.into_iter()
.iter()
.map(|v| basic_value_to_bolt(v, &t.element_type))
.collect::<Result<_>>()?,
}),
Expand All @@ -255,9 +255,9 @@ fn basic_value_to_bolt(value: &BasicValue, schema: &BasicValueType) -> Result<Bo

fn value_to_bolt(value: &Value, schema: &schema::ValueType) -> Result<BoltType> {
let bolt_value = match value {
Value::Null => BoltType::Null(neo4rs::BoltNull::default()),
Value::Null => BoltType::Null(neo4rs::BoltNull),
Value::Basic(v) => match schema {
ValueType::Basic(t) => basic_value_to_bolt(v, &t)?,
ValueType::Basic(t) => basic_value_to_bolt(v, t)?,
_ => anyhow::bail!("Non-basic type got basic value: {}", schema),
},
Value::Struct(v) => match schema {
Expand All @@ -267,7 +267,7 @@ fn value_to_bolt(value: &Value, schema: &schema::ValueType) -> Result<BoltType>
Value::Collection(v) | Value::List(v) => match schema {
ValueType::Collection(t) => BoltType::List(neo4rs::BoltList {
value: v
.into_iter()
.iter()
.map(|v| field_values_to_bolt(v.0.fields.iter(), t.row.fields.iter()))
.collect::<Result<_>>()?,
}),
Expand All @@ -276,7 +276,7 @@ fn value_to_bolt(value: &Value, schema: &schema::ValueType) -> Result<BoltType>
Value::Table(v) => match schema {
ValueType::Collection(t) => BoltType::List(neo4rs::BoltList {
value: v
.into_iter()
.iter()
.map(|(k, v)| {
field_values_to_bolt(
std::iter::once(&Into::<value::Value>::into(k.clone()))
Expand Down Expand Up @@ -632,7 +632,7 @@ impl RelationshipSetupState {
spec: &RelationshipSpec,
key_field_names: Vec<String>,
index_options: &IndexOptions,
rel_value_fields_info: &Vec<AnalyzedGraphFieldMapping>,
rel_value_fields_info: &[AnalyzedGraphFieldMapping],
src_label_info: &AnalyzedNodeLabelInfo,
tgt_label_info: &AnalyzedNodeLabelInfo,
) -> Result<Self> {
Expand Down Expand Up @@ -681,8 +681,7 @@ impl RelationshipSetupState {
} else if existing.nodes.iter().any(|(label, existing_node)| {
!self
.nodes
.get(label)
.map_or(false, |node| node.is_compatible(existing_node))
.get(label).is_some_and(|node| node.is_compatible(existing_node))
}) {
// If any node's key field change of some node label gone, we have to clear relationship.
SetupStateCompatibility::NotCompatible
Expand Down Expand Up @@ -747,7 +746,7 @@ impl SetupStatusCheck {
.current
.as_ref()
.filter(|existing_current| {
desired_state.as_ref().map_or(true, |desired| {
desired_state.as_ref().is_none_or(|desired| {
desired.check_compatible(existing_current)
== SetupStateCompatibility::NotCompatible
})
Expand Down Expand Up @@ -793,7 +792,7 @@ impl SetupStatusCheck {

for (index_name, vector_index) in desired_state.vector_indexes.into_iter() {
old_rel_indexes.shift_remove(&index_name);
if !existing.current.as_ref().map_or(false, |c| {
if !existing.current.as_ref().is_some_and(|c| {
Some(&vector_index) == c.vector_indexes.get(&index_name)
}) {
rel_index_to_create.insert(index_name, vector_index);
Expand All @@ -807,8 +806,7 @@ impl SetupStatusCheck {
.as_ref()
.map(|c| {
c.nodes
.get(&label)
.map_or(false, |existing_node| node.is_compatible(existing_node))
.get(&label).is_some_and(|existing_node| node.is_compatible(existing_node))
})
.unwrap_or(false)
{
Expand All @@ -820,8 +818,8 @@ impl SetupStatusCheck {

for (index_name, vector_index) in node.vector_indexes.into_iter() {
old_node_indexes.shift_remove(&index_name);
if !existing.current.as_ref().map_or(false, |c| {
c.nodes.get(&label).map_or(false, |n| {
if !existing.current.as_ref().is_some_and(|c| {
c.nodes.get(&label).is_some_and(|n| {
Some(&vector_index) == n.vector_indexes.get(&index_name)
})
}) {
Expand Down Expand Up @@ -1189,8 +1187,8 @@ impl StorageFactoryBase for RelationshipFactory {
let mut tgt_label_analyzer = NodeLabelAnalyzer::new(&spec, &spec.target)?;
let mut rel_value_fields_info = vec![];
for (field_idx, field_schema) in value_fields_schema.iter().enumerate() {
if !src_label_analyzer.process_field(field_idx, &field_schema)
&& !tgt_label_analyzer.process_field(field_idx, &field_schema)
if !src_label_analyzer.process_field(field_idx, field_schema)
&& !tgt_label_analyzer.process_field(field_idx, field_schema)
{
rel_value_fields_info.push(AnalyzedGraphFieldMapping {
field_idx,
Expand Down
5 changes: 2 additions & 3 deletions src/ops/storages/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ impl SetupStatusCheck {
.value_fields_schema
.iter()
.filter(|(field_name, schema)| {
!existing.current.as_ref().map_or(false, |v| {
!existing.current.as_ref().is_some_and(|v| {
v.value_fields_schema
.get(*field_name)
.map(to_column_type_sql)
Expand Down Expand Up @@ -641,8 +641,7 @@ impl SetupStatusCheck {
.filter(|(name, def)| {
!existing
.current
.as_ref()
.map_or(false, |v| v.vector_indexes.get(*name) != Some(def))
.as_ref().is_some_and(|v| v.vector_indexes.get(*name) != Some(def))
})
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
Expand Down
6 changes: 6 additions & 0 deletions src/setup/auth_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ pub struct AuthRegistry {
entries: RwLock<HashMap<String, serde_json::Value>>,
}

impl Default for AuthRegistry {
fn default() -> Self {
Self::new()
}
}

impl AuthRegistry {
pub fn new() -> Self {
Self {
Expand Down
3 changes: 1 addition & 2 deletions src/setup/states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,7 @@ impl<K, S, C: ResourceSetupStatusCheck> std::fmt::Display for ResourceSetupInfo<
impl<K, S, C: ResourceSetupStatusCheck> ResourceSetupInfo<K, S, C> {
pub fn is_up_to_date(&self) -> bool {
self.status_check
.as_ref()
.map_or(true, |c| c.change_type() == SetupChangeType::NoChange)
.as_ref().is_none_or(|c| c.change_type() == SetupChangeType::NoChange)
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/utils/retriable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ impl From<anyhow::Error> for Error {
}
}

impl Into<anyhow::Error> for Error {
fn into(self) -> anyhow::Error {
self.error
impl From<Error> for anyhow::Error {
fn from(val: Error) -> Self {
val.error
}
}

Expand Down