diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala index dcddd011..c2597d63 100644 --- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala @@ -33,6 +33,7 @@ import org.apache.s2graph.graphql.types.SchemaDef import org.slf4j.LoggerFactory import sangria.ast.Document import sangria.execution._ +import sangria.execution.deferred.DeferredResolver import sangria.marshalling.sprayJson._ import sangria.parser.QueryParser import sangria.schema.Schema @@ -97,13 +98,16 @@ object GraphQLServer { private def executeGraphQLQuery(query: Document, op: Option[String], vars: JsObject)(implicit e: ExecutionContext) = { val s2schema = schemaCache.withCache("s2Schema")(createNewSchema()) + import GraphRepository._ + val resolver: DeferredResolver[GraphRepository] = DeferredResolver.fetchers(vertexFetcher, edgeFetcher) Executor.execute( s2schema, query, s2Repository, variables = vars, - operationName = op + operationName = op, + deferredResolver = resolver ) .map((res: spray.json.JsValue) => OK -> res) .recover { diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala index 078aefd3..c7ffae10 100644 --- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala @@ -27,6 +27,7 @@ import org.apache.s2graph.core.storage.MutateResponse import org.apache.s2graph.core.types._ import org.apache.s2graph.graphql.types.S2Type._ import org.slf4j.{Logger, LoggerFactory} +import sangria.execution.deferred._ import sangria.schema._ import scala.concurrent._ @@ -34,6 +35,38 @@ import scala.util.{Failure, Success, Try} object GraphRepository { + implicit val vertexHasId = new HasId[S2VertexLike, S2VertexLike] { + override def id(value: S2VertexLike): S2VertexLike = value + } + + implicit val edgeHasId = new HasId[(S2VertexLike, QueryParam, Seq[S2EdgeLike]), DeferFetchEdges] { + override def id(value: (S2VertexLike, QueryParam, Seq[S2EdgeLike])): DeferFetchEdges = + DeferFetchEdges(value._1, value._2) + } + + val vertexFetcher = Fetcher((ctx: GraphRepository, ids: Seq[S2VertexLike]) => { + ctx.getVertices(ids) + }) + + val edgeFetcher = Fetcher((ctx: GraphRepository, ids: Seq[DeferFetchEdges]) => { + implicit val ec = ctx.ec + + val edgesByParam = ids.groupBy(_.qp).map { case (qp, deLs) => + val vertices = deLs.map(de => de.v) + + ctx.getEdges(vertices, qp).map(qp -> _) + } + + val f: Future[Iterable[(QueryParam, Seq[S2EdgeLike])]] = Future.sequence(edgesByParam) + val grouped = f.map { tpLs => + tpLs.toSeq.flatMap { case (qp, edges) => + edges.groupBy(_.srcForVertex).map { case (v, edges) => (v, qp, edges) } + } + } + + grouped + }) + def withLogTryResponse[A](opName: String, tryObj: Try[A])(implicit logger: Logger): Try[A] = { tryObj match { case Success(v) => logger.info(s"${opName} Success:", v) @@ -42,6 +75,9 @@ object GraphRepository { tryObj } + + case class DeferFetchEdges(v: S2VertexLike, qp: QueryParam) + } class GraphRepository(val graph: S2GraphLike) { @@ -89,6 +125,13 @@ class GraphRepository(val graph: S2GraphLike) { graph.getVertices(vertex) } + def getEdges(vertices: Seq[S2VertexLike], queryParam: QueryParam): Future[Seq[S2EdgeLike]] = { + val step = Step(Seq(queryParam)) + val q = Query(vertices, steps = Vector(step)) + + graph.getEdges(q).map(_.edgeWithScores.map(_.edge)) + } + def getEdges(vertex: S2VertexLike, queryParam: QueryParam): Future[Seq[S2EdgeLike]] = { val step = Step(Seq(queryParam)) val q = Query(Seq(vertex), steps = Vector(step)) @@ -232,13 +275,9 @@ class GraphRepository(val graph: S2GraphLike) { withLogTryResponse("deleteLabel", deleteLabelTry) } - def allServices(): List[Service] = Service.findAll() - - def allServiceColumns(): List[ServiceColumn] = ServiceColumn.findAll() - - def findServiceByName(name: String): Option[Service] = Service.findByName(name) + def services(): List[Service] = Service.findAll() - def allLabels() = Label.findAll() + def serviceColumns(): List[ServiceColumn] = ServiceColumn.findAll() - def findLabelByName(name: String): Option[Label] = Label.findByName(name) + def labels() = Label.findAll() } diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala index d07feebd..0312abfe 100644 --- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala @@ -67,7 +67,7 @@ class ManagementType(repo: GraphRepository) { import org.apache.s2graph.graphql.bind.Unmarshaller._ import org.apache.s2graph.graphql.types.StaticTypes._ - lazy val serviceColumnOnServiceWithPropInputObjectFields = repo.allServices.map { service => + lazy val serviceColumnOnServiceWithPropInputObjectFields = repo.services().map { service => InputField(service.serviceName, OptionInputType(InputObjectType( s"Input_${service.serviceName}_ServiceColumn_Props", description = "desc here", @@ -78,7 +78,7 @@ class ManagementType(repo: GraphRepository) { ))) } - lazy val serviceColumnOnServiceInputObjectFields = repo.allServices.map { service => + lazy val serviceColumnOnServiceInputObjectFields = repo.services().map { service => InputField(service.serviceName, OptionInputType(InputObjectType( s"Input_${service.serviceName}_ServiceColumn", description = "desc here", @@ -99,7 +99,7 @@ class ManagementType(repo: GraphRepository) { ) } - lazy val labelPropsInputFields = repo.allLabels().map { label => + lazy val labelPropsInputFields = repo.labels().map { label => InputField(label.label, OptionInputType(InputObjectType( s"Input_${label.label}_props", description = "desc here", @@ -135,7 +135,7 @@ class ManagementType(repo: GraphRepository) { s"Enum_Service", description = Option("desc here"), values = - dummyEnum +: repo.allServices.map { service => + dummyEnum +: repo.services().map { service => EnumValue(service.serviceName, value = service.serviceName) } ) @@ -144,7 +144,7 @@ class ManagementType(repo: GraphRepository) { s"Enum_ServiceColumn", description = Option("desc here"), values = - dummyEnum +: repo.allServiceColumns.map { serviceColumn => + dummyEnum +: repo.serviceColumns().map { serviceColumn => EnumValue(serviceColumn.columnName, value = serviceColumn.columnName) } ) @@ -153,7 +153,7 @@ class ManagementType(repo: GraphRepository) { s"Enum_Label", description = Option("desc here"), values = - dummyEnum +: repo.allLabels().map { label => + dummyEnum +: repo.labels().map { label => EnumValue(label.label, value = label.label) } ) @@ -183,8 +183,8 @@ class ManagementType(repo: GraphRepository) { arguments = List(LabelNameArg), resolve = { c => c.argOpt[String]("name") match { - case Some(name) => c.ctx.allLabels().filter(_.label == name) - case None => c.ctx.allLabels() + case Some(name) => c.ctx.labels().filter(_.label == name) + case None => c.ctx.labels() } } ) @@ -222,8 +222,8 @@ class ManagementType(repo: GraphRepository) { arguments = List(ServiceNameArg), resolve = { c => c.argOpt[String]("name") match { - case Some(name) => c.ctx.allServices.filter(_.serviceName == name) - case None => c.ctx.allServices + case Some(name) => c.ctx.services().filter(_.serviceName == name) + case None => c.ctx.services() } } ) diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala index d458ba43..9066d217 100644 --- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala @@ -28,6 +28,7 @@ import org.apache.s2graph.graphql.repository.GraphRepository import sangria.schema._ import org.apache.s2graph.graphql.bind.AstHelper import org.apache.s2graph.graphql.repository +import org.apache.s2graph.graphql.repository.GraphRepository.DeferFetchEdges import org.apache.s2graph.graphql.types.StaticTypes._ import scala.language.existentials @@ -142,10 +143,10 @@ object S2Type { description = Option("desc here"), resolve = c => { implicit val ec = c.ctx.ec - val (vertices, canSkipFetchVertex) = graphql.types.FieldResolver.serviceColumnOnService(column, c) + val (vertices, canSkipFetchVertex) = FieldResolver.serviceColumnOnService(column, c) if (canSkipFetchVertex) Future.successful(vertices) - else c.ctx.getVertices(vertices) + else GraphRepository.vertexFetcher.deferSeq(vertices) } ): Field[GraphRepository, Any] } @@ -170,10 +171,10 @@ object S2Type { lazy val serviceColumnField: Field[GraphRepository, Any] = Field(column.columnName, labelColumnType, resolve = c => { implicit val ec = c.ctx.ec - val (vertex, canSkipFetchVertex) = graphql.types.FieldResolver.serviceColumnOnLabel(c) + val (vertex, canSkipFetchVertex) = FieldResolver.serviceColumnOnLabel(c) if (canSkipFetchVertex) Future.successful(vertex) - else c.ctx.getVertices(Seq(vertex)).map(_.head) // fill props + else GraphRepository.vertexFetcher.defer(vertex) }) lazy val EdgeType = ObjectType( @@ -211,14 +212,18 @@ object S2Type { arguments = dirArgs ++ paramArgs, description = Some("fetch edges"), resolve = { c => + implicit val ec = c.ctx.ec + val (vertex, queryParam) = graphql.types.FieldResolver.label(label, c) - c.ctx.getEdges(vertex, queryParam) + val de = DeferFetchEdges(vertex, queryParam) + val empty = Seq.empty[S2EdgeLike] + + DeferredValue(GraphRepository.edgeFetcher.deferOpt(de)).map(m => m.fold(empty)(_._3)) } ) edgeTypeField } - } class S2Type(repo: GraphRepository) { @@ -231,8 +236,8 @@ class S2Type(repo: GraphRepository) { /** * fields */ - lazy val serviceFields: List[Field[GraphRepository, Any]] = repo.allServices.map { service => - lazy val serviceFields = DummyObjectTypeField :: makeServiceField(service, repo.allLabels()) + lazy val serviceFields: List[Field[GraphRepository, Any]] = repo.services().map { service => + lazy val serviceFields = DummyObjectTypeField :: makeServiceField(service, repo.labels()) lazy val ServiceType = ObjectType( s"Service_${service.serviceName}", @@ -251,7 +256,7 @@ class S2Type(repo: GraphRepository) { * arguments */ lazy val addVertexArg = { - val serviceArguments = repo.allServices().map { service => + val serviceArguments = repo.services().map { service => val serviceFields = DummyInputField +: makeInputFieldsOnService(service) val ServiceInputType = InputObjectType[List[AddVertexParam]]( @@ -265,7 +270,7 @@ class S2Type(repo: GraphRepository) { } lazy val addEdgeArg = { - val labelArguments = repo.allLabels().map { label => + val labelArguments = repo.labels().map { label => val labelFields = DummyInputField +: makeInputFieldsOnLabel(label) val labelInputType = InputObjectType[AddEdgeParam]( s"Input_label_${label.label}_param", diff --git a/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala b/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala index 46db9fc4..acd6f661 100644 --- a/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala +++ b/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala @@ -20,15 +20,18 @@ package org.apache.s2graph.graphql import com.typesafe.config.Config +import org.apache.s2graph import org.apache.s2graph.core.Management.JsonModel.Prop import org.apache.s2graph.core.mysqls.{Label, Model, Service} import org.apache.s2graph.core.rest.RequestParser import org.apache.s2graph.core.{Management, S2Graph} +import org.apache.s2graph.graphql import org.apache.s2graph.graphql.repository.GraphRepository import org.apache.s2graph.graphql.types.SchemaDef import play.api.libs.json._ import sangria.ast.Document import sangria.execution.Executor +import sangria.execution.deferred.DeferredResolver import sangria.renderer.SchemaRenderer import sangria.schema.Schema @@ -53,14 +56,23 @@ trait TestGraph { def showSchema: String + import GraphRepository._ + + val resolver: DeferredResolver[GraphRepository] = DeferredResolver.fetchers(vertexFetcher, edgeFetcher) + def queryAsJs(query: Document): JsValue = { implicit val playJsonMarshaller = sangria.marshalling.playJson.PlayJsonResultMarshaller - val js = Await.result(Executor.execute(schema, query, repository), Duration("10 sec")) - js + Await.result( + Executor.execute(schema, query, repository, deferredResolver = resolver), + Duration("10 sec") + ) } def queryAsRaw(query: Document, graph: TestGraph): Any = { - Await.result(Executor.execute(schema, query, repository), Duration("10 sec")) + Await.result( + Executor.execute(schema, query, repository, deferredResolver = resolver), + Duration("10 sec") + ) } }