diff --git a/examples/examples/openai/async-submission.py b/examples/examples/openai/async-submission.py new file mode 100644 index 0000000..ba6b255 --- /dev/null +++ b/examples/examples/openai/async-submission.py @@ -0,0 +1,37 @@ +import asyncio +import os + +import gentrace +from dotenv import load_dotenv + +load_dotenv() + + +async def main(): + pipeline = gentrace.Pipeline( + "test-gentrace-python-pipeline", + os.getenv("GENTRACE_API_KEY"), + host="http://localhost:3000/api/v1", + openai_config={ + "api_key": os.getenv("OPENAI_KEY"), + }, + ) + + pipeline.setup() + + runner = pipeline.start() + + openai = runner.get_openai() + + result = await openai.Embedding.acreate( + input="sample text", model="text-similarity-davinci-001" + ) + + print("Result: ", result) + + info = await runner.asubmit() + + print("Response: ", info["pipelineRunId"]) + + +asyncio.run(main()) diff --git a/examples/poetry.lock b/examples/poetry.lock index b458bfb..e29b5b4 100644 --- a/examples/poetry.lock +++ b/examples/poetry.lock @@ -493,13 +493,13 @@ files = [ [[package]] name = "gentrace-py" -version = "0.2.0" +version = "0.2.2" description = "Python SDK for the Gentrace API" category = "main" optional = false python-versions = ">=3.8.1,<4.0" files = [ - {file = "gentrace_py-0.2.0.tar.gz", hash = "sha256:3f37ad7f83b22873e7e537e545dc2924fb1f19dd000321f42a49491dc7f7c277"}, + {file = "gentrace_py-0.2.2.tar.gz", hash = "sha256:2694b737a18a10a1056f9b7f3d3a69ae2cfd9e9a9bad9b179079a8aa72a6c0e2"}, ] [package.dependencies] @@ -518,7 +518,7 @@ vectorstores = ["pinecone-client (>=2.2.1,<3.0.0)"] [package.source] type = "file" -url = "../package/dist/gentrace_py-0.2.0.tar.gz" +url = "../package/dist/gentrace_py-0.2.2.tar.gz" [[package]] name = "idna" @@ -1156,4 +1156,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "dc045ffac838a0207bf932ab1c931082b94488571fa22379ec07e5846c090f39" +content-hash = "0b842be9743ff7ca83d6c21572b924c899c43b1c193e6f6104bba86e9747a288" diff --git a/examples/pyproject.toml b/examples/pyproject.toml index 1dca909..6cca619 100644 --- a/examples/pyproject.toml +++ b/examples/pyproject.toml @@ -11,7 +11,7 @@ openai = "^0.27.4" pinecone-client = "^2.2.1" python = "^3.11" python-dotenv = "^1.0.0" -gentrace-py = {path = "../package/dist/gentrace_py-0.2.0.tar.gz", develop = true} +gentrace-py = {path = "../package/dist/gentrace_py-0.2.2.tar.gz", develop = true} [tool.poetry.group.lint.dependencies] black = "^23.3.0" diff --git a/package/gentrace/providers/pipeline_run.py b/package/gentrace/providers/pipeline_run.py index a82dc13..317b2ff 100644 --- a/package/gentrace/providers/pipeline_run.py +++ b/package/gentrace/providers/pipeline_run.py @@ -7,6 +7,7 @@ from gentrace.configuration import Configuration from gentrace.providers.llms.openai import OpenAIPipelineHandler from gentrace.providers.step_run import StepRun +from gentrace.providers.utils import pipeline_run_post_async class PipelineRun: @@ -83,6 +84,41 @@ def get_pinecone(self): def add_step_run(self, step_run: StepRun): self.step_runs.append(step_run) + async def asubmit(self) -> Dict: + configuration = Configuration(host=self.pipeline.config.get("host")) + configuration.access_token = self.pipeline.config.get("api_key") + api_client = ApiClient(configuration=configuration) + ingestion_api = IngestionApi(api_client=api_client) + + step_runs_data = [ + { + "provider": { + "name": step_run.provider, + "invocation": step_run.invocation, + "modelParams": step_run.model_params, + "inputs": step_run.inputs, + "outputs": step_run.outputs, + }, + "elapsedTime": step_run.elapsed_time, + "startTime": step_run.start_time, + "endTime": step_run.end_time, + } + for step_run in self.step_runs + ] + + try: + pipeline_post_response = await pipeline_run_post_async( + ingestion_api, {"name": self.pipeline.id, "stepRuns": step_runs_data} + ) + return { + "pipelineRunId": pipeline_post_response.body.get_item_oapg( + "pipelineRunId" + ) + } + except Exception as e: + print(f"Error submitting to Gentrace: {e}") + return {"pipelineRunId": None} + def submit(self) -> Dict: configuration = Configuration(host=self.pipeline.config.get("host")) configuration.access_token = self.pipeline.config.get("api_key") diff --git a/package/gentrace/providers/utils.py b/package/gentrace/providers/utils.py index 33d6c3c..3c02cd0 100644 --- a/package/gentrace/providers/utils.py +++ b/package/gentrace/providers/utils.py @@ -1,5 +1,10 @@ +import asyncio +from concurrent.futures import ThreadPoolExecutor from datetime import datetime +from gentrace.apis.tags.ingestion_api import IngestionApi +from gentrace.models import PipelineRunRequest + __all__ = ["to_date_string"] @@ -7,3 +12,13 @@ def to_date_string(time_value): return ( datetime.fromtimestamp(time_value).strftime("%Y-%m-%dT%H:%M:%S.%fZ")[:-4] + "Z" ) + + +async def pipeline_run_post_async( + api_instance: IngestionApi, pipeline_run_data: PipelineRunRequest +): + with ThreadPoolExecutor() as executor: + result = await asyncio.get_event_loop().run_in_executor( + executor, api_instance.pipeline_run_post, pipeline_run_data + ) + return result