Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SolrQueryLogger interface for start/end events #47

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
144 changes: 89 additions & 55 deletions src/main/scala/com/foursquare/slashem/Schema.scala
Expand Up @@ -8,7 +8,7 @@ import com.twitter.util.{Duration, ExecutorServiceFuturePool, Future, FuturePool
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.http.Http
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.Service
import com.twitter.finagle.{Service, SimpleFilter}
import java.lang.Integer
import java.net.InetSocketAddress
import java.util.{ArrayList, HashMap}
Expand Down Expand Up @@ -390,10 +390,18 @@ trait SolrMeta[T <: Record[T]] extends SlashemMeta[T] {
})
qse
}

object NoopFilter extends SimpleFilter[HttpRequest, HttpResponse] {
def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]) = service(request)
}

def rawQueryFuture(params: Seq[(String, String)]): Future[String] = {
rawQueryFuture(params, NoopFilter)
}

// This method performs the actually query / http request. It should probably
// go in another file when it gets more sophisticated.
def rawQueryFuture(params: Seq[(String, String)]): Future[String] = {
def rawQueryFuture(params: Seq[(String, String)], logFilter: SimpleFilter[HttpRequest, HttpResponse]): Future[String] = {
// Ugly
val qse = queryString(params ++
logger.queryIdToken.map("magicLoggingToken" -> _).toList)
Expand All @@ -402,8 +410,8 @@ trait SolrMeta[T <: Record[T]] extends SlashemMeta[T] {
// Here be dragons! If you have multiple backends with shared IPs this could very well explode
// but finagle doesn't seem to properly set the http host header for http/1.1
request.addHeader(HttpHeaders.Names.HOST, servers.head)

client(request).map(response => {
val loggedClient = logFilter andThen client
(loggedClient(request)).map(response => {
response.getStatus match {
case HttpResponseStatus.OK => response.getContent.toString(CharsetUtil.UTF_8)
case status => throw SolrResponseException(status.getCode, status.getReasonPhrase, solrName, qse.toString)
Expand All @@ -415,9 +423,15 @@ trait SolrMeta[T <: Record[T]] extends SlashemMeta[T] {

/** Logging and Timing solr trait */
trait SolrQueryLogger {
def log(name: String, msg: String, time: Long)
/**
* to instrument start and stop of query return a function that will be called
* when the query finishes
*/
def onStartExecuteQuery(name: String, msg: String): Function0[Unit]

def log(name: String, msg: String, time: Long): Unit

def debug(msg: String)
def debug(msg: String): Unit

// If this returns a string then it will be appended to the query
// so you can use it to match your query logs with application
Expand All @@ -437,9 +451,11 @@ trait SolrQueryLogger {

/** The default logger, does nothing. */
object NoopQueryLogger extends SolrQueryLogger {
override def log(name: String, msg: String, time: Long) = Unit
override def debug(msg: String) = println(msg)
override def resultCount(name: String, count:Int) = println("Got back "+count+" results while querying "+name)
val noopCallback: Function0[Unit] = () => ()
override def onStartExecuteQuery(name: String, msg: String): Function0[Unit] = noopCallback
override def log(name: String, msg: String, time: Long): Unit = Unit
override def debug(msg: String): Unit = println(msg)
override def resultCount(name: String, count:Int): Unit = println("Got back "+count+" results while querying "+name)
}

//If you want any of the geo queries you will have to implement this
Expand Down Expand Up @@ -510,62 +526,70 @@ trait ElasticSchema[M <: Record[M]] extends SlashemSchema[M] {

def elasticQueryFuture[Ord, Lim, MM <: MinimumMatchType, Y, H <: Highlighting, Q <: QualityFilter, FC <: FacetCount, FLim, ST <: ScoreType](qb: QueryBuilder[M, Ord, Lim, MM, Y, H, Q, FC, FLim, ST], query: ElasticQueryBuilder, timeoutOpt: Option[Duration]): Future[SearchResults[M, Y]] = {
val esfp = meta.executorServiceFuturePool


val queryName = "e" + meta.indexName + ".query"
val queryText = query.toString()

val searchResultsFuture = esfp {
val client = meta.client
val from = qb.start.map(_.toInt).getOrElse(qb.DefaultStart)
val limit = qb.limit.map(_.toInt).getOrElse(qb.DefaultLimit)
meta.logger.debug("Query details "+query.toString())
val baseRequest: SearchRequestBuilder = client.prepareSearch(meta.indexName)
.setQuery(query)
.setFrom(from)
.setSize(limit)
.setSearchType(SearchType.QUERY_THEN_FETCH)
val request = qb.sort match {
case None => baseRequest
//Handle sorting by fields quickly
case Some(Pair(Field(fieldName),"asc")) => baseRequest.addSort(fieldName,SortOrder.ASC)
case Some(Pair(Field(fieldName),"desc")) => baseRequest.addSort(fieldName,SortOrder.DESC)
//Handle sorting by scripts in general
case Some(Pair(sort,dir)) => {
val (params,scriptSrc) = sort.elasticBoost()
val paramNames = (1 to params.length).map("p"+_)
val script = scriptSrc.format(paramNames:_*)
val keyedParams = paramNames zip params
val sortOrder = dir match {
case "asc" => SortOrder.ASC
case "desc" => SortOrder.DESC
val onEndExecuteFunction: Function0[Unit] = meta.logger.onStartExecuteQuery(queryName, queryText)
try {
val client = meta.client
val from = qb.start.map(_.toInt).getOrElse(qb.DefaultStart)
val limit = qb.limit.map(_.toInt).getOrElse(qb.DefaultLimit)
meta.logger.debug("Query details " + queryText)
val baseRequest: SearchRequestBuilder = client.prepareSearch(meta.indexName)
.setQuery(query)
.setFrom(from)
.setSize(limit)
.setSearchType(SearchType.QUERY_THEN_FETCH)
val request = qb.sort match {
case None => baseRequest
//Handle sorting by fields quickly
case Some(Pair(Field(fieldName),"asc")) => baseRequest.addSort(fieldName,SortOrder.ASC)
case Some(Pair(Field(fieldName),"desc")) => baseRequest.addSort(fieldName,SortOrder.DESC)
//Handle sorting by scripts in general
case Some(Pair(sort,dir)) => {
val (params,scriptSrc) = sort.elasticBoost()
val paramNames = (1 to params.length).map("p"+_)
val script = scriptSrc.format(paramNames:_*)
val keyedParams = paramNames zip params
val sortOrder = dir match {
case "asc" => SortOrder.ASC
case "desc" => SortOrder.DESC
}
val sortBuilder = new ScriptSortBuilder(script,"number").order(sortOrder)
keyedParams.foreach(p => {sortBuilder.param(p._1,p._2)})
baseRequest.addSort(sortBuilder)
}
val sortBuilder = new ScriptSortBuilder(script,"number").order(sortOrder)
keyedParams.foreach(p => {sortBuilder.param(p._1,p._2)})
baseRequest.addSort(sortBuilder)
case _ => baseRequest
}
case _ => baseRequest
}

/* Set the server side timeout */
val timeLimmitedRequest = timeoutOpt match {
case Some(timeout) => request.setTimeout(TimeValue.timeValueMillis(timeout.inMillis))
case _ => request
}
/* Set the server side timeout */
val timeLimmitedRequest = timeoutOpt match {
case Some(timeout) => request.setTimeout(TimeValue.timeValueMillis(timeout.inMillis))
case _ => request
}

/* Add a facet to the request */
val facetedRequest = qb.facetSettings.facetFieldList match {
case Nil => timeLimmitedRequest
case _ => {
termFacetQuery(qb.facetSettings.facetFieldList, qb.facetSettings.facetLimit).foreach(timeLimmitedRequest.addFacet(_))
timeLimmitedRequest
/* Add a facet to the request */
val facetedRequest = qb.facetSettings.facetFieldList match {
case Nil => timeLimmitedRequest
case _ => {
termFacetQuery(qb.facetSettings.facetFieldList, qb.facetSettings.facetLimit).foreach(timeLimmitedRequest.addFacet(_))
timeLimmitedRequest
}
}
val response : SearchResponse = facetedRequest.execute().get()
response
} finally {
onEndExecuteFunction()
}
val response : SearchResponse = facetedRequest.execute().get()
response
}

val queryText = query.toString()


timeFuture(searchResultsFuture).map( {
case (queryTime, result) => {
meta.logger.log("e" + meta.indexName + ".query",queryText, queryTime)
meta.logger.log(queryName, queryText, queryTime)
result
}}).map({
response =>
Expand Down Expand Up @@ -849,11 +873,21 @@ trait SolrSchema[M <: Record[M]] extends SlashemSchema[M] {
fieldstofetch: List[String],
fallOf: Option[Double],
min: Option[Int]): Future[SearchResults[M, Y]] = {
val queryName = meta.solrName + ".query"
val queryText = meta.queryString(params).toString

val logFilter = new SimpleFilter[HttpRequest, HttpResponse] {
def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]) = {
val onEndExecuteFunction: Function0[Unit] = meta.logger.onStartExecuteQuery(queryName, queryText)
service(request) respond { response =>
onEndExecuteFunction()
}
}
}

timeFuture(meta.rawQueryFuture(params)).map({
timeFuture(meta.rawQueryFuture(params, logFilter)).map({
case (queryTime, jsonString) => {
meta.logger.log(meta.solrName + ".query", queryText, queryTime)
meta.logger.log(queryName, queryText, queryTime)
jsonString
}}).flatMap(jsonString => {
meta.extractFromResponse(jsonString, creator,
Expand Down
25 changes: 25 additions & 0 deletions src/test/scala/com/foursquare/slashem/ElasticQueryTest.scala
Expand Up @@ -52,6 +52,31 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers {
})
Assert.assertEquals(1,future.apply(Duration(200,TimeUnit.MILLISECONDS)))
}

@Test
def testStartEndExecuteQuery {
val oldLogger = ESimpleGeoPanda.logger
try {
var startCount = 0
var endCount = 0
ESimpleGeoPanda.logger = new SolrQueryLogger {
override def onStartExecuteQuery(name: String, msg: String): Function0[Unit] = {
startCount += 1
() => {
endCount += 1
}
}
override def log(name: String, msg: String, time: Long): Unit = Unit
override def debug(msg: String): Unit = Unit
override def resultCount(name: String, count:Int): Unit = Unit
}
var r = ESimpleGeoPanda where (_.metall any) fetch()
Assert.assertEquals("start should have been called just once", 1, startCount)
Assert.assertEquals("end should have been called just once", 1, endCount)
} finally {
ESimpleGeoPanda.logger = oldLogger
}
}

@Test(expected=classOf[TimeoutException])
def futureTestBlock {
Expand Down
32 changes: 31 additions & 1 deletion src/test/scala/com/foursquare/slashem/QueryTest.scala
@@ -1,7 +1,8 @@
package com.foursquare.slashem

import com.foursquare.slashem._

import com.twitter.util.Duration
import java.util.concurrent.TimeUnit
import org.bson.types.ObjectId
import org.junit.Test
import org.junit._
Expand All @@ -15,6 +16,35 @@ import org.specs.matcher.ScalaCheckMatchers


class QueryTest extends SpecsMatchers with ScalaCheckMatchers {

@Test
def testStartEndExecuteQuery {
val oldLogger = SUserTest.logger
try {
var startCount = 0
var endCount = 0
SUserTest.logger = new SolrQueryLogger {
override def onStartExecuteQuery(name: String, msg: String): Function0[Unit] = {
startCount += 1
() => {
endCount += 1
}
}
override def log(name: String, msg: String, time: Long): Unit = Unit
override def debug(msg: String): Unit = Unit
override def resultCount(name: String, count:Int): Unit = Unit
}
// this query should fail since there's no solr server
// 10 second wait is upper limit to prevent race condition
SUserTest.where(_.fullname eqs "jon").fetchFuture().get(Duration(10, TimeUnit.SECONDS))

Assert.assertEquals("start should have been called just once", 1, startCount)
Assert.assertEquals("end should have been called just once", 1, endCount)
} finally {
SUserTest.logger = oldLogger
}
}

@Test
def testProduceCorrectSimpleQueryString {
val q = SUserTest where (_.fullname eqs "jon")
Expand Down