Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ php:

env:
global:
- DOCKER_COMPOSE_VERSION=1.13.0
- DOCKER_COMPOSE_VERSION=1.19.0
matrix:
- DEPENDENCIES="low" INTEGRATION_TEST="enabled"
- DEPENDENCIES="low" INTEGRATION_TEST="disabled"
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ quick-test:

integration-test:
docker-compose up -d
sleep 10
sleep 20
./vendor/bin/phpunit -c phpunit.xml.integration.dist -v --group integration
docker-compose down

Expand Down
6 changes: 0 additions & 6 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@
"email": "thomas.ploch@flixbus.com"
}
],
"repositories": [
{
"type": "vcs",
"url": "git@github.com:flix-tech/avro-php.git"
}
],
"require": {
"php": "~7.0",
"guzzlehttp/guzzle": "~6.3",
Expand Down
8 changes: 1 addition & 7 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ services:
ipv4_address: 172.25.0.101

broker:
image: confluentinc/cp-enterprise-kafka
image: confluentinc/cp-kafka
hostname: broker
depends_on:
- zookeeper
Expand All @@ -24,12 +24,6 @@ services:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:9092'
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
networks:
confluent-net:
ipv4_address: 172.25.0.102
Expand Down
33 changes: 27 additions & 6 deletions src/Registry/CachedRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace FlixTech\SchemaRegistryApi\Registry;

use AvroSchema;
use FlixTech\SchemaRegistryApi\Exception\SchemaRegistryException;
use FlixTech\SchemaRegistryApi\Registry;
use GuzzleHttp\Promise\PromiseInterface;

Expand Down Expand Up @@ -47,7 +48,11 @@ public function __construct(Registry $registry, CacheAdapter $cacheAdapter, call
*/
public function register(string $subject, AvroSchema $schema, callable $requestCallback = null)
{
$closure = function (int $schemaId) use ($schema) {
$closure = function ($schemaId) use ($schema) {
if ($schemaId instanceof SchemaRegistryException) {
return $schemaId;
}

$this->cacheAdapter->cacheSchemaWithId($schema, $schemaId);
$this->cacheAdapter->cacheSchemaIdByHash($schemaId, $this->getSchemaHash($schema));

Expand All @@ -68,7 +73,11 @@ function (PromiseInterface $promise) use ($closure) {
*/
public function schemaVersion(string $subject, AvroSchema $schema, callable $requestCallback = null)
{
$closure = function (int $version) use ($schema, $subject) {
$closure = function ($version) use ($schema, $subject) {
if ($version instanceof SchemaRegistryException) {
return $version;
}

$this->cacheAdapter->cacheSchemaWithSubjectAndVersion($schema, $subject, $version);

return $version;
Expand All @@ -94,7 +103,11 @@ public function schemaId(string $subject, AvroSchema $schema, callable $requestC
return $this->cacheAdapter->getIdWithHash($schemaHash);
}

$closure = function (int $schemaId) use ($schema, $schemaHash) {
$closure = function ($schemaId) use ($schema, $schemaHash) {
if ($schemaId instanceof SchemaRegistryException) {
return $schemaId;
}

$this->cacheAdapter->cacheSchemaWithId($schema, $schemaId);
$this->cacheAdapter->cacheSchemaIdByHash($schemaId, $schemaHash);

Expand All @@ -119,7 +132,11 @@ public function schemaForId(int $schemaId, callable $requestCallback = null)
return $this->cacheAdapter->getWithId($schemaId);
}

$closure = function (AvroSchema $schema) use ($schemaId) {
$closure = function ($schema) use ($schemaId) {
if ($schema instanceof SchemaRegistryException) {
return $schema;
}

$this->cacheAdapter->cacheSchemaWithId($schema, $schemaId);
$this->cacheAdapter->cacheSchemaIdByHash($schemaId, $this->getSchemaHash($schema));

Expand All @@ -144,7 +161,11 @@ public function schemaForSubjectAndVersion(string $subject, int $version, callab
return $this->cacheAdapter->getWithSubjectAndVersion($subject, $version);
}

$closure = function (AvroSchema $schema) use ($subject, $version) {
$closure = function ($schema) use ($subject, $version) {
if ($schema instanceof SchemaRegistryException) {
return $schema;
}

$this->cacheAdapter->cacheSchemaWithSubjectAndVersion($schema, $subject, $version);

return $schema;
Expand Down Expand Up @@ -184,6 +205,6 @@ private function applyValueHandlers($value, callable $promiseHandler, callable $

private function getSchemaHash(AvroSchema $schema): string
{
return call_user_func($this->hashAlgoFunc, $schema);
return \call_user_func($this->hashAlgoFunc, $schema);
}
}
22 changes: 22 additions & 0 deletions test/Registry/CachedRegistryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace FlixTech\SchemaRegistryApi\Test\Registry;

use AvroSchema;
use FlixTech\SchemaRegistryApi\Exception\SubjectNotFoundException;
use FlixTech\SchemaRegistryApi\Registry;
use FlixTech\SchemaRegistryApi\Registry\CacheAdapter;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
Expand Down Expand Up @@ -381,4 +382,25 @@ public function it_should_not_cache_latest_version_calls()

$this->assertEquals($this->schema, $this->cachedRegistry->latestVersion($this->subject));
}

/**
* @test
*/
public function it_should_handle_exceptions_wrapped_in_promises_correctly()
{
$subjectNotFoundException = new SubjectNotFoundException();

$promise = new FulfilledPromise($subjectNotFoundException);

$this->registryMock
->expects($this->once())
->method('register')
->with($this->subject, $this->schema)
->willReturn($promise);

$this->assertEquals(
$this->cachedRegistry->register($this->subject, $this->schema)->wait(),
$subjectNotFoundException
);
}
}