Skip to content
This repository has been archived by the owner on Feb 16, 2024. It is now read-only.

[S2GRAPH-204] Avoid N + 1 queries in GraphQL #157

Merged
merged 1 commit into from
Apr 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.s2graph.graphql.types.SchemaDef
import org.slf4j.LoggerFactory
import sangria.ast.Document
import sangria.execution._
import sangria.execution.deferred.DeferredResolver
import sangria.marshalling.sprayJson._
import sangria.parser.QueryParser
import sangria.schema.Schema
Expand Down Expand Up @@ -97,13 +98,16 @@ object GraphQLServer {

private def executeGraphQLQuery(query: Document, op: Option[String], vars: JsObject)(implicit e: ExecutionContext) = {
val s2schema = schemaCache.withCache("s2Schema")(createNewSchema())
import GraphRepository._
val resolver: DeferredResolver[GraphRepository] = DeferredResolver.fetchers(vertexFetcher, edgeFetcher)

Executor.execute(
s2schema,
query,
s2Repository,
variables = vars,
operationName = op
operationName = op,
deferredResolver = resolver
)
.map((res: spray.json.JsValue) => OK -> res)
.recover {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,46 @@ import org.apache.s2graph.core.storage.MutateResponse
import org.apache.s2graph.core.types._
import org.apache.s2graph.graphql.types.S2Type._
import org.slf4j.{Logger, LoggerFactory}
import sangria.execution.deferred._
import sangria.schema._

import scala.concurrent._
import scala.util.{Failure, Success, Try}

object GraphRepository {

implicit val vertexHasId = new HasId[S2VertexLike, S2VertexLike] {
override def id(value: S2VertexLike): S2VertexLike = value
}

implicit val edgeHasId = new HasId[(S2VertexLike, QueryParam, Seq[S2EdgeLike]), DeferFetchEdges] {
override def id(value: (S2VertexLike, QueryParam, Seq[S2EdgeLike])): DeferFetchEdges =
DeferFetchEdges(value._1, value._2)
}

val vertexFetcher = Fetcher((ctx: GraphRepository, ids: Seq[S2VertexLike]) => {
ctx.getVertices(ids)
})

val edgeFetcher = Fetcher((ctx: GraphRepository, ids: Seq[DeferFetchEdges]) => {
implicit val ec = ctx.ec

val edgesByParam = ids.groupBy(_.qp).map { case (qp, deLs) =>
val vertices = deLs.map(de => de.v)

ctx.getEdges(vertices, qp).map(qp -> _)
}

val f: Future[Iterable[(QueryParam, Seq[S2EdgeLike])]] = Future.sequence(edgesByParam)
val grouped = f.map { tpLs =>
tpLs.toSeq.flatMap { case (qp, edges) =>
edges.groupBy(_.srcForVertex).map { case (v, edges) => (v, qp, edges) }
}
}

grouped
})

def withLogTryResponse[A](opName: String, tryObj: Try[A])(implicit logger: Logger): Try[A] = {
tryObj match {
case Success(v) => logger.info(s"${opName} Success:", v)
Expand All @@ -42,6 +75,9 @@ object GraphRepository {

tryObj
}

case class DeferFetchEdges(v: S2VertexLike, qp: QueryParam)

}

class GraphRepository(val graph: S2GraphLike) {
Expand Down Expand Up @@ -89,6 +125,13 @@ class GraphRepository(val graph: S2GraphLike) {
graph.getVertices(vertex)
}

def getEdges(vertices: Seq[S2VertexLike], queryParam: QueryParam): Future[Seq[S2EdgeLike]] = {
val step = Step(Seq(queryParam))
val q = Query(vertices, steps = Vector(step))

graph.getEdges(q).map(_.edgeWithScores.map(_.edge))
}

def getEdges(vertex: S2VertexLike, queryParam: QueryParam): Future[Seq[S2EdgeLike]] = {
val step = Step(Seq(queryParam))
val q = Query(Seq(vertex), steps = Vector(step))
Expand Down Expand Up @@ -232,13 +275,9 @@ class GraphRepository(val graph: S2GraphLike) {
withLogTryResponse("deleteLabel", deleteLabelTry)
}

def allServices(): List[Service] = Service.findAll()

def allServiceColumns(): List[ServiceColumn] = ServiceColumn.findAll()

def findServiceByName(name: String): Option[Service] = Service.findByName(name)
def services(): List[Service] = Service.findAll()

def allLabels() = Label.findAll()
def serviceColumns(): List[ServiceColumn] = ServiceColumn.findAll()

def findLabelByName(name: String): Option[Label] = Label.findByName(name)
def labels() = Label.findAll()
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class ManagementType(repo: GraphRepository) {
import org.apache.s2graph.graphql.bind.Unmarshaller._
import org.apache.s2graph.graphql.types.StaticTypes._

lazy val serviceColumnOnServiceWithPropInputObjectFields = repo.allServices.map { service =>
lazy val serviceColumnOnServiceWithPropInputObjectFields = repo.services().map { service =>
InputField(service.serviceName, OptionInputType(InputObjectType(
s"Input_${service.serviceName}_ServiceColumn_Props",
description = "desc here",
Expand All @@ -78,7 +78,7 @@ class ManagementType(repo: GraphRepository) {
)))
}

lazy val serviceColumnOnServiceInputObjectFields = repo.allServices.map { service =>
lazy val serviceColumnOnServiceInputObjectFields = repo.services().map { service =>
InputField(service.serviceName, OptionInputType(InputObjectType(
s"Input_${service.serviceName}_ServiceColumn",
description = "desc here",
Expand All @@ -99,7 +99,7 @@ class ManagementType(repo: GraphRepository) {
)
}

lazy val labelPropsInputFields = repo.allLabels().map { label =>
lazy val labelPropsInputFields = repo.labels().map { label =>
InputField(label.label, OptionInputType(InputObjectType(
s"Input_${label.label}_props",
description = "desc here",
Expand Down Expand Up @@ -135,7 +135,7 @@ class ManagementType(repo: GraphRepository) {
s"Enum_Service",
description = Option("desc here"),
values =
dummyEnum +: repo.allServices.map { service =>
dummyEnum +: repo.services().map { service =>
EnumValue(service.serviceName, value = service.serviceName)
}
)
Expand All @@ -144,7 +144,7 @@ class ManagementType(repo: GraphRepository) {
s"Enum_ServiceColumn",
description = Option("desc here"),
values =
dummyEnum +: repo.allServiceColumns.map { serviceColumn =>
dummyEnum +: repo.serviceColumns().map { serviceColumn =>
EnumValue(serviceColumn.columnName, value = serviceColumn.columnName)
}
)
Expand All @@ -153,7 +153,7 @@ class ManagementType(repo: GraphRepository) {
s"Enum_Label",
description = Option("desc here"),
values =
dummyEnum +: repo.allLabels().map { label =>
dummyEnum +: repo.labels().map { label =>
EnumValue(label.label, value = label.label)
}
)
Expand Down Expand Up @@ -183,8 +183,8 @@ class ManagementType(repo: GraphRepository) {
arguments = List(LabelNameArg),
resolve = { c =>
c.argOpt[String]("name") match {
case Some(name) => c.ctx.allLabels().filter(_.label == name)
case None => c.ctx.allLabels()
case Some(name) => c.ctx.labels().filter(_.label == name)
case None => c.ctx.labels()
}
}
)
Expand Down Expand Up @@ -222,8 +222,8 @@ class ManagementType(repo: GraphRepository) {
arguments = List(ServiceNameArg),
resolve = { c =>
c.argOpt[String]("name") match {
case Some(name) => c.ctx.allServices.filter(_.serviceName == name)
case None => c.ctx.allServices
case Some(name) => c.ctx.services().filter(_.serviceName == name)
case None => c.ctx.services()
}
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.s2graph.graphql.repository.GraphRepository
import sangria.schema._
import org.apache.s2graph.graphql.bind.AstHelper
import org.apache.s2graph.graphql.repository
import org.apache.s2graph.graphql.repository.GraphRepository.DeferFetchEdges
import org.apache.s2graph.graphql.types.StaticTypes._

import scala.language.existentials
Expand Down Expand Up @@ -142,10 +143,10 @@ object S2Type {
description = Option("desc here"),
resolve = c => {
implicit val ec = c.ctx.ec
val (vertices, canSkipFetchVertex) = graphql.types.FieldResolver.serviceColumnOnService(column, c)
val (vertices, canSkipFetchVertex) = FieldResolver.serviceColumnOnService(column, c)

if (canSkipFetchVertex) Future.successful(vertices)
else c.ctx.getVertices(vertices)
else GraphRepository.vertexFetcher.deferSeq(vertices)
}
): Field[GraphRepository, Any]
}
Expand All @@ -170,10 +171,10 @@ object S2Type {

lazy val serviceColumnField: Field[GraphRepository, Any] = Field(column.columnName, labelColumnType, resolve = c => {
implicit val ec = c.ctx.ec
val (vertex, canSkipFetchVertex) = graphql.types.FieldResolver.serviceColumnOnLabel(c)
val (vertex, canSkipFetchVertex) = FieldResolver.serviceColumnOnLabel(c)

if (canSkipFetchVertex) Future.successful(vertex)
else c.ctx.getVertices(Seq(vertex)).map(_.head) // fill props
else GraphRepository.vertexFetcher.defer(vertex)
})

lazy val EdgeType = ObjectType(
Expand Down Expand Up @@ -211,14 +212,18 @@ object S2Type {
arguments = dirArgs ++ paramArgs,
description = Some("fetch edges"),
resolve = { c =>
implicit val ec = c.ctx.ec

val (vertex, queryParam) = graphql.types.FieldResolver.label(label, c)
c.ctx.getEdges(vertex, queryParam)
val de = DeferFetchEdges(vertex, queryParam)
val empty = Seq.empty[S2EdgeLike]

DeferredValue(GraphRepository.edgeFetcher.deferOpt(de)).map(m => m.fold(empty)(_._3))
}
)

edgeTypeField
}

}

class S2Type(repo: GraphRepository) {
Expand All @@ -231,8 +236,8 @@ class S2Type(repo: GraphRepository) {
/**
* fields
*/
lazy val serviceFields: List[Field[GraphRepository, Any]] = repo.allServices.map { service =>
lazy val serviceFields = DummyObjectTypeField :: makeServiceField(service, repo.allLabels())
lazy val serviceFields: List[Field[GraphRepository, Any]] = repo.services().map { service =>
lazy val serviceFields = DummyObjectTypeField :: makeServiceField(service, repo.labels())

lazy val ServiceType = ObjectType(
s"Service_${service.serviceName}",
Expand All @@ -251,7 +256,7 @@ class S2Type(repo: GraphRepository) {
* arguments
*/
lazy val addVertexArg = {
val serviceArguments = repo.allServices().map { service =>
val serviceArguments = repo.services().map { service =>
val serviceFields = DummyInputField +: makeInputFieldsOnService(service)

val ServiceInputType = InputObjectType[List[AddVertexParam]](
Expand All @@ -265,7 +270,7 @@ class S2Type(repo: GraphRepository) {
}

lazy val addEdgeArg = {
val labelArguments = repo.allLabels().map { label =>
val labelArguments = repo.labels().map { label =>
val labelFields = DummyInputField +: makeInputFieldsOnLabel(label)
val labelInputType = InputObjectType[AddEdgeParam](
s"Input_label_${label.label}_param",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@
package org.apache.s2graph.graphql

import com.typesafe.config.Config
import org.apache.s2graph
import org.apache.s2graph.core.Management.JsonModel.Prop
import org.apache.s2graph.core.mysqls.{Label, Model, Service}
import org.apache.s2graph.core.rest.RequestParser
import org.apache.s2graph.core.{Management, S2Graph}
import org.apache.s2graph.graphql
import org.apache.s2graph.graphql.repository.GraphRepository
import org.apache.s2graph.graphql.types.SchemaDef
import play.api.libs.json._
import sangria.ast.Document
import sangria.execution.Executor
import sangria.execution.deferred.DeferredResolver
import sangria.renderer.SchemaRenderer
import sangria.schema.Schema

Expand All @@ -53,14 +56,23 @@ trait TestGraph {

def showSchema: String

import GraphRepository._

val resolver: DeferredResolver[GraphRepository] = DeferredResolver.fetchers(vertexFetcher, edgeFetcher)

def queryAsJs(query: Document): JsValue = {
implicit val playJsonMarshaller = sangria.marshalling.playJson.PlayJsonResultMarshaller
val js = Await.result(Executor.execute(schema, query, repository), Duration("10 sec"))
js
Await.result(
Executor.execute(schema, query, repository, deferredResolver = resolver),
Duration("10 sec")
)
}

def queryAsRaw(query: Document, graph: TestGraph): Any = {
Await.result(Executor.execute(schema, query, repository), Duration("10 sec"))
Await.result(
Executor.execute(schema, query, repository, deferredResolver = resolver),
Duration("10 sec")
)
}
}

Expand Down