Skip to content

Commit

Permalink
Merge pull request #2250 from FoxComm/search-service/interpreter-draft
Browse files Browse the repository at this point in the history
Add dsl interpreter
  • Loading branch information
aafa committed Jun 22, 2017
2 parents 6e6d063 + 60b03ca commit b26c25d
Show file tree
Hide file tree
Showing 15 changed files with 360 additions and 119 deletions.
19 changes: 11 additions & 8 deletions agni/api/app/foxcomm/agni/api/Api.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package foxcomm.agni.api

import com.sksamuel.elastic4s.ElasticImplicits._
import com.twitter.finagle.Http
import com.twitter.finagle.http.Status
import com.twitter.util.Await
Expand All @@ -9,19 +8,23 @@ import foxcomm.utils.finch._
import io.circe.generic.extras.auto._
import io.finch._
import io.finch.circe._
import monix.execution.Scheduler
import org.elasticsearch.common.ValidationException
import scala.concurrent.ExecutionContext

object Api extends App {
def endpoints(searchService: SearchService)(implicit ec: ExecutionContext) =
def endpoints(searchService: SearchService)(implicit s: Scheduler) =
post(
"api" :: "search" :: string :: string :: param("size")
.as[Int] :: paramOption("from").as[Int] :: jsonBody[SearchPayload]) {
(searchIndex: String, searchType: String, size: Int, from: Option[Int], searchQuery: SearchPayload)
searchService
.searchFor(searchIndex / searchType, searchQuery, searchSize = size, searchFrom = from)
.toTwitterFuture
.searchFor(searchIndex = searchIndex,
searchType = searchType,
searchQuery = searchQuery,
searchSize = size,
searchFrom = from)
.map(Ok)
.toTwitterFuture
} :+: get("ping") {
Ok("pong")
}
Expand All @@ -32,9 +35,9 @@ object Api extends App {
case ex Output.failure(new RuntimeException(ex), Status.InternalServerError)
}

implicit val ec: ExecutionContext = ExecutionContext.global
val config = AppConfig.load()
val svc = SearchService.fromConfig(config)
implicit val s: Scheduler = Scheduler.global
val config = AppConfig.load()
val svc = SearchService.fromConfig(config, interpreter.es.default)

Await.result(
Http.server
Expand Down
4 changes: 2 additions & 2 deletions agni/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ import sbtassembly.{MergeStrategy, PathList}
lazy val core = (project in file("core"))
.settings(Settings.common)
.settings(
libraryDependencies ++= Dependencies.core ++ Dependencies.es ++ Dependencies.circe ++ Dependencies.test.core
libraryDependencies ++= Dependencies.core ++ Dependencies.es ++ Dependencies.circe ++ Dependencies.monix ++ Dependencies.test.core
)

lazy val finch = (project in file("finch"))
.settings(Settings.common)
.settings(
libraryDependencies ++= Dependencies.finch ++ Dependencies.circe :+ Dependencies.jwt
libraryDependencies ++= Dependencies.finch ++ Dependencies.circe ++ Dependencies.jwt ++ Dependencies.monix
)

lazy val api = (project in file("api"))
Expand Down
20 changes: 19 additions & 1 deletion agni/core/app/foxcomm/agni/AppConfig.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
package foxcomm.agni

import cats.data.NonEmptyList
import com.typesafe.config.ConfigFactory
import java.net.InetSocketAddress
import pureconfig._
import scala.util.Try

final case class AppConfig(http: AppConfig.Http, elasticsearch: AppConfig.ElasticSearch)

@SuppressWarnings(Array("org.wartremover.warts.Equals"))
object AppConfig {
implicit val readHostConfig: ConfigReader[NonEmptyList[InetSocketAddress]] =
ConfigReader.fromNonEmptyStringTry(s
Try {
val withoutPrefix = s.stripPrefix("elasticsearch://")
val hosts = withoutPrefix.split(',').map { host
val parts = host.split(':')
require(parts.length == 2,
"ElasticSearch uri must be in format elasticsearch://host:port,host:port,...")
new InetSocketAddress(parts(0), parts(1).toInt)
}
require(hosts.length >= 1, "At least single ElasticSearch host should be specified")
NonEmptyList.fromListUnsafe(hosts.toList)
})

final case class Http(interface: String, port: Int)

final case class ElasticSearch(host: String, cluster: String)
final case class ElasticSearch(host: NonEmptyList[InetSocketAddress], cluster: String)

def load(): AppConfig = {
val config =
Expand Down
112 changes: 57 additions & 55 deletions agni/core/app/foxcomm/agni/SearchService.scala
Original file line number Diff line number Diff line change
@@ -1,85 +1,87 @@
package foxcomm.agni

import scala.language.postfixOps
import cats.implicits._
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s._
import foxcomm.agni.dsl.query._
import foxcomm.agni.interpreter.es._
import io.circe._
import io.circe.jawn.parseByteBuffer
import monix.eval.{Coeval, Task}
import org.elasticsearch.action.search.{SearchAction, SearchRequestBuilder, SearchResponse}
import org.elasticsearch.client.Client
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import scala.concurrent.{ExecutionContext, Future}
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.search.SearchHit

class SearchService(private val client: ElasticClient) extends AnyVal {
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
class SearchService private (client: Client, qi: ESQueryInterpreter) {
import SearchService.ExtractJsonObject

def searchFor(searchIndex: IndexAndTypes,
def searchFor(searchIndex: String,
searchType: String,
searchQuery: SearchPayload,
searchSize: Int,
searchFrom: Option[Int])(implicit ec: ExecutionContext): Future[SearchResult] = {
val withQuery = searchQuery match {
case SearchPayload.es(query, _) (_: SearchDefinition) rawQuery Json.fromJsonObject(query).noSpaces
searchFrom: Option[Int]): Task[SearchResult] = {
def prepareBuilder: Coeval[SearchRequestBuilder] = Coeval.eval {
val builder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
builder
.setIndices(searchIndex)
.setTypes(searchType)
.setSize(searchSize)
searchFrom.foreach(builder.setFrom)
searchQuery.fields.foreach(fs builder.setFetchSource(fs.toList.toArray, Array.empty[String]))
builder
}

def evalQuery(builder: SearchRequestBuilder): Coeval[SearchRequestBuilder] = searchQuery match {
case SearchPayload.es(query, _)
Coeval.eval(builder.setQuery(Json.fromJsonObject(query).dump))
case SearchPayload.fc(query, _)
// TODO: this is really some basic and quite ugly interpreter
// consider more principled approach
// maybe free monad would be a good fit there?
(_: SearchDefinition) bool {
query
.map(_.query.foldLeft(new BoolQueryDefinition) {
case (bool, QueryFunction.eq(in, value))
bool.filter(in.toList.map(termsQuery(_, value.toList: _*)))
case (bool, QueryFunction.neq(in, value))
bool.not(in.toList.map(termsQuery(_, value.toList: _*)))
case (bool, QueryFunction.matches(in, value))
val fields = in.toList
bool.must(value.toList.map(q multiMatchQuery(q).fields(fields)))
case (bool, QueryFunction.range(in, value))
val query = rangeQuery(in.field)
val unified = value.unify
val queryWithLowerBound = unified.lower.fold(query) {
case (b, v) query.from(v).includeLower(b.withBound)
}
val boundedQuery = unified.upper.fold(queryWithLowerBound) {
case (b, v) queryWithLowerBound.to(v).includeUpper(b.withBound)
}
bool.filter(boundedQuery)
case (bool, _) bool // TODO: implement rest of cases
})
.getOrElse((new BoolQueryDefinition).must(matchAllQuery))
}
qi(query).map(builder.setQuery)
}
val baseSearch = withQuery(search in searchIndex size searchSize)
val limitedSearch =
searchQuery.fields.fold(baseSearch)(fields baseSearch sourceInclude (fields.toList: _*))
client
.execute(searchFrom.fold(limitedSearch)(limitedSearch from))
.map(response
SearchResult(
result = response.hits.collect {

def setupBuilder: Task[SearchRequestBuilder] = (prepareBuilder flatMap evalQuery).task

for {
builder setupBuilder
request = builder.request()
response async[SearchResponse, SearchResult](client.search(request, _))
} yield {
val hits = response.getHits
SearchResult(
result = hits
.hits()
.view
.collect {
case ExtractJsonObject(obj) obj
}(collection.breakOut),
pagination = SearchPagination(total = response.totalHits),
maxScore = response.maxScore
))
}
.toList,
pagination = SearchPagination(total = hits.totalHits()),
maxScore = hits.getMaxScore
)
}
}
}

object SearchService {
object ExtractJsonObject {
def unapply(hit: RichSearchHit): Option[JsonObject] =
def unapply(hit: SearchHit): Option[JsonObject] =
parseByteBuffer(hit.sourceRef.toChannelBuffer.toByteBuffer).toOption
.flatMap(_.asObject)
}

def apply(client: ElasticClient): SearchService = new SearchService(client)
def apply(client: Client, qi: ESQueryInterpreter): SearchService =
new SearchService(client, qi)

def fromConfig(config: AppConfig): SearchService = {
def fromConfig(config: AppConfig, qi: ESQueryInterpreter): SearchService = {
val esConfig = config.elasticsearch
val settings =
Settings.settingsBuilder().put("cluster.name", esConfig.cluster).build()
val client =
ElasticClient.transport(settings, ElasticsearchClientUri(esConfig.host))
val client = TransportClient
.builder()
.settings(settings)
.build()
.addTransportAddresses(esConfig.host.toList.map(new InetSocketTransportAddress(_)): _*)

new SearchService(client)
apply(client, qi)
}
}
90 changes: 61 additions & 29 deletions agni/core/app/foxcomm/agni/dsl/query.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package foxcomm.agni.dsl

import cats.data.NonEmptyList
import cats.syntax.either._
import cats.implicits._
import io.circe._
import io.circe.generic.extras.semiauto._
import io.circe.{Decoder, JsonNumber, KeyDecoder}
import shapeless._
import shapeless.ops.coproduct.Folder

@SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf", "org.wartremover.warts.ExplicitImplicitTypes"))
@SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf"))
object query {
sealed trait QueryField {
def toList: List[String]
Expand All @@ -17,12 +17,7 @@ object query {
def toList: List[String] = List(field)
}
object Single {
implicit val decodeSingle: Decoder[Single] = Decoder.decodeString
.emap {
case s if s.startsWith("$") Either.left(s"Defined unknown special query field $s")
case s Either.right(s)
}
.map(Single(_))
implicit val decodeSingle: Decoder[Single] = Decoder.decodeString.map(Single(_))
}

final case class Multiple(fields: NonEmptyList[String]) extends QueryField {
Expand All @@ -34,8 +29,18 @@ object query {
}

implicit val decodeQueryField: Decoder[QueryField] =
Decoder[Single].map(s s: QueryField) or
Decoder[Multiple].map(m m: QueryField)
Decoder[Single].map(identity[QueryField]) or
Decoder[Multiple].map(identity[QueryField])
}

sealed trait QueryContext
object QueryContext {
case object filter extends QueryContext
case object must extends QueryContext
case object should extends QueryContext
case object not extends QueryContext

implicit val decodeQueryContext: Decoder[QueryContext] = deriveEnumerationDecoder[QueryContext]
}

sealed trait RangeFunction
Expand Down Expand Up @@ -68,15 +73,6 @@ object query {
}
}

sealed trait EntityState
object EntityState {
case object all extends EntityState
case object active extends EntityState
case object inactive extends EntityState

implicit val decodeEntityState: Decoder[EntityState] = deriveEnumerationDecoder[EntityState]
}

final case class RangeBound[T](lower: Option[(RangeFunction.LowerBound, T)],
upper: Option[(RangeFunction.UpperBound, T)]) {
def toMap: Map[RangeFunction, T] = Map.empty ++ lower.toList ++ upper.toList
Expand Down Expand Up @@ -159,21 +155,57 @@ object query {

sealed trait QueryFunction
object QueryFunction {
final case class matches(in: QueryField, value: QueryValue[String]) extends QueryFunction
final case class range(in: QueryField.Single, value: RangeValue) extends QueryFunction
final case class eq(in: QueryField, value: CompoundValue) extends QueryFunction
final case class neq(in: QueryField, value: CompoundValue) extends QueryFunction
final case class state(value: EntityState) extends QueryFunction
sealed trait WithField { this: QueryFunction
def field: QueryField
}
sealed trait WithContext { this: QueryFunction
def ctx: QueryContext
}
sealed trait TermLevel extends WithContext { this: QueryFunction
def context: Option[QueryContext]

final def ctx: QueryContext = context.getOrElse(QueryContext.filter)
}
sealed trait FullText extends WithContext with WithField { this: QueryFunction
def context: Option[QueryContext]
def in: Option[QueryField]

final def ctx: QueryContext = context.getOrElse(QueryContext.must)
final def field: QueryField = in.getOrElse(QueryField.Single("_all"))
}

final case class matches private (in: Option[QueryField],
value: QueryValue[String],
context: Option[QueryContext])
extends QueryFunction
with FullText
final case class equals private (in: QueryField, value: CompoundValue, context: Option[QueryContext])
extends QueryFunction
with TermLevel
final case class exists private (value: QueryField, context: Option[QueryContext])
extends QueryFunction
with TermLevel
final case class range private (in: QueryField.Single, value: RangeValue, context: Option[QueryContext])
extends QueryFunction
with TermLevel
final case class raw private (value: JsonObject, context: QueryContext)
extends QueryFunction
with WithContext {
def ctx: QueryContext = context
}

implicit val decodeQueryFunction: Decoder[QueryFunction] = deriveDecoder[QueryFunction]
}

final case class FCQuery(query: NonEmptyList[QueryFunction])
final case class FCQuery(query: Option[NonEmptyList[QueryFunction]])
object FCQuery {
implicit val decodeFCQuery: Decoder[FCQuery] =
implicit val decodeFCQuery: Decoder[FCQuery] = {
Decoder
.decodeNonEmptyList[QueryFunction]
.or(Decoder[QueryFunction].map(NonEmptyList.of(_)))
.decodeOption(
Decoder
.decodeNonEmptyList[QueryFunction]
.or(Decoder[QueryFunction].map(NonEmptyList.of(_))))
.map(FCQuery(_))
}
}
}

0 comments on commit b26c25d

Please sign in to comment.