Permalink
Browse files

Revert the attempt to wrap the ALF in a Twitter Future

This reverts commit 85c787677a228fee94c19b7e57cb873de5129220.
  • Loading branch information...
Holden Karau
Holden Karau committed Mar 20, 2012
1 parent 8a8d923 commit 230a5cf956bd972b7e1e594bf91cf4b8df01cebc
@@ -4,8 +4,7 @@ package com.foursquare.slashem
import com.foursquare.slashem.Ast._
-import com.twitter.util.{Duration, Future, FutureTask, Promise, Return, Throw, Try, TryLike}
-import com.twitter.concurrent.IVar
+import com.twitter.util.{Duration, Future, FutureTask}
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.http.Http
import com.twitter.finagle.Service
@@ -21,7 +20,6 @@ import org.codehaus.jackson.map.{DeserializationConfig, ObjectMapper}
import org.elasticsearch.action.search.SearchRequestBuilder
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.action.search.SearchType
-import org.elasticsearch.action.{ActionListener, ListenableActionFuture}
import org.elasticsearch.client.Client
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.ImmutableSettings
@@ -45,49 +43,6 @@ import org.joda.time.DateTime
import scala.annotation.tailrec
import scalaj.collection.Imports._
-import java.util.concurrent.TimeUnit
-
-class WrappedActionListenerFuture[A](af: ListenableActionFuture[A]) extends Promise[A] with ActionListener[A] {
- //ActionListener
- def onResponse(r: A) {
- println("got a response")
- this.update(Return(r))
- }
- def onFailure(e: Throwable) {
- println("got a failure"+e)
- this.update(Throw(e))
- }
- //Promise
- override def get(timeout: Duration): Try[A] = {
- println("got called with get")
- //TODO: respect timeout
- try {
- Return(af.get())
- } catch {
- case e: Throwable => Throw(e)
- }
- }
- override def isCancelled: Boolean = {
- af.isCancelled
- }
-
- override def cancel() {
- af.cancel(true)
- }
-
- override def isDefined = af.isDone
-
-
-}
-
-object WrappedActionListenerFuture {
- def fromActionFuture[A](af: ListenableActionFuture[A]): Future[A] = {
- val tf = new WrappedActionListenerFuture(af)
- af.addListener(tf)
- tf
- }
-}
-
/**
* SolrResponseException class that extends RuntimeException
*/
@@ -505,52 +460,56 @@ trait ElasticSchema[M <: Record[M]] extends SlashemSchema[M] {
}
def elasticQueryFuture[Ord, Lim, MM <: MinimumMatchType, Y, H <: Highlighting, Q <: QualityFilter, FC <: FacetCount, FLim](qb: QueryBuilder[M, Ord, Lim, MM, Y, H, Q, FC, FLim], query: ElasticQueryBuilder, timeoutOpt: Option[Duration]): Future[SearchResults[M, Y]] = {
- val client = meta.client
- val from = qb.start.map(_.toInt).getOrElse(qb.DefaultStart)
- val limit = qb.limit.map(_.toInt).getOrElse(qb.DefaultLimit)
- meta.logger.debug("Query details "+query.toString())
- val baseRequest: SearchRequestBuilder = client.prepareSearch(meta.indexName)
- .setQuery(query)
- .setFrom(from)
- .setSize(limit)
- .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
- val request = qb.sort match {
- case None => baseRequest
- //Handle sorting by fields quickly
- case Some(Pair(Field(fieldName),"asc")) => baseRequest.addSort(fieldName,SortOrder.ASC)
- case Some(Pair(Field(fieldName),"desc")) => baseRequest.addSort(fieldName,SortOrder.DESC)
- //Handle sorting by scripts in general
- case Some(Pair(sort,"asc")) => baseRequest.addSort(new ScriptSortBuilder(sort.elasticBoost(),"number").order(SortOrder.ASC))
- case Some(Pair(sort,"desc")) => baseRequest.addSort(new ScriptSortBuilder(sort.elasticBoost(),"number").order(SortOrder.DESC))
-
- case _ => baseRequest
+ val future : FutureTask[SearchResults[M,Y]]= new FutureTask({
+ val client = meta.client
+ val from = qb.start.map(_.toInt).getOrElse(qb.DefaultStart)
+ val limit = qb.limit.map(_.toInt).getOrElse(qb.DefaultLimit)
+ meta.logger.debug("Query details "+query.toString())
+ val baseRequest: SearchRequestBuilder = client.prepareSearch(meta.indexName)
+ .setQuery(query)
+ .setFrom(from)
+ .setSize(limit)
+ .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
+ val request = qb.sort match {
+ case None => baseRequest
+ //Handle sorting by fields quickly
+ case Some(Pair(Field(fieldName),"asc")) => baseRequest.addSort(fieldName,SortOrder.ASC)
+ case Some(Pair(Field(fieldName),"desc")) => baseRequest.addSort(fieldName,SortOrder.DESC)
+ //Handle sorting by scripts in general
+ case Some(Pair(sort,"asc")) => baseRequest.addSort(new ScriptSortBuilder(sort.elasticBoost(),"number").order(SortOrder.ASC))
+ case Some(Pair(sort,"desc")) => baseRequest.addSort(new ScriptSortBuilder(sort.elasticBoost(),"number").order(SortOrder.DESC))
+
+ case _ => baseRequest
}
- /* Set the server side timeout */
- val timeLimmitedRequest = timeoutOpt match {
- case Some(timeout) => request.setTimeout(TimeValue.timeValueMillis(timeout.inMillis))
- case _ => request
- }
+ /* Set the server side timeout */
+ val timeLimmitedRequest = timeoutOpt match {
+ case Some(timeout) => request.setTimeout(TimeValue.timeValueMillis(timeout.inMillis))
+ case _ => request
+ }
- /* Add a facet to the request */
- val facetedRequest = qb.facetSettings.facetFieldList match {
- case Nil => timeLimmitedRequest
- case _ => {
- termFacetQuery(qb.facetSettings.facetFieldList, qb.facetSettings.facetLimit).foreach(timeLimmitedRequest.addFacet(_))
- timeLimmitedRequest
+ /* Add a facet to the request */
+ val facetedRequest = qb.facetSettings.facetFieldList match {
+ case Nil => timeLimmitedRequest
+ case _ => {
+ termFacetQuery(qb.facetSettings.facetFieldList, qb.facetSettings.facetLimit).foreach(timeLimmitedRequest.addFacet(_))
+ timeLimmitedRequest
+ }
}
- }
- val responseActionFuture: ListenableActionFuture[SearchResponse] = facetedRequest.execute()
- val future = WrappedActionListenerFuture.fromActionFuture(responseActionFuture)
- val responseFuture = future.map({
- response => constructSearchResults(qb.creator,
- qb.start.map(_.toInt).getOrElse(qb.DefaultStart),
- qb.fallOf,
- qb.min,
- response)})
- timeFuture(responseFuture).map( {
+ val response: SearchResponse = facetedRequest
+ .execute().actionGet()
+ meta.logger.debug("Search response "+response.toString())
+ constructSearchResults(qb.creator,
+ qb.start.map(_.toInt).getOrElse(qb.DefaultStart),
+ qb.fallOf,
+ qb.min,
+ response)
+ }
+ )
+ future.run()
+ timeFuture(future).map( {
case (queryTime, result) => {
meta.logger.log("e" + meta.indexName + ".query",query.toString(), queryTime)
result
@@ -39,7 +39,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers {
val r = ESimplePanda where (_.name eqs "lolsdonotinsertsomethingwiththisinit") fetch()
Assert.assertEquals(0,r.response.results.length)
}
- //@Test
+ @Test
def testNonEmptySearch {
val r = ESimplePanda where (_.hobos contains "hobos") fetch()
Assert.assertEquals(1,r.response.results.length)
@@ -49,59 +49,59 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers {
Assert.assertEquals("hobos",doc.hobos.value)
Assert.assertEquals(new ObjectId("4c809f4251ada1cdc3790b11"),doc.id.is)
}
- //@Test
+ @Test
def testNonEmptyMM100Search {
val r = ESimplePanda where (_.name contains "loler eating hobo") minimumMatchPercent(100) fetch()
Assert.assertEquals(1,r.response.results.length)
//Lets look at the document and make sure its what we expected
val doc = r.response.results.apply(0)
Assert.assertEquals(new ObjectId("4c809f4251ada1cdc3790b18"),doc.id.is)
}
- //@Test
+ @Test
def testNonEmptyMM100SearchWithTimeout {
val r = ESimplePanda where (_.name contains "loler eating hobo") minimumMatchPercent(100) fetch(Duration(1, TimeUnit.SECONDS))
Assert.assertEquals(1,r.response.results.length)
//Lets look at the document and make sure its what we expected
val doc = r.response.results.apply(0)
Assert.assertEquals(new ObjectId("4c809f4251ada1cdc3790b18"),doc.id.is)
}
- //@Test
+ @Test
def testNonEmptyMultiFieldSearch {
val r = ESimplePanda.where(_.default contains "onlyinnamefield").queryField(_.name).queryField(_.hobos) fetch()
Assert.assertEquals(1,r.response.results.length)
}
- //@Test
+ @Test
def testNonEmptySearchOidScorePare {
val r = ESimplePanda where (_.hobos contains "hobos") fetch()
Assert.assertEquals(1,r.response.results.length)
//Lets look at the document and make sure its what we expected
val doc = r.response.oidScorePair.apply(0)
Assert.assertEquals(new ObjectId("4c809f4251ada1cdc3790b11"),doc._1)
}
- //@Test
+ @Test
def testSimpleInQuery {
val r = ESimplePanda where (_.hobos in List("hobos")) fetch()
Assert.assertEquals(1,r.response.results.length)
//Lets look at the document and make sure its what we expected
val doc = r.response.oidScorePair.apply(0)
Assert.assertEquals(new ObjectId("4c809f4251ada1cdc3790b11"),doc._1)
}
- //@Test
+ @Test
def testSimpleNInQuery {
val r = ESimplePanda where (_.hobos nin List("hobos")) fetch()
Assert.assertEquals(7,r.response.results.length)
}
- //@Test
+ @Test
def testManyResultsSearch {
val r = ESimplePanda where (_.name contains "loler") fetch()
Assert.assertEquals(4,r.response.results.length)
}
- //@Test
+ @Test
def testAndSearch {
val r = ESimplePanda where (_.name contains "loler") and (_.hobos contains "nyet") fetch()
Assert.assertEquals(1,r.response.results.length)
}
- //@Test
+ @Test
def orderDesc {
var r = ESimplePanda where (_.name contains "ordertest") orderDesc(_.followers) fetch()
Assert.assertEquals(2,r.response.results.length)
@@ -110,7 +110,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers {
Assert.assertEquals(doc0._1,new ObjectId("4c809f4251ada1cdc3790b14"))
Assert.assertEquals(doc1._1,new ObjectId("4c809f4251ada1cdc3790b15"))
}
- //@Test
+ @Test
def orderAsc {
var r = ESimplePanda where (_.name contains "ordertest") orderAsc(_.followers) fetch()
Assert.assertEquals(2,r.response.results.length)
@@ -119,7 +119,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers {
Assert.assertEquals(doc0._1,new ObjectId("4c809f4251ada1cdc3790b15"))
Assert.assertEquals(doc1._1,new ObjectId("4c809f4251ada1cdc3790b14"))
}
- //@Test
+ @Test
def geoOrderDesc {
var r = ESimpleGeoPanda where (_.name contains "ordertest") complexOrderDesc(_.pos sqeGeoDistance(74.0,-31.0)) fetch()
Assert.assertEquals(2,r.response.results.length)
@@ -128,7 +128,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers {
Assert.assertEquals(new ObjectId("4c809f4251ada1cdc3790b16"),doc0._1)
Assert.assertEquals(new ObjectId("4c809f4251ada1cdc3790b17"),doc1._1)
}
- //@Test
+ @Test
def geoOrderAsc {
var r = ESimpleGeoPanda where (_.name contains "ordertest") complexOrderAsc(_.pos sqeGeoDistance(74.0,-31.0)) fetch()
Assert.assertEquals(2,r.response.results.length)
@@ -137,7 +137,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers {
Assert.assertEquals(new ObjectId("4c809f4251ada1cdc3790b17"),doc0._1)
Assert.assertEquals(new ObjectId("4c809f4251ada1cdc3790b16"),doc1._1)
}
- //@Test
+ @Test
def geoOrderIntAsc {
var r = ESimpleGeoPanda where (_.name contains "ordertest") complexOrderAsc(_.pos sqeGeoDistance(74,-31)) fetch()
Assert.assertEquals(2,r.response.results.length)
@@ -148,12 +148,12 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers {
}
- //@Test
+ @Test
def testAndOrSearch {
val r = ESimplePanda where (_.name contains "loler") and (x => (x.hobos contains "nyet") or (x.hobos contains "robot")) fetch()
Assert.assertEquals(r.response.results.length,2)
}
- //@Test
+ @Test
def testPhraseBoostOrdering {
val rWithLowPhraseBoost = ESimplePanda where (_.name contains "loler skates") phraseBoost(_.name,10) fetch()
val rWithHighPhraseBoost = ESimplePanda where (_.name contains "loler skates") phraseBoost(_.name,10000) fetch()
@@ -170,15 +170,15 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers {
Assert.assertTrue(doc1b.score.value > doc2b.score.value)
Assert.assertTrue(doc3b.score.value > doc1b.score.value)
}
- //@Test
+ @Test
def testFieldFaceting {
val r = ESimplePanda where (_.name contains "loler skates") facetField(_.foreign) fetch()
Assert.assertEquals(4,r.response.results.length)
Assert.assertEquals(1,r.response.fieldFacets.get("foreign").get("b"))
Assert.assertEquals(3,r.response.fieldFacets.get("foreign").get("pants"))
}
- //@Test
+ @Test
def testMaxCountFieldFaceting {
val r = ESimplePanda where (_.name contains "loler skates") facetField(_.foreign) facetLimit(1) fetch()
Assert.assertEquals(4,r.response.results.length)
@@ -187,7 +187,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers {
}
- //@Test
+ @Test
def testFieldBoost {
val r1 = ESimplePanda where (_.magic contains "yes") fetch()
val r2 = ESimplePanda where (_.magic contains "yes") boostField(_.followers,10) fetch()
@@ -196,7 +196,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers {
Assert.assertTrue(r2.response.results.apply(0).score.value > r1.response.results.apply(0).score.value)
}
- //@Test
+ @Test
def testGeoBoost {
//Test GeoBoosting. Note will actually make further away document come up first
val geoLat = 74
@@ -207,7 +207,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers {
Assert.assertEquals(r2.response.results.length,2)
Assert.assertTrue(r2.response.results.apply(0).score.value > r1.response.results.apply(0).score.value)
}
- //@Test
+ @Test
def testPointExtract {
//Test GeoBoosting. Note will actually make further away document come up first
val geoLat = 74
@@ -217,7 +217,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers {
Assert.assertEquals(r.response.results.apply(0).pos.value._1,74.0,0.9)
}
- //@Test
+ @Test
def testRecipGeoBoost {
val geoLat = 74
val geoLong = -31
@@ -228,7 +228,7 @@ class ElasticQueryTest extends SpecsMatchers with ScalaCheckMatchers {
Assert.assertTrue(r2.response.results.apply(0).score.value > r1.response.results.apply(0).score.value)
}
- //@Test
+ @Test
def testListFieldContains {
val response1 = ESimplePanda where (_.favnums contains 2) fetch()
val response2 = ESimplePanda where (_.favnums contains 6) fetch()

0 comments on commit 230a5cf

Please sign in to comment.