-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Elasticsearch 5.x singleton client with authentication #421
Conversation
…n pooling (PoolingNHttpClientConnectionManager)
…errors due to wrong version of class loaded
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mars Sorry for slow response. I added some comments. Besides, some test patterns has been failed on Travis. Is it OK?
* be closed to allow the process to exit. | ||
*/ | ||
object CleanupFunctions { | ||
var functions: Seq[() => Unit] = Seq.empty[() => Unit] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This field should be private or protected at least.
* @param anonymous function containing cleanup code. | ||
*/ | ||
def run(): Unit = { | ||
functions.map { case f => f() } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use foreach
instead of map
if partial function does not have return value. Also case
is not necessary in this case. As a result, you can write above code as:
functions.foreach { f => f() }
@@ -461,4 +463,11 @@ object Storage extends Logging { | |||
}.getOrElse(Map.empty) | |||
} | |||
) | |||
|
|||
def getStackTraceString(e: Throwable): String = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can get stacktrace from an exception using ExceptionUtils
of commons-lang3.
import org.apache.commons.lang3.exception.ExceptionUtils
val stackTrace = ExceptionUtils.getStackTrace(e)
case class ESClient(hosts: Seq[HttpHost]) { | ||
def open(): RestClient = { | ||
object ESClient extends Logging { | ||
var _sharedRestClient: Option[RestClient] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this client accessed from multi threads? If yes, I think adding volatile
modifier to this field is necessary and open()
and close()
should be synchronized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the hints here. I suspected this would be an issue. I'm investigating how to make this threadsafe.
if (!_sharedRestClient.isEmpty) { | ||
_sharedRestClient.get.close() | ||
_sharedRestClient = None | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typically, Option.get()
is hated in the Scala world. You can write as follows instead:
_sharedRestClient.foreach { client =>
client.close()
_sharedRestClient = None
}
val jsonResponse = parse(EntityUtils.toString(response.getEntity)) | ||
val result = (jsonResponse \ "result").extract[String] | ||
result match { | ||
case "deleted" => true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
true
at here is no effect because the return type us Unit
. I know it came from original code, but we should remove it.
result match { | ||
case "deleted" => true | ||
case _ => | ||
logger.error(s"[$result] Failed to update $index/$estype:$eventId") | ||
false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above.
} catch { | ||
case e: IOException => | ||
logger.error(s"Failed to update $index/$estype:$eventId", e) | ||
false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above.
Cheers @takezoe I addressed all your Scala style & usage suggestions. Still need to take care of the threadsafety issue with the singleton client. |
👍
Yes, I'm reviewing the PIO codebase from this point of view now, but still my understanding about the whole PIO is not enough... |
Based on these Scala Concurrency/Thread Safety docs, I believe simply annotating So, I updated this PR with that change. |
🏁 |
I will resolve these conflicts today and then merge this PR. |
046d07c
to
9089a9c
Compare
Fixes both PIO-106 & PIO-114, replacing #372 & #420. These are combined because they each heavily revise the same class.
Authentication
Add optional username-password configuration for the new Elasticsearch 5 client; in
pio-env.sh
config:# Optional basic HTTP auth PIO_STORAGE_SOURCES_ELASTICSEARCH_USERNAME=my-name PIO_STORAGE_SOURCES_ELASTICSEARCH_PASSWORD=my-secret
These credentials are sent in each Elasticsearch request as an HTTP Basic Authorization header.
Enables use of public-cloud, hosted Elasticsearch clusters, such as Bonsai on Heroku.
Singleton client
This PR moves to a singleton Elasticsearch RestClient which has built-in HTTP keep-alive and TCP connection pooling. Running on this branch, we've seen a 2x speed-up in predictions from the Universal Recommender with ES5, and the feared "cannot assign requested address" 😱 Elasticsearch connection errors have completely disappeared. Running
pio batchpredict
for 160K queries results in only 7 total TCP connections to Elasticsearch. Previously that would escalate to ~25,000 connections before denying further connections.This fundamentally changes the interface for the new Elasticsearch 5.x REST client introduced with PredictionIO 0.11.0-incubating. With this changeset, the
client
is a single instance oforg.elasticsearch.client.RestClient
.🚨 As a result of this change, any engine templates that directly use the Elasticsearch 5 StorageClient would require an update for compatibility. The change is this:
Original
With this PR
No more balancing
open
&close
as this is handled by using a newCleanupFunctions
hook added to the framework in this PR.Universal Recommender is the only template that I know of which directly uses the ES StorageClient outside of PredictionIO core. See example UR changes for compatibility with this PR.
Elasticsearch StorageClient changes
See StorageClient
Core changes
A new
CleanupFunctions
hook has been added which enables developers of storage modules to register anonymous functions withCleanupFunctions.add { … }
to be executed after Spark-related commands/workflows. The hook is called in afinally { CleanupFunctions.run() }
from within:pio import
pio export
pio train
pio batchpredict
Apologies for the huge indentation shifts from the requisite try-finally blocks: