Solr: support update/delete/atomic update operations and batch those #1164

Merged 12 commits into akka:master on Sep 26, 2018
Conversation

@giena (Contributor) commented on Aug 24, 2018

At the moment there is a performance problem with the Solr connector because there is no asynchronous part in the code, so the batch size is always 1.
I propose an alternative, keeping the inspiration from the Elasticsearch connector.
The Solr update is now wrapped in a Future, and async callbacks are used to keep everything coherent (on failure and on success).
Still following the Elasticsearch code, I will provide a notion of operation to allow the removal of documents.
Please have a look.

@giena changed the title from "Based on alpakka elastic search, for better performance" to "Based on alpakka elastic search, for better performance with SolR" on Aug 24, 2018
@ennru (Member) commented on Aug 27, 2018

Thank you for making the Solr connector non-blocking!

Now that the blocking call is executed asynchronously, you should take care to select a proper execution context for it; the global one is not a good choice.
A better default is Akka's akka.stream.default-blocking-io-dispatcher, possibly made configurable. For inspiration on how to use it, look into JmsConsumerStage.
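
For illustration only, a minimal sketch of running a blocking call on that dispatcher instead of the global ExecutionContext; the ActorSystem name and blockingCall are placeholders, not connector code:

```scala
import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}

object BlockingDispatcherSketch extends App {
  val system: ActorSystem = ActorSystem("sketch")

  // Akka's dedicated dispatcher for blocking IO, instead of ExecutionContext.global.
  val blockingEc: ExecutionContext =
    system.dispatchers.lookup("akka.stream.default-blocking-io-dispatcher")

  // Stand-in for the blocking Solr client call.
  def blockingCall(): Int = { Thread.sleep(100); 42 }

  val result: Future[Int] = Future(blockingCall())(blockingEc)
}
```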

@ennru (Member) commented on Aug 28, 2018

I've now understood that you can simplify the async part considerably:

1. Select the IODispatcher for the stage:

   ```scala
   override protected def initialAttributes: Attributes = Attributes(ActorAttributes.IODispatcher)
   ```

2. Remove the ExecutionContext, Future and blocking from the client calls (the stage will execute on the IODispatcher).

This should make the code much simpler.
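
For illustration only, a minimal sketch of a stage along these lines: the whole stage runs on the blocking IO dispatcher and calls the client directly, without Futures. The stage and its send function are placeholders, not the actual connector code:

```scala
import akka.stream._
import akka.stream.stage._

class BlockingSolrStage[T](send: Seq[T] => Unit)
    extends GraphStage[FlowShape[Seq[T], Seq[T]]] {

  val in: Inlet[Seq[T]] = Inlet("BlockingSolrStage.in")
  val out: Outlet[Seq[T]] = Outlet("BlockingSolrStage.out")
  override val shape: FlowShape[Seq[T], Seq[T]] = FlowShape(in, out)

  // Run the whole stage on Akka's blocking IO dispatcher.
  override protected def initialAttributes: Attributes =
    Attributes(ActorAttributes.IODispatcher)

  override def createLogic(attrs: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = {
        val batch = grab(in)
        send(batch) // direct, blocking call; no Future needed here
        push(out, batch)
      }
      override def onPull(): Unit = pull(in)
      setHandlers(in, out, this)
    }
}
```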

@giena (Contributor, author) commented on Aug 28, 2018

@ennru: I'm not sure I see what you mean. Here is a code snippet of the onPush method:

```scala
override def onPush(): Unit = {
  queue.enqueue(grab(in))

  state match {
    case Idle => {
      state = Sending
      val messages = (1 to settings.bufferSize).flatMap { _ =>
        queue.dequeueFirst(_ => true)
      }
      sendBulkToSolr(messages)
    }
    case _ => ()
  }

  tryPull()
}
```
Even if I use the IODispatcher, I will send messages one by one if sendBulkToSolr does not use Futures...
The Elasticsearch code does not use the IODispatcher either.
The best way to get the best performance with Solr is to batch updates, not to scale horizontally across many threads with batches of one update.

@ennru (Member) commented on Aug 29, 2018

Ah OK, I finally see how you are trying to add asynchronous updates to Solr. I'm afraid your solution will not be able to guarantee the order of updates to Solr, as Futures are not necessarily executed in the order they are created. Are you confident the client library is even thread-safe?

For Elasticsearch, the client API is prepared for this use case; it is not added by Alpakka.

If you can ignore the order of updates to Solr in your use case, you might construct a flow using the Graph DSL that runs several Solr stages in parallel.
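
For illustration only, a sketch of such a fan-out/fan-in flow with the Graph DSL; solrFlow is a placeholder for a single Solr stage, and the element order at the output is not preserved across branches:

```scala
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge}

// Runs `parallelism` copies of the given flow side by side; output order
// depends on which branch finishes first.
def parallel[T](solrFlow: Flow[T, T, NotUsed], parallelism: Int): Flow[T, T, NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val balance = builder.add(Balance[T](parallelism))
    val merge = builder.add(Merge[T](parallelism))
    for (i <- 0 until parallelism)
      balance.out(i) ~> solrFlow.async ~> merge.in(i)
    FlowShape(balance.in, merge.out)
  })
```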

@giena (Contributor, author) commented on Aug 29, 2018

That is why I use getAsyncCallback on the completion (success or failure) of my Future. Since my batch is ordered and the buffer is managed by onPush, all the updates from a source will stay ordered in my sink; there is no doubt about that. As far as I can tell the client is thread-safe (we have used it in multithreaded applications for some years), and so if we cannot guarantee the order, as you say, then the Elasticsearch implementation cannot do it either. It is the same code.
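
For illustration only, a minimal, self-contained sketch of the pattern described here: the Future completes on another thread and getAsyncCallback routes the result back into the stage, so stage state is only touched on the stage's own thread. The stage and its send function are placeholders, not the connector's code:

```scala
import akka.stream._
import akka.stream.stage._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

class AsyncSendStage[T](send: Seq[T] => Unit)(implicit ec: ExecutionContext)
    extends GraphStage[SinkShape[Seq[T]]] {

  val in: Inlet[Seq[T]] = Inlet("AsyncSendStage.in")
  override val shape: SinkShape[Seq[T]] = SinkShape(in)

  override def createLogic(attrs: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {

      // Future completions re-enter the stage through this thread-safe callback.
      private val onSent = getAsyncCallback[Try[Unit]] {
        case Success(_)  => pull(in)      // batch acknowledged, ask for the next one
        case Failure(ex) => failStage(ex) // propagate the failure to the stream
      }

      override def preStart(): Unit = pull(in)

      override def onPush(): Unit = {
        val batch = grab(in)
        // One batch in flight at a time keeps the original order.
        Future(send(batch)).onComplete(onSent.invoke)
      }

      setHandler(in, this)
    }
}
```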

@ennru (Member) commented on Aug 29, 2018

I'm not sure what you're referring to in the Elasticsearch way of doing this. The Alpakka Elasticsearch connector sends all operations as one JSON request using the ES client's performRequestAsync (see https://github.com/akka/alpakka/blob/master/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/ElasticsearchFlowStage.scala#L299).

@giena (Contributor, author) commented on Aug 29, 2018

The Solr client sends documents as a batch too:
https://github.com/worldline-messaging/alpakka/blob/master/solr/src/main/scala/akka/stream/alpakka/solr/SolrFlowStage.scala#L180
I don't understand why you see a difference.

@ennru (Member) commented on Aug 31, 2018

Yes, the Solr client sends batches. But your proposal first runs a groupBy(operation) and then calls the client with batches per operation, wrapped in Futures. The order in which Solr sees the messages will differ from what the incoming stream contained.
It would be more correct to implement some takeWhile(Operation) and update Solr with those messages before taking the next batch.
You should not wrap the call to the client in Futures; the stage's asynchronicity should be enough.
(Side note: blocking should not be used in Futures.)
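
For illustration only, a minimal, self-contained sketch of the takeWhile idea: send consecutive messages that share the same operation as one batch, so Solr sees them in stream order. Operation and Message are illustrative stand-ins, not the connector's API:

```scala
sealed trait Operation
case object Update extends Operation
case object Delete extends Operation

final case class Message(operation: Operation, id: String)

// Split off the longest prefix with a single operation; the remainder is
// processed only after this batch has been sent.
def nextBatch(messages: Seq[Message]): (Seq[Message], Seq[Message]) =
  messages.headOption match {
    case Some(first) =>
      val batch = messages.takeWhile(_.operation == first.operation)
      val remaining = messages.dropWhile(_.operation == first.operation)
      (batch, remaining)
    case None => (Seq.empty, Seq.empty)
  }
```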

@giena (Contributor, author) commented on Aug 31, 2018

OK, I see what you mean now. Thank you! I will try to implement it.

@giena (Contributor, author) commented on Aug 31, 2018

To get the best performance with Solr, you need to batch, and that is what I do. To batch documents, I need to enqueue them before calling Solr. If I do not use Futures and instead use the IODispatcher as you described, I perhaps scale horizontally, but the batch size is always one element and the performance is the same as in the initial version. I say this because I tested it. So I keep going with Futures, but now the order is respected (takeWhile).

…o keep exact ordering of the incoming messages
@ennru (Member) commented on Sep 3, 2018

OK, if the Solr API does much better with batches of data, I'd suggest this stage should accept Seq[IncomingMessage] and make it the user's responsibility to find the best level of batching with e.g. groupedWithin. The stage should not apply batching itself.
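
For illustration only, a minimal, self-contained sketch of user-side batching with groupedWithin; solrSeqFlow below is just a placeholder that prints batch sizes, not the connector's flow:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.duration._

object GroupedWithinSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // Placeholder for a Solr flow that accepts a whole batch per stream element.
  val solrSeqFlow = Flow[Seq[Int]].map { batch =>
    println(s"sending batch of ${batch.size} documents")
    batch
  }

  Source(1 to 1000)
    .groupedWithin(50, 10.millis) // up to 50 elements, or whatever arrived within 10 ms
    .via(solrSeqFlow)
    .runWith(Sink.ignore)
}
```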

@ennru added the p:solr label on Sep 3, 2018
@giena (Contributor, author) commented on Sep 3, 2018

OK. I think we are not far off now.

Jean-Noel Allart added 2 commits September 3, 2018 20:33
…s and deletes have to be done with documents, beans and typed methods.
@ennru (Member) left a review comment:

Yes, this approach is better.
You are introducing some unnecessary breaking API changes; please try not to break the API. (You may add @deprecated.)

Please add documentation.

```java
return IncomingMessage.create(doc);
List<IncomingMessage<SolrInputDocument, NotUsed>> list = new ArrayList<>();
list.add(IncomingUpdateMessage.create(doc));
return list;
```
Member commented on the diff:

Instead of building the lists in user code, the examples should show groupedWithin.

giena (Contributor, author) replied:

Yes, it should.

```
@@ -67,18 +68,20 @@ class SolrSpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
  .map { tuple: Tuple =>
    val book: Book = tupleToBook(tuple)
    val doc: SolrInputDocument = bookToDoc(book)
    IncomingMessage(doc)
    Seq(IncomingUpdateMessage(doc))
```
Member commented on the diff:

Same about groupedWithin.

```scala
  collection = "collection2",
  settings = SolrUpdateSettings(commitWithin = 5)
)
)(cluster.getSolrClient)
```
Member commented on the diff:

Why are you passing the Solr client explicitly?

giena (Contributor, author) replied:

Because the implicit was another Solr client, and we should not have to instantiate multiple clients. We could instantiate an implicit lazily with this instance.

```scala
createCollection("collection7") //create a new collection
val stream = getTupleStream("collection1")

//#run-document
```
Member commented on the diff:

Duplicated #run-document here as well.

@ennru changed the title from "Based on alpakka elastic search, for better performance with SolR" to "Solr: support update/delete/atomic update operations and batch those" on Sep 5, 2018
@giena (Contributor, author) commented on Sep 5, 2018

@ennru I've done my best to address your main requirements from this last code review, but I won't be working on this project for the next few weeks. However, I will follow this pull request to help people if necessary, and I hope it will be merged successfully soon. Thank you for your ideas and support. See you.

@giena (Contributor, author) commented on Sep 19, 2018

@ennru Hi, I've just pushed some Solr features and improved the documentation. Please have a look.

@2m (Member) left a review comment:

Did a quick review and noticed that it might drop messages now since they are no longer enqueued.

@giena (Contributor, author) commented on Sep 25, 2018

Let's go for the merge? ;-)

```scala
//Now take the remaining
val remaining = toSend.dropWhile(m => m.operation == operation)
if (remaining.nonEmpty) {
  send(remaining) //Important: Not really recursive, because the future breaks the recursion
```
Member commented on the diff:

I think this comment is outdated, is it not?

giena (Contributor, author) replied:

Thx

```scala
case Finished => handleSuccess()
case _ => state = Idle
```

```scala
doc.addField(message.idFieldOpt.get, message.idFieldValueOpt.get)
if (client.isInstanceOf[CloudSolrClient]) {
```
Member commented on the diff:

Replace isInstanceOf with a pattern match:

```scala
client match {
  case c: CloudSolrClient => ...
  case _ => ...
}
```

giena (Contributor, author) replied:

Thx

```scala
    messageBinder(source)
  }
)
.flatten
```
Member commented on the diff:

map + flatten = flatMap

Replace with:

```scala
messages.flatMap(_.sourceOpt.map(messageBinder))
```

giena (Contributor, author) replied:

Thx

```scala
}
responses.filter(r => r.getStatus != 0).headOption.getOrElse(responses.head)
```
Member commented on the diff:

filter + headOption = find
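
For illustration only, the suggested simplification would presumably read:

```scala
responses.find(r => r.getStatus != 0).getOrElse(responses.head)
```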

giena (Contributor, author) replied:

Thx

@2m (Member) commented on Sep 25, 2018

Almost there. :) If you could fix the last nitpicks, then this can go in!

@giena (Contributor, author) commented on Sep 26, 2018

Thx @2m for your help. Do you think we could merge? ;-)

@2m (Member) left a review comment:

LGTM! Thanks for pushing this one through!

@2m merged commit c75a0a3 into akka:master on Sep 26, 2018
@2m added this to the 1.0-M1 milestone on Sep 26, 2018
@giena deleted the PR branch on October 12, 2018