Skip to content

Commit

Permalink
java17 support
Browse files Browse the repository at this point in the history
  • Loading branch information
steveblackmon committed Dec 6, 2023
1 parent c7164c8 commit 51528c1
Show file tree
Hide file tree
Showing 42 changed files with 329 additions and 252 deletions.
251 changes: 193 additions & 58 deletions pom.xml

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions src/site/markdown/dependency-info.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ Browse the "Project Modules" index of streams-project to find artifacts you migh

<pre class="prettyprint">&lt;dependency&gt;
&lt;groupId&gt;org.apache.streams&lt;/groupId&gt;
&lt;artifactId&gt;streams-master&lt;/artifactId&gt;
&lt;version&gt;0.5.1&lt;/version&gt;
&lt;artifactId&gt;streams-core&lt;/artifactId&gt;
&lt;version&gt;0.6.1&lt;/version&gt;
&lt;type&gt;pom&lt;/type&gt;
&lt;/dependency&gt;</pre>

Expand All @@ -35,7 +35,7 @@ Browse the "Project Modules" index of streams-project to find artifacts you migh

<div class="source">

<pre class="prettyprint">'org.apache.streams:apache-streams:pom:0.5.1'</pre>
<pre class="prettyprint">'org.apache.streams:streams-core:jar:0.6.1'</pre>

</div>

Expand All @@ -47,8 +47,8 @@ Browse the "Project Modules" index of streams-project to find artifacts you migh

<div class="source">

<pre class="prettyprint">&lt;dependency org=&quot;org.apache.streams&quot; name=&quot;streams-master&quot; rev=&quot;0.5.1&quot;&gt;
&lt;artifact name=&quot;streams-master&quot; type=&quot;pom&quot; /&gt;
<pre class="prettyprint">&lt;dependency org=&quot;org.apache.streams&quot; name=&quot;streams-core&quot; rev=&quot;0.6.1&quot;&gt;
&lt;artifact name=&quot;streams-core&quot; type=&quot;jar&quot; /&gt;
&lt;/dependency&gt;</pre>

</div>
Expand All @@ -60,7 +60,7 @@ Browse the "Project Modules" index of streams-project to find artifacts you migh
<h3><a name="Groovy_Grape"></a>Groovy Grape</h3><a name="Groovy_Grape"></a>

<div class="source"><pre class="prettyprint">@Grapes(
@Grab(group='org.apache.streams', module='streams-master', version='0.5.1')
@Grab(group='org.apache.streams', module='streams-core', version='0.6.1')
)</pre>

</div>
Expand All @@ -71,7 +71,7 @@ Browse the "Project Modules" index of streams-project to find artifacts you migh

<h3><a name="Gradle"></a>Gradle</h3><a name="Gradle"></a>

<div class="source"><pre class="prettyprint">compile 'org.apache.streams:streams-master:0.5.1'</pre>
<div class="source"><pre class="prettyprint">compile 'org.apache.streams:streams-core:0.6.1'</pre>

</div>

Expand All @@ -82,7 +82,7 @@ Browse the "Project Modules" index of streams-project to find artifacts you migh

<div class="source">

<pre class="prettyprint">[org.apache.streams/streams-master &quot;0.5.1&quot;]</pre>
<pre class="prettyprint">[org.apache.streams/streams-core &quot;0.6.1&quot;]</pre>

</div>

Expand All @@ -94,7 +94,7 @@ Browse the "Project Modules" index of streams-project to find artifacts you migh

<div class="source">

<pre class="prettyprint">libraryDependencies += &quot;org.apache.streams&quot; % &quot;streams-master&quot; % &quot;0.5.1&quot;</pre>
<pre class="prettyprint">libraryDependencies += &quot;org.apache.streams&quot; % &quot;streams-core&quot; % &quot;0.6.1&quot;</pre>

</div>

Expand Down
12 changes: 12 additions & 0 deletions src/site/markdown/versions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
##Versions Supported

Apache Streams should be able to be built and used with
Java 1.8, Java 11, and Java 17.

You'll need to activate only one of the following profiles:

- java-8
- java-11
- java-17

The default active profile is java-17.
5 changes: 3 additions & 2 deletions src/site/site.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,12 @@
<menu name="Developers">
<item name="Install Git" href="install/git.html"/>
<item name="Install Maven" href="install/maven.html"/>
<item name="Changelog" href="changelog.html" />
<item name="Versions Supported" href="versions.html"/>
<item name="Release Setup" href="release-setup.html" />
<item name="Release Process" href="release.html" />
<item name="Website Management" href="website.html" />
<item name="Coding Conventions" href="code-conventions.html"/>
<item name="Changelog" href="changelog.html" />
</menu>
<!--<menu name="Details">-->
<!--<item name="Mailing Lists" href="mail-lists.html" />-->
Expand All @@ -123,4 +124,4 @@
<item name="Thanks" href="http://www.apache.org/foundation/thanks.html" />
</menu>
</body>
</project>
</project>
4 changes: 2 additions & 2 deletions streams-components/streams-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<artifactId>mockito-inline</artifactId>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
<artifactId>powermock-api-mockito2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
Expand All @@ -39,14 +40,15 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import static org.mockito.Matchers.any;
import static org.mockito.ArgumentMatchers.any;

