Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion examples/gdrive_text_embedding/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dotenv import load_dotenv

import cocoindex
import datetime
import os

@cocoindex.flow_def(name="GoogleDriveTextEmbedding")
Expand All @@ -14,7 +15,9 @@ def gdrive_text_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope:
data_scope["documents"] = flow_builder.add_source(
cocoindex.sources.GoogleDrive(
service_account_credential_path=credential_path,
root_folder_ids=root_folder_ids))
root_folder_ids=root_folder_ids),
refresh_options=cocoindex.SourceRefreshOptions(
refresh_interval=datetime.timedelta(minutes=1)))

doc_embeddings = data_scope.add_collector()

Expand Down
2 changes: 1 addition & 1 deletion python/cocoindex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""
from . import flow, functions, query, sources, storages, cli
from .flow import FlowBuilder, DataScope, DataSlice, Flow, flow_def
from .flow import EvaluateAndDumpOptions, GeneratedField
from .flow import EvaluateAndDumpOptions, GeneratedField, SourceRefreshOptions
from .llm import LlmSpec, LlmApiType
from .vector import VectorSimilarityMetric
from .lib import *
Expand Down
23 changes: 21 additions & 2 deletions python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import re
import inspect
import datetime
from typing import Any, Callable, Sequence, TypeVar, get_origin
from threading import Lock
from enum import Enum
Expand Down Expand Up @@ -64,10 +65,17 @@ def _spec_kind(spec: Any) -> str:

def _dump_engine_object(v: Any) -> Any:
"""Recursively dump an object for engine. Engine side uses `Pythonzized` to catch."""
if isinstance(v, type) or get_origin(v) is not None:
if v is None:
return None
elif isinstance(v, type) or get_origin(v) is not None:
return encode_enriched_type(v)
elif isinstance(v, Enum):
return v.value
elif isinstance(v, datetime.timedelta):
total_secs = v.total_seconds()
secs = int(total_secs)
nanos = int((total_secs - secs) * 1e9)
return {'secs': secs, 'nanos': nanos}
elif hasattr(v, '__dict__'):
return {k: _dump_engine_object(v) for k, v in v.__dict__.items()}
elif isinstance(v, (list, tuple)):
Expand Down Expand Up @@ -314,6 +322,13 @@ def get_data_slice(self, v: Any) -> _engine.DataSlice:
return v._state.engine_data_slice
return self.engine_flow_builder.constant(encode_enriched_type(type(v)), v)

@dataclass
class SourceRefreshOptions:
"""
Options for refreshing a source.
"""
refresh_interval: datetime.timedelta | None = None

class FlowBuilder:
"""
A flow builder is used to build a flow.
Expand All @@ -329,7 +344,10 @@ def __str__(self):
def __repr__(self):
return repr(self._state.engine_flow_builder)

def add_source(self, spec: op.SourceSpec, /, name: str | None = None) -> DataSlice:
def add_source(self, spec: op.SourceSpec, /, *,
name: str | None = None,
refresh_options: SourceRefreshOptions | None = None,
) -> DataSlice:
"""
Add a source to the flow.
"""
Expand All @@ -341,6 +359,7 @@ def add_source(self, spec: op.SourceSpec, /, name: str | None = None) -> DataSli
target_scope,
self._state.field_name_builder.build_name(
name, prefix=_to_snake_case(_spec_kind(spec))+'_'),
_dump_engine_object(refresh_options),
),
name
)
Expand Down
Loading