Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduced generic Kafka Connect Source #30

Merged
merged 4 commits into from Dec 25, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 8 additions & 3 deletions build.gradle
Expand Up @@ -26,6 +26,12 @@ subprojects {
repositories {
mavenCentral()
mavenLocal()
maven {
url "https://oss.sonatype.org/content/repositories/snapshots"
mavenContent {
snapshotsOnly()
}
}
}

idea {
Expand All @@ -46,16 +52,15 @@ subprojects {
targetCompatibility = '1.8'

ext {
jetVersion = '3.1'
hazelcastVersion = '3.12.1'
jetVersion = '4.0-SNAPSHOT'
hazelcastVersion = '3.12.4'
}

dependencies {
compile group: 'com.hazelcast.jet', name: 'hazelcast-jet', version: jetVersion
compile group: 'com.google.code.findbugs', name: 'jsr305', version: '3.0.2'
testCompile "com.hazelcast.jet:hazelcast-jet-core:" + jetVersion + ":tests"
testCompile "com.hazelcast:hazelcast:" + hazelcastVersion + ":tests"
testCompile "com.hazelcast:hazelcast-client:" + hazelcastVersion + ":tests"
testCompile "javax.cache:cache-api:1.1.0"
testCompile 'log4j:log4j:1.2.17'
testCompile 'junit:junit:4.11'
Expand Down
8 changes: 4 additions & 4 deletions elasticsearch/elasticsearch-5/README.md
Expand Up @@ -37,7 +37,7 @@ results:
```java
Pipeline p = Pipeline.create();

p.drawFrom(ElasticsearchSources.elasticsearch("sourceName",
p.readFrom(ElasticsearchSources.elasticsearch("sourceName",
() -> RestClient.builder(HttpHost.create("hostAddress")).build(),
() -> {
SearchRequest searchRequest = new SearchRequest("users");
Expand All @@ -49,7 +49,7 @@ p.drawFrom(ElasticsearchSources.elasticsearch("sourceName",
"10s",
SearchHit::getSourceAsString,
RestClient::close))
.drainTo(Sinks.logger());
.writeTo(Sinks.logger());
```

#### As a Sink
Expand All @@ -62,8 +62,8 @@ List and indexes them to Elasticsearch:

```java
Pipeline p = Pipeline.create();
p.drawFrom(Sources.list(users))
.drainTo(ElasticsearchSinks.elasticsearch("sinkName",
p.readFrom(Sources.list(users))
.writeTo(ElasticsearchSinks.elasticsearch("sinkName",
() -> RestClient.builder(HttpHost.create("hostAddress")).build(),
BulkRequest::new,
user -> {
Expand Down
Expand Up @@ -16,9 +16,9 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.function.ConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SinkBuilder;
import org.apache.http.HttpHost;
Expand Down
Expand Up @@ -16,9 +16,9 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.function.ConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.SourceBuilder.SourceBuffer;
Expand Down
Expand Up @@ -16,10 +16,10 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.IListJet;
import com.hazelcast.collection.IList;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.function.FunctionEx;
import org.apache.http.HttpHost;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
Expand Down Expand Up @@ -55,7 +55,7 @@ public abstract class ElasticsearchBaseTest extends JetTestSupport {
.withNetwork(Network.newNetwork());

JetInstance jet;
IListJet<User> userList;
IList<User> userList;
String indexName = "users";

private RestClient client;
Expand Down
Expand Up @@ -27,8 +27,8 @@ public class ElasticsearchSinkTest extends ElasticsearchBaseTest {
@Test
public void test_elasticsearchSink() throws IOException {
Pipeline p = Pipeline.create();
p.drawFrom(Sources.list(userList))
.drainTo(ElasticsearchSinks.elasticsearch(indexName, DEFAULT_USER, DEFAULT_PASS,
p.readFrom(Sources.list(userList))
.writeTo(ElasticsearchSinks.elasticsearch(indexName, DEFAULT_USER, DEFAULT_PASS,
container.getContainerIpAddress(), mappedPort(), indexFn(indexName)));

jet.newJob(p).join();
Expand Down
Expand Up @@ -16,8 +16,8 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.IListJet;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.collection.IList;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
Expand All @@ -43,8 +43,8 @@ public void test() throws IOException {
SupplierEx<RestClient> clientSupplier = () -> createClient(containerIpAddress, port);

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list(userList))
.drainTo(ElasticsearchSinks.elasticsearch(indexName, clientSupplier,
p.readFrom(Sources.list(userList))
.writeTo(ElasticsearchSinks.elasticsearch(indexName, clientSupplier,
() -> new BulkRequest().setRefreshPolicy(IMMEDIATE), indexFn(indexName), RestClient::close));

jet.newJob(p).join();
Expand All @@ -53,19 +53,19 @@ public void test() throws IOException {

p = Pipeline.create();

p.drawFrom(ElasticsearchSources.elasticsearch("users", clientSupplier,
p.readFrom(ElasticsearchSources.elasticsearch("users", clientSupplier,
() -> {
SearchRequest searchRequest = new SearchRequest("users");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(termQuery("age", 8));
searchRequest.source(searchSourceBuilder);
return searchRequest;
}))
.drainTo(Sinks.list("sink"));
.writeTo(Sinks.list("sink"));

jet.newJob(p).join();

IListJet<Object> sink = jet.getList("sink");
IList<Object> sink = jet.getList("sink");
assertEquals(1, sink.size());
}

Expand Down
8 changes: 4 additions & 4 deletions elasticsearch/elasticsearch-6/README.md
Expand Up @@ -37,7 +37,7 @@ results:
```java
Pipeline p = Pipeline.create();

p.drawFrom(ElasticsearchSources.elasticsearch("sourceName",
p.readFrom(ElasticsearchSources.elasticsearch("sourceName",
() -> new RestHighLevelClient(RestClient.builder(HttpHost.create(hostAddress))),
() -> {
SearchRequest searchRequest = new SearchRequest("users");
Expand All @@ -49,7 +49,7 @@ p.drawFrom(ElasticsearchSources.elasticsearch("sourceName",
"10s",
SearchHit::getSourceAsString,
RestHighLevelClient::close))
.drainTo(Sinks.logger());
.writeTo(Sinks.logger());
```

#### As a Sink
Expand All @@ -62,8 +62,8 @@ List and indexes them to Elasticsearch.

```java
Pipeline p = Pipeline.create();
p.drawFrom(Sources.list(users))
.drainTo(ElasticsearchSinks.elasticsearch("sinkName",
p.readFrom(Sources.list(users))
.writeTo(ElasticsearchSinks.elasticsearch("sinkName",
() -> new RestHighLevelClient(RestClient.builder(HttpHost.create(hostAddress))),
BulkRequest::new,
user -> {
Expand Down
Expand Up @@ -16,9 +16,9 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.function.ConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SinkBuilder;
import org.apache.http.HttpHost;
Expand Down
Expand Up @@ -16,9 +16,9 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.function.ConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.SourceBuilder.SourceBuffer;
Expand Down
Expand Up @@ -16,10 +16,10 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.IListJet;
import com.hazelcast.collection.IList;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.function.FunctionEx;
import org.apache.http.HttpHost;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -48,7 +48,7 @@ public abstract class ElasticsearchBaseTest extends JetTestSupport {
.withNetwork(Network.newNetwork());

JetInstance jet;
IListJet<User> userList;
IList<User> userList;
String indexName = "users";
private RestHighLevelClient client;

Expand Down
Expand Up @@ -29,8 +29,8 @@ public void test_elasticsearchSink() throws IOException {
String containerAddress = container.getHttpHostAddress();

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list(userList))
.drainTo(ElasticsearchSinks.elasticsearch(indexName, () -> createClient(containerAddress), indexFn(indexName)));
p.readFrom(Sources.list(userList))
.writeTo(ElasticsearchSinks.elasticsearch(indexName, () -> createClient(containerAddress), indexFn(indexName)));

jet.newJob(p).join();

Expand Down
Expand Up @@ -16,7 +16,7 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.IListJet;
import com.hazelcast.collection.IList;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
Expand All @@ -39,8 +39,8 @@ public void test() throws IOException {
String containerAddress = container.getHttpHostAddress();

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list(userList))
.drainTo(ElasticsearchSinks.elasticsearch(indexName, () -> createClient(containerAddress),
p.readFrom(Sources.list(userList))
.writeTo(ElasticsearchSinks.elasticsearch(indexName, () -> createClient(containerAddress),
() -> new BulkRequest().setRefreshPolicy(IMMEDIATE), indexFn(indexName), RestHighLevelClient::close));

jet.newJob(p).join();
Expand All @@ -49,19 +49,19 @@ public void test() throws IOException {

p = Pipeline.create();

p.drawFrom(ElasticsearchSources.elasticsearch("users", () -> createClient(containerAddress),
p.readFrom(ElasticsearchSources.elasticsearch("users", () -> createClient(containerAddress),
() -> {
SearchRequest searchRequest = new SearchRequest("users");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(termQuery("age", 8));
searchRequest.source(searchSourceBuilder);
return searchRequest;
}))
.drainTo(Sinks.list("sink"));
.writeTo(Sinks.list("sink"));

jet.newJob(p).join();

IListJet<Object> sink = jet.getList("sink");
IList<Object> sink = jet.getList("sink");
assertEquals(1, sink.size());
}

Expand Down
8 changes: 4 additions & 4 deletions elasticsearch/elasticsearch-7/README.md
Expand Up @@ -37,7 +37,7 @@ results:
```java
Pipeline p = Pipeline.create();

p.drawFrom(ElasticsearchSources.elasticsearch("sourceName",
p.readFrom(ElasticsearchSources.elasticsearch("sourceName",
() -> new RestHighLevelClient(RestClient.builder(HttpHost.create(hostAddress))),
() -> {
SearchRequest searchRequest = new SearchRequest("users");
Expand All @@ -50,7 +50,7 @@ p.drawFrom(ElasticsearchSources.elasticsearch("sourceName",
SearchHit::getSourceAsString,
request -> RequestOptions.DEFAULT,
RestHighLevelClient::close))
.drainTo(Sinks.logger());
.writeTo(Sinks.logger());
```

#### As a Sink
Expand All @@ -63,8 +63,8 @@ List and indexes them to Elasticsearch.

```java
Pipeline p = Pipeline.create();
p.drawFrom(Sources.list(users))
.drainTo(ElasticsearchSinks.elasticsearch("sinkName",
p.readFrom(Sources.list(users))
.writeTo(ElasticsearchSinks.elasticsearch("sinkName",
() -> new RestHighLevelClient(RestClient.builder(HttpHost.create(hostAddress))),
BulkRequest::new,
user -> {
Expand Down
Expand Up @@ -16,9 +16,9 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.function.ConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SinkBuilder;
import org.apache.http.HttpHost;
Expand Down
Expand Up @@ -16,9 +16,9 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.function.ConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.SourceBuilder.SourceBuffer;
Expand Down
Expand Up @@ -16,10 +16,10 @@

package com.hazelcast.jet.contrib.elasticsearch;

import com.hazelcast.jet.IListJet;
import com.hazelcast.collection.IList;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.function.FunctionEx;
import org.apache.http.HttpHost;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest;
Expand Down Expand Up @@ -52,7 +52,7 @@ public abstract class ElasticsearchBaseTest extends JetTestSupport {
new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.1.0")
.withNetwork(Network.newNetwork());
JetInstance jet;
IListJet<User> userList;
IList<User> userList;
String indexName = "users";
private RestHighLevelClient client;

Expand Down
Expand Up @@ -31,8 +31,8 @@ public void test_elasticsearchSink() throws IOException {
String containerAddress = container.getHttpHostAddress();

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list(userList))
.drainTo(elasticsearch(indexName, () -> createClient(containerAddress), indexFn(indexName)));
p.readFrom(Sources.list(userList))
.writeTo(elasticsearch(indexName, () -> createClient(containerAddress), indexFn(indexName)));

jet.newJob(p).join();

Expand Down