From ae286fb356c2e740b2629a824bc633a195e0cf63 Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Mon, 1 Apr 2024 19:22:02 +0530 Subject: [PATCH 01/25] feat: logger function for langchain --- portkey_ai/api_resources/apis/logger.py | 58 ++++ portkey_ai/llms/langchain/portkey_callback.py | 252 ++++++++++++++++++ 2 files changed, 310 insertions(+) create mode 100644 portkey_ai/api_resources/apis/logger.py create mode 100644 portkey_ai/llms/langchain/portkey_callback.py diff --git a/portkey_ai/api_resources/apis/logger.py b/portkey_ai/api_resources/apis/logger.py new file mode 100644 index 0000000..92290ca --- /dev/null +++ b/portkey_ai/api_resources/apis/logger.py @@ -0,0 +1,58 @@ +import json +import os +from typing import Optional +from portkey_ai.api_resources import apis +from portkey_ai.api_resources.apis.api_resource import APIResource +from portkey_ai.api_resources.base_client import APIClient +from portkey_ai.api_resources.exceptions import AuthenticationError +import requests + + +class Logger: + def __init__( + self, + api_key: Optional[str] = None, + ) -> None: + api_key = api_key or os.getenv("PORTKEY_API_KEY") + print("Logger API Key: ", api_key) + + self.headers = { + "Content-Type": "application/json", + "x-portkey-api-key": api_key, + } + + self.url = "https://api.portkey.ai/v1/logger" + + if api_key is None: + raise ValueError("API key is required to use the Logger API") + + def log( + self, + log_object: dict, + ): + body = log_object + + print("self.url", self.url) + print("self.body", body) + + self.headers.update( + { + "x-portkey-provider": Logger.get_provider( + body["requestHeaders"]["provider"] + ) + } + ) + + response = requests.post( + url=self.url, json=json.dumps(log_object, default=str), headers=self.headers + ) + print("Logger response", response.json()) + return response.status_code + + @staticmethod + def get_provider(provider): + provider_dict = { + "openai": "openai", + "mistralai": "mistral-ai", + } + return provider_dict.get(provider) diff --git a/portkey_ai/llms/langchain/portkey_callback.py b/portkey_ai/llms/langchain/portkey_callback.py new file mode 100644 index 0000000..2a22aa1 --- /dev/null +++ b/portkey_ai/llms/langchain/portkey_callback.py @@ -0,0 +1,252 @@ +from datetime import datetime +from typing import Any, Dict, List, Optional +from langchain_core.callbacks import BaseCallbackHandler +from langchain_core.outputs import LLMResult +from langchain_core.agents import AgentFinish, AgentAction + +from portkey_ai.api_resources.apis.logger import Logger + + +class PortkeyCallbackHandler(BaseCallbackHandler): + startTimestamp: int = 0 + endTimestamp: int = 0 + + def __init__( + self, + api_key: str, + ) -> None: + super().__init__() + + self.api_key = api_key + + self.portkey_logger = Logger(api_key=api_key) + + # ------------------------------------------------ + self.log_object: Dict[str, Any] = {} + self.prompt_records: List[str] = [] + self.usage_records: Any = {} + self.prompt_tokens: int = 0 + self.completion_tokens: int = 0 + self.total_tokens: int = 0 + # ------------------------------------------------ + + # ------------------------------------------------ + self.requestMethod: str = "POST" + self.requestURL: str = "" + self.requestHeaders: Dict[str, Any] = {} + self.requestBody: Any = {} + + self.responseHeaders: Dict[str, Any] = {} # Nowhere to get this from + self.responseBody: Any = None + self.responseStatus: int = 0 + self.responseTime: int = 0 + + self.streamingMode: bool = False + + # ------------------------------------------------ + # 
self.config: Dict[str, Any] = {} + # self.organisationConfig: Dict[str, Any] = {} + # self.organisationDetails: Dict[str, Any] = {} + # self.cacheStatus: str = None + # self.retryCount: int = 0 + # self.portkeyHeaders: Dict[str, Any] = {} + # ------------------------------------------------ + + if not api_key: + raise ValueError("Please provide an API key to use PortkeyCallbackHandler") + + def on_llm_start( + self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any + ) -> None: + print("on_llm_start serialized", serialized["id"][2]) + print("on_llm_start prompts", prompts) + print("on_llm_start kwargs", kwargs) + + self.startTimestamp = int(datetime.now().timestamp()) + + for prompt in prompts: + self.prompt_records.append(prompt.replace("\n", " ")) + + self.requestURL = serialized.get("kwargs", "").get("base_url", "") + self.requestHeaders = serialized.get("kwargs", {}).get("default_headers", {}) + self.requestHeaders.update({"provider": serialized["id"][2]}) + + self.requestBody = kwargs + self.requestBody["prompts"] = self.prompt_records + + print("on_llm_start requestBody:", self.requestBody) + + self.streamingMode = kwargs.get("invocation_params", False).get("stream", False) + + def on_chain_start( + self, + serialized: Dict[str, Any], + inputs: Dict[str, Any], + **kwargs: Any, + ) -> None: + """Run when chain starts running.""" + self.requestBody = {**inputs, **kwargs} + self.requestHeaders = ( + serialized.get("kwargs", {}) + .get("llm", {}) + .get("kwargs", {}) + .get("default_headers", {}) + ) + self.requestURL = ( + serialized.get("kwargs", "") + .get("llm", "") + .get("kwargs", "") + .get("base_url", "") + ) + + self.startTimestamp = int(datetime.now().timestamp()) + print("on_chain_start inputs", inputs) + print("on_chain_start kwargs", kwargs) + print("on_chain_start serialized", serialized) + + # -------------------------------------------------------------------------------- + + def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: + print("on_llm_end response", response.generations) + print("on_llm_end kwargs", kwargs) + self.responseBody = response + self.responseStatus = 200 + self.endTimestamp = int(datetime.now().timestamp()) + self.responseTime = self.endTimestamp - self.startTimestamp + + """This will handle all the token usage information from the LLM.""" + if response.llm_output and "token_usage" in response.llm_output: + usage = response.llm_output["token_usage"] + self.completion_tokens = usage.get("completion_tokens", 0) + self.prompt_tokens = usage.get("prompt_tokens", 0) + self.total_tokens = usage.get( + "total_tokens", self.completion_tokens + self.prompt_tokens + ) + self.usage_records["usage"] = usage + """This will handle all the token usage information from the LLM.""" + + self.log_object.update( + { + "requestMethod": self.requestMethod, + "requestURL": self.requestURL, + "requestHeaders": self.requestHeaders, + "requestBody": self.requestBody, + "responseHeaders": self.responseHeaders, + "responseBody": self.responseBody, + "responseStatus": self.responseStatus, + "responseTime": self.responseTime, + "streamingMode": self.streamingMode, + } + ) + + print("on_llm_end log_object", {**self.log_object}) + + self.portkey_logger.log(log_object=self.log_object) + + def on_chain_end( + self, + outputs: Dict[str, Any], + **kwargs: Any, + ) -> None: + """Run when chain ends running.""" + print("on_chain_end outputs", outputs) + self.responseBody = outputs + self.responseStatus = 200 + self.endTimestamp = int(datetime.now().timestamp()) + 
self.responseTime = self.endTimestamp - self.startTimestamp + + # -------------------------------------------------------------------------------- + + def on_chain_error(self, error: BaseException, **kwargs: Any) -> None: + self.responseBody = error + self.responseStatus = error.status_code # type: ignore[attr-defined] + print("on_chain_error error", error) + print("on_chain_error kwargs", kwargs) + """Do nothing.""" + pass + + def on_llm_error(self, error: BaseException, **kwargs: Any) -> None: + self.responseBody = error + self.responseStatus = error.status_code # type: ignore[attr-defined] + print("on_llm_error error", error) + print("on_llm_error kwargs", kwargs) + """Do nothing.""" + pass + + def on_tool_error(self, error: BaseException, **kwargs: Any) -> None: + self.responseBody = error + self.responseStatus = error.status_code # type: ignore[attr-defined] + pass + + # -------------------------------------------------------------------------------- + + def on_text(self, text: str, **kwargs: Any) -> None: + pass + + def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> None: + pass + + # -------------------------------------------------------------------------------- + + def on_llm_new_token(self, token: str, **kwargs: Any) -> None: + self.streamingMode = True + print("on_llm_new_token token", token) + print("on_llm_new_token kwargs", kwargs) + """Do nothing.""" + pass + + # -------------------------------------------------------------------------------- + + def on_tool_start( + self, + serialized: Dict[str, Any], + input_str: str, + **kwargs: Any, + ) -> None: + print("on_tool_start input_str", input_str) + print("on_tool_start serialized", serialized) + print("on_tool_start kwargs", kwargs) + + pass + + def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any: + print("on_agent_action action", action) + print("on_agent_action kwargs", kwargs) + """Do nothing.""" + pass + + def on_tool_end( + self, + output: Any, + observation_prefix: Optional[str] = None, + llm_prefix: Optional[str] = None, + **kwargs: Any, + ) -> None: + print("on_tool_end output", output) + print("on_tool_end observation_prefix", observation_prefix) + print("on_tool_end llm_prefix", llm_prefix) + print("on_tool_end kwargs", kwargs) + pass + + +""" +winkychLogObject = { + requestMethod: store.requestMethod, + requestURL: store.proxyUrl, + requestHeaders: store.requestHeadersWithoutPortkeyHeaders, + requestBody: store.requestBody, + + responseHeaders: Object.fromEntries(store.response.headers), + responseBody: {}, + responseStatus: store.responseStatus, + responseTime: store.config.responseTime, + config: { + organisationConfig: { id: store.requestHeaders[globals.PORTKEY_CONFIG_HEADER], ...store.organisationConfig}, + organisationDetails: store.organisationDetails, + cacheStatus: store.config.cacheStatus, + retryCount: store.retryCount, + isStreamingMode: store.streamingMode, + portkeyHeaders: store.requestHeadersPortkey + } + } +""" From 25dcec916a2be3063c701d036f68d6db51dd7867 Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Thu, 4 Apr 2024 14:15:00 +0530 Subject: [PATCH 02/25] feat: basic llama index callbackHandler template --- portkey_ai/llms/langchain/arize_callback.py | 245 ++++++++++++++++++ portkey_ai/llms/llama_index/llama_callback.py | 66 +++++ 2 files changed, 311 insertions(+) create mode 100644 portkey_ai/llms/langchain/arize_callback.py create mode 100644 portkey_ai/llms/llama_index/llama_callback.py diff --git a/portkey_ai/llms/langchain/arize_callback.py 
b/portkey_ai/llms/langchain/arize_callback.py new file mode 100644 index 0000000..0fc9aad --- /dev/null +++ b/portkey_ai/llms/langchain/arize_callback.py @@ -0,0 +1,245 @@ +from datetime import datetime +from typing import Any, Dict, List, Optional + +from langchain_core.agents import AgentAction, AgentFinish +from langchain_core.callbacks import BaseCallbackHandler, AsyncCallbackHandler +from langchain_core.outputs import LLMResult + +from langchain_community.callbacks.utils import import_pandas + + +class ArizeCallbackHandler(BaseCallbackHandler): + """Callback Handler that logs to Arize.""" + + def __init__( + self, + model_id: Optional[str] = None, + model_version: Optional[str] = None, + SPACE_KEY: Optional[str] = None, + API_KEY: Optional[str] = None, + ) -> None: + """Initialize callback handler.""" + + super().__init__() + self.model_id = model_id + self.model_version = model_version + self.space_key = SPACE_KEY + self.api_key = API_KEY + self.prompt_records: List[str] = [] + self.response_records: List[str] = [] + self.prediction_ids: List[str] = [] + self.pred_timestamps: List[int] = [] + self.response_embeddings: List[float] = [] + self.prompt_embeddings: List[float] = [] + self.prompt_tokens = 0 + self.completion_tokens = 0 + self.total_tokens = 0 + self.step = 0 + + # from arize.pandas.embeddings import EmbeddingGenerator, UseCases + from arize.pandas.logger import Client + + # self.generator = EmbeddingGenerator.from_use_case( + # use_case=UseCases.NLP.SEQUENCE_CLASSIFICATION, + # model_name="distilbert-base-uncased", + # tokenizer_max_length=512, + # batch_size=256, + # ) + self.arize_client = Client(space_key=SPACE_KEY, api_key=API_KEY) + # if SPACE_KEY == "SPACE_KEY" or API_KEY == "API_KEY": + # raise ValueError("❌ CHANGE SPACE AND API KEYS") + # else: + # print("✅ Arize client setup done! 
Now you can start using Arize!") # noqa: T201 + + def on_llm_start( + self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any + ) -> None: + print("on_llm_start prompts", prompts) + print("on_llm_start serialized", serialized) + print("on_llm_start kwargs", kwargs) + for prompt in prompts: + self.prompt_records.append(prompt.replace("\n", "")) + + def on_llm_new_token(self, token: str, **kwargs: Any) -> None: + print("on_llm_new_token token", token) + print("on_llm_new_token kwargs", kwargs) + """Do nothing.""" + pass + + def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: + print("on_llm_end response", response) + print("on_llm_end kwargs", kwargs) + pd = import_pandas() + from arize.utils.types import ( + EmbeddingColumnNames, + Environments, + ModelTypes, + Schema, + ) + + # Safe check if 'llm_output' and 'token_usage' exist + if response.llm_output and "token_usage" in response.llm_output: + self.prompt_tokens = response.llm_output["token_usage"].get( + "prompt_tokens", 0 + ) + self.total_tokens = response.llm_output["token_usage"].get( + "total_tokens", 0 + ) + self.completion_tokens = response.llm_output["token_usage"].get( + "completion_tokens", 0 + ) + else: + self.prompt_tokens = ( + self.total_tokens + ) = self.completion_tokens = 0 # assign default value + + for generations in response.generations: + for generation in generations: + prompt = self.prompt_records[self.step] + self.step = self.step + 1 + # prompt_embedding = pd.Series( + # self.generator.generate_embeddings( + # text_col=pd.Series(prompt.replace("\n", " ")) + # ).reset_index(drop=True) + # ) + + # Assigning text to response_text instead of response + response_text = generation.text.replace("\n", " ") + # response_embedding = pd.Series( + # self.generator.generate_embeddings( + # text_col=pd.Series(generation.text.replace("\n", " ")) + # ).reset_index(drop=True) + # ) + pred_timestamp = datetime.now().timestamp() + + # Define the columns and data + columns = [ + "prediction_ts", + "response", + "prompt", + "response_vector", + "prompt_vector", + "prompt_token", + "completion_token", + "total_token", + ] + # data = [ + # [ + # pred_timestamp, + # response_text, + # prompt, + # # response_embedding[0], + # # prompt_embedding[0], + # self.prompt_tokens, + # self.total_tokens, + # self.completion_tokens, + # ] + # ] + + # # Create the DataFrame + # df = pd.DataFrame(data, columns=columns) + + # # Declare prompt and response columns + # prompt_columns = EmbeddingColumnNames( + # vector_column_name="prompt_vector", data_column_name="prompt" + # ) + + # response_columns = EmbeddingColumnNames( + # vector_column_name="response_vector", data_column_name="response" + # ) + + # schema = Schema( + # timestamp_column_name="prediction_ts", + # tag_column_names=[ + # "prompt_token", + # "completion_token", + # "total_token", + # ], + # prompt_column_names=prompt_columns, + # response_column_names=response_columns, + # ) + + # response_from_arize = self.arize_client.log( + # dataframe=df, + # schema=schema, + # model_id=self.model_id, + # model_version=self.model_version, + # model_type=ModelTypes.GENERATIVE_LLM, + # environment=Environments.PRODUCTION, + # ) + # if response_from_arize.status_code == 200: + # print("✅ Successfully logged data to Arize!") # noqa: T201 + # else: + # print(f'❌ Logging failed "{response_from_arize.text}"') # noqa: T201 + + def on_llm_error(self, error: BaseException, **kwargs: Any) -> None: + print("on_llm_error error", error) + print("on_llm_error kwargs", kwargs) + """Do 
nothing.""" + pass + + def on_chain_start( + self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any + ) -> None: + print("on_chain_start inputs",inputs) + print("on_chain_start serialized", serialized.get('kwargs', '').get('llm', '').get('kwargs', '').get('base_url', '')) + print("on_chain_start kwargs", kwargs) + pass + + def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None: + print("on_chain_end outputs", outputs) + print("on_chain_end kwargs", kwargs) + """Do nothing.""" + pass + + def on_chain_error(self, error: BaseException, **kwargs: Any) -> None: + print("on_chain_error error", error) + print("on_chain_error kwargs", kwargs) + """Do nothing.""" + pass + + def on_tool_start( + self, + serialized: Dict[str, Any], + input_str: str, + **kwargs: Any, + ) -> None: + print("on_tool_start input_str", input_str) + print("on_tool_start serialized", serialized) + print("on_tool_start kwargs", kwargs) + + pass + + def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any: + print("on_agent_action action", action) + print("on_agent_action kwargs", kwargs) + """Do nothing.""" + pass + + def on_tool_end( + self, + output: Any, + observation_prefix: Optional[str] = None, + llm_prefix: Optional[str] = None, + **kwargs: Any, + ) -> None: + print("on_tool_end output", output) + print("on_tool_end observation_prefix", observation_prefix) + print("on_tool_end llm_prefix", llm_prefix) + print("on_tool_end kwargs", kwargs) + pass + + def on_tool_error(self, error: BaseException, **kwargs: Any) -> None: + print("on_tool_error error", error) + print("on_tool_error kwargs", kwargs) + pass + + def on_text(self, text: str, **kwargs: Any) -> None: + print("on_text text", text) + print("on_text kwargs", kwargs) + pass + + def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> None: + print("on_agent_finish finish", finish) + print("on_agent_finish kwargs", kwargs) + pass diff --git a/portkey_ai/llms/llama_index/llama_callback.py b/portkey_ai/llms/llama_index/llama_callback.py new file mode 100644 index 0000000..258b625 --- /dev/null +++ b/portkey_ai/llms/llama_index/llama_callback.py @@ -0,0 +1,66 @@ +from typing import Any, Dict, List, Optional +from llama_index.core.callbacks import BaseCallbackHandler +from llama_index.core.callbacks.schema import BASE_TRACE_EVENT, CBEventType +from llama_index.core.callbacks.base_handler import ( + BaseCallbackHandler as LlamaIndexBaseCallbackHandler, + ) + +from portkey_ai.api_resources.apis.logger import Logger + + + +class PortkeyCallbackHandler(LlamaIndexBaseCallbackHandler): + def __init__( + self, + api_key: str, + ) -> None: + super().__init__( + event_starts_to_ignore=[], + event_ends_to_ignore=[], + ) + + self.api_key = api_key + + + self.portkey_logger = Logger(api_key=api_key) + + def on_event_start( + self, + event_type: CBEventType, + payload: Optional[Dict[str, Any]] = None, + event_id: str = "", + parent_id: str = "", + **kwargs: Any, + ) -> str: + """Run when an event starts and return id of event.""" + + print("on_event_start event_type", event_type) + print("on_event_start payload", payload) + print("on_event_start event_id", event_id) + print("on_event_start parent_id", parent_id) + print("on_event_start kwargs", kwargs) + + def on_event_end( + self, + event_type: CBEventType, + payload: Optional[Dict[str, Any]] = None, + event_id: str = "", + **kwargs: Any, + ) -> None: + """Run when an event ends.""" + print("on_event_end event_type", event_type) + print("on_event_end payload", payload) + 
print("on_event_end event_id", event_id) + + def start_trace(self, trace_id: Optional[str] = None) -> None: + """Run when an overall trace is launched.""" + print("start_trace trace_id", trace_id) + + def end_trace( + self, + trace_id: Optional[str] = None, + trace_map: Optional[Dict[str, List[str]]] = None, + ) -> None: + """Run when an overall trace is exited.""" + print("end_trace trace_id",trace_id) + print("end_trace trace_map",trace_map) \ No newline at end of file From 2d83b4f9470467e41849bb05d8149cfc6ab12c6f Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Thu, 4 Apr 2024 14:16:07 +0530 Subject: [PATCH 03/25] feat: basic llama index callbackHandler template --- portkey_ai/llms/langchain/arize_callback.py | 245 -------------------- 1 file changed, 245 deletions(-) delete mode 100644 portkey_ai/llms/langchain/arize_callback.py diff --git a/portkey_ai/llms/langchain/arize_callback.py b/portkey_ai/llms/langchain/arize_callback.py deleted file mode 100644 index 0fc9aad..0000000 --- a/portkey_ai/llms/langchain/arize_callback.py +++ /dev/null @@ -1,245 +0,0 @@ -from datetime import datetime -from typing import Any, Dict, List, Optional - -from langchain_core.agents import AgentAction, AgentFinish -from langchain_core.callbacks import BaseCallbackHandler, AsyncCallbackHandler -from langchain_core.outputs import LLMResult - -from langchain_community.callbacks.utils import import_pandas - - -class ArizeCallbackHandler(BaseCallbackHandler): - """Callback Handler that logs to Arize.""" - - def __init__( - self, - model_id: Optional[str] = None, - model_version: Optional[str] = None, - SPACE_KEY: Optional[str] = None, - API_KEY: Optional[str] = None, - ) -> None: - """Initialize callback handler.""" - - super().__init__() - self.model_id = model_id - self.model_version = model_version - self.space_key = SPACE_KEY - self.api_key = API_KEY - self.prompt_records: List[str] = [] - self.response_records: List[str] = [] - self.prediction_ids: List[str] = [] - self.pred_timestamps: List[int] = [] - self.response_embeddings: List[float] = [] - self.prompt_embeddings: List[float] = [] - self.prompt_tokens = 0 - self.completion_tokens = 0 - self.total_tokens = 0 - self.step = 0 - - # from arize.pandas.embeddings import EmbeddingGenerator, UseCases - from arize.pandas.logger import Client - - # self.generator = EmbeddingGenerator.from_use_case( - # use_case=UseCases.NLP.SEQUENCE_CLASSIFICATION, - # model_name="distilbert-base-uncased", - # tokenizer_max_length=512, - # batch_size=256, - # ) - self.arize_client = Client(space_key=SPACE_KEY, api_key=API_KEY) - # if SPACE_KEY == "SPACE_KEY" or API_KEY == "API_KEY": - # raise ValueError("❌ CHANGE SPACE AND API KEYS") - # else: - # print("✅ Arize client setup done! 
Now you can start using Arize!") # noqa: T201 - - def on_llm_start( - self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any - ) -> None: - print("on_llm_start prompts", prompts) - print("on_llm_start serialized", serialized) - print("on_llm_start kwargs", kwargs) - for prompt in prompts: - self.prompt_records.append(prompt.replace("\n", "")) - - def on_llm_new_token(self, token: str, **kwargs: Any) -> None: - print("on_llm_new_token token", token) - print("on_llm_new_token kwargs", kwargs) - """Do nothing.""" - pass - - def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: - print("on_llm_end response", response) - print("on_llm_end kwargs", kwargs) - pd = import_pandas() - from arize.utils.types import ( - EmbeddingColumnNames, - Environments, - ModelTypes, - Schema, - ) - - # Safe check if 'llm_output' and 'token_usage' exist - if response.llm_output and "token_usage" in response.llm_output: - self.prompt_tokens = response.llm_output["token_usage"].get( - "prompt_tokens", 0 - ) - self.total_tokens = response.llm_output["token_usage"].get( - "total_tokens", 0 - ) - self.completion_tokens = response.llm_output["token_usage"].get( - "completion_tokens", 0 - ) - else: - self.prompt_tokens = ( - self.total_tokens - ) = self.completion_tokens = 0 # assign default value - - for generations in response.generations: - for generation in generations: - prompt = self.prompt_records[self.step] - self.step = self.step + 1 - # prompt_embedding = pd.Series( - # self.generator.generate_embeddings( - # text_col=pd.Series(prompt.replace("\n", " ")) - # ).reset_index(drop=True) - # ) - - # Assigning text to response_text instead of response - response_text = generation.text.replace("\n", " ") - # response_embedding = pd.Series( - # self.generator.generate_embeddings( - # text_col=pd.Series(generation.text.replace("\n", " ")) - # ).reset_index(drop=True) - # ) - pred_timestamp = datetime.now().timestamp() - - # Define the columns and data - columns = [ - "prediction_ts", - "response", - "prompt", - "response_vector", - "prompt_vector", - "prompt_token", - "completion_token", - "total_token", - ] - # data = [ - # [ - # pred_timestamp, - # response_text, - # prompt, - # # response_embedding[0], - # # prompt_embedding[0], - # self.prompt_tokens, - # self.total_tokens, - # self.completion_tokens, - # ] - # ] - - # # Create the DataFrame - # df = pd.DataFrame(data, columns=columns) - - # # Declare prompt and response columns - # prompt_columns = EmbeddingColumnNames( - # vector_column_name="prompt_vector", data_column_name="prompt" - # ) - - # response_columns = EmbeddingColumnNames( - # vector_column_name="response_vector", data_column_name="response" - # ) - - # schema = Schema( - # timestamp_column_name="prediction_ts", - # tag_column_names=[ - # "prompt_token", - # "completion_token", - # "total_token", - # ], - # prompt_column_names=prompt_columns, - # response_column_names=response_columns, - # ) - - # response_from_arize = self.arize_client.log( - # dataframe=df, - # schema=schema, - # model_id=self.model_id, - # model_version=self.model_version, - # model_type=ModelTypes.GENERATIVE_LLM, - # environment=Environments.PRODUCTION, - # ) - # if response_from_arize.status_code == 200: - # print("✅ Successfully logged data to Arize!") # noqa: T201 - # else: - # print(f'❌ Logging failed "{response_from_arize.text}"') # noqa: T201 - - def on_llm_error(self, error: BaseException, **kwargs: Any) -> None: - print("on_llm_error error", error) - print("on_llm_error kwargs", kwargs) - """Do 
nothing.""" - pass - - def on_chain_start( - self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any - ) -> None: - print("on_chain_start inputs",inputs) - print("on_chain_start serialized", serialized.get('kwargs', '').get('llm', '').get('kwargs', '').get('base_url', '')) - print("on_chain_start kwargs", kwargs) - pass - - def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None: - print("on_chain_end outputs", outputs) - print("on_chain_end kwargs", kwargs) - """Do nothing.""" - pass - - def on_chain_error(self, error: BaseException, **kwargs: Any) -> None: - print("on_chain_error error", error) - print("on_chain_error kwargs", kwargs) - """Do nothing.""" - pass - - def on_tool_start( - self, - serialized: Dict[str, Any], - input_str: str, - **kwargs: Any, - ) -> None: - print("on_tool_start input_str", input_str) - print("on_tool_start serialized", serialized) - print("on_tool_start kwargs", kwargs) - - pass - - def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any: - print("on_agent_action action", action) - print("on_agent_action kwargs", kwargs) - """Do nothing.""" - pass - - def on_tool_end( - self, - output: Any, - observation_prefix: Optional[str] = None, - llm_prefix: Optional[str] = None, - **kwargs: Any, - ) -> None: - print("on_tool_end output", output) - print("on_tool_end observation_prefix", observation_prefix) - print("on_tool_end llm_prefix", llm_prefix) - print("on_tool_end kwargs", kwargs) - pass - - def on_tool_error(self, error: BaseException, **kwargs: Any) -> None: - print("on_tool_error error", error) - print("on_tool_error kwargs", kwargs) - pass - - def on_text(self, text: str, **kwargs: Any) -> None: - print("on_text text", text) - print("on_text kwargs", kwargs) - pass - - def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> None: - print("on_agent_finish finish", finish) - print("on_agent_finish kwargs", kwargs) - pass From cf33db13b15a6d10b17ad573ae98a949bc0cf202 Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Mon, 8 Apr 2024 17:54:26 +0530 Subject: [PATCH 04/25] feat: updated req res object for logging --- portkey_ai/llms/langchain/portkey_callback.py | 132 ++++++++---------- 1 file changed, 56 insertions(+), 76 deletions(-) diff --git a/portkey_ai/llms/langchain/portkey_callback.py b/portkey_ai/llms/langchain/portkey_callback.py index 2a22aa1..63bdb54 100644 --- a/portkey_ai/llms/langchain/portkey_callback.py +++ b/portkey_ai/llms/langchain/portkey_callback.py @@ -1,4 +1,5 @@ from datetime import datetime +import time from typing import Any, Dict, List, Optional from langchain_core.callbacks import BaseCallbackHandler from langchain_core.outputs import LLMResult @@ -8,76 +9,62 @@ class PortkeyCallbackHandler(BaseCallbackHandler): - startTimestamp: int = 0 - endTimestamp: int = 0 + def __init__( self, api_key: str, ) -> None: super().__init__() + self.startTimestamp: float = 0 + self.endTimestamp: float = 0 self.api_key = api_key self.portkey_logger = Logger(api_key=api_key) - # ------------------------------------------------ self.log_object: Dict[str, Any] = {} self.prompt_records: List[str] = [] - self.usage_records: Any = {} - self.prompt_tokens: int = 0 - self.completion_tokens: int = 0 - self.total_tokens: int = 0 - # ------------------------------------------------ - - # ------------------------------------------------ - self.requestMethod: str = "POST" - self.requestURL: str = "" - self.requestHeaders: Dict[str, Any] = {} - self.requestBody: Any = {} + + self.request: Any = {} + + 
self.response: Any = {} self.responseHeaders: Dict[str, Any] = {} # Nowhere to get this from self.responseBody: Any = None self.responseStatus: int = 0 - self.responseTime: int = 0 self.streamingMode: bool = False - # ------------------------------------------------ - # self.config: Dict[str, Any] = {} - # self.organisationConfig: Dict[str, Any] = {} - # self.organisationDetails: Dict[str, Any] = {} - # self.cacheStatus: str = None - # self.retryCount: int = 0 - # self.portkeyHeaders: Dict[str, Any] = {} - # ------------------------------------------------ - if not api_key: raise ValueError("Please provide an API key to use PortkeyCallbackHandler") def on_llm_start( self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any ) -> None: - print("on_llm_start serialized", serialized["id"][2]) + print("on_llm_start serialized", serialized) print("on_llm_start prompts", prompts) print("on_llm_start kwargs", kwargs) - self.startTimestamp = int(datetime.now().timestamp()) - for prompt in prompts: - self.prompt_records.append(prompt.replace("\n", " ")) - - self.requestURL = serialized.get("kwargs", "").get("base_url", "") - self.requestHeaders = serialized.get("kwargs", {}).get("default_headers", {}) - self.requestHeaders.update({"provider": serialized["id"][2]}) - - self.requestBody = kwargs - self.requestBody["prompts"] = self.prompt_records + messages = prompt.split("\n") + for message in messages: + role, content = message.split(":",1) + self.prompt_records.append({'role':role.lower(), 'content':content.strip()}) - print("on_llm_start requestBody:", self.requestBody) + self.startTimestamp = float(datetime.now().timestamp()) self.streamingMode = kwargs.get("invocation_params", False).get("stream", False) + self.request['url'] = serialized.get("kwargs", "").get("base_url", "") + self.request['method'] = "POST" + self.request['headers'] = serialized.get("kwargs", {}).get("default_headers", {}) + self.request['headers'].update({"provider": serialized["id"][2]}) + self.request['body']= {'messages':self.prompt_records} + self.request['body'].update({**kwargs.get("invocation_params", {})}) + + print("on_llm_start request", self.request) + def on_chain_start( self, serialized: Dict[str, Any], @@ -85,21 +72,6 @@ def on_chain_start( **kwargs: Any, ) -> None: """Run when chain starts running.""" - self.requestBody = {**inputs, **kwargs} - self.requestHeaders = ( - serialized.get("kwargs", {}) - .get("llm", {}) - .get("kwargs", {}) - .get("default_headers", {}) - ) - self.requestURL = ( - serialized.get("kwargs", "") - .get("llm", "") - .get("kwargs", "") - .get("base_url", "") - ) - - self.startTimestamp = int(datetime.now().timestamp()) print("on_chain_start inputs", inputs) print("on_chain_start kwargs", kwargs) print("on_chain_start serialized", serialized) @@ -107,34 +79,45 @@ def on_chain_start( # -------------------------------------------------------------------------------- def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: - print("on_llm_end response", response.generations) + print("on_llm_end response", response) print("on_llm_end kwargs", kwargs) self.responseBody = response self.responseStatus = 200 - self.endTimestamp = int(datetime.now().timestamp()) - self.responseTime = self.endTimestamp - self.startTimestamp - - """This will handle all the token usage information from the LLM.""" - if response.llm_output and "token_usage" in response.llm_output: - usage = response.llm_output["token_usage"] - self.completion_tokens = usage.get("completion_tokens", 0) - 
self.prompt_tokens = usage.get("prompt_tokens", 0) - self.total_tokens = usage.get( - "total_tokens", self.completion_tokens + self.prompt_tokens - ) - self.usage_records["usage"] = usage - """This will handle all the token usage information from the LLM.""" + self.endTimestamp = float(datetime.now().timestamp()) + responseTime = self.endTimestamp - self.startTimestamp + + usage = response.llm_output.get("token_usage", {}) + + self.response['status'] = 200 + self.response['body'] = {'choices': [{ + "index":0, + "message": { + "role": "assistant", + "content": response.generations[0][0].text + }, + "logprobs": response.generations[0][0].generation_info['logprobs'], + "finish_reason": response.generations[0][0].generation_info['finish_reason'], + + }]} + self.response['body'].update({'usage': usage}) + self.response['body'].update({'id': str(kwargs.get("run_id", ""))}) + self.response['body'].update({'created':int(time.time())}) + self.response['body'].update({'model': response.llm_output.get("model_name", "")}) + self.response['body'].update({'system_fingerprint': response.llm_output.get("system_fingerprint", "")}) + self.response['responseTime'] = int(responseTime * 1000) + + print("on_llm_end response", self.response) self.log_object.update( { - "requestMethod": self.requestMethod, - "requestURL": self.requestURL, - "requestHeaders": self.requestHeaders, - "requestBody": self.requestBody, + "requestMethod": self.request['method'], + "requestURL": self.request['url'], + "requestHeaders": self.request['headers'], + "requestBody": self.request['body'], "responseHeaders": self.responseHeaders, - "responseBody": self.responseBody, - "responseStatus": self.responseStatus, - "responseTime": self.responseTime, + "responseBody": self.response['body'], + "responseStatus": self.response['status'], + "responseTime": self.response['responseTime'] , "streamingMode": self.streamingMode, } ) @@ -150,10 +133,7 @@ def on_chain_end( ) -> None: """Run when chain ends running.""" print("on_chain_end outputs", outputs) - self.responseBody = outputs - self.responseStatus = 200 - self.endTimestamp = int(datetime.now().timestamp()) - self.responseTime = self.endTimestamp - self.startTimestamp + print("on_chain_end kwargs", kwargs) # -------------------------------------------------------------------------------- From 4fe6eafb13fbf38346ee6cc1ab76789c95795018 Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Sat, 13 Apr 2024 10:21:39 +0530 Subject: [PATCH 05/25] feat: clean up for langchain and logger --- portkey_ai/api_resources/apis/logger.py | 26 +++++++-------- portkey_ai/llms/langchain/portkey_callback.py | 33 +------------------ 2 files changed, 14 insertions(+), 45 deletions(-) diff --git a/portkey_ai/api_resources/apis/logger.py b/portkey_ai/api_resources/apis/logger.py index 92290ca..62b99bb 100644 --- a/portkey_ai/api_resources/apis/logger.py +++ b/portkey_ai/api_resources/apis/logger.py @@ -12,9 +12,10 @@ class Logger: def __init__( self, api_key: Optional[str] = None, + chain: Optional[str] = None, ) -> None: api_key = api_key or os.getenv("PORTKEY_API_KEY") - print("Logger API Key: ", api_key) + self.chain = chain self.headers = { "Content-Type": "application/json", @@ -32,21 +33,20 @@ def log( ): body = log_object - print("self.url", self.url) - print("self.body", body) - - self.headers.update( - { - "x-portkey-provider": Logger.get_provider( - body["requestHeaders"]["provider"] - ) - } - ) + if self.chain == "llama_index": + self.headers.update({"x-portkey-provider": "openai"}) # placeholder + else: + 
self.headers.update( + { + "x-portkey-provider": Logger.get_provider( + body["requestHeaders"]["provider"] + ) + } + ) response = requests.post( - url=self.url, json=json.dumps(log_object, default=str), headers=self.headers + url=self.url, json=json.dumps(log_object), headers=self.headers ) - print("Logger response", response.json()) return response.status_code @staticmethod diff --git a/portkey_ai/llms/langchain/portkey_callback.py b/portkey_ai/llms/langchain/portkey_callback.py index 63bdb54..665a40d 100644 --- a/portkey_ai/llms/langchain/portkey_callback.py +++ b/portkey_ai/llms/langchain/portkey_callback.py @@ -42,9 +42,6 @@ def __init__( def on_llm_start( self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any ) -> None: - print("on_llm_start serialized", serialized) - print("on_llm_start prompts", prompts) - print("on_llm_start kwargs", kwargs) for prompt in prompts: messages = prompt.split("\n") @@ -62,8 +59,6 @@ def on_llm_start( self.request['headers'].update({"provider": serialized["id"][2]}) self.request['body']= {'messages':self.prompt_records} self.request['body'].update({**kwargs.get("invocation_params", {})}) - - print("on_llm_start request", self.request) def on_chain_start( self, @@ -72,15 +67,10 @@ def on_chain_start( **kwargs: Any, ) -> None: """Run when chain starts running.""" - print("on_chain_start inputs", inputs) - print("on_chain_start kwargs", kwargs) - print("on_chain_start serialized", serialized) # -------------------------------------------------------------------------------- def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: - print("on_llm_end response", response) - print("on_llm_end kwargs", kwargs) self.responseBody = response self.responseStatus = 200 self.endTimestamp = float(datetime.now().timestamp()) @@ -106,8 +96,6 @@ def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: self.response['body'].update({'system_fingerprint': response.llm_output.get("system_fingerprint", "")}) self.response['responseTime'] = int(responseTime * 1000) - print("on_llm_end response", self.response) - self.log_object.update( { "requestMethod": self.request['method'], @@ -122,8 +110,6 @@ def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: } ) - print("on_llm_end log_object", {**self.log_object}) - self.portkey_logger.log(log_object=self.log_object) def on_chain_end( @@ -132,24 +118,19 @@ def on_chain_end( **kwargs: Any, ) -> None: """Run when chain ends running.""" - print("on_chain_end outputs", outputs) - print("on_chain_end kwargs", kwargs) + pass # -------------------------------------------------------------------------------- def on_chain_error(self, error: BaseException, **kwargs: Any) -> None: self.responseBody = error self.responseStatus = error.status_code # type: ignore[attr-defined] - print("on_chain_error error", error) - print("on_chain_error kwargs", kwargs) """Do nothing.""" pass def on_llm_error(self, error: BaseException, **kwargs: Any) -> None: self.responseBody = error self.responseStatus = error.status_code # type: ignore[attr-defined] - print("on_llm_error error", error) - print("on_llm_error kwargs", kwargs) """Do nothing.""" pass @@ -170,8 +151,6 @@ def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> None: def on_llm_new_token(self, token: str, **kwargs: Any) -> None: self.streamingMode = True - print("on_llm_new_token token", token) - print("on_llm_new_token kwargs", kwargs) """Do nothing.""" pass @@ -183,15 +162,9 @@ def on_tool_start( input_str: str, **kwargs: Any, ) -> None: - 
print("on_tool_start input_str", input_str) - print("on_tool_start serialized", serialized) - print("on_tool_start kwargs", kwargs) - pass def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any: - print("on_agent_action action", action) - print("on_agent_action kwargs", kwargs) """Do nothing.""" pass @@ -202,10 +175,6 @@ def on_tool_end( llm_prefix: Optional[str] = None, **kwargs: Any, ) -> None: - print("on_tool_end output", output) - print("on_tool_end observation_prefix", observation_prefix) - print("on_tool_end llm_prefix", llm_prefix) - print("on_tool_end kwargs", kwargs) pass From 77f0340e57eccfceab0a82e9abb5eed692f4de1a Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Thu, 18 Apr 2024 19:45:46 +0530 Subject: [PATCH 06/25] feat: llama index callback handler --- portkey_ai/api_resources/apis/logger.py | 24 +- portkey_ai/llms/llama_index/__init__.py | 4 +- portkey_ai/llms/llama_index/llama_callback.py | 66 ----- .../llama_index/portkey_llama_callback.py | 245 ++++++++++++++++++ 4 files changed, 257 insertions(+), 82 deletions(-) delete mode 100644 portkey_ai/llms/llama_index/llama_callback.py create mode 100644 portkey_ai/llms/llama_index/portkey_llama_callback.py diff --git a/portkey_ai/api_resources/apis/logger.py b/portkey_ai/api_resources/apis/logger.py index 62b99bb..4a8e82a 100644 --- a/portkey_ai/api_resources/apis/logger.py +++ b/portkey_ai/api_resources/apis/logger.py @@ -12,10 +12,8 @@ class Logger: def __init__( self, api_key: Optional[str] = None, - chain: Optional[str] = None, ) -> None: api_key = api_key or os.getenv("PORTKEY_API_KEY") - self.chain = chain self.headers = { "Content-Type": "application/json", @@ -33,19 +31,17 @@ def log( ): body = log_object - if self.chain == "llama_index": - self.headers.update({"x-portkey-provider": "openai"}) # placeholder - else: - self.headers.update( - { - "x-portkey-provider": Logger.get_provider( - body["requestHeaders"]["provider"] - ) - } - ) + + self.headers.update( + { + "x-portkey-provider": Logger.get_provider( + body["requestHeaders"]["provider"] + ) + } + ) response = requests.post( - url=self.url, json=json.dumps(log_object), headers=self.headers + url=self.url, json=json.dumps(log_object, default='str'), headers=self.headers ) return response.status_code @@ -55,4 +51,4 @@ def get_provider(provider): "openai": "openai", "mistralai": "mistral-ai", } - return provider_dict.get(provider) + return provider_dict.get(provider, "openai") # placeholder diff --git a/portkey_ai/llms/llama_index/__init__.py b/portkey_ai/llms/llama_index/__init__.py index 63c2303..9d73cdf 100644 --- a/portkey_ai/llms/llama_index/__init__.py +++ b/portkey_ai/llms/llama_index/__init__.py @@ -1,3 +1,3 @@ -from .completions import PortkeyLLM +# from .completions import PortkeyLLM -__all__ = ["PortkeyLLM"] +# __all__ = ["PortkeyLLM"] diff --git a/portkey_ai/llms/llama_index/llama_callback.py b/portkey_ai/llms/llama_index/llama_callback.py deleted file mode 100644 index 258b625..0000000 --- a/portkey_ai/llms/llama_index/llama_callback.py +++ /dev/null @@ -1,66 +0,0 @@ -from typing import Any, Dict, List, Optional -from llama_index.core.callbacks import BaseCallbackHandler -from llama_index.core.callbacks.schema import BASE_TRACE_EVENT, CBEventType -from llama_index.core.callbacks.base_handler import ( - BaseCallbackHandler as LlamaIndexBaseCallbackHandler, - ) - -from portkey_ai.api_resources.apis.logger import Logger - - - -class PortkeyCallbackHandler(LlamaIndexBaseCallbackHandler): - def __init__( - self, - api_key: str, - ) -> None: - 
super().__init__( - event_starts_to_ignore=[], - event_ends_to_ignore=[], - ) - - self.api_key = api_key - - - self.portkey_logger = Logger(api_key=api_key) - - def on_event_start( - self, - event_type: CBEventType, - payload: Optional[Dict[str, Any]] = None, - event_id: str = "", - parent_id: str = "", - **kwargs: Any, - ) -> str: - """Run when an event starts and return id of event.""" - - print("on_event_start event_type", event_type) - print("on_event_start payload", payload) - print("on_event_start event_id", event_id) - print("on_event_start parent_id", parent_id) - print("on_event_start kwargs", kwargs) - - def on_event_end( - self, - event_type: CBEventType, - payload: Optional[Dict[str, Any]] = None, - event_id: str = "", - **kwargs: Any, - ) -> None: - """Run when an event ends.""" - print("on_event_end event_type", event_type) - print("on_event_end payload", payload) - print("on_event_end event_id", event_id) - - def start_trace(self, trace_id: Optional[str] = None) -> None: - """Run when an overall trace is launched.""" - print("start_trace trace_id", trace_id) - - def end_trace( - self, - trace_id: Optional[str] = None, - trace_map: Optional[Dict[str, List[str]]] = None, - ) -> None: - """Run when an overall trace is exited.""" - print("end_trace trace_id",trace_id) - print("end_trace trace_map",trace_map) \ No newline at end of file diff --git a/portkey_ai/llms/llama_index/portkey_llama_callback.py b/portkey_ai/llms/llama_index/portkey_llama_callback.py new file mode 100644 index 0000000..6e8b192 --- /dev/null +++ b/portkey_ai/llms/llama_index/portkey_llama_callback.py @@ -0,0 +1,245 @@ +from collections import defaultdict +import time +from typing import Any, Dict, List, Optional +from llama_index.core.callbacks.base_handler import ( + BaseCallbackHandler as LlamaIndexBaseCallbackHandler, + ) + +from portkey_ai.api_resources.apis.logger import Logger +from datetime import datetime, timezone +from llama_index.core.callbacks.schema import ( + CBEventType, + EventPayload + ) +from llama_index.core.utilities.token_counting import TokenCounter + + +class PortkeyCallbackHandler(LlamaIndexBaseCallbackHandler): + startTimestamp: int = 0 + endTimestamp: int = 0 + + def __init__( + self, + api_key: str, + ) -> None: + super().__init__( + event_starts_to_ignore=[], + event_ends_to_ignore=[], + ) + + self.api_key = api_key + + self.portkey_logger = Logger(api_key=api_key) + + # self.llm_flag_start = False + # self.embedding_flag_start = False + + # self.llm_flag_stop = False + # self.embedding_flag_stop = False + + self.start = False + self.end = False + + self.end_trace_flag = False + + self._token_counter = TokenCounter() + self.token_embedding = 0 + self.token_llm = 0 + self.token_sum = 0 + + + # ------------------------------------------------ + self.log_object: Dict[str, Any] = {} + self.prompt_records: List[str] = [] + self.usage_records: Any = {} + self.prompt_tokens: int = 0 + self.completion_tokens: int = 0 + self.total_tokens: int = 0 + # ------------------------------------------------ + + # ------------------------------------------------ + self.request: Any = {} + self.requestMethod: str = "POST" # Done + self.requestURL: str = "" + self.requestHeaders: Dict[str, Any] = {} + self.requestBody: Any = {} # Done + + # ----------------------------------------------- + self.response: Any = {} + self.responseHeaders: Dict[str, Any] = {} # Nowhere to get this from + self.responseBody: Any = {} # Done + self.responseStatus: int = 0 # Done + self.responseTime: int = 0 # Done + + 
self.streamingMode: bool = False + + if not api_key: + raise ValueError("Please provide an API key to use PortkeyCallbackHandler") + + + + def on_event_start( + self, + event_type: CBEventType, + payload: Optional[Dict[str, Any]] = None, + event_id: str = "", + parent_id: str = "", + **kwargs: Any, + ) -> str: + """Run when an event starts and return id of event.""" + + if(event_type == CBEventType.LLM): + self.llm_event_start(payload) + + print("on_event_start event_type", event_type) + print("on_event_start payload", payload) + print("on_event_start event_id", event_id) + print("on_event_start parent_id", parent_id) + print("on_event_start kwargs", kwargs) + + def on_event_end( + self, + event_type: CBEventType, + payload: Optional[Dict[str, Any]] = None, + event_id: str = "", + **kwargs: Any, + ) -> None: + """Run when an event ends.""" + + if(event_type == CBEventType.LLM): + self.llm_event_stop(payload, event_id) + + # if(event_type == CBEventType.EMBEDDING): + # self.embedding_event_stop(payload, self.embedding_flag_stop) + + print("on_event_end event_type", event_type) + print("on_event_end event_id", event_id) + print("on_event_end payload", payload) + + + def start_trace(self, trace_id: Optional[str] = None) -> None: + """Run when an overall trace is launched.""" + if not self.start: + self.startTimestamp = int(datetime.now().timestamp()) + # self.start = True + print("start_trace trace_id",trace_id) + + def end_trace( + self, + trace_id: Optional[str] = None, + trace_map: Optional[Dict[str, List[str]]] = None, + ) -> None: + """Run when an overall trace is exited.""" + # self.endTimestamp = int(datetime.now().timestamp()) + # self.responseTime = self.endTimestamp - self.startTimestamp + # self.total_tokens = self.token_llm + self.token_embedding + # self.responseStatus = 200 + # self.log_object.update( + # { + # "requestMethod": self.requestMethod, + # "requestURL": self.requestURL, + # "requestHeaders": self.requestHeaders, + # "requestBody": self.requestBody, + # "responseHeaders": self.responseHeaders, + # "responseBody": self.responseBody, + # "responseStatus": self.responseStatus, + # "responseTime": self.responseTime, + # "streamingMode": self.streamingMode, + # } + # ) + # print("LOGGER WILL BE CALLED NOW") + # # self.portkey_logger.log(log_object=self.log_object) + + # # self.end_trace_flag = True + + + print("end_trace trace_id",trace_id) + print("end_trace trace_map",trace_map) + print("end_trace total_tokens", self.total_tokens) + + + def llm_event_start(self, payload: Any) -> None: + + if EventPayload.MESSAGES in payload: + messages = payload.get(EventPayload.MESSAGES, {}) + self.prompt_records = [{'role': m.role.value, 'content': m.content} for m in messages] + + print("llm_event_start prompt_records: ", self.prompt_records) + + self.request['url'] = payload.get(EventPayload.SERIALIZED, {}).get("base_url", "") + self.request['method'] = "POST" + self.requestHeaders["provider"] = "openai" + self.request['body'] = {'messages':self.prompt_records} + self.request['body'].update({"model": payload.get(EventPayload.SERIALIZED, {}).get("model", "")}) + self.request['body'].update({"temperature": payload.get(EventPayload.SERIALIZED, {}).get("temperature", "")}) + + + + print("llm_event_start REQUEST: ", self.request) + + + + + # if not llm_flag_start and EventPayload.SERIALIZED in payload: + # print("llm_event_start llm_flag_start: ", llm_flag_start) + # print("llm_event_start payload: ", payload.get(EventPayload.SERIALIZED, {})) + # self.requestBody["llm"] = 
payload.get(EventPayload.SERIALIZED, {}) + + # self.llm_flag_start = True + + return None + + # def embedding_event_start(self, payload: Any, embedding_flag_start: bool) -> None: + + # if not embedding_flag_start and EventPayload.SERIALIZED in payload: + # print("embedding_event_start embedding_flag_start: ", embedding_flag_start) + # print("embedding_event_start payload: ", payload.get(EventPayload.SERIALIZED, {})) + # self.requestBody["embedding"] = payload.get(EventPayload.SERIALIZED, {}) + # # self.embedding_flag_start = True + + def llm_event_stop(self, payload: Any, event_id) -> None: + + self.endTimestamp = float(datetime.now().timestamp()) + responseTime = self.endTimestamp - self.startTimestamp + + data = payload.get(EventPayload.RESPONSE, {}) + # print("llm_event_stop BODY: ", data) + chunks = payload.get(EventPayload.MESSAGES, {}) + self.token_llm = self._token_counter.estimate_tokens_in_messages(chunks) + + self.response['status'] = 200 + self.response['body'] = {'choices': [{ + "index":0, + "message": { + "role": data.message.role.value, + "content": data.message.content + }, + "logprobs": data.logprobs, + "finish_reason": "done" + }]} + self.response['body'].update({'usage': {'total_tokens': self.token_llm}}) + self.response['body'].update({'id': event_id}) + self.response['body'].update({'created':int(time.time())}) + self.response['body'].update({'model': data.raw.get("model", "")}) + self.response['responseTime'] = int(responseTime * 1000) + + + self.log_object.update( + { + "requestMethod": self.request['method'], + "requestURL": self.request['url'], + "requestHeaders": self.requestHeaders, + "requestBody": self.request['body'], + "responseHeaders": self.responseHeaders, + "responseBody": self.response['body'], + "responseStatus": self.response['status'], + "responseTime": self.response['responseTime'] , + "streamingMode": self.streamingMode, + } + ) + + self.portkey_logger.log(log_object=self.log_object) + + print("llm_event_stop RESPONSE: ", self.response) + + return None From e2d39eaae77ad5290f44221f2761e0e4925485d2 Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Mon, 6 May 2024 18:59:51 +0530 Subject: [PATCH 07/25] feat: update langchain and llamaindex handlers + logger file --- portkey_ai/api_resources/apis/logger.py | 20 ++-- portkey_ai/llms/langchain/portkey_callback.py | 41 ++------ .../llama_index/portkey_llama_callback.py | 97 ++----------------- 3 files changed, 23 insertions(+), 135 deletions(-) diff --git a/portkey_ai/api_resources/apis/logger.py b/portkey_ai/api_resources/apis/logger.py index 4a8e82a..006de9b 100644 --- a/portkey_ai/api_resources/apis/logger.py +++ b/portkey_ai/api_resources/apis/logger.py @@ -7,6 +7,8 @@ from portkey_ai.api_resources.exceptions import AuthenticationError import requests +from portkey_ai.api_resources.global_constants import PORTKEY_BASE_URL + class Logger: def __init__( @@ -20,7 +22,8 @@ def __init__( "x-portkey-api-key": api_key, } - self.url = "https://api.portkey.ai/v1/logger" + # self.url = PORTKEY_BASE_URL + "/logs" + self.url = "https://api.portkeydev.com/v1/logs" if api_key is None: raise ValueError("API key is required to use the Logger API") @@ -29,22 +32,13 @@ def log( self, log_object: dict, ): - body = log_object - - - self.headers.update( - { - "x-portkey-provider": Logger.get_provider( - body["requestHeaders"]["provider"] - ) - } - ) - response = requests.post( - url=self.url, json=json.dumps(log_object, default='str'), headers=self.headers + url=self.url, data=json.dumps(log_object, default='str'), 
headers=self.headers ) + return response.status_code + # Not using this function right now @staticmethod def get_provider(provider): provider_dict = { diff --git a/portkey_ai/llms/langchain/portkey_callback.py b/portkey_ai/llms/langchain/portkey_callback.py index 665a40d..4a502d7 100644 --- a/portkey_ai/llms/langchain/portkey_callback.py +++ b/portkey_ai/llms/langchain/portkey_callback.py @@ -53,8 +53,9 @@ def on_llm_start( self.streamingMode = kwargs.get("invocation_params", False).get("stream", False) - self.request['url'] = serialized.get("kwargs", "").get("base_url", "") self.request['method'] = "POST" + self.request['url'] = serialized.get("kwargs", "").get("base_url", "https://api.openai.com/v1/chat/completions") # placeholder + self.request['provider'] = serialized["id"][2] self.request['headers'] = serialized.get("kwargs", {}).get("default_headers", {}) self.request['headers'].update({"provider": serialized["id"][2]}) self.request['body']= {'messages':self.prompt_records} @@ -94,19 +95,14 @@ def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: self.response['body'].update({'created':int(time.time())}) self.response['body'].update({'model': response.llm_output.get("model_name", "")}) self.response['body'].update({'system_fingerprint': response.llm_output.get("system_fingerprint", "")}) - self.response['responseTime'] = int(responseTime * 1000) + self.response['time'] = int(responseTime * 1000) + self.response['headers'] = {} + self.response['streamingMode'] = self.streamingMode self.log_object.update( { - "requestMethod": self.request['method'], - "requestURL": self.request['url'], - "requestHeaders": self.request['headers'], - "requestBody": self.request['body'], - "responseHeaders": self.responseHeaders, - "responseBody": self.response['body'], - "responseStatus": self.response['status'], - "responseTime": self.response['responseTime'] , - "streamingMode": self.streamingMode, + "request": self.request, + "response": self.response, } ) @@ -176,26 +172,3 @@ def on_tool_end( **kwargs: Any, ) -> None: pass - - -""" -winkychLogObject = { - requestMethod: store.requestMethod, - requestURL: store.proxyUrl, - requestHeaders: store.requestHeadersWithoutPortkeyHeaders, - requestBody: store.requestBody, - - responseHeaders: Object.fromEntries(store.response.headers), - responseBody: {}, - responseStatus: store.responseStatus, - responseTime: store.config.responseTime, - config: { - organisationConfig: { id: store.requestHeaders[globals.PORTKEY_CONFIG_HEADER], ...store.organisationConfig}, - organisationDetails: store.organisationDetails, - cacheStatus: store.config.cacheStatus, - retryCount: store.retryCount, - isStreamingMode: store.streamingMode, - portkeyHeaders: store.requestHeadersPortkey - } - } -""" diff --git a/portkey_ai/llms/llama_index/portkey_llama_callback.py b/portkey_ai/llms/llama_index/portkey_llama_callback.py index 6e8b192..42617a4 100644 --- a/portkey_ai/llms/llama_index/portkey_llama_callback.py +++ b/portkey_ai/llms/llama_index/portkey_llama_callback.py @@ -31,12 +31,6 @@ def __init__( self.portkey_logger = Logger(api_key=api_key) - # self.llm_flag_start = False - # self.embedding_flag_start = False - - # self.llm_flag_stop = False - # self.embedding_flag_stop = False - self.start = False self.end = False @@ -91,11 +85,6 @@ def on_event_start( if(event_type == CBEventType.LLM): self.llm_event_start(payload) - print("on_event_start event_type", event_type) - print("on_event_start payload", payload) - print("on_event_start event_id", event_id) - 
print("on_event_start parent_id", parent_id) - print("on_event_start kwargs", kwargs) def on_event_end( self, @@ -108,21 +97,12 @@ def on_event_end( if(event_type == CBEventType.LLM): self.llm_event_stop(payload, event_id) - - # if(event_type == CBEventType.EMBEDDING): - # self.embedding_event_stop(payload, self.embedding_flag_stop) - - print("on_event_end event_type", event_type) - print("on_event_end event_id", event_id) - print("on_event_end payload", payload) def start_trace(self, trace_id: Optional[str] = None) -> None: """Run when an overall trace is launched.""" if not self.start: self.startTimestamp = int(datetime.now().timestamp()) - # self.start = True - print("start_trace trace_id",trace_id) def end_trace( self, @@ -130,32 +110,6 @@ def end_trace( trace_map: Optional[Dict[str, List[str]]] = None, ) -> None: """Run when an overall trace is exited.""" - # self.endTimestamp = int(datetime.now().timestamp()) - # self.responseTime = self.endTimestamp - self.startTimestamp - # self.total_tokens = self.token_llm + self.token_embedding - # self.responseStatus = 200 - # self.log_object.update( - # { - # "requestMethod": self.requestMethod, - # "requestURL": self.requestURL, - # "requestHeaders": self.requestHeaders, - # "requestBody": self.requestBody, - # "responseHeaders": self.responseHeaders, - # "responseBody": self.responseBody, - # "responseStatus": self.responseStatus, - # "responseTime": self.responseTime, - # "streamingMode": self.streamingMode, - # } - # ) - # print("LOGGER WILL BE CALLED NOW") - # # self.portkey_logger.log(log_object=self.log_object) - - # # self.end_trace_flag = True - - - print("end_trace trace_id",trace_id) - print("end_trace trace_map",trace_map) - print("end_trace total_tokens", self.total_tokens) def llm_event_start(self, payload: Any) -> None: @@ -163,47 +117,23 @@ def llm_event_start(self, payload: Any) -> None: if EventPayload.MESSAGES in payload: messages = payload.get(EventPayload.MESSAGES, {}) self.prompt_records = [{'role': m.role.value, 'content': m.content} for m in messages] - - print("llm_event_start prompt_records: ", self.prompt_records) - - self.request['url'] = payload.get(EventPayload.SERIALIZED, {}).get("base_url", "") self.request['method'] = "POST" - self.requestHeaders["provider"] = "openai" + self.request['url'] = payload.get(EventPayload.SERIALIZED, {}).get("base_url", "https://api.openai.com/v1/chat/completions") + self.request["provider"] = "openai" # placeholder + self.request['headers'] = {} self.request['body'] = {'messages':self.prompt_records} self.request['body'].update({"model": payload.get(EventPayload.SERIALIZED, {}).get("model", "")}) self.request['body'].update({"temperature": payload.get(EventPayload.SERIALIZED, {}).get("temperature", "")}) - - - - print("llm_event_start REQUEST: ", self.request) - - - - - # if not llm_flag_start and EventPayload.SERIALIZED in payload: - # print("llm_event_start llm_flag_start: ", llm_flag_start) - # print("llm_event_start payload: ", payload.get(EventPayload.SERIALIZED, {})) - # self.requestBody["llm"] = payload.get(EventPayload.SERIALIZED, {}) - - # self.llm_flag_start = True return None - # def embedding_event_start(self, payload: Any, embedding_flag_start: bool) -> None: - - # if not embedding_flag_start and EventPayload.SERIALIZED in payload: - # print("embedding_event_start embedding_flag_start: ", embedding_flag_start) - # print("embedding_event_start payload: ", payload.get(EventPayload.SERIALIZED, {})) - # self.requestBody["embedding"] = payload.get(EventPayload.SERIALIZED, 
{}) - # # self.embedding_flag_start = True - def llm_event_stop(self, payload: Any, event_id) -> None: self.endTimestamp = float(datetime.now().timestamp()) responseTime = self.endTimestamp - self.startTimestamp data = payload.get(EventPayload.RESPONSE, {}) - # print("llm_event_stop BODY: ", data) + chunks = payload.get(EventPayload.MESSAGES, {}) self.token_llm = self._token_counter.estimate_tokens_in_messages(chunks) @@ -221,25 +151,16 @@ def llm_event_stop(self, payload: Any, event_id) -> None: self.response['body'].update({'id': event_id}) self.response['body'].update({'created':int(time.time())}) self.response['body'].update({'model': data.raw.get("model", "")}) - self.response['responseTime'] = int(responseTime * 1000) - + self.response['time'] = int(responseTime * 1000) + self.response['headers'] = {} + self.response['streamingMode'] = self.streamingMode self.log_object.update( { - "requestMethod": self.request['method'], - "requestURL": self.request['url'], - "requestHeaders": self.requestHeaders, - "requestBody": self.request['body'], - "responseHeaders": self.responseHeaders, - "responseBody": self.response['body'], - "responseStatus": self.response['status'], - "responseTime": self.response['responseTime'] , - "streamingMode": self.streamingMode, + "request": self.request, + "response": self.response, } ) - self.portkey_logger.log(log_object=self.log_object) - print("llm_event_stop RESPONSE: ", self.response) - return None From 2a8256a3dccf076b3d12d62c4e0cb5fa6350042b Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Thu, 9 May 2024 14:29:41 +0530 Subject: [PATCH 08/25] fix: linting issues + code clean up --- portkey_ai/api_resources/apis/logger.py | 20 +-- ...lback.py => portkey_langchain_callback.py} | 91 ++++++------ .../llama_index/portkey_llama_callback.py | 134 ++++++++---------- 3 files changed, 103 insertions(+), 142 deletions(-) rename portkey_ai/llms/langchain/{portkey_callback.py => portkey_langchain_callback.py} (60%) diff --git a/portkey_ai/api_resources/apis/logger.py b/portkey_ai/api_resources/apis/logger.py index 006de9b..748ce8d 100644 --- a/portkey_ai/api_resources/apis/logger.py +++ b/portkey_ai/api_resources/apis/logger.py @@ -1,10 +1,6 @@ import json import os from typing import Optional -from portkey_ai.api_resources import apis -from portkey_ai.api_resources.apis.api_resource import APIResource -from portkey_ai.api_resources.base_client import APIClient -from portkey_ai.api_resources.exceptions import AuthenticationError import requests from portkey_ai.api_resources.global_constants import PORTKEY_BASE_URL @@ -22,8 +18,7 @@ def __init__( "x-portkey-api-key": api_key, } - # self.url = PORTKEY_BASE_URL + "/logs" - self.url = "https://api.portkeydev.com/v1/logs" + self.url = PORTKEY_BASE_URL + "/logs" if api_key is None: raise ValueError("API key is required to use the Logger API") @@ -33,16 +28,7 @@ def log( log_object: dict, ): response = requests.post( - url=self.url, data=json.dumps(log_object, default='str'), headers=self.headers + url=self.url, data=json.dumps(log_object), headers=self.headers ) - return response.status_code - - # Not using this function right now - @staticmethod - def get_provider(provider): - provider_dict = { - "openai": "openai", - "mistralai": "mistral-ai", - } - return provider_dict.get(provider, "openai") # placeholder + return response diff --git a/portkey_ai/llms/langchain/portkey_callback.py b/portkey_ai/llms/langchain/portkey_langchain_callback.py similarity index 60% rename from portkey_ai/llms/langchain/portkey_callback.py 
rename to portkey_ai/llms/langchain/portkey_langchain_callback.py index 4a502d7..2a3b360 100644 --- a/portkey_ai/llms/langchain/portkey_callback.py +++ b/portkey_ai/llms/langchain/portkey_langchain_callback.py @@ -9,8 +9,6 @@ class PortkeyCallbackHandler(BaseCallbackHandler): - - def __init__( self, api_key: str, @@ -24,13 +22,12 @@ def __init__( self.portkey_logger = Logger(api_key=api_key) self.log_object: Dict[str, Any] = {} - self.prompt_records: List[str] = [] + self.prompt_records: Any = [] self.request: Any = {} - self.response: Any = {} - self.responseHeaders: Dict[str, Any] = {} # Nowhere to get this from + # self.responseHeaders: Dict[str, Any] = {} self.responseBody: Any = None self.responseStatus: int = 0 @@ -42,24 +39,29 @@ def __init__( def on_llm_start( self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any ) -> None: - for prompt in prompts: messages = prompt.split("\n") for message in messages: - role, content = message.split(":",1) - self.prompt_records.append({'role':role.lower(), 'content':content.strip()}) + role, content = message.split(":", 1) + self.prompt_records.append( + {"role": role.lower(), "content": content.strip()} + ) self.startTimestamp = float(datetime.now().timestamp()) self.streamingMode = kwargs.get("invocation_params", False).get("stream", False) - self.request['method'] = "POST" - self.request['url'] = serialized.get("kwargs", "").get("base_url", "https://api.openai.com/v1/chat/completions") # placeholder - self.request['provider'] = serialized["id"][2] - self.request['headers'] = serialized.get("kwargs", {}).get("default_headers", {}) - self.request['headers'].update({"provider": serialized["id"][2]}) - self.request['body']= {'messages':self.prompt_records} - self.request['body'].update({**kwargs.get("invocation_params", {})}) + self.request["method"] = "POST" + self.request["url"] = serialized.get("kwargs", "").get( + "base_url", "https://api.openai.com/v1/chat/completions" + ) # placeholder + self.request["provider"] = serialized["id"][2] + self.request["headers"] = serialized.get("kwargs", {}).get( + "default_headers", {} + ) + self.request["headers"].update({"provider": serialized["id"][2]}) + self.request["body"] = {"messages": self.prompt_records} + self.request["body"].update({**kwargs.get("invocation_params", {})}) def on_chain_start( self, @@ -69,35 +71,36 @@ def on_chain_start( ) -> None: """Run when chain starts running.""" - # -------------------------------------------------------------------------------- - def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: - self.responseBody = response - self.responseStatus = 200 self.endTimestamp = float(datetime.now().timestamp()) responseTime = self.endTimestamp - self.startTimestamp - usage = response.llm_output.get("token_usage", {}) - - self.response['status'] = 200 - self.response['body'] = {'choices': [{ - "index":0, - "message": { - "role": "assistant", - "content": response.generations[0][0].text - }, - "logprobs": response.generations[0][0].generation_info['logprobs'], - "finish_reason": response.generations[0][0].generation_info['finish_reason'], - - }]} - self.response['body'].update({'usage': usage}) - self.response['body'].update({'id': str(kwargs.get("run_id", ""))}) - self.response['body'].update({'created':int(time.time())}) - self.response['body'].update({'model': response.llm_output.get("model_name", "")}) - self.response['body'].update({'system_fingerprint': response.llm_output.get("system_fingerprint", "")}) - self.response['time'] = int(responseTime 
* 1000) - self.response['headers'] = {} - self.response['streamingMode'] = self.streamingMode + usage = response.llm_output.get("token_usage", {}) # type: ignore[union-attr] + + self.response["status"] = ( + 200 if self.responseStatus == 0 else self.responseStatus + ) + self.response["body"] = { + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": response.generations[0][0].text, + }, + "logprobs": response.generations[0][0].generation_info["logprobs"], # type: ignore[index] + "finish_reason": response.generations[0][0].generation_info["finish_reason"], # type: ignore[index] # noqa: E501 + } + ] + } + self.response["body"].update({"usage": usage}) + self.response["body"].update({"id": str(kwargs.get("run_id", ""))}) + self.response["body"].update({"created": int(time.time())}) + self.response["body"].update({"model": response.llm_output.get("model_name", "")}) # type: ignore[union-attr] # noqa: E501 + self.response["body"].update({"system_fingerprint": response.llm_output.get("system_fingerprint", "")}) # type: ignore[union-attr] # noqa: E501 + self.response["time"] = int(responseTime * 1000) + self.response["headers"] = {} + self.response["streamingMode"] = self.streamingMode self.log_object.update( { @@ -116,8 +119,6 @@ def on_chain_end( """Run when chain ends running.""" pass - # -------------------------------------------------------------------------------- - def on_chain_error(self, error: BaseException, **kwargs: Any) -> None: self.responseBody = error self.responseStatus = error.status_code # type: ignore[attr-defined] @@ -135,23 +136,17 @@ def on_tool_error(self, error: BaseException, **kwargs: Any) -> None: self.responseStatus = error.status_code # type: ignore[attr-defined] pass - # -------------------------------------------------------------------------------- - def on_text(self, text: str, **kwargs: Any) -> None: pass def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> None: pass - # -------------------------------------------------------------------------------- - def on_llm_new_token(self, token: str, **kwargs: Any) -> None: self.streamingMode = True """Do nothing.""" pass - # -------------------------------------------------------------------------------- - def on_tool_start( self, serialized: Dict[str, Any], diff --git a/portkey_ai/llms/llama_index/portkey_llama_callback.py b/portkey_ai/llms/llama_index/portkey_llama_callback.py index 42617a4..ddd213c 100644 --- a/portkey_ai/llms/llama_index/portkey_llama_callback.py +++ b/portkey_ai/llms/llama_index/portkey_llama_callback.py @@ -1,22 +1,18 @@ -from collections import defaultdict import time from typing import Any, Dict, List, Optional from llama_index.core.callbacks.base_handler import ( - BaseCallbackHandler as LlamaIndexBaseCallbackHandler, - ) - -from portkey_ai.api_resources.apis.logger import Logger -from datetime import datetime, timezone -from llama_index.core.callbacks.schema import ( - CBEventType, - EventPayload - ) + BaseCallbackHandler as LlamaIndexBaseCallbackHandler, +) + +from portkey_ai.api_resources.apis.logger import Logger +from datetime import datetime +from llama_index.core.callbacks.schema import CBEventType, EventPayload from llama_index.core.utilities.token_counting import TokenCounter class PortkeyCallbackHandler(LlamaIndexBaseCallbackHandler): startTimestamp: int = 0 - endTimestamp: int = 0 + endTimestamp: float = 0 def __init__( self, @@ -31,48 +27,22 @@ def __init__( self.portkey_logger = Logger(api_key=api_key) - self.start = False - self.end = 
False - - self.end_trace_flag = False - self._token_counter = TokenCounter() - self.token_embedding = 0 self.token_llm = 0 - self.token_sum = 0 - - # ------------------------------------------------ self.log_object: Dict[str, Any] = {} - self.prompt_records: List[str] = [] - self.usage_records: Any = {} - self.prompt_tokens: int = 0 - self.completion_tokens: int = 0 - self.total_tokens: int = 0 - # ------------------------------------------------ - - # ------------------------------------------------ - self.request: Any = {} - self.requestMethod: str = "POST" # Done - self.requestURL: str = "" - self.requestHeaders: Dict[str, Any] = {} - self.requestBody: Any = {} # Done + self.prompt_records: Any = [] - # ----------------------------------------------- + self.request: Any = {} self.response: Any = {} - self.responseHeaders: Dict[str, Any] = {} # Nowhere to get this from - self.responseBody: Any = {} # Done - self.responseStatus: int = 0 # Done - self.responseTime: int = 0 # Done + self.responseTime: int = 0 self.streamingMode: bool = False if not api_key: raise ValueError("Please provide an API key to use PortkeyCallbackHandler") - - - def on_event_start( + def on_event_start( # type: ignore[return] self, event_type: CBEventType, payload: Optional[Dict[str, Any]] = None, @@ -82,10 +52,9 @@ def on_event_start( ) -> str: """Run when an event starts and return id of event.""" - if(event_type == CBEventType.LLM): + if event_type == CBEventType.LLM: self.llm_event_start(payload) - def on_event_end( self, event_type: CBEventType, @@ -95,14 +64,12 @@ def on_event_end( ) -> None: """Run when an event ends.""" - if(event_type == CBEventType.LLM): + if event_type == CBEventType.LLM: self.llm_event_stop(payload, event_id) - def start_trace(self, trace_id: Optional[str] = None) -> None: - """Run when an overall trace is launched.""" - if not self.start: - self.startTimestamp = int(datetime.now().timestamp()) + """Run when an overall trace is launched.""" + self.startTimestamp = int(datetime.now().timestamp()) def end_trace( self, @@ -111,24 +78,33 @@ def end_trace( ) -> None: """Run when an overall trace is exited.""" - def llm_event_start(self, payload: Any) -> None: - if EventPayload.MESSAGES in payload: messages = payload.get(EventPayload.MESSAGES, {}) - self.prompt_records = [{'role': m.role.value, 'content': m.content} for m in messages] - self.request['method'] = "POST" - self.request['url'] = payload.get(EventPayload.SERIALIZED, {}).get("base_url", "https://api.openai.com/v1/chat/completions") - self.request["provider"] = "openai" # placeholder - self.request['headers'] = {} - self.request['body'] = {'messages':self.prompt_records} - self.request['body'].update({"model": payload.get(EventPayload.SERIALIZED, {}).get("model", "")}) - self.request['body'].update({"temperature": payload.get(EventPayload.SERIALIZED, {}).get("temperature", "")}) - + self.prompt_records = [ + {"role": m.role.value, "content": m.content} for m in messages + ] + self.request["method"] = "POST" + self.request["url"] = payload.get(EventPayload.SERIALIZED, {}).get( + "base_url", "https://api.openai.com/v1/chat/completions" + ) + self.request["provider"] = "openai" # placeholder + self.request["headers"] = {} + self.request["body"] = {"messages": self.prompt_records} + self.request["body"].update( + {"model": payload.get(EventPayload.SERIALIZED, {}).get("model", "")} + ) + self.request["body"].update( + { + "temperature": payload.get(EventPayload.SERIALIZED, {}).get( + "temperature", "" + ) + } + ) + return None def 
llm_event_stop(self, payload: Any, event_id) -> None: - self.endTimestamp = float(datetime.now().timestamp()) responseTime = self.endTimestamp - self.startTimestamp @@ -137,23 +113,27 @@ def llm_event_stop(self, payload: Any, event_id) -> None: chunks = payload.get(EventPayload.MESSAGES, {}) self.token_llm = self._token_counter.estimate_tokens_in_messages(chunks) - self.response['status'] = 200 - self.response['body'] = {'choices': [{ - "index":0, - "message": { - "role": data.message.role.value, - "content": data.message.content - }, - "logprobs": data.logprobs, - "finish_reason": "done" - }]} - self.response['body'].update({'usage': {'total_tokens': self.token_llm}}) - self.response['body'].update({'id': event_id}) - self.response['body'].update({'created':int(time.time())}) - self.response['body'].update({'model': data.raw.get("model", "")}) - self.response['time'] = int(responseTime * 1000) - self.response['headers'] = {} - self.response['streamingMode'] = self.streamingMode + self.response["status"] = 200 + self.response["body"] = { + "choices": [ + { + "index": 0, + "message": { + "role": data.message.role.value, + "content": data.message.content, + }, + "logprobs": data.logprobs, + "finish_reason": "done", + } + ] + } + self.response["body"].update({"usage": {"total_tokens": self.token_llm}}) + self.response["body"].update({"id": event_id}) + self.response["body"].update({"created": int(time.time())}) + self.response["body"].update({"model": data.raw.get("model", "")}) + self.response["time"] = int(responseTime * 1000) + self.response["headers"] = {} + self.response["streamingMode"] = self.streamingMode self.log_object.update( { From fbbfdacfffb7d17e1a846ae0952ce98208f94bde Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Sat, 18 May 2024 17:32:39 -0400 Subject: [PATCH 09/25] fix: llamaindex init file --- portkey_ai/llms/llama_index/__init__.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/portkey_ai/llms/llama_index/__init__.py b/portkey_ai/llms/llama_index/__init__.py index 9d73cdf..e69de29 100644 --- a/portkey_ai/llms/llama_index/__init__.py +++ b/portkey_ai/llms/llama_index/__init__.py @@ -1,3 +0,0 @@ -# from .completions import PortkeyLLM - -# __all__ = ["PortkeyLLM"] From bab40e3e24478f940767eaf47236ae101b8b8794 Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Sat, 1 Jun 2024 16:12:59 +0530 Subject: [PATCH 10/25] fix: base url for langchain --- portkey_ai/llms/langchain/portkey_langchain_callback.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/portkey_ai/llms/langchain/portkey_langchain_callback.py b/portkey_ai/llms/langchain/portkey_langchain_callback.py index 2a3b360..66f976f 100644 --- a/portkey_ai/llms/langchain/portkey_langchain_callback.py +++ b/portkey_ai/llms/langchain/portkey_langchain_callback.py @@ -53,8 +53,8 @@ def on_llm_start( self.request["method"] = "POST" self.request["url"] = serialized.get("kwargs", "").get( - "base_url", "https://api.openai.com/v1/chat/completions" - ) # placeholder + "base_url", "chat/completions" + ) self.request["provider"] = serialized["id"][2] self.request["headers"] = serialized.get("kwargs", {}).get( "default_headers", {} From 2904274b5aff2f5faec036cb6bd0bec537aea656 Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Sat, 1 Jun 2024 16:28:54 +0530 Subject: [PATCH 11/25] fix:linting issues --- portkey_ai/api_resources/apis/logger.py | 5 ++--- portkey_ai/llms/langchain/portkey_langchain_callback.py | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/portkey_ai/api_resources/apis/logger.py 
b/portkey_ai/api_resources/apis/logger.py index 748ce8d..71283c7 100644 --- a/portkey_ai/api_resources/apis/logger.py +++ b/portkey_ai/api_resources/apis/logger.py @@ -12,6 +12,8 @@ def __init__( api_key: Optional[str] = None, ) -> None: api_key = api_key or os.getenv("PORTKEY_API_KEY") + if api_key is None: + raise ValueError("API key is required to use the Logger API") self.headers = { "Content-Type": "application/json", @@ -20,9 +22,6 @@ def __init__( self.url = PORTKEY_BASE_URL + "/logs" - if api_key is None: - raise ValueError("API key is required to use the Logger API") - def log( self, log_object: dict, diff --git a/portkey_ai/llms/langchain/portkey_langchain_callback.py b/portkey_ai/llms/langchain/portkey_langchain_callback.py index 66f976f..9eddcd6 100644 --- a/portkey_ai/llms/langchain/portkey_langchain_callback.py +++ b/portkey_ai/llms/langchain/portkey_langchain_callback.py @@ -54,7 +54,7 @@ def on_llm_start( self.request["method"] = "POST" self.request["url"] = serialized.get("kwargs", "").get( "base_url", "chat/completions" - ) + ) self.request["provider"] = serialized["id"][2] self.request["headers"] = serialized.get("kwargs", {}).get( "default_headers", {} From 627e754088647f835c550c07017ab9d504f96414 Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Tue, 4 Jun 2024 18:16:34 +0530 Subject: [PATCH 12/25] feat: logger for langchain and llamaindex --- .../llms/langchain/portkey_langchain_callback.py | 12 ++++++------ .../llms/llama_index/portkey_llama_callback.py | 6 ++++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/portkey_ai/llms/langchain/portkey_langchain_callback.py b/portkey_ai/llms/langchain/portkey_langchain_callback.py index 9eddcd6..c1298b3 100644 --- a/portkey_ai/llms/langchain/portkey_langchain_callback.py +++ b/portkey_ai/llms/langchain/portkey_langchain_callback.py @@ -74,8 +74,8 @@ def on_chain_start( def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: self.endTimestamp = float(datetime.now().timestamp()) responseTime = self.endTimestamp - self.startTimestamp - - usage = response.llm_output.get("token_usage", {}) # type: ignore[union-attr] + + usage = (response.llm_output or {}).get("token_usage", "") # type: ignore[union-attr] self.response["status"] = ( 200 if self.responseStatus == 0 else self.responseStatus @@ -88,16 +88,16 @@ def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: "role": "assistant", "content": response.generations[0][0].text, }, - "logprobs": response.generations[0][0].generation_info["logprobs"], # type: ignore[index] - "finish_reason": response.generations[0][0].generation_info["finish_reason"], # type: ignore[index] # noqa: E501 + "logprobs": response.generations[0][0].generation_info.get("logprobs", ""), # type: ignore[index] + "finish_reason": response.generations[0][0].generation_info.get("finish_reason",""), # type: ignore[index] # noqa: E501 } ] } self.response["body"].update({"usage": usage}) self.response["body"].update({"id": str(kwargs.get("run_id", ""))}) self.response["body"].update({"created": int(time.time())}) - self.response["body"].update({"model": response.llm_output.get("model_name", "")}) # type: ignore[union-attr] # noqa: E501 - self.response["body"].update({"system_fingerprint": response.llm_output.get("system_fingerprint", "")}) # type: ignore[union-attr] # noqa: E501 + self.response["body"].update({"model": (response.llm_output or {}).get("model_name", "")}) # type: ignore[union-attr] # noqa: E501 + self.response["body"].update({"system_fingerprint": (response.llm_output 
or {}).get("system_fingerprint", "")}) # type: ignore[union-attr] # noqa: E501 self.response["time"] = int(responseTime * 1000) self.response["headers"] = {} self.response["streamingMode"] = self.streamingMode diff --git a/portkey_ai/llms/llama_index/portkey_llama_callback.py b/portkey_ai/llms/llama_index/portkey_llama_callback.py index ddd213c..c59f803 100644 --- a/portkey_ai/llms/llama_index/portkey_llama_callback.py +++ b/portkey_ai/llms/llama_index/portkey_llama_callback.py @@ -86,9 +86,11 @@ def llm_event_start(self, payload: Any) -> None: ] self.request["method"] = "POST" self.request["url"] = payload.get(EventPayload.SERIALIZED, {}).get( - "base_url", "https://api.openai.com/v1/chat/completions" + "api_base", "chat/completions" + ) + self.request["provider"] = payload.get(EventPayload.SERIALIZED, {}).get( + "class_name", "" ) - self.request["provider"] = "openai" # placeholder self.request["headers"] = {} self.request["body"] = {"messages": self.prompt_records} self.request["body"].update( From 4ff1b08c8efc553a4856fdf110ea3f67800d275a Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Tue, 4 Jun 2024 18:20:23 +0530 Subject: [PATCH 13/25] fix: linting issues --- portkey_ai/llms/langchain/portkey_langchain_callback.py | 6 +++--- portkey_ai/llms/llama_index/portkey_llama_callback.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/portkey_ai/llms/langchain/portkey_langchain_callback.py b/portkey_ai/llms/langchain/portkey_langchain_callback.py index c1298b3..1ffc6b6 100644 --- a/portkey_ai/llms/langchain/portkey_langchain_callback.py +++ b/portkey_ai/llms/langchain/portkey_langchain_callback.py @@ -74,7 +74,7 @@ def on_chain_start( def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: self.endTimestamp = float(datetime.now().timestamp()) responseTime = self.endTimestamp - self.startTimestamp - + usage = (response.llm_output or {}).get("token_usage", "") # type: ignore[union-attr] self.response["status"] = ( @@ -88,8 +88,8 @@ def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: "role": "assistant", "content": response.generations[0][0].text, }, - "logprobs": response.generations[0][0].generation_info.get("logprobs", ""), # type: ignore[index] - "finish_reason": response.generations[0][0].generation_info.get("finish_reason",""), # type: ignore[index] # noqa: E501 + "logprobs": response.generations[0][0].generation_info.get("logprobs", ""), # type: ignore[union-attr] # noqa: E501 + "finish_reason": response.generations[0][0].generation_info.get("finish_reason", ""), # type: ignore[union-attr] # noqa: E501 } ] } diff --git a/portkey_ai/llms/llama_index/portkey_llama_callback.py b/portkey_ai/llms/llama_index/portkey_llama_callback.py index c59f803..a33a576 100644 --- a/portkey_ai/llms/llama_index/portkey_llama_callback.py +++ b/portkey_ai/llms/llama_index/portkey_llama_callback.py @@ -88,7 +88,7 @@ def llm_event_start(self, payload: Any) -> None: self.request["url"] = payload.get(EventPayload.SERIALIZED, {}).get( "api_base", "chat/completions" ) - self.request["provider"] = payload.get(EventPayload.SERIALIZED, {}).get( + self.request["provider"] = payload.get(EventPayload.SERIALIZED, {}).get( "class_name", "" ) self.request["headers"] = {} From ded6324dc14e2dba29cde1b234aaab8352ec7fea Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Tue, 4 Jun 2024 19:00:17 +0530 Subject: [PATCH 14/25] fix: file structure for callback handlers --- portkey_ai/api_resources/global_constants.py | 3 ++- portkey_ai/llms/callback/__init__.py | 4 ++++ .../{langchain => 
callback}/portkey_langchain_callback.py | 2 +- .../{llama_index => callback}/portkey_llama_callback.py | 8 ++++++-- 4 files changed, 13 insertions(+), 4 deletions(-) create mode 100644 portkey_ai/llms/callback/__init__.py rename portkey_ai/llms/{langchain => callback}/portkey_langchain_callback.py (99%) rename portkey_ai/llms/{llama_index => callback}/portkey_llama_callback.py (96%) diff --git a/portkey_ai/api_resources/global_constants.py b/portkey_ai/api_resources/global_constants.py index ad6fdf1..ea3308f 100644 --- a/portkey_ai/api_resources/global_constants.py +++ b/portkey_ai/api_resources/global_constants.py @@ -29,7 +29,8 @@ VERSION = "0.1.0" DEFAULT_TIMEOUT = 60 PORTKEY_HEADER_PREFIX = "x-portkey-" -PORTKEY_BASE_URL = "https://api.portkey.ai/v1" +# PORTKEY_BASE_URL = "https://api.portkey.ai/v1" +PORTKEY_BASE_URL = "https://api.portkeydev.com/v1" PORTKEY_GATEWAY_URL = PORTKEY_BASE_URL PORTKEY_API_KEY_ENV = "PORTKEY_API_KEY" PORTKEY_PROXY_ENV = "PORTKEY_PROXY" diff --git a/portkey_ai/llms/callback/__init__.py b/portkey_ai/llms/callback/__init__.py new file mode 100644 index 0000000..3da4108 --- /dev/null +++ b/portkey_ai/llms/callback/__init__.py @@ -0,0 +1,4 @@ +from .portkey_langchain_callback import PortkeyLangchain +from .portkey_llama_callback import PortkeyLlamaindex + +__all__ = ["PortkeyLangchain", "PortkeyLlamaindex"] diff --git a/portkey_ai/llms/langchain/portkey_langchain_callback.py b/portkey_ai/llms/callback/portkey_langchain_callback.py similarity index 99% rename from portkey_ai/llms/langchain/portkey_langchain_callback.py rename to portkey_ai/llms/callback/portkey_langchain_callback.py index 1ffc6b6..7a172f9 100644 --- a/portkey_ai/llms/langchain/portkey_langchain_callback.py +++ b/portkey_ai/llms/callback/portkey_langchain_callback.py @@ -8,7 +8,7 @@ from portkey_ai.api_resources.apis.logger import Logger -class PortkeyCallbackHandler(BaseCallbackHandler): +class PortkeyLangchain(BaseCallbackHandler): def __init__( self, api_key: str, diff --git a/portkey_ai/llms/llama_index/portkey_llama_callback.py b/portkey_ai/llms/callback/portkey_llama_callback.py similarity index 96% rename from portkey_ai/llms/llama_index/portkey_llama_callback.py rename to portkey_ai/llms/callback/portkey_llama_callback.py index a33a576..75b5928 100644 --- a/portkey_ai/llms/llama_index/portkey_llama_callback.py +++ b/portkey_ai/llms/callback/portkey_llama_callback.py @@ -10,7 +10,7 @@ from llama_index.core.utilities.token_counting import TokenCounter -class PortkeyCallbackHandler(LlamaIndexBaseCallbackHandler): +class PortkeyLlamaindex(LlamaIndexBaseCallbackHandler): startTimestamp: int = 0 endTimestamp: float = 0 @@ -113,8 +113,10 @@ def llm_event_stop(self, payload: Any, event_id) -> None: data = payload.get(EventPayload.RESPONSE, {}) chunks = payload.get(EventPayload.MESSAGES, {}) - self.token_llm = self._token_counter.estimate_tokens_in_messages(chunks) + print("chunks", chunks) + self.token_llm = self._token_counter.estimate_tokens_in_messages(chunks) + print("token_llm", self.token_llm) self.response["status"] = 200 self.response["body"] = { "choices": [ @@ -137,6 +139,8 @@ def llm_event_stop(self, payload: Any, event_id) -> None: self.response["headers"] = {} self.response["streamingMode"] = self.streamingMode + print("response", self.response) + self.log_object.update( { "request": self.request, From deadd8b4b3a3db9fddb6e6e3915f3e12250e7989 Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Sat, 8 Jun 2024 17:31:26 +0530 Subject: [PATCH 15/25] feat: test cases for langchain and 
llamaindex --- .../llms/callback/portkey_llama_callback.py | 4 -- tests/models.json | 26 ++++++++ tests/test_llm_langchain.py | 49 ++++++++++++++ tests/test_llm_llamaindex.py | 66 +++++++++++++++++++ 4 files changed, 141 insertions(+), 4 deletions(-) create mode 100644 tests/test_llm_langchain.py create mode 100644 tests/test_llm_llamaindex.py diff --git a/portkey_ai/llms/callback/portkey_llama_callback.py b/portkey_ai/llms/callback/portkey_llama_callback.py index 75b5928..ef3f765 100644 --- a/portkey_ai/llms/callback/portkey_llama_callback.py +++ b/portkey_ai/llms/callback/portkey_llama_callback.py @@ -113,10 +113,8 @@ def llm_event_stop(self, payload: Any, event_id) -> None: data = payload.get(EventPayload.RESPONSE, {}) chunks = payload.get(EventPayload.MESSAGES, {}) - print("chunks", chunks) self.token_llm = self._token_counter.estimate_tokens_in_messages(chunks) - print("token_llm", self.token_llm) self.response["status"] = 200 self.response["body"] = { "choices": [ @@ -139,8 +137,6 @@ def llm_event_stop(self, payload: Any, event_id) -> None: self.response["headers"] = {} self.response["streamingMode"] = self.streamingMode - print("response", self.response) - self.log_object.update( { "request": self.request, diff --git a/tests/models.json b/tests/models.json index 2924696..b37f05b 100644 --- a/tests/models.json +++ b/tests/models.json @@ -77,5 +77,31 @@ ], "image":[], "audio":[] + }, + "langchain": { + "env_variable": "OPENAI_API_KEY", + "chat": [ + "gpt-4o", + "gpt-4-0125-preview", + "gpt-4-turbo-preview", + "gpt-4-1106-preview", + "gpt-4-vision-preview", + "gpt-4", + "gpt-4-0613", + "gpt-3.5-turbo", + "gpt-3.5-turbo-0125", + "gpt-3.5-turbo-1106" + ] + }, + "llamaindex":{ + "env_variable": "LLAMA_INDEX_API_KEY", + "HuggingFaceEmbedding": [ + "sentence-transformers/all-MiniLM-L6-v2", + "sentence-transformers/LaBSE" + ], + "OpenAIEmbedding": [ + "text-embedding-ada-002", + "text-embedding-3-large" + ] } } \ No newline at end of file diff --git a/tests/test_llm_langchain.py b/tests/test_llm_langchain.py new file mode 100644 index 0000000..0516421 --- /dev/null +++ b/tests/test_llm_langchain.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +import os +from typing import Any + +import pytest + +from tests.utils import read_json_file +from portkey_ai.llms.callback import PortkeyLangchain +from langchain.chat_models import ChatOpenAI +from langchain_core.prompts import ChatPromptTemplate +from langchain.chains import LLMChain + +api_key = os.environ.get("PORTKEY_API_KEY") + + +class TestLLMLangchain: + client = PortkeyLangchain + parametrize = pytest.mark.parametrize("client", [client], ids=["strict"]) + models = read_json_file("./tests/models.json") + + t1_params = [] + t1 = [] + for k, v in models.items(): + if k == "langchain": + for i in v["chat"]: + t1.append((client, i)) + + t1_params.extend(t1) + + @pytest.mark.parametrize("client, model", t1_params) + def test_method_langchain_openai( + self, client: Any, model + ) -> None: + handler = client( + api_key=api_key, + ) + llm = ChatOpenAI(callbacks=[handler], model=model) + prompt = ChatPromptTemplate.from_messages( + [ + ("system", "You are world class technical documentation writer."), + ("user", "{input}"), + ] + ) + chain = LLMChain(llm=llm, prompt=prompt) + + + assert isinstance(chain.invoke({"input": "what is langchain?"}).get('input'), str) is True + assert isinstance(chain.invoke({"input": "what is langchain?"}).get('text'), str) is True diff --git a/tests/test_llm_llamaindex.py b/tests/test_llm_llamaindex.py new file 
mode 100644 index 0000000..2e6d1a7 --- /dev/null +++ b/tests/test_llm_llamaindex.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +import os +from typing import Any + +import pytest + +from tests.utils import read_json_file +from portkey_ai.llms.callback import PortkeyLlamaindex + + +from llama_index.llms.openai import OpenAI +from llama_index.core import Settings +from llama_index.core.callbacks import CallbackManager +from llama_index.core import ( + VectorStoreIndex, + SimpleDirectoryReader, +) +from llama_index.embeddings.huggingface import HuggingFaceEmbedding +from llama_index.embeddings.openai import OpenAIEmbedding +from llama_index.core.chat_engine.types import AgentChatResponse + +api_key = os.environ.get("PORTKEY_API_KEY") + + +class TestLLMLlamaindex: + client = PortkeyLlamaindex + parametrize = pytest.mark.parametrize("client", [client], ids=["strict"]) + models = read_json_file("./tests/models.json") + + t1_params = [] + t1 = [] + for k, v in models.items(): + if k == "llamaindex": + for emdmodel in v['HuggingFaceEmbedding']: + t1.append((client, "HuggingFaceEmbedding", emdmodel)) + for emdmodel in v['OpenAIEmbedding']: + t1.append((client, "OpenAIEmbedding", emdmodel)) + + t1_params.extend(t1) + + @pytest.mark.parametrize("client, provider, model", t1_params) + def test_method_llamaindex( + self, client: Any, provider: Any, model: Any + ) -> None: + handler = client( + api_key=api_key, + ) + + # embed_model_name = "sentence-transformers/all-MiniLM-L6-v2" + if provider == "HuggingFaceEmbedding": + embed_model = HuggingFaceEmbedding(model_name=model) + if provider == "OpenAIEmbedding": + embed_model = OpenAIEmbedding(model=model) + + docs = SimpleDirectoryReader("/Users/chandeep/Documents/Workspace/Portkey/SDK/Notebook/data").load_data() + index = VectorStoreIndex.from_documents(docs) + + Settings.callback_manager = CallbackManager([handler]) + Settings.llm = OpenAI() + Settings.embed_model = embed_model + + chat_engine = index.as_chat_engine() + chat_response = chat_engine.chat("What did the author do growing up?") + + assert isinstance(chat_response, AgentChatResponse) is True From 7a359429694be3f42bea4329bf9ce875a9bc3d8e Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Sat, 8 Jun 2024 17:32:50 +0530 Subject: [PATCH 16/25] fix: linting issues --- tests/test_llm_langchain.py | 17 ++++++++++------- tests/test_llm_llamaindex.py | 12 ++++++------ 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/tests/test_llm_langchain.py b/tests/test_llm_langchain.py index 0516421..ea1ac7f 100644 --- a/tests/test_llm_langchain.py +++ b/tests/test_llm_langchain.py @@ -29,13 +29,11 @@ class TestLLMLangchain: t1_params.extend(t1) @pytest.mark.parametrize("client, model", t1_params) - def test_method_langchain_openai( - self, client: Any, model - ) -> None: + def test_method_langchain_openai(self, client: Any, model) -> None: handler = client( api_key=api_key, ) - llm = ChatOpenAI(callbacks=[handler], model=model) + llm = ChatOpenAI(callbacks=[handler], model=model) # type: ignore[misc] prompt = ChatPromptTemplate.from_messages( [ ("system", "You are world class technical documentation writer."), @@ -43,7 +41,12 @@ def test_method_langchain_openai( ] ) chain = LLMChain(llm=llm, prompt=prompt) - - assert isinstance(chain.invoke({"input": "what is langchain?"}).get('input'), str) is True - assert isinstance(chain.invoke({"input": "what is langchain?"}).get('text'), str) is True + assert ( + isinstance(chain.invoke({"input": "what is langchain?"}).get("input"), str) + is True + 
) + assert ( + isinstance(chain.invoke({"input": "what is langchain?"}).get("text"), str) + is True + ) diff --git a/tests/test_llm_llamaindex.py b/tests/test_llm_llamaindex.py index 2e6d1a7..7b53d0c 100644 --- a/tests/test_llm_llamaindex.py +++ b/tests/test_llm_llamaindex.py @@ -32,17 +32,15 @@ class TestLLMLlamaindex: t1 = [] for k, v in models.items(): if k == "llamaindex": - for emdmodel in v['HuggingFaceEmbedding']: + for emdmodel in v["HuggingFaceEmbedding"]: t1.append((client, "HuggingFaceEmbedding", emdmodel)) - for emdmodel in v['OpenAIEmbedding']: + for emdmodel in v["OpenAIEmbedding"]: t1.append((client, "OpenAIEmbedding", emdmodel)) t1_params.extend(t1) @pytest.mark.parametrize("client, provider, model", t1_params) - def test_method_llamaindex( - self, client: Any, provider: Any, model: Any - ) -> None: + def test_method_llamaindex(self, client: Any, provider: Any, model: Any) -> None: handler = client( api_key=api_key, ) @@ -53,7 +51,9 @@ def test_method_llamaindex( if provider == "OpenAIEmbedding": embed_model = OpenAIEmbedding(model=model) - docs = SimpleDirectoryReader("/Users/chandeep/Documents/Workspace/Portkey/SDK/Notebook/data").load_data() + docs = SimpleDirectoryReader( + "/Users/chandeep/Documents/Workspace/Portkey/SDK/Notebook/data" + ).load_data() index = VectorStoreIndex.from_documents(docs) Settings.callback_manager = CallbackManager([handler]) From 57e37551f282b650e07634ca9bc17719638e03c0 Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Tue, 11 Jun 2024 18:07:26 +0530 Subject: [PATCH 17/25] fix: models.json for tests and base url to prod --- portkey_ai/api_resources/global_constants.py | 3 +-- tests/models.json | 11 +++++++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/portkey_ai/api_resources/global_constants.py b/portkey_ai/api_resources/global_constants.py index ea3308f..ad6fdf1 100644 --- a/portkey_ai/api_resources/global_constants.py +++ b/portkey_ai/api_resources/global_constants.py @@ -29,8 +29,7 @@ VERSION = "0.1.0" DEFAULT_TIMEOUT = 60 PORTKEY_HEADER_PREFIX = "x-portkey-" -# PORTKEY_BASE_URL = "https://api.portkey.ai/v1" -PORTKEY_BASE_URL = "https://api.portkeydev.com/v1" +PORTKEY_BASE_URL = "https://api.portkey.ai/v1" PORTKEY_GATEWAY_URL = PORTKEY_BASE_URL PORTKEY_API_KEY_ENV = "PORTKEY_API_KEY" PORTKEY_PROXY_ENV = "PORTKEY_PROXY" diff --git a/tests/models.json b/tests/models.json index b37f05b..73033dc 100644 --- a/tests/models.json +++ b/tests/models.json @@ -91,7 +91,10 @@ "gpt-3.5-turbo", "gpt-3.5-turbo-0125", "gpt-3.5-turbo-1106" - ] + ], + "text":[], + "image":[], + "audio":[] }, "llamaindex":{ "env_variable": "LLAMA_INDEX_API_KEY", @@ -102,6 +105,10 @@ "OpenAIEmbedding": [ "text-embedding-ada-002", "text-embedding-3-large" - ] + ], + "chat":[], + "text":[], + "image":[], + "audio":[] } } \ No newline at end of file From a11eced69d7a9ad08fb1f85a570b72c5a1fa8526 Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Tue, 11 Jun 2024 19:46:19 +0530 Subject: [PATCH 18/25] fix: linting issues + conditional import --- .../callback/portkey_langchain_callback.py | 15 ++++--- .../llms/callback/portkey_llama_callback.py | 44 +++++++++---------- setup.cfg | 1 + 3 files changed, 29 insertions(+), 31 deletions(-) diff --git a/portkey_ai/llms/callback/portkey_langchain_callback.py b/portkey_ai/llms/callback/portkey_langchain_callback.py index 7a172f9..ee9c7fa 100644 --- a/portkey_ai/llms/callback/portkey_langchain_callback.py +++ b/portkey_ai/llms/callback/portkey_langchain_callback.py @@ -1,12 +1,13 @@ from datetime import datetime import time 
from typing import Any, Dict, List, Optional -from langchain_core.callbacks import BaseCallbackHandler -from langchain_core.outputs import LLMResult -from langchain_core.agents import AgentFinish, AgentAction - from portkey_ai.api_resources.apis.logger import Logger +try: + from langchain_core.callbacks import BaseCallbackHandler +except ImportError: + raise ImportError("Please pip install langchain-core to use PortkeyLangchain") + class PortkeyLangchain(BaseCallbackHandler): def __init__( @@ -71,7 +72,7 @@ def on_chain_start( ) -> None: """Run when chain starts running.""" - def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: + def on_llm_end(self, response: Any, **kwargs: Any) -> None: self.endTimestamp = float(datetime.now().timestamp()) responseTime = self.endTimestamp - self.startTimestamp @@ -139,7 +140,7 @@ def on_tool_error(self, error: BaseException, **kwargs: Any) -> None: def on_text(self, text: str, **kwargs: Any) -> None: pass - def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> None: + def on_agent_finish(self, finish: Any, **kwargs: Any) -> None: pass def on_llm_new_token(self, token: str, **kwargs: Any) -> None: @@ -155,7 +156,7 @@ def on_tool_start( ) -> None: pass - def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any: + def on_agent_action(self, action: Any, **kwargs: Any) -> Any: """Do nothing.""" pass diff --git a/portkey_ai/llms/callback/portkey_llama_callback.py b/portkey_ai/llms/callback/portkey_llama_callback.py index ef3f765..9289ad2 100644 --- a/portkey_ai/llms/callback/portkey_llama_callback.py +++ b/portkey_ai/llms/callback/portkey_llama_callback.py @@ -1,13 +1,15 @@ import time from typing import Any, Dict, List, Optional -from llama_index.core.callbacks.base_handler import ( - BaseCallbackHandler as LlamaIndexBaseCallbackHandler, -) - from portkey_ai.api_resources.apis.logger import Logger from datetime import datetime -from llama_index.core.callbacks.schema import CBEventType, EventPayload -from llama_index.core.utilities.token_counting import TokenCounter + +try: + from llama_index.core.callbacks.base_handler import ( + BaseCallbackHandler as LlamaIndexBaseCallbackHandler, + ) + from llama_index.core.utilities.token_counting import TokenCounter +except ImportError: + raise ImportError("Please pip install llama-index to use Portkey Callback Handler") class PortkeyLlamaindex(LlamaIndexBaseCallbackHandler): @@ -44,7 +46,7 @@ def __init__( def on_event_start( # type: ignore[return] self, - event_type: CBEventType, + event_type: Any, payload: Optional[Dict[str, Any]] = None, event_id: str = "", parent_id: str = "", @@ -52,19 +54,19 @@ def on_event_start( # type: ignore[return] ) -> str: """Run when an event starts and return id of event.""" - if event_type == CBEventType.LLM: + if event_type == "llm": self.llm_event_start(payload) def on_event_end( self, - event_type: CBEventType, + event_type: Any, payload: Optional[Dict[str, Any]] = None, event_id: str = "", **kwargs: Any, ) -> None: """Run when an event ends.""" - if event_type == CBEventType.LLM: + if event_type == "llm": self.llm_event_stop(payload, event_id) def start_trace(self, trace_id: Optional[str] = None) -> None: @@ -79,29 +81,23 @@ def end_trace( """Run when an overall trace is exited.""" def llm_event_start(self, payload: Any) -> None: - if EventPayload.MESSAGES in payload: - messages = payload.get(EventPayload.MESSAGES, {}) + if "messages" in payload: + messages = payload.get("messages", {}) self.prompt_records = [ {"role": m.role.value, "content": 
m.content} for m in messages ] self.request["method"] = "POST" - self.request["url"] = payload.get(EventPayload.SERIALIZED, {}).get( + self.request["url"] = payload.get("serialized", {}).get( "api_base", "chat/completions" ) - self.request["provider"] = payload.get(EventPayload.SERIALIZED, {}).get( - "class_name", "" - ) + self.request["provider"] = payload.get("serialized", {}).get("class_name", "") self.request["headers"] = {} self.request["body"] = {"messages": self.prompt_records} self.request["body"].update( - {"model": payload.get(EventPayload.SERIALIZED, {}).get("model", "")} + {"model": payload.get("serialized", {}).get("model", "")} ) self.request["body"].update( - { - "temperature": payload.get(EventPayload.SERIALIZED, {}).get( - "temperature", "" - ) - } + {"temperature": payload.get("serialized", {}).get("temperature", "")} ) return None @@ -110,9 +106,9 @@ def llm_event_stop(self, payload: Any, event_id) -> None: self.endTimestamp = float(datetime.now().timestamp()) responseTime = self.endTimestamp - self.startTimestamp - data = payload.get(EventPayload.RESPONSE, {}) + data = payload.get("response", {}) - chunks = payload.get(EventPayload.MESSAGES, {}) + chunks = payload.get("messages", {}) self.token_llm = self._token_counter.estimate_tokens_in_messages(chunks) self.response["status"] = 200 diff --git a/setup.cfg b/setup.cfg index 8c28044..1f5f332 100644 --- a/setup.cfg +++ b/setup.cfg @@ -24,6 +24,7 @@ install_requires = typing_extensions>=4.7.1,<5.0 pydantic>=1.10.12 openai>=1.0,<2.0 + types-requests [options.entry_points] console_scripts = From 983b646c586b5b3f6612ca72ea068201662b5a61 Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Tue, 11 Jun 2024 20:04:18 +0530 Subject: [PATCH 19/25] fix: extra dependency for conditional import --- setup.cfg | 3 +++ 1 file changed, 3 insertions(+) diff --git a/setup.cfg b/setup.cfg index 1f5f332..a7b95b5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -44,6 +44,9 @@ dev = ruff==0.0.292 pytest-asyncio==0.23.5 openai>=1.0,<2.0 +extra = + langchain-core + llama-index [mypy] ignore_missing_imports = true From 614cd58291d3d48632688ef6ae7b6f0161650ddf Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Wed, 12 Jun 2024 13:28:23 +0530 Subject: [PATCH 20/25] fix: tested conditional import + init files fixed --- portkey_ai/llms/callback/__init__.py | 4 ---- portkey_ai/llms/langchain/__init__.py | 3 ++- .../{callback => langchain}/portkey_langchain_callback.py | 0 portkey_ai/llms/llama_index/__init__.py | 3 +++ .../llms/{callback => llama_index}/portkey_llama_callback.py | 4 ++++ 5 files changed, 9 insertions(+), 5 deletions(-) delete mode 100644 portkey_ai/llms/callback/__init__.py rename portkey_ai/llms/{callback => langchain}/portkey_langchain_callback.py (100%) rename portkey_ai/llms/{callback => llama_index}/portkey_llama_callback.py (97%) diff --git a/portkey_ai/llms/callback/__init__.py b/portkey_ai/llms/callback/__init__.py deleted file mode 100644 index 3da4108..0000000 --- a/portkey_ai/llms/callback/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .portkey_langchain_callback import PortkeyLangchain -from .portkey_llama_callback import PortkeyLlamaindex - -__all__ = ["PortkeyLangchain", "PortkeyLlamaindex"] diff --git a/portkey_ai/llms/langchain/__init__.py b/portkey_ai/llms/langchain/__init__.py index 45770de..de0f7f0 100644 --- a/portkey_ai/llms/langchain/__init__.py +++ b/portkey_ai/llms/langchain/__init__.py @@ -1,4 +1,5 @@ from .chat import ChatPortkey from .completion import PortkeyLLM +from .portkey_langchain_callback import PortkeyLangchain 
-__all__ = ["ChatPortkey", "PortkeyLLM"] +__all__ = ["ChatPortkey", "PortkeyLLM", "PortkeyLangchain"] diff --git a/portkey_ai/llms/callback/portkey_langchain_callback.py b/portkey_ai/llms/langchain/portkey_langchain_callback.py similarity index 100% rename from portkey_ai/llms/callback/portkey_langchain_callback.py rename to portkey_ai/llms/langchain/portkey_langchain_callback.py diff --git a/portkey_ai/llms/llama_index/__init__.py b/portkey_ai/llms/llama_index/__init__.py index e69de29..9530d3e 100644 --- a/portkey_ai/llms/llama_index/__init__.py +++ b/portkey_ai/llms/llama_index/__init__.py @@ -0,0 +1,3 @@ +from .portkey_llama_callback import PortkeyLlamaindex + +__all__ = ["PortkeyLlamaindex"] diff --git a/portkey_ai/llms/callback/portkey_llama_callback.py b/portkey_ai/llms/llama_index/portkey_llama_callback.py similarity index 97% rename from portkey_ai/llms/callback/portkey_llama_callback.py rename to portkey_ai/llms/llama_index/portkey_llama_callback.py index 9289ad2..c9548f4 100644 --- a/portkey_ai/llms/callback/portkey_llama_callback.py +++ b/portkey_ai/llms/llama_index/portkey_llama_callback.py @@ -8,6 +8,10 @@ BaseCallbackHandler as LlamaIndexBaseCallbackHandler, ) from llama_index.core.utilities.token_counting import TokenCounter +except ModuleNotFoundError: + raise ModuleNotFoundError( + "Please install llama-index to use Portkey Callback Handler" + ) except ImportError: raise ImportError("Please pip install llama-index to use Portkey Callback Handler") From 8864bacc6d640a177f1db91e23219cc5f768b7cb Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Thu, 13 Jun 2024 12:50:24 +0530 Subject: [PATCH 21/25] fix: token count for llamaindex --- portkey_ai/llms/llama_index/portkey_llama_callback.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/portkey_ai/llms/llama_index/portkey_llama_callback.py b/portkey_ai/llms/llama_index/portkey_llama_callback.py index c9548f4..0a1d771 100644 --- a/portkey_ai/llms/llama_index/portkey_llama_callback.py +++ b/portkey_ai/llms/llama_index/portkey_llama_callback.py @@ -129,7 +129,15 @@ def llm_event_stop(self, payload: Any, event_id) -> None: } ] } - self.response["body"].update({"usage": {"total_tokens": self.token_llm}}) + self.response["body"].update( + { + "usage": { + "prompt_tokens": 0, + "completion_tokens": self.token_llm, + "total_tokens": self.token_llm, + } + } + ) self.response["body"].update({"id": event_id}) self.response["body"].update({"created": int(time.time())}) self.response["body"].update({"model": data.raw.get("model", "")}) From 57da8e215b279e1ca430e04f84c804baa23651f8 Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Tue, 18 Jun 2024 18:20:54 +0530 Subject: [PATCH 22/25] fix: restructuring setup.cfg --- setup.cfg | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index a7b95b5..bb69efc 100644 --- a/setup.cfg +++ b/setup.cfg @@ -44,8 +44,9 @@ dev = ruff==0.0.292 pytest-asyncio==0.23.5 openai>=1.0,<2.0 -extra = +langachain_callback = langchain-core +llama_index_callback = llama-index [mypy] From 5d8552ee53cb05cc101110309199460c8a8f687b Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Wed, 19 Jun 2024 19:10:24 +0530 Subject: [PATCH 23/25] fix: import statement for llm test cases --- tests/test_llm_langchain.py | 2 +- tests/test_llm_llamaindex.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_llm_langchain.py b/tests/test_llm_langchain.py index ea1ac7f..3441d85 100644 --- a/tests/test_llm_langchain.py +++ b/tests/test_llm_langchain.py @@ 
-6,7 +6,7 @@ import pytest from tests.utils import read_json_file -from portkey_ai.llms.callback import PortkeyLangchain +from portkey_ai.llms.langchain import PortkeyLangchain from langchain.chat_models import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain.chains import LLMChain diff --git a/tests/test_llm_llamaindex.py b/tests/test_llm_llamaindex.py index 7b53d0c..959dd81 100644 --- a/tests/test_llm_llamaindex.py +++ b/tests/test_llm_llamaindex.py @@ -6,7 +6,7 @@ import pytest from tests.utils import read_json_file -from portkey_ai.llms.callback import PortkeyLlamaindex +from portkey_ai.llms.llama_index import PortkeyLlamaindex from llama_index.llms.openai import OpenAI From 244b7fed7dc1ff1c83860fecf7cd05d0010c7c24 Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Wed, 19 Jun 2024 19:47:31 +0530 Subject: [PATCH 24/25] feat: prompt tokens for llamaindex --- .../llms/llama_index/portkey_llama_callback.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/portkey_ai/llms/llama_index/portkey_llama_callback.py b/portkey_ai/llms/llama_index/portkey_llama_callback.py index 0a1d771..0c35333 100644 --- a/portkey_ai/llms/llama_index/portkey_llama_callback.py +++ b/portkey_ai/llms/llama_index/portkey_llama_callback.py @@ -34,6 +34,8 @@ def __init__( self.portkey_logger = Logger(api_key=api_key) self._token_counter = TokenCounter() + self.completion_tokens = 0 + self.prompt_tokens = 0 self.token_llm = 0 self.log_object: Dict[str, Any] = {} @@ -86,6 +88,8 @@ def end_trace( def llm_event_start(self, payload: Any) -> None: if "messages" in payload: + chunks = payload.get("messages", {}) + self.prompt_tokens = self._token_counter.estimate_tokens_in_messages(chunks) messages = payload.get("messages", {}) self.prompt_records = [ {"role": m.role.value, "content": m.content} for m in messages @@ -113,8 +117,8 @@ def llm_event_stop(self, payload: Any, event_id) -> None: data = payload.get("response", {}) chunks = payload.get("messages", {}) - - self.token_llm = self._token_counter.estimate_tokens_in_messages(chunks) + self.completion_tokens = self._token_counter.estimate_tokens_in_messages(chunks) + self.token_llm = self.prompt_tokens + self.completion_tokens self.response["status"] = 200 self.response["body"] = { "choices": [ @@ -132,8 +136,8 @@ def llm_event_stop(self, payload: Any, event_id) -> None: self.response["body"].update( { "usage": { - "prompt_tokens": 0, - "completion_tokens": self.token_llm, + "prompt_tokens": self.prompt_tokens, + "completion_tokens": self.completion_tokens, "total_tokens": self.token_llm, } } From 84483b1244ef3b946d211245a0c56de951e30a76 Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Sat, 22 Jun 2024 19:07:50 +0530 Subject: [PATCH 25/25] fix: type + make file command --- Makefile | 8 +++++++- setup.cfg | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 2f482ad..07e45d2 100644 --- a/Makefile +++ b/Makefile @@ -29,4 +29,10 @@ upload: rm -rf dist dev: - pip install -e ".[dev]" \ No newline at end of file + pip install -e ".[dev]" + +langchain_callback: + pip install -e ".[langchain_callback]" + +llama_index_callback: + pip install -e ".[llama_index_callback]" \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index bb69efc..396d957 100644 --- a/setup.cfg +++ b/setup.cfg @@ -44,7 +44,7 @@ dev = ruff==0.0.292 pytest-asyncio==0.23.5 openai>=1.0,<2.0 -langachain_callback = +langchain_callback = langchain-core llama_index_callback = llama-index
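
For quick reference, below is a minimal usage sketch of the callback handlers introduced in this series, adapted from the test cases above (tests/test_llm_langchain.py). The model name, prompt text, and environment-variable handling are illustrative only; a valid PORTKEY_API_KEY (and an OpenAI key for the underlying chat call) is assumed to be set in the environment.

import os

from langchain.chat_models import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain.chains import LLMChain

from portkey_ai.llms.langchain import PortkeyLangchain

# The handler builds the request/response log object from the LangChain
# callbacks and posts it to Portkey's /logs endpoint via the Logger class.
handler = PortkeyLangchain(api_key=os.environ["PORTKEY_API_KEY"])

llm = ChatOpenAI(callbacks=[handler], model="gpt-3.5-turbo")  # model is illustrative
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are world class technical documentation writer."),
        ("user", "{input}"),
    ]
)
chain = LLMChain(llm=llm, prompt=prompt)
print(chain.invoke({"input": "what is langchain?"}))

The LlamaIndex handler is registered the same way through a callback manager (Settings.callback_manager = CallbackManager([PortkeyLlamaindex(api_key=...)])), as shown in tests/test_llm_llamaindex.py.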