From e5175fefdbe7e38c11b096bfe7676b3e3152589a Mon Sep 17 00:00:00 2001 From: LJ Date: Tue, 1 Apr 2025 09:23:03 -0700 Subject: [PATCH] Python SDK exposes `refresh_options` for `add_source`. --- examples/gdrive_text_embedding/main.py | 5 ++++- python/cocoindex/__init__.py | 2 +- python/cocoindex/flow.py | 23 +++++++++++++++++++++-- 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/examples/gdrive_text_embedding/main.py b/examples/gdrive_text_embedding/main.py index f7d9e0e1f..388c8a7cc 100644 --- a/examples/gdrive_text_embedding/main.py +++ b/examples/gdrive_text_embedding/main.py @@ -1,6 +1,7 @@ from dotenv import load_dotenv import cocoindex +import datetime import os @cocoindex.flow_def(name="GoogleDriveTextEmbedding") @@ -14,7 +15,9 @@ def gdrive_text_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: data_scope["documents"] = flow_builder.add_source( cocoindex.sources.GoogleDrive( service_account_credential_path=credential_path, - root_folder_ids=root_folder_ids)) + root_folder_ids=root_folder_ids), + refresh_options=cocoindex.SourceRefreshOptions( + refresh_interval=datetime.timedelta(minutes=1))) doc_embeddings = data_scope.add_collector() diff --git a/python/cocoindex/__init__.py b/python/cocoindex/__init__.py index 81397f404..27b93478d 100644 --- a/python/cocoindex/__init__.py +++ b/python/cocoindex/__init__.py @@ -3,7 +3,7 @@ """ from . import flow, functions, query, sources, storages, cli from .flow import FlowBuilder, DataScope, DataSlice, Flow, flow_def -from .flow import EvaluateAndDumpOptions, GeneratedField +from .flow import EvaluateAndDumpOptions, GeneratedField, SourceRefreshOptions from .llm import LlmSpec, LlmApiType from .vector import VectorSimilarityMetric from .lib import * diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 5b461c242..ca1dddb68 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -6,6 +6,7 @@ import re import inspect +import datetime from typing import Any, Callable, Sequence, TypeVar, get_origin from threading import Lock from enum import Enum @@ -64,10 +65,17 @@ def _spec_kind(spec: Any) -> str: def _dump_engine_object(v: Any) -> Any: """Recursively dump an object for engine. Engine side uses `Pythonzized` to catch.""" - if isinstance(v, type) or get_origin(v) is not None: + if v is None: + return None + elif isinstance(v, type) or get_origin(v) is not None: return encode_enriched_type(v) elif isinstance(v, Enum): return v.value + elif isinstance(v, datetime.timedelta): + total_secs = v.total_seconds() + secs = int(total_secs) + nanos = int((total_secs - secs) * 1e9) + return {'secs': secs, 'nanos': nanos} elif hasattr(v, '__dict__'): return {k: _dump_engine_object(v) for k, v in v.__dict__.items()} elif isinstance(v, (list, tuple)): @@ -314,6 +322,13 @@ def get_data_slice(self, v: Any) -> _engine.DataSlice: return v._state.engine_data_slice return self.engine_flow_builder.constant(encode_enriched_type(type(v)), v) +@dataclass +class SourceRefreshOptions: + """ + Options for refreshing a source. + """ + refresh_interval: datetime.timedelta | None = None + class FlowBuilder: """ A flow builder is used to build a flow. @@ -329,7 +344,10 @@ def __str__(self): def __repr__(self): return repr(self._state.engine_flow_builder) - def add_source(self, spec: op.SourceSpec, /, name: str | None = None) -> DataSlice: + def add_source(self, spec: op.SourceSpec, /, *, + name: str | None = None, + refresh_options: SourceRefreshOptions | None = None, + ) -> DataSlice: """ Add a source to the flow. """ @@ -341,6 +359,7 @@ def add_source(self, spec: op.SourceSpec, /, name: str | None = None) -> DataSli target_scope, self._state.field_name_builder.build_name( name, prefix=_to_snake_case(_spec_kind(spec))+'_'), + _dump_engine_object(refresh_options), ), name )