Skip to content

Commit

Permalink
[Kafka Messenger] Added configs, update README
Browse files Browse the repository at this point in the history
  • Loading branch information
fractalzombie committed Dec 24, 2021
1 parent 174c41c commit 39cb005
Show file tree
Hide file tree
Showing 13 changed files with 106 additions and 30 deletions.
2 changes: 2 additions & 0 deletions .coveralls.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
service_name: travis-pro
repo_token: 2Gea7aeTkYH3NwMWGu0gK514GKrScs1Nv
12 changes: 8 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ jobs:
uses: actions/checkout@v2

- name: Install rdkafka
run:
sudo add-apt-repository universe && sudo apt update && sudo apt install netcat librdkafka1 librdkafka-dev
run: sudo add-apt-repository universe && sudo apt update && sudo apt install netcat librdkafka1 librdkafka-dev

- name: Setup PHP
uses: shivammathur/setup-php@v2
Expand All @@ -60,8 +59,10 @@ jobs:
ini-values: post_max_size=256M
env:
runner: kafka-messenger
MESSENGER_KAFKA_DSN: 'kafka://0.0.0.0:9092'
MESSENGER_KAFKA_DEBUG: false

- name: check if rdkafka is there
- name: Check Kafka Extension
run: php -v && php --ri rdkafka

- name: Setup Composer
Expand All @@ -83,10 +84,13 @@ jobs:
- name: Install dependencies
if: steps.composer-cache.outputs.cache-hit != 'true'
run: composer update --prefer-dist --no-progress
run: composer install

- name: Wait for Kafka
run: .github/workflows/wait-for-kafka.sh

- name: Unit Tests
run: ./vendor/bin/phpunit --colors=always --verbose --configuration phpunit.xml.dist --log-junit var/tests/.phpunit.output.xml
env:
MESSENGER_KAFKA_DSN: 'kafka://0.0.0.0:9092'
MESSENGER_KAFKA_DEBUG: false
47 changes: 47 additions & 0 deletions .scrutinizer.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
build:
tests:
override:
- true

nodes:
tests:
dependencies:
before:
- sudo apt update
- sudo apt install -y librdkafka1 librdkafka-dev
environment:
php:
pecl_extensions:
- rdkafka
- xdebug
analysis:
dependencies:
before:
- sudo apt update
- sudo apt install -y librdkafka1 librdkafka-dev
environment:
php:
pecl_extensions:
- rdkafka
- xdebug
tests:
override: [php-scrutinizer-run]

coverage:
tests:
override:
- command: php vendor/bin/phpunit --colors=always --verbose --configuration phpunit.xml.dist --log-junit var/tests/.phpunit.output.xml --coverage-clover var/tests/coverage/coverage.xml
coverage:
file: var/tests/coverage/coverage.xml
format: clover

environment:
php: 8.0.13

filter:
excluded_paths:
- "Tests/*"
- "var/*"
- "Resources/*"
dependency_paths:
- "vendor/*"
22 changes: 22 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
language: php
dist: focal

matrix:
include:
- php: 8.0

before_install:
- sudo apt update
- sudo apt install -y librdkafka1 librdkafka-dev
- pecl install rdkafka

install:
- travis_retry composer install

script:
- mkdir -p build/logs
- php vendor/bin/phpunit --colors=always --configuration phpunit.xml.dist --log-junit build/logs/.phpunit.output.xml --coverage-clover build/logs/clover.xml

