diff --git a/docs/webhooks/webhooks_overview/webhooks_overview.md b/docs/webhooks/webhooks_overview/webhooks_overview.md index e9cff35..5f9895f 100644 --- a/docs/webhooks/webhooks_overview/webhooks_overview.md +++ b/docs/webhooks/webhooks_overview/webhooks_overview.md @@ -90,6 +90,96 @@ valid = client.verify_webhook(request.body, request.META['HTTP_X_SIGNATURE']) valid = client.verify_webhook(request.data, request.headers['X-SIGNATURE']) ``` +### Compressed webhook bodies + +GZIP compression can be enabled for hook payloads from the Dashboard. Enabling compression reduces the payload size significantly (often 70–90% smaller) reducing your bandwidth usage on Stream. The decompression cost on your side is usually negligible and offset by the much smaller payload. + +When payload compression is enabled, webhook HTTP requests include the `Content-Encoding: gzip` header and the body is gzipped. SQS and SNS messages are gzipped and then base64-wrapped (both transports are UTF-8 only). Some HTTP servers and middleware (Rails, Django, Laravel, Spring Boot, ASP.NET) auto-decompress the body before your handler runs — in that case the body you see is already raw JSON. + +Before enabling compression, make sure that: + +* Your backend integration is using a recent version of our official SDKs with compression support +* If you don't use an official SDK, make sure that your code supports receiving compressed payloads +* The payload signature check is done on the **uncompressed** payload + +The Python SDK exposes a one-liner per transport. Each helper detects the encoding from the body bytes (the gzip magic `1f 8b`, per [RFC 1952](https://datatracker.ietf.org/doc/html/rfc1952)), verifies the HMAC `X-Signature` over the uncompressed JSON, and returns the parsed event as a `dict`. Typed event classes are planned for a future release; until then handlers can key off the `type` field. + +```python +from stream_chat import StreamChat + +client = StreamChat(api_key="STREAM_KEY", api_secret="STREAM_SECRET") + +# Django view +def stream_webhook(request): + event = client.verify_and_parse_webhook( + request.body, + request.headers["X-Signature"], + ) + # ... handle event["type"], event["message"], ... +``` + +```python +from flask import request +from stream_chat import StreamChat + +client = StreamChat(api_key="STREAM_KEY", api_secret="STREAM_SECRET") + +@app.route("/webhooks/stream", methods=["POST"]) +def stream_webhook(): + event = client.verify_and_parse_webhook( + request.get_data(), + request.headers["X-Signature"], + ) + # ... handle event["type"], event["message"], ... +``` + +The same call works whether or not Stream is compressing for this app, and whether or not your framework auto-decompressed the request — the helper inspects the body bytes rather than the `Content-Encoding` header. + +All helpers raise `stream_chat.webhook.InvalidWebhookError` when the signature does not match, when the gzip stream is corrupt, or when the SQS/SNS base64 envelope cannot be decoded. + +The original `client.verify_webhook(request.body, request.headers["X-Signature"])` — which returns a `bool` and does not decompress — stays unchanged for backward compatibility. Switch to `verify_and_parse_webhook` to support compressed payloads. + +#### SQS / SNS firehose + +For events delivered through SQS or SNS, call the matching helper. It base64-decodes the envelope, gzip-decompresses when the magic bytes are present, and returns the parsed event. + +Stream does **not** ship an `X-Signature` on SQS or SNS deliveries: those transports run on AWS-internal infrastructure that is already authenticated end-to-end. SQS queues are reached via IAM-authenticated polling, and SNS notifications carry an AWS signature on the notification envelope itself, so verifying that the message really came from your topic happens at the AWS layer. Layering an HMAC check on top is redundant, so the SQS/SNS helpers only decode and parse — they take a single argument and never verify a signature. + +For SQS, pass the message `Body` (already the payload): + +```python +event = client.parse_sqs(sqs_message["Body"]) +``` + +For SNS, pass the **raw notification body** (the full `{"Type":"Notification", ...}` JSON envelope Amazon delivers). The SDK extracts the inner `Message` field for you, so the call site mirrors what HTTP frameworks already hand you in `request.body`: + +```python +# Django SNS HTTP delivery +event = client.parse_sns(request.body) # raw envelope (bytes/str) +``` + +#### Stateless / module-level form + +If you do not want to construct a `StreamChat` client (for example in a lightweight Lambda that only handles webhooks), call the module-level helpers directly. The HTTP helper still requires the signature and secret; the SQS/SNS helpers take a single argument: + +```python +from stream_chat import webhook + +event = webhook.verify_and_parse_webhook(body, signature, secret) +event = webhook.parse_sqs(message_body) +event = webhook.parse_sns(notification_body) +``` + +##### Arguments + +| Argument | `verify_and_parse_webhook` | `parse_sqs` | `parse_sns` | +| ------------------- | -------------------------- | --------------- | -------------------- | +| body / message_body / notification_body | required | required | required | +| signature | required | — | — | +| secret | required | — | — | + +The module also exposes the primitives the composites are built from — `gunzip_payload`, `decode_sqs_payload`, `decode_sns_payload`, `verify_signature` (constant-time HMAC-SHA256), and `parse_event` — for callers that need to run the steps individually. + All webhook requests contain these headers: | Name | Description | Example | diff --git a/stream_chat/base/client.py b/stream_chat/base/client.py index 750fcc8..4c17bbb 100644 --- a/stream_chat/base/client.py +++ b/stream_chat/base/client.py @@ -133,6 +133,41 @@ def verify_webhook( ).hexdigest() return signature == x_signature + def verify_and_parse_webhook( + self, + body: Union[bytes, str], + signature: Union[str, bytes], + ) -> Dict[str, Any]: + """Verify and parse an HTTP webhook event. + + Decompresses ``body`` when gzipped (detected from the body bytes), + verifies the ``X-Signature`` header against the app's API secret, + and returns the parsed event. The Python SDK currently returns a + ``dict``; typed event classes are planned for a future release. + + :param body: raw HTTP request body bytes Stream signed + :param signature: ``X-Signature`` header value + :raises stream_chat.base.exceptions.InvalidWebhookError: on + signature mismatch or any decode error + """ + from stream_chat.webhook import verify_and_parse_webhook + + return verify_and_parse_webhook(body, signature, self.api_secret) + + def parse_sqs(self, message_body: Union[bytes, str]) -> Dict[str, Any]: + """Parse an SQS firehose body (base64 + optional gzip). No HMAC.""" + + from stream_chat.webhook import parse_sqs + + return parse_sqs(message_body) + + def parse_sns(self, message: Union[bytes, str]) -> Dict[str, Any]: + """Parse an SNS body (unwraps SNS envelope when present). No HMAC.""" + + from stream_chat.webhook import parse_sns + + return parse_sns(message) + @abc.abstractmethod def update_app_settings( self, **settings: Any diff --git a/stream_chat/base/exceptions.py b/stream_chat/base/exceptions.py index 9ed295b..6a592d8 100644 --- a/stream_chat/base/exceptions.py +++ b/stream_chat/base/exceptions.py @@ -6,6 +6,23 @@ class StreamChannelException(Exception): pass +class InvalidWebhookError(Exception): + """Invalid webhook signature or malformed gzip/base64/JSON envelope. + + Raised by :mod:`stream_chat.webhook` on any failure path: signature + mismatch, malformed base64, gzip decompression failure, or invalid + JSON payload. The message text identifies the failure mode so + callers that want to differentiate (security logging, retry policy) + can filter on substring or on the module-level constants. + """ + + +INVALID_WEBHOOK_SIGNATURE_MISMATCH = "signature mismatch" +INVALID_WEBHOOK_INVALID_BASE64 = "invalid base64 encoding" +INVALID_WEBHOOK_GZIP_FAILED = "gzip decompression failed" +INVALID_WEBHOOK_INVALID_JSON = "invalid JSON payload" + + class StreamAPIException(Exception): def __init__(self, text: str, status_code: int) -> None: self.response_text = text diff --git a/stream_chat/tests/test_webhook_compression.py b/stream_chat/tests/test_webhook_compression.py new file mode 100644 index 0000000..c22853f --- /dev/null +++ b/stream_chat/tests/test_webhook_compression.py @@ -0,0 +1,318 @@ +"""Tests for the webhook verification + parsing helpers. + +The Python SDK exposes the cross-SDK webhook contract in two layers: + +* Module-level functions in :mod:`stream_chat.webhook`: + + * primitives - ``gunzip_payload``, ``decode_sqs_payload``, + ``decode_sns_payload``, ``verify_signature``, ``parse_event`` + * composite helpers - ``verify_and_parse_webhook``, ``parse_sqs``, + ``parse_sns`` + +* Client-instance forms on :class:`StreamChat` and + :class:`StreamChatAsync`. They take ``api_secret`` from the client and + delegate to the module functions. + +Tests below exercise each layer with both compressed and uncompressed +payloads and confirm that bad signatures, malformed gzip / base64, and +parsed-event return values all behave as documented. +""" + +import base64 +import gzip +import hashlib +import hmac +import json + +import pytest + +from stream_chat import StreamChat, StreamChatAsync +from stream_chat.base.exceptions import ( + INVALID_WEBHOOK_GZIP_FAILED, + INVALID_WEBHOOK_INVALID_BASE64, + INVALID_WEBHOOK_INVALID_JSON, + INVALID_WEBHOOK_SIGNATURE_MISMATCH, + InvalidWebhookError, +) +from stream_chat.webhook import ( + GZIP_MAGIC, + decode_sns_payload, + decode_sqs_payload, + gunzip_payload, + parse_event, + parse_sns, + parse_sqs, + verify_and_parse_webhook, + verify_signature, +) + +API_KEY = "tkey" +API_SECRET = "tsec2" +JSON_BODY = b'{"type":"message.new","message":{"text":"the quick brown fox"}}' +EVENT_DICT = { + "type": "message.new", + "message": {"text": "the quick brown fox"}, +} + + +def _sign(body: bytes, secret: str = API_SECRET) -> str: + return hmac.new(key=secret.encode(), msg=body, digestmod=hashlib.sha256).hexdigest() + + +def _gzip(body: bytes) -> bytes: + return gzip.compress(body) + + +def _b64(body: bytes) -> str: + return base64.b64encode(body).decode("ascii") + + +@pytest.fixture +def sync_client() -> StreamChat: + return StreamChat(api_key=API_KEY, api_secret=API_SECRET) + + +class TestGunzipPayload: + def test_passthrough_plain_bytes(self): + assert gunzip_payload(JSON_BODY) == JSON_BODY + + def test_passthrough_str_input(self): + assert gunzip_payload(JSON_BODY.decode("utf-8")) == JSON_BODY + + def test_inflates_gzip_bytes(self): + assert gunzip_payload(_gzip(JSON_BODY)) == JSON_BODY + + def test_returns_bytes(self): + assert isinstance(gunzip_payload(JSON_BODY), bytes) + assert isinstance(gunzip_payload(_gzip(JSON_BODY)), bytes) + + def test_empty_input(self): + assert gunzip_payload(b"") == b"" + + def test_short_input_below_magic_length(self): + assert gunzip_payload(b"ab") == b"ab" + + def test_truncated_gzip_with_magic_raises(self): + bad = GZIP_MAGIC + b"\x00\x00\x00" + with pytest.raises(InvalidWebhookError) as exc_info: + gunzip_payload(bad) + assert str(exc_info.value) == INVALID_WEBHOOK_GZIP_FAILED + + +class TestDecodeSqsPayload: + def test_base64_only_no_compression(self): + assert decode_sqs_payload(_b64(JSON_BODY)) == JSON_BODY + + def test_base64_plus_gzip(self): + assert decode_sqs_payload(_b64(_gzip(JSON_BODY))) == JSON_BODY + + def test_accepts_str_input(self): + encoded = _b64(_gzip(JSON_BODY)) + assert isinstance(encoded, str) + assert decode_sqs_payload(encoded) == JSON_BODY + + def test_accepts_bytes_input(self): + encoded = _b64(_gzip(JSON_BODY)).encode("ascii") + assert decode_sqs_payload(encoded) == JSON_BODY + + def test_invalid_base64_raises(self): + with pytest.raises(InvalidWebhookError) as exc_info: + decode_sqs_payload("!!!not-valid-base64!!!") + assert str(exc_info.value) == INVALID_WEBHOOK_INVALID_BASE64 + + +def _sns_envelope(inner_message: str) -> str: + return json.dumps( + { + "Type": "Notification", + "MessageId": "22b80b92-fdea-4c2c-8f9d-bdfb0c7bf324", + "TopicArn": "arn:aws:sns:us-east-1:123456789012:stream-webhooks", + "Message": inner_message, + "Timestamp": "2026-05-11T10:00:00.000Z", + "SignatureVersion": "1", + "MessageAttributes": { + "X-Signature": {"Type": "String", "Value": ""}, + }, + } + ) + + +class TestDecodeSnsPayload: + def test_pre_extracted_message_matches_decode_sqs_payload(self): + wrapped = _b64(_gzip(JSON_BODY)) + assert decode_sns_payload(wrapped) == decode_sqs_payload(wrapped) + + def test_pre_extracted_message_round_trip(self): + assert decode_sns_payload(_b64(_gzip(JSON_BODY))) == JSON_BODY + + def test_unwraps_full_sns_envelope(self): + wrapped = _b64(_gzip(JSON_BODY)) + envelope = _sns_envelope(wrapped) + assert decode_sns_payload(envelope) == JSON_BODY + + def test_handles_envelope_with_leading_whitespace(self): + wrapped = _b64(_gzip(JSON_BODY)) + envelope = "\n " + _sns_envelope(wrapped) + assert decode_sns_payload(envelope) == JSON_BODY + + +class TestVerifySignature: + def test_matching(self): + assert verify_signature(JSON_BODY, _sign(JSON_BODY), API_SECRET) is True + + def test_mismatched_returns_false(self): + assert verify_signature(JSON_BODY, "0" * 64, API_SECRET) is False + + def test_accepts_bytes_signature(self): + sig = _sign(JSON_BODY).encode() + assert verify_signature(JSON_BODY, sig, API_SECRET) is True + + def test_accepts_str_body(self): + body_str = JSON_BODY.decode("utf-8") + assert verify_signature(body_str, _sign(JSON_BODY), API_SECRET) is True + + def test_wrong_secret_returns_false(self): + sig = _sign(JSON_BODY, secret="other") + assert verify_signature(JSON_BODY, sig, API_SECRET) is False + + def test_signature_must_match_uncompressed_bytes(self): + compressed = _gzip(JSON_BODY) + sig_over_compressed = _sign(compressed) + assert verify_signature(JSON_BODY, sig_over_compressed, API_SECRET) is False + + def test_non_ascii_bytes_signature_returns_false(self): + assert verify_signature(JSON_BODY, b"\xff" * 32, API_SECRET) is False + + def test_non_ascii_str_signature_returns_false(self): + assert verify_signature(JSON_BODY, "☃" * 64, API_SECRET) is False + + def test_non_string_signature_returns_false(self): + assert verify_signature(JSON_BODY, 12345, API_SECRET) is False # type: ignore[arg-type] + + +class TestParseEvent: + def test_parses_bytes(self): + assert parse_event(JSON_BODY) == EVENT_DICT + + def test_parses_str(self): + assert parse_event(JSON_BODY.decode("utf-8")) == EVENT_DICT + + def test_unknown_event_type_still_parses(self): + body = b'{"type":"a.future.event","custom":42}' + assert parse_event(body) == {"type": "a.future.event", "custom": 42} + + def test_malformed_json_raises(self): + with pytest.raises(InvalidWebhookError) as exc_info: + parse_event(b"not json") + assert str(exc_info.value) == INVALID_WEBHOOK_INVALID_JSON + + +class TestVerifyAndParseWebhook: + def test_plain_body(self): + sig = _sign(JSON_BODY) + assert verify_and_parse_webhook(JSON_BODY, sig, API_SECRET) == EVENT_DICT + + def test_gzip_body(self): + sig = _sign(JSON_BODY) + assert verify_and_parse_webhook(_gzip(JSON_BODY), sig, API_SECRET) == EVENT_DICT + + def test_returns_dict(self): + sig = _sign(JSON_BODY) + result = verify_and_parse_webhook(JSON_BODY, sig, API_SECRET) + assert isinstance(result, dict) + + def test_signature_mismatch_raises(self): + with pytest.raises(InvalidWebhookError) as exc_info: + verify_and_parse_webhook(JSON_BODY, "0" * 64, API_SECRET) + assert str(exc_info.value) == INVALID_WEBHOOK_SIGNATURE_MISMATCH + + def test_signature_must_be_over_uncompressed_bytes(self): + compressed = _gzip(JSON_BODY) + sig_over_compressed = _sign(compressed) + with pytest.raises(InvalidWebhookError): + verify_and_parse_webhook(compressed, sig_over_compressed, API_SECRET) + + def test_wrong_secret_raises(self): + sig = _sign(JSON_BODY, secret="other") + with pytest.raises(InvalidWebhookError): + verify_and_parse_webhook(JSON_BODY, sig, API_SECRET) + + def test_signature_can_be_bytes(self): + sig = _sign(JSON_BODY).encode() + assert verify_and_parse_webhook(JSON_BODY, sig, API_SECRET) == EVENT_DICT + + def test_malformed_signature_surfaces_as_webhook_error(self): + with pytest.raises(InvalidWebhookError): + verify_and_parse_webhook(JSON_BODY, b"\xff" * 32, API_SECRET) + with pytest.raises(InvalidWebhookError): + verify_and_parse_webhook(JSON_BODY, "☃" * 64, API_SECRET) + + +class TestParseSqs: + def test_base64_only(self): + wrapped = _b64(JSON_BODY) + assert parse_sqs(wrapped) == EVENT_DICT + + def test_base64_plus_gzip(self): + wrapped = _b64(_gzip(JSON_BODY)) + assert parse_sqs(wrapped) == EVENT_DICT + + +class TestParseSns: + def test_pre_extracted_message_round_trip(self): + wrapped = _b64(_gzip(JSON_BODY)) + assert parse_sns(wrapped) == EVENT_DICT + + def test_matches_sqs_behaviour_for_pre_extracted_message(self): + wrapped = _b64(_gzip(JSON_BODY)) + assert parse_sns(wrapped) == parse_sqs(wrapped) + + def test_full_sns_envelope(self): + wrapped = _b64(_gzip(JSON_BODY)) + envelope = _sns_envelope(wrapped) + assert parse_sns(envelope) == EVENT_DICT + + +class TestSyncClientMethods: + def test_verify_and_parse_webhook(self, sync_client: StreamChat): + sig = _sign(JSON_BODY) + assert sync_client.verify_and_parse_webhook(_gzip(JSON_BODY), sig) == EVENT_DICT + + def test_parse_sqs(self, sync_client: StreamChat): + wrapped = _b64(_gzip(JSON_BODY)) + assert sync_client.parse_sqs(wrapped) == EVENT_DICT + + def test_parse_sns(self, sync_client: StreamChat): + wrapped = _b64(_gzip(JSON_BODY)) + assert sync_client.parse_sns(wrapped) == EVENT_DICT + + def test_signature_mismatch_via_client(self, sync_client: StreamChat): + with pytest.raises(InvalidWebhookError): + sync_client.verify_and_parse_webhook(JSON_BODY, "0" * 64) + + +class TestSyncClientLegacyVerifyWebhook: + """The legacy boolean helper stays unchanged for backward compatibility.""" + + def test_returns_true_on_match(self, sync_client: StreamChat): + assert sync_client.verify_webhook(JSON_BODY, _sign(JSON_BODY)) is True + + def test_returns_false_on_mismatch(self, sync_client: StreamChat): + assert sync_client.verify_webhook(JSON_BODY, "0" * 64) is False + + +class TestAsyncClientMethods: + async def test_verify_and_parse_webhook(self): + sig = _sign(JSON_BODY) + async with StreamChatAsync(api_key=API_KEY, api_secret=API_SECRET) as client: + assert client.verify_and_parse_webhook(_gzip(JSON_BODY), sig) == EVENT_DICT + + async def test_parse_sqs(self): + wrapped = _b64(_gzip(JSON_BODY)) + async with StreamChatAsync(api_key=API_KEY, api_secret=API_SECRET) as client: + assert client.parse_sqs(wrapped) == EVENT_DICT + + async def test_parse_sns(self): + wrapped = _b64(_gzip(JSON_BODY)) + async with StreamChatAsync(api_key=API_KEY, api_secret=API_SECRET) as client: + assert client.parse_sns(wrapped) == EVENT_DICT diff --git a/stream_chat/webhook.py b/stream_chat/webhook.py new file mode 100644 index 0000000..032aa68 --- /dev/null +++ b/stream_chat/webhook.py @@ -0,0 +1,196 @@ +"""Webhook verification and parsing helpers. + +Stream Chat can deliver outbound events as plain JSON, gzipped JSON over +HTTP, or as base64 + gzip wrapped messages over SQS / SNS. The helpers in +this module implement the cross-SDK contract documented at +https://getstream.io/chat/docs/node/webhooks_overview/. + +The composite functions (:func:`verify_and_parse_webhook`, +:func:`parse_sqs`, :func:`parse_sns`) are the recommended entry points. The primitives they compose are exposed so +callers can build custom flows or run individual steps in isolation. + +The Python SDK currently returns the parsed JSON as a ``dict``; typed +event classes will land in a future release. +""" + +import base64 +import gzip +import hashlib +import hmac +import json +from typing import Any, Dict, Optional, Union + +from stream_chat.base.exceptions import ( + INVALID_WEBHOOK_GZIP_FAILED, + INVALID_WEBHOOK_INVALID_BASE64, + INVALID_WEBHOOK_INVALID_JSON, + INVALID_WEBHOOK_SIGNATURE_MISMATCH, + InvalidWebhookError, +) + +GZIP_MAGIC = b"\x1f\x8b" + +_BytesLike = Union[bytes, bytearray, memoryview, str] + + +def _to_bytes(body: _BytesLike) -> bytes: + if isinstance(body, str): + return body.encode("utf-8") + if isinstance(body, (bytes, bytearray, memoryview)): + return bytes(body) + raise TypeError(f"webhook body must be bytes or str, got {type(body).__name__}") + + +def gunzip_payload(body: _BytesLike) -> bytes: + """Return ``body`` unchanged unless it starts with the gzip magic + (``1f 8b``, per RFC 1952), in which case the gzip stream is decompressed. + + Magic-byte detection (rather than relying on a header) means the same + handler stays correct when middleware - Rails, Django, Laravel, Phoenix - + auto-decompresses the request before your code sees it. + """ + raw = _to_bytes(body) + if raw[:2] != GZIP_MAGIC: + return raw + try: + return gzip.decompress(raw) + except (gzip.BadGzipFile, OSError, EOFError) as exc: + raise InvalidWebhookError(INVALID_WEBHOOK_GZIP_FAILED) from exc + + +def decode_sqs_payload(body: _BytesLike) -> bytes: + """Reverse the SQS firehose envelope. + + SQS message bodies are always base64-encoded so they remain valid + UTF-8 over the queue. The base64-decoded bytes are gzip-decompressed + when they begin with the gzip magic, otherwise they are returned + as-is, which means the same call works whether or not Stream is + compressing payloads for this app. + """ + raw = _to_bytes(body) + try: + decoded = base64.b64decode(raw, validate=True) + except ValueError as exc: + raise InvalidWebhookError(INVALID_WEBHOOK_INVALID_BASE64) from exc + return gunzip_payload(decoded) + + +def decode_sns_payload(notification_body: _BytesLike) -> bytes: + """Reverse an SNS HTTP notification envelope. + + When ``notification_body`` is a JSON envelope + (``{"Type":"Notification","Message":"..."}``), the inner + ``Message`` field is extracted and run through + :func:`decode_sqs_payload` (base64-decode, then gzip-if-magic). When + the input is not a JSON envelope it is treated as the already-extracted + ``Message`` string, so call sites that pre-unwrap continue to work. + """ + raw = _to_bytes(notification_body) + inner = _extract_sns_message(raw) + return decode_sqs_payload(inner if inner is not None else raw) + + +def _extract_sns_message(notification_body: bytes) -> Optional[str]: + trimmed = notification_body.lstrip() + if not trimmed or trimmed[:1] != b"{": + return None + try: + envelope = json.loads(trimmed) + except (json.JSONDecodeError, ValueError): + return None + if not isinstance(envelope, dict): + return None + message = envelope.get("Message") + return message if isinstance(message, str) else None + + +def verify_signature( + body: _BytesLike, + signature: Union[str, bytes], + secret: str, +) -> bool: + """Constant-time HMAC-SHA256 verification of ``signature`` against + the digest of ``body`` keyed by ``secret``. + + The signature is always computed over the **uncompressed** JSON + bytes, so callers that decoded a gzipped or base64-wrapped payload + must pass the inflated bytes here. + + A malformed ``signature`` (non-ASCII bytes, non-string types, etc.) + is treated as a mismatch and returns ``False`` rather than raising, + so callers can rely on the boolean contract. + """ + raw = _to_bytes(body) + if isinstance(signature, bytes): + try: + signature = signature.decode("ascii") + except UnicodeDecodeError: + return False + elif not isinstance(signature, str): + return False + expected = hmac.new( + key=secret.encode("utf-8"), msg=raw, digestmod=hashlib.sha256 + ).hexdigest() + try: + return hmac.compare_digest(expected, signature) + except TypeError: + return False + + +def parse_event(payload: _BytesLike) -> Dict[str, Any]: + """Parse a JSON-encoded webhook event. + + Returns a ``dict`` today; typed event classes are planned for a + future release of the Python SDK. The function name matches the + documented primitive so callers can swap in a typed parser later + without changing call sites. + """ + try: + if isinstance(payload, (bytes, bytearray, memoryview)): + return json.loads(bytes(payload)) + return json.loads(payload) + except (json.JSONDecodeError, ValueError) as exc: + raise InvalidWebhookError(INVALID_WEBHOOK_INVALID_JSON) from exc + + +def _verify_and_parse( + payload_bytes: bytes, + signature: Union[str, bytes], + secret: str, +) -> Dict[str, Any]: + if not verify_signature(payload_bytes, signature, secret): + raise InvalidWebhookError(INVALID_WEBHOOK_SIGNATURE_MISMATCH) + return parse_event(payload_bytes) + + +def verify_and_parse_webhook( + body: _BytesLike, + signature: Union[str, bytes], + secret: str, +) -> Dict[str, Any]: + """Decompress (when gzipped), verify the HMAC ``signature``, and + return the parsed event. + + :param body: raw HTTP request body bytes Stream signed + :param signature: ``X-Signature`` header value + :param secret: the app's API secret + :raises InvalidWebhookError: on signature mismatch or decode error + """ + inflated = gunzip_payload(body) + return _verify_and_parse(inflated, signature, secret) + + +def parse_sqs(message_body: _BytesLike) -> Dict[str, Any]: + """Decode the SQS ``Body`` (base64, then gzip-if-magic) and return the + parsed event. Stream does not HMAC-sign SQS message bodies.""" + + inflated = decode_sqs_payload(message_body) + return parse_event(inflated) + + +def parse_sns(message: _BytesLike) -> Dict[str, Any]: + """Decode an SNS-delivered payload (unwraps envelope JSON when present, + then same path as :func:`parse_sqs`). No verification step.""" + + inflated = decode_sns_payload(message) + return parse_event(inflated)