Skip to content

Commit

Permalink
[Kafka Messenger] Added configuration separator for consumer and prod…
Browse files Browse the repository at this point in the history
…ucer
  • Loading branch information
fractalzombie committed Oct 17, 2022
1 parent bb7920f commit 90ae280
Show file tree
Hide file tree
Showing 13 changed files with 230 additions and 47 deletions.
2 changes: 0 additions & 2 deletions Exception/ConnectionException.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
use RdKafka\Message;

/**
* @internal
*
* @author Mykhailo Shtanko <fractalzombie@gmail.com>
*/
final class ConnectionException extends \LogicException
Expand Down
2 changes: 0 additions & 2 deletions Exception/ExtensionException.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
use JetBrains\PhpStorm\Pure;

/**
* @internal
*
* @author Mykhailo Shtanko <fractalzombie@gmail.com>
*/
final class ExtensionException extends \LogicException
Expand Down
2 changes: 0 additions & 2 deletions Exception/KafkaException.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
use RdKafka\Message;

/**
* @internal
*
* @author Mykhailo Shtanko <fractalzombie@gmail.com>
*/
final class KafkaException extends \LogicException
Expand Down
2 changes: 0 additions & 2 deletions Exception/TransportException.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
use Symfony\Component\Uid\Uuid;

/**
* @internal
*
* @author Mykhailo Shtanko <fractalzombie@gmail.com>
*/
final class TransportException extends BaseTransportException
Expand Down
20 changes: 0 additions & 20 deletions Tests/Fixtures/KafkaMessage.php

This file was deleted.

16 changes: 16 additions & 0 deletions Tests/Fixtures/Message/KafkaMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\Message;

use JetBrains\PhpStorm\Immutable;

#[Immutable]
final class KafkaMessage
{
public function __construct(
public readonly string $message,
) {
}
}
16 changes: 16 additions & 0 deletions Tests/Fixtures/Serializer/KafkaMessageSerializer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\Serializer;

use FRZB\Component\Messenger\Bridge\Kafka\Integration\MessageSerializer;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\Message\KafkaMessage;

final class KafkaMessageSerializer extends MessageSerializer
{
protected static function getMessageType(): string
{
return KafkaMessage::class;
}
}
11 changes: 2 additions & 9 deletions Tests/Integration/KafkaMessageSerializerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
namespace FRZB\Component\Messenger\Bridge\Kafka\Tests\Integration;

use FRZB\Component\Messenger\Bridge\Kafka\Integration\MessageSerializer;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\KafkaMessage;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\Message\KafkaMessage;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\Serializer\KafkaMessageSerializer;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Helper\DependencyHelper;
use PHPUnit\Framework\TestCase;

Expand All @@ -28,11 +29,3 @@ public function testDecodeMethod(): void
self::assertInstanceOf(KafkaMessage::class, $envelope->getMessage());
}
}

final class KafkaMessageSerializer extends MessageSerializer
{
protected static function getMessageType(): string
{
return KafkaMessage::class;
}
}
2 changes: 1 addition & 1 deletion Tests/Transport/KafkaExtIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
namespace FRZB\Component\Messenger\Bridge\Kafka\Tests\Transport;

use Fp\Collections\ArrayList;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\KafkaMessage;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\Message\KafkaMessage;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Helper\OptionsHelper;
use FRZB\Component\Messenger\Bridge\Kafka\Transport\KafkaReceivedStamp;
use FRZB\Component\Messenger\Bridge\Kafka\Transport\KafkaTransportFactory;
Expand Down
2 changes: 1 addition & 1 deletion Tests/Transport/KafkaReceiverTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use FRZB\Component\Messenger\Bridge\Kafka\Exception\ConnectionException;
use FRZB\Component\Messenger\Bridge\Kafka\Exception\KafkaException;
use FRZB\Component\Messenger\Bridge\Kafka\Exception\TransportException;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\KafkaMessage;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\Message\KafkaMessage;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Helper\MessageHelper;
use FRZB\Component\Messenger\Bridge\Kafka\Transport\Connection;
use FRZB\Component\Messenger\Bridge\Kafka\Transport\KafkaReceivedStamp;
Expand Down
2 changes: 1 addition & 1 deletion Tests/Transport/KafkaSenderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use FRZB\Component\Messenger\Bridge\Kafka\Exception\ConnectionException;
use FRZB\Component\Messenger\Bridge\Kafka\Exception\TransportException;
use FRZB\Component\Messenger\Bridge\Kafka\Helper\ConfigHelper;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\KafkaMessage;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\Message\KafkaMessage;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Helper\MessageHelper;
use FRZB\Component\Messenger\Bridge\Kafka\Transport\Connection;
use FRZB\Component\Messenger\Bridge\Kafka\Transport\KafkaSender;
Expand Down
2 changes: 1 addition & 1 deletion Tests/Transport/KafkaTransportTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use Fp\Collections\ArrayList;
use FRZB\Component\Messenger\Bridge\Kafka\Helper\ConfigHelper;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\KafkaMessage;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\Message\KafkaMessage;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Helper\MessageHelper;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Helper\OptionsHelper;
use FRZB\Component\Messenger\Bridge\Kafka\Transport\Connection;
Expand Down
198 changes: 192 additions & 6 deletions Transport/KafkaFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,189 @@ class KafkaFactory
private const OPTIONS_KAFKA_KEY = 'kafka_conf';
private const OPTIONS_TOPIC_KEY = 'topic_conf';

