diff --git a/src/main/scala/com/foursquare/slashem/Schema.scala b/src/main/scala/com/foursquare/slashem/Schema.scala index 9818230..179d044 100644 --- a/src/main/scala/com/foursquare/slashem/Schema.scala +++ b/src/main/scala/com/foursquare/slashem/Schema.scala @@ -8,7 +8,7 @@ import com.twitter.util.{Duration, ExecutorServiceFuturePool, Future, FuturePool import com.twitter.finagle.builder.ClientBuilder import com.twitter.finagle.http.Http import com.twitter.finagle.stats.StatsReceiver -import com.twitter.finagle.Service +import com.twitter.finagle.{Service, SimpleFilter} import java.lang.Integer import java.net.InetSocketAddress import java.util.{ArrayList, HashMap} @@ -390,10 +390,18 @@ trait SolrMeta[T <: Record[T]] extends SlashemMeta[T] { }) qse } + + object NoopFilter extends SimpleFilter[HttpRequest, HttpResponse] { + def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]) = service(request) + } + def rawQueryFuture(params: Seq[(String, String)]): Future[String] = { + rawQueryFuture(params, NoopFilter) + } + // This method performs the actually query / http request. It should probably // go in another file when it gets more sophisticated. - def rawQueryFuture(params: Seq[(String, String)]): Future[String] = { + def rawQueryFuture(params: Seq[(String, String)], logFilter: SimpleFilter[HttpRequest, HttpResponse]): Future[String] = { // Ugly val qse = queryString(params ++ logger.queryIdToken.map("magicLoggingToken" -> _).toList) @@ -402,8 +410,8 @@ trait SolrMeta[T <: Record[T]] extends SlashemMeta[T] { // Here be dragons! If you have multiple backends with shared IPs this could very well explode // but finagle doesn't seem to properly set the http host header for http/1.1 request.addHeader(HttpHeaders.Names.HOST, servers.head) - - client(request).map(response => { + val loggedClient = logFilter andThen client + (loggedClient(request)).map(response => { response.getStatus match { case HttpResponseStatus.OK => response.getContent.toString(CharsetUtil.UTF_8) case status => throw SolrResponseException(status.getCode, status.getReasonPhrase, solrName, qse.toString) @@ -415,9 +423,15 @@ trait SolrMeta[T <: Record[T]] extends SlashemMeta[T] { /** Logging and Timing solr trait */ trait SolrQueryLogger { - def log(name: String, msg: String, time: Long) + /** + * to instrument start and stop of query return a function that will be called + * when the query finishes + */ + def onStartExecuteQuery(name: String, msg: String): Function0[Unit] + + def log(name: String, msg: String, time: Long): Unit - def debug(msg: String) + def debug(msg: String): Unit // If this returns a string then it will be appended to the query // so you can use it to match your query logs with application @@ -437,9 +451,11 @@ trait SolrQueryLogger { /** The default logger, does nothing. */ object NoopQueryLogger extends SolrQueryLogger { - override def log(name: String, msg: String, time: Long) = Unit - override def debug(msg: String) = println(msg) - override def resultCount(name: String, count:Int) = println("Got back "+count+" results while querying "+name) + val noopCallback: Function0[Unit] = () => () + override def onStartExecuteQuery(name: String, msg: String): Function0[Unit] = noopCallback + override def log(name: String, msg: String, time: Long): Unit = Unit + override def debug(msg: String): Unit = println(msg) + override def resultCount(name: String, count:Int): Unit = println("Got back "+count+" results while querying "+name) } //If you want any of the geo queries you will have to implement this @@ -510,62 +526,70 @@ trait ElasticSchema[M <: Record[M]] extends SlashemSchema[M] { def elasticQueryFuture[Ord, Lim, MM <: MinimumMatchType, Y, H <: Highlighting, Q <: QualityFilter, FC <: FacetCount, FLim, ST <: ScoreType](qb: QueryBuilder[M, Ord, Lim, MM, Y, H, Q, FC, FLim, ST], query: ElasticQueryBuilder, timeoutOpt: Option[Duration]): Future[SearchResults[M, Y]] = { val esfp = meta.executorServiceFuturePool - + + val queryName = "e" + meta.indexName + ".query" + val queryText = query.toString() + val searchResultsFuture = esfp { - val client = meta.client - val from = qb.start.map(_.toInt).getOrElse(qb.DefaultStart) - val limit = qb.limit.map(_.toInt).getOrElse(qb.DefaultLimit) - meta.logger.debug("Query details "+query.toString()) - val baseRequest: SearchRequestBuilder = client.prepareSearch(meta.indexName) - .setQuery(query) - .setFrom(from) - .setSize(limit) - .setSearchType(SearchType.QUERY_THEN_FETCH) - val request = qb.sort match { - case None => baseRequest - //Handle sorting by fields quickly - case Some(Pair(Field(fieldName),"asc")) => baseRequest.addSort(fieldName,SortOrder.ASC) - case Some(Pair(Field(fieldName),"desc")) => baseRequest.addSort(fieldName,SortOrder.DESC) - //Handle sorting by scripts in general - case Some(Pair(sort,dir)) => { - val (params,scriptSrc) = sort.elasticBoost() - val paramNames = (1 to params.length).map("p"+_) - val script = scriptSrc.format(paramNames:_*) - val keyedParams = paramNames zip params - val sortOrder = dir match { - case "asc" => SortOrder.ASC - case "desc" => SortOrder.DESC + val onEndExecuteFunction: Function0[Unit] = meta.logger.onStartExecuteQuery(queryName, queryText) + try { + val client = meta.client + val from = qb.start.map(_.toInt).getOrElse(qb.DefaultStart) + val limit = qb.limit.map(_.toInt).getOrElse(qb.DefaultLimit) + meta.logger.debug("Query details " + queryText) + val baseRequest: SearchRequestBuilder = client.prepareSearch(meta.indexName) + .setQuery(query) + .setFrom(from) + .setSize(limit) + .setSearchType(SearchType.QUERY_THEN_FETCH) + val request = qb.sort match { + case None => baseRequest + //Handle sorting by fields quickly + case Some(Pair(Field(fieldName),"asc")) => baseRequest.addSort(fieldName,SortOrder.ASC) + case Some(Pair(Field(fieldName),"desc")) => baseRequest.addSort(fieldName,SortOrder.DESC) + //Handle sorting by scripts in general + case Some(Pair(sort,dir)) => { + val (params,scriptSrc) = sort.elasticBoost() + val paramNames = (1 to params.length).map("p"+_) + val script = scriptSrc.format(paramNames:_*) + val keyedParams = paramNames zip params + val sortOrder = dir match { + case "asc" => SortOrder.ASC + case "desc" => SortOrder.DESC + } + val sortBuilder = new ScriptSortBuilder(script,"number").order(sortOrder) + keyedParams.foreach(p => {sortBuilder.param(p._1,p._2)}) + baseRequest.addSort(sortBuilder) } - val sortBuilder = new ScriptSortBuilder(script,"number").order(sortOrder) - keyedParams.foreach(p => {sortBuilder.param(p._1,p._2)}) - baseRequest.addSort(sortBuilder) + case _ => baseRequest } - case _ => baseRequest - } - /* Set the server side timeout */ - val timeLimmitedRequest = timeoutOpt match { - case Some(timeout) => request.setTimeout(TimeValue.timeValueMillis(timeout.inMillis)) - case _ => request - } + /* Set the server side timeout */ + val timeLimmitedRequest = timeoutOpt match { + case Some(timeout) => request.setTimeout(TimeValue.timeValueMillis(timeout.inMillis)) + case _ => request + } - /* Add a facet to the request */ - val facetedRequest = qb.facetSettings.facetFieldList match { - case Nil => timeLimmitedRequest - case _ => { - termFacetQuery(qb.facetSettings.facetFieldList, qb.facetSettings.facetLimit).foreach(timeLimmitedRequest.addFacet(_)) - timeLimmitedRequest + /* Add a facet to the request */ + val facetedRequest = qb.facetSettings.facetFieldList match { + case Nil => timeLimmitedRequest + case _ => { + termFacetQuery(qb.facetSettings.facetFieldList, qb.facetSettings.facetLimit).foreach(timeLimmitedRequest.addFacet(_)) + timeLimmitedRequest + } } + val response : SearchResponse = facetedRequest.execute().get() + response + } finally { + onEndExecuteFunction() } - val response : SearchResponse = facetedRequest.execute().get() - response } - val queryText = query.toString() + timeFuture(searchResultsFuture).map( { case (queryTime, result) => { - meta.logger.log("e" + meta.indexName + ".query",queryText, queryTime) + meta.logger.log(queryName, queryText, queryTime) result }}).map({ response => @@ -849,11 +873,21 @@ trait SolrSchema[M <: Record[M]] extends SlashemSchema[M] { fieldstofetch: List[String], fallOf: Option[Double], min: Option[Int]): Future[SearchResults[M, Y]] = { + val queryName = meta.solrName + ".query" val queryText = meta.queryString(params).toString + + val logFilter = new SimpleFilter[HttpRequest, HttpResponse] { + def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]) = { + val onEndExecuteFunction: Function0[Unit] = meta.logger.onStartExecuteQuery(queryName, queryText) + service(request) respond { response => + onEndExecuteFunction() + } + } + } - timeFuture(meta.rawQueryFuture(params)).map({ + timeFuture(meta.rawQueryFuture(params, logFilter)).map({ case (queryTime, jsonString) => { - meta.logger.log(meta.solrName + ".query", queryText, queryTime) + meta.logger.log(queryName, queryText, queryTime) jsonString }}).flatMap(jsonString => { meta.extractFromResponse(jsonString, creator, diff --git a/src/test/scala/com/foursquare/slashem/ElasticQueryTest.scala b/src/test/scala/com/foursquare/slashem/ElasticQueryTest.scala index 304c44d..1769f33 100644 --- a/src/test/scala/com/foursquare/slashem/ElasticQueryTest.scala +++ b/src/test/scala/com/foursquare/slashem/ElasticQueryTest.scala @@ -52,6 +52,31 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { }) Assert.assertEquals(1,future.apply(Duration(200,TimeUnit.MILLISECONDS))) } + + @Test + def testStartEndExecuteQuery { + val oldLogger = ESimpleGeoPanda.logger + try { + var startCount = 0 + var endCount = 0 + ESimpleGeoPanda.logger = new SolrQueryLogger { + override def onStartExecuteQuery(name: String, msg: String): Function0[Unit] = { + startCount += 1 + () => { + endCount += 1 + } + } + override def log(name: String, msg: String, time: Long): Unit = Unit + override def debug(msg: String): Unit = Unit + override def resultCount(name: String, count:Int): Unit = Unit + } + var r = ESimpleGeoPanda where (_.metall any) fetch() + Assert.assertEquals("start should have been called just once", 1, startCount) + Assert.assertEquals("end should have been called just once", 1, endCount) + } finally { + ESimpleGeoPanda.logger = oldLogger + } + } @Test(expected=classOf[TimeoutException]) def futureTestBlock { diff --git a/src/test/scala/com/foursquare/slashem/QueryTest.scala b/src/test/scala/com/foursquare/slashem/QueryTest.scala index 49ed903..fe11dfd 100644 --- a/src/test/scala/com/foursquare/slashem/QueryTest.scala +++ b/src/test/scala/com/foursquare/slashem/QueryTest.scala @@ -1,7 +1,8 @@ package com.foursquare.slashem import com.foursquare.slashem._ - +import com.twitter.util.Duration +import java.util.concurrent.TimeUnit import org.bson.types.ObjectId import org.junit.Test import org.junit._ @@ -15,6 +16,35 @@ import org.specs.matcher.ScalaCheckMatchers class QueryTest extends SpecsMatchers with ScalaCheckMatchers { + + @Test + def testStartEndExecuteQuery { + val oldLogger = SUserTest.logger + try { + var startCount = 0 + var endCount = 0 + SUserTest.logger = new SolrQueryLogger { + override def onStartExecuteQuery(name: String, msg: String): Function0[Unit] = { + startCount += 1 + () => { + endCount += 1 + } + } + override def log(name: String, msg: String, time: Long): Unit = Unit + override def debug(msg: String): Unit = Unit + override def resultCount(name: String, count:Int): Unit = Unit + } + // this query should fail since there's no solr server + // 10 second wait is upper limit to prevent race condition + SUserTest.where(_.fullname eqs "jon").fetchFuture().get(Duration(10, TimeUnit.SECONDS)) + + Assert.assertEquals("start should have been called just once", 1, startCount) + Assert.assertEquals("end should have been called just once", 1, endCount) + } finally { + SUserTest.logger = oldLogger + } + } + @Test def testProduceCorrectSimpleQueryString { val q = SUserTest where (_.fullname eqs "jon")