Skip to content

Commit

Permalink
fix elastic transports (#17)
Browse files Browse the repository at this point in the history
* fix elastic transports

* add tests

* update readme skip ci
  • Loading branch information
nextdude committed Oct 29, 2020
1 parent 74a4564 commit a3094ff
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 3 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
`Flinkrunner 3` is now on maven central, built against Flink 1.11 with Scala 2.12 and JDK 11.

```sbtshell
libraryDependencies += "io.epiphanous" %% "flinkrunner" % "3.0.0"
libraryDependencies += "io.epiphanous" %% "flinkrunner" % "3.0.3"
```

## What is FlinkRunner?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,10 +498,10 @@ object StreamUtils extends LazyLogging {
) = {
val hosts = sinkConfig.transports
.map(s => {
val url = new URL(s"https://${s}")
val url = new URL(if (s.startsWith("http")) s else s"http://${s}")
val hostname = url.getHost
val port = if (url.getPort < 0) 9200 else url.getPort
new HttpHost(hostname, port, "https")
new HttpHost(hostname, port, url.getProtocol)
})
.asJava
val esSink = new ElasticsearchSink.Builder[E](hosts, new ElasticsearchSinkFunction[E] {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.epiphanous.flinkrunner.model

import java.net.URL
import java.util.Properties

import io.epiphanous.flinkrunner.BasePropSpec
import org.apache.http.HttpHost

import scala.collection.JavaConverters._

class ElasticsearchSinkConfigSpec extends BasePropSpec {
def getHosts(transports:List[String]) = transports
.map(s => {
val url = new URL(if (s.startsWith("http")) s else s"http://${s}")
val hostname = url.getHost
val port = if (url.getPort < 0) 9200 else url.getPort
new HttpHost(hostname, port, url.getProtocol)
})
.asJava

property("transports config with protocol") {
println(getHosts(List("https://elastic:9200")))
}
property("transports config without protocol") {
println(getHosts(List("elastic:9200")))
}
}

0 comments on commit a3094ff

Please sign in to comment.