[DOP-8208] Detect includeHeaders value automatically during write
dolfinus committed Aug 18, 2023
1 parent 1eeb0be commit 9363298
Showing 12 changed files with 202 additions and 102 deletions.
3 changes: 3 additions & 0 deletions docs/changelog/next_release/131.feature.rst
@@ -0,0 +1,3 @@
Automatically detect the value of the ``includeHeaders`` option for Kafka during the write process, based on the presence of a ``headers`` column in the input DataFrame.

Passing the ``includeHeaders`` option explicitly is prohibited.
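
For example (schematically):

.. code:: python

from onetl.connection import Kafka

# the option can no longer be passed explicitly:
Kafka.WriteOptions(includeHeaders=False)  # raises an error

# instead, the value is derived from the DataFrame passed to DBWriter.run():
# a "headers" column present  -> includeHeaders=True
# no "headers" column present -> includeHeaders=False
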
1 change: 1 addition & 0 deletions docs/changelog/next_release/131.improvement.rst
@@ -0,0 +1 @@
Add notes about reading from and writing to Kafka to the documentation
11 changes: 8 additions & 3 deletions docs/connection/db_connection/kafka/index.rst
@@ -5,11 +5,9 @@ Kafka

.. toctree::
:maxdepth: 1
:caption: Connection & options
:caption: Connections

connection
read_options
write_options

.. toctree::
:maxdepth: 1
@@ -26,6 +24,13 @@ Kafka
kerberos_auth
scram_auth

.. toctree::
:maxdepth: 1
:caption: Operations

read
write

.. toctree::
:maxdepth: 1
:caption: For developers
72 changes: 72 additions & 0 deletions docs/connection/db_connection/kafka/read.rst
@@ -0,0 +1,72 @@
.. _kafka-read:

Reading from Kafka
==================

For reading data from Kafka, use :obj:`DBReader <onetl.db.db_reader.db_reader.DBReader>` with specific options (see below).
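
A minimal sketch of such a read, assuming a Spark session named ``spark`` already exists (the addresses, cluster name and topic name below are placeholders):

.. code:: python

from onetl.connection import Kafka
from onetl.db import DBReader

kafka = Kafka(
    addresses=["kafka1:9092"],
    cluster="my-cluster",
    spark=spark,
)

reader = DBReader(
    connection=kafka,
    source="my_topic",
)
df = reader.run()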

.. warning::

As of onETL 0.9.0, Kafka does not support :ref:`strategy`. You can only read the whole topic.

.. note::

Unlike other connection classes, Kafka always returns a DataFrame with a fixed schema
(see `documentation <https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html>`_):

.. dropdown:: DataFrame Schema

.. code:: python

from pyspark.sql.types import (
ArrayType,
BinaryType,
IntegerType,
LongType,
StringType,
StructField,
StructType,
TimestampType,
)
schema = StructType(
[
StructField("value", BinaryType(), nullable=True),
StructField("key", BinaryType(), nullable=True),
StructField("topic", StringType(), nullable=False),
StructField("partition", IntegerType(), nullable=False),
StructField("offset", LongType(), nullable=False),
StructField("timestamp", TimestampType(), nullable=False),
StructField("timestampType", IntegerType(), nullable=False),
# this field is returned only with ``include_headers=True``
StructField(
"headers",
ArrayType(
StructType(
[
StructField("key", StringType(), nullable=False),
StructField("value", BinaryType(), nullable=True),
],
),
),
nullable=True,
),
],
)

.. warning::

Columns:

* ``value``
* ``key``
* ``headers[*].value``

are always returned as raw bytes. If they contain values of a custom type, these values should be deserialized manually.
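
For example, if ``value`` contains UTF-8 encoded JSON, it can be parsed after reading roughly like this (a sketch; ``df`` is the DataFrame returned by ``DBReader.run()``, and ``message_schema`` is an assumption about your data, not something defined by the connector):

.. code:: python

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# schema of your own message payload
message_schema = StructType(
    [
        StructField("id", IntegerType()),
        StructField("name", StringType()),
    ],
)

parsed_df = df.select(
    col("key").cast("string").alias("key"),
    from_json(col("value").cast("string"), message_schema).alias("value"),
)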

.. currentmodule:: onetl.connection.db_connection.kafka.options

.. autopydantic_model:: KafkaReadOptions
:member-order: bysource
:model-show-field-summary: false
:field-show-constraints: false
11 changes: 0 additions & 11 deletions docs/connection/db_connection/kafka/read_options.rst

This file was deleted.

64 changes: 64 additions & 0 deletions docs/connection/db_connection/kafka/write.rst
@@ -0,0 +1,64 @@
.. _kafka-write:

Writing to Kafka
================

For writing data to Kafka, use :obj:`DBWriter <onetl.db.db_writer.db_writer.DBWriter>` with specific options (see below).
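
A minimal sketch of such a write, assuming a Spark session named ``spark`` and a DataFrame ``df`` matching the schema described below (the addresses, cluster name and topic name are placeholders):

.. code:: python

from onetl.connection import Kafka
from onetl.db import DBWriter

kafka = Kafka(
    addresses=["kafka1:9092"],
    cluster="my-cluster",
    spark=spark,
)

writer = DBWriter(
    connection=kafka,
    target="my_topic",
)
writer.run(df)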

.. note::

Unlike other connection classes, Kafka only accepts a DataFrame with a fixed schema
(see `documentation <https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html>`_):

.. dropdown:: DataFrame Schema

.. code:: python

from pyspark.sql.types import (
ArrayType,
BinaryType,
IntegerType,
StringType,
StructField,
StructType,
)
schema = StructType(
[
# mandatory fields:
StructField("value", BinaryType(), nullable=True),
# optional fields, can be omitted:
StructField("key", BinaryType(), nullable=True),
StructField("partition", IntegerType(), nullable=True),
StructField(
"headers",
ArrayType(
StructType(
[
StructField("key", StringType(), nullable=False),
StructField("value", BinaryType(), nullable=True),
],
),
),
nullable=True,
),
],
)

You cannot pass a DataFrame with other column names or types.

.. warning::

Columns:

* ``value``
* ``key``
* ``headers[*].value``

can only be strings or raw bytes. If they contain values of a custom type, these values should be serialized manually.
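
For example, a struct payload can be serialized to JSON and a ``headers`` column built manually before the write, roughly like this (a sketch; the source columns ``id`` and ``name`` are assumptions about your data):

.. code:: python

from pyspark.sql.functions import array, col, lit, struct, to_json

df_to_write = df.select(
    # "value" must be bytes or string; here it is a JSON-serialized struct
    to_json(struct("id", "name")).cast("binary").alias("value"),
    # optional "key", derived from an existing column
    col("id").cast("string").cast("binary").alias("key"),
    # presence of "headers" makes includeHeaders=True be set automatically during write
    array(
        struct(
            lit("source").alias("key"),
            lit("my_app").cast("binary").alias("value"),
        ),
    ).alias("headers"),
)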

.. currentmodule:: onetl.connection.db_connection.kafka.options

.. autopydantic_model:: KafkaWriteOptions
:member-order: bysource
:model-show-field-summary: false
:field-show-constraints: false
11 changes: 0 additions & 11 deletions docs/connection/db_connection/kafka/write_options.rst

This file was deleted.

22 changes: 9 additions & 13 deletions onetl/connection/db_connection/kafka/connection.py
@@ -286,28 +286,24 @@ def write_df_to_target(
options: KafkaWriteOptions = KafkaWriteOptions(), # noqa: B008, WPS404
) -> None:
# Check that the DataFrame doesn't contain any columns not in the schema
schema: StructType = self.get_df_schema(target)
required_columns = [field.name for field in schema.fields if not field.nullable]
optional_columns = [field.name for field in schema.fields if field.nullable]
schema_field_names = {field.name for field in schema.fields}
df_column_names = set(df.columns)
if not df_column_names.issubset(schema_field_names):
invalid_columns = df_column_names - schema_field_names
required_columns = {"value"}
optional_columns = {"key", "partition", "headers"}
allowed_columns = required_columns | optional_columns | {"topic"}
df_columns = set(df.columns)
if not df_columns.issubset(allowed_columns):
invalid_columns = df_columns - allowed_columns
raise ValueError(
f"Invalid column names: {invalid_columns}. Expected columns: {required_columns} (required),"
f" {optional_columns} (optional)",
f" {sorted(optional_columns)} (optional)",
)

# Check that the DataFrame doesn't contain a 'headers' column with includeHeaders=False
if not getattr(options, "includeHeaders", True) and "headers" in df.columns:
raise ValueError("Cannot write 'headers' column with kafka.WriteOptions(includeHeaders=False)")

if "topic" in df.columns:
log.warning("The 'topic' column in the DataFrame will be overridden with value %r", target)

write_options = {f"kafka.{key}": value for key, value in self._get_connection_properties().items()}
write_options.update(options.dict(by_alias=True, exclude_none=True, exclude={"if_exists"}))
write_options["topic"] = target
write_options["includeHeaders"] = "headers" in df.columns # magic

# As of Apache Spark version 3.4.1, the mode 'error' is not functioning as expected.
# This issue has been reported and can be tracked at:
@@ -341,7 +337,7 @@ def get_df_schema(
schema = StructType(
[
StructField("key", BinaryType(), nullable=True),
StructField("value", BinaryType(), nullable=False),
StructField("value", BinaryType(), nullable=True),
StructField("topic", StringType(), nullable=True),
StructField("partition", IntegerType(), nullable=True),
StructField("offset", LongType(), nullable=True),
40 changes: 17 additions & 23 deletions onetl/connection/db_connection/kafka/options.py
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from enum import Enum

from pydantic import Field, root_validator
@@ -34,14 +36,6 @@
),
)

KNOWN_READ_WRITE_OPTIONS = frozenset(
(
# not adding this to class itself because headers support was added to Spark only in 3.0
# https://issues.apache.org/jira/browse/SPARK-23539
"includeHeaders",
),
)

KNOWN_READ_OPTIONS = frozenset(
(
"failOnDataLoss",
@@ -53,6 +47,7 @@
"maxTriggerDelay",
"minOffsetsPerTrigger",
"minPartitions",
"includeHeaders",
),
)

@@ -88,9 +83,8 @@ class KafkaReadOptions(GenericOptions):
* ``startingTimestamp``
* ``subscribe``
* ``subscribePattern``
* ``topic``
populated from connection attributes, and cannot be set in ``KafkaReadOptions`` class and be overridden
are populated from connection attributes, and cannot be set in the ``KafkaReadOptions`` class or overridden
by the user to avoid issues.
Examples
@@ -101,14 +95,21 @@ class KafkaReadOptions(GenericOptions):
.. code:: python
options = Kafka.ReadOptions(
include_headers=True,
minPartitions=50,
includeHeaders=True,
)
"""

include_headers: bool = Field(default=False, alias="includeHeaders")
"""
If ``True``, add the ``headers`` column to the output DataFrame.
If ``False``, the column will not be added.
"""

class Config:
prohibited_options = PROHIBITED_OPTIONS
known_options = KNOWN_READ_OPTIONS | KNOWN_READ_WRITE_OPTIONS
known_options = KNOWN_READ_OPTIONS
extra = "allow"


@@ -126,20 +127,14 @@ class KafkaWriteOptions(GenericOptions):
.. warning::
Options:
* ``assign``
* ``endingOffsets``
* ``endingOffsetsByTimestamp``
* ``kafka.*``
* ``startingOffsets``
* ``startingOffsetsByTimestamp``
* ``startingTimestamp``
* ``subscribe``
* ``subscribePattern``
* ``topic``
populated from connection attributes, and cannot be set in ``KafkaWriteOptions`` class and be overridden
are populated from connection attributes, and cannot be set in the ``KafkaWriteOptions`` class or overridden
by the user to avoid issues.
The ``includeHeaders`` option is also prohibited, because its value is determined by the presence of a ``headers`` column in the DataFrame.
Examples
--------
@@ -149,7 +144,6 @@ class KafkaWriteOptions(GenericOptions):
options = Kafka.WriteOptions(
if_exists="append",
includeHeaders=False,
)
"""

@@ -165,7 +159,7 @@ class KafkaWriteOptions(GenericOptions):

class Config:
prohibited_options = PROHIBITED_OPTIONS | KNOWN_READ_OPTIONS
known_options = KNOWN_READ_WRITE_OPTIONS
known_options: frozenset[str] = frozenset()
extra = "allow"

@root_validator(pre=True)
@@ -88,6 +88,7 @@ def kafka_schema_with_headers():
],
),
),
nullable=True,
),
],
)
