Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/gdrive_text_embedding/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dotenv import load_dotenv

import asyncio
import cocoindex
import datetime
import os
Expand Down Expand Up @@ -73,5 +74,4 @@ async def _run():

if __name__ == "__main__":
load_dotenv(override=True)
import asyncio
asyncio.run(_run())
8 changes: 4 additions & 4 deletions python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from . import op
from .convert import dump_engine_object
from .typing import encode_enriched_type
from .runtime import op_execution_context
from .runtime import execution_context

class _NameBuilder:
_existing_names: set[str]
Expand Down Expand Up @@ -378,7 +378,7 @@ def __enter__(self) -> FlowLiveUpdater:

def __exit__(self, exc_type, exc_value, traceback):
self.abort()
asyncio.run(self.wait())
execution_context.run(self.wait())

async def __aenter__(self) -> FlowLiveUpdater:
return self
Expand Down Expand Up @@ -476,7 +476,7 @@ def _create_engine_flow() -> _engine.Flow:
root_scope = DataScope(
flow_builder_state, flow_builder_state.engine_flow_builder.root_scope())
fl_def(FlowBuilder(flow_builder_state), root_scope)
return flow_builder_state.engine_flow_builder.build_flow(op_execution_context.event_loop)
return flow_builder_state.engine_flow_builder.build_flow(execution_context.event_loop)

return Flow(_create_engine_flow)

Expand Down Expand Up @@ -572,7 +572,7 @@ def __init__(
flow_builder_state.engine_flow_builder.set_direct_output(
_data_slice_state(output).engine_data_slice)
self._engine_flow = flow_builder_state.engine_flow_builder.build_transient_flow(
op_execution_context.event_loop)
execution_context.event_loop)

def __str__(self):
return str(self._engine_flow)
Expand Down
16 changes: 12 additions & 4 deletions python/cocoindex/runtime.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
"""
This module provides a standalone execution runtime for executing coroutines in a thread-safe
manner.
"""

import threading
import asyncio

class _OpExecutionContext:
from typing import Coroutine
class _ExecutionContext:
_lock: threading.Lock
_event_loop: asyncio.AbstractEventLoop | None = None

Expand All @@ -14,8 +19,11 @@ def event_loop(self) -> asyncio.AbstractEventLoop:
with self._lock:
if self._event_loop is None:
self._event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._event_loop)
threading.Thread(target=self._event_loop.run_forever, daemon=True).start()
return self._event_loop

op_execution_context = _OpExecutionContext()
def run(self, coro: Coroutine):
"""Run a coroutine in the event loop, blocking until it finishes. Return its result."""
return asyncio.run_coroutine_threadsafe(coro, self.event_loop).result()

execution_context = _ExecutionContext()