Merge pull request #49664 from ilejn/test_for_basic_auth_registry

Basic auth to fetch Avro schema in Kafka

evillique committed Jul 20, 2023
2 parents cf54866 + c8347bd commit 209429d
Showing 8 changed files with 297 additions and 40 deletions.
36 changes: 31 additions & 5 deletions docker/test/integration/runner/compose/docker_compose_kafka.yml
@@ -4,6 +4,8 @@ services:
kafka_zookeeper:
image: zookeeper:3.4.9
hostname: kafka_zookeeper
ports:
- 2181:2181
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
@@ -15,15 +17,14 @@ services:
image: confluentinc/cp-kafka:5.2.0
hostname: kafka1
ports:
- ${KAFKA_EXTERNAL_PORT:-8081}:${KAFKA_EXTERNAL_PORT:-8081}
- ${KAFKA_EXTERNAL_PORT}:${KAFKA_EXTERNAL_PORT}
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://localhost:${KAFKA_EXTERNAL_PORT},OUTSIDE://kafka1:19092
KAFKA_ADVERTISED_HOST_NAME: kafka1
KAFKA_LISTENERS: INSIDE://0.0.0.0:${KAFKA_EXTERNAL_PORT},OUTSIDE://0.0.0.0:19092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "kafka_zookeeper:2181"
KAFKA_ZOOKEEPER_CONNECT: kafka_zookeeper:2181
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
@@ -35,13 +36,38 @@ services:
image: confluentinc/cp-schema-registry:5.2.0
hostname: schema-registry
ports:
- ${SCHEMA_REGISTRY_EXTERNAL_PORT:-12313}:${SCHEMA_REGISTRY_INTERNAL_PORT:-12313}
- ${SCHEMA_REGISTRY_EXTERNAL_PORT}:${SCHEMA_REGISTRY_EXTERNAL_PORT}
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:${SCHEMA_REGISTRY_EXTERNAL_PORT}
SCHEMA_REGISTRY_SCHEMA_REGISTRY_GROUP_ID: noauth
depends_on:
- kafka_zookeeper
- kafka1
restart: always
security_opt:
- label:disable

schema-registry-auth:
image: confluentinc/cp-schema-registry:5.2.0
hostname: schema-registry-auth
ports:
- ${SCHEMA_REGISTRY_AUTH_EXTERNAL_PORT}:${SCHEMA_REGISTRY_AUTH_EXTERNAL_PORT}
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry-auth
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:${SCHEMA_REGISTRY_AUTH_EXTERNAL_PORT}
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
SCHEMA_REGISTRY_AUTHENTICATION_ROLES: user
SCHEMA_REGISTRY_AUTHENTICATION_REALM: RealmFooBar
SCHEMA_REGISTRY_OPTS: "-Djava.security.auth.login.config=/etc/schema-registry/secrets/schema_registry_jaas.conf"
SCHEMA_REGISTRY_SCHEMA_REGISTRY_GROUP_ID: auth
volumes:
- ${SCHEMA_REGISTRY_DIR:-}/secrets:/etc/schema-registry/secrets
depends_on:
- kafka_zookeeper
- kafka1
restart: always
security_opt:
- label:disable
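
The `schema-registry-auth` service above only accepts requests that carry credentials from the mounted `secrets` directory. A minimal connectivity check, sketched in Python; the `requests` usage, the localhost host name, and the plaintext password are assumptions for illustration and are not part of this commit:

```python
# Hypothetical sketch: probe the basic-auth schema registry defined above.
# Host and credentials are placeholders; the port is whatever
# SCHEMA_REGISTRY_AUTH_EXTERNAL_PORT was resolved to.
import requests

def check_auth_registry(port, user="schemauser", password="letmein"):
    base = f"http://localhost:{port}"
    # Anonymous requests to a BASIC-protected registry should be rejected.
    assert requests.get(f"{base}/subjects").status_code == 401
    # Credentialed requests should succeed and return the subject list.
    resp = requests.get(f"{base}/subjects", auth=(user, password))
    resp.raise_for_status()
    return resp.json()
```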
11 changes: 11 additions & 0 deletions docs/en/operations/settings/settings-formats.md
@@ -1325,6 +1325,17 @@ Default value: 0.

Sets [Confluent Schema Registry](https://docs.confluent.io/current/schema-registry/index.html) URL to use with [AvroConfluent](../../interfaces/formats.md/#data-format-avro-confluent) format.

Format:
``` text
http://[user:password@]machine[:port]
```

Examples:
``` text
http://registry.example.com:8081
http://admin:secret@registry.example.com:8081
```

Default value: `Empty`.
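
Because ClickHouse URL-decodes the userinfo part before authenticating (see the `AvroRowInputFormat.cpp` change below), credentials containing reserved characters such as `/`, `:` or `@` need to be percent-encoded in this setting. A small Python sketch of building such a URL; the helper is hypothetical and not part of the commit or the docs page:

```python
from urllib.parse import quote

def registry_url(host, port, user=None, password=None):
    """Build a URL in the documented http://[user:password@]machine[:port] form."""
    if user is None:
        return f"http://{host}:{port}"
    creds = quote(user, safe="")
    if password is not None:
        creds += ":" + quote(password, safe="")
    return f"http://{creds}@{host}:{port}"

# Reserved characters in the user name are percent-encoded:
print(registry_url("registry.example.com", 8081, "schemauser/slash", "secret"))
# http://schemauser%2Fslash:secret@registry.example.com:8081
```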

### output_format_avro_codec {#output_format_avro_codec}
23 changes: 23 additions & 0 deletions src/Processors/Formats/Impl/AvroRowInputFormat.cpp
@@ -52,6 +52,8 @@
#include <Poco/Buffer.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPCredentials.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/URI.h>
@@ -934,6 +936,27 @@ class AvroConfluentRowInputFormat::SchemaRegistry
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
request.setHost(url.getHost());

if (!url.getUserInfo().empty())
{
Poco::Net::HTTPCredentials http_credentials;
Poco::Net::HTTPBasicCredentials http_basic_credentials;

http_credentials.fromUserInfo(url.getUserInfo());

std::string decoded_username;
Poco::URI::decode(http_credentials.getUsername(), decoded_username);
http_basic_credentials.setUsername(decoded_username);

if (!http_credentials.getPassword().empty())
{
std::string decoded_password;
Poco::URI::decode(http_credentials.getPassword(), decoded_password);
http_basic_credentials.setPassword(decoded_password);
}

http_basic_credentials.authenticate(request);
}

auto session = makePooledHTTPSession(url, timeouts, 1);
session->sendRequest(request);

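What `HTTPBasicCredentials::authenticate` adds to the request above is a standard `Authorization: Basic <base64(user:password)>` header built from the percent-decoded userinfo of the registry URL. An equivalent sketch in Python; the userinfo value is an example, not taken from the commit:

```python
import base64
from urllib.parse import unquote

def basic_auth_header(userinfo: str) -> str:
    # userinfo is the "user:password" part of the URL, possibly percent-encoded.
    user, _, password = userinfo.partition(":")
    token = f"{unquote(user)}:{unquote(password)}"
    return "Basic " + base64.b64encode(token.encode()).decode()

print(basic_auth_header("schemauser%2Fslash:secret"))
# Basic c2NoZW1hdXNlci9zbGFzaDpzZWNyZXQ=
```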
67 changes: 46 additions & 21 deletions tests/integration/helpers/cluster.py
@@ -486,6 +486,8 @@ def __init__(
self.kafka_docker_id = None
self.schema_registry_host = "schema-registry"
self._schema_registry_port = 0
self.schema_registry_auth_host = "schema-registry-auth"
self._schema_registry_auth_port = 0
self.kafka_docker_id = self.get_instance_docker_id(self.kafka_host)

self.coredns_host = "coredns"
@@ -657,6 +659,13 @@ def schema_registry_port(self):
self._schema_registry_port = get_free_port()
return self._schema_registry_port

@property
def schema_registry_auth_port(self):
if self._schema_registry_auth_port:
return self._schema_registry_auth_port
self._schema_registry_auth_port = get_free_port()
return self._schema_registry_auth_port

@property
def kerberized_kafka_port(self):
if self._kerberized_kafka_port:
@@ -1163,8 +1172,11 @@ def setup_kafka_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.with_kafka = True
env_variables["KAFKA_HOST"] = self.kafka_host
env_variables["KAFKA_EXTERNAL_PORT"] = str(self.kafka_port)
env_variables["SCHEMA_REGISTRY_DIR"] = instance.path + "/"
env_variables["SCHEMA_REGISTRY_EXTERNAL_PORT"] = str(self.schema_registry_port)
env_variables["SCHEMA_REGISTRY_INTERNAL_PORT"] = "8081"
env_variables["SCHEMA_REGISTRY_AUTH_EXTERNAL_PORT"] = str(
self.schema_registry_auth_port
)
self.base_cmd.extend(
["--file", p.join(docker_compose_yml_dir, "docker_compose_kafka.yml")]
)
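A hedged usage sketch of how a test module might trigger this wiring; the actual test file added by the PR is not shown in this excerpt, so the call below only follows the helper signatures visible in this diff:

```python
# Hypothetical test setup: with_kafka pulls in docker_compose_kafka.yml via
# setup_kafka_cmd, and with_secrets copies <test_dir>/secrets into the
# instance directory, which the auth-enabled registry mounts via
# SCHEMA_REGISTRY_DIR.
from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
    "instance",
    with_kafka=True,
    with_secrets=True,
)
```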
@@ -1498,6 +1510,7 @@ def add_instance(
with_kafka=False,
with_kerberized_kafka=False,
with_kerberos_kdc=False,
with_secrets=False,
with_rabbitmq=False,
with_nats=False,
clickhouse_path_dir=None,
@@ -1604,6 +1617,10 @@ def add_instance(
with_nats=with_nats,
with_nginx=with_nginx,
with_kerberized_hdfs=with_kerberized_hdfs,
with_secrets=with_secrets
or with_kerberized_hdfs
or with_kerberos_kdc
or with_kerberized_kafka,
with_mongo=with_mongo or with_mongo_secure,
with_meili=with_meili,
with_redis=with_redis,
@@ -2493,20 +2510,27 @@ def wait_azurite_to_start(self, timeout=180):
raise Exception("Can't wait Azurite to start")

def wait_schema_registry_to_start(self, timeout=180):
sr_client = CachedSchemaRegistryClient(
{"url": "http://localhost:{}".format(self.schema_registry_port)}
)
start = time.time()
while time.time() - start < timeout:
try:
sr_client._send_request(sr_client.url)
logging.debug("Connected to SchemaRegistry")
return sr_client
except Exception as ex:
logging.debug(("Can't connect to SchemaRegistry: %s", str(ex)))
time.sleep(1)
for port in self.schema_registry_port, self.schema_registry_auth_port:
reg_url = "http://localhost:{}".format(port)
arg = {"url": reg_url}
sr_client = CachedSchemaRegistryClient(arg)

start = time.time()
sr_started = False
sr_auth_started = False
while time.time() - start < timeout:
try:
sr_client._send_request(sr_client.url)
logging.debug("Connected to SchemaRegistry")
# don't care about possible auth errors
sr_started = True
break
except Exception as ex:
logging.debug(("Can't connect to SchemaRegistry: %s", str(ex)))
time.sleep(1)

raise Exception("Can't wait Schema Registry to start")
if not sr_started:
raise Exception("Can't wait Schema Registry to start")

def wait_cassandra_to_start(self, timeout=180):
self.cassandra_ip = self.get_instance_ip(self.cassandra_host)
@@ -3135,6 +3159,7 @@ def __init__(
with_nats,
with_nginx,
with_kerberized_hdfs,
with_secrets,
with_mongo,
with_meili,
with_redis,
@@ -3197,7 +3222,7 @@ def __init__(
if clickhouse_path_dir
else None
)
self.kerberos_secrets_dir = p.abspath(p.join(base_path, "secrets"))
self.secrets_dir = p.abspath(p.join(base_path, "secrets"))
self.macros = macros if macros is not None else {}
self.with_zookeeper = with_zookeeper
self.zookeeper_config_path = zookeeper_config_path
@@ -3220,6 +3245,7 @@ def __init__(
self.with_nats = with_nats
self.with_nginx = with_nginx
self.with_kerberized_hdfs = with_kerberized_hdfs
self.with_secrets = with_secrets
self.with_mongo = with_mongo
self.with_meili = with_meili
self.with_redis = with_redis
@@ -4217,17 +4243,16 @@ def write_embedded_config(name, dest_dir, fix_log_level=False):
if self.with_zookeeper:
shutil.copy(self.zookeeper_config_path, conf_d_dir)

if (
self.with_kerberized_kafka
or self.with_kerberized_hdfs
or self.with_kerberos_kdc
):
if self.with_secrets:
if self.with_kerberos_kdc:
base_secrets_dir = self.cluster.instances_dir
else:
base_secrets_dir = self.path
from_dir = self.secrets_dir
to_dir = p.abspath(p.join(base_secrets_dir, "secrets"))
logging.debug(f"Copy secret from {from_dir} to {to_dir}")
shutil.copytree(
self.kerberos_secrets_dir,
self.secrets_dir,
p.abspath(p.join(base_secrets_dir, "secrets")),
dirs_exist_ok=True,
)
3 changes: 3 additions & 0 deletions tests/integration/test_format_avro_confluent/secrets/password
@@ -0,0 +1,3 @@
schemauser: MD5:0d107d09f5bbe40cade3de5c71e9e9b7,user
schemauser/slash: MD5:0d107d09f5bbe40cade3de5c71e9e9b7,user
complexschemauser: MD5:fcaeda86837fcd37755044e7258edc5d,user
5 changes: 5 additions & 0 deletions tests/integration/test_format_avro_confluent/secrets/schema_registry_jaas.conf
@@ -0,0 +1,5 @@
RealmFooBar {
org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
file="/etc/schema-registry/secrets/password"
debug="true";
};
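The password file follows Jetty's `PropertyFileLoginModule` format, `user: MD5:<hex md5 of the password>,role`, and the `RealmFooBar` realm name matches `SCHEMA_REGISTRY_AUTHENTICATION_REALM` in the compose file. A sketch of generating such an entry; the first committed digest appears to be the MD5 of `letmein`, used here purely for illustration:

```python
import hashlib

def password_entry(user: str, plaintext: str, role: str = "user") -> str:
    """Format one line of a Jetty PropertyFileLoginModule password file."""
    digest = hashlib.md5(plaintext.encode()).hexdigest()
    return f"{user}: MD5:{digest},{role}"

print(password_entry("schemauser", "letmein"))
# schemauser: MD5:0d107d09f5bbe40cade3de5c71e9e9b7,user
```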
