Skip to content

Commit

Permalink
Sharding sur l'id
Browse files Browse the repository at this point in the history
  • Loading branch information
larousso committed May 11, 2023
1 parent a74dc29 commit add0ff9
Showing 1 changed file with 36 additions and 13 deletions.
49 changes: 36 additions & 13 deletions nio-server/app/controllers/ConsentController.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package controllers

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.util.FastFuture
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Framing, Sink, Source}
import akka.stream.{FlowShape, Materializer}
import akka.stream.scaladsl.{Flow, Framing, GraphDSL, Merge, Partition, Sink, Source}
import akka.util.ByteString
import auth.{AuthAction, SecuredAuthContext}
import controllers.ErrorManager.{AppErrorManagerResult, ErrorManagerResult, ErrorWithStatusManagerResult}
Expand All @@ -27,6 +28,7 @@ import utils.Result.{AppErrors, ErrorMessage}

import scala.collection.Seq
import scala.concurrent.{ExecutionContext, Future}
import scala.util.hashing.MurmurHash3

class ConsentController(
val AuthAction: ActionBuilder[SecuredAuthContext, AnyContent],
Expand Down Expand Up @@ -294,19 +296,40 @@ class ConsentController(
)
}

def batchImport(tenant: String, orgKey: String) = AuthAction.async(ndJson) { implicit req =>
val result: Future[JsValue] = req.body.mapAsync(10) { json =>
json.validate[ConsentFactCommand].fold(
{ err => FastFuture.successful(ImportResult(errorsCount = 1, errors = List(ImportError("json parsing error", detailedError = JsError.toJson(err), command = json)))) },
{
case UpdateConsentFact(userId, consentFact) => handleImportUpdate(tenant, orgKey, req, json, userId, consentFact)
case PatchConsentFact(userId, patchCommand) => handleImportPatch(tenant, orgKey, req, json, userId, patchCommand)
def sharding[In, Out](parallelism: Int, aFlow: Flow[(String, In), Out, NotUsed]) =
Flow.fromGraph {
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._

val merge = b.add(Merge[Out](parallelism))
val partition = b.add(Partition[(String, In)](parallelism, {
case (id, _) => Math.abs(MurmurHash3.stringHash(id) % parallelism)
}))

for (i <- 0 until parallelism) {
partition.out(i) ~> aFlow.async ~> merge.in(i)
}
)

FlowShape(partition.in, merge.out)
}
}
.fold(ImportResult()){ (acc, elt) => acc combine elt }
.map { importResult => Json.toJson(importResult) }
.runWith(Sink.head)


def batchImport(tenant: String, orgKey: String) = AuthAction.async(ndJson) { implicit req =>
val result: Future[JsValue] = req.body
.map(json => ((json \ "userId").validate[String].getOrElse(""), json))
.via(sharding(10, Flow[(String, JsValue)].mapAsync(1) { case (_, json) =>
json.validate[ConsentFactCommand].fold(
{ err => FastFuture.successful(ImportResult(errorsCount = 1, errors = List(ImportError("json parsing error", detailedError = JsError.toJson(err), command = json)))) },
{
case UpdateConsentFact(userId, consentFact) => handleImportUpdate(tenant, orgKey, req, json, userId, consentFact)
case PatchConsentFact(userId, patchCommand) => handleImportPatch(tenant, orgKey, req, json, userId, patchCommand)
}
)
}))
.fold(ImportResult()){ (acc, elt) => acc combine elt }
.map { importResult => Json.toJson(importResult) }
.runWith(Sink.head)

result.map { json => Ok(json) }
}
Expand Down

0 comments on commit add0ff9

Please sign in to comment.