diff --git a/examples/examples/self-contained/openai/create-chat-completion-async-stream.py b/examples/examples/self-contained/openai/create-chat-completion-async-stream.py
new file mode 100644
index 0000000..b90acf0
--- /dev/null
+++ b/examples/examples/self-contained/openai/create-chat-completion-async-stream.py
@@ -0,0 +1,35 @@
+import asyncio
+import os
+
+import gentrace
+from dotenv import load_dotenv
+from gentrace import providers
+
+load_dotenv()
+
+
+async def main():
+    gentrace.api_key = os.getenv("GENTRACE_API_KEY")
+    gentrace.host = "http://localhost:3000/api/v1"
+
+    providers.openai_api_key = os.getenv("OPENAI_KEY")
+    openai = providers.openai
+
+    result = await openai.ChatCompletion.acreate(
+        pipeline_id="testing-chat-completion-value",
+        messages=[{"role": "user", "content": "Hello!"}],
+        model="gpt-3.5-turbo",
+        stream=True,
+    )
+
+    pipeline_run_id = None
+    async for value in result:
+        if value.get("pipeline_run_id"):
+            pipeline_run_id = value.get("pipeline_run_id")
+
+    gentrace.flush()
+
+    print("Result: ", pipeline_run_id)
+
+
+asyncio.run(main())
diff --git a/examples/examples/self-contained/openai/create-chat-completion-async.py b/examples/examples/self-contained/openai/create-chat-completion-async.py
new file mode 100644
index 0000000..c41ef8d
--- /dev/null
+++ b/examples/examples/self-contained/openai/create-chat-completion-async.py
@@ -0,0 +1,29 @@
+import asyncio
+import os
+
+import gentrace
+from dotenv import load_dotenv
+from gentrace import providers
+
+load_dotenv()
+
+
+async def main():
+    gentrace.api_key = os.getenv("GENTRACE_API_KEY")
+    gentrace.host = "http://localhost:3000/api/v1"
+
+    providers.openai_api_key = os.getenv("OPENAI_KEY")
+    openai = providers.openai
+
+    result = await openai.ChatCompletion.acreate(
+        pipeline_id="testing-chat-completion-value",
+        messages=[{"role": "user", "content": "Hello!"}],
+        model="gpt-3.5-turbo",
+    )
+
+    gentrace.flush()
+
+    print("Result: ", result.pipeline_run_id)
+
+
+asyncio.run(main())
diff --git a/examples/examples/self-contained/openai/create-chat-completion-stream.py b/examples/examples/self-contained/openai/create-chat-completion-stream.py
new file mode 100644
index 0000000..8705fac
--- /dev/null
+++ b/examples/examples/self-contained/openai/create-chat-completion-stream.py
@@ -0,0 +1,30 @@
+import os
+
+import gentrace
+from dotenv import load_dotenv
+from gentrace import providers
+
+load_dotenv()
+
+gentrace.api_key = os.getenv("GENTRACE_API_KEY")
+gentrace.host = "http://localhost:3000/api/v1"
+
+providers.openai_api_key = os.getenv("OPENAI_KEY")
+openai = providers.openai
+
+result = openai.ChatCompletion.create(
+    pipeline_id="testing-chat-completion-value",
+    messages=[{"role": "user", "content": "Hello!"}],
+    model="gpt-3.5-turbo",
+    stream=True,
+)
+
+pipeline_run_id = None
+
+for value in result:
+    if value.get("pipeline_run_id"):
+        pipeline_run_id = value.get("pipeline_run_id")
+
+print("Result: ", pipeline_run_id)
+
+gentrace.flush()
diff --git a/examples/examples/self-contained/openai/create-chat-completion.py b/examples/examples/self-contained/openai/create-chat-completion.py
new file mode 100644
index 0000000..08359aa
--- /dev/null
+++ b/examples/examples/self-contained/openai/create-chat-completion.py
@@ -0,0 +1,23 @@
+import os
+
+import gentrace
+from dotenv import load_dotenv
+from gentrace import providers
+
+load_dotenv()
+
+gentrace.api_key = os.getenv("GENTRACE_API_KEY")
+gentrace.host = "http://localhost:3000/api/v1"
+
+providers.openai_api_key = os.getenv("OPENAI_KEY")
+openai = providers.openai
+
+result = openai.ChatCompletion.create(
+    pipeline_id="testing-chat-completion-value",
+    messages=[{"role": "user", "content": "Hello!"}],
+    model="gpt-3.5-turbo",
+)
+
+gentrace.flush()
+
+print("Result: ", result.pipeline_run_id)
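Note on the streaming examples above: a streamed call has no single response object to carry `pipeline_run_id`, so each example scans the chunks for the key that the interceptor injects (see the `openai.py` changes further down). A minimal sketch of that pattern as a helper; the function name is illustrative, not part of the SDK:

```python
# Sketch only: the chunk-scanning loop from the streaming examples, collapsed
# into one sync helper (an async variant would use `async for`). Assumes
# dict-like chunks, as in the examples above.
def extract_pipeline_run_id(chunks):
    pipeline_run_id = None
    for value in chunks:
        # Self-contained, streamed calls tag each chunk with the run id.
        if value.get("pipeline_run_id"):
            pipeline_run_id = value.get("pipeline_run_id")
    return pipeline_run_id
```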
diff --git a/examples/examples/self-contained/openai/create-completion-async-stream.py b/examples/examples/self-contained/openai/create-completion-async-stream.py
new file mode 100644
index 0000000..6f98578
--- /dev/null
+++ b/examples/examples/self-contained/openai/create-completion-async-stream.py
@@ -0,0 +1,36 @@
+import asyncio
+import os
+
+import gentrace
+from dotenv import load_dotenv
+from gentrace import providers
+
+load_dotenv()
+
+
+async def main():
+    gentrace.api_key = os.getenv("GENTRACE_API_KEY")
+    gentrace.host = "http://localhost:3000/api/v1"
+
+    providers.openai_api_key = os.getenv("OPENAI_KEY")
+    openai = providers.openai
+
+    result = await openai.Completion.acreate(
+        pipeline_id="text-generation",
+        model="text-davinci-003",
+        prompt_template="Hello world {{ name }}",
+        prompt_inputs={"name": "test"},
+        stream=True,
+    )
+
+    pipeline_run_id = None
+    async for value in result:
+        if value.get("pipeline_run_id"):
+            pipeline_run_id = value.get("pipeline_run_id")
+
+    gentrace.flush()
+
+    print("Result: ", pipeline_run_id)
+
+
+asyncio.run(main())
diff --git a/examples/examples/self-contained/openai/create-completion-async.py b/examples/examples/self-contained/openai/create-completion-async.py
new file mode 100644
index 0000000..4a3589b
--- /dev/null
+++ b/examples/examples/self-contained/openai/create-completion-async.py
@@ -0,0 +1,30 @@
+import asyncio
+import os
+
+import gentrace
+from dotenv import load_dotenv
+from gentrace import providers
+
+load_dotenv()
+
+
+async def main():
+    gentrace.api_key = os.getenv("GENTRACE_API_KEY")
+    gentrace.host = "http://localhost:3000/api/v1"
+
+    providers.openai_api_key = os.getenv("OPENAI_KEY")
+    openai = providers.openai
+
+    result = await openai.Completion.acreate(
+        pipeline_id="text-generation",
+        model="text-davinci-003",
+        prompt_template="Hello world {{ name }}",
+        prompt_inputs={"name": "test"},
+    )
+
+    gentrace.flush()
+
+    print("Result: ", result.pipeline_run_id)
+
+
+asyncio.run(main())
diff --git a/examples/examples/self-contained/openai/create-completion.py b/examples/examples/self-contained/openai/create-completion.py
new file mode 100644
index 0000000..8367380
--- /dev/null
+++ b/examples/examples/self-contained/openai/create-completion.py
@@ -0,0 +1,24 @@
+import os
+
+import gentrace
+from dotenv import load_dotenv
+from gentrace import providers
+
+load_dotenv()
+
+gentrace.api_key = os.getenv("GENTRACE_API_KEY")
+gentrace.host = "http://localhost:3000/api/v1"
+
+providers.openai_api_key = os.getenv("OPENAI_KEY")
+openai = providers.openai
+
+result = openai.Completion.create(
+    pipeline_id="text-generation",
+    model="text-davinci-003",
+    prompt_template="Hello world {{ name }}",
+    prompt_inputs={"name": "test"},
+)
+
+gentrace.flush()
+
+print("Result: ", result.pipeline_run_id)
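Note on the completion examples above: they pass `prompt_template` and `prompt_inputs` rather than a prebuilt `prompt`, using Mustache-style `{{ name }}` placeholders. A naive stand-in renderer to show how the pair combines into the final prompt; the real rendering is done inside the SDK, not by this function:

```python
# Sketch only: "Hello world {{ name }}" + {"name": "test"} -> "Hello world test".
import re

def render_prompt(template: str, inputs: dict) -> str:
    # Replace each {{ key }} placeholder with its value from `inputs`.
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda m: str(inputs.get(m.group(1), "")),
        template,
    )

assert render_prompt("Hello world {{ name }}", {"name": "test"}) == "Hello world test"
```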
diff --git a/examples/examples/self-contained/openai/create-embedding-async.py b/examples/examples/self-contained/openai/create-embedding-async.py
new file mode 100644
index 0000000..7f869e1
--- /dev/null
+++ b/examples/examples/self-contained/openai/create-embedding-async.py
@@ -0,0 +1,29 @@
+import asyncio
+import os
+
+import gentrace
+from dotenv import load_dotenv
+from gentrace import providers
+
+load_dotenv()
+
+
+async def main():
+    gentrace.api_key = os.getenv("GENTRACE_API_KEY")
+    gentrace.host = "http://localhost:3000/api/v1"
+
+    providers.openai_api_key = os.getenv("OPENAI_KEY")
+    openai = providers.openai
+
+    result = await openai.Embedding.acreate(
+        input="sample text",
+        model="text-similarity-davinci-001",
+        pipeline_id="testing-value",
+    )
+
+    gentrace.flush()
+
+    print("Result: ", result.pipeline_run_id)
+
+
+asyncio.run(main())
diff --git a/examples/poetry.lock b/examples/poetry.lock
index 00dff3d..4462ca7 100644
--- a/examples/poetry.lock
+++ b/examples/poetry.lock
@@ -493,13 +493,13 @@ files = [
 
 [[package]]
 name = "gentrace-py"
-version = "0.3.0"
+version = "0.4.0"
 description = "Python SDK for the Gentrace API"
 category = "main"
 optional = false
 python-versions = ">=3.8.1,<4.0"
 files = [
-    {file = "gentrace_py-0.3.0.tar.gz", hash = "sha256:46aaf7866ac6a3eb0abe0793b6c58daf29cc14b44d7e81191e0fb945a60720bd"},
+    {file = "gentrace_py-0.4.0.tar.gz", hash = "sha256:6e7290ca5e0be54d668b614286b803a4b8b02c8032ecdb286ff0cdbe2137f686"},
 ]
 
 [package.dependencies]
@@ -518,7 +518,7 @@ vectorstores = ["pinecone-client (>=2.2.1,<3.0.0)"]
 
 [package.source]
 type = "file"
-url = "../package/dist/gentrace_py-0.3.0.tar.gz"
+url = "../package/dist/gentrace_py-0.4.0.tar.gz"
 
 [[package]]
 name = "idna"
@@ -1156,4 +1156,4 @@ multidict = ">=4.0"
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.11"
-content-hash = "9b70c0db0d43b28e6749520ea58458e255860775b232b2a0e867bb5474648239"
+content-hash = "222b896d5a5cd3dae1b43568a57ecac7b9199e283f98dd083614dfd201abcfb2"
diff --git a/examples/pyproject.toml b/examples/pyproject.toml
index 3ffa621..7ce7380 100644
--- a/examples/pyproject.toml
+++ b/examples/pyproject.toml
@@ -11,7 +11,7 @@ openai = "^0.27.4"
 pinecone-client = "^2.2.1"
 python = "^3.11"
 python-dotenv = "^1.0.0"
-gentrace-py = {path = "../package/dist/gentrace_py-0.3.0.tar.gz", develop = true}
+gentrace-py = {path = "../package/dist/gentrace_py-0.4.0.tar.gz", develop = true}
 
 [tool.poetry.group.lint.dependencies]
 black = "^23.3.0"
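Note on the interceptor changes below: for self-contained calls, `pipeline_id` is now popped from kwargs so it is never forwarded to the OpenAI API, a run id is pre-generated with `uuid.uuid4()`, and for streams that id is stamped onto every chunk before it is yielded. A reduced sketch of that wrapping shape, with illustrative names rather than the SDK's internals (timing, step-run construction, and submission are omitted):

```python
# Sketch only: the core of the stream interception in the diff below.
import uuid

def wrap_stream(original_fn, **kwargs):
    # Pop the SDK-only kwarg so the upstream API never sees it.
    pipeline_id = kwargs.pop("pipeline_id", None)
    completion = original_fn(**kwargs)
    run_id = str(uuid.uuid4()) if pipeline_id else None

    def tagged():
        for value in completion:
            if value and run_id:
                # The same id is later passed to PipelineRun, so stream
                # consumers can correlate chunks with the ingested run.
                value["pipeline_run_id"] = run_id
            yield value

    return tagged()
```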
diff --git a/package/gentrace/providers/llms/openai.py b/package/gentrace/providers/llms/openai.py
index 05cde35..03191d1 100644
--- a/package/gentrace/providers/llms/openai.py
+++ b/package/gentrace/providers/llms/openai.py
@@ -1,4 +1,5 @@
 import time
+import uuid
 from typing import Dict, Optional
 
 import openai
@@ -33,7 +34,7 @@ def set_pipeline_run(self, pipeline_run):
         self.pipeline_run = pipeline_run
 
 
-def create_step_run(
+def create_completion_step_run(
     cls,
     pipeline_id: str,
     gentrace_config: Configuration,
@@ -43,6 +44,8 @@ def create_step_run(
     prompt_template,
     prompt_inputs,
     completion,
+    pipeline_run_id: Optional[str] = None,
+    stream=False,
 ):
 
     elapsed_time = int((end_time - start_time) * 1000)
@@ -71,6 +74,7 @@ def create_step_run(
 
         pipeline_run = PipelineRun(
            pipeline=pipeline,
+           id=pipeline_run_id,
        )
 
    if pipeline_run:
@@ -87,11 +91,13 @@ def create_step_run(
 
    if is_self_contained:
        submit_result = pipeline_run.submit()
-       completion.pipeline_run_id = (
-           submit_result["pipelineRunId"]
-           if "pipelineRunId" in submit_result
-           else None
-       )
+
+       if not stream:
+           completion.pipeline_run_id = (
+               submit_result["pipelineRunId"]
+               if "pipelineRunId" in submit_result
+               else None
+           )
 
 
 def create_stream_response(stream_list):
@@ -132,7 +138,7 @@ def intercept_completion(original_fn, gentrace_config: Configuration):
    def wrapper(cls, *args, **kwargs):
        prompt_template = kwargs.get("prompt_template")
        prompt_inputs = kwargs.get("prompt_inputs")
-       pipeline_id = kwargs.get("pipeline_id")
+       pipeline_id = kwargs.pop("pipeline_id", None)
        stream = kwargs.get("stream")
        base_completion_options = {
            k: v
@@ -161,9 +167,15 @@ def wrapper(cls, *args, **kwargs):
            start_time = time.time()
            completion = original_fn(**new_completion_options)
 
+           is_self_contained = not cls.pipeline_run and pipeline_id
+           if is_self_contained:
+               pipeline_run_id = str(uuid.uuid4())
+
            def profiled_completion():
                modified_response = []
                for value in completion:
+                   if value and is_self_contained:
+                       value["pipeline_run_id"] = pipeline_run_id
                    modified_response.append(value)
                    yield value
 
@@ -171,7 +183,7 @@ def profiled_completion():
 
                full_response = create_stream_response(modified_response)
 
-               create_step_run(
+               create_completion_step_run(
                    cls,
                    pipeline_id,
                    gentrace_config,
@@ -181,6 +193,8 @@ def profiled_completion():
                    prompt_template,
                    prompt_inputs,
                    full_response,
+                   pipeline_run_id if is_self_contained else None,
+                   stream,
                )
 
            return profiled_completion()
@@ -194,7 +208,7 @@ def profiled_completion():
 
        end_time = time.time()
 
-       create_step_run(
+       create_completion_step_run(
            cls,
            pipeline_id,
            gentrace_config,
@@ -204,6 +218,8 @@ def profiled_completion():
            prompt_template,
            prompt_inputs,
            completion,
+           None,
+           stream,
        )
 
        return completion
@@ -216,7 +232,7 @@ def intercept_completion_async(original_fn, gentrace_config: Configuration):
    async def wrapper(cls, *args, **kwargs):
        prompt_template = kwargs.get("prompt_template")
        prompt_inputs = kwargs.get("prompt_inputs")
-       pipeline_id = kwargs.get("pipeline_id")
+       pipeline_id = kwargs.pop("pipeline_id", None)
        stream = kwargs.get("stream")
        base_completion_options = {
            k: v
@@ -245,9 +261,15 @@ async def wrapper(cls, *args, **kwargs):
            start_time = time.time()
            completion = await original_fn(**new_completion_options)
 
+           is_self_contained = not cls.pipeline_run and pipeline_id
+           if is_self_contained:
+               pipeline_run_id = str(uuid.uuid4())
+
            async def profiled_completion():
                modified_response = []
                async for value in completion:
+                   if value and is_self_contained:
+                       value["pipeline_run_id"] = pipeline_run_id
                    modified_response.append(value)
                    yield value
 
@@ -255,7 +277,7 @@ async def profiled_completion():
 
                full_response = create_stream_response(modified_response)
 
-               create_step_run(
+               create_completion_step_run(
                    cls,
                    pipeline_id,
                    gentrace_config,
@@ -265,6 +287,8 @@ async def profiled_completion():
                    prompt_template,
                    prompt_inputs,
                    full_response,
+                   pipeline_run_id if is_self_contained else None,
+                   stream,
                )
 
            return profiled_completion()
@@ -277,7 +301,7 @@ async def profiled_completion():
        completion = await original_fn(**new_completion_options)
        end_time = time.time()
 
-       create_step_run(
+       create_completion_step_run(
            cls,
            pipeline_id,
            gentrace_config,
@@ -287,6 +311,8 @@ async def profiled_completion():
            prompt_template,
            prompt_inputs,
            completion,
+           None,
+           stream,
        )
 
        return completion
@@ -298,7 +324,7 @@ def intercept_chat_completion(original_fn, gentrace_config: Configuration):
    def wrapper(cls, *args, **kwargs):
        messages = kwargs.get("messages")
        user = kwargs.get("user")
-       pipeline_id = kwargs.get("pipeline_id")
+       pipeline_id = kwargs.pop("pipeline_id", None)
        stream = kwargs.get("stream")
 
        model_params = {
@@ -309,9 +335,15 @@ def wrapper(cls, *args, **kwargs):
            start_time = time.time()
            completion = original_fn(**kwargs)
 
+           is_self_contained = not cls.pipeline_run and pipeline_id
+           if is_self_contained:
+               pipeline_run_id = str(uuid.uuid4())
+
            def profiled_completion():
                modified_response = []
                for value in completion:
+                   if value and is_self_contained:
+                       value["pipeline_run_id"] = pipeline_run_id
                    modified_response.append(value)
                    yield value
 
@@ -323,8 +355,6 @@ def profiled_completion():
 
                full_response = create_stream_response(modified_response)
 
-               is_self_contained = not pipeline_run and pipeline_id
-
                if is_self_contained:
                    pipeline = Pipeline(
                        id=pipeline_id,
@@ -332,9 +362,7 @@ def profiled_completion():
                        host=gentrace_config.host,
                    )
 
-                   pipeline_run = PipelineRun(
-                       pipeline=pipeline,
-                   )
+                   pipeline_run = PipelineRun(pipeline=pipeline, id=pipeline_run_id)
 
                if pipeline_run:
                    pipeline_run.add_step_run(
@@ -349,12 +377,7 @@ def profiled_completion():
                    )
 
                if is_self_contained:
-                   submit_result = pipeline_run.submit()
-                   completion.pipeline_run_id = (
-                       submit_result["pipelineRunId"]
-                       if "pipelineRunId" in submit_result
-                       else None
-                   )
+                   pipeline_run.submit()
 
            return profiled_completion()
@@ -410,7 +433,7 @@ async def wrapper(cls, *args, **kwargs):
        messages = kwargs.get("messages")
        user = kwargs.get("user")
        stream = kwargs.get("stream")
-       pipeline_id = kwargs.get("pipeline_id")
+       pipeline_id = kwargs.pop("pipeline_id", None)
        model_params = {
            k: v for k, v in kwargs.items() if k not in ["messages", "user"]
        }
@@ -419,10 +442,15 @@ async def wrapper(cls, *args, **kwargs):
        start_time = time.time()
        completion = await original_fn(**kwargs)
 
        if stream:
+           is_self_contained = not cls.pipeline_run and pipeline_id
+           if is_self_contained:
+               pipeline_run_id = str(uuid.uuid4())
 
            async def profiled_completion():
                modified_response = []
                async for value in completion:
+                   if value and is_self_contained:
+                       value["pipeline_run_id"] = pipeline_run_id
                    modified_response.append(value)
                    yield value
 
@@ -434,8 +462,6 @@ async def profiled_completion():
 
                pipeline_run = cls.pipeline_run
 
-               is_self_contained = not pipeline_run and pipeline_id
-
                if is_self_contained:
                    pipeline = Pipeline(
                        id=pipeline_id,
@@ -445,6 +471,7 @@ async def profiled_completion():
 
                    pipeline_run = PipelineRun(
                        pipeline=pipeline,
+                       id=pipeline_run_id,
                    )
 
                if pipeline_run:
@@ -460,12 +487,7 @@ async def profiled_completion():
                    )
 
                if is_self_contained:
-                   submit_result = pipeline_run.submit()
-                   completion.pipeline_run_id = (
-                       submit_result["pipelineRunId"]
-                       if "pipelineRunId" in submit_result
-                       else None
-                   )
+                   pipeline_run.submit()
 
            return profiled_completion()
@@ -516,7 +538,7 @@ def intercept_embedding(original_fn, gentrace_config: Configuration):
    @classmethod
    def wrapper(cls, *args, **kwargs):
        model = kwargs.get("model")
-       pipeline_id = kwargs.get("pipeline_id")
+       pipeline_id = kwargs.pop("pipeline_id", None)
        input_params = {k: v for k, v in kwargs.items() if k not in ["model"]}
 
        start_time = time.time()
@@ -568,7 +590,7 @@ def intercept_embedding_async(original_fn, gentrace_config: Configuration):
    @classmethod
    async def wrapper(cls, *args, **kwargs):
        model = kwargs.get("model")
-       pipeline_id = kwargs.get("pipeline_id")
+       pipeline_id = kwargs.pop("pipeline_id", None)
        input_params = {k: v for k, v in kwargs.items() if k not in ["model"]}
 
        start_time = time.time()
@@ -579,8 +601,6 @@ async def wrapper(cls, *args, **kwargs):
 
        pipeline_run = cls.pipeline_run if hasattr(cls, "pipeline_run") else None
 
-       print("pipeline_run", pipeline_run)
-
        is_self_contained = not pipeline_run and pipeline_id
 
        if is_self_contained:
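Note on the chat-completion stream paths above: the removed code tried to assign `completion.pipeline_run_id` after `submit()`, but with `stream=True` the object being consumed is a generator, and a plain generator does not accept attribute assignment; the per-chunk tag plus a pre-assigned `PipelineRun` id replaces it. A tiny demonstration of that Python constraint (this shows the language behavior, not the SDK's exact failure mode, which is an assumption here):

```python
# Sketch only: generators reject arbitrary attributes, which is one reason the
# streaming contract moved to a per-chunk "pipeline_run_id" key.
def gen():
    yield {"choices": []}

g = gen()
try:
    g.pipeline_run_id = "abc"
except AttributeError as exc:
    print("cannot attach:", exc)
```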
diff --git a/package/gentrace/providers/pipeline_run.py b/package/gentrace/providers/pipeline_run.py
index 45a0ba8..9bcdf94 100644
--- a/package/gentrace/providers/pipeline_run.py
+++ b/package/gentrace/providers/pipeline_run.py
@@ -4,7 +4,7 @@
 import inspect
 import threading
 import uuid
-from typing import Dict, List, cast
+from typing import Dict, List, Optional, cast
 
 from gentrace.api_client import ApiClient
 from gentrace.apis.tags.ingestion_api import IngestionApi
@@ -37,8 +37,9 @@ def flush():
 
 
 class PipelineRun:
-    def __init__(self, pipeline):
+    def __init__(self, pipeline, id: Optional[str] = None):
         self.pipeline: Pipeline = pipeline
+        self.pipeline_run_id: str = id or str(uuid.uuid4())
         self.step_runs: List[StepRun] = []
 
     def get_pipeline(self):
@@ -147,27 +148,25 @@ def submit(self, wait_for_server=False) -> Dict:
            for step_run in self.step_runs
        ]
 
-       pipeline_run_id = str(uuid.uuid4())
-
        if not wait_for_server:
            fire_and_forget(
                pipeline_run_post_background(
                    ingestion_api,
                    {
-                       "id": pipeline_run_id,
+                       "id": self.pipeline_run_id,
                        "name": self.pipeline.id,
                        "stepRuns": step_runs_data,
                    },
                )
            )
 
-           return {"pipelineRunId": pipeline_run_id}
+           return {"pipelineRunId": self.pipeline_run_id}
 
        if wait_for_server:
            try:
                pipeline_post_response = ingestion_api.pipeline_run_post(
                    {
-                       "id": pipeline_run_id,
+                       "id": self.pipeline_run_id,
                        "name": self.pipeline.id,
                        "stepRuns": step_runs_data,
                    }
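Note on the `PipelineRun` change above: the run id is now fixed at construction (an explicit `id`, else a fresh `uuid4`), so the id stamped onto stream chunks and the id sent to the ingestion API are the same value. A small usage sketch under that assumption; the surrounding `pipeline` setup and step runs are elided:

```python
# Sketch only: fixing the run id up front, as the interceptors now do.
import uuid

run_id = str(uuid.uuid4())
pipeline_run = PipelineRun(pipeline=pipeline, id=run_id)
# ... add step runs, then submit (fire-and-forget by default) ...
result = pipeline_run.submit()
assert result["pipelineRunId"] == run_id
```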