Skip to content

Commit

Permalink
Ajout de l'import de masse
Browse files Browse the repository at this point in the history
  • Loading branch information
larousso committed May 10, 2023
1 parent 056e8d2 commit 37f2d6e
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 3 deletions.
144 changes: 141 additions & 3 deletions nio-server/app/controllers/ConsentController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@ package controllers
import akka.actor.ActorSystem
import akka.http.scaladsl.util.FastFuture
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{Flow, Framing, Sink, Source}
import akka.util.ByteString
import auth.SecuredAuthContext
import auth.{AuthAction, SecuredAuthContext}
import controllers.ErrorManager.{AppErrorManagerResult, ErrorManagerResult, ErrorWithStatusManagerResult}
import db.{ConsentFactMongoDataStore, LastConsentFactMongoDataStore, OrganisationMongoDataStore, UserMongoDataStore}
import libs.io.IO
import libs.io._
import libs.xmlorjson.XmlOrJson
import messaging.KafkaMessageBroker
import models.ConsentFactCommand.{PatchConsentFact, UpdateConsentFact}
import models.{ConsentFact, _}
import utils.NioLogger
import play.api.http.HttpEntity
import play.api.libs.json.Json
import play.api.libs.json.{JsError, JsNull, JsValue, Json}
import play.api.libs.streams.Accumulator
import play.api.mvc._
import reactivemongo.api.Cursor
import reactivemongo.api.bson.BSONDocument
Expand Down Expand Up @@ -268,6 +270,142 @@ class ConsentController(
} yield result).merge
}

val newLineSplit = Framing.delimiter(ByteString("\n"), 10000, allowTruncation = true)
val toJson = Flow[ByteString] via newLineSplit map (_.utf8String) filterNot (_.isEmpty) map (l => Json.parse(l))
def ndJson(implicit ec: ExecutionContext): BodyParser[Source[JsValue, _]] =
BodyParser(_ => Accumulator.source[ByteString].map(s => Right(s.via(toJson)))(ec))


object ImportError {
implicit val format = Json.format[ImportError]
}
case class ImportError(message: String, detailedError: JsValue = JsNull, command: JsValue = JsNull)
object ImportResult {
def error(message: String, command: JsValue = JsNull): ImportResult = {
ImportResult(errorsCount = 1, errors = List(ImportError(message, command = command)))
}

implicit val format = Json.format[ImportResult]
}
case class ImportResult(successCount: Int = 0, errorsCount: Int = 0, errors: List[ImportError] = List.empty) {
def combine (other: ImportResult) : ImportResult =
ImportResult(
successCount = successCount + other.successCount,
errorsCount = errorsCount + other.errorsCount,
errors = errors ++ other.errors
)
}

def batchImport(tenant: String, orgKey: String) = AuthAction(ndJson).async { implicit req =>
val result: Future[JsValue] = req.body.mapAsync(10) { json =>
json.validate[ConsentFactCommand].fold(
{ err => FastFuture.successful(ImportResult(errorsCount = 1, errors = List(ImportError("json parsing error", detailedError = JsError.toJson(err), command = json)))) },
{
case UpdateConsentFact(userId, consentFact) => handleImportUpdate(tenant, orgKey, req, json, userId, consentFact)
case PatchConsentFact(userId, patchCommand) => handleImportPatch(tenant, orgKey, req, json, userId, patchCommand)
}
)
}
.fold(ImportResult()){ (acc, elt) => acc combine elt }
.map { importResult => Json.toJson(importResult) }
.runWith(Sink.head)

result.map { json => Ok(json) }
}

private def handleImportPatch(tenant: String, orgKey: String, req: SecuredAuthContext[Source[JsValue, _]], json: JsValue, userId: String, patchCommand: PartialConsentFact): Future[ImportResult] = {
(for {
_ <- if (patchCommand.userId.isDefined && !patchCommand.userId.contains(userId)) IO.error(ImportResult.error("error.userId.is.immutable", command = json))
else IO.succeed(patchCommand)
command = patchCommand.copy(orgKey = Some(orgKey))
result <- patchCommand.offers match {
case Some(offers) =>
for {
_ <- IO.fromOption(req.authInfo.offerRestrictionPatterns, {
val errorMessages = offers.map(o => ImportError(s"offer.${o.key}.not.authorized", command = json));
NioLogger.error(s"not authorized : ${errorMessages.map(_.message)}");
ImportResult(errorsCount = errorMessages.size, errors = errorMessages.to(List))
})
_ <- IO.succeed[ImportResult](offers.filterNot(o => accessibleOfferService.accessibleOfferKey(o.key, req.authInfo.offerRestrictionPatterns)))
.keep(offer => offer.isEmpty, { unauthorizedOffers =>
val errorMessages = unauthorizedOffers.map(o => ImportError(s"offer.${o.key}.not.authorized", command = json))
NioLogger.error(s"not authorized : ${errorMessages.map(_.message)}")
ImportResult(errorsCount = errorMessages.size, errors = errorMessages.to(List))
})
consentFactSaved <- consentManagerService
.partialUpdate(tenant, req.authInfo.sub, req.authInfo.metadatas, orgKey, userId, command, Json.toJson(patchCommand))
.mapError { error =>
NioLogger.error(s"error during consent fact saving $error")
ImportResult(errorsCount = 1, errors = List(ImportError(message = "Error during update", detailedError = error.appErrors.asJson(), command = json)))
}
} yield ImportResult(successCount = 1)
case None =>
consentManagerService
.partialUpdate(tenant, req.authInfo.sub, req.authInfo.metadatas, orgKey, userId, command, Json.toJson(patchCommand))
.mapError { error =>
ImportResult(errorsCount = 1, errors = List(ImportError(message = "Error during update", detailedError = error.appErrors.asJson(), command = json)))
}
.map { _ => ImportResult(successCount = 1) }
}
} yield result).merge
}

