-
Notifications
You must be signed in to change notification settings - Fork 6
/
options.py
162 lines (120 loc) · 4.24 KB
/
options.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
from enum import Enum
from pydantic import Field, root_validator
from onetl.impl import GenericOptions
# Options that onetl always derives from the Kafka connection itself
# (topic subscription, offsets, ``kafka.*`` connection properties).
# Users must not pass them via Read/Write options, or they would
# silently override connection-managed values.
PROHIBITED_OPTIONS = frozenset(
    {
        "assign",
        "endingOffsets",
        "endingOffsetsByTimestamp",
        "endingTimestamp",
        "kafka.*",
        "startingOffsets",
        "startingOffsetsByTimestamp",
        "startingTimestamp",
        "subscribe",
        "subscribePattern",
        "topic",
    },
)

# Connector options documented for reading; anything outside this set is
# still passed through (``extra = "allow"``), but may trigger a warning.
KNOWN_READ_OPTIONS = frozenset(
    {
        "failOnDataLoss",
        "fetchOffset.numRetries",
        "fetchOffset.retryIntervalMs",
        "groupIdPrefix",
        "kafkaConsumer.pollTimeoutMs",
        "maxOffsetsPerTrigger",
        "maxTriggerDelay",
        "minOffsetsPerTrigger",
        "minPartitions",
    },
)
class KafkaTopicExistBehaviorKafka(str, Enum):
    """Behavior of writing data into an existing Kafka topic.

    Inherits from :class:`str` so values compare equal to their
    plain-string counterparts (``"error"``, ``"append"``).
    """

    ERROR = "error"
    APPEND = "append"

    def __str__(self) -> str:
        # Render as the bare option value (e.g. "append"), not the
        # default enum repr-style "KafkaTopicExistBehaviorKafka.APPEND".
        return self.value
class KafkaReadOptions(GenericOptions):
    """Reading options for Kafka connector.

    .. note ::

        You can pass any value
        `supported by connector <https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html>`_,
        even if it is not mentioned in this documentation.

        The set of supported options depends on connector version. See link above.

    .. warning::

        Options:
            * ``assign``
            * ``endingOffsets``
            * ``endingOffsetsByTimestamp``
            * ``endingTimestamp``
            * ``kafka.*``
            * ``startingOffsets``
            * ``startingOffsetsByTimestamp``
            * ``startingTimestamp``
            * ``subscribe``
            * ``subscribePattern``
            * ``topic``

        are populated from connection attributes, and cannot be set in ``KafkaReadOptions`` class and be overridden
        by the user to avoid issues.

    Examples
    --------

    Read options initialization

    .. code:: python

        options = Kafka.ReadOptions(
            include_headers=False,
            minPartitions=50,
        )
    """

    include_headers: bool = Field(default=False, alias="includeHeaders")
    """
    If ``True``, add ``headers`` column to output DataFrame.

    If ``False``, column will not be added.
    """

    class Config:
        # Reject options that are managed by the Kafka connection itself.
        prohibited_options = PROHIBITED_OPTIONS
        known_options = KNOWN_READ_OPTIONS
        # Unknown options are passed through to the connector untouched.
        extra = "allow"
class KafkaWriteOptions(GenericOptions):
    """Writing options for Kafka connector.

    .. note ::

        You can pass any value
        `supported by connector <https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html>`_,
        even if it is not mentioned in this documentation.

        The set of supported options depends on connector version. See link above.

    .. warning::

        Options:
            * ``kafka.*``
            * ``topic``

        are populated from connection attributes, and cannot be set in ``KafkaWriteOptions`` class and be overridden
        by the user to avoid issues.

    Examples
    --------

    Write options initialization

    .. code:: python

        options = Kafka.WriteOptions(
            if_exists="append",
            include_headers=True,
        )
    """

    if_exists: KafkaTopicExistBehaviorKafka = Field(default=KafkaTopicExistBehaviorKafka.APPEND)
    """Behavior of writing data into existing topic.

    Same as ``df.write.mode(...)``.

    Possible values:
        * ``append`` (default) - Adds new objects into existing topic.
        * ``error`` - Raises an error if topic already exists.
    """

    include_headers: bool = Field(default=False, alias="includeHeaders")
    """
    If ``True``, ``headers`` column from dataframe can be written to Kafka (requires Kafka 2.0+).

    If ``False`` and dataframe contains ``headers`` column, an exception will be raised.
    """

    class Config:
        # Writing must not accept read-only options either, so both sets
        # are prohibited here.
        prohibited_options = PROHIBITED_OPTIONS | KNOWN_READ_OPTIONS
        known_options: frozenset[str] = frozenset()
        extra = "allow"

    @root_validator(pre=True)
    def _mode_is_restricted(cls, values):
        # Legacy Spark-style ``mode`` is rejected early so users migrate
        # to the explicit ``if_exists`` parameter.
        if "mode" not in values:
            return values
        raise ValueError("Parameter `mode` is not allowed. Please use `if_exists` parameter instead.")