Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 4.1.0 - 2025-05-22

Moved ai openai package to a composition approach over inheritance.

## 4.0.1 – 2025-04-29

1. Remove deprecated `monotonic` library. Use Python's core `time.monotonic` function instead
Expand Down
128 changes: 100 additions & 28 deletions posthog/ai/openai/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

try:
import openai
import openai.resources
except ImportError:
raise ModuleNotFoundError("Please install the OpenAI SDK to use this feature: 'pip install openai'")

Expand All @@ -29,14 +28,37 @@ def __init__(self, posthog_client: PostHogClient, **kwargs):
"""
super().__init__(**kwargs)
self._ph_client = posthog_client
self.chat = WrappedChat(self)
self.embeddings = WrappedEmbeddings(self)
self.beta = WrappedBeta(self)
self.responses = WrappedResponses(self)

# Store original objects after parent initialization (only if they exist)
self._original_chat = getattr(self, "chat", None)
self._original_embeddings = getattr(self, "embeddings", None)
self._original_beta = getattr(self, "beta", None)
self._original_responses = getattr(self, "responses", None)

class WrappedResponses(openai.resources.responses.Responses):
_client: OpenAI
# Replace with wrapped versions (only if originals exist)
if self._original_chat is not None:
self.chat = WrappedChat(self, self._original_chat)

if self._original_embeddings is not None:
self.embeddings = WrappedEmbeddings(self, self._original_embeddings)

if self._original_beta is not None:
self.beta = WrappedBeta(self, self._original_beta)

if self._original_responses is not None:
self.responses = WrappedResponses(self, self._original_responses)


class WrappedResponses:
"""Wrapper for OpenAI responses that tracks usage in PostHog."""

def __init__(self, client: OpenAI, original_responses):
self._client = client
self._original = original_responses

def __getattr__(self, name):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this be called only in case there's no method, or is this called all the time? Do we have unit tests for this wrapper?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do have unit tests, just not integration tests that will run with different versions. i have tested that manually.

that method is only called if we do not have an override. it allows us to fall back to the original client if we do not support

"""Fallback to original responses object for any methods we don't explicitly handle."""
return getattr(self._original, name)

def create(
self,
Expand Down Expand Up @@ -69,7 +91,7 @@ def create(
posthog_privacy_mode,
posthog_groups,
self._client.base_url,
super().create,
self._original.create,
**kwargs,
)

Expand All @@ -85,7 +107,7 @@ def _create_streaming(
start_time = time.time()
usage_stats: Dict[str, int] = {}
final_content = []
response = super().create(**kwargs)
response = self._original.create(**kwargs)

def generator():
nonlocal usage_stats
Expand Down Expand Up @@ -195,16 +217,32 @@ def _capture_streaming_event(
)


class WrappedChat(openai.resources.chat.Chat):
_client: OpenAI
class WrappedChat:
"""Wrapper for OpenAI chat that tracks usage in PostHog."""

def __init__(self, client: OpenAI, original_chat):
self._client = client
self._original = original_chat

def __getattr__(self, name):
"""Fallback to original chat object for any methods we don't explicitly handle."""
return getattr(self._original, name)

@property
def completions(self):
return WrappedCompletions(self._client)
return WrappedCompletions(self._client, self._original.completions)


class WrappedCompletions:
"""Wrapper for OpenAI chat completions that tracks usage in PostHog."""

class WrappedCompletions(openai.resources.chat.completions.Completions):
_client: OpenAI
def __init__(self, client: OpenAI, original_completions):
self._client = client
self._original = original_completions

def __getattr__(self, name):
"""Fallback to original completions object for any methods we don't explicitly handle."""
return getattr(self._original, name)

def create(
self,
Expand Down Expand Up @@ -237,7 +275,7 @@ def create(
posthog_privacy_mode,
posthog_groups,
self._client.base_url,
super().create,
self._original.create,
**kwargs,
)

Expand All @@ -257,7 +295,7 @@ def _create_streaming(
if "stream_options" not in kwargs:
kwargs["stream_options"] = {}
kwargs["stream_options"]["include_usage"] = True
response = super().create(**kwargs)
response = self._original.create(**kwargs)

def generator():
nonlocal usage_stats
Expand Down Expand Up @@ -383,8 +421,16 @@ def _capture_streaming_event(
)


class WrappedEmbeddings(openai.resources.embeddings.Embeddings):
_client: OpenAI
class WrappedEmbeddings:
"""Wrapper for OpenAI embeddings that tracks usage in PostHog."""

def __init__(self, client: OpenAI, original_embeddings):
self._client = client
self._original = original_embeddings

def __getattr__(self, name):
"""Fallback to original embeddings object for any methods we don't explicitly handle."""
return getattr(self._original, name)

def create(
self,
Expand All @@ -402,6 +448,8 @@ def create(
posthog_distinct_id: Optional ID to associate with the usage event.
posthog_trace_id: Optional trace UUID for linking events.
posthog_properties: Optional dictionary of extra properties to include in the event.
posthog_privacy_mode: Whether to anonymize the input and output.
posthog_groups: Optional dictionary of groups to associate with the event.
**kwargs: Any additional parameters for the OpenAI Embeddings API.

Returns:
Expand All @@ -411,7 +459,7 @@ def create(
posthog_trace_id = str(uuid.uuid4())

start_time = time.time()
response = super().create(**kwargs)
response = self._original.create(**kwargs)
end_time = time.time()

# Extract usage statistics if available
Expand Down Expand Up @@ -452,24 +500,48 @@ def create(
return response


class WrappedBeta(openai.resources.beta.Beta):
_client: OpenAI
class WrappedBeta:
"""Wrapper for OpenAI beta features that tracks usage in PostHog."""

def __init__(self, client: OpenAI, original_beta):
self._client = client
self._original = original_beta

def __getattr__(self, name):
"""Fallback to original beta object for any methods we don't explicitly handle."""
return getattr(self._original, name)

@property
def chat(self):
return WrappedBetaChat(self._client)
return WrappedBetaChat(self._client, self._original.chat)


class WrappedBetaChat(openai.resources.beta.chat.Chat):
_client: OpenAI
class WrappedBetaChat:
"""Wrapper for OpenAI beta chat that tracks usage in PostHog."""

def __init__(self, client: OpenAI, original_beta_chat):
self._client = client
self._original = original_beta_chat

def __getattr__(self, name):
"""Fallback to original beta chat object for any methods we don't explicitly handle."""
return getattr(self._original, name)

@property
def completions(self):
return WrappedBetaCompletions(self._client)
return WrappedBetaCompletions(self._client, self._original.completions)


class WrappedBetaCompletions:
"""Wrapper for OpenAI beta chat completions that tracks usage in PostHog."""

def __init__(self, client: OpenAI, original_beta_completions):
self._client = client
self._original = original_beta_completions

class WrappedBetaCompletions(openai.resources.beta.chat.completions.Completions):
_client: OpenAI
def __getattr__(self, name):
"""Fallback to original beta completions object for any methods we don't explicitly handle."""
return getattr(self._original, name)

def parse(
self,
Expand All @@ -489,6 +561,6 @@ def parse(
posthog_privacy_mode,
posthog_groups,
self._client.base_url,
super().parse,
self._original.parse,
**kwargs,
)
Loading