Merge pull request #88 from tubular/fix/use-official-kafka-io
IMP use official kafka io
srstrickland committed Dec 3, 2021
2 parents cb30255 + 6cb586e commit 89d2f1f
Showing 11 changed files with 147 additions and 129 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -89,6 +89,7 @@ services:
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_ZOOKEEPER_CONNECT: zookeeper.docker:2181
KAFKA_NUM_PARTITIONS: 3
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
healthcheck:
test: ps ax | grep kafka

23 changes: 9 additions & 14 deletions docs/source/reader_and_writer.rst
@@ -74,24 +74,14 @@
Kafka
-----

Sparkly's reader and writer for Kafka are built on top of the official spark package
for Kafka and python library `kafka-python <https://github.com/dpkp/kafka-python>`_ .
The first one allows us to read data efficiently,
the second covers a lack of writing functionality in the official distribution.
Sparkly's reader and writer for Kafka are built on top of the official Spark SQL Kafka package.

+---------------+------------------------------------------------------------------------------------------+
| Package | https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.4.0 |
| Package | https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.11/2.4.0 |
+---------------+------------------------------------------------------------------------------------------+
| Configuration | http://spark.apache.org/docs/2.4.0/streaming-kafka-0-8-integration.html |
| Configuration | https://spark.apache.org/docs/2.4.0/structured-streaming-kafka-integration.html |
+---------------+------------------------------------------------------------------------------------------+

.. note::
- To interact with Kafka, ``sparkly`` needs the ``kafka-python`` library. You can get it via:
```
pip install sparkly[kafka]
```
- Sparkly was tested in production using Apache Kafka **0.10.x**.

.. code-block:: python
import json
@@ -101,7 +91,7 @@
class MySession(SparklySession):
packages = [
'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0',
'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0',
]
spark = MySession()
@@ -123,15 +113,20 @@
df = hc.read_ext.kafka(
'kafka.host',
topic='my.topic',
# key & value deserialization is optional; if not provided,
# then the user will have to deal with decoding the binary directly.
key_deserializer=lambda item: json.loads(item.decode('utf-8')),
value_deserializer=lambda item: json.loads(item.decode('utf-8')),
# if deserializers are used, the schema must be provided:
schema=df_schema,
)
# To write a dataframe to Kafka in JSON format:
df.write_ext.kafka(
'kafka.host',
topic='my.topic',
# key & value serialization is optional; if not provided,
# the `key` and `value` columns MUST already be StringType or BinaryType
key_serializer=lambda item: json.dumps(item).encode('utf-8'),
value_serializer=lambda item: json.dumps(item).encode('utf-8'),
)
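A minimal illustrative sketch of reading back a fixed range of offsets with the new reader; ``kafka_get_topics_offsets`` comes from ``sparkly.utils`` (the same helper the old reader used internally), while the host, topic, and ``df_schema`` are the placeholders from the example above.

.. code-block:: python

    import json

    from sparkly.utils import kafka_get_topics_offsets

    # Snapshot the current (partition, start_offset, end_offset) triples
    # for the topic, then read exactly that range.
    offsets = kafka_get_topics_offsets('kafka.host', 'my.topic', 9092)

    df = hc.read_ext.kafka(
        'kafka.host',
        topic='my.topic',
        offset_ranges=offsets,
        key_deserializer=lambda item: json.loads(item.decode('utf-8')),
        value_deserializer=lambda item: json.loads(item.decode('utf-8')),
        schema=df_schema,
        # keep the Kafka metadata columns (topic, partition, offset, ...) too
        include_meta_cols=True,
    )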
1 change: 0 additions & 1 deletion requirements_dev.txt
@@ -15,7 +15,6 @@
#

mock==1.3.0
pyspark==2.4.0
pytest==6.2.5
pytest-cov==3.0.0
Sphinx==4.2.0
1 change: 0 additions & 1 deletion setup.py
@@ -89,7 +89,6 @@
# https://packaging.python.org/en/latest/requirements.html
install_requires=requirements,
extras_require={
'kafka': ['kafka-python>=2.0.2,<2.1'],
'redis': ['redis>=2.10,<3', 'ujson>=1.33,<2'],
'test': ['cassandra-driver>=3.7,<3.8', 'PyMySQL>=0.7,<0.10', 'kafka-python>=2.0.2,<2.1', 'redis>=2.10,<3', 'ujson>=1.33,<2'],
},
105 changes: 74 additions & 31 deletions sparkly/reader.py
@@ -14,6 +14,10 @@
# limitations under the License.
#

import json

from pyspark.sql import functions as F

from sparkly.exceptions import InvalidArgumentError
from sparkly.utils import kafka_get_topics_offsets

@@ -213,71 +217,110 @@ def kafka(self,
schema=None,
port=9092,
parallelism=None,
options=None):
options=None,
include_meta_cols=None):
"""Creates dataframe from specified set of messages from Kafka topic.
Defining ranges:
- If `offset_ranges` is specified it defines which specific range to read.
- If `offset_ranges` is omitted it will auto-discover its partitions.
The `schema` parameter, if specified, should contain two top level fields:
`key` and `value`.
`key` and `value`. It is only required if deserializers are used.
Parameters `key_deserializer` and `value_deserializer` are callables
which get bytes as input and should return python structures as output.
Args:
host (str): Kafka host.
topic (str|None): Kafka topic to read from.
topic (str|List[str]|None): Kafka topic(s) to read from.
offset_ranges (list[(int, int, int)]|None): List of partition ranges
[(partition, start_offset, end_offset)].
key_deserializer (function): Function used to deserialize the key.
value_deserializer (function): Function used to deserialize the value.
schema (pyspark.sql.types.StructType): Schema to apply to create a Dataframe.
port (int): Kafka port.
parallelism (int|None): The max number of parallel tasks that could be executed
during the read stage (see :ref:`controlling-the-load`).
options (dict|None): Additional options passed to the underlying Kafka DataFrame reader.
include_meta_cols (bool|None): If true, also return "metadata" columns
like offset, topic, etc.
Returns:
pyspark.sql.DataFrame
Raises:
InvalidArgumentError
"""
try:
from pyspark.streaming.kafka import KafkaUtils, OffsetRange
except ImportError:
raise NotImplementedError('Reading from kafka not supported')

if not key_deserializer or not value_deserializer or not schema:
raise InvalidArgumentError('You should specify all of parameters:'
'`key_deserializer`, `value_deserializer` and `schema`')

kafka_params = {
'metadata.broker.list': '{}:{}'.format(host, port),
}
if isinstance(topic, str):
topic = [topic]

if options:
kafka_params.update(options)
reader = (
self._spark.read.format('kafka')
.option('kafka.bootstrap.servers', f'{host}:{port}')
.option('subscribe', ','.join(topic))
)

if not offset_ranges:
offset_ranges = kafka_get_topics_offsets(host, topic, port)
def get_offsets(offsets, which):
return {
offset[0]: offset[which]
for offset in offsets
}

if offset_ranges:
if len(topic) > 1:
raise InvalidArgumentError(
'Specifying offset_ranges for multiple topics is not currently supported; '
'please specify options "startingOffsets" and "endingOffsets" manually'
)
starting_offsets = json.dumps({t: get_offsets(offset_ranges, 1) for t in topic})
ending_offsets = json.dumps({t: get_offsets(offset_ranges, 2) for t in topic})
reader = (
reader
.option('startingOffsets', starting_offsets)
.option('endingOffsets', ending_offsets)
)

offset_ranges = [OffsetRange(topic, partition, start_offset, end_offset)
for partition, start_offset, end_offset in offset_ranges]
for key, value in (options or {}).items():
reader = reader.option(key, value)

df = reader.load()

def get_schema(field):
if schema is None:
raise InvalidArgumentError(
'Cannot use a deserializer without specifying schema'
)
candidates = [x for x in schema.fields if x.name == field]
if not candidates:
raise InvalidArgumentError(
f'Cannot find field: {field} in schema: {schema.simpleString()}'
)
result = candidates[0].dataType
return result

if key_deserializer is not None:
df = df.withColumn(
'key',
F.udf(
key_deserializer,
returnType=get_schema('key'),
)(F.col('key')),
)
if value_deserializer is not None:
df = df.withColumn(
'value',
F.udf(
value_deserializer,
returnType=get_schema('value'),
)(F.col('value')),
)

rdd = KafkaUtils.createRDD(self._spark.sparkContext,
kafkaParams=kafka_params,
offsetRanges=offset_ranges or [],
keyDecoder=key_deserializer,
valueDecoder=value_deserializer,
)
if not include_meta_cols:
df = df.select('key', 'value')

if parallelism:
rdd = rdd.coalesce(parallelism)
df = df.coalesce(parallelism)

return self._spark.createDataFrame(rdd, schema=schema)
return df

def _basic_read(self, reader_options, additional_options, parallelism):
reader_options.update(additional_options or {})
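For reference, a small self-contained sketch (illustrative partition and offset values, placeholder topic name) of how the documented [(partition, start_offset, end_offset)] ranges map onto the startingOffsets / endingOffsets JSON that the Spark Kafka source expects, mirroring the get_offsets helper in the new reader:

import json

# offset_ranges in the documented form: [(partition, start_offset, end_offset)]
offset_ranges = [(0, 10, 20), (1, 15, 25), (2, 0, 5)]
topics = ['my.topic']  # placeholder; the reader normalises a single topic string to a list

def get_offsets(offsets, which):
    # which=1 selects the start offsets, which=2 the end offsets
    return {offset[0]: offset[which] for offset in offsets}

starting_offsets = json.dumps({t: get_offsets(offset_ranges, 1) for t in topics})
ending_offsets = json.dumps({t: get_offsets(offset_ranges, 2) for t in topics})

# starting_offsets == '{"my.topic": {"0": 10, "1": 15, "2": 0}}'
# ending_offsets   == '{"my.topic": {"0": 20, "1": 25, "2": 5}}'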
57 changes: 27 additions & 30 deletions sparkly/writer.py
@@ -19,13 +19,6 @@
except ImportError:
from urlparse import urlparse, parse_qsl

try:
from kafka import KafkaProducer
except ImportError:
KAFKA_WRITER_SUPPORT = False
else:
KAFKA_WRITER_SUPPORT = True

try:
import redis
import ujson
@@ -239,35 +232,39 @@ def kafka(self,
during the write stage (see :ref:`controlling-the-load`).
options (dict|None): Additional options passed to the underlying Kafka DataFrame writer.
"""
if not KAFKA_WRITER_SUPPORT:
raise NotImplementedError('kafka-python package isn\'t available. '
'Use pip install sparkly[kafka] to fix it.')

def write_partition_to_kafka(messages):
producer = KafkaProducer(
bootstrap_servers=['{}:{}'.format(host, port)],
key_serializer=key_serializer,
value_serializer=value_serializer,
)
for message in messages:
as_dict = message.asDict(recursive=True)
try:
result = producer.send(topic, key=as_dict['key'], value=as_dict['value'])
except Exception as exc:
raise WriteError('Error publishing to kafka: {}'.format(exc))

if result.failed():
raise WriteError('Error publishing to kafka: {}'.format(result.exception))

producer.flush()
producer.close()
def get_serializer_udf(serializer_func):

rdd = self._df.rdd
@F.udf(returnType=T.BinaryType())
def serializer(value):
if isinstance(value, T.Row):
value = value.asDict(recursive=True)
return serializer_func(value)

return serializer

df = self._df
if key_serializer:
key_serializer = get_serializer_udf(key_serializer)
df = df.withColumn('key', key_serializer(F.col('key')))
if value_serializer:
value_serializer = get_serializer_udf(value_serializer)
df = df.withColumn('value', value_serializer(F.col('value')))

if parallelism:
rdd = rdd.coalesce(parallelism)
df = df.coalesce(parallelism)

writer = (
df.write.format('kafka')
.option('kafka.bootstrap.servers', f'{host}:{port}')
.option('topic', topic)
)
options = options or {}
for key, value in options.items():
writer = writer.option(key, value)

rdd.foreachPartition(write_partition_to_kafka)
return writer.save()

def redis(self,
key_by,
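As an aside on the path the docs mention where no serializers are given (the key and value columns must already be StringType or BinaryType), here is one possible way, not taken from the repository, to prepare such columns with plain PySpark functions before calling write_ext.kafka; the column names, broker host, and topic are placeholders:

from pyspark.sql import functions as F

# Assume `df` has an identifier column plus arbitrary payload columns.
prepared = df.select(
    F.col('user_id').cast('string').alias('key'),
    # Pack the remaining columns into a JSON string for the value.
    F.to_json(F.struct(*[c for c in df.columns if c != 'user_id'])).alias('value'),
)

prepared.write_ext.kafka(
    'kafka.host',      # placeholder broker host
    topic='my.topic',  # placeholder topic
    # no key_serializer / value_serializer: both columns are already StringType
)

Since the new writer simply delegates to df.write.format('kafka'), this feeds the Kafka sink the same key and value columns that the serializer UDFs would otherwise produce.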
2 changes: 1 addition & 1 deletion tests/integration/base.py
@@ -26,7 +26,7 @@ class SparklyTestSession(SparklySession):
packages = [
'com.datastax.spark:spark-cassandra-connector_2.11:2.4.0',
'org.elasticsearch:elasticsearch-spark-20_2.11:7.3.0',
'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0',
'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0',
'mysql:mysql-connector-java:6.0.6',
'io.confluent:kafka-avro-serializer:3.0.1',
]
34 changes: 31 additions & 3 deletions tests/integration/test_reader.py
@@ -212,18 +212,46 @@ def test_read_by_offsets(self):

self.assertDataFrameEqual(df, self.expected_data * 2)

df = self.spark.read_ext.kafka(
'kafka.docker',
topic=self.topic,
offset_ranges=offsets,
key_deserializer=self.json_decoder,
value_deserializer=self.json_decoder,
schema=self.expected_data_df.schema,
include_meta_cols=True,
)
expected = [
# normal fields:
'key',
'value',
# meta fields:
'topic',
'partition',
'offset',
'timestamp',
'timestampType',
]
self.assertListEqual(sorted(expected), sorted(df.schema.fieldNames()))

def test_argument_errors(self):
with self.assertRaises(InvalidArgumentError):
self.spark.read_ext.kafka(
'kafka.docker',
topic=self.topic,
key_deserializer=self.json_decoder,
value_deserializer=self.json_decoder,
# no schema!
)
self.spark.read_ext.kafka(
'kafka.docker',
topic=self.topic,
key_deserializer=self.json_decoder,
# no schema!
)

with self.assertRaises(InvalidArgumentError):
self.spark.read_ext.kafka(
'kafka.docker',
topic=self.topic,
schema=self.expected_data_df.schema,
value_deserializer=self.json_decoder,
# no schema!
)
4 changes: 2 additions & 2 deletions tests/integration/test_writer.py
@@ -220,7 +220,7 @@ def test_write_kafka_dataframe(self):

def test_write_kafka_dataframe_error(self):
def _errored_serializer(data):
raise ValueError
raise ValueError('this is a testable error')

try:
self.expected_data.write_ext.kafka(
@@ -230,7 +230,7 @@ def _errored_serializer(data):
value_serializer=_errored_serializer,
)
except Py4JJavaError as ex:
self.assertIn('WriteError(\'Error publishing to kafka', str(ex))
self.assertIn('this is a testable error', str(ex))
else:
raise AssertionError('WriteError exception not raised')
