New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch 5.x singleton client with authentication #421

Closed
wants to merge 17 commits into
base: develop
from

Conversation

Projects
None yet
2 participants
@mars
Member

mars commented Aug 14, 2017

Fixes both PIO-106 & PIO-114, replacing #372 & #420. These are combined because they each heavily revise the same class.

Authentication

Add optional username-password configuration for the new Elasticsearch 5 client; in pio-env.sh config:

# Optional basic HTTP auth
PIO_STORAGE_SOURCES_ELASTICSEARCH_USERNAME=my-name
PIO_STORAGE_SOURCES_ELASTICSEARCH_PASSWORD=my-secret

These credentials are sent in each Elasticsearch request as an HTTP Basic Authorization header.

Enables use of public-cloud, hosted Elasticsearch clusters, such as Bonsai on Heroku.

Singleton client

This PR moves to a singleton Elasticsearch RestClient which has built-in HTTP keep-alive and TCP connection pooling. Running on this branch, we've seen a 2x speed-up in predictions from the Universal Recommender with ES5, and the feared "cannot assign requested address" 😱 Elasticsearch connection errors have completely disappeared. Running pio batchpredict for 160K queries results in only 7 total TCP connections to Elasticsearch. Previously that would escalate to ~25,000 connections before denying further connections.

This fundamentally changes the interface for the new Elasticsearch 5.x REST client introduced with PredictionIO 0.11.0-incubating. With this changeset, the client is a single instance of org.elasticsearch.client.RestClient.

🚨 As a result of this change, any engine templates that directly use the Elasticsearch 5 StorageClient would require an update for compatibility. The change is this:

Original

val client: StorageClient =// code to instantiate client
val restClient: RestClient = client.open()
try {
  restClient.performRequest(…)
} finally {
  restClient.close()
}

With this PR

val client: RestClient =// code to instantiate client
client.performRequest(…)

No more balancing open & close as this is handled by using a new CleanupFunctions hook added to the framework in this PR.

Universal Recommender is the only template that I know of which directly uses the ES StorageClient outside of PredictionIO core. See example UR changes for compatibility with this PR.

Elasticsearch StorageClient changes

  • reimplemented as singleton
  • installs a cleanup function

See StorageClient

Core changes

A new CleanupFunctions hook has been added which enables developers of storage modules to register anonymous functions with CleanupFunctions.add { … } to be executed after Spark-related commands/workflows. The hook is called in a finally { CleanupFunctions.run() } from within:

  • pio import
  • pio export
  • pio train
  • pio batchpredict

Apologies for the huge indentation shifts from the requisite try-finally blocks:

try {
  // Freshly indented code.
} finally {
  CleanupFunctions.run()
}

mars added some commits Aug 8, 2017

Migrate to singleton Elasticsearch client to use underlying connectio…
…n pooling (PoolingNHttpClientConnectionManager)
Stabilize the sort order of CLASSPATH; fixes intermittent `INSTANCE` …
…errors due to wrong version of class loaded

@mars mars changed the title from Elasticsearch singleton client with authentication to Elasticsearch 5x singleton client with authentication Aug 14, 2017

@mars mars changed the title from Elasticsearch 5x singleton client with authentication to Elasticsearch 5.x singleton client with authentication Aug 14, 2017

@mars mars referenced this pull request Aug 14, 2017

Closed

Upgrade EsClient #5

@takezoe

@mars Sorry for slow response. I added some comments. Besides, some test patterns has been failed on Travis. Is it OK?

* be closed to allow the process to exit.
*/
object CleanupFunctions {
var functions: Seq[() => Unit] = Seq.empty[() => Unit]

This comment has been minimized.

@takezoe

takezoe Aug 22, 2017

Member

This field should be private or protected at least.

* @param anonymous function containing cleanup code.
*/
def run(): Unit = {
functions.map { case f => f() }

This comment has been minimized.

@takezoe

takezoe Aug 22, 2017

Member

Use foreach instead of map if partial function does not have return value. Also case is not necessary in this case. As a result, you can write above code as:

functions.foreach { f => f() }
@@ -461,4 +463,11 @@ object Storage extends Logging {
}.getOrElse(Map.empty)
}
)
def getStackTraceString(e: Throwable): String = {

This comment has been minimized.

@takezoe

takezoe Aug 22, 2017

Member

You can get stacktrace from an exception using ExceptionUtils of commons-lang3.

import org.apache.commons.lang3.exception.ExceptionUtils

val stackTrace = ExceptionUtils.getStackTrace(e)
case class ESClient(hosts: Seq[HttpHost]) {
def open(): RestClient = {
object ESClient extends Logging {
var _sharedRestClient: Option[RestClient] = None

This comment has been minimized.

@takezoe

takezoe Aug 22, 2017

Member

Is this client accessed from multi threads? If yes, I think adding volatile modifier to this field is necessary and open() and close() should be synchronized.

This comment has been minimized.

@mars

mars Aug 23, 2017

Member

Thanks for the hints here. I suspected this would be an issue. I'm investigating how to make this threadsafe.

if (!_sharedRestClient.isEmpty) {
_sharedRestClient.get.close()
_sharedRestClient = None
}

This comment has been minimized.

@takezoe

takezoe Aug 22, 2017

Member

Typically, Option.get() is hated in the Scala world. You can write as follows instead:

_sharedRestClient.foreach { client =>
   client.close()
  _sharedRestClient = None
}
val jsonResponse = parse(EntityUtils.toString(response.getEntity))
val result = (jsonResponse \ "result").extract[String]
result match {
case "deleted" => true

This comment has been minimized.

@takezoe

takezoe Aug 22, 2017

Member

true at here is no effect because the return type us Unit. I know it came from original code, but we should remove it.

result match {
case "deleted" => true
case _ =>
logger.error(s"[$result] Failed to update $index/$estype:$eventId")
false

This comment has been minimized.

@takezoe

takezoe Aug 22, 2017

Member

Same as above.

} catch {
case e: IOException =>
logger.error(s"Failed to update $index/$estype:$eventId", e)
false

This comment has been minimized.

@takezoe

takezoe Aug 22, 2017

Member

Same as above.

@mars mars referenced this pull request Aug 22, 2017

Merged

EsClient upgrade (builtin) #7

@mars

This comment has been minimized.

Member

mars commented Aug 23, 2017

Cheers @takezoe I addressed all your Scala style & usage suggestions. Still need to take care of the threadsafety issue with the singleton client.

@takezoe

This comment has been minimized.

Member

takezoe commented Aug 23, 2017

I addressed all your Scala style & usage suggestions.

👍

Still need to take care of the threadsafety issue with the singleton client.

Yes, I'm reviewing the PIO codebase from this point of view now, but still my understanding about the whole PIO is not enough...

@mars

This comment has been minimized.

Member

mars commented Aug 23, 2017

Based on these Scala Concurrency/Thread Safety docs, I believe simply annotating @volatile will cause the synchronization needed for thread-safety in this case.

So, I updated this PR with that change.

@mars

This comment has been minimized.

Member

mars commented Aug 23, 2017

🏁

@takezoe

This comment has been minimized.

Member

takezoe commented Aug 29, 2017

@mars LGTM! but sorry for conflict with my previous commit: 6789dbe

@mars

This comment has been minimized.

Member

mars commented Aug 29, 2017

I will resolve these conflicts today and then merge this PR.

@mars mars force-pushed the mars:esclient-singleton-with-auth branch from 046d07c to 9089a9c Aug 29, 2017

@asfgit asfgit closed this in bf84ede Aug 29, 2017

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment