Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/maven-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
distribution: 'zulu'

- name: Build with Maven
run: mvn clean -B package -DskipTests=true --file pom.xml
run: mvn clean -B package --file pom.xml

check_sonar_configured:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -71,4 +71,4 @@ jobs:
mvn --update-snapshots verify \
org.sonarsource.scanner.maven:sonar-maven-plugin:sonar \
-Dsonar.projectKey=eclipse-ecsp_streambase -Dsonar.organization=eclipse-ecsp \
-Dcheckstyle.skip -Dpmd.skip=true
-Dcheckstyle.skip -Dpmd.skip=true
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1230,4 +1230,4 @@
</build>
</profile>
</profiles>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@
* wrapper over some other messaging broker. Hence, that launcher class can extend from this abstract class
* and get the common functionality.
*
* @param <K1> the generic type for incoming key
* @param <V1> the generic type for incoming value
* @param <K2> the generic type for outgoing key
* @param <V2> the generic type for outgoing value
* @param <KIn> the generic type for incoming key
* @param <VIn> the generic type for incoming value
* @param <KOut> the generic type for outgoing key
 * @param <VOut> the generic type for outgoing value
*/

public abstract class AbstractLauncher<KIn, VIn, KOut, VOut> implements LauncherProvider {
Expand Down Expand Up @@ -125,7 +125,7 @@ public final void launch(Properties props) {
}

/**
* Method to define how would the streams, for eg. {@link KafkaStreams} be launched.
* Method to launch the application with streaming.
*
* @param props the props
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,12 @@
* {@link org.apache.kafka.streams.KafkaStreams}.
*/
public interface KafkaStateAgentListener {

/**
* Callback method to handle state changes in Kafka Streams.
*
* @param newState the new state of the Kafka Streams instance
* @param oldState the previous state of the Kafka Streams instance
*/
void onChange(final State newState, final State oldState);
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,16 @@
import java.util.Map;

/**
* An implementation of {@link KafkaStateAgentListener} which takes the following actions on {@link KafkaStreams}
* state change.
* <li>
* Restart the {@link BackdoorKafkaConsumer}.
* </li>
* <li>
* If the streams have been in rebalancing state for over 10 mins then restart the KafkaStreams.
* </li>
* An implementation of {@link KafkaStreams.StateListener} and {@link HealthMonitor} that monitors
* the state of {@link KafkaStreams} and takes appropriate actions based on state changes.
*
* <p>Key functionalities include:
* <ul>
* <li>Restarting {@link BackdoorKafkaConsumer} instances when the state changes to RUNNING.</li>
* <li>Monitoring the REBALANCING state and restarting the application if it persists for too long.</li>
* <li>Notifying {@link OffsetManager} to repopulate offsets from MongoDB.</li>
* </ul>
*/

@Component
public class KafkaStateListener implements KafkaStreams.StateListener, HealthMonitor {

Expand Down Expand Up @@ -147,18 +146,20 @@ void setBackdoorConsumers(List<BackdoorKafkaConsumer> backdoorConsumers) {
}

/**
* Following actions are taken when the state of KafkaStreams changes.
* Handles state changes in the {@link KafkaStreams} instance.
*
* <p>This method performs the following actions:
* <ul>
* <li>Notify all the {@link BackdoorKafkaConsumer} that the KafkaStreams state has changed so that
 * necessary action could be taken by the Kafka consumers.</li>
* <li>Keep monitoring the state of KafkaStreams and if it remains in the REBALANCING state for
* more than 10 mins, then restart the streams application. </li>
* <li>Notify the {@link OffsetManager} so that offsets could be repopulated from MongoDB. This is
* related to the manual offset management done in the stream-base library. See {@link OffsetManager} for more.
* <li>Notifies all {@link BackdoorKafkaConsumer} instances about the state change.</li>
* <li>Updates the health status based on the new state.</li>
* <li>Monitors the REBALANCING state and starts a thread to validate if it persists too long.</li>
* <li>Notifies {@link OffsetManager} to repopulate offsets when transitioning from REBALANCING to RUNNING.</li>
* <li>Invokes any registered {@link KafkaStateAgentListener} instances when
* transitioning from REBALANCING to RUNNING.</li>
* </ul>
*
* @param newState the new state
* @param oldState the old state
* @param newState the new state of the {@link KafkaStreams}.
* @param oldState the previous state of the {@link KafkaStreams}.
*/
@Override
public void onChange(State newState, State oldState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.eclipse.ecsp.analytics.stream.base.KafkaStreamsProcessorContext.StoreType;
import org.eclipse.ecsp.analytics.stream.base.discovery.StreamProcessorDiscoveryService;
Expand Down Expand Up @@ -506,18 +507,10 @@ private void validateProps(Properties props) {
if (props.getProperty(PropertyNames.APPLICATION_ID) == null) {
throw new IllegalArgumentException(PropertyNames.APPLICATION_ID + " is mandatory");
}
// if (props.getProperty(PropertyNames.AUTO_OFFSET_RESET_CONFIG) ==
// null) {
// throw new
// IllegalArgumentException(PropertyNames.AUTO_OFFSET_RESET_CONFIG + "
// is mandatory");
// }
if (props.getProperty(PropertyNames.BOOTSTRAP_SERVERS) == null) {
props.put(PropertyNames.BOOTSTRAP_SERVERS, "localhost:9092");
}
if (props.get(PropertyNames.ZOOKEEPER_CONNECT) == null) {
props.put(PropertyNames.ZOOKEEPER_CONNECT, "localhost:2181/haa");
}
props.computeIfAbsent(PropertyNames.ZOOKEEPER_CONNECT, k -> "localhost:2181/haa");
String numThreads = null;
if ((numThreads = props.getProperty(PropertyNames.NUM_STREAM_THREADS)) == null) {
props.put(PropertyNames.NUM_STREAM_THREADS, Constants.FOUR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,12 @@ public static void main(String[] args) throws Exception {
}

/**
* Extract all the properties supplied via ".properties" file & config-map and, launch the
* Extract all the properties supplied via properties file and config-map and, launch the
* streams application.
*
* @throws IllegalArgumentException if {@link LauncherProvider} implementation isn't available on the
* classpath.
*/

public void launch() throws IllegalArgumentException {
Thread.setDefaultUncaughtExceptionHandler((Thread t, Throwable e) ->
LOGGER.error("Uncaught exception for thread " + t.getName(), e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,21 @@
* @author ssasidharan
*/
public interface LauncherProvider {

/**
* Launches the stream processing system with the provided properties.
*
* @param props the properties to configure the stream processing system
*/
void launch(Properties props);

/**
* Terminates the stream processing system and releases resources.
*/
void terminate();

//WI-365808 For unit test cases
/**
* Terminates the streams with a timeout for unit test cases.
*/
void terminateStreamsWithTimeout();
}
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ public static String getDefaultPropertyValue(String propertyName) {
public static final String KAFKA_HEADERS_ENABLED = "kafka.headers.enabled";

/**
* RDNG 171775 & RTC 503148 Expose RocksDB metrics to Prometheus.
* RDNG 171775 and RTC 503148 Expose RocksDB metrics to Prometheus.
*/
public static final String ROCKSDB_METRICS_ENABLED = "rocksdb.metrics.enabled";

Expand All @@ -916,7 +916,7 @@ public static String getDefaultPropertyValue(String propertyName) {
public static final String ROCKSDB_METRICS_THREAD_FREQUENCY_MS = "rocksdb.metrics.thread.frequency.ms";

/**
* RDNG 171859 & RTC 525171 Report internal cache metrics to Prometheus .
* RDNG 171859 and RTC 525171 Report internal cache metrics to Prometheus .
*/
public static final String INTERNAL_METRICS_ENABLED = "internal.metrics.enabled";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ public interface StreamProcessingContext<K, V> {
*/
public void checkpoint();

/**
* Retrieves the state store by its name.
*
* @param name the name of the state store to retrieve
* @return the KeyValueStore associated with the given name
*/
@SuppressWarnings("rawtypes")
public KeyValueStore getStateStore(String name);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ public interface StreamProcessor<KIn, VIn, KOut, VOut> {

/**
* Perform actions on a schedule dictated by wall clock time.
* One tick is roughly one second. Note that implementations of this method
* should use the StreamsProcessorContext.forwardDirectly() instead of the forward() methods.
* This method is called periodically, with one tick representing roughly one second.
* Implementations should use StreamsProcessorContext.forwardDirectly() instead of forward() methods.
*
* @param ticks The number of ticks since the processor started.
*/
default void punctuateWc(long ticks) {
}
Expand All @@ -110,6 +112,12 @@ default String[] sources() {
*/
void configChanged(Properties props);

/**
* Creates a state store for the processor.
* The state store is used to persist key-value pairs for processing.
*
* @return A new instance of `HarmanPersistentKVStore`.
*/
@SuppressWarnings("rawtypes")
HarmanPersistentKVStore createStateStore();

Expand Down Expand Up @@ -144,6 +152,9 @@ default void initConfig(Properties props) {
* property or master data needs to be read as and when
* needed.
*
* @param key The key of the data to update.
* @param value The value of the data to update.
* @param streamName The name of the stream associated with the update.
*/
default void updateSharedData(Object key, Object value, String streamName) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@
public interface StreamProcessorFilter {

/**
* returns if current stream processor is enabled or not.
* Determines whether the current stream processor should be included in the processor chain.
*
* @return boolean
* @param props the properties to evaluate for inclusion
* @return {@code true} if the processor should be included, {@code false} otherwise
*/
boolean includeInProcessorChain(Properties props);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,11 @@
* @author ssasidharan
*/
public interface TickListener {

/**
* Invoked when a tick event occurs.
*
* @param seconds the number of seconds since the last tick
*/
void tick(long seconds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,12 @@ public class StreamBaseSpringContext implements ApplicationContextAware {
private static ApplicationContext context;

/**
* Returns the Spring managed bean instance of the given class type if it exists. Returns null otherwise.
* Retrieves a Spring-managed bean of the specified class type.
*
* @param beanClass beanClass
* @param <T> the type of the bean to retrieve
* @param beanClass the class type of the bean to retrieve
* @return the Spring-managed bean instance of the specified type
* @throws BeansException if the bean could not be created or retrieved
*/
public static <T extends Object> T getBean(Class<T> beanClass) {
return context.getBean(beanClass);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
**/
public interface CacheBackedInMemoryBatchCompleteCallBack {

/**
* Callback method invoked after the completion of processing a batch of records in the in-memory cache.
*
* @param processedRecords A list of objects representing the records that were processed in the batch.
*/
public void batchCompleted(List<Object> processedRecords);

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
*/
public interface SinkNode<K, V> {

/**
* Initialize the sink node with the given properties.
*
* @param prop the properties to initialize the sink node
*/
public void init(Properties prop);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,13 @@
*/
public class ConnectionException extends RuntimeException {

private static final long serialVersionUID = 1L;
private static final long serialVersionUID = 943434134L;

/**
* Constructs a new ConnectionException with the specified detail message.
*
* @param msg the detail message explaining the reason for the exception
*/
public ConnectionException(String msg) {
super(msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,13 @@ public void put(byte[] key, byte[] messageInBytes, String kafkaTopic, String pri
try {
logger.debug("Sending message to kafka. Topic: {}, Message: {}, key: {}", kafkaTopic, keyString);
java.util.concurrent.Future<org.apache.kafka.clients.producer.RecordMetadata> f = producer
.send(new ProducerRecord<byte[], byte[]>(kafkaTopic, key, messageInBytes), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
logger.error("Exception occurred in "
+ "KafkaProducerByPartition callback for key : {}", keyString, e);
.send(new ProducerRecord<byte[], byte[]>(kafkaTopic, key, messageInBytes),
(metadata, exception) -> {
if (exception != null) {
logger.error("Exception occurred in KafkaProducerByPartition callback "
+ "for key : {}", keyString, exception);
}
}
});
});
testIsSync(f, keyString);
} catch (Exception e) {
logger.error("Unable to send messages on Kafka for key : {} ", keyString, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
/**
* The purpose of this class is to chain the mandatory pre and post processors along with the service processors.
* The pre and post processor classes are pluggable via the following configs exposed by the stream-base
* library: {@link PropertyNames#PRE_PROCESSORS} & {@link PropertyNames#POST_PROCESSORS}.
* library: {@link PropertyNames#PRE_PROCESSORS} and {@link PropertyNames#POST_PROCESSORS}.
*
* <p>
* In between pre and post processors, service integrating the stream-base library can provide its own
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@

package org.eclipse.ecsp.analytics.stream.base.exception;


/**
* Custom runtime exception for errors in {@link BackdoorKafkaConsumer}.
* Custom runtime exception for errors in
* {@link org.eclipse.ecsp.analytics.stream.base.kafka.internal.BackdoorKafkaConsumer}.
*/
public class BackdoorKafkaConsumerException extends RuntimeException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
package org.eclipse.ecsp.analytics.stream.base.exception;

/**
* Custom exception for invalid source topic name coming from the {@link StreamProcessingContext} instance.
* Custom exception for invalid source topic name.
*/
public class InvalidSourceTopicException extends RuntimeException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@

/**
* Custom exception for retries exhausted if a function doesn't return successfully.
* Check usage in {@link RetryUtils}
*/
public class MaxRetriesFailedException extends RuntimeException {

Expand Down
Loading
Loading