Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

[PIO-106] Elasticsearch 5.x StorageClient should reuse RestClient #420

Closed
wants to merge 6 commits into from

Conversation

mars
Copy link
Member

@mars mars commented Aug 11, 2017

Implements PIO-106

This PR moves to a singleton Elasticsearch RestClient which has built-in HTTP keep-alive and TCP connection pooling. Running on this branch, we've seen a 2x speed-up in predictions from the Universal Recommender with ES5, and the feared "cannot assign requested address" 😱 Elasticsearch connection errors have completely disappeared. Running pio batchpredict for 160K queries results in only 7 total TCP connections to Elasticsearch. Previously that would escalate to ~25,000 connections before denying further connections.

This fundamentally changes the interface for the new Elasticsearch 5.x REST client introduced with PredictionIO 0.11.0-incubating. With this changeset, the client is a single instance of org.elasticsearch.client.RestClient.

🚨 As a result of this change, any engine templates that directly use the Elasticsearch 5 StorageClient would require an update for compatibility. The change is this:

Original

val client: StorageClient =// code to instantiate client
val restClient: RestClient = client.open()
try {
  restClient.performRequest(…)
} finally {
  restClient.close()
}

With this PR

val client: RestClient =// code to instantiate client
client.performRequest(…)

No more balancing open & close as this is handled by using a new CleanupFunctions hook added to the framework in this PR.

Universal Recommender is the only template that I know of which directly uses the ES StorageClient outside of PredictionIO core. See example UR changes for compatibility with this PR.

Elasticsearch StorageClient changes

  • reimplemented as singleton
  • installs a cleanup function

See StorageClient

Core changes

A new CleanupFunctions hook has been added which enables developers of storage modules to register anonymous functions with CleanupFunctions.add { … } to be executed after Spark-related commands/workflows. The hook is called in a finally { CleanupFunctions.run() } from within:

  • pio import
  • pio export
  • pio train
  • pio batchpredict

Apologies for the huge indentation shifts from the requisite try-finally blocks:

try {
  // Freshly indented code.
} finally {
  CleanupFunctions.run()
}

@@ -110,28 +104,24 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index
error(s"Failed to access to /$index/$estype/$id", e)
None
} finally {
restClient.close()
client.close()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This close should be removed.

@mars
Copy link
Member Author

mars commented Aug 11, 2017

Seems to solve this long ago reported Elasticsearch connection issue

@mars
Copy link
Member Author

mars commented Aug 14, 2017

Closing in favor of #421

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
1 participant