Skip to content

Commit

Permalink
The Mono experiment
Browse files Browse the repository at this point in the history
  • Loading branch information
artembilan committed Mar 31, 2016
1 parent dc19f0c commit 11385d6
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 12 deletions.
11 changes: 5 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ allprojects {
repositories {
maven { url 'https://repo.spring.io/libs-release' }
maven { url 'https://repo.spring.io/libs-milestone' }
if (project.hasProperty('platformVersion')) {
maven { url 'https://repo.spring.io/snapshot' }
}
maven { url 'https://repo.spring.io/snapshot' }
}

}
Expand All @@ -65,8 +63,8 @@ subprojects { subproject ->
}
}

sourceCompatibility = 1.7
targetCompatibility = 1.7
sourceCompatibility = 1.8
targetCompatibility = 1.8

ext {
assertjVersion = '3.3.0'
Expand All @@ -78,6 +76,7 @@ subprojects { subproject ->
log4jVersion = '1.2.17'
mockitoVersion = '1.9.5'
// metricsVersion = '2.2.0'
reactorVersion = '2.5.0.BUILD-SNAPSHOT'
scalaVersion = '2.11'
reactor2Version = '2.0.6.RELEASE'
springRetryVersion = '1.1.2.RELEASE'
Expand Down Expand Up @@ -146,7 +145,7 @@ project ('spring-kafka') {
// compile ("org.apache.avro:avro:$avroVersion", optional)
// compile ("org.apache.avro:avro-compiler:$avroVersion", optional)
// compile "com.goldmansachs:gs-collections:$gsCollectionsVersion"
// compile "io.projectreactor:reactor-core:$reactor2Version"
compile "io.projectreactor:reactor-core:$reactorVersion"

compile "org.apache.kafka:kafka-clients:$kafkaVersion"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.ProducerListenerInvokingCallback;

import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;


/**
* A template for executing high-level operations.
Expand Down Expand Up @@ -88,28 +91,28 @@ public void setProducerListener(ProducerListener<K, V> producerListener) {
}

@Override
public Future<RecordMetadata> send(V data) {
public Future<RecordMetadata> send(V data) {
return send(this.defaultTopic, data);
}

@Override
public Future<RecordMetadata> send(K key, V data) {
public Future<RecordMetadata> send(K key, V data) {
return send(this.defaultTopic, key, data);
}

@Override
public Future<RecordMetadata> send(int partition, K key, V data) {
public Future<RecordMetadata> send(int partition, K key, V data) {
return send(this.defaultTopic, partition, key, data);
}

@Override
public Future<RecordMetadata> send(String topic, V data) {
public Future<RecordMetadata> send(String topic, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
return doSend(producerRecord);
}

@Override
public Future<RecordMetadata> send(String topic, K key, V data) {
public Future<RecordMetadata> send(String topic, K key, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
return doSend(producerRecord);
}
Expand All @@ -121,7 +124,7 @@ public Future<RecordMetadata> send(String topic, int partition, V data) {
}

@Override
public Future<RecordMetadata> send(String topic, int partition, K key, V data) {
public Future<RecordMetadata> send(String topic, int partition, K key, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);
return doSend(producerRecord);
}
Expand Down Expand Up @@ -180,6 +183,35 @@ public RecordMetadata syncSend(String topic, int partition, K key, V data)
return future.get();
}

/**
 * Send the record to the given topic/partition and return a {@link Mono} that
 * completes with the broker-assigned {@link RecordMetadata}, or errors with the
 * exception reported by the producer callback.
 * @param topic the topic to send to.
 * @param partition the partition to send to.
 * @param key the record key.
 * @param data the record value.
 * @return a Mono signalling the send outcome.
 */
public Mono<RecordMetadata> reactiveSend(String topic, int partition, K key, V data) {
	// Lazily create the shared producer on first use.
	// NOTE(review): this double-checked locking is only safe under the Java Memory
	// Model if the 'producer' field is declared volatile — the field declaration is
	// not visible here; confirm it.
	if (this.producer == null) {
		synchronized (this) {
			if (this.producer == null) {
				this.producer = this.producerFactory.createProducer();
			}
		}
	}

	ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);

	if (this.logger.isTraceEnabled()) {
		this.logger.trace("Sending: " + producerRecord);
	}

	// Bridge the producer's asynchronous callback into a Mono: the processor is
	// returned immediately and fulfilled (or failed) when the callback fires.
	final MonoProcessor<RecordMetadata> recordMetadataMonoProcessor = MonoProcessor.create();

	this.producer.send(producerRecord, (metadata, exception) -> {
		if (exception != null) {
			// Propagate the send failure to subscribers.
			recordMetadataMonoProcessor.onError(exception);
		}
		else {
			// A Mono completes on its first value, so onNext both delivers the
			// metadata and terminates the sequence.
			recordMetadataMonoProcessor.onNext(metadata);
		}
	});

	return recordMetadataMonoProcessor;
}

/**
* Send the producer record.
* @param producerRecord the producer record.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;


/**
* @author Gary Russell
Expand Down Expand Up @@ -93,6 +96,9 @@ public void onMessage(ConsumerRecord<Integer, String> record) {
assertThat(received).has(key((Integer) null));
assertThat(received).has(partition(0));
assertThat(received).has(value("qux"));

Mono<RecordMetadata> mono = template.reactiveSend(TEMPLATE_TOPIC, 0, 22, "Mono");
assertThat(mono.get(10000)).isNotNull();
}

@Test
Expand All @@ -116,4 +122,13 @@ public void onSuccess(String topic, Integer partition, Integer key, String value
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}

@Test
public void testMono() throws Exception {
	// A value emitted before subscription must be replayed to a late consumer.
	MonoProcessor<String> processor = MonoProcessor.create();
	processor.onNext("test");
	CountDownLatch latch = new CountDownLatch(1);
	processor.consume(value -> latch.countDown());
	assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}

}

1 comment on commit 11385d6

@smaldini
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, can you use reactor-core 2.5.0.M2 instead? We may have identified a bug in the snapshot (the testMono test works locally).

Please sign in to comment.