#353 - kafka.auth - elaborations
wheelly committed Feb 14, 2024
1 parent 0e2ee03 commit c62dbf6
Showing 10 changed files with 163 additions and 10 deletions.
31 changes: 22 additions & 9 deletions core/src/datayoga_core/blocks/kafka/read/block.py
@@ -1,6 +1,6 @@
import logging
import json
from abc import ABCMeta
from typing import Dict

from datayoga_core.context import Context
from typing import AsyncGenerator, List, Optional
@@ -23,24 +23,21 @@ class Block(DyProducer, metaclass=ABCMeta):
    INTERNAL_FIELD_PREFIX = "__$$"
    MSG_ID_FIELD = f"{INTERNAL_FIELD_PREFIX}msg_id"
    MIN_COMMIT_COUNT = 10
    connection_details: Dict

    def init(self, context: Optional[Context] = None):
        logger.debug(f"Initializing {self.get_block_name()}")
        connection_details = utils.get_connection_details(self.properties["bootstrap_servers"], context)
        logger.debug(f"Connection details: {json.dumps(connection_details)}")
        self.bootstrap_servers = connection_details.get("bootstrap_servers")
        self.connection_details = utils.get_connection_details(self.properties["bootstrap_servers"], context)

        self.bootstrap_servers = self.connection_details.get("bootstrap_servers")
        self.group = self.properties.get("group")
        self.topic = self.properties["topic"]
        self.seek_to_beginning = self.properties.get("seek_to_beginning", False)
        self.snapshot = self.properties.get("snapshot", False)


    async def produce(self) -> AsyncGenerator[List[Message], None]:
        consumer = Consumer({
            'bootstrap.servers': self.bootstrap_servers,
            'group.id': self.group,
            'enable.auto.commit': 'false'
        })
        consumer = Consumer(self._get_config())
        logger.debug(f"Producing {self.get_block_name()}")

        if self.seek_to_beginning:
@@ -79,6 +76,22 @@ def on_assign(c, ps):
        finally:
            consumer.close()

    def _get_config(self):
        conf = {
            'bootstrap.servers': self.bootstrap_servers,
            'group.id': self.group,
            'enable.auto.commit': 'false'
        }
        proto = self.connection_details.get("security.protocol")
        if proto:
            conf['security.protocol'] = proto
            # TODO: how to distinguish here different auth protocols in kafka?
            if proto.startswith("SASL_"):
                conf['sasl.mechanisms'] = self.connection_details["sasl.mechanisms"]
                conf['sasl.username'] = self.connection_details["sasl.username"]
                conf['sasl.password'] = self.connection_details["sasl.password"]
        return conf
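For illustration only (not part of this commit), the mapping that _get_config performs can be restated as a standalone function. The name build_consumer_config and the sample values below are hypothetical; the point is that a connection without security.protocol still yields the same three-key config as before this change.

# Illustrative, standalone restatement of the _get_config mapping above;
# build_consumer_config is a hypothetical name, not project code.
from typing import Dict


def build_consumer_config(connection: Dict, group: str) -> Dict:
    conf = {
        'bootstrap.servers': connection.get('bootstrap_servers'),
        'group.id': group,
        'enable.auto.commit': 'false'
    }
    proto = connection.get('security.protocol')
    if proto:
        conf['security.protocol'] = proto
        if proto.startswith('SASL_'):
            conf['sasl.mechanisms'] = connection['sasl.mechanisms']
            conf['sasl.username'] = connection['sasl.username']
            conf['sasl.password'] = connection['sasl.password']
    return conf


# A plain connection (no security.protocol) produces the pre-change config unchanged:
assert build_consumer_config({'bootstrap_servers': 'localhost:9092'}, 'g1') == {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'g1',
    'enable.auto.commit': 'false'
}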




@@ -11,6 +11,19 @@
    "bootstrap_servers": {
      "description": "Kafka Hosts",
      "type": "string"
    },
    "security.protocol": {
      "description": "Security protocol used to authenticate with the brokers",
      "type": "string"
    },
    "sasl.mechanisms": {
      "type": "string"
    },
    "sasl.username": {
      "type": "string"
    },
    "sasl.password": {
      "type": "string"
    }
  },
  "additionalProperties": false,
7 changes: 6 additions & 1 deletion integration-tests/resources/connections.yaml
@@ -46,4 +46,9 @@ cassandra:
  port: 9042
kafka:
  type: kafka
  bootstrap_servers: "localhost:9093"
  bootstrap_servers: "localhost:9092"
  security.protocol: SASL_PLAINTEXT
  sasl.mechanisms: PLAIN
  sasl.username: admin
  sasl.password: admin-secret
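As a hedged usage sketch (not part of the commit), the connection entry above maps onto a confluent_kafka Consumer roughly as follows; the topic name and group id are placeholders, and the remaining values simply mirror the SASL_PLAINTEXT entry in connections.yaml.

# Hypothetical standalone consumer using the SASL_PLAINTEXT values from connections.yaml;
# the topic and group id are illustrative placeholders.
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'integration-tests',          # placeholder group id
    'enable.auto.commit': 'false',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'admin',
    'sasl.password': 'admin-secret'
})
consumer.subscribe(['integration-tests'])      # placeholder topic
try:
    msg = consumer.poll(5.0)
    if msg is not None and msg.error() is None:
        print(msg.value())
finally:
    consumer.close()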

Empty file.
6 changes: 6 additions & 0 deletions integration-tests/resources/kafka.docker/basic/dockerfile.bak
@@ -0,0 +1,6 @@
FROM confluentinc/cp-kafka-rest:latest

COPY password.properties /etc/kafka-rest/
COPY rest-jaas.properties /etc/kafka-rest/

CMD ["/etc/confluent/docker/run"]
23 changes: 23 additions & 0 deletions integration-tests/resources/kafka.docker/basic/rest.env
@@ -0,0 +1,23 @@
KAFKA_REST_HOST_NAME=rest-proxy
KAFKA_REST_LISTENERS=http://0.0.0.0:8082
KAFKA_REST_SCHEMA_REGISTRY_URL=<HIDDEN>
KAFKA_REST_CLIENT_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=<HIDDEN>
KAFKA_REST_BOOTSTRAP_SERVERS=<HIDDEN>
KAFKA_REST_SECURITY_PROTOCOL=SASL_SSL
KAFKA_REST_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';
KAFKA_REST_SASL_MECHANISM=PLAIN
KAFKA_REST_CLIENT_BOOTSTRAP_SERVERS=<HIDDEN>
KAFKA_REST_CLIENT_SECURITY_PROTOCOL=SASL_SSL
KAFKA_REST_CLIENT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';

KAFKA_REST_CLIENT_SASL_MECHANISM=PLAIN
KAFKA_REST_AUTHENTICATION_METHOD=BASIC
KAFKA_REST_AUTHENTICATION_REALM=KafkaRest
KAFKA_REST_AUTHENTICATION_ROLES=thisismyrole
KAFKAREST_OPTS=-Djava.security.auth.login.config=/etc/kafka-rest/rest-jaas.properties

docker run -p 9093:8081 -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
-e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS="1.kafka.broker:9092,2.kafka.broker:9092,3.kafka.broker:9092" \
-e SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL=SASL_SSL \
-e SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM=PLAIN \
-e SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG='org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass";' confluentinc/cp-schema-registry:5.5.3
33 changes: 33 additions & 0 deletions integration-tests/resources/kafka.docker/sasl/docker-compose.yml
@@ -0,0 +1,33 @@
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"

      # KAFKA_SASL_JAAS_CONFIG: |
      #   org.apache.kafka.common.security.plain.PlainLoginModule required \
      #   username="admin" \
      #   password="admin-secret" \
      #   user_admin="admin-secret" \
      #   user_alice="alice-secret";

    volumes:
      - ./kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
    ports:
      - 9093:9092
    #command: ["sleep","3600"]
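A possible smoke test for the compose setup above (not included in this commit): it assumes the broker is reachable from the host at localhost:9093 through the 9093:9092 port mapping, and that the username/password pair matches a user_* entry in kafka_server_jaas.conf.

# Hypothetical connectivity check against the SASL_PLAINTEXT listener started by the
# docker-compose file above; host, port, and credentials are assumptions, not project code.
from confluent_kafka.admin import AdminClient

admin = AdminClient({
    'bootstrap.servers': 'localhost:9093',     # host port mapped to the broker's 9092
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'admin',
    'sasl.password': 'admin-secret'            # must match the user_admin entry in kafka_server_jaas.conf
})

# list_topics forces a metadata round trip, so it fails fast on bad credentials
metadata = admin.list_topics(timeout=10)
print(sorted(metadata.topics))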
42 changes: 42 additions & 0 deletions integration-tests/resources/kafka.docker/sasl/dockerfile
@@ -0,0 +1,42 @@
# FROM confluentinc/cp-kafka:5.4.3
# ENV KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
# ENV KAFKA_INTER_BROKER_LISTENER_NAME=SASL_PLAINTEXT
# ENV KAFKA_SASL_ENABLED_MECHANISMS=PLAIN
# ENV KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
# ENV KAFKA_LISTENERS="SASL_PLAINTEXT://:9093,BROKER://:9092"
# ENV KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:SASL_PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
# ENV KAFKA_INTER_BROKER_LISTENER_NAME=BROKER
# ENV KAFKA_BROKER_ID=1
# ENV KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
# ENV KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
# ENV KAFKA_LOG_FLUSH_INTERVAL_MESSAGES=10000000
# ENV KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
# COPY kafka_server_jaas.conf /etc/kafka
# COPY entry.sh /
# ENTRYPOINT ["/entry.sh", "SASL_PLAINTEXT://:9093,BROKER://:9092"]


FROM confluentinc/cp-kafka:latest
ENV KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
#ENV KAFKA_ADVERTISED_LISTENERS=SASL_PLAINTEXT://:9093,BROKER://:9092
ENV KAFKA_LISTENERS=SASL_PLAINTEXT://:9093,BROKER://:9092
ENV KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=SASL_PLAINTEXT:SASL_PLAINTEXT,BROKER:PLAINTEXT
ENV KAFKA_INTER_BROKER_LISTENER_NAME=BROKER
ENV KAFKA_SECURITY_INTER_BROKER_PROTOCOL=PLAINTEXT
ENV KAFKA_SASL_ENABLED_MECHANISMS=PLAIN
ENV KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN

ENV KAFKA_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';"
ENV KAFKA_NUM_PARTITIONS=1
ENV KAFKA_DEFAULT_REPLICATION_FACTOR=1
ENV KAFKA_BROKER_ID=1
COPY kafka_server_jaas.conf /etc/kafka
COPY entry.sh /
ENTRYPOINT ["/entry.sh", "SASL_PLAINTEXT://:9093,BROKER://:9092"]

#FROM confluentinc/cp-schema-registry:5.5.3
#ENV SCHEMA_REGISTRY_HOST_NAME=schema-registry
#ENV SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS="localhost:9092;:9091"
#ENV SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL=SASL_SSL
#ENV SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM=PLAIN
#ENV SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG='org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";'
10 changes: 10 additions & 0 deletions integration-tests/resources/kafka.docker/sasl/entry.sh
@@ -0,0 +1,10 @@
#!/bin/bash
echo 'clientPort=2181' > zookeeper.properties
echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties
echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties
zookeeper-server-start zookeeper.properties &
export KAFKA_ZOOKEEPER_CONNECT="localhost:2181"
export KAFKA_ADVERTISED_LISTENERS="$1"
. /etc/confluent/docker/bash-config
/etc/confluent/docker/configure
/etc/confluent/docker/launch
@@ -0,0 +1,8 @@
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="test"
    user_alice="test";
};
Client {};
