Skip to content

Commit

Permalink
Introduced generic Kafka Connect Source (#30)
Browse files Browse the repository at this point in the history
* Update to 4.0

* Introduced generic Kafka Connect Source

* compile fix, enhance README

* Address review comments

Co-authored-by: Ali <ali@hazelcast.com>
  • Loading branch information
Emin Demirci and Ali committed Dec 25, 2019
1 parent e87537a commit 01b94e2
Show file tree
Hide file tree
Showing 52 changed files with 710 additions and 246 deletions.
11 changes: 8 additions & 3 deletions build.gradle
Expand Up @@ -26,6 +26,12 @@ subprojects {
repositories {
mavenCentral()
mavenLocal()
maven {
url "https://oss.sonatype.org/content/repositories/snapshots"
mavenContent {
snapshotsOnly()
}
}
}

idea {
Expand All @@ -46,16 +52,15 @@ subprojects {
targetCompatibility = '1.8'

ext {
jetVersion = '3.1'
hazelcastVersion = '3.12.1'
jetVersion = '4.0-SNAPSHOT'
hazelcastVersion = '3.12.4'
}

dependencies {
compile group: 'com.hazelcast.jet', name: 'hazelcast-jet', version: jetVersion
compile group: 'com.google.code.findbugs', name: 'jsr305', version: '3.0.2'
testCompile "com.hazelcast.jet:hazelcast-jet-core:" + jetVersion + ":tests"
testCompile "com.hazelcast:hazelcast:" + hazelcastVersion + ":tests"
testCompile "com.hazelcast:hazelcast-client:" + hazelcastVersion + ":tests"
testCompile "javax.cache:cache-api:1.1.0"
testCompile 'log4j:log4j:1.2.17'
testCompile 'junit:junit:4.11'
Expand Down
8 changes: 4 additions & 4 deletions elasticsearch/elasticsearch-5/README.md
Expand Up @@ -37,7 +37,7 @@ results:
```java
Pipeline p = Pipeline.create();

p.drawFrom(ElasticsearchSources.elasticsearch("sourceName",
p.readFrom(ElasticsearchSources.elasticsearch("sourceName",
() -> RestClient.builder(HttpHost.create("hostAddress")).build(),
() -> {
SearchRequest searchRequest = new SearchRequest("users");
Expand All @@ -49,7 +49,7 @@ p.drawFrom(ElasticsearchSources.elasticsearch("sourceName",
"10s",
SearchHit::getSourceAsString,
RestClient::close))
.drainTo(Sinks.logger());
.writeTo(Sinks.logger());
```

#### As a Sink
Expand All @@ -62,8 +62,8 @@ List and indexes them to Elasticsearch:

```java
Pipeline p = Pipeline.create();
p.drawFrom(Sources.list(users))
.drainTo(ElasticsearchSinks.elasticsearch("sinkName",
p.readFrom(Sources.list(users))
.writeTo(ElasticsearchSinks.elasticsearch("sinkName",
() -> RestClient.builder(HttpHost.create("hostAddress")).build(),
BulkRequest::new,
user -> {
Expand Down
Expand Up @@ -16,9 +16,9 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.function.ConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SinkBuilder;
import org.apache.http.HttpHost;
Expand Down
Expand Up @@ -16,9 +16,9 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.function.ConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.SourceBuilder.SourceBuffer;
Expand Down
Expand Up @@ -16,10 +16,10 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.IListJet;
import com.hazelcast.collection.IList;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.function.FunctionEx;
import org.apache.http.HttpHost;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
Expand Down Expand Up @@ -55,7 +55,7 @@ public abstract class ElasticsearchBaseTest extends JetTestSupport {
.withNetwork(Network.newNetwork());

JetInstance jet;
IListJet<User> userList;
IList<User> userList;
String indexName = "users";

private RestClient client;
Expand Down
Expand Up @@ -27,8 +27,8 @@ public class ElasticsearchSinkTest extends ElasticsearchBaseTest {
@Test
public void test_elasticsearchSink() throws IOException {
Pipeline p = Pipeline.create();
p.drawFrom(Sources.list(userList))
.drainTo(ElasticsearchSinks.elasticsearch(indexName, DEFAULT_USER, DEFAULT_PASS,
p.readFrom(Sources.list(userList))
.writeTo(ElasticsearchSinks.elasticsearch(indexName, DEFAULT_USER, DEFAULT_PASS,
container.getContainerIpAddress(), mappedPort(), indexFn(indexName)));

jet.newJob(p).join();
Expand Down
Expand Up @@ -16,8 +16,8 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.IListJet;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.collection.IList;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
Expand All @@ -43,8 +43,8 @@ public void test() throws IOException {
SupplierEx<RestClient> clientSupplier = () -> createClient(containerIpAddress, port);

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list(userList))
.drainTo(ElasticsearchSinks.elasticsearch(indexName, clientSupplier,
p.readFrom(Sources.list(userList))
.writeTo(ElasticsearchSinks.elasticsearch(indexName, clientSupplier,
() -> new BulkRequest().setRefreshPolicy(IMMEDIATE), indexFn(indexName), RestClient::close));

jet.newJob(p).join();
Expand All @@ -53,19 +53,19 @@ public void test() throws IOException {

p = Pipeline.create();

p.drawFrom(ElasticsearchSources.elasticsearch("users", clientSupplier,
p.readFrom(ElasticsearchSources.elasticsearch("users", clientSupplier,
() -> {
SearchRequest searchRequest = new SearchRequest("users");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(termQuery("age", 8));
searchRequest.source(searchSourceBuilder);
return searchRequest;
}))
.drainTo(Sinks.list("sink"));
.writeTo(Sinks.list("sink"));

jet.newJob(p).join();

IListJet<Object> sink = jet.getList("sink");
IList<Object> sink = jet.getList("sink");
assertEquals(1, sink.size());
}

Expand Down
8 changes: 4 additions & 4 deletions elasticsearch/elasticsearch-6/README.md
Expand Up @@ -37,7 +37,7 @@ results:
```java
Pipeline p = Pipeline.create();

p.drawFrom(ElasticsearchSources.elasticsearch("sourceName",
p.readFrom(ElasticsearchSources.elasticsearch("sourceName",
() -> new RestHighLevelClient(RestClient.builder(HttpHost.create(hostAddress))),
() -> {
SearchRequest searchRequest = new SearchRequest("users");
Expand All @@ -49,7 +49,7 @@ p.drawFrom(ElasticsearchSources.elasticsearch("sourceName",
"10s",
SearchHit::getSourceAsString,
RestHighLevelClient::close))
.drainTo(Sinks.logger());
.writeTo(Sinks.logger());
```

#### As a Sink
Expand All @@ -62,8 +62,8 @@ List and indexes them to Elasticsearch.

```java
Pipeline p = Pipeline.create();
p.drawFrom(Sources.list(users))
.drainTo(ElasticsearchSinks.elasticsearch("sinkName",
p.readFrom(Sources.list(users))
.writeTo(ElasticsearchSinks.elasticsearch("sinkName",
() -> new RestHighLevelClient(RestClient.builder(HttpHost.create(hostAddress))),
BulkRequest::new,
user -> {
Expand Down
Expand Up @@ -16,9 +16,9 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.function.ConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SinkBuilder;
import org.apache.http.HttpHost;
Expand Down
Expand Up @@ -16,9 +16,9 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.function.ConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.SourceBuilder.SourceBuffer;
Expand Down
Expand Up @@ -16,10 +16,10 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.IListJet;
import com.hazelcast.collection.IList;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.function.FunctionEx;
import org.apache.http.HttpHost;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -48,7 +48,7 @@ public abstract class ElasticsearchBaseTest extends JetTestSupport {
.withNetwork(Network.newNetwork());

JetInstance jet;
IListJet<User> userList;
IList<User> userList;
String indexName = "users";
private RestHighLevelClient client;

Expand Down
Expand Up @@ -29,8 +29,8 @@ public void test_elasticsearchSink() throws IOException {
String containerAddress = container.getHttpHostAddress();

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list(userList))
.drainTo(ElasticsearchSinks.elasticsearch(indexName, () -> createClient(containerAddress), indexFn(indexName)));
p.readFrom(Sources.list(userList))
.writeTo(ElasticsearchSinks.elasticsearch(indexName, () -> createClient(containerAddress), indexFn(indexName)));

jet.newJob(p).join();

Expand Down
Expand Up @@ -16,7 +16,7 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.IListJet;
import com.hazelcast.collection.IList;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
Expand All @@ -39,8 +39,8 @@ public void test() throws IOException {
String containerAddress = container.getHttpHostAddress();

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list(userList))
.drainTo(ElasticsearchSinks.elasticsearch(indexName, () -> createClient(containerAddress),
p.readFrom(Sources.list(userList))
.writeTo(ElasticsearchSinks.elasticsearch(indexName, () -> createClient(containerAddress),
() -> new BulkRequest().setRefreshPolicy(IMMEDIATE), indexFn(indexName), RestHighLevelClient::close));

jet.newJob(p).join();
Expand All @@ -49,19 +49,19 @@ public void test() throws IOException {

p = Pipeline.create();

p.drawFrom(ElasticsearchSources.elasticsearch("users", () -> createClient(containerAddress),
p.readFrom(ElasticsearchSources.elasticsearch("users", () -> createClient(containerAddress),
() -> {
SearchRequest searchRequest = new SearchRequest("users");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(termQuery("age", 8));
searchRequest.source(searchSourceBuilder);
return searchRequest;
}))
.drainTo(Sinks.list("sink"));
.writeTo(Sinks.list("sink"));

jet.newJob(p).join();

IListJet<Object> sink = jet.getList("sink");
IList<Object> sink = jet.getList("sink");
assertEquals(1, sink.size());
}

Expand Down
8 changes: 4 additions & 4 deletions elasticsearch/elasticsearch-7/README.md
Expand Up @@ -37,7 +37,7 @@ results:
```java
Pipeline p = Pipeline.create();

p.drawFrom(ElasticsearchSources.elasticsearch("sourceName",
p.readFrom(ElasticsearchSources.elasticsearch("sourceName",
() -> new RestHighLevelClient(RestClient.builder(HttpHost.create(hostAddress))),
() -> {
SearchRequest searchRequest = new SearchRequest("users");
Expand All @@ -50,7 +50,7 @@ p.drawFrom(ElasticsearchSources.elasticsearch("sourceName",
SearchHit::getSourceAsString,
request -> RequestOptions.DEFAULT,
RestHighLevelClient::close))
.drainTo(Sinks.logger());
.writeTo(Sinks.logger());
```

#### As a Sink
Expand All @@ -63,8 +63,8 @@ List and indexes them to Elasticsearch.

```java
Pipeline p = Pipeline.create();
p.drawFrom(Sources.list(users))
.drainTo(ElasticsearchSinks.elasticsearch("sinkName",
p.readFrom(Sources.list(users))
.writeTo(ElasticsearchSinks.elasticsearch("sinkName",
() -> new RestHighLevelClient(RestClient.builder(HttpHost.create(hostAddress))),
BulkRequest::new,
user -> {
Expand Down
Expand Up @@ -16,9 +16,9 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.function.ConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SinkBuilder;
import org.apache.http.HttpHost;
Expand Down
Expand Up @@ -16,9 +16,9 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.function.ConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.SourceBuilder.SourceBuffer;
Expand Down
Expand Up @@ -16,10 +16,10 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.IListJet;
import com.hazelcast.collection.IList;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.function.FunctionEx;
import org.apache.http.HttpHost;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest;
Expand Down Expand Up @@ -52,7 +52,7 @@ public abstract class ElasticsearchBaseTest extends JetTestSupport {
new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.1.0")
.withNetwork(Network.newNetwork());
JetInstance jet;
IListJet<User> userList;
IList<User> userList;
String indexName = "users";
private RestHighLevelClient client;

Expand Down
Expand Up @@ -31,8 +31,8 @@ public void test_elasticsearchSink() throws IOException {
String containerAddress = container.getHttpHostAddress();

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list(userList))
.drainTo(elasticsearch(indexName, () -> createClient(containerAddress), indexFn(indexName)));
p.readFrom(Sources.list(userList))
.writeTo(elasticsearch(indexName, () -> createClient(containerAddress), indexFn(indexName)));

jet.newJob(p).join();

Expand Down

0 comments on commit 01b94e2

Please sign in to comment.