Elasticsearch connector #221
Conversation
```scala
final case class ElasticsearchSinkSettings(bufferSize: Int = 10)

final case class IncomingMessage[T](id: Option[String], source: T)
```
This class might not be necessary. Can't we just provide it as a special case of the `Typed` variant, giving it the `DefaultJsonProtocol.RootJsObjectFormat` (or, if that doesn't fit type-wise, a simple identity `JsonWriter`)?
Hmm... In my opinion, holding the id and the object (which is converted to the document JSON) separately is natural when using Elasticsearch. It would be possible to replace it with a tuple like `(Option[String], T)`.
Oh, I actually meant the class below. Do we need both `ElasticsearchSinkStage` and `ElasticsearchSinkStageTyped`? Isn't `ElasticsearchSinkStage` just `ElasticsearchSinkStageTyped[JsObject]`?
Got it! That's right.
Thanks a lot @takezoe. Sorry for keeping this open for a while. This looks quite good.
The main things that need to be fixed are these:
- No blocking calls in `GraphStage`s; blocking will interfere with the whole ecosystem, so either run these blocking calls on a dedicated dispatcher or use an asynchronous HTTP client like akka-http.
- Documentation is needed.
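The first point can be sketched without Akka: run blocking I/O on its own `ExecutionContext` so the shared default dispatcher is never blocked. A minimal plain-Scala sketch; `blockingRequest` and the pool size are hypothetical stand-ins for the real client call:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for a blocking call such as client.performRequest
def blockingRequest(payload: String): String = {
  Thread.sleep(10) // simulates blocking I/O
  s"indexed: $payload"
}

// A dedicated thread pool so blocking work never runs on the default dispatcher
val blockingEc = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

def runOnBlockingDispatcher(payload: String): Future[String] =
  Future(blockingRequest(payload))(blockingEc)

val result = Await.result(runOnBlockingDispatcher("doc-1"), 1.second)
blockingEc.shutdown()
```

The same idea applies inside Akka Streams via a dispatcher attribute; the sketch only illustrates the isolation principle.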
```scala
    |${convert(message.source).toString}""".stripMargin
}.mkString("", "\n", "\n")

client.performRequest(
```
Hmm, if this is a blocking call, it needs to be executed on a dedicated thread pool. If the `client` is just a simple HTTP client, it could make sense to use akka-http instead, which is fully asynchronous.
It is essentially Apache HttpComponents, but it adds handling of multiple nodes when accessing an Elasticsearch cluster via HTTP:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_initialization.html
In addition, the `client` is supplied from outside the stage, so configuring it is the user's responsibility.
However, it seems possible to use `CloseableHttpAsyncClient`. Should I use that instead?
Yes, using an asynchronous client would be best. Make sure to deliver callbacks through `getAsyncCallback` as explained here: http://doc.akka.io/docs/akka/2.4.14/scala/stream/stream-customize.html#Using_asynchronous_side-channels
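As a rough plain-Scala analogy of what `getAsyncCallback` provides (no Akka dependency; all names below are hypothetical): external threads never mutate stage state directly, they only enqueue invocations, which the stage drains on its own single thread:

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Toy single-threaded "stage". Only the stage's own thread touches `received`;
// other threads hand results over through a thread-safe mailbox.
final class ToyStage {
  private var received = List.empty[String]               // stage-local state
  private val mailbox = new ConcurrentLinkedQueue[String] // thread-safe hand-off

  // Safe to call from any thread, e.g. an HTTP client's response listener
  def asyncInvoke(msg: String): Unit = mailbox.add(msg)

  // Called only from the stage's own thread
  def drain(): Unit = {
    var msg = mailbox.poll()
    while (msg != null) {
      received = msg :: received
      msg = mailbox.poll()
    }
  }

  def state: List[String] = received
}

val stage = new ToyStage
val worker = new Thread(() => stage.asyncInvoke("response-1"))
worker.start()
worker.join()
stage.drain()
```

In a real `GraphStage`, `getAsyncCallback` plays the role of both mailbox and drain loop, so the stage logic itself needs no extra synchronization.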
```scala
override def onPush(): Unit = {
  val message = grab(in)
  buffer.addLast(message)
  if (buffer.size >= settings.bufferSize) {
```
This is an uncommon pattern: what happens if no new message comes in for a long time? Then the other requests will sit around in the buffer for an indefinite amount of time.
A better approach could be this:
- have two states:
  - `Idle`: no request in flight
  - `Buffering`: a request is in flight
- in `Idle` state, an incoming message is dispatched instantly, changing the state to `Buffering`
- in `Buffering` state, only as many messages are pulled in as fit into the internal buffer; otherwise the inlet is backpressured (i.e. not pulled)
- when the previous request completes, either the next batch is dispatched instantly if messages were buffered, or we go back to the `Idle` state

Does that make sense?
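The proposed protocol can be sketched as a plain state machine (a simulation under assumed names; `dispatch` stands in for the real HTTP request, and the `Boolean` from `onPush` models whether the inlet may be pulled again):

```scala
import scala.collection.mutable

sealed trait State
case object Idle extends State
case object Buffering extends State

// Two-state batching logic: dispatch instantly when idle, buffer (and
// backpressure once full) while a request is in flight.
final class BatchingLogic(bufferSize: Int, dispatch: Seq[String] => Unit) {
  private var state: State = Idle
  private val buffer = mutable.Queue.empty[String]

  // Returns true if the inlet may be pulled again; false means backpressure.
  def onPush(msg: String): Boolean = state match {
    case Idle =>
      dispatch(Seq(msg)); state = Buffering; true
    case Buffering =>
      buffer.enqueue(msg); buffer.size < bufferSize
  }

  // Called when the in-flight request completes.
  def onRequestComplete(): Unit =
    if (buffer.nonEmpty) {
      val batch = buffer.toList
      buffer.clear()
      dispatch(batch) // next batch goes out instantly; state stays Buffering
    } else state = Idle
}

val sent = mutable.Buffer.empty[Seq[String]]
val logic = new BatchingLogic(2, batch => sent += batch)
logic.onPush("a")         // Idle: dispatched immediately
logic.onPush("b")         // Buffering: buffered
logic.onPush("c")         // buffer now full -> backpressure
logic.onRequestComplete() // buffered batch dispatched
```

This ignores stream completion and failure, which the real stage would also have to handle.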
That's right. I will try to implement that pattern.
```scala
protected def convert(jsObj: JsObject): T

def receiveMessages(): Unit =
```
Similar to the Sink case, this call needs to be asynchronous. So either use a dedicated dispatcher for the `RestClient` or use akka-http directly.
```scala
setHandler(out,
  new OutHandler {
    override def onPull(): Unit = {
      if (buffer.isEmpty) {
```
This can be simplified using `emit` to push the complete list of elements received by `receiveMessages`.
Thanks! I will try that.
```scala
final case class OutgoingMessage[T](id: String, source: T)

final class ElasticsearchSourceStage(indexName: String,
```
Same as above, this can probably be provided by creating a `JsonReader[JsObject]` and then using the typed variant.
```scala
import spray.json._
import DefaultJsonProtocol._

class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll {
```
Nice tests.
```scala
    sentHandler.invoke(())
  }
}
```
Although I tried to move this entire part into an async callback, the callback wasn't invoked the second time. It works when only `tryPull()` is called in an async callback, like this. I suspect something called from that async callback might be blocking the graph, but I can't find the correct way. Hmm...
Hmm, that's still quite dangerous because you can run into all kinds of race conditions. The one I can see is the `state.set(Idle)` case: when the stage fills the buffer after the `val messages` in line 150 but before line 155 is executed, the state will be set to `Idle` but `tryPull` won't do anything, and the stream is locked because it will never be woken up again. Not very likely, but still possible.
Once you do all the handling in the async callback, you can remove all the other synchronization such as the `AtomicReference`s and the `ConcurrentLinkedHashmap`.
So, can you try to execute everything in the async callback? If it still blocks the graph as you say, can you post the code and maybe a stack trace gathered with `jstack <pid>` on the shell (or by any other means of gathering stack traces), so we can figure out what's going on?
```scala
s"/_search/scroll",
Map[String, String]().asJava,
new StringEntity(Map("scroll" -> "5m", "scroll_id" -> scrollId).toJson.toString),
new ResponseListener {
```
The `GraphStageLogic` could implement the `ResponseListener` directly. That would also remove the code duplication with the branch above.
```scala
setHandler(out,
  new OutHandler {
    override def onPull(): Unit =
      if (started == false) {
```
```scala
if (!started)
```
Looks good! Only small things to fix.
@jrudolph Finished fixing. In addition, I added
Sorry for the delay. The source part is looking very good now. It needs the backpressure fix I outlined.
I wonder if we could move the Flow/Sink part (which I haven't reviewed in depth yet) to another PR to unblock this one. WDYT?
```scala
jsObj.fields.get("error") match {
  case None => {
    val hits = jsObj.fields("hits").asJsObject.fields("hits").asInstanceOf[JsArray]
```
It might make sense here to model the result wrapper using spray-json as well?
Do you mean creating case classes to map the response from Elasticsearch?
Yes, exactly. Would that be feasible?
It would be possible. But Elasticsearch's REST client is improving rapidly. I expect it will eventually return Java models instead of JSON strings, just like the transport client (another kind of Elasticsearch client). So creating our own models now might be an overinvestment.
It's not really a big investment, and it would document the current view of how the API is structured, which fields are expected, etc.
Would it really be more than this:

```scala
case class Hit[T](_id: String, _source: T)
case class Response[T](hits: Seq[Hit[T]], _scroll_id: String)

object Protocol {
  import DefaultJsonProtocol._
  implicit def hitFormat[T: JsonFormat] = jsonFormat2(Hit.apply _)
  implicit def responseFormat[T: JsonFormat] = jsonFormat2(Response.apply _)
}
```

And then it's just:

```scala
val response = jsObj.convertTo[Response[T]]
scrollId = response._scroll_id
val messages = response.hits.map(h => OutgoingMessage(h._id, h._source))
```

But maybe I'm missing something ;)
This demands a `JsonFormat` for the Source even though essentially only a `JsonReader` is necessary. And hand-written readers for `Hit` and `Response` are not far from the current code. I'm not sure whether I should do it.
```scala
  OutgoingMessage(id, source.convertTo[T])
}
emitMultiple(out, messages)
sendScrollScanRequest()
```
AFAICS doing it like this would disable backpressure, because there's basically a loop `sendScrollScanRequest` -> `handleResponse` -> `sendScrollScanRequest` that immediately slurps the whole data source instead of waiting until the first results are drained by the stream and a new pull comes in.
Fortunately, it should be very simple to fix. Couldn't you remove the `sendScrollScanRequest` here and also remove the condition in `onPull` below? What should then happen is that when you use `emitMultiple`, your `OutHandler` below gets exchanged with an internal `EmitMultiple` handler that sends out the current results. Once all of those results are drained, your original `OutHandler` is swapped in again, which will then use `sendScrollScanRequest` on the next `onPull`.
That's reasonable. I will do so after fixing the source part.
@jrudolph Separating this pull request into source and flow/sink parts is reasonable, but separating the test cases might be wasteful... Since I'm not in a hurry, I can wait for your review!
I agree that those tests are easier to write with both the sink and the source available.
I reviewed the sink and added a few comments.
```scala
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
  new GraphStageLogic(shape) with ResponseListener {

    private val state = new AtomicReference[State](Idle)
```
No need for an `AtomicReference` inside a `GraphStage` if `asyncCallback` is used correctly.
```scala
new GraphStageLogic(shape) with ResponseListener {

  private val state = new AtomicReference[State](Idle)
  private val buffer = new util.concurrent.ConcurrentLinkedQueue[IncomingMessage[T]]()
```
It doesn't need to be concurrent if it lives inside a `GraphStage`.
```scala
}

private sealed trait State
```
Maybe put these in the companion object to keep the scope clean.
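A minimal sketch of that suggestion (the stage name below is a hypothetical stand-in): the state ADT lives in the companion object, keeping the logic's own namespace clean, and the class just imports it:

```scala
// State ADT scoped to the companion object rather than the logic body
object MyFlowStage {
  sealed trait State
  case object Idle extends State
  case object Buffering extends State
}

final class MyFlowStage {
  import MyFlowStage._

  private var state: State = Idle

  def startRequest(): Unit = state = Buffering
  def currentState: State = state
}

val stage = new MyFlowStage
```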
```scala
.fromGraph(
  new ElasticsearchFlowStage(indexName, typeName, client, settings)(DefaultJsonProtocol.RootJsObjectFormat)
)
.mapAsync(settings.parallelism)(identity)
```
The parallelism setting has no effect here, as it doesn't matter how many times you call `identity` concurrently. Just use `1` instead.
```scala
  case ex: Exception => failStage(ex)
}

setHandler(out, new OutHandler {
```
Just extend the `GraphStageLogic` with `InHandler with OutHandler`, put the handlers directly on the logic, and then use `setHandlers(in, out, this)`.
```scala
override val shape = FlowShape(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
  new GraphStageLogic(shape) with ResponseListener {
```
Could you add a comment that explains the basic workflow of this stage?
```scala
object ElasticsearchFlow {

  /**
```
It's unclear what the exact semantics of the flow are. How many responses do you get, one per source element or one per batch?
Reading the source code of the GraphStage, I see that it would be one `Response` per batch. Would there be anything you could do when a batch fails? Could it make sense to simplify and just make the GraphStage a sink that fails the flow when the request fails? Or maybe configure it with an exception handler (similar to a supervisor) that knows what to do when a single batch request fails?
The response of a bulk request contains results per command, so it's possible to handle errors if some commands in the bulk request failed. I fixed `ElasticsearchFlowStage` to fail when at least one command fails, but I'm not sure this is the best solution.
```scala
try {
  val json = messages
    .map { message =>
      s"""{"index": {"_index": "${indexName}", "_type": "${typeName}"${message.id
```
It might make sense to model these using spray-json as well; otherwise you will have to add escaping for all of those fields.
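To illustrate why the escaping matters: a raw interpolation of `indexName` into the bulk action line breaks as soon as the value contains a quote or backslash. A minimal hand-rolled escaper (spray-json's `JsString` rendering would handle this automatically); `indexAction` is a hypothetical helper, not code from this PR:

```scala
// Escape the characters that would corrupt a JSON string literal
def escapeJson(s: String): String =
  s.flatMap {
    case '"'  => "\\\""
    case '\\' => "\\\\"
    case '\n' => "\\n"
    case '\r' => "\\r"
    case '\t' => "\\t"
    case c    => c.toString
  }

// Bulk API action line with escaped field values
def indexAction(indexName: String, typeName: String): String =
  s"""{"index": {"_index": "${escapeJson(indexName)}", "_type": "${escapeJson(typeName)}"}}"""
```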
```scala
final case class IncomingMessage[T](id: Option[String], source: T)

class ElasticsearchFlowStage[T](
```
I think the basic approach here is sound.
Another thing that will be needed in the end is documentation, see https://github.com/akka/alpakka/blob/master/CONTRIBUTING.md#documentation.
Fixed the code commented on by @jrudolph. In addition, I updated the following as well:
- Support durable retry and recovery in flow and sink
- Flow passes failed messages to the following stage
- Make the Java API more useful
LGTM. There would probably be a few cosmetic changes possible but let's not delay this any further.
Thanks a lot @takezoe, great stuff. Sorry that it took such a long while to get it reviewed another time.
```scala
private var state: State = Idle
private val queue = new mutable.Queue[IncomingMessage[T]]()
private val failureHandler = getAsyncCallback[(Seq[IncomingMessage[T]], Throwable)](handleFailure)
```
I guess you could have used a single handler of type `getAsyncCallback[(Seq[IncomingMessage[T]], Try[Response])]` instead, but that's fine for now as well.
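A sketch of what that single-handler shape would look like, with a stand-in `Response` type and no Akka dependency (the real callback would run inside the stage via `getAsyncCallback`):

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for the HTTP client's response type
final case class Response(status: Int)

// One handler receives the batch plus a Try of the response, replacing
// separate success and failure callbacks.
def handleResult(batch: Seq[String], result: Try[Response]): String =
  result match {
    case Success(r)  => s"batch of ${batch.size} ok, status ${r.status}"
    case Failure(ex) => s"batch of ${batch.size} failed: ${ex.getMessage}"
  }
```

This keeps the success and failure paths in one place, which also makes it harder for one of them to forget to restart the pull cycle.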
```scala
}

sealed class ElasticsearchSourceLogic[T](indexName: String,
```
`final` instead of `sealed`?
fixes #99