From 230a5cf956bd972b7e1e594bf91cf4b8df01cebc Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 19 Mar 2012 20:33:39 -0700 Subject: [PATCH] Revert the attempt to wrap the ALF in a Twitter Future This reverts commit 85c787677a228fee94c19b7e57cb873de5129220. --- .../scala/com/foursquare/slashem/Schema.scala | 131 ++++++------------ .../foursquare/slashem/ElasticQueryTest.scala | 46 +++--- 2 files changed, 68 insertions(+), 109 deletions(-) diff --git a/src/main/scala/com/foursquare/slashem/Schema.scala b/src/main/scala/com/foursquare/slashem/Schema.scala index 82d30c0..665b943 100644 --- a/src/main/scala/com/foursquare/slashem/Schema.scala +++ b/src/main/scala/com/foursquare/slashem/Schema.scala @@ -4,8 +4,7 @@ package com.foursquare.slashem import com.foursquare.slashem.Ast._ -import com.twitter.util.{Duration, Future, FutureTask, Promise, Return, Throw, Try, TryLike} -import com.twitter.concurrent.IVar +import com.twitter.util.{Duration, Future, FutureTask} import com.twitter.finagle.builder.ClientBuilder import com.twitter.finagle.http.Http import com.twitter.finagle.Service @@ -21,7 +20,6 @@ import org.codehaus.jackson.map.{DeserializationConfig, ObjectMapper} import org.elasticsearch.action.search.SearchRequestBuilder import org.elasticsearch.action.search.SearchResponse import org.elasticsearch.action.search.SearchType -import org.elasticsearch.action.{ActionListener, ListenableActionFuture} import org.elasticsearch.client.Client import org.elasticsearch.client.transport.TransportClient import org.elasticsearch.common.settings.ImmutableSettings @@ -45,49 +43,6 @@ import org.joda.time.DateTime import scala.annotation.tailrec import scalaj.collection.Imports._ -import java.util.concurrent.TimeUnit - -class WrappedActionListenerFuture[A](af: ListenableActionFuture[A]) extends Promise[A] with ActionListener[A] { - //ActionListener - def onResponse(r: A) { - println("got a response") - this.update(Return(r)) - } - def onFailure(e: Throwable) { - println("got a failure"+e) - this.update(Throw(e)) - } - //Promise - override def get(timeout: Duration): Try[A] = { - println("got called with get") - //TODO: respect timeout - try { - Return(af.get()) - } catch { - case e: Throwable => Throw(e) - } - } - override def isCancelled: Boolean = { - af.isCancelled - } - - override def cancel() { - af.cancel(true) - } - - override def isDefined = af.isDone - - -} - -object WrappedActionListenerFuture { - def fromActionFuture[A](af: ListenableActionFuture[A]): Future[A] = { - val tf = new WrappedActionListenerFuture(af) - af.addListener(tf) - tf - } -} - /** * SolrResponseException class that extends RuntimeException */ @@ -505,52 +460,56 @@ trait ElasticSchema[M <: Record[M]] extends SlashemSchema[M] { } def elasticQueryFuture[Ord, Lim, MM <: MinimumMatchType, Y, H <: Highlighting, Q <: QualityFilter, FC <: FacetCount, FLim](qb: QueryBuilder[M, Ord, Lim, MM, Y, H, Q, FC, FLim], query: ElasticQueryBuilder, timeoutOpt: Option[Duration]): Future[SearchResults[M, Y]] = { - val client = meta.client - val from = qb.start.map(_.toInt).getOrElse(qb.DefaultStart) - val limit = qb.limit.map(_.toInt).getOrElse(qb.DefaultLimit) - meta.logger.debug("Query details "+query.toString()) - val baseRequest: SearchRequestBuilder = client.prepareSearch(meta.indexName) - .setQuery(query) - .setFrom(from) - .setSize(limit) - .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) - val request = qb.sort match { - case None => baseRequest - //Handle sorting by fields quickly - case Some(Pair(Field(fieldName),"asc")) => baseRequest.addSort(fieldName,SortOrder.ASC) - case Some(Pair(Field(fieldName),"desc")) => baseRequest.addSort(fieldName,SortOrder.DESC) - //Handle sorting by scripts in general - case Some(Pair(sort,"asc")) => baseRequest.addSort(new ScriptSortBuilder(sort.elasticBoost(),"number").order(SortOrder.ASC)) - case Some(Pair(sort,"desc")) => baseRequest.addSort(new ScriptSortBuilder(sort.elasticBoost(),"number").order(SortOrder.DESC)) - - case _ => baseRequest + val future : FutureTask[SearchResults[M,Y]]= new FutureTask({ + val client = meta.client + val from = qb.start.map(_.toInt).getOrElse(qb.DefaultStart) + val limit = qb.limit.map(_.toInt).getOrElse(qb.DefaultLimit) + meta.logger.debug("Query details "+query.toString()) + val baseRequest: SearchRequestBuilder = client.prepareSearch(meta.indexName) + .setQuery(query) + .setFrom(from) + .setSize(limit) + .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) + val request = qb.sort match { + case None => baseRequest + //Handle sorting by fields quickly + case Some(Pair(Field(fieldName),"asc")) => baseRequest.addSort(fieldName,SortOrder.ASC) + case Some(Pair(Field(fieldName),"desc")) => baseRequest.addSort(fieldName,SortOrder.DESC) + //Handle sorting by scripts in general + case Some(Pair(sort,"asc")) => baseRequest.addSort(new ScriptSortBuilder(sort.elasticBoost(),"number").order(SortOrder.ASC)) + case Some(Pair(sort,"desc")) => baseRequest.addSort(new ScriptSortBuilder(sort.elasticBoost(),"number").order(SortOrder.DESC)) + + case _ => baseRequest } - /* Set the server side timeout */ - val timeLimmitedRequest = timeoutOpt match { - case Some(timeout) => request.setTimeout(TimeValue.timeValueMillis(timeout.inMillis)) - case _ => request - } + /* Set the server side timeout */ + val timeLimmitedRequest = timeoutOpt match { + case Some(timeout) => request.setTimeout(TimeValue.timeValueMillis(timeout.inMillis)) + case _ => request + } - /* Add a facet to the request */ - val facetedRequest = qb.facetSettings.facetFieldList match { - case Nil => timeLimmitedRequest - case _ => { - termFacetQuery(qb.facetSettings.facetFieldList, qb.facetSettings.facetLimit).foreach(timeLimmitedRequest.addFacet(_)) - timeLimmitedRequest + /* Add a facet to the request */ + val facetedRequest = qb.facetSettings.facetFieldList match { + case Nil => timeLimmitedRequest + case _ => { + termFacetQuery(qb.facetSettings.facetFieldList, qb.facetSettings.facetLimit).foreach(timeLimmitedRequest.addFacet(_)) + timeLimmitedRequest + } } - } - val responseActionFuture: ListenableActionFuture[SearchResponse] = facetedRequest.execute() - val future = WrappedActionListenerFuture.fromActionFuture(responseActionFuture) - val responseFuture = future.map({ - response => constructSearchResults(qb.creator, - qb.start.map(_.toInt).getOrElse(qb.DefaultStart), - qb.fallOf, - qb.min, - response)}) - timeFuture(responseFuture).map( { + val response: SearchResponse = facetedRequest + .execute().actionGet() + meta.logger.debug("Search response "+response.toString()) + constructSearchResults(qb.creator, + qb.start.map(_.toInt).getOrElse(qb.DefaultStart), + qb.fallOf, + qb.min, + response) + } + ) + future.run() + timeFuture(future).map( { case (queryTime, result) => { meta.logger.log("e" + meta.indexName + ".query",query.toString(), queryTime) result diff --git a/src/test/scala/com/foursquare/slashem/ElasticQueryTest.scala b/src/test/scala/com/foursquare/slashem/ElasticQueryTest.scala index a057490..31c2637 100644 --- a/src/test/scala/com/foursquare/slashem/ElasticQueryTest.scala +++ b/src/test/scala/com/foursquare/slashem/ElasticQueryTest.scala @@ -39,7 +39,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { val r = ESimplePanda where (_.name eqs "lolsdonotinsertsomethingwiththisinit") fetch() Assert.assertEquals(0,r.response.results.length) } - //@Test + @Test def testNonEmptySearch { val r = ESimplePanda where (_.hobos contains "hobos") fetch() Assert.assertEquals(1,r.response.results.length) @@ -49,7 +49,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { Assert.assertEquals("hobos",doc.hobos.value) Assert.assertEquals(new ObjectId("4c809f4251ada1cdc3790b11"),doc.id.is) } - //@Test + @Test def testNonEmptyMM100Search { val r = ESimplePanda where (_.name contains "loler eating hobo") minimumMatchPercent(100) fetch() Assert.assertEquals(1,r.response.results.length) @@ -57,7 +57,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { val doc = r.response.results.apply(0) Assert.assertEquals(new ObjectId("4c809f4251ada1cdc3790b18"),doc.id.is) } - //@Test + @Test def testNonEmptyMM100SearchWithTimeout { val r = ESimplePanda where (_.name contains "loler eating hobo") minimumMatchPercent(100) fetch(Duration(1, TimeUnit.SECONDS)) Assert.assertEquals(1,r.response.results.length) @@ -65,12 +65,12 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { val doc = r.response.results.apply(0) Assert.assertEquals(new ObjectId("4c809f4251ada1cdc3790b18"),doc.id.is) } - //@Test + @Test def testNonEmptyMultiFieldSearch { val r = ESimplePanda.where(_.default contains "onlyinnamefield").queryField(_.name).queryField(_.hobos) fetch() Assert.assertEquals(1,r.response.results.length) } - //@Test + @Test def testNonEmptySearchOidScorePare { val r = ESimplePanda where (_.hobos contains "hobos") fetch() Assert.assertEquals(1,r.response.results.length) @@ -78,7 +78,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { val doc = r.response.oidScorePair.apply(0) Assert.assertEquals(new ObjectId("4c809f4251ada1cdc3790b11"),doc._1) } - //@Test + @Test def testSimpleInQuery { val r = ESimplePanda where (_.hobos in List("hobos")) fetch() Assert.assertEquals(1,r.response.results.length) @@ -86,22 +86,22 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { val doc = r.response.oidScorePair.apply(0) Assert.assertEquals(new ObjectId("4c809f4251ada1cdc3790b11"),doc._1) } - //@Test + @Test def testSimpleNInQuery { val r = ESimplePanda where (_.hobos nin List("hobos")) fetch() Assert.assertEquals(7,r.response.results.length) } - //@Test + @Test def testManyResultsSearch { val r = ESimplePanda where (_.name contains "loler") fetch() Assert.assertEquals(4,r.response.results.length) } - //@Test + @Test def testAndSearch { val r = ESimplePanda where (_.name contains "loler") and (_.hobos contains "nyet") fetch() Assert.assertEquals(1,r.response.results.length) } - //@Test + @Test def orderDesc { var r = ESimplePanda where (_.name contains "ordertest") orderDesc(_.followers) fetch() Assert.assertEquals(2,r.response.results.length) @@ -110,7 +110,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { Assert.assertEquals(doc0._1,new ObjectId("4c809f4251ada1cdc3790b14")) Assert.assertEquals(doc1._1,new ObjectId("4c809f4251ada1cdc3790b15")) } - //@Test + @Test def orderAsc { var r = ESimplePanda where (_.name contains "ordertest") orderAsc(_.followers) fetch() Assert.assertEquals(2,r.response.results.length) @@ -119,7 +119,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { Assert.assertEquals(doc0._1,new ObjectId("4c809f4251ada1cdc3790b15")) Assert.assertEquals(doc1._1,new ObjectId("4c809f4251ada1cdc3790b14")) } - //@Test + @Test def geoOrderDesc { var r = ESimpleGeoPanda where (_.name contains "ordertest") complexOrderDesc(_.pos sqeGeoDistance(74.0,-31.0)) fetch() Assert.assertEquals(2,r.response.results.length) @@ -128,7 +128,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { Assert.assertEquals(new ObjectId("4c809f4251ada1cdc3790b16"),doc0._1) Assert.assertEquals(new ObjectId("4c809f4251ada1cdc3790b17"),doc1._1) } - //@Test + @Test def geoOrderAsc { var r = ESimpleGeoPanda where (_.name contains "ordertest") complexOrderAsc(_.pos sqeGeoDistance(74.0,-31.0)) fetch() Assert.assertEquals(2,r.response.results.length) @@ -137,7 +137,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { Assert.assertEquals(new ObjectId("4c809f4251ada1cdc3790b17"),doc0._1) Assert.assertEquals(new ObjectId("4c809f4251ada1cdc3790b16"),doc1._1) } - //@Test + @Test def geoOrderIntAsc { var r = ESimpleGeoPanda where (_.name contains "ordertest") complexOrderAsc(_.pos sqeGeoDistance(74,-31)) fetch() Assert.assertEquals(2,r.response.results.length) @@ -148,12 +148,12 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { } - //@Test + @Test def testAndOrSearch { val r = ESimplePanda where (_.name contains "loler") and (x => (x.hobos contains "nyet") or (x.hobos contains "robot")) fetch() Assert.assertEquals(r.response.results.length,2) } - //@Test + @Test def testPhraseBoostOrdering { val rWithLowPhraseBoost = ESimplePanda where (_.name contains "loler skates") phraseBoost(_.name,10) fetch() val rWithHighPhraseBoost = ESimplePanda where (_.name contains "loler skates") phraseBoost(_.name,10000) fetch() @@ -170,7 +170,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { Assert.assertTrue(doc1b.score.value > doc2b.score.value) Assert.assertTrue(doc3b.score.value > doc1b.score.value) } - //@Test + @Test def testFieldFaceting { val r = ESimplePanda where (_.name contains "loler skates") facetField(_.foreign) fetch() Assert.assertEquals(4,r.response.results.length) @@ -178,7 +178,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { Assert.assertEquals(3,r.response.fieldFacets.get("foreign").get("pants")) } - //@Test + @Test def testMaxCountFieldFaceting { val r = ESimplePanda where (_.name contains "loler skates") facetField(_.foreign) facetLimit(1) fetch() Assert.assertEquals(4,r.response.results.length) @@ -187,7 +187,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { } - //@Test + @Test def testFieldBoost { val r1 = ESimplePanda where (_.magic contains "yes") fetch() val r2 = ESimplePanda where (_.magic contains "yes") boostField(_.followers,10) fetch() @@ -196,7 +196,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { Assert.assertTrue(r2.response.results.apply(0).score.value > r1.response.results.apply(0).score.value) } - //@Test + @Test def testGeoBoost { //Test GeoBoosting. Note will actually make further away document come up first val geoLat = 74 @@ -207,7 +207,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { Assert.assertEquals(r2.response.results.length,2) Assert.assertTrue(r2.response.results.apply(0).score.value > r1.response.results.apply(0).score.value) } - //@Test + @Test def testPointExtract { //Test GeoBoosting. Note will actually make further away document come up first val geoLat = 74 @@ -217,7 +217,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { Assert.assertEquals(r.response.results.apply(0).pos.value._1,74.0,0.9) } - //@Test + @Test def testRecipGeoBoost { val geoLat = 74 val geoLong = -31 @@ -228,7 +228,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers { Assert.assertTrue(r2.response.results.apply(0).score.value > r1.response.results.apply(0).score.value) } - //@Test + @Test def testListFieldContains { val response1 = ESimplePanda where (_.favnums contains 2) fetch() val response2 = ESimplePanda where (_.favnums contains 6) fetch()