Skip to content

Commit

Permalink
Merge branch '__rultor'
Browse files Browse the repository at this point in the history
  • Loading branch information
rultor committed May 19, 2023
2 parents fb977d8 + 2950802 commit fd7e09e
Show file tree
Hide file tree
Showing 10 changed files with 679 additions and 7 deletions.
4 changes: 2 additions & 2 deletions src/it/producer-consumer-api/src/test/java/EntryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ void createsProducerAndSendsData() throws IOException {

@Test
@Order(5)
void createsProducerAndSendsMessage() {
void createsProducerAndSendsMessage() throws Exception {
final Producer<String, String> producer = new KfProducer<>(
new KfFlexible<>(
new KfProducerParams(
Expand Down Expand Up @@ -229,7 +229,7 @@ void createsProducerAndSendsMessage() {

@Test
@Order(4)
void createsProducerWithCallback() throws IOException {
void createsProducerWithCallback() throws Exception {
try (
final Producer<String, String> producer =
new KfCallback<>(
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/github/eocqrs/kafka/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.io.Closeable;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.RecordMetadata;
/**
* @todo #287:30m/DEV Producer send is not flexible enough
Expand All @@ -45,6 +46,7 @@ public interface Producer<K, X> extends Closeable {
* @param key message key
* @param data data wrapper to process
* @return Future with RecordMetadata.
* @throws Exception When something went wrong.
*/
Future<RecordMetadata> send(K key, Data<X> data);
Future<RecordMetadata> send(K key, Data<X> data) throws Exception;
}
65 changes: 65 additions & 0 deletions src/main/java/io/github/eocqrs/kafka/fake/FkBroker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2022 Aliaksei Bialiauski, EO-CQRS
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.github.eocqrs.kafka.fake;

import io.github.eocqrs.kafka.Data;

import java.util.Collection;

/**
* Fake Kafka Broker.
*
* @author Aliaksei Bialiauski (abialiauski.dev@gmail.com)
* @since 0.2.3
*/
public interface FkBroker {

/**
* Adds Dataset.
*
* @param key Dataset key
* @param data Data
* @param <X> Data value type
* @return FkBroker
* @throws Exception When something went wrong.
*/
<X> FkBroker withDataset(Object key, Data<X> data)
throws Exception;

/**
* Adds topics.
*
* @param tpcs Topics
* @return FkBroker
*/
FkBroker withTopics(String... tpcs);

/**
* Query data.
*
* @param query Query
* @return Collection of Strings
* @throws Exception When something went wrong.
*/
Collection<String> data(String query) throws Exception;
}
12 changes: 8 additions & 4 deletions src/main/java/io/github/eocqrs/kafka/fake/FkProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@

import io.github.eocqrs.kafka.Data;
import io.github.eocqrs.kafka.Producer;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.concurrent.Future;

/**
* Fake Producer.
*
Expand All @@ -41,15 +42,18 @@ public final class FkProducer<K, X> implements Producer<K, X> {
* @todo #44:60m/DEV Fake send is not implemented
*/
@Override
public Future<RecordMetadata> send(final K key, final Data<X> message) {
throw new UnsupportedOperationException("#send()");
public Future<RecordMetadata> send(
final K key,
final Data<X> message
) throws Exception {
throw new UnsupportedOperationException("#send");
}

/*
* @todo #44:60m/DEV Fake producer close is not implemented
*/
@Override
public void close() {
throw new UnsupportedOperationException("#close()");
throw new UnsupportedOperationException("#close");
}
}
76 changes: 76 additions & 0 deletions src/main/java/io/github/eocqrs/kafka/fake/FkStorage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2022 Aliaksei Bialiauski, EO-CQRS
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.github.eocqrs.kafka.fake;

import com.jcabi.xml.XML;
import org.xembly.Directive;

/**
* Fake Kafka Storage.
*
* @author Aliaksei Bialiauski (abialiauski.dev@gmail.com)
* @since 0.2.3
*/
public interface FkStorage {

/**
* Full XML.
*
* @return XML
* @throws Exception When something went wrong.
*/
XML xml() throws Exception;

/**
* Update XML with new directives.
*
* @param dirs Directives
* @throws Exception When something went wrong.
*/
void apply(Iterable<Directive> dirs) throws Exception;

/**
* Locks storage to the current thread.
*
* <p>If the lock is available, grant it
* to the calling thread and block all operations from other threads.
* If not available, wait for the holder of the lock to release it with
* {@link #unlock()} before any other operations can be performed.
*
* <p>Locking behavior is reentrant, which means a thread can invoke
* multiple times, where a hold count is maintained.
*/
void lock();

/**
* Unlock storage.
*
* <p>Locking behavior is reentrant, thus if the thread invoked
* {@link #lock()} multiple times, the hold count is decremented. If the
* hold count reaches 0, the lock is released.
*
* <p>If the current thread does not hold the lock, an
* {@link IllegalMonitorStateException} will be thrown.
*/
void unlock();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2022 Aliaksei Bialiauski, EO-CQRS
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.github.eocqrs.kafka.fake;

import java.io.Serial;
import java.util.concurrent.locks.ReentrantLock;

/**
* Immutable Lock.
*
* @author Aliaksei Bialiauski (abialiauski.dev@gmail.com)
* @since 0.2.3
*/
final class ImmutableReentrantLock extends ReentrantLock {

/**
* Serialization id.
*/
@Serial
private static final long serialVersionUID = -3003135962114984635L;
}
122 changes: 122 additions & 0 deletions src/main/java/io/github/eocqrs/kafka/fake/InFile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright (c) 2022 Aliaksei Bialiauski, EO-CQRS
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.github.eocqrs.kafka.fake;

import com.jcabi.xml.XML;
import com.jcabi.xml.XMLDocument;
import org.cactoos.io.TeeInput;
import org.cactoos.scalar.LengthOf;
import org.cactoos.text.TextOf;
import org.xembly.Directive;
import org.xembly.Xembler;

import java.io.File;
import java.nio.charset.StandardCharsets;

/**
* Storage in file.
*
* @author Aliaksei Bialiauski (abialiauski.dev@gmail.com)
* @since 0.2.3
*/
public final class InFile implements FkStorage {

/**
* File name.
*/
private final transient String name;
/**
* Lock.
*/
private final transient ImmutableReentrantLock lock =
new ImmutableReentrantLock();

/**
* Ctor.
*
* @throws Exception When something went wrong.
*/
public InFile() throws Exception {
this(File.createTempFile("fake-kafka", ".xml"));
new File(this.name).deleteOnExit();
}

/**
* Ctor.
*
* @param file File
* @throws Exception When something went wrong.
*/
public InFile(final File file) throws Exception {
new LengthOf(
new TeeInput(
"<broker/>",
file,
StandardCharsets.UTF_8
)
).value();
this.name = file.getAbsolutePath();
}

@Override
public XML xml() throws Exception {
synchronized (this.name) {
return new XMLDocument(
new TextOf(
new File(
this.name
)
).asString()
);
}
}

@Override
public void apply(final Iterable<Directive> dirs) throws Exception {
synchronized (this.name) {
new LengthOf(
new TeeInput(
new XMLDocument(
new Xembler(
dirs
).applyQuietly(this.xml().node())
).toString(),
new File(
this.name
),
StandardCharsets.UTF_8
)
).value();
}
}

@Override
public void lock() {
this.lock.lock();
}

@Override
public void unlock() {
this.lock.unlock();
}
}
Loading

0 comments on commit fd7e09e

Please sign in to comment.