Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
h1alexbel committed May 8, 2023
1 parent acd5890 commit 99a6ee4
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 38 deletions.
12 changes: 6 additions & 6 deletions src/main/java/io/github/eocqrs/kafka/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@

package io.github.eocqrs.kafka;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;

/**
* Consumer.
Expand Down Expand Up @@ -63,13 +63,13 @@ public interface Consumer<K, X> extends Closeable {
void subscribe(ConsumerRebalanceListener listener, String... topics);

/**
* Dataized.
* Fetch Records.
*
* @param topic topic to poll
* @param timeout max time to wait
* @return Dataized polled data.
* @return Records.
*/
List<Dataized<X>> iterate(String topic, Duration timeout);
ConsumerRecords<K, X> records(String topic, Duration timeout);

/**
* Unsubscribe.
Expand Down
31 changes: 7 additions & 24 deletions src/main/java/io/github/eocqrs/kafka/consumer/KfConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,13 @@

import io.github.eocqrs.kafka.Consumer;
import io.github.eocqrs.kafka.ConsumerSettings;
import io.github.eocqrs.kafka.Dataized;
import io.github.eocqrs.kafka.data.KfData;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.cactoos.list.ListOf;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* @todo #236:30m/DEV Unsubscribe is not implemented
*/

/**
* Kafka Consumer.
Expand Down Expand Up @@ -89,25 +83,14 @@ public void subscribe(final ConsumerRebalanceListener listener,
}

/**
* @todo #236:30m/DEV ConsumerRecords data polling
* @todo #289:30m/DEV ConsumerRecords wrapping up
* we have to wrap the ConsumerRecords into some object
*/
@Override
public List<Dataized<X>> iterate(final String topic, final Duration timeout) {
final List<Dataized<X>> accumulator = new ArrayList<>(0);
this.origin
.poll(timeout)
.records(topic)
.forEach(
data ->
accumulator.add(
new KfData<>(
data.value(),
topic,
data.partition()
).dataized()
)
);
return accumulator;
public ConsumerRecords<K, X> records(final String topic,
final Duration timeout) {
this.subscribe(topic);
return this.origin.poll(timeout);

Check warning on line 93 in src/main/java/io/github/eocqrs/kafka/consumer/KfConsumer.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/github/eocqrs/kafka/consumer/KfConsumer.java#L92-L93

Added lines #L92 - L93 were not covered by tests
}

@Override
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/io/github/eocqrs/kafka/fake/FkConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@
package io.github.eocqrs.kafka.fake;

import io.github.eocqrs.kafka.Consumer;
import io.github.eocqrs.kafka.Dataized;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;

/**
* Fake Consumer.
Expand Down Expand Up @@ -64,11 +63,12 @@ public void subscribe(final ConsumerRebalanceListener listener,
}

/*
* @todo #54:60m/DEV Fake iterate is not implemented
* @todo #54:60m/DEV Fake records is not implemented
*/
@Override
public List<Dataized<X>> iterate(final String topic, final Duration timeout) {
throw new UnsupportedOperationException("#iterate()");
public ConsumerRecords<K, X> records(final String topic,
final Duration timeout) {
throw new UnsupportedOperationException("#records()");
}

/*
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/io/github/eocqrs/kafka/fake/FkConsumerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
);
assertThrows(
UnsupportedOperationException.class,
() -> consumer.iterate("123", Duration.ofMillis(100L))
() -> consumer.records("123", Duration.ofMillis(100L))
);
assertThrows(
UnsupportedOperationException.class,
() -> consumer.iterate("123", Duration.ofMillis(100L))
() -> consumer.records("123", Duration.ofMillis(100L))
);
assertThrows(
UnsupportedOperationException.class,
Expand Down

0 comments on commit 99a6ee4

Please sign in to comment.