private def handleImportUpdate(tenant: String, orgKey: String, req: SecuredAuthContext[Source[JsValue, _]], json: JsValue, userId: String, consentFact: ConsentFact): Future[ImportResult] = {
if (consentFact.userId != userId) {
NioLogger.error(s"error.userId.is.immutable : userId in path $userId // userId on body ${consentFact.userId}")

FastFuture.successful(ImportResult.error("error.userId.is.immutable", command = json))
} else {
val cf: ConsentFact = ConsentFact.addOrgKey(consentFact, orgKey)

(cf.offers, req.authInfo.offerRestrictionPatterns) match {
// case ask create or update offers but no pattern allowed
case (Some(offers), None) =>
val errorMessages =
offers.map(o => ImportError(s"offer.${o.key}.not.authorized", command = json))
NioLogger.error(s"not authorized : ${errorMessages.map(_.message)}")
Future.successful(ImportResult(errorsCount = errorMessages.size, errors = errorMessages.to(List)))

// case create or update consents without offers
case (None, _) =>
consentManagerService
.saveConsents(tenant, req.authInfo.sub, req.authInfo.metadatas, orgKey, userId, cf, Json.toJson(cf))
.fold(
error => {
NioLogger.error(s"error during consent fact saving $error")
ImportResult(errorsCount = 1, errors = List(ImportError(message = "Error during update", detailedError = error.appErrors.asJson(), command = json)))
},
consentFactSaved => ImportResult(successCount = 1)
)
// case create or update offers and some patterns are specified
case (Some(offers), Some(_)) =>
// validate offers key are accessible
offers
.filterNot(o =>
accessibleOfferService.accessibleOfferKey(o.key, req.authInfo.offerRestrictionPatterns)
) match {
// case all offers in consent (body) are accessible
case Nil =>
consentManagerService
.saveConsents(tenant, req.authInfo.sub, req.authInfo.metadatas, orgKey, userId, cf, Json.toJson(cf))
.fold(
error => {
NioLogger.error(s"error during consent fact saving $error")
ImportResult(errorsCount = 1, errors = List(ImportError(message = "Error during update", detailedError = error.appErrors.asJson(), command = json)))
},
_ => ImportResult(successCount = 1)
)

// case one or more offers are not accessible
case unauthorizedOffers =>
val errorMessages = unauthorizedOffers.map(o => ImportError(s"offer.${o.key}.not.authorized", command = json))
NioLogger.error(s"not authorized : ${errorMessages.map(_.message)}")
FastFuture.successful(ImportResult(errorsCount = errorMessages.size, errors = errorMessages.to(List)))
}
}
}
}

lazy val defaultPageSize: Int =
sys.env.get("DEFAULT_PAGE_SIZE").map(_.toInt).getOrElse(200)
lazy val defaultParSize: Int =
Expand Down
30 changes: 30 additions & 0 deletions nio-server/app/models/ConsentFact.scala
Original file line number Diff line number Diff line change
Expand Up @@ -617,3 +617,33 @@ object ConsentFact extends ReadableEntity[ConsentFact] {
def addOrgKey(consentFact: ConsentFact, orgKey: String): ConsentFact =
consentFact.copy(orgKey = Some(orgKey))
}

sealed trait ConsentFactCommand

object ConsentFactCommand {
case class PatchConsentFact(userId: String, command: PartialConsentFact) extends ConsentFactCommand

object PatchConsentFact {
val format = Json.format[PatchConsentFact]
}
case class UpdateConsentFact(userId: String, command: ConsentFact) extends ConsentFactCommand

object UpdateConsentFact {
val format = Json.format[UpdateConsentFact]
}

implicit val format = Format(
Reads[ConsentFactCommand] { js =>
(js \ "type").validate[String].flatMap {
case "Update" => UpdateConsentFact.format.reads(js)
case "Patch" => PatchConsentFact.format.reads(js)
}
},
Writes[ConsentFactCommand] {
case c: UpdateConsentFact => UpdateConsentFact.format.writes(c) ++ Json.obj("type" -> "Update")
case c: PatchConsentFact => PatchConsentFact.format.writes(c) ++ Json.obj("type" -> "Patch")
}
)

}

2 changes: 2 additions & 0 deletions nio-server/conf/routes
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ POST /api/:tenant/organisations/:orgKey/draft/_release

GET /api/:tenant/organisations/:orgKey/users/_template controllers.ConsentController.getTemplate(tenant: String, orgKey: String, userId: Option[String] ?= None, offerKeys: Option[Seq[String]] ?= None)

POST /api/:tenant/organisations/:orgKey/users/_batch controllers.ConsentController.batchImport(tenant: String, orgKey)

PUT /api/:tenant/organisations/:orgKey/users/:userId controllers.ConsentController.createOrReplaceIfExists(tenant: String, orgKey: String, userId: String)

PATCH /api/:tenant/organisations/:orgKey/users/:userId controllers.ConsentController.partialUpdate(tenant: String, orgKey: String, userId: String)
Expand Down

0 comments on commit 37f2d6e

Please sign in to comment.