/**
* Test for
* @see org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter
*/
@Ignore
@RunWith(PowerMockRunner.class)
@PowerMockIgnore("jdk.internal.reflect.*")
@PowerMockIgnore({"javax.management.*","jdk.internal.reflect.*"})
@PrepareForTest({HttpClients.class, CloseableHttpResponse.class, CloseableHttpResponse.class})
public class SimpleHTTPPostPersistWriterTest {

Expand Down
5 changes: 3 additions & 2 deletions streams-config/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,16 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<artifactId>mockito-inline</artifactId>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
<artifactId>powermock-api-mockito2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
Expand Down
7 changes: 4 additions & 3 deletions streams-contrib/streams-persist-console/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,16 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<artifactId>mockito-inline</artifactId>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
<artifactId>powermock-api-mockito2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
Expand All @@ -82,4 +83,4 @@
</testResources>
</build>

</project>
</project>
7 changes: 4 additions & 3 deletions streams-contrib/streams-persist-filebuffer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,16 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<artifactId>mockito-inline</artifactId>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
<artifactId>powermock-api-mockito2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
Expand Down Expand Up @@ -138,4 +139,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
6 changes: 3 additions & 3 deletions streams-contrib/streams-persist-hbase/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
<description>HBase Module</description>

<properties>
<hadoop-common.version>2.7.0</hadoop-common.version>
<hbase.version>1.2.3</hbase.version>
<hadoop-common.version>3.2.4</hadoop-common.version>
<hbase.version>1.7.2</hbase.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -222,4 +222,4 @@

</profile>
</profiles>
</project>
</project>
2 changes: 1 addition & 1 deletion streams-contrib/streams-persist-hdfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<description>HDFS Module</description>

<properties>
<hdfs.version>2.7.0</hdfs.version>
<hdfs.version>3.2.4</hdfs.version>
</properties>

<dependencies>
Expand Down
4 changes: 2 additions & 2 deletions streams-contrib/streams-persist-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
<description>Kafka Module</description>

<properties>
<scala.binary.version>2.11</scala.binary.version>
<kafka.version>1.0.0</kafka.version>
<scala.binary.version>2.12</scala.binary.version>
<kafka.version>3.6.0</kafka.version>
<clojure.version>1.2.0</clojure.version>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@

package org.apache.streams.kafka;

import com.google.common.collect.Lists;
import org.apache.commons.collections.ListUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistReader;
import org.apache.streams.core.StreamsResultSet;

import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.joda.time.DateTime;
Expand All @@ -38,6 +39,7 @@

import java.io.Serializable;
import java.math.BigInteger;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Queue;
Expand All @@ -61,9 +63,7 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable {

private KafkaReaderConfiguration config;

private ConsumerConnector consumerConnector;

public List<KafkaStream<String, String>> inStreams;
protected Consumer<String, String> consumer;

private ExecutorService executor = Executors.newSingleThreadExecutor();

Expand Down Expand Up @@ -98,18 +98,14 @@ public void startStream() {
Properties props = new Properties();
props.setProperty("serializer.encoding", "UTF8");

ConsumerConfig consumerConfig = new ConsumerConfig(props);

consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
consumer = new KafkaConsumer(props);
List<String> topics = ListUtils.EMPTY_LIST;
topics.add(config.getTopic());

Whitelist topics = new Whitelist(config.getTopic());
consumer.subscribe(topics);
VerifiableProperties vprops = new VerifiableProperties(props);

inStreams = consumerConnector.createMessageStreamsByFilter(topics, 1, new StringDecoder(vprops), new StringDecoder(vprops));

for (final KafkaStream stream : inStreams) {
executor.submit(new KafkaPersistReaderTask(this, stream));
}
executor.submit(new KafkaPersistReaderTask(this));

}

Expand Down Expand Up @@ -155,7 +151,7 @@ public void prepare(Object configurationObject) {

@Override
public void cleanUp() {
consumerConnector.shutdown();
consumer.close();
while ( !executor.isTerminated()) {
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@

package org.apache.streams.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.streams.core.StreamsDatum;

import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Random;

/**
Expand All @@ -36,22 +37,20 @@ public class KafkaPersistReaderTask implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReaderTask.class);

private KafkaPersistReader reader;
private KafkaStream<String,String> stream;

public KafkaPersistReaderTask(KafkaPersistReader reader, KafkaStream<String,String> stream) {
public KafkaPersistReaderTask(KafkaPersistReader reader) {
this.reader = reader;
this.stream = stream;
}

@Override
public void run() {

MessageAndMetadata<String,String> item;
ConsumerRecords<String,String> records = reader.consumer.poll(Duration.ofMillis(100));

while (true) {

for (MessageAndMetadata<String, String> aStream : stream) {
item = aStream;
reader.persistQueue.add(new StreamsDatum(item.message()));
for (ConsumerRecord<String, String> record : records) {
reader.persistQueue.add(new StreamsDatum(record.value(), record.key()));
}
try {
Thread.sleep(new Random().nextInt(100));
Expand Down
Loading

0 comments on commit 51528c1

Please sign in to comment.