Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 47 additions & 37 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "pipedream"

[tool.poetry]
name = "pipedream"
version = "1.0.9"
version = "1.0.10"
description = ""
readme = "README.md"
authors = []
Expand Down
4 changes: 2 additions & 2 deletions src/pipedream/core/client_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ def __init__(

def get_headers(self) -> typing.Dict[str, str]:
headers: typing.Dict[str, str] = {
"User-Agent": "pipedream/1.0.9",
"User-Agent": "pipedream/1.0.10",
"X-Fern-Language": "Python",
"X-Fern-SDK-Name": "pipedream",
"X-Fern-SDK-Version": "1.0.9",
"X-Fern-SDK-Version": "1.0.10",
**(self.get_custom_headers() or {}),
}
if self._project_environment is not None:
Expand Down
42 changes: 42 additions & 0 deletions src/pipedream/core/http_sse/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# This file was auto-generated by Fern from our API Definition.

# isort: skip_file

import typing
from importlib import import_module

if typing.TYPE_CHECKING:
from ._api import EventSource, aconnect_sse, connect_sse
from ._exceptions import SSEError
from ._models import ServerSentEvent
_dynamic_imports: typing.Dict[str, str] = {
"EventSource": "._api",
"SSEError": "._exceptions",
"ServerSentEvent": "._models",
"aconnect_sse": "._api",
"connect_sse": "._api",
}


def __getattr__(attr_name: str) -> typing.Any:
module_name = _dynamic_imports.get(attr_name)
if module_name is None:
raise AttributeError(f"No {attr_name} found in _dynamic_imports for module name -> {__name__}")
try:
module = import_module(module_name, __package__)
if module_name == f".{attr_name}":
return module
else:
return getattr(module, attr_name)
except ImportError as e:
raise ImportError(f"Failed to import {attr_name} from {module_name}: {e}") from e
except AttributeError as e:
raise AttributeError(f"Failed to get {attr_name} from {module_name}: {e}") from e


def __dir__():
lazy_attrs = list(_dynamic_imports.keys())
return sorted(lazy_attrs)


__all__ = ["EventSource", "SSEError", "ServerSentEvent", "aconnect_sse", "connect_sse"]
112 changes: 112 additions & 0 deletions src/pipedream/core/http_sse/_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# This file was auto-generated by Fern from our API Definition.

import re
from contextlib import asynccontextmanager, contextmanager
from typing import Any, AsyncGenerator, AsyncIterator, Iterator, cast

import httpx
from ._decoders import SSEDecoder
from ._exceptions import SSEError
from ._models import ServerSentEvent


class EventSource:
def __init__(self, response: httpx.Response) -> None:
self._response = response

def _check_content_type(self) -> None:
content_type = self._response.headers.get("content-type", "").partition(";")[0]
if "text/event-stream" not in content_type:
raise SSEError(
f"Expected response header Content-Type to contain 'text/event-stream', got {content_type!r}"
)

def _get_charset(self) -> str:
"""Extract charset from Content-Type header, fallback to UTF-8."""
content_type = self._response.headers.get("content-type", "")

# Parse charset parameter using regex
charset_match = re.search(r"charset=([^;\s]+)", content_type, re.IGNORECASE)
if charset_match:
charset = charset_match.group(1).strip("\"'")
# Validate that it's a known encoding
try:
# Test if the charset is valid by trying to encode/decode
"test".encode(charset).decode(charset)
return charset
except (LookupError, UnicodeError):
# If charset is invalid, fall back to UTF-8
pass

# Default to UTF-8 if no charset specified or invalid charset
return "utf-8"

@property
def response(self) -> httpx.Response:
return self._response

def iter_sse(self) -> Iterator[ServerSentEvent]:
self._check_content_type()
decoder = SSEDecoder()
charset = self._get_charset()

buffer = ""
for chunk in self._response.iter_bytes():
# Decode chunk using detected charset
text_chunk = chunk.decode(charset, errors="replace")
buffer += text_chunk

# Process complete lines
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.rstrip("\r")
sse = decoder.decode(line)
# when we reach a "\n\n" => line = ''
# => decoder will attempt to return an SSE Event
if sse is not None:
yield sse

# Process any remaining data in buffer
if buffer.strip():
line = buffer.rstrip("\r")
sse = decoder.decode(line)
if sse is not None:
yield sse

async def aiter_sse(self) -> AsyncGenerator[ServerSentEvent, None]:
self._check_content_type()
decoder = SSEDecoder()
lines = cast(AsyncGenerator[str, None], self._response.aiter_lines())
try:
async for line in lines:
line = line.rstrip("\n")
sse = decoder.decode(line)
if sse is not None:
yield sse
finally:
await lines.aclose()


@contextmanager
def connect_sse(client: httpx.Client, method: str, url: str, **kwargs: Any) -> Iterator[EventSource]:
headers = kwargs.pop("headers", {})
headers["Accept"] = "text/event-stream"
headers["Cache-Control"] = "no-store"

with client.stream(method, url, headers=headers, **kwargs) as response:
yield EventSource(response)


@asynccontextmanager
async def aconnect_sse(
client: httpx.AsyncClient,
method: str,
url: str,
**kwargs: Any,
) -> AsyncIterator[EventSource]:
headers = kwargs.pop("headers", {})
headers["Accept"] = "text/event-stream"
headers["Cache-Control"] = "no-store"

async with client.stream(method, url, headers=headers, **kwargs) as response:
yield EventSource(response)
61 changes: 61 additions & 0 deletions src/pipedream/core/http_sse/_decoders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# This file was auto-generated by Fern from our API Definition.

from typing import List, Optional

from ._models import ServerSentEvent


class SSEDecoder:
def __init__(self) -> None:
self._event = ""
self._data: List[str] = []
self._last_event_id = ""
self._retry: Optional[int] = None

def decode(self, line: str) -> Optional[ServerSentEvent]:
# See: https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation # noqa: E501

if not line:
if not self._event and not self._data and not self._last_event_id and self._retry is None:
return None

sse = ServerSentEvent(
event=self._event,
data="\n".join(self._data),
id=self._last_event_id,
retry=self._retry,
)

# NOTE: as per the SSE spec, do not reset last_event_id.
self._event = ""
self._data = []
self._retry = None

return sse

if line.startswith(":"):
return None

fieldname, _, value = line.partition(":")

if value.startswith(" "):
value = value[1:]

if fieldname == "event":
self._event = value
elif fieldname == "data":
self._data.append(value)
elif fieldname == "id":
if "\0" in value:
pass
else:
self._last_event_id = value
elif fieldname == "retry":
try:
self._retry = int(value)
except (TypeError, ValueError):
pass
else:
pass # Field is ignored.

return None
7 changes: 7 additions & 0 deletions src/pipedream/core/http_sse/_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# This file was auto-generated by Fern from our API Definition.

import httpx


class SSEError(httpx.TransportError):
pass
Loading