From 90ae2803b5075bbfac7019be7ba1a73161b00ce0 Mon Sep 17 00:00:00 2001 From: Mykhailo Shtanko Date: Mon, 17 Oct 2022 14:38:05 +0300 Subject: [PATCH] [Kafka Messenger] Added configuration separator for consumer and producer --- Exception/ConnectionException.php | 2 - Exception/ExtensionException.php | 2 - Exception/KafkaException.php | 2 - Exception/TransportException.php | 2 - Tests/Fixtures/KafkaMessage.php | 20 -- Tests/Fixtures/Message/KafkaMessage.php | 16 ++ .../Serializer/KafkaMessageSerializer.php | 16 ++ .../KafkaMessageSerializerTest.php | 11 +- Tests/Transport/KafkaExtIntegrationTest.php | 2 +- Tests/Transport/KafkaReceiverTest.php | 2 +- Tests/Transport/KafkaSenderTest.php | 2 +- Tests/Transport/KafkaTransportTest.php | 2 +- Transport/KafkaFactory.php | 198 +++++++++++++++++- 13 files changed, 230 insertions(+), 47 deletions(-) delete mode 100644 Tests/Fixtures/KafkaMessage.php create mode 100644 Tests/Fixtures/Message/KafkaMessage.php create mode 100644 Tests/Fixtures/Serializer/KafkaMessageSerializer.php diff --git a/Exception/ConnectionException.php b/Exception/ConnectionException.php index a285d63..18023da 100644 --- a/Exception/ConnectionException.php +++ b/Exception/ConnectionException.php @@ -16,8 +16,6 @@ use RdKafka\Message; /** - * @internal - * * @author Mykhailo Shtanko */ final class ConnectionException extends \LogicException diff --git a/Exception/ExtensionException.php b/Exception/ExtensionException.php index 36597dc..e2a36ba 100644 --- a/Exception/ExtensionException.php +++ b/Exception/ExtensionException.php @@ -16,8 +16,6 @@ use JetBrains\PhpStorm\Pure; /** - * @internal - * * @author Mykhailo Shtanko */ final class ExtensionException extends \LogicException diff --git a/Exception/KafkaException.php b/Exception/KafkaException.php index 8a65cad..021941c 100644 --- a/Exception/KafkaException.php +++ b/Exception/KafkaException.php @@ -16,8 +16,6 @@ use RdKafka\Message; /** - * @internal - * * @author Mykhailo Shtanko */ final class KafkaException extends \LogicException diff --git a/Exception/TransportException.php b/Exception/TransportException.php index 480f95e..8a0a39c 100644 --- a/Exception/TransportException.php +++ b/Exception/TransportException.php @@ -11,8 +11,6 @@ use Symfony\Component\Uid\Uuid; /** - * @internal - * * @author Mykhailo Shtanko */ final class TransportException extends BaseTransportException diff --git a/Tests/Fixtures/KafkaMessage.php b/Tests/Fixtures/KafkaMessage.php deleted file mode 100644 index a96b4c7..0000000 --- a/Tests/Fixtures/KafkaMessage.php +++ /dev/null @@ -1,20 +0,0 @@ -message = $message; - } - - public function getMessage(): string - { - return $this->message; - } -} diff --git a/Tests/Fixtures/Message/KafkaMessage.php b/Tests/Fixtures/Message/KafkaMessage.php new file mode 100644 index 0000000..4e9202d --- /dev/null +++ b/Tests/Fixtures/Message/KafkaMessage.php @@ -0,0 +1,16 @@ +getMessage()); } } - -final class KafkaMessageSerializer extends MessageSerializer -{ - protected static function getMessageType(): string - { - return KafkaMessage::class; - } -} diff --git a/Tests/Transport/KafkaExtIntegrationTest.php b/Tests/Transport/KafkaExtIntegrationTest.php index 2c7734b..7d790d5 100644 --- a/Tests/Transport/KafkaExtIntegrationTest.php +++ b/Tests/Transport/KafkaExtIntegrationTest.php @@ -14,7 +14,7 @@ namespace FRZB\Component\Messenger\Bridge\Kafka\Tests\Transport; use Fp\Collections\ArrayList; -use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\KafkaMessage; +use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\Message\KafkaMessage; use FRZB\Component\Messenger\Bridge\Kafka\Tests\Helper\OptionsHelper; use FRZB\Component\Messenger\Bridge\Kafka\Transport\KafkaReceivedStamp; use FRZB\Component\Messenger\Bridge\Kafka\Transport\KafkaTransportFactory; diff --git a/Tests/Transport/KafkaReceiverTest.php b/Tests/Transport/KafkaReceiverTest.php index 2d90439..bf7bece 100644 --- a/Tests/Transport/KafkaReceiverTest.php +++ b/Tests/Transport/KafkaReceiverTest.php @@ -8,7 +8,7 @@ use FRZB\Component\Messenger\Bridge\Kafka\Exception\ConnectionException; use FRZB\Component\Messenger\Bridge\Kafka\Exception\KafkaException; use FRZB\Component\Messenger\Bridge\Kafka\Exception\TransportException; -use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\KafkaMessage; +use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\Message\KafkaMessage; use FRZB\Component\Messenger\Bridge\Kafka\Tests\Helper\MessageHelper; use FRZB\Component\Messenger\Bridge\Kafka\Transport\Connection; use FRZB\Component\Messenger\Bridge\Kafka\Transport\KafkaReceivedStamp; diff --git a/Tests/Transport/KafkaSenderTest.php b/Tests/Transport/KafkaSenderTest.php index bb283c2..6a44cdb 100644 --- a/Tests/Transport/KafkaSenderTest.php +++ b/Tests/Transport/KafkaSenderTest.php @@ -7,7 +7,7 @@ use FRZB\Component\Messenger\Bridge\Kafka\Exception\ConnectionException; use FRZB\Component\Messenger\Bridge\Kafka\Exception\TransportException; use FRZB\Component\Messenger\Bridge\Kafka\Helper\ConfigHelper; -use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\KafkaMessage; +use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\Message\KafkaMessage; use FRZB\Component\Messenger\Bridge\Kafka\Tests\Helper\MessageHelper; use FRZB\Component\Messenger\Bridge\Kafka\Transport\Connection; use FRZB\Component\Messenger\Bridge\Kafka\Transport\KafkaSender; diff --git a/Tests/Transport/KafkaTransportTest.php b/Tests/Transport/KafkaTransportTest.php index 48b227c..2c362ed 100644 --- a/Tests/Transport/KafkaTransportTest.php +++ b/Tests/Transport/KafkaTransportTest.php @@ -6,7 +6,7 @@ use Fp\Collections\ArrayList; use FRZB\Component\Messenger\Bridge\Kafka\Helper\ConfigHelper; -use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\KafkaMessage; +use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\Message\KafkaMessage; use FRZB\Component\Messenger\Bridge\Kafka\Tests\Helper\MessageHelper; use FRZB\Component\Messenger\Bridge\Kafka\Tests\Helper\OptionsHelper; use FRZB\Component\Messenger\Bridge\Kafka\Transport\Connection; diff --git a/Transport/KafkaFactory.php b/Transport/KafkaFactory.php index d0d6d46..298dc57 100644 --- a/Transport/KafkaFactory.php +++ b/Transport/KafkaFactory.php @@ -28,8 +28,189 @@ class KafkaFactory private const OPTIONS_KAFKA_KEY = 'kafka_conf'; private const OPTIONS_TOPIC_KEY = 'topic_conf'; + private const CONSUMER_PROPS = [ + 'enable.auto.offset.store', + 'key.deserializer', + 'value.deserializer', + 'bootstrap.servers', + 'fetch.min.bytes', + 'group.id', + 'heartbeat.interval.ms', + 'max.partition.fetch.bytes', + 'session.timeout.ms', + 'ssl.key.password', + 'ssl.keystore.certificate.chain', + 'ssl.keystore.key', + 'ssl.keystore.location', + 'ssl.keystore.password', + 'ssl.truststore.certificates', + 'ssl.truststore.location', + 'ssl.truststore.password', + 'allow.auto.create.topics', + 'auto.offset.reset', + 'client.dns.lookup', + 'connections.max.idle.ms', + 'default.api.timeout.ms', + 'enable.auto.commit', + 'exclude.internal.topics', + 'fetch.max.bytes', + 'group.instance.id', + 'isolation.level', + 'max.poll.interval.ms', + 'max.poll.records', + 'partition.assignment.strategy', + 'receive.buffer.bytes', + 'request.timeout.ms', + 'sasl.client.callback.handler.class', + 'sasl.jaas.config', + 'sasl.kerberos.service.name', + 'sasl.login.callback.handler.class', + 'sasl.login.class', + 'sasl.mechanism', + 'sasl.oauthbearer.jwks.endpoint.url', + 'sasl.oauthbearer.token.endpoint.url', + 'security.protocol', + 'send.buffer.bytes', + 'socket.connection.setup.timeout.max.ms', + 'socket.connection.setup.timeout.ms', + 'ssl.enabled.protocols', + 'ssl.keystore.type', + 'ssl.protocol', + 'ssl.provider', + 'ssl.truststore.type', + 'auto.commit.interval.ms', + 'check.crcs', + 'client.id', + 'client.rack', + 'fetch.max.wait.ms', + 'interceptor.classes', + 'metadata.max.age.ms', + 'metric.reporters', + 'metrics.num.samples', + 'metrics.recording.level', + 'metrics.sample.window.ms', + 'reconnect.backoff.max.ms', + 'reconnect.backoff.ms', + 'retry.backoff.ms', + 'sasl.kerberos.kinit.cmd', + 'sasl.kerberos.min.time.before.relogin', + 'sasl.kerberos.ticket.renew.jitter', + 'sasl.kerberos.ticket.renew.window.factor', + 'sasl.login.connect.timeout.ms', + 'sasl.login.read.timeout.ms', + 'sasl.login.refresh.buffer.seconds', + 'sasl.login.refresh.min.period.seconds', + 'sasl.login.refresh.window.factor', + 'sasl.login.refresh.window.jitter', + 'sasl.login.retry.backoff.max.ms', + 'sasl.login.retry.backoff.ms', + 'sasl.oauthbearer.clock.skew.seconds', + 'sasl.oauthbearer.expected.audience', + 'sasl.oauthbearer.expected.issuer', + 'sasl.oauthbearer.jwks.endpoint.refresh.ms', + 'sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms', + 'sasl.oauthbearer.jwks.endpoint.retry.backoff.ms', + 'sasl.oauthbearer.scope.claim.name', + 'sasl.oauthbearer.sub.claim.name', + 'security.providers', + 'ssl.cipher.suites', + 'ssl.endpoint.identification.algorithm', + 'ssl.engine.factory.class', + 'ssl.keymanager.algorithm', + 'ssl.secure.random.implementation', + 'ssl.trustmanager.algorithm', + ]; + + private const PRODUCER_PROPS = [ + 'key.serializer', + 'value.serializer', + 'bootstrap.servers', + 'buffer.memory', + 'compression.type', + 'retries', + 'ssl.key.password', + 'ssl.keystore.certificate.chain', + 'ssl.keystore.key', + 'ssl.keystore.location', + 'ssl.keystore.password', + 'ssl.truststore.certificates', + 'ssl.truststore.location', + 'ssl.truststore.password', + 'batch.size', + 'client.dns.lookup', + 'client.id', + 'connections.max.idle.ms', + 'delivery.timeout.ms', + 'linger.ms', + 'max.block.ms', + 'max.request.size', + 'partitioner.class', + 'receive.buffer.bytes', + 'request.timeout.ms', + 'sasl.client.callback.handler.class', + 'sasl.jaas.config', + 'sasl.kerberos.service.name', + 'sasl.login.callback.handler.class', + 'sasl.login.class', + 'sasl.mechanism', + 'sasl.oauthbearer.jwks.endpoint.url', + 'sasl.oauthbearer.token.endpoint.url', + 'security.protocol', + 'send.buffer.bytes', + 'socket.connection.setup.timeout.max.ms', + 'socket.connection.setup.timeout.ms', + 'ssl.enabled.protocols', + 'ssl.keystore.type', + 'ssl.protocol', + 'ssl.provider', + 'ssl.truststore.type', + 'acks', + 'enable.idempotence', + 'interceptor.classes', + 'max.in.flight.requests.per.connection', + 'metadata.max.age.ms', + 'metadata.max.idle.ms', + 'metric.reporters', + 'metrics.num.samples', + 'metrics.recording.level', + 'metrics.sample.window.ms', + 'reconnect.backoff.max.ms', + 'reconnect.backoff.ms', + 'retry.backoff.ms', + 'sasl.kerberos.kinit.cmd', + 'sasl.kerberos.min.time.before.relogin', + 'sasl.kerberos.ticket.renew.jitter', + 'sasl.kerberos.ticket.renew.window.factor', + 'sasl.login.connect.timeout.ms', + 'sasl.login.read.timeout.ms', + 'sasl.login.refresh.buffer.seconds', + 'sasl.login.refresh.min.period.seconds', + 'sasl.login.refresh.window.factor', + 'sasl.login.refresh.window.jitter', + 'sasl.login.retry.backoff.max.ms', + 'sasl.login.retry.backoff.ms', + 'sasl.oauthbearer.clock.skew.seconds', + 'sasl.oauthbearer.expected.audience', + 'sasl.oauthbearer.expected.issuer', + 'sasl.oauthbearer.jwks.endpoint.refresh.ms', + 'sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms', + 'sasl.oauthbearer.jwks.endpoint.retry.backoff.ms', + 'sasl.oauthbearer.scope.claim.name', + 'sasl.oauthbearer.sub.claim.name', + 'security.providers', + 'ssl.cipher.suites', + 'ssl.endpoint.identification.algorithm', + 'ssl.engine.factory.class', + 'ssl.keymanager.algorithm', + 'ssl.secure.random.implementation', + 'ssl.trustmanager.algorithm', + 'transaction.timeout.ms', + 'transactional.id', + ]; + public function __construct( - private readonly KafkaConfiguration $configuration, + private readonly KafkaConfiguration $consumerConfiguration, + private readonly KafkaConfiguration $producerConfiguration, ) { } @@ -38,26 +219,31 @@ public static function fromDsnAndOptions(string $dsn, array $options): self $kafka = $options[self::OPTIONS_KAFKA_KEY] ?? []; $topic = $options[self::OPTIONS_TOPIC_KEY] ?? []; - return new self(self::getConfig($dsn, $kafka, $topic)); + return new self( + self::getConfig($dsn, $kafka, $topic, self::CONSUMER_PROPS), + self::getConfig($dsn, $kafka, $topic, self::PRODUCER_PROPS), + ); } public function createConsumer(?KafkaConfiguration $configuration = null): KafkaConsumer { - return new KafkaConsumer($configuration ?? $this->configuration); + return new KafkaConsumer($configuration ?? $this->consumerConfiguration); } public function createProducer(?KafkaConfiguration $configuration = null): KafkaProducer { - return new KafkaProducer($configuration ?? $this->configuration); + return new KafkaProducer($configuration ?? $this->producerConfiguration); } - private static function getConfig(string $dsn, array $kafka, array $topic): KafkaConfiguration + private static function getConfig(string $dsn, array $kafka, array $topic, array $existingOptions): KafkaConfiguration { $config = new KafkaConfiguration(); $config->set('metadata.broker.list', self::getBrokers($dsn)); foreach (array_merge($kafka, $topic) as $option => $value) { - $config->set($option, $value); + if (array_search($option, $existingOptions, true)) { + $config->set($option, $value); + } } return $config;