Skip to content

Commit

Permalink
Add support for the Kafka broker in Python (#1363)
Browse files Browse the repository at this point in the history
* Update data model

* Support Kafka as messaging protocol in Python client (#1362)

* Reformat code (#1362)

* Update broker __init__

---------

Co-authored-by: Sven Oehler <oehler.sven@web.de>
Co-authored-by: Sven Oehler <43231162+SvenO3@users.noreply.github.com>
  • Loading branch information
3 people committed Mar 11, 2023
1 parent 25bf66f commit 7fe524e
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 4 deletions.
1 change: 1 addition & 0 deletions streampipes-client-python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"pydantic>=1.10.2",
"requests>=2.28.1",
"nats-py>=2.2.0",
"confluent-kafka>=2.0.2"
]

dev_packages = base_packages + [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
# limitations under the License.
#
from .broker import Broker
from .kafka_broker import KafkaBroker
from .nats_broker import NatsBroker

from .broker_handler import SupportedBroker, get_broker # isort: skip

__all__ = [
"Broker",
"KafkaBroker",
"NatsBroker",
"SupportedBroker",
"get_broker",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
#
from enum import Enum

from streampipes.functions.broker import Broker, NatsBroker
from streampipes.functions.broker import Broker, KafkaBroker, NatsBroker
from streampipes.model.resource.data_stream import DataStream


class SupportedBroker(Enum):
    """Transport protocols for which the Python client ships a broker.

    The value of each member is the protocol name that is looked up in the
    class name of a data stream's transport protocol.
    """

    NATS = "NatsTransportProtocol"
    KAFKA = "KafkaTransportProtocol"


# TODO Exception should be removed once all brokers are implemented.
Expand All @@ -49,5 +50,7 @@ def get_broker(data_stream: DataStream) -> Broker: # TODO implementation for mo
broker_name = data_stream.event_grounding.transport_protocols[0].class_name
if SupportedBroker.NATS.value in broker_name:
return NatsBroker()
elif SupportedBroker.KAFKA.value in broker_name:
return KafkaBroker()
else:
raise UnsupportedBroker(f'The python client doesn\'t include the broker: "{broker_name}" yet')
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
from typing import Any, AsyncIterator, Dict

from confluent_kafka import Consumer # type: ignore
from streampipes.functions.broker.broker import Broker
from streampipes.functions.broker.kafka_message_fetcher import KafkaMessageFetcher
from streampipes.model.common import random_letters

logger = logging.getLogger(__name__)


class KafkaBroker(Broker):
    """Implementation of the broker for Kafka.

    Events are consumed via a `confluent_kafka.Consumer`.
    Publishing events is not implemented for this broker yet.
    """

    async def _makeConnection(self, hostname: str, port: int) -> None:
        """Helper function to connect to a server.

        Parameters
        ----------
        hostname: str
            The hostname of the server, which the broker connects to.
        port: int
            The port number of the connection.

        Returns
        -------
        None
        """
        self.kafka_consumer = Consumer(
            {
                "bootstrap.servers": f"{hostname}:{port}",
                # every connection gets a freshly generated consumer group id
                "group.id": random_letters(6),
                # without a committed offset, start reading at the end of the topic
                "auto.offset.reset": "latest",
            }
        )

    async def createSubscription(self) -> None:
        """Creates a subscription to a data stream.

        Subscribes the consumer to `self.topic_name`.
        NOTE(review): `topic_name` and `stream_id` are not set in this class,
        presumably the `Broker` base class provides them - confirm.

        Returns
        -------
        None
        """
        self.kafka_consumer.subscribe([self.topic_name])

        logger.info("Subscribed to stream: %s", self.stream_id)

    async def publish_event(self, event: Dict[str, Any]) -> None:
        """Publish an event to a connected data stream.

        Publishing is not implemented for Kafka yet: the event is dropped
        and a warning is logged so that the loss does not go unnoticed.

        Parameters
        ----------
        event: Dict[str, Any]
            The event to be published.

        Returns
        -------
        None
        """
        logger.warning("Publishing events is not supported by the Kafka broker yet, the event is dropped.")

    async def disconnect(self) -> None:
        """Closes the connection to the server.

        Returns
        -------
        None
        """
        self.kafka_consumer.close()
        logger.info("Stopped connection to stream: %s", self.stream_id)

    def get_message(self) -> AsyncIterator:
        """Get the published messages of the subscription.

        Returns
        -------
        An async iterator for the messages.
        """
        return KafkaMessageFetcher(self.kafka_consumer)
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from confluent_kafka import Consumer # type: ignore


class KafkaMsg:
    """Thin wrapper that holds the payload of a received Kafka message.

    Parameters
    ----------
    data: Byte Array
        The payload of the Kafka message (expected to be a byte array).
        It is stored unchanged in the `data` attribute.
    """

    def __init__(self, data):
        # keep the payload as-is, it is exposed via the `data` attribute
        self.data = data


class KafkaMessageFetcher:
    """Async iterator that fetches the next message from Kafka.

    Parameters
    ----------
    consumer: Consumer
        The Kafka consumer the messages are polled from.
    """

    def __init__(self, consumer: Consumer):
        self.consumer = consumer

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Wait for the next message of the consumer.

        Returns
        -------
        KafkaMsg
            The value of the next polled message wrapped in a `KafkaMsg`.
        """
        # imported locally because `asyncio` is only needed by this method
        import asyncio

        while True:
            # NOTE: `poll` itself blocks for up to 0.1 seconds
            msg = self.consumer.poll(0.1)
            if msg is not None:
                # NOTE(review): messages that carry an error (`msg.error()`)
                # are passed on unchanged - confirm whether they need handling.
                return KafkaMsg(msg.value())
            # `poll` returns None if no message arrived within the timeout;
            # hand control back to the event loop before polling again
            await asyncio.sleep(0)
4 changes: 2 additions & 2 deletions streampipes-client-python/streampipes/model/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ class EventProperty(BasicModel):
description: Optional[StrictStr]
runtime_name: StrictStr
required: StrictBool = Field(default=False)
domain_properties: List[StrictStr] = Field(default_factory=list)
property_scope: StrictStr = Field(default="MEASUREMENT_PROPERTY")
domain_properties: Optional[List[StrictStr]] = Field(default_factory=list)
property_scope: Optional[StrictStr] = Field(default="MEASUREMENT_PROPERTY")
index: StrictInt = Field(default=0)
runtime_id: Optional[StrictStr]
runtime_type: StrictStr = Field(default="http://www.w3.org/2001/XMLSchema#string")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@ def convert_to_pandas_representation(self):
pipeline_id: Optional[StrictStr]
pipeline_name: Optional[StrictStr]
pipeline_is_running: StrictBool
schema_version: StrictStr
schema_version: Optional[StrictStr]

0 comments on commit 7fe524e

Please sign in to comment.