wypb/flink-elasticsearch2-connector


flink-elasticsearch2-connector

A Flink DataSet ElasticSearchOutputFormat, created by https://www.iteblog.com and based on org.apache.flink#flink-connector-elasticsearch2_2.10#1.1.2. It can be used from Scala or Java to write a Flink DataSet to Elasticsearch.

Usage

Environment

Elasticsearch: 2.x.x

Flink: 1.x.x

Scala: 2.10.x

pom.xml

<dependency>
       <groupId>com.iteblog</groupId>
       <artifactId>flink-elasticsearch2-connector</artifactId>
       <version>1.0.1</version>
</dependency>

Using in Scala

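import java.net.{InetAddress, InetSocketAddress}

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.scala._
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

// Implicitly converts the Scala Map and List below to the java.util types the output format expects.
import scala.collection.JavaConversions._
// ElasticSearchOutputFormat, ElasticsearchSinkFunction and RequestIndexer must also be
// imported from the package in which this connector ships them.

val config = Map("bulk.flush.max.actions" -> "1000", "cluster.name" -> "elasticsearch")
// Comma-separated list of Elasticsearch hosts.
val hosts = "www.iteblog.com"

// 9300 is the Elasticsearch transport port.
val transports = hosts.split(",").map(host => new InetSocketAddress(InetAddress.getByName(host), 9300)).toList

val data : DataSet[String] = ....
data.output(new ElasticSearchOutputFormat(config, transports, new ElasticsearchSinkFunction[String] {
      // Each element is indexed as one document in index "iteblog", type "info".
      def createIndexRequest(element: String): IndexRequest = {
        Requests.indexRequest().index("iteblog").`type`("info").source(element)
      }

      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        indexer.add(createIndexRequest(element))
      }
}))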

Using in Java

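import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.DataSet;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
// ElasticSearchOutputFormat, ElasticsearchSinkFunction and RequestIndexer must also be
// imported from the package in which this connector ships them.

Map<String, String> config = new HashMap<>();
config.put("bulk.flush.max.actions", "1000");
config.put("cluster.name", "elasticsearch");

// Comma-separated list of Elasticsearch hosts.
String hosts = "www.iteblog.com";

List<InetSocketAddress> list = new ArrayList<>();
for (String host : hosts.split(",")) {
    // 9300 is the Elasticsearch transport port; getByName throws UnknownHostException.
    list.add(new InetSocketAddress(InetAddress.getByName(host), 9300));
}

DataSet<String> data  = ....;

data.output(new ElasticSearchOutputFormat<>(config, list, new ElasticsearchSinkFunction<String>() {
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }

    // Each element is indexed as one document in index "iteblog", type "info".
    private IndexRequest createIndexRequest(String element) {
        return Requests.indexRequest().index("iteblog").type("info").source(element);
    }
}));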
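
Both examples split a comma-separated hosts string and attach the transport port 9300 to every entry. If some nodes listen on a different port, the parsing can be moved into a small helper. The following is only a sketch: the class name TransportHosts, the parse method, and the optional host:port syntax are assumptions made for illustration and are not part of this connector.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not shipped with the connector.
final class TransportHosts {
    private TransportHosts() {}

    // Parses "host1,host2:9301" into socket addresses.
    // Entries without an explicit port use defaultPort.
    // Bare IPv6 literals are not handled by this sketch.
    static List<InetSocketAddress> parse(String hosts, int defaultPort)
            throws UnknownHostException {
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (String entry : hosts.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // skip empty entries such as a trailing comma
            }
            int colon = trimmed.lastIndexOf(':');
            String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
            int port = colon < 0
                    ? defaultPort
                    : Integer.parseInt(trimmed.substring(colon + 1));
            // Resolves the host name eagerly, as the examples above do.
            addresses.add(new InetSocketAddress(InetAddress.getByName(host), port));
        }
        return addresses;
    }
}
```

The resulting list can be passed to ElasticSearchOutputFormat in place of the hand-built list, for example TransportHosts.parse(hosts, 9300).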