private const CONSUMER_PROPS = [
'enable.auto.offset.store',
'key.deserializer',
'value.deserializer',
'bootstrap.servers',
'fetch.min.bytes',
'group.id',
'heartbeat.interval.ms',
'max.partition.fetch.bytes',
'session.timeout.ms',
'ssl.key.password',
'ssl.keystore.certificate.chain',
'ssl.keystore.key',
'ssl.keystore.location',
'ssl.keystore.password',
'ssl.truststore.certificates',
'ssl.truststore.location',
'ssl.truststore.password',
'allow.auto.create.topics',
'auto.offset.reset',
'client.dns.lookup',
'connections.max.idle.ms',
'default.api.timeout.ms',
'enable.auto.commit',
'exclude.internal.topics',
'fetch.max.bytes',
'group.instance.id',
'isolation.level',
'max.poll.interval.ms',
'max.poll.records',
'partition.assignment.strategy',
'receive.buffer.bytes',
'request.timeout.ms',
'sasl.client.callback.handler.class',
'sasl.jaas.config',
'sasl.kerberos.service.name',
'sasl.login.callback.handler.class',
'sasl.login.class',
'sasl.mechanism',
'sasl.oauthbearer.jwks.endpoint.url',
'sasl.oauthbearer.token.endpoint.url',
'security.protocol',
'send.buffer.bytes',
'socket.connection.setup.timeout.max.ms',
'socket.connection.setup.timeout.ms',
'ssl.enabled.protocols',
'ssl.keystore.type',
'ssl.protocol',
'ssl.provider',
'ssl.truststore.type',
'auto.commit.interval.ms',
'check.crcs',
'client.id',
'client.rack',
'fetch.max.wait.ms',
'interceptor.classes',
'metadata.max.age.ms',
'metric.reporters',
'metrics.num.samples',
'metrics.recording.level',
'metrics.sample.window.ms',
'reconnect.backoff.max.ms',
'reconnect.backoff.ms',
'retry.backoff.ms',
'sasl.kerberos.kinit.cmd',
'sasl.kerberos.min.time.before.relogin',
'sasl.kerberos.ticket.renew.jitter',
'sasl.kerberos.ticket.renew.window.factor',
'sasl.login.connect.timeout.ms',
'sasl.login.read.timeout.ms',
'sasl.login.refresh.buffer.seconds',
'sasl.login.refresh.min.period.seconds',
'sasl.login.refresh.window.factor',
'sasl.login.refresh.window.jitter',
'sasl.login.retry.backoff.max.ms',
'sasl.login.retry.backoff.ms',
'sasl.oauthbearer.clock.skew.seconds',
'sasl.oauthbearer.expected.audience',
'sasl.oauthbearer.expected.issuer',
'sasl.oauthbearer.jwks.endpoint.refresh.ms',
'sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms',
'sasl.oauthbearer.jwks.endpoint.retry.backoff.ms',
'sasl.oauthbearer.scope.claim.name',
'sasl.oauthbearer.sub.claim.name',
'security.providers',
'ssl.cipher.suites',
'ssl.endpoint.identification.algorithm',
'ssl.engine.factory.class',
'ssl.keymanager.algorithm',
'ssl.secure.random.implementation',
'ssl.trustmanager.algorithm',
];

