Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

SolrQueryLogger interface for start/end events #47

Closed
wants to merge 1 commit into from

2 participants

Jon Hoffman Jon Shea
Jon Hoffman
Owner

No description provided.

Jon Hoffman hoffrocket closed this
Jon Shea

Is log essentially redundant with onEndExecuteQuery? yes, maybe log should be deprecated or deleted? who else uses slashem?

Jon Shea
Owner

Looks good to me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jul 18, 2012
  1. Jon Hoffman
This page is out of date. Refresh to see the latest.
144 src/main/scala/com/foursquare/slashem/Schema.scala
View
@@ -8,7 +8,7 @@ import com.twitter.util.{Duration, ExecutorServiceFuturePool, Future, FuturePool
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.http.Http
import com.twitter.finagle.stats.StatsReceiver
-import com.twitter.finagle.Service
+import com.twitter.finagle.{Service, SimpleFilter}
import java.lang.Integer
import java.net.InetSocketAddress
import java.util.{ArrayList, HashMap}
@@ -390,10 +390,18 @@ trait SolrMeta[T <: Record[T]] extends SlashemMeta[T] {
})
qse
}
+
+ object NoopFilter extends SimpleFilter[HttpRequest, HttpResponse] {
+ def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]) = service(request)
+ }
+ def rawQueryFuture(params: Seq[(String, String)]): Future[String] = {
+ rawQueryFuture(params, NoopFilter)
+ }
+
// This method performs the actually query / http request. It should probably
// go in another file when it gets more sophisticated.
- def rawQueryFuture(params: Seq[(String, String)]): Future[String] = {
+ def rawQueryFuture(params: Seq[(String, String)], logFilter: SimpleFilter[HttpRequest, HttpResponse]): Future[String] = {
// Ugly
val qse = queryString(params ++
logger.queryIdToken.map("magicLoggingToken" -> _).toList)
@@ -402,8 +410,8 @@ trait SolrMeta[T <: Record[T]] extends SlashemMeta[T] {
// Here be dragons! If you have multiple backends with shared IPs this could very well explode
// but finagle doesn't seem to properly set the http host header for http/1.1
request.addHeader(HttpHeaders.Names.HOST, servers.head)
-
- client(request).map(response => {
+ val loggedClient = logFilter andThen client
+ (loggedClient(request)).map(response => {
response.getStatus match {
case HttpResponseStatus.OK => response.getContent.toString(CharsetUtil.UTF_8)
case status => throw SolrResponseException(status.getCode, status.getReasonPhrase, solrName, qse.toString)
@@ -415,9 +423,15 @@ trait SolrMeta[T <: Record[T]] extends SlashemMeta[T] {
/** Logging and Timing solr trait */
trait SolrQueryLogger {
- def log(name: String, msg: String, time: Long)
+ /**
+ * to instrument start and stop of query return a function that will be called
+ * when the query finishes
+ */
+ def onStartExecuteQuery(name: String, msg: String): Function0[Unit]
+
+ def log(name: String, msg: String, time: Long): Unit
- def debug(msg: String)
+ def debug(msg: String): Unit
// If this returns a string then it will be appended to the query
// so you can use it to match your query logs with application
@@ -437,9 +451,11 @@ trait SolrQueryLogger {
/** The default logger, does nothing. */
object NoopQueryLogger extends SolrQueryLogger {
- override def log(name: String, msg: String, time: Long) = Unit
- override def debug(msg: String) = println(msg)
- override def resultCount(name: String, count:Int) = println("Got back "+count+" results while querying "+name)
+ val noopCallback: Function0[Unit] = () => ()
+ override def onStartExecuteQuery(name: String, msg: String): Function0[Unit] = noopCallback
+ override def log(name: String, msg: String, time: Long): Unit = Unit
+ override def debug(msg: String): Unit = println(msg)
+ override def resultCount(name: String, count:Int): Unit = println("Got back "+count+" results while querying "+name)
}
//If you want any of the geo queries you will have to implement this
@@ -510,62 +526,70 @@ trait ElasticSchema[M <: Record[M]] extends SlashemSchema[M] {
def elasticQueryFuture[Ord, Lim, MM <: MinimumMatchType, Y, H <: Highlighting, Q <: QualityFilter, FC <: FacetCount, FLim, ST <: ScoreType](qb: QueryBuilder[M, Ord, Lim, MM, Y, H, Q, FC, FLim, ST], query: ElasticQueryBuilder, timeoutOpt: Option[Duration]): Future[SearchResults[M, Y]] = {
val esfp = meta.executorServiceFuturePool
-
+
+ val queryName = "e" + meta.indexName + ".query"
+ val queryText = query.toString()
+
val searchResultsFuture = esfp {
- val client = meta.client
- val from = qb.start.map(_.toInt).getOrElse(qb.DefaultStart)
- val limit = qb.limit.map(_.toInt).getOrElse(qb.DefaultLimit)
- meta.logger.debug("Query details "+query.toString())
- val baseRequest: SearchRequestBuilder = client.prepareSearch(meta.indexName)
- .setQuery(query)
- .setFrom(from)
- .setSize(limit)
- .setSearchType(SearchType.QUERY_THEN_FETCH)
- val request = qb.sort match {
- case None => baseRequest
- //Handle sorting by fields quickly
- case Some(Pair(Field(fieldName),"asc")) => baseRequest.addSort(fieldName,SortOrder.ASC)
- case Some(Pair(Field(fieldName),"desc")) => baseRequest.addSort(fieldName,SortOrder.DESC)
- //Handle sorting by scripts in general
- case Some(Pair(sort,dir)) => {
- val (params,scriptSrc) = sort.elasticBoost()
- val paramNames = (1 to params.length).map("p"+_)
- val script = scriptSrc.format(paramNames:_*)
- val keyedParams = paramNames zip params
- val sortOrder = dir match {
- case "asc" => SortOrder.ASC
- case "desc" => SortOrder.DESC
+ val onEndExecuteFunction: Function0[Unit] = meta.logger.onStartExecuteQuery(queryName, queryText)
+ try {
+ val client = meta.client
+ val from = qb.start.map(_.toInt).getOrElse(qb.DefaultStart)
+ val limit = qb.limit.map(_.toInt).getOrElse(qb.DefaultLimit)
+ meta.logger.debug("Query details " + queryText)
+ val baseRequest: SearchRequestBuilder = client.prepareSearch(meta.indexName)
+ .setQuery(query)
+ .setFrom(from)
+ .setSize(limit)
+ .setSearchType(SearchType.QUERY_THEN_FETCH)
+ val request = qb.sort match {
+ case None => baseRequest
+ //Handle sorting by fields quickly
+ case Some(Pair(Field(fieldName),"asc")) => baseRequest.addSort(fieldName,SortOrder.ASC)
+ case Some(Pair(Field(fieldName),"desc")) => baseRequest.addSort(fieldName,SortOrder.DESC)
+ //Handle sorting by scripts in general
+ case Some(Pair(sort,dir)) => {
+ val (params,scriptSrc) = sort.elasticBoost()
+ val paramNames = (1 to params.length).map("p"+_)
+ val script = scriptSrc.format(paramNames:_*)
+ val keyedParams = paramNames zip params
+ val sortOrder = dir match {
+ case "asc" => SortOrder.ASC
+ case "desc" => SortOrder.DESC
+ }
+ val sortBuilder = new ScriptSortBuilder(script,"number").order(sortOrder)
+ keyedParams.foreach(p => {sortBuilder.param(p._1,p._2)})
+ baseRequest.addSort(sortBuilder)
}
- val sortBuilder = new ScriptSortBuilder(script,"number").order(sortOrder)
- keyedParams.foreach(p => {sortBuilder.param(p._1,p._2)})
- baseRequest.addSort(sortBuilder)
+ case _ => baseRequest
}
- case _ => baseRequest
- }
- /* Set the server side timeout */
- val timeLimmitedRequest = timeoutOpt match {
- case Some(timeout) => request.setTimeout(TimeValue.timeValueMillis(timeout.inMillis))
- case _ => request
- }
+ /* Set the server side timeout */
+ val timeLimmitedRequest = timeoutOpt match {
+ case Some(timeout) => request.setTimeout(TimeValue.timeValueMillis(timeout.inMillis))
+ case _ => request
+ }
- /* Add a facet to the request */
- val facetedRequest = qb.facetSettings.facetFieldList match {
- case Nil => timeLimmitedRequest
- case _ => {
- termFacetQuery(qb.facetSettings.facetFieldList, qb.facetSettings.facetLimit).foreach(timeLimmitedRequest.addFacet(_))
- timeLimmitedRequest
+ /* Add a facet to the request */
+ val facetedRequest = qb.facetSettings.facetFieldList match {
+ case Nil => timeLimmitedRequest
+ case _ => {
+ termFacetQuery(qb.facetSettings.facetFieldList, qb.facetSettings.facetLimit).foreach(timeLimmitedRequest.addFacet(_))
+ timeLimmitedRequest
+ }
}
+ val response : SearchResponse = facetedRequest.execute().get()
+ response
+ } finally {
+ onEndExecuteFunction()
}
- val response : SearchResponse = facetedRequest.execute().get()
- response
}
- val queryText = query.toString()
+
timeFuture(searchResultsFuture).map( {
case (queryTime, result) => {
- meta.logger.log("e" + meta.indexName + ".query",queryText, queryTime)
+ meta.logger.log(queryName, queryText, queryTime)
result
}}).map({
response =>
@@ -849,11 +873,21 @@ trait SolrSchema[M <: Record[M]] extends SlashemSchema[M] {
fieldstofetch: List[String],
fallOf: Option[Double],
min: Option[Int]): Future[SearchResults[M, Y]] = {
+ val queryName = meta.solrName + ".query"
val queryText = meta.queryString(params).toString
+
+ val logFilter = new SimpleFilter[HttpRequest, HttpResponse] {
+ def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]) = {
+ val onEndExecuteFunction: Function0[Unit] = meta.logger.onStartExecuteQuery(queryName, queryText)
+ service(request) respond { response =>
+ onEndExecuteFunction()
+ }
+ }
+ }
- timeFuture(meta.rawQueryFuture(params)).map({
+ timeFuture(meta.rawQueryFuture(params, logFilter)).map({
case (queryTime, jsonString) => {
- meta.logger.log(meta.solrName + ".query", queryText, queryTime)
+ meta.logger.log(queryName, queryText, queryTime)
jsonString
}}).flatMap(jsonString => {
meta.extractFromResponse(jsonString, creator,
25 src/test/scala/com/foursquare/slashem/ElasticQueryTest.scala
View
@@ -52,6 +52,31 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers {
})
Assert.assertEquals(1,future.apply(Duration(200,TimeUnit.MILLISECONDS)))
}
+
+ @Test
+ def testStartEndExecuteQuery {
+ val oldLogger = ESimpleGeoPanda.logger
+ try {
+ var startCount = 0
+ var endCount = 0
+ ESimpleGeoPanda.logger = new SolrQueryLogger {
+ override def onStartExecuteQuery(name: String, msg: String): Function0[Unit] = {
+ startCount += 1
+ () => {
+ endCount += 1
+ }
+ }
+ override def log(name: String, msg: String, time: Long): Unit = Unit
+ override def debug(msg: String): Unit = Unit
+ override def resultCount(name: String, count:Int): Unit = Unit
+ }
+ var r = ESimpleGeoPanda where (_.metall any) fetch()
+ Assert.assertEquals("start should have been called just once", 1, startCount)
+ Assert.assertEquals("end should have been called just once", 1, endCount)
+ } finally {
+ ESimpleGeoPanda.logger = oldLogger
+ }
+ }
@Test(expected=classOf[TimeoutException])
def futureTestBlock {
32 src/test/scala/com/foursquare/slashem/QueryTest.scala
View
@@ -1,7 +1,8 @@
package com.foursquare.slashem
import com.foursquare.slashem._
-
+import com.twitter.util.Duration
+import java.util.concurrent.TimeUnit
import org.bson.types.ObjectId
import org.junit.Test
import org.junit._
@@ -15,6 +16,35 @@ import org.specs.matcher.ScalaCheckMatchers
class QueryTest extends SpecsMatchers with ScalaCheckMatchers {
+
+ @Test
+ def testStartEndExecuteQuery {
+ val oldLogger = SUserTest.logger
+ try {
+ var startCount = 0
+ var endCount = 0
+ SUserTest.logger = new SolrQueryLogger {
+ override def onStartExecuteQuery(name: String, msg: String): Function0[Unit] = {
+ startCount += 1
+ () => {
+ endCount += 1
+ }
+ }
+ override def log(name: String, msg: String, time: Long): Unit = Unit
+ override def debug(msg: String): Unit = Unit
+ override def resultCount(name: String, count:Int): Unit = Unit
+ }
+ // this query should fail since there's no solr server
+ // 10 second wait is upper limit to prevent race condition
+ SUserTest.where(_.fullname eqs "jon").fetchFuture().get(Duration(10, TimeUnit.SECONDS))
+
+ Assert.assertEquals("start should have been called just once", 1, startCount)
+ Assert.assertEquals("end should have been called just once", 1, endCount)
+ } finally {
+ SUserTest.logger = oldLogger
+ }
+ }
+
@Test
def testProduceCorrectSimpleQueryString {
val q = SUserTest where (_.fullname eqs "jon")
Something went wrong with that request. Please try again.