Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for the Kafka broker in Python #1363

Merged
merged 4 commits into from
Mar 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions streampipes-client-python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"pydantic>=1.10.2",
"requests>=2.28.1",
"nats-py>=2.2.0",
"confluent-kafka>=2.0.2"
]

dev_packages = base_packages + [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
# limitations under the License.
#
from .broker import Broker
from .kafka_broker import KafkaBroker
from .nats_broker import NatsBroker

from .broker_handler import SupportedBroker, get_broker # isort: skip

__all__ = [
"Broker",
"KafkaBroker",
"NatsBroker",
"SupportedBroker",
"get_broker",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
#
from enum import Enum

from streampipes.functions.broker import Broker, NatsBroker
from streampipes.functions.broker import Broker, KafkaBroker, NatsBroker
from streampipes.model.resource.data_stream import DataStream


class SupportedBroker(Enum):
"""Enum for the supported brokers."""

NATS = "NatsTransportProtocol"
KAFKA = "KafkaTransportProtocol"


# TODO Exception should be removed once all brokers are implemented.
Expand All @@ -49,5 +50,7 @@ def get_broker(data_stream: DataStream) -> Broker: # TODO implementation for mo
broker_name = data_stream.event_grounding.transport_protocols[0].class_name
if SupportedBroker.NATS.value in broker_name:
return NatsBroker()
elif SupportedBroker.KAFKA.value in broker_name:
return KafkaBroker()
else:
raise UnsupportedBroker(f'The python client doesn\'t include the broker: "{broker_name}" yet')
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
from typing import Any, AsyncIterator, Dict

from confluent_kafka import Consumer # type: ignore
from streampipes.functions.broker.broker import Broker
from streampipes.functions.broker.kafka_message_fetcher import KafkaMessageFetcher
from streampipes.model.common import random_letters

logger = logging.getLogger(__name__)


class KafkaBroker(Broker):
"""Implementation of the NatsBroker"""

async def _makeConnection(self, hostname: str, port: int) -> None:
"""Helper function to connect to a server.
Parameters
----------
hostname: str
The hostname of the of the server, which the broker connects to.
port: int
The port number of the connection.
Returns
-------
None
"""
self.kafka_consumer = Consumer(
{"bootstrap.servers": f"{hostname}:{port}", "group.id": random_letters(6), "auto.offset.reset": "latest"}
)

async def createSubscription(self) -> None:
"""Creates a subscription to a data stream.
Returns
-------
None
"""
self.kafka_consumer.subscribe([self.topic_name])

logger.info(f"Subscribed to stream: {self.stream_id}")

async def publish_event(self, event: Dict[str, Any]):
"""Publish an event to a connected data stream.
Parameters
----------
event: Dict[str, Any]
The event to be published.
Returns
-------
None
"""

# await self.publish(subject=self.topic_name, payload=json.dumps(event).encode("utf-8"))

async def disconnect(self) -> None:
"""Closes the connection to the server.
Returns
-------
None
"""
self.kafka_consumer.close()
logger.info(f"Stopped connection to stream: {self.stream_id}")

def get_message(self) -> AsyncIterator:
"""Get the published messages of the subscription.
Returns
-------
An async iterator for the messages.
"""

return KafkaMessageFetcher(self.kafka_consumer)
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from confluent_kafka import Consumer # type: ignore


class KafkaMsg:
"""An internal representation of a Kafka message
Parameters
----------
data: Byte Array
The received Kafka message as byte array
"""

def __init__(self, data):
self.data = data


class KafkaMessageFetcher:
"""Fetches the next message from Kafka
Parameters
----------
consumer: Consumer
The Kafka consumer
"""

def __init__(self, consumer: Consumer):
self.consumer = consumer

def __aiter__(self):
return self

async def __anext__(self):
msg = self.consumer.poll(0.1)
return KafkaMsg(msg.value())
4 changes: 2 additions & 2 deletions streampipes-client-python/streampipes/model/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ class EventProperty(BasicModel):
description: Optional[StrictStr]
runtime_name: StrictStr
required: StrictBool = Field(default=False)
domain_properties: List[StrictStr] = Field(default_factory=list)
property_scope: StrictStr = Field(default="MEASUREMENT_PROPERTY")
domain_properties: Optional[List[StrictStr]] = Field(default_factory=list)
property_scope: Optional[StrictStr] = Field(default="MEASUREMENT_PROPERTY")
index: StrictInt = Field(default=0)
runtime_id: Optional[StrictStr]
runtime_type: StrictStr = Field(default="http://www.w3.org/2001/XMLSchema#string")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@ def convert_to_pandas_representation(self):
pipeline_id: Optional[StrictStr]
pipeline_name: Optional[StrictStr]
pipeline_is_running: StrictBool
schema_version: StrictStr
schema_version: Optional[StrictStr]