Skip to content

Commit

Permalink
messages
Browse files Browse the repository at this point in the history
  • Loading branch information
l3r8yJ committed Jul 8, 2023
1 parent f35f269 commit 220a244
Show file tree
Hide file tree
Showing 12 changed files with 227 additions and 143 deletions.
16 changes: 11 additions & 5 deletions src/main/java/io/github/eocqrs/kafka/fake/FkRecords.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,27 @@ public FkRecords(
*/
@Override
public ConsumerRecords<Object, String> value() throws Exception {
final Map<TopicPartition, List<ConsumerRecord<Object, String>>> part
= new HashMap<>(0);
final List<ConsumerRecord<Object, String>> recs = new ListOf<>();
this.datasets.forEach(
dataset -> recs.add(
new ConsumerRecord<>(
this.topic,
DEFAULT_PARTITION,
ZERO_OFFSET,
FkRecords.DEFAULT_PARTITION,
FkRecords.ZERO_OFFSET,
null,
dataset
)
)
);
part.put(new TopicPartition(this.topic, DEFAULT_PARTITION), recs);
final Map<TopicPartition, List<ConsumerRecord<Object, String>>> part
= new HashMap<>(0);
part.put(
new TopicPartition(
this.topic,
FkRecords.DEFAULT_PARTITION
),
recs
);
return new ConsumerRecords<>(part);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ void assignsPartitions(
) {
final Action assign =
new AssignPartitions(mck, new TopicPartition("test", 1));
Assertions.assertDoesNotThrow(assign::apply);
Assertions.assertDoesNotThrow(
assign::apply,
() -> "Creates %s without any exceptions thrown".formatted(assign)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ void commitsAsync(
@Mock final KafkaConsumer<String, String> mck
) {
final Action async = new CommitAsync(mck);
Assertions.assertDoesNotThrow(async::apply);
Assertions.assertDoesNotThrow(
async::apply,
() -> "Creates %s without any exceptions thrown".formatted(async)
);
}
}
5 changes: 4 additions & 1 deletion src/test/java/io/github/eocqrs/kafka/act/CommitSyncTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ void commitsSync(
@Mock final KafkaConsumer<String, String> mck
) {
final Action sync = new CommitSync(mck);
Assertions.assertDoesNotThrow(sync::apply);
Assertions.assertDoesNotThrow(
sync::apply,
() -> "Creates %s without any exceptions thrown".formatted(sync)
);
}
}
5 changes: 4 additions & 1 deletion src/test/java/io/github/eocqrs/kafka/act/WakeupTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ final class WakeupTest {
@Test
void wakeups(@Mock final KafkaConsumer<String, String> mck) {
final Action wakeup = new Wakeup(mck);
Assertions.assertDoesNotThrow(wakeup::apply);
Assertions.assertDoesNotThrow(
wakeup::apply,
() -> "Creates %s without any exceptions thrown".formatted(wakeup)
);
}
}
95 changes: 60 additions & 35 deletions src/test/java/io/github/eocqrs/kafka/consumer/KfConsumerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@

package io.github.eocqrs.kafka.consumer;

import com.jcabi.xml.XMLDocument;
import io.github.eocqrs.kafka.Consumer;
import io.github.eocqrs.kafka.ConsumerSettings;
import io.github.eocqrs.kafka.Params;
import io.github.eocqrs.kafka.consumer.settings.KfConsumerParams;
import io.github.eocqrs.kafka.parameters.BootstrapServers;
import io.github.eocqrs.kafka.parameters.GroupId;
Expand All @@ -39,6 +41,7 @@
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.cactoos.io.ResourceOf;
import org.cactoos.list.ListOf;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -47,6 +50,7 @@
import org.mockito.junit.jupiter.MockitoExtension;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

/**
Expand All @@ -64,23 +68,27 @@ void subscribesWithoutException(
) {
Mockito.when(settings.consumer()).thenReturn(consumer);
final Consumer<String, String> underTest = new KfConsumer<>(settings);
assertDoesNotThrow(
() -> {
underTest.subscribe(new ListOf<>("transactions-info"));
underTest.subscribe("transactions-info");
underTest.subscribe(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
}
assertAll(
() -> assertDoesNotThrow(
() -> {
underTest.subscribe(new ListOf<>("transactions-info"));
underTest.subscribe("transactions-info");
underTest.subscribe(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
}

@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
}
}, "transactions-info");
}
);
assertDoesNotThrow(
underTest::close
@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
}
}, "transactions-info");
},
"Should create an %s and subscribe without exceptions".formatted(underTest)
),
() -> assertDoesNotThrow(
underTest::close,
() -> "Should close %s without an exception".formatted(underTest)
)
);
}

Expand All @@ -90,12 +98,12 @@ void recordsPollingDoesntThrowException(
@Mock final KafkaConsumer<String, String> origin
) {
Mockito.when(settings.consumer()).thenReturn(origin);
final Consumer<String, String> consumer =
new KfConsumer<>(settings);
assertDoesNotThrow(() ->
consumer.records(
"TEST", Duration.ofSeconds(5L)
)
assertDoesNotThrow(
() ->
new KfConsumer<>(settings).records(
"TEST", Duration.ofSeconds(5L)
),
() -> "Should to poll things without exception"
);
}

Expand All @@ -107,35 +115,52 @@ void unsubscribesWithoutException(
Mockito.when(settings.consumer()).thenReturn(origin);
final Consumer<String, String> consumer =
new KfConsumer<>(settings);
assertDoesNotThrow(consumer::unsubscribe);
assertDoesNotThrow(consumer::close);
assertAll(
() -> assertDoesNotThrow(consumer::unsubscribe),
() -> assertDoesNotThrow(consumer::close),
() -> "%s subscribes and unsubscribes without exceptions".formatted(consumer)
);
}

@Test
void constructsConsumerWithXML() throws Exception {
final String xml = "consumer.xml";
final Consumer<String, String> consumer =
new KfConsumer<>(
new KfXmlFlexible<String, String>("consumer.xml")
new KfXmlFlexible<String, String>(xml)
.consumer()
);
assertThat(consumer).isNotNull();
assertThat(consumer).isNotNull()
.describedAs(
"%s should be created from %s without exceptions".formatted(
consumer,
new XMLDocument(
new ResourceOf(xml).stream()
)
)
);
}

@Test
void constructsConsumerWithParams() {
final Params params = new KfParams(
new BootstrapServers("localhost:9092"),
new GroupId("1"),
new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"),
new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
);
final Consumer<String, String> consumer =
new KfConsumer<>(
new KfFlexible<>(
new KfConsumerParams(
new KfParams(
new BootstrapServers("localhost:9092"),
new GroupId("1"),
new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"),
new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
)
)
new KfConsumerParams(params)
)
);
assertThat(consumer).isNotNull()
.describedAs(
"Consumer %s created from %s without exceptions".formatted(
consumer,
params.asXml()
)
);
assertThat(consumer).isNotNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ final class KfConsumerParamsTest {
@Test
void representsXmlCorrectly() {
final Params mock = Mockito.mock(Params.class);
Mockito.when(mock.asXml()).thenReturn("<groupId>103</groupId>");
final String group = "<groupId>103</groupId>";
Mockito.when(mock.asXml()).thenReturn(group);
MatcherAssert.assertThat(
"Represents right XML settings",
"Should return %s inside a consumer tag".formatted(group),
new KfConsumerParams(mock).asXml(),
Matchers.equalTo("<consumer>\n<groupId>103</groupId>\n</consumer>\n")
);
Expand Down
Loading

0 comments on commit 220a244

Please sign in to comment.