diff --git a/src/cloudevents/core/bindings/__init__.py b/src/cloudevents/core/bindings/__init__.py
new file mode 100644
index 0000000..da5ad07
--- /dev/null
+++ b/src/cloudevents/core/bindings/__init__.py
@@ -0,0 +1,19 @@
+# Copyright 2018-Present The CloudEvents Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""CloudEvents bindings for various protocols."""
+
+from . import http, kafka
+
+__all__ = ["http", "kafka"]
\ No newline at end of file
diff --git a/src/cloudevents/core/bindings/_http_binding.py b/src/cloudevents/core/bindings/_http_binding.py
new file mode 100644
index 0000000..6fd9fd5
--- /dev/null
+++ b/src/cloudevents/core/bindings/_http_binding.py
@@ -0,0 +1,170 @@
+# Copyright 2018-Present The CloudEvents Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Internal HTTP binding implementation."""
+
+import urllib.parse
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any, Dict, Optional, Tuple, Union
+
+from cloudevents.core.base import BaseCloudEvent
+from cloudevents.core.formats.base import Format
+from cloudevents.core.v1.event import CloudEvent
+
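+# Type aliases for HTTP message components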
+HTTPHeaders = Dict[str, str]
+HTTPBody = bytes
+
+
+@dataclass
+class HTTPMessage:
+ """Represents an HTTP message with headers and body."""
+
+ headers: HTTPHeaders
+ body: HTTPBody
+
+
+class HTTPBinding:
+ """HTTP protocol binding for CloudEvents."""
+
+ CE_PREFIX = "ce-"
+ CONTENT_TYPE_HEADER = "content-type"
+
+ def to_structured(
+ self, event: BaseCloudEvent, event_format: Format
+ ) -> Tuple[HTTPHeaders, HTTPBody]:
+ """
+ Convert CloudEvent to structured mode HTTP message.
+
+ :param event: The CloudEvent to convert.
+ :param event_format: The format to use for serialization.
+ :return: Tuple of (headers, body).
+ """
+ content_type = event_format.get_content_type()
+ headers = {self.CONTENT_TYPE_HEADER: content_type}
+ body = event_format.write(event)
+ return headers, body
+
+ def from_structured(
+ self,
+ headers: HTTPHeaders,
+ body: HTTPBody,
+ event_format: Format,
+ event_factory: Any = CloudEvent,
+ ) -> BaseCloudEvent:
+ """
+ Parse structured mode HTTP message to CloudEvent.
+
+ :param headers: HTTP headers.
+ :param body: HTTP body.
+ :param event_format: The format to use for deserialization.
+ :param event_factory: Factory function to create CloudEvent instances.
+ :return: The parsed CloudEvent.
+ """
+ return event_format.read(event_factory, body)
+
+ def to_binary(
+ self, event: BaseCloudEvent, event_format: Format
+ ) -> Tuple[HTTPHeaders, HTTPBody]:
+ """
+ Convert CloudEvent to binary mode HTTP message.
+
+ :param event: The CloudEvent to convert.
+ :param event_format: The format to use for data serialization if needed.
+ :return: Tuple of (headers, body).
+ """
+        headers: HTTPHeaders = {}
+
+ # Map CloudEvents attributes to HTTP headers
+ attributes = event.get_attributes()
+ for attr_name, attr_value in attributes.items():
+ if attr_name == "datacontenttype":
+ headers[self.CONTENT_TYPE_HEADER] = attr_value
+ else:
+ header_name = f"{self.CE_PREFIX}{attr_name}"
+ headers[header_name] = self._encode_header_value(attr_value)
+
+ # Handle data using format's write_data method
+ data = event.get_data()
+ datacontenttype = event.get_datacontenttype()
+ body = event_format.write_data(data, datacontenttype)
+
+ return headers, body
+
+ def from_binary(
+ self,
+ headers: HTTPHeaders,
+ body: HTTPBody,
+ event_format: Format,
+ event_factory: Any = CloudEvent,
+ ) -> BaseCloudEvent:
+ """
+ Parse binary mode HTTP message to CloudEvent.
+
+ :param headers: HTTP headers.
+ :param body: HTTP body.
+ :param event_format: The format to use for data deserialization if needed.
+ :param event_factory: Factory function to create CloudEvent instances.
+ :return: The parsed CloudEvent.
+ """
+ attributes: Dict[str, Any] = {}
+
+ # Extract CloudEvents attributes from headers
+ for header_name, header_value in headers.items():
+ header_lower = header_name.lower()
+ if header_lower == self.CONTENT_TYPE_HEADER:
+ attributes["datacontenttype"] = header_value
+ elif header_lower.startswith(self.CE_PREFIX):
+ attr_name = header_lower[len(self.CE_PREFIX) :]
+ decoded_value = self._decode_header_value(header_value)
+ # Special handling for time attribute
+ if attr_name == "time":
+ from dateutil.parser import isoparse
+
+ attributes[attr_name] = isoparse(decoded_value)
+ else:
+ attributes[attr_name] = decoded_value
+
+ # Parse data using format's read_data method
+ data: Optional[Union[bytes, str, dict]] = None
+ if body:
+ data = event_format.read_data(body, attributes.get("datacontenttype"))
+
+ return event_factory(attributes, data)
+
+ def _encode_header_value(self, value: Any) -> str:
+ """
+ Encode a value for use in an HTTP header.
+
+ :param value: The value to encode.
+ :return: The encoded string.
+ """
+ if isinstance(value, datetime):
+ str_value = value.isoformat()
+ if str_value.endswith("+00:00"):
+ str_value = str_value[:-6] + "Z"
+ else:
+ str_value = str(value)
+
+ # Percent-encode special characters
+ return urllib.parse.quote(str_value, safe="")
+
+ def _decode_header_value(self, value: str) -> str:
+ """
+ Decode a value from an HTTP header.
+
+ :param value: The encoded value.
+ :return: The decoded string.
+ """
+ return urllib.parse.unquote(value)
\ No newline at end of file
diff --git a/src/cloudevents/core/bindings/_kafka_binding.py b/src/cloudevents/core/bindings/_kafka_binding.py
new file mode 100644
index 0000000..92a0f79
--- /dev/null
+++ b/src/cloudevents/core/bindings/_kafka_binding.py
@@ -0,0 +1,203 @@
+# Copyright 2018-Present The CloudEvents Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Internal Kafka binding implementation."""
+
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any, Callable, Dict, Optional, Union
+
+from cloudevents.core.base import BaseCloudEvent
+from cloudevents.core.formats.base import Format
+from cloudevents.core.v1.event import CloudEvent
+
+# Type aliases for Kafka message components
+KafkaHeaders = Dict[str, bytes]
+KafkaKey = Optional[bytes]
+KafkaValue = bytes
+
+
+@dataclass
+class KafkaMessage:
+ """Represents a Kafka message with headers, key, and value."""
+
+ headers: KafkaHeaders
+ key: KafkaKey
+ value: KafkaValue
+
+
+class KafkaBinding:
+ """Kafka protocol binding for CloudEvents."""
+
+ CE_PREFIX = "ce_"
+ CONTENT_TYPE_HEADER = "content-type"
+
+ def to_structured(
+ self,
+ event: BaseCloudEvent,
+ event_format: Format,
+ key: Optional[bytes] = None,
+ key_mapper: Optional[Callable[[BaseCloudEvent], Optional[bytes]]] = None,
+ ) -> KafkaMessage:
+ """
+ Convert CloudEvent to structured mode Kafka message.
+
+ :param event: The CloudEvent to convert.
+ :param event_format: The format to use for serialization.
+ :param key: Optional Kafka message key.
+ :param key_mapper: Optional function to derive key from event.
+ :return: KafkaMessage with headers, key, and value.
+ """
+ # Determine content type
+ content_type = event_format.get_content_type()
+
+ # Create headers with content type
+ headers: KafkaHeaders = {
+ self.CONTENT_TYPE_HEADER: content_type.encode("utf-8")
+ }
+
+ # Serialize event to bytes
+ value = event_format.write(event)
+
+ # Determine message key
+ if key_mapper:
+ message_key = key_mapper(event)
+ else:
+ message_key = key
+
+ return KafkaMessage(headers=headers, key=message_key, value=value)
+
+ def from_structured(
+ self,
+ message: KafkaMessage,
+ event_format: Format,
+ event_factory: Any = CloudEvent,
+ ) -> BaseCloudEvent:
+ """
+ Parse structured mode Kafka message to CloudEvent.
+
+ :param message: The Kafka message.
+ :param event_format: The format to use for deserialization.
+ :param event_factory: Factory function to create CloudEvent instances.
+ :return: The parsed CloudEvent.
+ """
+ return event_format.read(event_factory, message.value)
+
+ def to_binary(
+ self,
+ event: BaseCloudEvent,
+ event_format: Format,
+ key: Optional[bytes] = None,
+ key_mapper: Optional[Callable[[BaseCloudEvent], Optional[bytes]]] = None,
+ ) -> KafkaMessage:
+ """
+ Convert CloudEvent to binary mode Kafka message.
+
+ :param event: The CloudEvent to convert.
+ :param event_format: The format to use for data serialization if needed.
+ :param key: Optional Kafka message key.
+ :param key_mapper: Optional function to derive key from event.
+ :return: KafkaMessage with headers, key, and value.
+ """
+ headers: KafkaHeaders = {}
+
+ # Map CloudEvents attributes to Kafka headers
+ attributes = event.get_attributes()
+ for attr_name, attr_value in attributes.items():
+ header_name = f"{self.CE_PREFIX}{attr_name}"
+ header_value = self._encode_header_value(attr_value)
+ headers[header_name] = header_value.encode("utf-8")
+
+ # Handle data using format's write_data method
+ data = event.get_data()
+ datacontenttype = event.get_datacontenttype()
+ value = event_format.write_data(data, datacontenttype)
+
+ # Determine message key
+ if key_mapper:
+ message_key = key_mapper(event)
+ elif key is None:
+ # Check for partitionkey extension
+ partition_key = event.get_extension("partitionkey")
+ if partition_key:
+ message_key = str(partition_key).encode("utf-8")
+ else:
+ message_key = None
+ else:
+ message_key = key
+
+ return KafkaMessage(headers=headers, key=message_key, value=value)
+
+ def from_binary(
+ self,
+ message: KafkaMessage,
+ event_format: Format,
+ event_factory: Any = CloudEvent,
+ ) -> BaseCloudEvent:
+ """
+ Parse binary mode Kafka message to CloudEvent.
+
+ :param message: The Kafka message.
+ :param event_format: The format to use for data deserialization if needed.
+ :param event_factory: Factory function to create CloudEvent instances.
+ :return: The parsed CloudEvent.
+ """
+ attributes: Dict[str, Any] = {}
+
+ # Extract CloudEvents attributes from headers
+ for header_name, header_value in message.headers.items():
+ if header_name.startswith(self.CE_PREFIX):
+ attr_name = header_name[len(self.CE_PREFIX) :]
+ decoded_value = self._decode_header_value(header_value.decode("utf-8"))
+ # Special handling for time attribute
+ if attr_name == "time":
+ from dateutil.parser import isoparse
+
+ attributes[attr_name] = isoparse(decoded_value)
+ else:
+ attributes[attr_name] = decoded_value
+
+ # Parse data using format's read_data method
+ data: Optional[Union[bytes, str, dict]] = None
+ if message.value:
+ data = event_format.read_data(
+ message.value, attributes.get("datacontenttype")
+ )
+
+ return event_factory(attributes, data)
+
+ def _encode_header_value(self, value: Any) -> str:
+ """
+ Encode a value for use in a Kafka header.
+
+ :param value: The value to encode.
+ :return: The encoded string.
+ """
+ if isinstance(value, datetime):
+ str_value = value.isoformat()
+ if str_value.endswith("+00:00"):
+ str_value = str_value[:-6] + "Z"
+ else:
+ str_value = str(value)
+
+ return str_value
+
+ def _decode_header_value(self, value: str) -> str:
+ """
+ Decode a value from a Kafka header.
+
+ :param value: The value to decode.
+ :return: The decoded string.
+ """
+ return value
diff --git a/src/cloudevents/core/bindings/base.py b/src/cloudevents/core/bindings/base.py
new file mode 100644
index 0000000..2cc6c66
--- /dev/null
+++ b/src/cloudevents/core/bindings/base.py
@@ -0,0 +1,41 @@
+# Copyright 2018-Present The CloudEvents Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from typing import Generic, Protocol, TypeVar
+
+from cloudevents.core.base import BaseCloudEvent
+from cloudevents.core.formats.base import Format
+
+T = TypeVar("T") # Transport message type
+
+
+class Binding(Protocol, Generic[T]):
+ """Base protocol for all CloudEvents bindings."""
+
+ def to_structured(self, event: BaseCloudEvent, event_format: Format) -> T:
+ """Convert CloudEvent to structured mode transport message."""
+ ...
+
+ def from_structured(self, message: T, event_format: Format) -> BaseCloudEvent:
+ """Parse structured mode transport message to CloudEvent."""
+ ...
+
+ def to_binary(self, event: BaseCloudEvent, event_format: Format) -> T:
+ """Convert CloudEvent to binary mode transport message."""
+ ...
+
+ def from_binary(self, message: T, event_format: Format) -> BaseCloudEvent:
+ """Parse binary mode transport message to CloudEvent."""
+ ...
\ No newline at end of file
diff --git a/src/cloudevents/core/bindings/http.py b/src/cloudevents/core/bindings/http.py
new file mode 100644
index 0000000..1093815
--- /dev/null
+++ b/src/cloudevents/core/bindings/http.py
@@ -0,0 +1,155 @@
+# Copyright 2018-Present The CloudEvents Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Format-agnostic HTTP binding functions for CloudEvents.
+
+This module provides functions for converting CloudEvents to/from HTTP messages
+in both structured and binary modes. These functions work with any format
+implementation (JSON, Avro, Protobuf, etc.).
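+
+Example (abridged sketch; see the individual functions below for details):
+    from cloudevents.core.v1.event import CloudEvent
+    from cloudevents.core.formats.json import JSONFormat
+    from cloudevents.core.bindings.http import to_structured, to_binary
+
+    event = CloudEvent({"type": "example", "source": "test"}, {"key": "value"})
+
+    # Structured mode
+    headers, body = to_structured(event, JSONFormat())
+
+    # Binary mode
+    headers, body = to_binary(event, JSONFormat())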
+"""
+
+from typing import Any, Tuple
+
+from cloudevents.core.base import BaseCloudEvent
+from cloudevents.core.formats.base import Format
+from cloudevents.core.v1.event import CloudEvent
+
+from ._http_binding import HTTPBinding, HTTPBody, HTTPHeaders, HTTPMessage
+
+_http_binding = HTTPBinding()
+
+
+def to_structured(
+ event: BaseCloudEvent, event_format: Format
+) -> Tuple[HTTPHeaders, HTTPBody]:
+ """
+ Convert CloudEvent to structured mode HTTP message.
+
+ Example:
+ from cloudevents.core.v1.event import CloudEvent
+ from cloudevents.core.formats.json import JSONFormat
+ from cloudevents.core.bindings.http import to_structured
+
+ event = CloudEvent({"type": "example", "source": "test"}, {"key": "value"})
+ headers, body = to_structured(event, JSONFormat())
+ """
+ return _http_binding.to_structured(event, event_format)
+
+
+def from_structured(
+ headers: HTTPHeaders,
+ body: HTTPBody,
+ event_format: Format,
+ event_factory: Any = CloudEvent,
+) -> BaseCloudEvent:
+ """
+ Parse structured mode HTTP message to CloudEvent.
+
+ Example:
+ from cloudevents.core.formats.json import JSONFormat
+ from cloudevents.core.bindings.http import from_structured
+
+ headers = {"content-type": "application/cloudevents+json"}
+ body = b'{"type": "example", "source": "test", "data": {"key": "value"}}'
+ event = from_structured(headers, body, JSONFormat())
+ """
+ return _http_binding.from_structured(headers, body, event_format, event_factory)
+
+
+def to_binary(
+    event: BaseCloudEvent, event_format: Format
+) -> Tuple[HTTPHeaders, HTTPBody]:
+ """
+ Convert CloudEvent to binary mode HTTP message.
+
+ Example:
+ from cloudevents.core.v1.event import CloudEvent
+ from cloudevents.core.formats.json import JSONFormat
+ from cloudevents.core.bindings.http import to_binary
+
+ event = CloudEvent({"type": "example", "source": "test"}, "Hello World")
+ headers, body = to_binary(event, JSONFormat())
+ """
+ return _http_binding.to_binary(event, event_format)
+
+
+def from_binary(
+ headers: HTTPHeaders,
+ body: HTTPBody,
+ event_format: Format,
+ event_factory: Any = CloudEvent,
+) -> BaseCloudEvent:
+ """
+ Parse binary mode HTTP message to CloudEvent.
+
+ Example:
+ from cloudevents.core.formats.json import JSONFormat
+ from cloudevents.core.bindings.http import from_binary
+
+ headers = {"ce-type": "example", "ce-source": "test", "content-type": "text/plain"}
+ body = b"Hello World"
+ event = from_binary(headers, body, JSONFormat())
+ """
+ return _http_binding.from_binary(headers, body, event_format, event_factory)
+
+
+def from_http_message(
+ headers: HTTPHeaders,
+ body: HTTPBody,
+ event_format: Format,
+ event_factory: Any = CloudEvent,
+) -> BaseCloudEvent:
+ """
+ Auto-detect and parse HTTP message to CloudEvent (structured or binary mode).
+
+ This is a convenience function that automatically detects whether the message
+ uses structured or binary content mode and calls the appropriate parser.
+
+ Detection logic:
+    - If any header name starts with "ce-" (case-insensitive), binary mode is used.
+    - Otherwise, structured mode is used.
+
+ Example:
+ from cloudevents.core.formats.json import JSONFormat
+ from cloudevents.core.bindings.http import from_http_message
+
+ # Works with both structured and binary messages
+ headers = {...} # Any HTTP headers
+ body = b"..." # Any HTTP body
+ event = from_http_message(headers, body, JSONFormat())
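+
+        # A concrete binary-mode message, for instance (illustrative values):
+        headers = {
+            "ce-specversion": "1.0",
+            "ce-id": "1",
+            "ce-type": "example",
+            "ce-source": "test",
+            "content-type": "text/plain",
+        }
+        body = b"Hello World"
+        event = from_http_message(headers, body, JSONFormat())  # parsed as binary mode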
+
+ :param headers: The HTTP headers dictionary.
+ :param body: The HTTP body as bytes.
+ :param event_format: The format to use for serialization/deserialization.
+ :param event_factory: Factory function to create CloudEvent instances.
+ :return: The parsed CloudEvent.
+ """
+ # Check for binary mode indicators (ce- prefixed headers)
+ for header_name in headers.keys():
+ if header_name.lower().startswith("ce-"):
+ return from_binary(headers, body, event_format, event_factory)
+
+ # Default to structured mode if no ce- headers found
+ return from_structured(headers, body, event_format, event_factory)
+
+
+# Export public API
+__all__ = [
+ "to_structured",
+ "from_structured",
+ "to_binary",
+ "from_binary",
+ "from_http_message",
+ "HTTPHeaders",
+ "HTTPBody",
+ "HTTPMessage",
+]
\ No newline at end of file
diff --git a/src/cloudevents/core/bindings/kafka.py b/src/cloudevents/core/bindings/kafka.py
new file mode 100644
index 0000000..4857840
--- /dev/null
+++ b/src/cloudevents/core/bindings/kafka.py
@@ -0,0 +1,217 @@
+# Copyright 2018-Present The CloudEvents Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Format-agnostic Kafka binding functions for CloudEvents.
+
+This module provides functions for converting CloudEvents to/from Kafka messages
+in both structured and binary modes. These functions work with any format
+implementation (JSON, Avro, Protobuf, etc.).
+
+Example:
+ from cloudevents.core.v1.event import CloudEvent
+ from cloudevents.core.formats.json import JSONFormat
+ from cloudevents.core.bindings.kafka import to_structured, to_binary
+
+ event = CloudEvent({"type": "example", "source": "test"}, {"key": "value"})
+
+ # Structured mode
+ message = to_structured(event, JSONFormat())
+
+ # Binary mode with partitioning
+ message = to_binary(event, JSONFormat(), key=b"my-key")
+"""
+
+from typing import Any, Callable, Optional
+
+from cloudevents.core.base import BaseCloudEvent
+from cloudevents.core.formats.base import Format
+from cloudevents.core.v1.event import CloudEvent
+
+from ._kafka_binding import KafkaBinding, KafkaKey, KafkaMessage
+
+_kafka_binding = KafkaBinding()
+
+
+def to_structured(
+ event: BaseCloudEvent,
+ event_format: Format,
+ key: Optional[bytes] = None,
+ key_mapper: Optional[Callable[[BaseCloudEvent], Optional[bytes]]] = None,
+) -> KafkaMessage:
+ """
+ Convert CloudEvent to structured mode Kafka message.
+
+ In structured mode, the entire event (attributes and data) is encoded
+ in the message value using the specified format.
+
+ Example:
+ from cloudevents.core.v1.event import CloudEvent
+ from cloudevents.core.formats.json import JSONFormat
+ from cloudevents.core.bindings.kafka import to_structured
+
+ event = CloudEvent({"type": "example", "source": "test"}, {"key": "value"})
+ message = to_structured(event, JSONFormat())
+ # message.headers = {"content-type": b"application/cloudevents+json"}
+ # message.value = b'{"specversion":"1.0","type":"example",...}'
+
+ :param event: The CloudEvent to convert.
+ :param event_format: The format to use (JSONFormat, AvroFormat, etc.).
+ :param key: Optional Kafka message key.
+ :param key_mapper: Optional function to derive key from event.
+ :return: KafkaMessage with headers, key, and value.
+ """
+ return _kafka_binding.to_structured(event, event_format, key, key_mapper)
+
+
+def from_structured(
+ message: KafkaMessage,
+ event_format: Format,
+ event_factory: Any = CloudEvent,
+) -> BaseCloudEvent:
+ """
+ Parse structured mode Kafka message to CloudEvent.
+
+ Example:
+ from cloudevents.core.formats.json import JSONFormat
+ from cloudevents.core.bindings.kafka import from_structured, KafkaMessage
+
+ message = KafkaMessage(
+ headers={"content-type": b"application/cloudevents+json"},
+ key=None,
+ value=b'{"type":"example","source":"test","data":{"key":"value"}}'
+ )
+ event = from_structured(message, JSONFormat())
+
+ :param message: The Kafka message to parse.
+ :param event_format: The format to use for deserialization.
+ :param event_factory: Factory function to create CloudEvent instances.
+ :return: The parsed CloudEvent.
+ """
+ return _kafka_binding.from_structured(message, event_format, event_factory)
+
+
+def to_binary(
+ event: BaseCloudEvent,
+ event_format: Format,
+ key: Optional[bytes] = None,
+ key_mapper: Optional[Callable[[BaseCloudEvent], Optional[bytes]]] = None,
+) -> KafkaMessage:
+ """
+ Convert CloudEvent to binary mode Kafka message.
+
+ In binary mode, CloudEvents attributes are mapped to Kafka headers with
+ 'ce_' prefix, and the event data is placed in the message value.
+
+ Example:
+ from cloudevents.core.v1.event import CloudEvent
+ from cloudevents.core.formats.json import JSONFormat
+ from cloudevents.core.bindings.kafka import to_binary
+
+ event = CloudEvent(
+ {"type": "example", "source": "test", "partitionkey": "user123"},
+ "Hello World"
+ )
+ message = to_binary(event, JSONFormat())
+ # message.headers = {"ce_type": b"example", "ce_source": b"test", ...}
+ # message.key = b"user123" # from partitionkey
+ # message.value = b"Hello World"
+
+ :param event: The CloudEvent to convert.
+ :param event_format: The format to use for data serialization if needed.
+    :param key: Optional Kafka message key (overrides the partitionkey extension).
+    :param key_mapper: Optional function to derive the key from the event
+        (takes precedence over key and partitionkey).
+ :return: KafkaMessage with headers, key, and value.
+ """
+ return _kafka_binding.to_binary(event, event_format, key, key_mapper)
+
+
+def from_binary(
+ message: KafkaMessage,
+ event_format: Format,
+ event_factory: Any = CloudEvent,
+) -> BaseCloudEvent:
+ """
+ Parse binary mode Kafka message to CloudEvent.
+
+ Example:
+ from cloudevents.core.formats.json import JSONFormat
+ from cloudevents.core.bindings.kafka import from_binary, KafkaMessage
+
+ message = KafkaMessage(
+ headers={
+ "ce_type": b"example",
+ "ce_source": b"test",
+ "ce_id": b"123",
+ "ce_specversion": b"1.0"
+ },
+ key=b"user123",
+ value=b"Hello World"
+ )
+ event = from_binary(message, JSONFormat())
+
+ :param message: The Kafka message to parse.
+ :param event_format: The format to use for data deserialization if needed.
+ :param event_factory: Factory function to create CloudEvent instances.
+ :return: The parsed CloudEvent.
+ """
+ return _kafka_binding.from_binary(message, event_format, event_factory)
+
+
+def from_kafka_message(
+ message: KafkaMessage,
+ event_format: Format,
+ event_factory: Any = CloudEvent,
+) -> BaseCloudEvent:
+ """
+ Auto-detect and parse Kafka message to CloudEvent (structured or binary mode).
+
+ This is a convenience function that automatically detects whether the message
+ uses structured or binary content mode and calls the appropriate parser.
+
+ Detection logic:
+    - If any header name starts with "ce_", binary mode is used.
+    - Otherwise, structured mode is used.
+
+ Example:
+ from cloudevents.core.formats.json import JSONFormat
+ from cloudevents.core.bindings.kafka import from_kafka_message, KafkaMessage
+
+ # Works with both structured and binary messages
+ message = KafkaMessage(...) # Any Kafka message format
+ event = from_kafka_message(message, JSONFormat())
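+
+        # A concrete binary-mode message, for instance (illustrative values):
+        message = KafkaMessage(
+            headers={
+                "ce_specversion": b"1.0",
+                "ce_id": b"1",
+                "ce_type": b"example",
+                "ce_source": b"test",
+            },
+            key=None,
+            value=b"Hello World",
+        )
+        event = from_kafka_message(message, JSONFormat())  # parsed as binary mode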
+
+ :param message: The Kafka message to parse.
+ :param event_format: The format to use for serialization/deserialization.
+ :param event_factory: Factory function to create CloudEvent instances.
+ :return: The parsed CloudEvent.
+ """
+ # Check for binary mode indicators (ce_ prefixed headers)
+ for header_name in message.headers.keys():
+ if header_name.startswith("ce_"):
+ return from_binary(message, event_format, event_factory)
+
+ # Default to structured mode if no ce_ headers found
+ return from_structured(message, event_format, event_factory)
+
+
+# Re-export types for convenience
+__all__ = [
+ "to_structured",
+ "from_structured",
+ "to_binary",
+ "from_binary",
+ "from_kafka_message",
+ "KafkaMessage",
+ "KafkaKey",
+]
diff --git a/src/cloudevents/core/formats/base.py b/src/cloudevents/core/formats/base.py
index e1cf8f7..25bbe5f 100644
--- a/src/cloudevents/core/formats/base.py
+++ b/src/cloudevents/core/formats/base.py
@@ -28,3 +28,42 @@ def read(
) -> BaseCloudEvent: ...
def write(self, event: BaseCloudEvent) -> bytes: ...
+
+ def get_content_type(self) -> str:
+ """
+ Get the content-type string for structured mode serialization.
+
+ :return: The content-type string (e.g., "application/cloudevents+json").
+ """
+ ...
+
+ def write_data(
+ self, data: Optional[Union[dict, str, bytes]], datacontenttype: Optional[str]
+ ) -> bytes:
+ """
+ Serialize event data payload according to its content type.
+
+ This method is used by bindings in binary mode to serialize just the
+ data portion of a CloudEvent, respecting the datacontenttype attribute.
+
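+        Example (illustrative, for a JSON-based format such as JSONFormat):
+            write_data({"key": "value"}, "application/json")  # -> b'{"key": "value"}'
+            write_data("hello", "text/plain")                 # -> b"hello"
+            write_data(None, None)                            # -> b""
+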
+ :param data: The data payload to serialize.
+ :param datacontenttype: The content type of the data.
+ :return: The serialized data as bytes.
+ """
+ ...
+
+ def read_data(
+ self, data: bytes, datacontenttype: Optional[str]
+ ) -> Optional[Union[dict, str, bytes]]:
+ """
+ Deserialize raw data bytes according to content type.
+
+ This method is used by bindings in binary mode to deserialize the
+ data portion of a CloudEvent based on the datacontenttype attribute.
+ It is the inverse operation of write_data().
+
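+        Example (illustrative, for a JSON-based format such as JSONFormat):
+            read_data(b'{"key": "value"}', "application/json")  # -> {"key": "value"}
+            read_data(b"hello", "text/plain")                   # -> "hello"
+            read_data(b"", None)                                 # -> None
+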
+ :param data: The raw data bytes to deserialize.
+ :param datacontenttype: The content type of the data.
+ :return: The deserialized data (dict, str, or bytes).
+ """
+ ...
diff --git a/src/cloudevents/core/formats/json.py b/src/cloudevents/core/formats/json.py
index f674be0..78ff642 100644
--- a/src/cloudevents/core/formats/json.py
+++ b/src/cloudevents/core/formats/json.py
@@ -102,3 +102,70 @@ def write(self, event: BaseCloudEvent) -> bytes:
event_dict["data"] = str(event_data)
return dumps(event_dict, cls=_JSONEncoderWithDatetime).encode("utf-8")
+
+ def get_content_type(self) -> str:
+ """
+ Get the content-type string for structured mode serialization.
+
+ :return: The content-type string for JSON format.
+ """
+ return self.CONTENT_TYPE
+
+ def write_data(
+ self, data: Optional[Union[dict, str, bytes]], datacontenttype: Optional[str]
+ ) -> bytes:
+ """
+ Serialize event data payload according to its content type.
+
+ This method handles data serialization for binary mode, where only
+ the data portion (not the full event) needs to be serialized.
+
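+        Example:
+            write_data({"key": "value"}, "application/json")  # -> b'{"key": "value"}'
+            write_data("hello", "text/plain")                 # -> b"hello"
+            write_data(None, None)                            # -> b""
+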
+ :param data: The data payload to serialize.
+ :param datacontenttype: The content type of the data.
+ :return: The serialized data as bytes.
+ """
+ if data is None:
+ return b""
+ elif isinstance(data, bytes):
+ return data
+ elif isinstance(data, str):
+ return data.encode("utf-8")
+ else:
+ # For dict or other structured data types
+ # Check if the content type is JSON-like
+ if datacontenttype and re.match(
+ JSONFormat.JSON_CONTENT_TYPE_PATTERN, datacontenttype
+ ):
+ return dumps(data).encode("utf-8")
+ else:
+ # Fallback to string conversion for non-JSON content types
+ return str(data).encode("utf-8")
+
+ def read_data(
+ self, data: bytes, datacontenttype: Optional[str]
+ ) -> Optional[Union[dict, str, bytes]]:
+ """
+ Deserialize raw data bytes according to content type.
+
+ This method handles data deserialization for binary mode, parsing
+ the data bytes based on the datacontenttype attribute.
+
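+        Example:
+            read_data(b'{"key": "value"}', "application/json")   # -> {"key": "value"}
+            read_data(b"hello", "text/plain")                    # -> "hello"
+            read_data(b"\x00\x01", "application/octet-stream")   # -> b"\x00\x01"
+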
+ :param data: The raw data bytes to deserialize.
+ :param datacontenttype: The content type of the data.
+ :return: The deserialized data (dict, str, or bytes).
+ """
+ if not data:
+ return None
+
+ content_type = datacontenttype or ""
+
+ # Handle text content types
+ if content_type.startswith("text/"):
+ return data.decode("utf-8")
+
+ # Handle JSON content types
+ if re.match(JSONFormat.JSON_CONTENT_TYPE_PATTERN, content_type):
+ return loads(data.decode("utf-8"))
+
+ # For other content types, return raw bytes
+ return data
diff --git a/tests/test_core/test_bindings/__init__.py b/tests/test_core/test_bindings/__init__.py
new file mode 100644
index 0000000..d35e879
--- /dev/null
+++ b/tests/test_core/test_bindings/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018-Present The CloudEvents Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
\ No newline at end of file
diff --git a/tests/test_core/test_bindings/test_http.py b/tests/test_core/test_bindings/test_http.py
new file mode 100644
index 0000000..28c618e
--- /dev/null
+++ b/tests/test_core/test_bindings/test_http.py
@@ -0,0 +1,442 @@
+# Copyright 2018-Present The CloudEvents Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import json
+from datetime import datetime, timezone
+
+from cloudevents.core.bindings.http import (
+ from_binary,
+ from_http_message,
+ from_structured,
+ to_binary,
+ to_structured,
+)
+from cloudevents.core.formats.json import JSONFormat
+from cloudevents.core.v1.event import CloudEvent
+
+
+def test_to_structured_with_json_format() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source",
+ "type": "test.type",
+ "specversion": "1.0",
+ "time": datetime(2023, 10, 25, 17, 9, 19, 736166, tzinfo=timezone.utc),
+ "datacontenttype": "application/json",
+ }
+ event = CloudEvent(attributes=attributes, data={"key": "value"})
+ event_format = JSONFormat()
+
+ headers, body = to_structured(event, event_format)
+
+ assert headers["content-type"] == "application/cloudevents+json"
+ parsed_body = json.loads(body)
+ assert parsed_body["id"] == "123"
+ assert parsed_body["source"] == "test/source"
+ assert parsed_body["type"] == "test.type"
+ assert parsed_body["data"] == {"key": "value"}
+
+
+def test_from_structured_with_json_format() -> None:
+ headers = {"content-type": "application/cloudevents+json"}
+ body = b'{"id": "123", "source": "test/source", "type": "test.type", "specversion": "1.0", "data": {"key": "value"}}'
+ event_format = JSONFormat()
+
+ event = from_structured(headers, body, event_format)
+
+ assert event.get_id() == "123"
+ assert event.get_source() == "test/source"
+ assert event.get_type() == "test.type"
+ assert event.get_data() == {"key": "value"}
+
+
+def test_to_binary_with_string_data() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source",
+ "type": "test.type",
+ "specversion": "1.0",
+ "datacontenttype": "text/plain",
+ }
+ event = CloudEvent(attributes=attributes, data="Hello World")
+ event_format = JSONFormat()
+
+ headers, body = to_binary(event, event_format)
+
+ assert headers["ce-id"] == "123"
+ assert headers["ce-source"] == "test%2Fsource" # URL encoded
+ assert headers["ce-type"] == "test.type"
+ assert headers["ce-specversion"] == "1.0"
+ assert headers["content-type"] == "text/plain"
+ assert body == b"Hello World"
+
+
+def test_to_binary_with_json_data() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source",
+ "type": "test.type",
+ "specversion": "1.0",
+ "datacontenttype": "application/json",
+ }
+ event = CloudEvent(attributes=attributes, data={"key": "value"})
+ event_format = JSONFormat()
+
+ headers, body = to_binary(event, event_format)
+
+ assert headers["ce-id"] == "123"
+ assert headers["content-type"] == "application/json"
+ assert json.loads(body) == {"key": "value"}
+
+
+def test_to_binary_with_bytes_data() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source",
+ "type": "test.type",
+ "specversion": "1.0",
+ "datacontenttype": "application/octet-stream",
+ }
+ event = CloudEvent(attributes=attributes, data=b"\x00\x01\x02\x03")
+ event_format = JSONFormat()
+
+ headers, body = to_binary(event, event_format)
+
+ assert headers["content-type"] == "application/octet-stream"
+ assert body == b"\x00\x01\x02\x03"
+
+
+def test_from_binary_with_string_data() -> None:
+ headers = {
+ "ce-id": "123",
+ "ce-source": "test%2Fsource", # URL encoded
+ "ce-type": "test.type",
+ "ce-specversion": "1.0",
+ "content-type": "text/plain",
+ }
+ body = b"Hello World"
+ event_format = JSONFormat()
+
+ event = from_binary(headers, body, event_format)
+
+ assert event.get_id() == "123"
+ assert event.get_source() == "test/source" # Should be decoded
+ assert event.get_type() == "test.type"
+ assert event.get_datacontenttype() == "text/plain"
+ assert event.get_data() == "Hello World"
+
+
+def test_from_binary_with_json_data() -> None:
+ headers = {
+ "ce-id": "123",
+ "ce-source": "test/source",
+ "ce-type": "test.type",
+ "ce-specversion": "1.0",
+ "content-type": "application/json",
+ }
+ body = b'{"key": "value"}'
+ event_format = JSONFormat()
+
+ event = from_binary(headers, body, event_format)
+
+ assert event.get_data() == {"key": "value"}
+
+
+def test_from_binary_with_bytes_data() -> None:
+ headers = {
+ "ce-id": "123",
+ "ce-source": "test/source",
+ "ce-type": "test.type",
+ "ce-specversion": "1.0",
+ "content-type": "application/octet-stream",
+ }
+ body = b"\x00\x01\x02\x03"
+ event_format = JSONFormat()
+
+ event = from_binary(headers, body, event_format)
+
+ assert event.get_data() == b"\x00\x01\x02\x03"
+
+
+def test_to_binary_with_time_attribute() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source",
+ "type": "test.type",
+ "specversion": "1.0",
+ "time": datetime(2023, 10, 25, 17, 9, 19, 736166, tzinfo=timezone.utc),
+ }
+ event = CloudEvent(attributes=attributes, data=None)
+ event_format = JSONFormat()
+
+ headers, body = to_binary(event, event_format)
+
+ assert headers["ce-time"] == "2023-10-25T17%3A09%3A19.736166Z" # URL encoded
+
+
+def test_from_binary_with_time_attribute() -> None:
+ headers = {
+ "ce-id": "123",
+ "ce-source": "test/source",
+ "ce-type": "test.type",
+ "ce-specversion": "1.0",
+ "ce-time": "2023-10-25T17%3A09%3A19.736166Z", # URL encoded
+ }
+ body = b""
+ event_format = JSONFormat()
+
+ event = from_binary(headers, body, event_format)
+
+ assert event.get_time() == datetime(2023, 10, 25, 17, 9, 19, 736166, tzinfo=timezone.utc)
+
+
+def test_to_binary_with_extension_attributes() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source",
+ "type": "test.type",
+ "specversion": "1.0",
+ "customext": "custom value",
+ "anotherext": 42,
+ }
+ event = CloudEvent(attributes=attributes, data=None)
+ event_format = JSONFormat()
+
+ headers, body = to_binary(event, event_format)
+
+ assert headers["ce-customext"] == "custom%20value" # Space URL encoded
+ assert headers["ce-anotherext"] == "42"
+
+
+def test_from_binary_with_extension_attributes() -> None:
+ headers = {
+ "ce-id": "123",
+ "ce-source": "test/source",
+ "ce-type": "test.type",
+ "ce-specversion": "1.0",
+ "ce-customext": "custom%20value", # Space URL encoded
+ "ce-anotherext": "42",
+ }
+ body = b""
+ event_format = JSONFormat()
+
+ event = from_binary(headers, body, event_format)
+
+ assert event.get_extension("customext") == "custom value"
+ assert event.get_extension("anotherext") == "42"
+
+
+def test_to_binary_with_no_data() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source",
+ "type": "test.type",
+ "specversion": "1.0",
+ }
+ event = CloudEvent(attributes=attributes, data=None)
+ event_format = JSONFormat()
+
+ headers, body = to_binary(event, event_format)
+
+ assert "content-type" not in headers
+ assert body == b""
+
+
+def test_from_binary_with_no_data() -> None:
+ headers = {
+ "ce-id": "123",
+ "ce-source": "test/source",
+ "ce-type": "test.type",
+ "ce-specversion": "1.0",
+ }
+ body = b""
+ event_format = JSONFormat()
+
+ event = from_binary(headers, body, event_format)
+
+ assert event.get_data() is None
+
+
+def test_structured_mode_preserves_all_attributes() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source",
+ "type": "test.type",
+ "specversion": "1.0",
+ "time": datetime(2023, 10, 25, 17, 9, 19, 736166, tzinfo=timezone.utc),
+ "datacontenttype": "application/json",
+ "dataschema": "http://example.com/schema",
+ "subject": "test/subject",
+ "customext": "value",
+ }
+ event = CloudEvent(attributes=attributes, data={"key": "value"})
+ event_format = JSONFormat()
+
+ headers, body = to_structured(event, event_format)
+ reconstructed = from_structured(headers, body, event_format)
+
+ assert reconstructed.get_id() == event.get_id()
+ assert reconstructed.get_source() == event.get_source()
+ assert reconstructed.get_type() == event.get_type()
+ assert reconstructed.get_time() == event.get_time()
+ assert reconstructed.get_datacontenttype() == event.get_datacontenttype()
+ assert reconstructed.get_dataschema() == event.get_dataschema()
+ assert reconstructed.get_subject() == event.get_subject()
+ assert reconstructed.get_extension("customext") == event.get_extension("customext")
+ assert reconstructed.get_data() == event.get_data()
+
+
+def test_to_binary_with_dict_data_non_json_content_type() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source",
+ "type": "test.type",
+ "specversion": "1.0",
+ "datacontenttype": "application/xml", # Non-JSON content type
+ }
+ event = CloudEvent(attributes=attributes, data={"key": "value"})
+ event_format = JSONFormat()
+
+ headers, body = to_binary(event, event_format)
+
+ assert headers["content-type"] == "application/xml"
+ # For non-JSON content type, dict data gets converted to its string representation
+ assert body == b"{'key': 'value'}"
+
+
+def test_binary_mode_roundtrip() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source with spaces",
+ "type": "test.type",
+ "specversion": "1.0",
+ "time": datetime(2023, 10, 25, 17, 9, 19, 736166, tzinfo=timezone.utc),
+ "datacontenttype": "text/plain",
+ "customext": "value with special chars: @#$%",
+ }
+ event = CloudEvent(attributes=attributes, data="Test data")
+ event_format = JSONFormat()
+
+ headers, body = to_binary(event, event_format)
+ reconstructed = from_binary(headers, body, event_format)
+
+ assert reconstructed.get_id() == event.get_id()
+ assert reconstructed.get_source() == event.get_source()
+ assert reconstructed.get_type() == event.get_type()
+ assert reconstructed.get_datacontenttype() == event.get_datacontenttype()
+ assert reconstructed.get_extension("customext") == event.get_extension("customext")
+ assert reconstructed.get_data() == event.get_data()
+
+
+def test_from_http_message_detects_structured_mode() -> None:
+ # Message without ce- headers should be detected as structured
+ headers = {"content-type": "application/cloudevents+json"}
+ body = b'{"id": "123", "source": "test/source", "type": "test.type", "specversion": "1.0", "data": {"key": "value"}}'
+ event_format = JSONFormat()
+
+ event = from_http_message(headers, body, event_format)
+
+ assert event.get_id() == "123"
+ assert event.get_source() == "test/source"
+ assert event.get_type() == "test.type"
+ assert event.get_data() == {"key": "value"}
+
+
+def test_from_http_message_detects_binary_mode() -> None:
+ # Message with ce- headers should be detected as binary
+ headers = {
+ "ce-id": "123",
+ "ce-source": "test/source",
+ "ce-type": "test.type",
+ "ce-specversion": "1.0",
+ "content-type": "text/plain",
+ }
+ body = b"Hello World"
+ event_format = JSONFormat()
+
+ event = from_http_message(headers, body, event_format)
+
+ assert event.get_id() == "123"
+ assert event.get_source() == "test/source"
+ assert event.get_type() == "test.type"
+ assert event.get_datacontenttype() == "text/plain"
+ assert event.get_data() == "Hello World"
+
+
+def test_from_http_message_defaults_to_structured() -> None:
+ # Message without clear indicators should default to structured
+ headers = {"some-other-header": "value"}
+ body = b'{"id": "123", "source": "test/source", "type": "test.type", "specversion": "1.0"}'
+ event_format = JSONFormat()
+
+ event = from_http_message(headers, body, event_format)
+
+ assert event.get_id() == "123"
+ assert event.get_source() == "test/source"
+ assert event.get_type() == "test.type"
+
+
+def test_from_http_message_binary_mode_with_mixed_headers() -> None:
+ # If ce- headers exist, should use binary mode regardless of other headers
+ headers = {
+ "content-type": "application/cloudevents+json",
+ "ce-id": "123",
+ "ce-source": "test/source",
+ "ce-type": "test.type",
+ "ce-specversion": "1.0",
+ "ce-datacontenttype": "text/plain",
+ }
+ body = b"Hello World"
+ event_format = JSONFormat()
+
+ event = from_http_message(headers, body, event_format)
+
+ assert event.get_id() == "123"
+ assert event.get_source() == "test/source"
+ assert event.get_type() == "test.type"
+ assert event.get_data() == "Hello World"
+
+
+def test_from_http_message_case_insensitive_headers() -> None:
+ # Header detection should be case insensitive (CE-, Ce-, ce-, etc.)
+ headers = {
+ "CE-ID": "123",
+ "Ce-Source": "test/source",
+ "ce-type": "test.type",
+ "CE-SPECVERSION": "1.0",
+ "content-type": "text/plain",
+ }
+ body = b"Hello World"
+ event_format = JSONFormat()
+
+ event = from_http_message(headers, body, event_format)
+
+ assert event.get_id() == "123"
+ assert event.get_source() == "test/source"
+ assert event.get_type() == "test.type"
+ assert event.get_data() == "Hello World"
+
+
+def test_from_http_message_with_custom_event_factory() -> None:
+ # Should work with custom event factory in structured mode
+ headers = {"content-type": "application/cloudevents+json"}
+ body = b'{"id": "123", "source": "test/source", "type": "test.type", "specversion": "1.0"}'
+ event_format = JSONFormat()
+
+ event = from_http_message(headers, body, event_format, CloudEvent)
+
+ assert event.get_id() == "123"
+ assert event.get_source() == "test/source"
+ assert event.get_type() == "test.type"
\ No newline at end of file
diff --git a/tests/test_core/test_bindings/test_kafka.py b/tests/test_core/test_bindings/test_kafka.py
new file mode 100644
index 0000000..8cbdcc5
--- /dev/null
+++ b/tests/test_core/test_bindings/test_kafka.py
@@ -0,0 +1,487 @@
+# Copyright 2018-Present The CloudEvents Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import json
+from datetime import datetime, timezone
+
+from cloudevents.core.base import BaseCloudEvent
+from cloudevents.core.bindings.kafka import (
+ KafkaMessage,
+ from_binary,
+ from_kafka_message,
+ from_structured,
+ to_binary,
+ to_structured,
+)
+from cloudevents.core.formats.json import JSONFormat
+from cloudevents.core.v1.event import CloudEvent
+
+
+def test_to_structured_with_json_format() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source",
+ "type": "test.type",
+ "specversion": "1.0",
+ "time": datetime(2023, 10, 25, 17, 9, 19, 736166, tzinfo=timezone.utc),
+ "datacontenttype": "application/json",
+ }
+ event = CloudEvent(attributes=attributes, data={"key": "value"})
+ event_format = JSONFormat()
+
+ message = to_structured(event, event_format)
+
+ assert message.headers == {"content-type": b"application/cloudevents+json"}
+ assert message.key is None
+ parsed_value = json.loads(message.value)
+ assert parsed_value["id"] == "123"
+ assert parsed_value["source"] == "test/source"
+ assert parsed_value["type"] == "test.type"
+ assert parsed_value["data"] == {"key": "value"}
+
+
+def test_to_structured_with_key() -> None:
+ event = CloudEvent(
+ {"id": "123", "type": "test", "source": "test", "specversion": "1.0"}, None
+ )
+ event_format = JSONFormat()
+
+ message = to_structured(event, event_format, key=b"my-key")
+
+ assert message.key == b"my-key"
+
+
+def test_to_structured_with_key_mapper() -> None:
+ event = CloudEvent(
+ {
+ "id": "123",
+ "type": "test",
+ "source": "test",
+ "specversion": "1.0",
+ "subject": "user123",
+ },
+ None,
+ )
+ event_format = JSONFormat()
+
+ def key_mapper(e: BaseCloudEvent) -> bytes:
+ return str(e.get_subject()).encode("utf-8")
+
+ message = to_structured(event, event_format, key_mapper=key_mapper)
+
+ assert message.key == b"user123"
+
+
+def test_from_structured_with_json_format() -> None:
+ headers = {"content-type": b"application/cloudevents+json"}
+ value = b'{"id": "123", "source": "test/source", "type": "test.type", "specversion": "1.0", "data": {"key": "value"}}'
+ message = KafkaMessage(headers=headers, key=None, value=value)
+ event_format = JSONFormat()
+
+ event = from_structured(message, event_format)
+
+ assert event.get_id() == "123"
+ assert event.get_source() == "test/source"
+ assert event.get_type() == "test.type"
+ assert event.get_data() == {"key": "value"}
+
+
+def test_to_binary_with_string_data() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source",
+ "type": "test.type",
+ "specversion": "1.0",
+ "datacontenttype": "text/plain",
+ }
+ event = CloudEvent(attributes=attributes, data="Hello World")
+ event_format = JSONFormat()
+
+ message = to_binary(event, event_format)
+
+ # Check headers
+ assert message.headers["ce_id"] == b"123"
+ assert message.headers["ce_source"] == b"test/source"
+ assert message.headers["ce_type"] == b"test.type"
+ assert message.headers["ce_specversion"] == b"1.0"
+ assert message.headers["ce_datacontenttype"] == b"text/plain"
+
+ # Check data
+ assert message.value == b"Hello World"
+ assert message.key is None
+
+
+def test_to_binary_with_json_data() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source",
+ "type": "test.type",
+ "specversion": "1.0",
+ "datacontenttype": "application/json",
+ }
+ event = CloudEvent(attributes=attributes, data={"key": "value"})
+ event_format = JSONFormat()
+
+ message = to_binary(event, event_format)
+
+ assert message.headers["ce_datacontenttype"] == b"application/json"
+ assert json.loads(message.value) == {"key": "value"}
+
+
+def test_to_binary_with_partitionkey() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source",
+ "type": "test.type",
+ "specversion": "1.0",
+ "partitionkey": "user123",
+ }
+ event = CloudEvent(attributes=attributes, data="test")
+ event_format = JSONFormat()
+
+ message = to_binary(event, event_format)
+
+ # partitionkey should be used as message key
+ assert message.key == b"user123"
+
+ # partitionkey should also be in headers
+ assert message.headers["ce_partitionkey"] == b"user123"
+
+
+def test_to_binary_with_explicit_key_overrides_partitionkey() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source",
+ "type": "test.type",
+ "specversion": "1.0",
+ "partitionkey": "user123",
+ }
+ event = CloudEvent(attributes=attributes, data="test")
+ event_format = JSONFormat()
+
+ message = to_binary(event, event_format, key=b"override-key")
+
+ assert message.key == b"override-key"
+
+
+def test_from_binary_with_string_data() -> None:
+ headers = {
+ "ce_id": b"123",
+ "ce_source": b"test/source",
+ "ce_type": b"test.type",
+ "ce_specversion": b"1.0",
+ "ce_datacontenttype": b"text/plain",
+ }
+ message = KafkaMessage(headers=headers, key=None, value=b"Hello World")
+ event_format = JSONFormat()
+
+ event = from_binary(message, event_format)
+
+ assert event.get_id() == "123"
+ assert event.get_source() == "test/source"
+ assert event.get_type() == "test.type"
+ assert event.get_datacontenttype() == "text/plain"
+ assert event.get_data() == "Hello World"
+
+
+def test_from_binary_with_json_data() -> None:
+ headers = {
+ "ce_id": b"123",
+ "ce_source": b"test/source",
+ "ce_type": b"test.type",
+ "ce_specversion": b"1.0",
+ "ce_datacontenttype": b"application/json",
+ }
+ message = KafkaMessage(headers=headers, key=None, value=b'{"key": "value"}')
+ event_format = JSONFormat()
+
+ event = from_binary(message, event_format)
+
+ assert event.get_data() == {"key": "value"}
+
+
+def test_from_binary_with_bytes_data() -> None:
+ headers = {
+ "ce_id": b"123",
+ "ce_source": b"test/source",
+ "ce_type": b"test.type",
+ "ce_specversion": b"1.0",
+ "ce_datacontenttype": b"application/octet-stream",
+ }
+ message = KafkaMessage(headers=headers, key=None, value=b"\x00\x01\x02\x03")
+ event_format = JSONFormat()
+
+ event = from_binary(message, event_format)
+
+ assert event.get_data() == b"\x00\x01\x02\x03"
+
+
+def test_to_binary_with_time_attribute() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source",
+ "type": "test.type",
+ "specversion": "1.0",
+ "time": datetime(2023, 10, 25, 17, 9, 19, 736166, tzinfo=timezone.utc),
+ }
+ event = CloudEvent(attributes=attributes, data=None)
+ event_format = JSONFormat()
+
+ message = to_binary(event, event_format)
+
+ assert message.headers["ce_time"] == b"2023-10-25T17:09:19.736166Z"
+
+
+def test_from_binary_with_time_attribute() -> None:
+ headers = {
+ "ce_id": b"123",
+ "ce_source": b"test/source",
+ "ce_type": b"test.type",
+ "ce_specversion": b"1.0",
+ "ce_time": b"2023-10-25T17:09:19.736166Z",
+ }
+ message = KafkaMessage(headers=headers, key=None, value=b"")
+ event_format = JSONFormat()
+
+ event = from_binary(message, event_format)
+
+ assert event.get_time() == datetime(
+ 2023, 10, 25, 17, 9, 19, 736166, tzinfo=timezone.utc
+ )
+
+
+def test_to_binary_with_extension_attributes() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source",
+ "type": "test.type",
+ "specversion": "1.0",
+ "customext": "custom value",
+ "anotherext": 42,
+ }
+ event = CloudEvent(attributes=attributes, data=None)
+ event_format = JSONFormat()
+
+ message = to_binary(event, event_format)
+
+ assert message.headers["ce_customext"] == b"custom value"
+ assert message.headers["ce_anotherext"] == b"42"
+
+
+def test_from_binary_with_extension_attributes() -> None:
+ headers = {
+ "ce_id": b"123",
+ "ce_source": b"test/source",
+ "ce_type": b"test.type",
+ "ce_specversion": b"1.0",
+ "ce_customext": b"custom value",
+ "ce_anotherext": b"42",
+ }
+ message = KafkaMessage(headers=headers, key=None, value=b"")
+ event_format = JSONFormat()
+
+ event = from_binary(message, event_format)
+
+ assert event.get_extension("customext") == "custom value"
+ assert event.get_extension("anotherext") == "42"
+
+
+def test_to_binary_with_no_data() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source",
+ "type": "test.type",
+ "specversion": "1.0",
+ }
+ event = CloudEvent(attributes=attributes, data=None)
+ event_format = JSONFormat()
+
+ message = to_binary(event, event_format)
+
+ assert message.value == b""
+
+
+def test_from_binary_with_no_data() -> None:
+ headers = {
+ "ce_id": b"123",
+ "ce_source": b"test/source",
+ "ce_type": b"test.type",
+ "ce_specversion": b"1.0",
+ }
+ message = KafkaMessage(headers=headers, key=None, value=b"")
+ event_format = JSONFormat()
+
+ event = from_binary(message, event_format)
+
+ assert event.get_data() is None
+
+
+def test_structured_mode_roundtrip() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source",
+ "type": "test.type",
+ "specversion": "1.0",
+ "time": datetime(2023, 10, 25, 17, 9, 19, 736166, tzinfo=timezone.utc),
+ "datacontenttype": "application/json",
+ "dataschema": "http://example.com/schema",
+ "subject": "test/subject",
+ "customext": "value",
+ }
+ event = CloudEvent(attributes=attributes, data={"key": "value"})
+ event_format = JSONFormat()
+
+ message = to_structured(event, event_format, key=b"test-key")
+ reconstructed = from_structured(message, event_format)
+
+ assert reconstructed.get_id() == event.get_id()
+ assert reconstructed.get_source() == event.get_source()
+ assert reconstructed.get_type() == event.get_type()
+ assert reconstructed.get_time() == event.get_time()
+ assert reconstructed.get_datacontenttype() == event.get_datacontenttype()
+ assert reconstructed.get_dataschema() == event.get_dataschema()
+ assert reconstructed.get_subject() == event.get_subject()
+ assert reconstructed.get_extension("customext") == event.get_extension("customext")
+ assert reconstructed.get_data() == event.get_data()
+
+
+def test_binary_mode_roundtrip() -> None:
+ attributes = {
+ "id": "123",
+ "source": "test/source with spaces",
+ "type": "test.type",
+ "specversion": "1.0",
+ "time": datetime(2023, 10, 25, 17, 9, 19, 736166, tzinfo=timezone.utc),
+ "datacontenttype": "text/plain",
+ "customext": "value with special chars: @#$%",
+ "partitionkey": "partition123",
+ }
+ event = CloudEvent(attributes=attributes, data="Test data")
+ event_format = JSONFormat()
+
+ message = to_binary(event, event_format)
+ reconstructed = from_binary(message, event_format)
+
+ assert reconstructed.get_id() == event.get_id()
+ assert reconstructed.get_source() == event.get_source()
+ assert reconstructed.get_type() == event.get_type()
+ assert reconstructed.get_datacontenttype() == event.get_datacontenttype()
+ assert reconstructed.get_extension("customext") == event.get_extension("customext")
+ assert reconstructed.get_extension("partitionkey") == event.get_extension(
+ "partitionkey"
+ )
+ assert reconstructed.get_data() == event.get_data()
+    # The Kafka message key is derived from the partitionkey extension
+ assert message.key == b"partition123"
+
+
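+# Sketch of one more roundtrip (an assumption on top of the tests above: to_binary
+# and from_binary delegate payload handling to the format's write_data/read_data,
+# so JSON-encoded data should survive the trip as well).
+def test_binary_mode_roundtrip_with_json_data() -> None:
+    attributes = {
+        "id": "123",
+        "source": "test/source",
+        "type": "test.type",
+        "specversion": "1.0",
+        "datacontenttype": "application/json",
+    }
+    event = CloudEvent(attributes=attributes, data={"key": "value"})
+    event_format = JSONFormat()
+
+    message = to_binary(event, event_format)
+    reconstructed = from_binary(message, event_format)
+
+    assert reconstructed.get_datacontenttype() == "application/json"
+    assert reconstructed.get_data() == {"key": "value"}
+
+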
+def test_from_kafka_message_detects_structured_mode() -> None:
+ # Message without ce_ headers should be detected as structured
+ headers = {"content-type": b"application/cloudevents+json"}
+ value = b'{"id": "123", "source": "test/source", "type": "test.type", "specversion": "1.0", "data": {"key": "value"}}'
+ message = KafkaMessage(headers=headers, key=None, value=value)
+ event_format = JSONFormat()
+
+ event = from_kafka_message(message, event_format)
+
+ assert event.get_id() == "123"
+ assert event.get_source() == "test/source"
+ assert event.get_type() == "test.type"
+ assert event.get_data() == {"key": "value"}
+
+
+def test_from_kafka_message_detects_binary_mode() -> None:
+ # Message with ce_ headers should be detected as binary
+ headers = {
+ "ce_id": b"123",
+ "ce_source": b"test/source",
+ "ce_type": b"test.type",
+ "ce_specversion": b"1.0",
+ "ce_datacontenttype": b"text/plain",
+ }
+ message = KafkaMessage(headers=headers, key=None, value=b"Hello World")
+ event_format = JSONFormat()
+
+ event = from_kafka_message(message, event_format)
+
+ assert event.get_id() == "123"
+ assert event.get_source() == "test/source"
+ assert event.get_type() == "test.type"
+ assert event.get_datacontenttype() == "text/plain"
+ assert event.get_data() == "Hello World"
+
+
+def test_from_kafka_message_defaults_to_structured() -> None:
+ # Message without clear indicators should default to structured
+ headers = {"some-other-header": b"value"}
+ value = b'{"id": "123", "source": "test/source", "type": "test.type", "specversion": "1.0"}'
+ message = KafkaMessage(headers=headers, key=None, value=value)
+ event_format = JSONFormat()
+
+ event = from_kafka_message(message, event_format)
+
+ assert event.get_id() == "123"
+ assert event.get_source() == "test/source"
+ assert event.get_type() == "test.type"
+
+
+def test_from_kafka_message_binary_mode_with_mixed_headers() -> None:
+ # If ce_ headers exist, should use binary mode regardless of other headers
+ headers = {
+ "content-type": b"application/cloudevents+json",
+ "ce_id": b"123",
+ "ce_source": b"test/source",
+ "ce_type": b"test.type",
+ "ce_specversion": b"1.0",
+ "ce_datacontenttype": b"text/plain",
+ }
+ value = b"Hello World"
+ message = KafkaMessage(headers=headers, key=None, value=value)
+ event_format = JSONFormat()
+
+ event = from_kafka_message(message, event_format)
+
+ assert event.get_id() == "123"
+ assert event.get_source() == "test/source"
+ assert event.get_type() == "test.type"
+ assert event.get_data() == "Hello World"
+
+
+def test_from_kafka_message_structured_with_various_headers() -> None:
+ # Structured mode should work with any non-ce_ headers
+ headers = {"custom-header": b"value", "another-header": b"data"}
+ value = b'{"id": "123", "source": "test/source", "type": "test.type", "specversion": "1.0"}'
+ message = KafkaMessage(headers=headers, key=None, value=value)
+ event_format = JSONFormat()
+
+ event = from_kafka_message(message, event_format)
+
+ assert event.get_id() == "123"
+ assert event.get_source() == "test/source"
+ assert event.get_type() == "test.type"
+
+
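+# Sketch of an additional detection case (assumption: an empty header map counts
+# as "no indicators" and therefore falls back to structured mode).
+def test_from_kafka_message_with_empty_headers_defaults_to_structured() -> None:
+    value = b'{"id": "123", "source": "test/source", "type": "test.type", "specversion": "1.0"}'
+    message = KafkaMessage(headers={}, key=None, value=value)
+    event_format = JSONFormat()
+
+    event = from_kafka_message(message, event_format)
+
+    assert event.get_id() == "123"
+    assert event.get_source() == "test/source"
+    assert event.get_type() == "test.type"
+
+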
+def test_from_kafka_message_with_custom_event_factory() -> None:
+ # Should work with custom event factory in structured mode
+ headers = {"content-type": b"application/cloudevents+json"}
+ value = b'{"id": "123", "source": "test/source", "type": "test.type", "specversion": "1.0"}'
+ message = KafkaMessage(headers=headers, key=None, value=value)
+ event_format = JSONFormat()
+
+ event = from_kafka_message(message, event_format, CloudEvent)
+
+ assert event.get_id() == "123"
+ assert event.get_source() == "test/source"
+ assert event.get_type() == "test.type"
diff --git a/tests/test_core/test_format/test_json.py b/tests/test_core/test_format/test_json.py
index 12f7543..31c835c 100644
--- a/tests/test_core/test_format/test_json.py
+++ b/tests/test_core/test_format/test_json.py
@@ -323,3 +323,142 @@ def test_read_cloud_event_from_string_input() -> None:
assert result.get_id() == "123"
assert result.get_source() == "source"
+
+
+def test_get_content_type() -> None:
+ formatter = JSONFormat()
+ assert formatter.get_content_type() == "application/cloudevents+json"
+
+
+def test_write_data_with_none() -> None:
+ formatter = JSONFormat()
+ result = formatter.write_data(None, "application/json")
+ assert result == b""
+
+
+def test_write_data_with_bytes() -> None:
+ formatter = JSONFormat()
+ result = formatter.write_data(b"\x00\x01\x02\x03", "application/octet-stream")
+ assert result == b"\x00\x01\x02\x03"
+
+
+def test_write_data_with_string() -> None:
+ formatter = JSONFormat()
+ result = formatter.write_data("Hello World", "text/plain")
+ assert result == b"Hello World"
+
+
+def test_write_data_with_dict_and_json_content_type() -> None:
+ formatter = JSONFormat()
+ result = formatter.write_data({"key": "value"}, "application/json")
+ assert result == b'{"key": "value"}'
+
+
+def test_write_data_with_dict_and_custom_json_content_type() -> None:
+ formatter = JSONFormat()
+ result = formatter.write_data({"key": "value"}, "application/vnd.api+json")
+ assert result == b'{"key": "value"}'
+
+
+def test_write_data_with_dict_and_non_json_content_type() -> None:
+ formatter = JSONFormat()
+ result = formatter.write_data({"key": "value"}, "text/plain")
+    # Should fall back to string conversion
+ assert result == b"{'key': 'value'}"
+
+
+def test_write_data_with_dict_and_no_content_type() -> None:
+ formatter = JSONFormat()
+ result = formatter.write_data({"key": "value"}, None)
+    # Should fall back to string conversion when content type is None
+ assert result == b"{'key': 'value'}"
+
+
+def test_read_data_with_empty_bytes() -> None:
+ formatter = JSONFormat()
+ result = formatter.read_data(b"", "application/json")
+ assert result is None
+
+
+def test_read_data_with_text_content_type() -> None:
+ formatter = JSONFormat()
+ result = formatter.read_data(b"Hello World", "text/plain")
+ assert result == "Hello World"
+
+
+def test_read_data_with_json_content_type() -> None:
+ formatter = JSONFormat()
+ result = formatter.read_data(b'{"key": "value"}', "application/json")
+ assert result == {"key": "value"}
+
+
+def test_read_data_with_custom_json_content_type() -> None:
+ formatter = JSONFormat()
+ result = formatter.read_data(
+ b'{"key": "value"}', "application/vnd.api+json"
+ )
+ assert result == {"key": "value"}
+
+
+def test_read_data_with_json_charset() -> None:
+ formatter = JSONFormat()
+ result = formatter.read_data(
+ b'{"key": "value"}', "application/json; charset=utf-8"
+ )
+ assert result == {"key": "value"}
+
+
+def test_read_data_with_bytes_content_type() -> None:
+ formatter = JSONFormat()
+ result = formatter.read_data(b"\x00\x01\x02\x03", "application/octet-stream")
+ assert result == b"\x00\x01\x02\x03"
+
+
+def test_read_data_with_no_content_type() -> None:
+ formatter = JSONFormat()
+ result = formatter.read_data(b"\x00\x01\x02\x03", None)
+ # Should return raw bytes when content type is None
+ assert result == b"\x00\x01\x02\x03"
+
+
+def test_read_data_with_xml_content_type() -> None:
+ formatter = JSONFormat()
+ result = formatter.read_data(b"data", "application/xml")
+ # Non-JSON, non-text content types return raw bytes
+ assert result == b"data"
+
+
+def test_read_data_write_data_roundtrip_json() -> None:
+ """Test that write_data and read_data are inverse operations for JSON."""
+ formatter = JSONFormat()
+ original_data = {"key": "value", "nested": {"foo": "bar"}}
+
+ # Write then read
+ serialized = formatter.write_data(original_data, "application/json")
+ deserialized = formatter.read_data(serialized, "application/json")
+
+ assert deserialized == original_data
+
+
+def test_read_data_write_data_roundtrip_text() -> None:
+ """Test that write_data and read_data are inverse operations for text."""
+ formatter = JSONFormat()
+ original_data = "Hello World with unicode: 世界"
+
+ # Write then read
+ serialized = formatter.write_data(original_data, "text/plain")
+ deserialized = formatter.read_data(serialized, "text/plain")
+
+ assert deserialized == original_data
+
+
+def test_read_data_write_data_roundtrip_bytes() -> None:
+ """Test that write_data and read_data are inverse operations for bytes."""
+ formatter = JSONFormat()
+ original_data = b"\x00\x01\x02\x03\xff\xfe"
+
+ # Write then read
+ serialized = formatter.write_data(original_data, "application/octet-stream")
+ deserialized = formatter.read_data(serialized, "application/octet-stream")
+
+ assert deserialized == original_data
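+
+
+def test_read_data_write_data_roundtrip_json_with_non_ascii() -> None:
+    """Sketch of one more roundtrip check (assumes standard JSON encoding and
+    decoding): non-ASCII values should survive write_data/read_data."""
+    formatter = JSONFormat()
+    original_data = {"greeting": "こんにちは", "name": "世界"}
+
+    # Write then read
+    serialized = formatter.write_data(original_data, "application/json")
+    deserialized = formatter.read_data(serialized, "application/json")
+
+    assert deserialized == original_data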