after_success:
- vendor/bin/php-coveralls -v
- bash <(curl -s https://codecov.io/bash)
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
Kafka Messenger Component
=============================
[![Build Status](https://app.travis-ci.com/fractalzombie/frzb-kafka-messenger.svg?branch=main)](https://app.travis-ci.com/fractalzombie/frzb-kafka-messenger)
[![Scrutinizer Code Quality](https://scrutinizer-ci.com/g/fractalzombie/frzb-kafka-messenger/badges/quality-score.png?b=main)](https://scrutinizer-ci.com/g/fractalzombie/frzb-kafka-messenger/?branch=main)
[![Code Intelligence Status](https://scrutinizer-ci.com/g/fractalzombie/frzb-kafka-messenger/badges/code-intelligence.svg?b=main)](https://scrutinizer-ci.com/code-intelligence)
[![Build Status](https://scrutinizer-ci.com/g/fractalzombie/frzb-kafka-messenger/badges/build.png?b=main)](https://scrutinizer-ci.com/g/fractalzombie/frzb-kafka-messenger/build-status/main)
[![Code Coverage](https://scrutinizer-ci.com/g/fractalzombie/frzb-kafka-messenger/badges/coverage.png?b=main)](https://scrutinizer-ci.com/g/fractalzombie/frzb-kafka-messenger/?branch=main)
[![Coverage Status](https://coveralls.io/repos/github/fractalzombie/frzb-kafka-messenger/badge.svg?branch=main)](https://coveralls.io/github/fractalzombie/frzb-kafka-messenger?branch=main)
[![codecov](https://codecov.io/gh/fractalzombie/frzb-kafka-messenger/branch/main/graph/badge.svg?token=BF90VP2L0V)](https://codecov.io/gh/fractalzombie/frzb-kafka-messenger)

Provides Kafka integration for Symfony Messenger.

Resources
---------

* [Documentation] - Coming Soon
* [License](https://github.com/fractalzombie/frzb-request-mapper/blob/main/LICENSE)
* [License](https://github.com/fractalzombie/frzb-kafka-messenger/blob/main/LICENSE)
12 changes: 2 additions & 10 deletions Tests/Transport/KafkaExtIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@
use Fp\Collections\ArrayList;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Fixtures\KafkaMessage;
use FRZB\Component\Messenger\Bridge\Kafka\Tests\Helper\OptionsHelper;
use FRZB\Component\Messenger\Bridge\Kafka\Transport\KafkaLogger;
use FRZB\Component\Messenger\Bridge\Kafka\Transport\KafkaReceivedStamp;
use FRZB\Component\Messenger\Bridge\Kafka\Transport\KafkaTransportFactory;
use PHPUnit\Framework\TestCase;
use Symfony\Component\ErrorHandler\BufferingLogger;
use Symfony\Component\Messenger\Envelope;

/**
Expand All @@ -32,20 +30,14 @@
*/
class KafkaExtIntegrationTest extends TestCase
{
protected function setUp(): void
public function testItSendsAndReceivesMessages(): void
{
parent::setUp();

if (!getenv('MESSENGER_KAFKA_DSN')) {
$this->markTestSkipped('The "MESSENGER_KAFKA_DSN" environment variable is required.');
}
}

public function testItSendsAndReceivesMessages(): void
{
$options = OptionsHelper::getOptions();
$logger = new KafkaLogger(new BufferingLogger());
$transport = (new KafkaTransportFactory($logger, false))->createTransport(getenv('MESSENGER_KAFKA_DSN'), $options);
$transport = (new KafkaTransportFactory())->createTransport(getenv('MESSENGER_KAFKA_DSN'), $options);

$transport->send($first = new Envelope(new KafkaMessage('Message')));

Expand Down
2 changes: 1 addition & 1 deletion Tests/Transport/KafkaFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class KafkaFactoryTest extends TestCase
{
public function testFromDsnAndOptionsMethod(): void
{
$dsn = 'kafka://kafka:9092';
$dsn = getenv('MESSENGER_KAFKA_DSN') ?: 'kafka://0.0.0.0:9092';
$options = OptionsHelper::getOptions();
$factory = KafkaFactory::fromDsnAndOptions($dsn, $options);

Expand Down
6 changes: 3 additions & 3 deletions Tests/Transport/KafkaTransportFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use FRZB\Component\Messenger\Bridge\Kafka\Transport\KafkaLogger;
use FRZB\Component\Messenger\Bridge\Kafka\Transport\KafkaTransportFactory;
use PHPUnit\Framework\TestCase;
use Symfony\Component\ErrorHandler\BufferingLogger;
use Psr\Log\NullLogger;

/**
* @requires extension rdkafka
Expand All @@ -21,10 +21,10 @@ class KafkaTransportFactoryTest extends TestCase
{
public function testSupportsMethod(): void
{
$dsn = 'kafka://kafka:9092';
$dsn = getenv('MESSENGER_KAFKA_DSN') ?: 'kafka://0.0.0.0:9092';
$options = OptionsHelper::getOptions();

$factory = new KafkaTransportFactory(new KafkaLogger(new BufferingLogger()), false);
$factory = new KafkaTransportFactory();

self::assertTrue($factory->supports($dsn, $options));
self::assertNotNull($factory->createTransport($dsn, $options));
Expand Down
2 changes: 1 addition & 1 deletion Transport/KafkaLogger.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public function logProduce(array $payload): void
{
$context = [
'payload' => $payload['body'],
'headers' => json_encode($payload['headers'], \JSON_THROW_ON_ERROR),
'headers' => json_encode($payload['headers'] ?? [], \JSON_THROW_ON_ERROR),
];

$this->logger->info(self::PUBLISH_MESSAGE_TEMPLATE, $context);
Expand Down
6 changes: 3 additions & 3 deletions Transport/KafkaStamp.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public function __construct(
private int $timestamp,
private string $body = '',
private array $headers = [],
private int $partition = \RD_KAFKA_PARTITION_UA,
private int $partition = RD_KAFKA_PARTITION_UA,
private ?string $key = null,
private bool $isRedelivered = false,
) {
Expand All @@ -43,8 +43,8 @@ public static function fromMessage(Message $message, bool $isRedelivered = false
$message->offset,
$message->timestamp,
$message->payload,
$message->headers,
$message->partition,
$message->headers ?? [],
$message->partition ?? RD_KAFKA_PARTITION_UA,
$message->key,
$isRedelivered,
);
Expand Down
12 changes: 8 additions & 4 deletions Transport/KafkaTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

use FRZB\Component\Messenger\Bridge\Kafka\Exception\ExtensionException;
use FRZB\Component\Messenger\Bridge\Kafka\Helper\ConfigHelper;
use JetBrains\PhpStorm\Pure;
use Psr\Log\NullLogger;
use Symfony\Component\ErrorHandler\BufferingLogger;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
Expand All @@ -26,10 +28,13 @@
*/
class KafkaTransportFactory implements TransportFactoryInterface
{
#[Pure]
public function __construct(
private KafkaLogger $logger,
private bool $isDebug,
private ?KafkaLogger $logger = null,
private bool $isDebug = false,
) {
$innerLogger = $this->isDebug ? new BufferingLogger() : new NullLogger();
$this->logger ??= new KafkaLogger($innerLogger);
}

public function createTransport(string $dsn, array $options, ?SerializerInterface $serializer = null): TransportInterface
Expand All @@ -38,10 +43,9 @@ public function createTransport(string $dsn, array $options, ?SerializerInterfac
$senderConfig = ConfigHelper::createSenderConfig($options);
$kafkaFactory = KafkaFactory::fromDsnAndOptions($dsn, $options);
$connection = new Connection($kafkaFactory);
$logger = $this->isDebug ? $this->logger : new KafkaLogger(new NullLogger());
$serializer ??= new PhpSerializer();

return new KafkaTransport($connection, $receiverConfig, $senderConfig, $serializer, $logger);
return new KafkaTransport($connection, $receiverConfig, $senderConfig, $serializer, $this->logger);
}

public function supports(string $dsn, array $options): bool
Expand Down
3 changes: 1 addition & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@
},
"scripts": {
"test": ["php vendor/bin/phpunit --colors=always --verbose --configuration phpunit.xml.dist --log-junit var/tests/.phpunit.output.xml"],
"test-coverage": ["php vendor/bin/phpunit --colors=always --verbose --configuration phpunit.xml.dist --log-junit var/tests/.phpunit.output.xml --coverage-html var/tests/coverage --coverage-clover var/tests/coverage/coverage.xml"],
"test-coverage-external": ["php -n -dzend_extension=xdebug -dxdebug.mode=coverage vendor/bin/phpunit --colors=always --verbose --configuration phpunit.xml.dist --log-junit var/tests/.phpunit.output.xml --coverage-html var/tests/coverage --coverage-clover var/tests/coverage/coverage.xml"]
"test-coverage": ["php vendor/bin/phpunit --colors=always --verbose --configuration phpunit.xml.dist --log-junit var/tests/.phpunit.output.xml --coverage-html var/tests/coverage --coverage-clover var/tests/coverage/coverage.xml"]
},
"minimum-stability": "dev"
}
1 change: 0 additions & 1 deletion phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
>
<php>
<ini name="error_reporting" value="-1" />
<env name="MESSENGER_KAFKA_DSN" value="kafka://213.139.212.99:29092" />
</php>

<testsuites>
Expand Down

0 comments on commit 39cb005

Please sign in to comment.