From 2267d9d210b9a1f612ca31c92ab24e0c23a735de Mon Sep 17 00:00:00 2001 From: Chris Wewerka Date: Wed, 24 Oct 2018 11:52:40 +0200 Subject: [PATCH 1/4] support for Future-based predict-method, see https://github.com/apache/predictionio/pull/495 --- build.sbt | 2 +- src/main/scala/EsClient.scala | 30 ++-- src/main/scala/ScalaRestClient.scala | 23 +++ src/main/scala/URAlgorithm.scala | 238 ++++++++++++++------------- 4 files changed, 167 insertions(+), 126 deletions(-) create mode 100644 src/main/scala/ScalaRestClient.scala diff --git a/build.sbt b/build.sbt index d03457a..9ae4e51 100644 --- a/build.sbt +++ b/build.sbt @@ -14,7 +14,7 @@ scalaVersion in ThisBuild := "2.11.11" val mahoutVersion = "0.13.0" -val pioVersion = "0.12.0-incubating" +val pioVersion = "0.14.0-SNAPSHOT" val elasticsearchVersion = "5.5.2" diff --git a/src/main/scala/EsClient.scala b/src/main/scala/EsClient.scala index 8040153..bd4c3c1 100644 --- a/src/main/scala/EsClient.scala +++ b/src/main/scala/EsClient.scala @@ -22,13 +22,13 @@ import java.util import grizzled.slf4j.Logger import org.apache.http.util.EntityUtils -import org.apache.predictionio.data.storage.{ DataMap, Storage, StorageClientConfig } +import org.apache.predictionio.data.storage.{DataMap, Storage, StorageClientConfig} import org.apache.predictionio.workflow.CleanupFunctions import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.elasticsearch.client.RestClient import org.apache.http.HttpHost -import org.apache.http.auth.{ AuthScope, UsernamePasswordCredentials } +import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials} import org.apache.http.entity.ContentType import org.apache.http.entity.StringEntity import org.apache.http.impl.client.BasicCredentialsProvider @@ -42,6 +42,9 @@ import org.elasticsearch.spark._ import org.json4s.JValue import org.json4s.DefaultFormats import org.json4s.JsonAST.JString +import ScalaRestClient.ExtendedScalaRestClient + +import scala.concurrent.{ExecutionContext, Future} // import org.json4s.native.Serialization.writePretty import com.actionml.helpers.{ ItemID, ItemProps } @@ -367,20 +370,23 @@ object EsClient { * @param indexName the index to search * @return a [PredictedResults] collection */ - def search(query: String, indexName: String): Option[JValue] = { + def search(query: String, indexName: String)(implicit ec: ExecutionContext): Future[Option[JValue]] = { logger.info(s"Query:\n${query}") - val response = client.performRequest( + val responseFuture = client.performRequestFuture( "POST", s"/$indexName/_search", - Map.empty[String, String].asJava, + Map.empty[String, String], new StringEntity(query, ContentType.APPLICATION_JSON)) - response.getStatusLine.getStatusCode match { - case 200 => - logger.info(s"Got source from query: ${query}") - Some(parse(EntityUtils.toString(response.getEntity))) - case _ => - logger.info(s"Query: ${query}\nproduced status code: ${response.getStatusLine.getStatusCode}") - None + responseFuture.map { + response => + response.getStatusLine.getStatusCode match { + case 200 => + logger.info(s"Got source from query: ${query}") + Some(parse(EntityUtils.toString(response.getEntity))) + case _ => + logger.info(s"Query: ${query}\nproduced status code: ${response.getStatusLine.getStatusCode}") + None + } } } diff --git a/src/main/scala/ScalaRestClient.scala b/src/main/scala/ScalaRestClient.scala new file mode 100644 index 0000000..cad97bb --- /dev/null +++ b/src/main/scala/ScalaRestClient.scala @@ -0,0 +1,23 @@ +package com.actionml + +import org.apache.http.{Header, HttpEntity} +import org.elasticsearch.client.{Response, ResponseListener, RestClient} +import scala.collection.JavaConverters._ +import scala.concurrent.{Future, Promise} + +object ScalaRestClient { + + implicit class ExtendedScalaRestClient(restClient: RestClient) { + + def performRequestFuture(method: String, endpoint: String, params: Map[String, String], + entity: HttpEntity, headers: Header*): Future[Response] = { + val promise: Promise[Response] = Promise() + val responseListener = new ResponseListener { + override def onSuccess(response: Response): Unit = promise.success(response) + override def onFailure(exception: Exception): Unit = promise.failure(exception) + } + restClient.performRequestAsync(method, endpoint, params.asJava, entity, responseListener, headers: _*) + promise.future + } + } +} diff --git a/src/main/scala/URAlgorithm.scala b/src/main/scala/URAlgorithm.scala index cd58cd7..c92c4fc 100644 --- a/src/main/scala/URAlgorithm.scala +++ b/src/main/scala/URAlgorithm.scala @@ -20,10 +20,10 @@ package com.actionml import java.util import grizzled.slf4j.Logger -import org.apache.predictionio.controller.{ P2LAlgorithm, Params } -import org.apache.predictionio.data.storage.{ DataMap, Event, NullModel, PropertyMap } +import org.apache.predictionio.controller.{P2LAlgorithm, Params} +import org.apache.predictionio.data.storage.{DataMap, Event, NullModel, PropertyMap} import org.apache.predictionio.data.store.LEventStore -import org.apache.mahout.math.cf.{ DownsamplableCrossOccurrenceDataset, SimilarityAnalysis } +import org.apache.mahout.math.cf.{DownsamplableCrossOccurrenceDataset, SimilarityAnalysis} import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD @@ -34,10 +34,12 @@ import org.json4s.JsonAST._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import com.actionml.helpers._ - +import scala.concurrent.ExecutionContext import scala.collection.JavaConverters._ +import scala.concurrent.Future import scala.concurrent.duration.Duration -import scala.language.{ implicitConversions, postfixOps } +import scala.language.{implicitConversions, postfixOps} +import ScalaRestClient.ExtendedScalaRestClient /** Available value for algorithm param "RecsModel" */ object RecsModels { // todo: replace this with rankings @@ -481,51 +483,58 @@ class URAlgorithm(val ap: URAlgorithmParams) * @todo Need to prune that query to minimum required for data include, for instance no need for the popularity * ranking if no PopModel is being used, same for "must" clause and dates. */ - def predict(model: NullModel, query: Query): PredictedResult = { + override def predictAsync(model: NullModel, query: Query)(implicit ec: ExecutionContext): Future[PredictedResult] = { queryEventNames = query.eventNames.getOrElse(modelEventNames) // eventNames in query take precedence - val (queryStr, blacklist) = buildQuery(ap, query, rankingFieldNames) - // old es1 query - // val searchHitsOpt = EsClient.search(queryStr, esIndex, queryEventNames) - val searchHitsOpt = EsClient.search(queryStr, esIndex) - - val withRanks = query.withRanks.getOrElse(false) - val predictedResults = searchHitsOpt match { - case Some(searchHits) => - val hits = (searchHits \ "hits" \ "hits").extract[Seq[JValue]] - val recs = hits.map { hit => - if (withRanks) { - val source = hit \ "source" - val ranks: Map[String, Double] = rankingsParams map { backfillParams => - val backfillType = backfillParams.`type`.getOrElse(DefaultURAlgoParams.BackfillType) - val backfillFieldName = backfillParams.name.getOrElse(PopModel.nameByType(backfillType)) - backfillFieldName -> (source \ backfillFieldName).extract[Double] - } toMap - - ItemScore((hit \ "_id").extract[String], (hit \ "_score").extract[Double], - ranks = if (ranks.nonEmpty) Some(ranks) else None) - } else { - ItemScore((hit \ "_id").extract[String], (hit \ "_score").extract[Double]) - } - }.toArray - logger.info(s"Results: ${hits.length} retrieved of a possible ${(searchHits \ "hits" \ "total").extract[Long]}") - PredictedResult(recs) - - case _ => - logger.info(s"No results for query ${parse(queryStr)}") - PredictedResult(Array.empty[ItemScore]) + val queryStrBlacklistFuture = buildQuery(ap, query, rankingFieldNames) + + queryStrBlacklistFuture.flatMap { + case (queryStr, blacklist) => + // old es1 query + // val searchHitsOpt = EsClient.search(queryStr, esIndex, queryEventNames) + val searchHitsOptFuture = EsClient.search(queryStr, esIndex) + + val withRanks = query.withRanks.getOrElse(false) + searchHitsOptFuture.map { + searchHitsOpt => + val predictedResults = searchHitsOpt match { + case Some(searchHits) => + val hits = (searchHits \ "hits" \ "hits").extract[Seq[JValue]] + val recs = hits.map { hit => + if (withRanks) { + val source = hit \ "source" + val ranks: Map[String, Double] = rankingsParams map { backfillParams => + val backfillType = backfillParams.`type`.getOrElse(DefaultURAlgoParams.BackfillType) + val backfillFieldName = backfillParams.name.getOrElse(PopModel.nameByType(backfillType)) + backfillFieldName -> (source \ backfillFieldName).extract[Double] + } toMap + + ItemScore((hit \ "_id").extract[String], (hit \ "_score").extract[Double], + ranks = if (ranks.nonEmpty) Some(ranks) else None) + } else { + ItemScore((hit \ "_id").extract[String], (hit \ "_score").extract[Double]) + } + }.toArray + logger.info(s"Results: ${hits.length} retrieved of a possible ${(searchHits \ "hits" \ "total").extract[Long]}") + PredictedResult(recs) + + case _ => + logger.info(s"No results for query ${parse(queryStr)}") + PredictedResult(Array.empty[ItemScore]) + } + + // todo: is this needed to remove ranked items from recs? + //if (recsModel == RecsModels.CF) { + // PredictedResult(predictedResults.filter(_.score != 0.0)) + //} else PredictedResult(predictedResults) + + // should have all blacklisted items excluded + // todo: need to add dithering, mean, sigma, seed required, make a seed that only changes on some fixed time + // period so the recs ordering stays fixed for that time period. + predictedResults + } } - - // todo: is this needed to remove ranked items from recs? - //if (recsModel == RecsModels.CF) { - // PredictedResult(predictedResults.filter(_.score != 0.0)) - //} else PredictedResult(predictedResults) - - // should have all blacklisted items excluded - // todo: need to add dithering, mean, sigma, seed required, make a seed that only changes on some fixed time - // period so the recs ordering stays fixed for that time period. - predictedResults } /** Calculate all fields and items needed for ranking. @@ -563,56 +572,60 @@ class URAlgorithm(val ap: URAlgorithmParams) def buildQuery( ap: URAlgorithmParams, query: Query, - backfillFieldNames: Seq[String] = Seq.empty): (String, Seq[Event]) = { + backfillFieldNames: Seq[String] = Seq.empty)(implicit ec: ExecutionContext): Future[(String, Seq[Event])] = { logger.info(s"Got query: \n${query}") val startPos = query.from.getOrElse(0) logger.info(s"from: ${startPos}") - try { - // create a list of all query correlators that can have a bias (boost or filter) attached - val (boostable, events) = getBiasedRecentUserActions(query) - logger.info(s"getBiasedRecentUserActions returned boostable: ${boostable} and events: ${events}") - - // since users have action history and items have correlators and both correspond to the same "actions" like - // purchase or view, we'll pass both to the query if the user history or items correlators are empty - // then metadata or backfill must be relied on to return results. - val numRecs = if (query.num.isDefined) query.num.get else limit // num in query orerrides num in config - logger.info(s"UR query num = ${query.num}") - logger.info(s"query.num.getOrElse returned numRecs: ${numRecs}") - - val should = buildQueryShould(query, boostable) - logger.info(s"buildQueryShould returned should: ${should}") - val must = buildQueryMust(query, boostable) - logger.info(s"buildQueryMust returned must: ${must}") - val mustNot = buildQueryMustNot(query, events) - logger.info(s"buildQueryMustNot returned mustNot: ${mustNot}") - val sort = buildQuerySort() - logger.info(s"buildQuerySort returned sort: ${sort}") - - val json = - ("from" -> startPos) ~ - ("size" -> numRecs) ~ - ("query" -> - ("bool" -> - ("should" -> should) ~ - ("must" -> must) ~ - ("must_not" -> mustNot) ~ - ("minimum_should_match" -> 1))) ~ - ("sort" -> sort) - - logger.info(s"json is: ${json}") - val compactJson = compact(render(json)) - logger.info(s"compact json is: ${compactJson}") - - //logger.info(s"Query:\n$compactJson") - (compactJson, events) - } catch { - case e: IllegalArgumentException => { - logger.warn("whoops, IllegalArgumentException for something in buildQuery.") - ("", Seq.empty[Event]) - } + // create a list of all query correlators that can have a bias (boost or filter) attached + val biasedRecentUserActionsFuture = getBiasedRecentUserActions(query) + + biasedRecentUserActionsFuture.map { + case (boostable, events) => + try { + logger.info(s"getBiasedRecentUserActions returned boostable: ${boostable} and events: ${events}") + + // since users have action history and items have correlators and both correspond to the same "actions" like + // purchase or view, we'll pass both to the query if the user history or items correlators are empty + // then metadata or backfill must be relied on to return results. + val numRecs = if (query.num.isDefined) query.num.get else limit // num in query orerrides num in config + logger.info(s"UR query num = ${query.num}") + logger.info(s"query.num.getOrElse returned numRecs: ${numRecs}") + + val should = buildQueryShould(query, boostable) + logger.info(s"buildQueryShould returned should: ${should}") + val must = buildQueryMust(query, boostable) + logger.info(s"buildQueryMust returned must: ${must}") + val mustNot = buildQueryMustNot(query, events) + logger.info(s"buildQueryMustNot returned mustNot: ${mustNot}") + val sort = buildQuerySort() + logger.info(s"buildQuerySort returned sort: ${sort}") + + val json = + ("from" -> startPos) ~ + ("size" -> numRecs) ~ + ("query" -> + ("bool" -> + ("should" -> should) ~ + ("must" -> must) ~ + ("must_not" -> mustNot) ~ + ("minimum_should_match" -> 1))) ~ + ("sort" -> sort) + + logger.info(s"json is: ${json}") + val compactJson = compact(render(json)) + logger.info(s"compact json is: ${compactJson}") + + //logger.info(s"Query:\n$compactJson") + (compactJson, events) + } catch { + case e: IllegalArgumentException => { + logger.warn("whoops, IllegalArgumentException for something in buildQuery.") + ("", Seq.empty[Event]) + } + } } } @@ -792,10 +805,10 @@ class URAlgorithm(val ap: URAlgorithmParams) } /** Get recent events of the user on items to create the recommendations query from */ - def getBiasedRecentUserActions(query: Query): (Seq[BoostableCorrelators], Seq[Event]) = { + def getBiasedRecentUserActions(query: Query)(implicit ec: ExecutionContext): Future[(Seq[BoostableCorrelators], Seq[Event])] = { - val recentEvents = try { - LEventStore.findByEntity( + val recentEventsFuture = + LEventStore.findByEntityAsync( appName = appName, // entityType and entityId is specified for fast lookup entityType = "user", @@ -806,13 +819,9 @@ class URAlgorithm(val ap: URAlgorithmParams) // targetEntityType = None, // limit = Some(maxQueryEvents), // this will get all history then each action can be limited before using in // the query - latest = true, - // set time limit to avoid super long DB access - timeout = Duration(200, "millis")).toSeq - } catch { - case e: scala.concurrent.TimeoutException => - logger.error(s"Timeout when reading recent events. Empty list is used. $e") - Seq.empty[Event] + latest = true).map(_.toSeq) + + val recoveredRecentEventsFuture = recentEventsFuture.recover { case e: NoSuchElementException => logger.info("No user id for recs, returning item-based recs if an item is specified in the query.") Seq.empty[Event] @@ -821,21 +830,24 @@ class URAlgorithm(val ap: URAlgorithmParams) Seq.empty[Event] } - val userEventBias = query.userBias.getOrElse(userBias) - val userEventsBoost = if (userEventBias > 0 && userEventBias != 1) Some(userEventBias) else None - val rActions = queryEventNames.map { action => - var items = Seq.empty[String] - - for (event <- recentEvents) { // todo: use indidatorParams for each indicator type - if (event.event == action && items.size < indicatorParams(action).maxItemsPerUser) { - items = event.targetEntityId.get +: items - // todo: may throw exception and we should ignore the event instead of crashing + recoveredRecentEventsFuture.map { + recentEvents => + val userEventBias = query.userBias.getOrElse(userBias) + val userEventsBoost = if (userEventBias > 0 && userEventBias != 1) Some(userEventBias) else None + val rActions = queryEventNames.map { action => + var items = Seq.empty[String] + + for (event <- recentEvents) { // todo: use indidatorParams for each indicator type + if (event.event == action && items.size < indicatorParams(action).maxItemsPerUser) { + items = event.targetEntityId.get +: items + // todo: may throw exception and we should ignore the event instead of crashing + } + // userBias may be None, which will cause no JSON output for this + } + BoostableCorrelators(action, items.distinct, userEventsBoost) } - // userBias may be None, which will cause no JSON output for this - } - BoostableCorrelators(action, items.distinct, userEventsBoost) + (rActions, recentEvents) } - (rActions, recentEvents) } /** get all metadata fields that potentially have boosts (not filters) */ From c100b6871af54ec789d25482e6d0dd79cd539083 Mon Sep 17 00:00:00 2001 From: Chris Wewerka Date: Fri, 16 Nov 2018 15:28:46 +0100 Subject: [PATCH 2/4] Adaptions to changes for Option in LEventStore.findByEntityAsync --- src/main/scala/URAlgorithm.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/scala/URAlgorithm.scala b/src/main/scala/URAlgorithm.scala index c92c4fc..5225a54 100644 --- a/src/main/scala/URAlgorithm.scala +++ b/src/main/scala/URAlgorithm.scala @@ -807,12 +807,14 @@ class URAlgorithm(val ap: URAlgorithmParams) /** Get recent events of the user on items to create the recommendations query from */ def getBiasedRecentUserActions(query: Query)(implicit ec: ExecutionContext): Future[(Seq[BoostableCorrelators], Seq[Event])] = { + val entityTypeUserIfPresent = query.user.map(_ => "user") + val recentEventsFuture = LEventStore.findByEntityAsync( appName = appName, // entityType and entityId is specified for fast lookup - entityType = "user", - entityId = query.user.get, + entityType = entityTypeUserIfPresent, + entityId = query.user, // one query per eventName is not ideal, maybe one query for lots of events then split by eventName // eventNames = Some(Seq(action)),// get all and separate later eventNames = Some(queryEventNames), // get all and separate later From d564af2a548221237b895cdb010332935ad13d51 Mon Sep 17 00:00:00 2001 From: Chris Wewerka Date: Mon, 19 Nov 2018 14:11:00 +0100 Subject: [PATCH 3/4] Avoiding error for reversed=true when entityType/Id is not set --- src/main/scala/URAlgorithm.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/scala/URAlgorithm.scala b/src/main/scala/URAlgorithm.scala index 5225a54..ad09e2e 100644 --- a/src/main/scala/URAlgorithm.scala +++ b/src/main/scala/URAlgorithm.scala @@ -808,6 +808,7 @@ class URAlgorithm(val ap: URAlgorithmParams) def getBiasedRecentUserActions(query: Query)(implicit ec: ExecutionContext): Future[(Seq[BoostableCorrelators], Seq[Event])] = { val entityTypeUserIfPresent = query.user.map(_ => "user") + val latestFirst = entityTypeUserIfPresent.fold(false)(_ => true) // reversed is only allowed if entityType/Id is specified val recentEventsFuture = LEventStore.findByEntityAsync( @@ -821,7 +822,7 @@ class URAlgorithm(val ap: URAlgorithmParams) // targetEntityType = None, // limit = Some(maxQueryEvents), // this will get all history then each action can be limited before using in // the query - latest = true).map(_.toSeq) + latest = latestFirst).map(_.toSeq) val recoveredRecentEventsFuture = recentEventsFuture.recover { case e: NoSuchElementException => From c1665b2dbae5a77bb3ac570472873dc5e1c36561 Mon Sep 17 00:00:00 2001 From: Chris Wewerka Date: Tue, 20 Nov 2018 16:46:01 +0100 Subject: [PATCH 4/4] added future timeout for event db access --- src/main/scala/FutureTimeout.scala | 70 ++++++++++++++++++++++++++++++ src/main/scala/URAlgorithm.scala | 32 +++++++++----- 2 files changed, 90 insertions(+), 12 deletions(-) create mode 100644 src/main/scala/FutureTimeout.scala diff --git a/src/main/scala/FutureTimeout.scala b/src/main/scala/FutureTimeout.scala new file mode 100644 index 0000000..ebc7d46 --- /dev/null +++ b/src/main/scala/FutureTimeout.scala @@ -0,0 +1,70 @@ +/* + * Copyright ActionML, LLC under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * ActionML licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.actionml + +import java.util.concurrent.TimeoutException +import java.util.{Timer, TimerTask} + +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.language.postfixOps +import scala.util.{Failure, Success} + +object FutureUtil { + + // All Future's that use futureWithTimeout will use the same Timer object + // it is thread safe and scales to thousands of active timers + // The true parameter ensures that timeout timers are daemon threads and do not stop + // the program from shutting down + + val timer: Timer = new Timer(true) + + /** + * Returns the result of the provided future within the given time or a timeout exception, whichever is first + * This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a + * Thread.sleep would + * @param future Caller passes a future to execute + * @param timeout Time before we return a Timeout exception instead of future's outcome + * @return Future[T] + */ + def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = { + + // Promise will be fulfilled with either the callers Future or the timer task if it times out + var p = Promise[T] + + // and a Timer task to handle timing out + + val timerTask = new TimerTask() { + def run() : Unit = p.tryFailure(new TimeoutException()) + } + + // Set the timeout to check in the future + timer.schedule(timerTask, timeout.toMillis) + + future.onComplete { + case Success(value) => + p.trySuccess(value) + timerTask.cancel() + case Failure(e) => + p.tryFailure(e) + timerTask.cancel() + } + + p.future + } +} diff --git a/src/main/scala/URAlgorithm.scala b/src/main/scala/URAlgorithm.scala index ad09e2e..b2d2405 100644 --- a/src/main/scala/URAlgorithm.scala +++ b/src/main/scala/URAlgorithm.scala @@ -21,7 +21,7 @@ import java.util import grizzled.slf4j.Logger import org.apache.predictionio.controller.{P2LAlgorithm, Params} -import org.apache.predictionio.data.storage.{DataMap, Event, NullModel, PropertyMap} +import org.apache.predictionio.data.storage.{Event, NullModel} import org.apache.predictionio.data.store.LEventStore import org.apache.mahout.math.cf.{DownsamplableCrossOccurrenceDataset, SimilarityAnalysis} import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark @@ -34,12 +34,14 @@ import org.json4s.JsonAST._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import com.actionml.helpers._ -import scala.concurrent.ExecutionContext + +import scala.concurrent.{ExecutionContext, Future} import scala.collection.JavaConverters._ -import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.language.{implicitConversions, postfixOps} -import ScalaRestClient.ExtendedScalaRestClient + +import scala.concurrent.duration._ +import scala.language.postfixOps /** Available value for algorithm param "RecsModel" */ object RecsModels { // todo: replace this with rankings @@ -824,14 +826,20 @@ class URAlgorithm(val ap: URAlgorithmParams) // the query latest = latestFirst).map(_.toSeq) - val recoveredRecentEventsFuture = recentEventsFuture.recover { - case e: NoSuchElementException => - logger.info("No user id for recs, returning item-based recs if an item is specified in the query.") - Seq.empty[Event] - case e: Exception => // fatal because of error, an empty query - logger.error(s"Error when reading recent events. Trying to continue by ignoring the error. $e") - Seq.empty[Event] - } + val recoveredRecentEventsFuture = + FutureUtil + .futureWithTimeout(recentEventsFuture, 200 millis) + .recover { + case e: scala.concurrent.TimeoutException => + logger.error(s"Timeout when reading recent events. Empty list is used. $e") + Seq.empty[Event] + case e: NoSuchElementException => + logger.info("No user id for recs, returning item-based recs if an item is specified in the query.") + Seq.empty[Event] + case e: Exception => // fatal because of error, an empty query + logger.error(s"Error when reading recent events. Trying to continue by ignoring the error. $e") + Seq.empty[Event] + } recoveredRecentEventsFuture.map { recentEvents =>