Skip to content

Commit

Permalink
action poc
Browse files Browse the repository at this point in the history
  • Loading branch information
h1alexbel committed Jun 2, 2023
1 parent 42f194c commit ecb80d4
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 0 deletions.
10 changes: 10 additions & 0 deletions src/main/java/io/github/eocqrs/kafka/ext/Action.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.github.eocqrs.kafka.ext;

/**
* @author Aliaksei Bialiauski (abialiauski.dev@gmail.com)
* @since 0.2.5
*/
public interface Action {

void apply();
}
30 changes: 30 additions & 0 deletions src/main/java/io/github/eocqrs/kafka/ext/AssignPartitions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.github.eocqrs.kafka.ext;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.cactoos.list.ListOf;

import java.util.Collection;

/**
* @author Aliaksei Bialiauski (abialiauski.dev@gmail.com)
* @since 0.2.5
*/
public final class AssignPartitions implements Action {

private final KafkaConsumer<?, ?> consumer;
private final Collection<TopicPartition> partitions;

public AssignPartitions(
final KafkaConsumer<?, ?> cnsmr,
TopicPartition... prts
) {
this.consumer = cnsmr;
this.partitions = new ListOf<>(prts);
}

@Override
public void apply() {
this.consumer.assign(this.partitions);
}
}
19 changes: 19 additions & 0 deletions src/main/java/io/github/eocqrs/kafka/ext/CommitAsync.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.github.eocqrs.kafka.ext;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
* @author Aliaksei Bialiauski (abialiauski.dev@gmail.com)
* @since 0.2.5
*/
@RequiredArgsConstructor
public final class CommitAsync implements Action {

private final KafkaConsumer<?, ?> consumer;

@Override
public void apply() {
this.consumer.commitAsync();
}
}
19 changes: 19 additions & 0 deletions src/main/java/io/github/eocqrs/kafka/ext/CommitSync.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.github.eocqrs.kafka.ext;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
* @author Aliaksei Bialiauski (abialiauski.dev@gmail.com)
* @since 0.2.5
*/
@RequiredArgsConstructor
public final class CommitSync implements Action {

private final KafkaConsumer<?, ?> consumer;

@Override
public void apply() {
this.consumer.commitSync();
}
}
23 changes: 23 additions & 0 deletions src/main/java/io/github/eocqrs/kafka/ext/ShutdownHook.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.github.eocqrs.kafka.ext;

import lombok.RequiredArgsConstructor;

/**
* @author Aliaksei Bialiauski (abialiauski.dev@gmail.com)
* @since 0.2.5
*/
@RequiredArgsConstructor
public final class ShutdownHook implements Action {

/**
* Thread.
*/
private final Thread thread;

@Override
public void apply() {
Runtime.getRuntime().addShutdownHook(
this.thread
);
}
}
19 changes: 19 additions & 0 deletions src/main/java/io/github/eocqrs/kafka/ext/Wakeup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.github.eocqrs.kafka.ext;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
* @author Aliaksei Bialiauski (abialiauski.dev@gmail.com)
* @since 0.2.5
*/
@RequiredArgsConstructor
public final class Wakeup implements Action {

private final KafkaConsumer<?, ?> consumer;

@Override
public void apply() {
this.consumer.wakeup();
}
}

0 comments on commit ecb80d4

Please sign in to comment.