Merge pull request #151 from codefeedr/elastic_update
Update ElasticSearch to version 6
wzorgdrager committed Feb 1, 2019
2 parents d5d4054 + 405a7dd commit e05209c
Showing 3 changed files with 26 additions and 13 deletions.
build.sbt (2 changes: 1 addition & 1 deletion)
@@ -180,7 +180,7 @@ lazy val dependencies =
   val flinkStreaming = "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided
   val flinkKafka = "org.apache.flink" %% "flink-connector-kafka-0.11" % flinkVersion
   val flinkRuntimeWeb = "org.apache.flink" %% "flink-runtime-web" % flinkVersion % Provided
-  val flinkElasticSearch = "org.apache.flink" %% "flink-connector-elasticsearch5" % flinkVersion
+  val flinkElasticSearch = "org.apache.flink" %% "flink-connector-elasticsearch6" % flinkVersion
   val flinkRabbitMQ = "org.apache.flink" %% "flink-connector-rabbitmq" % flinkVersion

   val redis = "net.debasishg" %% "redisclient" % "3.6"
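
The build change is a single artifact bump: flink-connector-elasticsearch5 talks to Elasticsearch over the native transport protocol, whereas flink-connector-elasticsearch6 uses the Elasticsearch high-level REST client over HTTP. A minimal sketch of the resulting declaration, assuming a hypothetical flinkVersion value (the real one is defined elsewhere in build.sbt):

// Sketch only: the flinkVersion value here is an assumption.
lazy val flinkVersion = "1.7.1"

libraryDependencies +=
  "org.apache.flink" %% "flink-connector-elasticsearch6" % flinkVersion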
@@ -20,18 +20,23 @@ package org.codefeedr.plugins.elasticsearch.stages

 import java.net.{InetAddress, InetSocketAddress, URI}
 import java.nio.charset.StandardCharsets
+import java.util

 import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.runtime.rest.RestClient
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
+import org.apache.flink.streaming.connectors.elasticsearch6.{ElasticsearchSink, RestClientFactory}
+import org.apache.http.HttpHost
 import org.apache.logging.log4j.scala.Logging
 import org.codefeedr.stages.{OutputStage, StageAttributes}
 import org.elasticsearch.action.index.IndexRequest
-import org.elasticsearch.client.Requests
+import org.elasticsearch.client.{Requests, RestClientBuilder}
+import org.elasticsearch.common.xcontent.XContentType
 import org.json4s.NoTypeHints
 import org.json4s.ext.JavaTimeSerializers
 import org.json4s.jackson.Serialization
+import collection.JavaConversions._

 import scala.reflect.{ClassTag, Manifest}

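Two of the new imports, RestClientFactory and RestClientBuilder, belong to the ES 6 connector's hook for customizing the underlying REST client; the lines shown in this diff import them but do not use them. A sketch of what such a factory looks like, with an illustrative default header as the only customization; it would be attached via eSinkBuilder.setRestClientFactory(...). (Aside: collection.JavaConversions has been deprecated since Scala 2.12 in favour of scala.collection.JavaConverters.)

// Illustrative only, not part of this commit: a RestClientFactory that adds
// a default header to every request made by the low-level REST client.
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory
import org.apache.http.Header
import org.apache.http.message.BasicHeader
import org.elasticsearch.client.RestClientBuilder

val restClientFactory: RestClientFactory = new RestClientFactory {
  override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
    restClientBuilder.setDefaultHeaders(Array[Header](new BasicHeader("X-Pipeline", "codefeedr")))
  }
}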
@@ -50,19 +55,19 @@ class ElasticSearchOutput[T <: Serializable with AnyRef : ClassTag : Manifest](i
                          attributes: StageAttributes = StageAttributes())
   extends OutputStage[T](attributes) with Logging {

+  //TODO Add configuration support
   override def main(source: DataStream[T]): Unit = {
-    val config = createConfig()
     val transportAddresses = createTransportAddresses()

-    source.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticSearchSink[T](index)))
+    val eSinkBuilder = new ElasticsearchSink.Builder[T](transportAddresses, new ElasticSearchSink(index))
+
+    eSinkBuilder.setBulkFlushMaxActions(1)
+    source.addSink(eSinkBuilder.build())
   }

-  def createConfig(): java.util.HashMap[String, String] = {
-    val config = new java.util.HashMap[String, String]
-
-    // This instructs the sink to emit after every element, otherwise they would be buffered
-    config.put("bulk.flush.max.actions", "1")
-
-    config
-  }

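On the ES 5 connector the sink took a configuration map plus a list of transport addresses; on ES 6 it is assembled through ElasticsearchSink.Builder, and setBulkFlushMaxActions(1) replaces the removed bulk.flush.max.actions entry (flush after every element). A self-contained sketch of the same pattern; the host, index name, and String element type are illustrative:

// Sketch of the ES 6 builder pattern adopted above; host, index, and element
// type are assumptions, not values from this commit.
import java.nio.charset.StandardCharsets
import java.util.Collections

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
import org.elasticsearch.common.xcontent.XContentType

object Es6SinkSketch {
  def attach(stream: DataStream[String]): Unit = {
    val hosts = Collections.singletonList(new HttpHost("localhost", 9200, "http"))

    val sinkFunction = new ElasticsearchSinkFunction[String] {
      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit =
        indexer.add(
          Requests.indexRequest()
            .index("example-index")
            .`type`("json") // mirrors the document type used in this file
            .source(element.getBytes(StandardCharsets.UTF_8), XContentType.JSON))
    }

    val builder = new ElasticsearchSink.Builder[String](hosts, sinkFunction)
    builder.setBulkFlushMaxActions(1) // flush per element, as the commit does
    stream.addSink(builder.build())
  }
}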
@@ -71,20 +76,20 @@ class ElasticSearchOutput[T <: Serializable with AnyRef : ClassTag : Manifest](i
    *
    * @return List
    */
-  def createTransportAddresses(): java.util.ArrayList[InetSocketAddress] = {
-    val transportAddresses = new java.util.ArrayList[InetSocketAddress]
+  def createTransportAddresses(): java.util.ArrayList[HttpHost] = {
+    val transportAddresses = new java.util.ArrayList[HttpHost]

     if (servers.isEmpty) {
       logger.info("Transport address set is empty. Using localhost with default port 9300.")
-      transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
+      transportAddresses.add(new HttpHost("localhost", 9300, "http"))
     }

     for (server <- servers) {
       val uri = new URI(server)

       if (uri.getScheme == "es") {
         logger.info(s"Adding transport address $server")
-        transportAddresses.add(new InetSocketAddress(InetAddress.getByName(uri.getHost), uri.getPort))
+        transportAddresses.add(new HttpHost(uri.getHost, uri.getPort, "http"))
       }
     }

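The address list changes element type from InetSocketAddress (ES 5 transport client, TCP, conventionally port 9300) to HttpHost (REST client, HTTP, conventionally port 9200); note the fallback above still uses 9300. A sketch of the es:// URI mapping in isolation, with an illustrative example:

// Sketch of the "es://host:port" mapping used above; the example URI is illustrative.
import java.net.URI

import org.apache.http.HttpHost

object TransportAddressSketch {
  def toHttpHost(server: String): Option[HttpHost] = {
    val uri = new URI(server)
    // Only es:// URIs become hosts; other schemes are ignored, as in the stage.
    if (uri.getScheme == "es") Some(new HttpHost(uri.getHost, uri.getPort, "http")) else None
  }
}

// TransportAddressSketch.toHttpHost("es://example-node:9200")
//   returns Some(new HttpHost("example-node", 9200, "http"))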
@@ -109,7 +114,7 @@ private class ElasticSearchSink[T <: Serializable with AnyRef : ClassTag : Manif
     Requests.indexRequest()
       .index(index)
       .`type`("json")
-      .source(bytes)
+      .source(bytes, XContentType.JSON)
   }

   override def process(element: T, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
@@ -124,6 +129,7 @@ private class ElasticSearchSink[T <: Serializable with AnyRef : ClassTag : Manif
    */
   def serialize(element: T): Array[Byte] = {
     val bytes = Serialization.write[T](element)(formats)
+
     bytes.getBytes(StandardCharsets.UTF_8)
   }

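serialize() renders the element to JSON with json4s and then to UTF-8 bytes; only the blank line is new in this hunk. A sketch of that path under formats assumed to be built from the NoTypeHints and JavaTimeSerializers imports shown earlier (StringType is illustrative, echoing the type used in the new test below):

// Sketch only: the exact formats definition and StringType are assumptions
// based on the imports and test visible in this diff.
import java.nio.charset.StandardCharsets

import org.json4s.{Formats, NoTypeHints}
import org.json4s.ext.JavaTimeSerializers
import org.json4s.jackson.Serialization

object SerializeSketch {
  case class StringType(value: String)

  implicit val formats: Formats = Serialization.formats(NoTypeHints) ++ JavaTimeSerializers.all

  def serialize[T <: AnyRef](element: T): Array[Byte] =
    Serialization.write(element).getBytes(StandardCharsets.UTF_8)

  // serialize(StringType("test")) yields the UTF-8 bytes of {"value":"test"}
}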
@@ -63,4 +63,11 @@ class ElasticSearchOutputTest extends FunSuite {
     assert(addresses.size() == 1)
   }

+  test("ElasticSearchSinkFunction should properly deserialize") {
+    val sink = new ElasticSearchSink[StringType](index)
+
+    assert(sink.serialize(StringType("test")).isInstanceOf[Array[Byte]])
+  }
+
+
 }
