fix es sink indexing (#19)
nextdude committed Oct 30, 2020
1 parent 8188d7d commit 4f198e5
Showing 2 changed files with 16 additions and 18 deletions.
README.md — 2 changes: 1 addition & 1 deletion
@@ -43,7 +43,7 @@
`Flinkrunner 3` is available on maven central, built against Flink 1.11 with Scala 2.12 and JDK 11.

```sbtshell
-libraryDependencies += "io.epiphanous" %% "flinkrunner" % "3.0.4"
+libraryDependencies += "io.epiphanous" %% "flinkrunner" % "3.0.5"
```

## What is FlinkRunner?
src/main/scala/io/epiphanous/flinkrunner/util/StreamUtils.scala — 32 changes: 15 additions & 17 deletions
@@ -14,26 +14,15 @@ import org.apache.flink.api.common.serialization.{DeserializationSchema, Encoder
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.{
-  BasePathBucketAssigner,
-  DateTimeBucketAssigner
-}
-import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.{
-  DefaultRollingPolicy,
-  OnCheckpointRollingPolicy
-}
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.{BasePathBucketAssigner, DateTimeBucketAssigner}
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.{DefaultRollingPolicy, OnCheckpointRollingPolicy}
import org.apache.flink.streaming.api.functions.sink.filesystem.{BucketAssigner, StreamingFileSink}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.CassandraSink
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
-import org.apache.flink.streaming.connectors.kafka.{
-  FlinkKafkaConsumer,
-  FlinkKafkaProducer,
-  KafkaDeserializationSchema,
-  KafkaSerializationSchema
-}
+import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer, KafkaDeserializationSchema, KafkaSerializationSchema}
import org.apache.flink.streaming.connectors.kinesis.{FlinkKinesisConsumer, FlinkKinesisProducer}
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
@@ -506,9 +495,18 @@ object StreamUtils extends LazyLogging {
      .asJava
    val esSink = new ElasticsearchSink.Builder[E](hosts, new ElasticsearchSinkFunction[E] {
      override def process(element: E, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
-       val values = element.productIterator
-       val data = element.getClass.getDeclaredFields.map(_.getName -> values.next).toMap.asJava
-       val req = Requests.indexRequest(sinkConfig.index).`type`(sinkConfig.`type`).source(data)
+       val data = (element.getClass.getDeclaredFields
+         .filterNot(f => Seq("$id", "$key", "$timestamp", "$action").contains(f.getName)).foldLeft(Map.empty[String, Any]) {
+           case (a, f) =>
+             f.setAccessible(true)
+             val name = f.getName
+             f.get(element) match {
+               case Some(v: Any) => a + (name -> v)
+               case None         => a
+               case v: Any       => a + (name -> v)
+             }
+         }).asJava
+       val req = Requests.indexRequest(sinkConfig.index).source(data)
        indexer.add(req)
      }
    }).build()
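
The rewritten sink function builds the Elasticsearch document map via reflection rather than pairing `productIterator` values with `getDeclaredFields` names, whose iteration orders are not guaranteed to match. It skips FlinkRunner's `$`-prefixed system fields (`$id`, `$key`, `$timestamp`, `$action`), makes the private case-class backing fields accessible, and unwraps `Option` values so empty fields are omitted rather than indexed as raw `Some`/`None` objects. The deprecated `.type(...)` call is also dropped, as Elasticsearch 7 no longer uses mapping types. A minimal, self-contained sketch of the extraction logic (the `SampleEvent` class and `ExtractDemo` object are hypothetical, for illustration only):

```scala
import scala.collection.JavaConverters._

// Hypothetical event with a $-prefixed system field, mimicking FlinkRunner events.
case class SampleEvent($id: String, name: String, score: Option[Double])

object ExtractDemo extends App {
  val element = SampleEvent("abc123", "widget", Some(0.9))

  // Same shape as the new sink logic: skip system fields, unwrap Options.
  val data = element.getClass.getDeclaredFields
    .filterNot(f => Seq("$id", "$key", "$timestamp", "$action").contains(f.getName))
    .foldLeft(Map.empty[String, Any]) { case (a, f) =>
      f.setAccessible(true) // case-class fields are private at the JVM level
      f.get(element) match {
        case Some(v: Any) => a + (f.getName -> v) // unwrap a defined Option
        case None         => a                    // omit an empty Option
        case v: Any       => a + (f.getName -> v)
      }
    }
    .asJava

  println(data) // e.g. {name=widget, score=0.9} -- $id is excluded
}
```

Keying each entry on `f.getName` after calling `setAccessible(true)` removes the old code's implicit assumption that `getDeclaredFields` and `productIterator` enumerate fields in the same order.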