Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add asubmit (async submit) #39

Merged
merged 1 commit into from
Apr 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions examples/examples/openai/async-submission.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import asyncio
import os

import gentrace
from dotenv import load_dotenv

load_dotenv()


async def main():
pipeline = gentrace.Pipeline(
"test-gentrace-python-pipeline",
os.getenv("GENTRACE_API_KEY"),
host="http://localhost:3000/api/v1",
openai_config={
"api_key": os.getenv("OPENAI_KEY"),
},
)

pipeline.setup()

runner = pipeline.start()

openai = runner.get_openai()

result = await openai.Embedding.acreate(
input="sample text", model="text-similarity-davinci-001"
)

print("Result: ", result)

info = await runner.asubmit()

print("Response: ", info["pipelineRunId"])


asyncio.run(main())
8 changes: 4 additions & 4 deletions examples/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ openai = "^0.27.4"
pinecone-client = "^2.2.1"
python = "^3.11"
python-dotenv = "^1.0.0"
gentrace-py = {path = "../package/dist/gentrace_py-0.2.0.tar.gz", develop = true}
gentrace-py = {path = "../package/dist/gentrace_py-0.2.2.tar.gz", develop = true}

[tool.poetry.group.lint.dependencies]
black = "^23.3.0"
Expand Down
36 changes: 36 additions & 0 deletions package/gentrace/providers/pipeline_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from gentrace.configuration import Configuration
from gentrace.providers.llms.openai import OpenAIPipelineHandler
from gentrace.providers.step_run import StepRun
from gentrace.providers.utils import pipeline_run_post_async


class PipelineRun:
Expand Down Expand Up @@ -83,6 +84,41 @@ def get_pinecone(self):
def add_step_run(self, step_run: StepRun):
self.step_runs.append(step_run)

async def asubmit(self) -> Dict:
configuration = Configuration(host=self.pipeline.config.get("host"))
configuration.access_token = self.pipeline.config.get("api_key")
api_client = ApiClient(configuration=configuration)
ingestion_api = IngestionApi(api_client=api_client)

step_runs_data = [
{
"provider": {
"name": step_run.provider,
"invocation": step_run.invocation,
"modelParams": step_run.model_params,
"inputs": step_run.inputs,
"outputs": step_run.outputs,
},
"elapsedTime": step_run.elapsed_time,
"startTime": step_run.start_time,
"endTime": step_run.end_time,
}
for step_run in self.step_runs
]

try:
pipeline_post_response = await pipeline_run_post_async(
ingestion_api, {"name": self.pipeline.id, "stepRuns": step_runs_data}
)
return {
"pipelineRunId": pipeline_post_response.body.get_item_oapg(
"pipelineRunId"
)
}
except Exception as e:
print(f"Error submitting to Gentrace: {e}")
return {"pipelineRunId": None}

def submit(self) -> Dict:
configuration = Configuration(host=self.pipeline.config.get("host"))
configuration.access_token = self.pipeline.config.get("api_key")
Expand Down
15 changes: 15 additions & 0 deletions package/gentrace/providers/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

from gentrace.apis.tags.ingestion_api import IngestionApi
from gentrace.models import PipelineRunRequest

__all__ = ["to_date_string"]


def to_date_string(time_value):
return (
datetime.fromtimestamp(time_value).strftime("%Y-%m-%dT%H:%M:%S.%fZ")[:-4] + "Z"
)


async def pipeline_run_post_async(
api_instance: IngestionApi, pipeline_run_data: PipelineRunRequest
):
with ThreadPoolExecutor() as executor:
result = await asyncio.get_event_loop().run_in_executor(
executor, api_instance.pipeline_run_post, pipeline_run_data
)
return result