private const PRODUCER_PROPS = [
'key.serializer',
'value.serializer',
'bootstrap.servers',
'buffer.memory',
'compression.type',
'retries',
'ssl.key.password',
'ssl.keystore.certificate.chain',
'ssl.keystore.key',
'ssl.keystore.location',
'ssl.keystore.password',
'ssl.truststore.certificates',
'ssl.truststore.location',
'ssl.truststore.password',
'batch.size',
'client.dns.lookup',
'client.id',
'connections.max.idle.ms',
'delivery.timeout.ms',
'linger.ms',
'max.block.ms',
'max.request.size',
'partitioner.class',
'receive.buffer.bytes',
'request.timeout.ms',
'sasl.client.callback.handler.class',
'sasl.jaas.config',
'sasl.kerberos.service.name',
'sasl.login.callback.handler.class',
'sasl.login.class',
'sasl.mechanism',
'sasl.oauthbearer.jwks.endpoint.url',
'sasl.oauthbearer.token.endpoint.url',
'security.protocol',
'send.buffer.bytes',
'socket.connection.setup.timeout.max.ms',
'socket.connection.setup.timeout.ms',
'ssl.enabled.protocols',
'ssl.keystore.type',
'ssl.protocol',
'ssl.provider',
'ssl.truststore.type',
'acks',
'enable.idempotence',
'interceptor.classes',
'max.in.flight.requests.per.connection',
'metadata.max.age.ms',
'metadata.max.idle.ms',
'metric.reporters',
'metrics.num.samples',
'metrics.recording.level',
'metrics.sample.window.ms',
'reconnect.backoff.max.ms',
'reconnect.backoff.ms',
'retry.backoff.ms',
'sasl.kerberos.kinit.cmd',
'sasl.kerberos.min.time.before.relogin',
'sasl.kerberos.ticket.renew.jitter',
'sasl.kerberos.ticket.renew.window.factor',
'sasl.login.connect.timeout.ms',
'sasl.login.read.timeout.ms',
'sasl.login.refresh.buffer.seconds',
'sasl.login.refresh.min.period.seconds',
'sasl.login.refresh.window.factor',
'sasl.login.refresh.window.jitter',
'sasl.login.retry.backoff.max.ms',
'sasl.login.retry.backoff.ms',
'sasl.oauthbearer.clock.skew.seconds',
'sasl.oauthbearer.expected.audience',
'sasl.oauthbearer.expected.issuer',
'sasl.oauthbearer.jwks.endpoint.refresh.ms',
'sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms',
'sasl.oauthbearer.jwks.endpoint.retry.backoff.ms',
'sasl.oauthbearer.scope.claim.name',
'sasl.oauthbearer.sub.claim.name',
'security.providers',
'ssl.cipher.suites',
'ssl.endpoint.identification.algorithm',
'ssl.engine.factory.class',
'ssl.keymanager.algorithm',
'ssl.secure.random.implementation',
'ssl.trustmanager.algorithm',
'transaction.timeout.ms',
'transactional.id',
];

public function __construct(
private readonly KafkaConfiguration $configuration,
private readonly KafkaConfiguration $consumerConfiguration,
private readonly KafkaConfiguration $producerConfiguration,
) {
}

Expand All @@ -38,26 +219,31 @@ public static function fromDsnAndOptions(string $dsn, array $options): self
$kafka = $options[self::OPTIONS_KAFKA_KEY] ?? [];
$topic = $options[self::OPTIONS_TOPIC_KEY] ?? [];

return new self(self::getConfig($dsn, $kafka, $topic));
return new self(
self::getConfig($dsn, $kafka, $topic, self::CONSUMER_PROPS),
self::getConfig($dsn, $kafka, $topic, self::PRODUCER_PROPS),
);
}

public function createConsumer(?KafkaConfiguration $configuration = null): KafkaConsumer
{
return new KafkaConsumer($configuration ?? $this->configuration);
return new KafkaConsumer($configuration ?? $this->consumerConfiguration);
}

public function createProducer(?KafkaConfiguration $configuration = null): KafkaProducer
{
return new KafkaProducer($configuration ?? $this->configuration);
return new KafkaProducer($configuration ?? $this->producerConfiguration);
}

private static function getConfig(string $dsn, array $kafka, array $topic): KafkaConfiguration
private static function getConfig(string $dsn, array $kafka, array $topic, array $existingOptions): KafkaConfiguration
{
$config = new KafkaConfiguration();
$config->set('metadata.broker.list', self::getBrokers($dsn));

foreach (array_merge($kafka, $topic) as $option => $value) {
$config->set($option, $value);
if (array_search($option, $existingOptions, true)) {
$config->set($option, $value);
}
}

return $config;
Expand Down

0 comments on commit 90ae280

Please sign in to comment.