Skip to content

Commit

Permalink
Merge branch '__rultor'
Browse files Browse the repository at this point in the history
  • Loading branch information
rultor committed Jun 22, 2023
2 parents 07066da + fd4c990 commit d565454
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 10 deletions.
3 changes: 2 additions & 1 deletion src/main/java/io/github/eocqrs/kafka/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public interface Consumer<K, X> extends Closeable {

/**
* Unsubscribe.
* @throws Exception When something went wrong.
*/
void unsubscribe();
void unsubscribe() throws Exception;
}
16 changes: 11 additions & 5 deletions src/main/java/io/github/eocqrs/kafka/fake/FkConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,18 @@ public ConsumerRecords<K, X> records(
throw new UnsupportedOperationException("#records()");
}

/*
* @todo #54:60m/DEV Fake unsubscribe is not implemented
*/
@Override
public void unsubscribe() {
throw new UnsupportedOperationException("#unsubscribe()");
public void unsubscribe() throws Exception {
while (
!this.broker.data(
"broker/subs/sub[consumer = '%s']/consumer/text()"
.formatted(
this.id
)
).isEmpty()
) {
this.broker.with(new UnsubscribeDirs(this.id).value());
}
}

@Override
Expand Down
63 changes: 63 additions & 0 deletions src/main/java/io/github/eocqrs/kafka/fake/UnsubscribeDirs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2023 Aliaksei Bialiauski, EO-CQRS
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.github.eocqrs.kafka.fake;

import org.cactoos.Scalar;
import org.xembly.Directives;

import java.util.UUID;

/**
* Unsubscribe Directives.
*
* @author Aliaksei Bialiauski (abialiauski.dev@gmail.com)
* @since 0.3.5
*/
public final class UnsubscribeDirs implements Scalar<Directives> {

/**
* Consumer ID.
*/
private final UUID consumer;

/**
* Ctor.
*
* @param cnsmr Consumer ID
*/
public UnsubscribeDirs(final UUID cnsmr) {
this.consumer = cnsmr;
}

@Override
public Directives value() throws Exception {
return new Directives()
.xpath("broker/subs/sub[consumer = '%s']/consumer"
.formatted(
this.consumer
)
)
.remove()
.remove();
}
}
86 changes: 82 additions & 4 deletions src/test/java/io/github/eocqrs/kafka/fake/FkConsumerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.logging.Level;

Expand Down Expand Up @@ -241,6 +242,87 @@ public void onPartitionsAssigned(final Collection<TopicPartition> collection) {
);
}

@Test
void unsubscribes() throws Exception {
final String topic = "unsubscribe.test";
final UUID uuid = UUID.randomUUID();
final Consumer<String, String> consumer =
new FkConsumer<>(
uuid,
this.broker
);
consumer.subscribe(topic);
consumer.unsubscribe();
MatcherAssert.assertThat(
"Consumer ID in subscription is not present",
this.broker.data(
"broker/subs/sub[consumer = '%s']/consumer/text()"
.formatted(
uuid
)
)
.isEmpty(),
Matchers.equalTo(true)
);
MatcherAssert.assertThat(
"Topic in subscription is not present",
this.broker.data(
"broker/subs/sub[topic = '%s']/topic/text()"
.formatted(
topic
)
)
.isEmpty(),
Matchers.equalTo(true)
);
consumer.close();
}

@Test
void unsubscribesWithSecondConsumerExisting() throws Exception {
final String topic = "unsubscribes.with.second.consumer.existing";
final UUID firstID = UUID.fromString("f3000fb7-b9fb-42d0-8210-f09a58c44a1f");
final UUID secondID = UUID.fromString("69a4cd5a-afdb-456c-9ade-658569f52d7b");
final Consumer<String, String> first =
new FkConsumer<>(
firstID,
this.broker
);
final Consumer<String, String> second =
new FkConsumer<>(
secondID,
this.broker
);
first.subscribe(topic);
second.subscribe(topic);
first.unsubscribe();
MatcherAssert.assertThat(
"No such subscription with first Consumer ID",
this.broker.data(
"broker/subs/sub[topic = '%s' and consumer = '%s']/topic/text()"
.formatted(
topic,
firstID
)
).isEmpty(),
Matchers.equalTo(true)
);
MatcherAssert.assertThat(
"Topic with subscription exists with second Consumer ID",
this.broker.data(
"broker/subs/sub[topic = '%s' and consumer = '%s']/topic/text()"
.formatted(
topic,
secondID
)
)
.isEmpty(),
Matchers.equalTo(false)
);
first.close();
second.close();
}

@Test
void createsFakeConsumer() {
final FkConsumer<String, String> consumer =
Expand All @@ -254,10 +336,6 @@ void createsFakeConsumer() {
UnsupportedOperationException.class,
() -> consumer.records("123", Duration.ofMillis(100L))
);
assertThrows(
UnsupportedOperationException.class,
consumer::unsubscribe
);
}

@Test
Expand Down
53 changes: 53 additions & 0 deletions src/test/java/io/github/eocqrs/kafka/fake/UnsubscribeDirsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2023 Aliaksei Bialiauski, EO-CQRS
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.github.eocqrs.kafka.fake;

import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import java.util.UUID;

/**
* Test case for {@link UnsubscribeDirs}.
*
* @author Aliaksei Bialiauski (abialiauski.dev@gmail.com)
* @since 0.3.5
*/
final class UnsubscribeDirsTest {

@Test
void dirsInRightFormat() throws Exception {
final UUID uuid = UUID.fromString("1ce12119-7ccc-46fc-a993-36d5b815a7b5");
final String directives = "XPATH \"broker/subs/sub"
+ "[consumer = &apos;1ce12119-7ccc-46fc-a993-36d5b815a7b5&apos;]/consumer\";"
+ "\n1:REMOVE;REMOVE;";
MatcherAssert.assertThat(
"Directives in right format",
new UnsubscribeDirs(uuid)
.value()
.toString(),
Matchers.equalTo(directives)
);
}
}

1 comment on commit d565454

@0pdd
Copy link
Collaborator

@0pdd 0pdd commented on d565454 Jun 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Puzzle 54-fcc45b5d disappeared from src/main/java/io/github/eocqrs/kafka/fake/FkConsumer.java), that's why I closed #297. Please, remember that the puzzle was not necessarily removed in this particular commit. Maybe it happened earlier, but we discovered this fact only now.

Please sign in to comment.