Skip to content

SolrQueryLogger interface for start/end events #48

Merged
merged 1 commit into from Jul 18, 2012
View
144 src/main/scala/com/foursquare/slashem/Schema.scala
@@ -8,7 +8,7 @@ import com.twitter.util.{Duration, ExecutorServiceFuturePool, Future, FuturePool
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.http.Http
import com.twitter.finagle.stats.StatsReceiver
-import com.twitter.finagle.Service
+import com.twitter.finagle.{Service, SimpleFilter}
import java.lang.Integer
import java.net.InetSocketAddress
import java.util.{ArrayList, HashMap}
@@ -390,10 +390,18 @@ trait SolrMeta[T <: Record[T]] extends SlashemMeta[T] {
})
qse
}
+
+ object NoopFilter extends SimpleFilter[HttpRequest, HttpResponse] {
+ def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]) = service(request)
+ }
+ def rawQueryFuture(params: Seq[(String, String)]): Future[String] = {
+ rawQueryFuture(params, NoopFilter)
+ }
+
// This method performs the actually query / http request. It should probably
// go in another file when it gets more sophisticated.
- def rawQueryFuture(params: Seq[(String, String)]): Future[String] = {
+ def rawQueryFuture(params: Seq[(String, String)], logFilter: SimpleFilter[HttpRequest, HttpResponse]): Future[String] = {
// Ugly
val qse = queryString(params ++
logger.queryIdToken.map("magicLoggingToken" -> _).toList)
@@ -402,8 +410,8 @@ trait SolrMeta[T <: Record[T]] extends SlashemMeta[T] {
// Here be dragons! If you have multiple backends with shared IPs this could very well explode
// but finagle doesn't seem to properly set the http host header for http/1.1
request.addHeader(HttpHeaders.Names.HOST, servers.head)
-
- client(request).map(response => {
+ val loggedClient = logFilter andThen client
+ (loggedClient(request)).map(response => {
response.getStatus match {
case HttpResponseStatus.OK => response.getContent.toString(CharsetUtil.UTF_8)
case status => throw SolrResponseException(status.getCode, status.getReasonPhrase, solrName, qse.toString)
@@ -415,9 +423,15 @@ trait SolrMeta[T <: Record[T]] extends SlashemMeta[T] {
/** Logging and Timing solr trait */
trait SolrQueryLogger {
- def log(name: String, msg: String, time: Long)
+ /**
+ * to instrument start and stop of query return a function that will be called
+ * when the query finishes
+ */
+ def onStartExecuteQuery(name: String, msg: String): Function0[Unit]
+
+ def log(name: String, msg: String, time: Long): Unit
- def debug(msg: String)
+ def debug(msg: String): Unit
// If this returns a string then it will be appended to the query
// so you can use it to match your query logs with application
@@ -437,9 +451,11 @@ trait SolrQueryLogger {
/** The default logger, does nothing. */
object NoopQueryLogger extends SolrQueryLogger {
- override def log(name: String, msg: String, time: Long) = Unit
- override def debug(msg: String) = println(msg)
- override def resultCount(name: String, count:Int) = println("Got back "+count+" results while querying "+name)
+ val noopCallback: Function0[Unit] = () => ()
+ override def onStartExecuteQuery(name: String, msg: String): Function0[Unit] = noopCallback
+ override def log(name: String, msg: String, time: Long): Unit = Unit
+ override def debug(msg: String): Unit = println(msg)
+ override def resultCount(name: String, count:Int): Unit = println("Got back "+count+" results while querying "+name)
}
//If you want any of the geo queries you will have to implement this
@@ -510,62 +526,70 @@ trait ElasticSchema[M <: Record[M]] extends SlashemSchema[M] {
def elasticQueryFuture[Ord, Lim, MM <: MinimumMatchType, Y, H <: Highlighting, Q <: QualityFilter, FC <: FacetCount, FLim, ST <: ScoreType](qb: QueryBuilder[M, Ord, Lim, MM, Y, H, Q, FC, FLim, ST], query: ElasticQueryBuilder, timeoutOpt: Option[Duration]): Future[SearchResults[M, Y]] = {
val esfp = meta.executorServiceFuturePool
-
+
+ val queryName = "e" + meta.indexName + ".query"
+ val queryText = query.toString()
+
val searchResultsFuture = esfp {
- val client = meta.client
- val from = qb.start.map(_.toInt).getOrElse(qb.DefaultStart)
- val limit = qb.limit.map(_.toInt).getOrElse(qb.DefaultLimit)
- meta.logger.debug("Query details "+query.toString())
- val baseRequest: SearchRequestBuilder = client.prepareSearch(meta.indexName)
- .setQuery(query)
- .setFrom(from)
- .setSize(limit)
- .setSearchType(SearchType.QUERY_THEN_FETCH)
- val request = qb.sort match {
- case None => baseRequest
- //Handle sorting by fields quickly
- case Some(Pair(Field(fieldName),"asc")) => baseRequest.addSort(fieldName,SortOrder.ASC)
- case Some(Pair(Field(fieldName),"desc")) => baseRequest.addSort(fieldName,SortOrder.DESC)
- //Handle sorting by scripts in general
- case Some(Pair(sort,dir)) => {
- val (params,scriptSrc) = sort.elasticBoost()
- val paramNames = (1 to params.length).map("p"+_)
- val script = scriptSrc.format(paramNames:_*)
- val keyedParams = paramNames zip params
- val sortOrder = dir match {
- case "asc" => SortOrder.ASC
- case "desc" => SortOrder.DESC
+ val onEndExecuteFunction: Function0[Unit] = meta.logger.onStartExecuteQuery(queryName, queryText)
+ try {
+ val client = meta.client
+ val from = qb.start.map(_.toInt).getOrElse(qb.DefaultStart)
+ val limit = qb.limit.map(_.toInt).getOrElse(qb.DefaultLimit)
+ meta.logger.debug("Query details " + queryText)
+ val baseRequest: SearchRequestBuilder = client.prepareSearch(meta.indexName)
+ .setQuery(query)
+ .setFrom(from)
+ .setSize(limit)
+ .setSearchType(SearchType.QUERY_THEN_FETCH)
+ val request = qb.sort match {
+ case None => baseRequest
+ //Handle sorting by fields quickly
+ case Some(Pair(Field(fieldName),"asc")) => baseRequest.addSort(fieldName,SortOrder.ASC)
+ case Some(Pair(Field(fieldName),"desc")) => baseRequest.addSort(fieldName,SortOrder.DESC)
+ //Handle sorting by scripts in general
+ case Some(Pair(sort,dir)) => {
+ val (params,scriptSrc) = sort.elasticBoost()
+ val paramNames = (1 to params.length).map("p"+_)
+ val script = scriptSrc.format(paramNames:_*)
+ val keyedParams = paramNames zip params
+ val sortOrder = dir match {
+ case "asc" => SortOrder.ASC
+ case "desc" => SortOrder.DESC
+ }
+ val sortBuilder = new ScriptSortBuilder(script,"number").order(sortOrder)
+ keyedParams.foreach(p => {sortBuilder.param(p._1,p._2)})
+ baseRequest.addSort(sortBuilder)
}
- val sortBuilder = new ScriptSortBuilder(script,"number").order(sortOrder)
- keyedParams.foreach(p => {sortBuilder.param(p._1,p._2)})
- baseRequest.addSort(sortBuilder)
+ case _ => baseRequest
}
- case _ => baseRequest
- }
- /* Set the server side timeout */
- val timeLimmitedRequest = timeoutOpt match {
- case Some(timeout) => request.setTimeout(TimeValue.timeValueMillis(timeout.inMillis))
- case _ => request
- }
+ /* Set the server side timeout */
+ val timeLimmitedRequest = timeoutOpt match {
+ case Some(timeout) => request.setTimeout(TimeValue.timeValueMillis(timeout.inMillis))
+ case _ => request
+ }
- /* Add a facet to the request */
- val facetedRequest = qb.facetSettings.facetFieldList match {
- case Nil => timeLimmitedRequest
- case _ => {
- termFacetQuery(qb.facetSettings.facetFieldList, qb.facetSettings.facetLimit).foreach(timeLimmitedRequest.addFacet(_))
- timeLimmitedRequest
+ /* Add a facet to the request */
+ val facetedRequest = qb.facetSettings.facetFieldList match {
+ case Nil => timeLimmitedRequest
+ case _ => {
+ termFacetQuery(qb.facetSettings.facetFieldList, qb.facetSettings.facetLimit).foreach(timeLimmitedRequest.addFacet(_))
+ timeLimmitedRequest
+ }
}
+ val response : SearchResponse = facetedRequest.execute().get()
+ response
+ } finally {
+ onEndExecuteFunction()
}
- val response : SearchResponse = facetedRequest.execute().get()
- response
}
- val queryText = query.toString()
+
timeFuture(searchResultsFuture).map( {
case (queryTime, result) => {
- meta.logger.log("e" + meta.indexName + ".query",queryText, queryTime)
+ meta.logger.log(queryName, queryText, queryTime)
result
}}).map({
response =>
@@ -849,11 +873,21 @@ trait SolrSchema[M <: Record[M]] extends SlashemSchema[M] {
fieldstofetch: List[String],
fallOf: Option[Double],
min: Option[Int]): Future[SearchResults[M, Y]] = {
+ val queryName = meta.solrName + ".query"
val queryText = meta.queryString(params).toString
+
+ val logFilter = new SimpleFilter[HttpRequest, HttpResponse] {
+ def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]) = {
+ val onEndExecuteFunction: Function0[Unit] = meta.logger.onStartExecuteQuery(queryName, queryText)
+ service(request) respond { response =>
+ onEndExecuteFunction()
+ }
+ }
+ }
- timeFuture(meta.rawQueryFuture(params)).map({
+ timeFuture(meta.rawQueryFuture(params, logFilter)).map({
case (queryTime, jsonString) => {
- meta.logger.log(meta.solrName + ".query", queryText, queryTime)
+ meta.logger.log(queryName, queryText, queryTime)
jsonString
}}).flatMap(jsonString => {
meta.extractFromResponse(jsonString, creator,
View
25 src/test/scala/com/foursquare/slashem/ElasticQueryTest.scala
@@ -52,6 +52,31 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers {
})
Assert.assertEquals(1,future.apply(Duration(200,TimeUnit.MILLISECONDS)))
}
+
+ @Test
+ def testStartEndExecuteQuery {
+ val oldLogger = ESimpleGeoPanda.logger
+ try {
+ var startCount = 0
+ var endCount = 0
+ ESimpleGeoPanda.logger = new SolrQueryLogger {
+ override def onStartExecuteQuery(name: String, msg: String): Function0[Unit] = {
+ startCount += 1
+ () => {
+ endCount += 1
+ }
+ }
+ override def log(name: String, msg: String, time: Long): Unit = Unit
+ override def debug(msg: String): Unit = Unit
+ override def resultCount(name: String, count:Int): Unit = Unit
+ }
+ var r = ESimpleGeoPanda where (_.metall any) fetch()
+ Assert.assertEquals("start should have been called just once", 1, startCount)
+ Assert.assertEquals("end should have been called just once", 1, endCount)
+ } finally {
+ ESimpleGeoPanda.logger = oldLogger
+ }
+ }
@Test(expected=classOf[TimeoutException])
def futureTestBlock {
View
32 src/test/scala/com/foursquare/slashem/QueryTest.scala
@@ -1,7 +1,8 @@
package com.foursquare.slashem
import com.foursquare.slashem._
-
+import com.twitter.util.Duration
+import java.util.concurrent.TimeUnit
import org.bson.types.ObjectId
import org.junit.Test
import org.junit._
@@ -15,6 +16,35 @@ import org.specs.matcher.ScalaCheckMatchers
class QueryTest extends SpecsMatchers with ScalaCheckMatchers {
+
+ @Test
+ def testStartEndExecuteQuery {
+ val oldLogger = SUserTest.logger
+ try {
+ var startCount = 0
+ var endCount = 0
+ SUserTest.logger = new SolrQueryLogger {
+ override def onStartExecuteQuery(name: String, msg: String): Function0[Unit] = {
+ startCount += 1
+ () => {
+ endCount += 1
+ }
+ }
+ override def log(name: String, msg: String, time: Long): Unit = Unit
+ override def debug(msg: String): Unit = Unit
+ override def resultCount(name: String, count:Int): Unit = Unit
+ }
+ // this query should fail since there's no solr server
+ // 10 second wait is upper limit to prevent race condition
+ SUserTest.where(_.fullname eqs "jon").fetchFuture().get(Duration(10, TimeUnit.SECONDS))
+
+ Assert.assertEquals("start should have been called just once", 1, startCount)
+ Assert.assertEquals("end should have been called just once", 1, endCount)
+ } finally {
+ SUserTest.logger = oldLogger
+ }
+ }
+
@Test
def testProduceCorrectSimpleQueryString {
val q = SUserTest where (_.fullname eqs "jon")
Something went wrong with that request. Please try again.