Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions examples/gdrive_text_embedding/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ def gdrive_text_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope:
default_similarity_metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)

@cocoindex.main_fn()
def _run():
async def _run():
# Use a `FlowLiveUpdater` to keep the flow data updated.
with cocoindex.FlowLiveUpdater(gdrive_text_embedding_flow):
async with cocoindex.FlowLiveUpdater(gdrive_text_embedding_flow):
# Run queries in a loop to demonstrate the query capabilities.
while True:
try:
Expand All @@ -70,4 +70,5 @@ def _run():

if __name__ == "__main__":
load_dotenv(override=True)
_run()
import asyncio
asyncio.run(_run())
52 changes: 37 additions & 15 deletions python/cocoindex/lib.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""
Library level functions and states.
"""
import json
import os
import sys
from typing import Callable, Self
import functools
import inspect
import asyncio
from typing import Callable, Self, Any
from dataclasses import dataclass

from . import _engine
Expand Down Expand Up @@ -78,20 +80,40 @@ def main_fn(

If the settings are not provided, they are loaded from the environment variables.
"""
def _main_wrapper(fn: Callable) -> Callable:

def _inner(*args, **kwargs):
effective_settings = settings or Settings.from_env()
init(effective_settings)
try:
if len(sys.argv) > 1 and sys.argv[1] == cocoindex_cmd:
return cli.cli.main(sys.argv[2:], prog_name=f"{sys.argv[0]} {sys.argv[1]}")
else:
return fn(*args, **kwargs)
finally:
stop()
def _pre_init() -> None:
effective_settings = settings or Settings.from_env()
init(effective_settings)

def _should_run_cli() -> bool:
return len(sys.argv) > 1 and sys.argv[1] == cocoindex_cmd

_inner.__name__ = fn.__name__
return _inner
def _run_cli():
return cli.cli.main(sys.argv[2:], prog_name=f"{sys.argv[0]} {sys.argv[1]}")

def _main_wrapper(fn: Callable) -> Callable:
if inspect.iscoroutinefunction(fn):
@functools.wraps(fn)
async def _inner(*args, **kwargs):
_pre_init()
try:
if _should_run_cli():
# Schedule to a separate thread as it invokes nested event loop.
return await asyncio.to_thread(_run_cli)
return await fn(*args, **kwargs)
finally:
stop()
return _inner
else:
@functools.wraps(fn)
def _inner(*args, **kwargs):
_pre_init()
try:
if _should_run_cli():
return _run_cli()
return fn(*args, **kwargs)
finally:
stop()
return _inner

return _main_wrapper