Skip to content
This repository has been archived by the owner on Sep 27, 2021. It is now read-only.

Commit

Permalink
Added storage to events
Browse files Browse the repository at this point in the history
  • Loading branch information
umbreak committed Mar 1, 2019
1 parent 529122d commit b7a4add
Show file tree
Hide file tree
Showing 17 changed files with 322 additions and 324 deletions.
4 changes: 4 additions & 0 deletions src/main/resources/contexts/resource-context.json
Expand Up @@ -31,6 +31,10 @@
"@id": "nxv:constrainedBy",
"@type": "@id"
},
"_storageId": {
"@id": "nxv:storageId",
"@type": "@id"
},
"_rev": "nxv:rev",
"_digest": "nxv:digest",
"_algorithm": "nxv:algorithm",
Expand Down
4 changes: 4 additions & 0 deletions src/main/resources/elasticsearch/default-context.json
Expand Up @@ -16,6 +16,10 @@
"@id": "https://bluebrain.github.io/nexus/vocabulary/createdBy",
"@type": "@id"
},
"_storageId": {
"@id": "https://bluebrain.github.io/nexus/vocabulary/storageId",
"@type": "@id"
},
"_constrainedBy": {
"@id": "https://bluebrain.github.io/nexus/vocabulary/constrainedBy",
"@type": "@id"
Expand Down
3 changes: 3 additions & 0 deletions src/main/resources/elasticsearch/mapping.json
Expand Up @@ -42,6 +42,9 @@
"_mediaType": {
"type": "keyword"
},
"_storageId": {
"type": "keyword"
},
"_filename": {
"type": "keyword"
},
Expand Down
Expand Up @@ -52,6 +52,7 @@ object Vocabulary {
val value = PrefixMapping.metadata("value")
val bytes = PrefixMapping.metadata("bytes")
val mediaType = PrefixMapping.metadata("mediaType")
val storageId = PrefixMapping.metadata("storageId")

// ElasticSearch sourceAsText predicate
val originalSource = PrefixMapping.metadata("original_source")
Expand Down
30 changes: 21 additions & 9 deletions src/main/scala/ch/epfl/bluebrain/nexus/kg/resources/Command.scala
Expand Up @@ -7,6 +7,7 @@ import ch.epfl.bluebrain.nexus.kg.config.Schemas._
import ch.epfl.bluebrain.nexus.kg.config.Vocabulary._
import ch.epfl.bluebrain.nexus.kg.resources.file.File.FileAttributes
import ch.epfl.bluebrain.nexus.kg.resources.syntax._
import ch.epfl.bluebrain.nexus.kg.storage.Storage
import ch.epfl.bluebrain.nexus.rdf.Iri.AbsoluteIri
import io.circe.Json

Expand Down Expand Up @@ -117,12 +118,17 @@ object Command {
/**
* An intent to create a file resource.
*
* @param id the resource identifier
* @param value the file metadata
* @param instant the instant when this event was recorded
* @param id the resource identifier
* @param storage the storage where the file is going to be saved
* @param value the file metadata
* @param instant the instant when this event was recorded
* @param subject the subject which generated this event
*/
final case class CreateFile(id: Id[ProjectRef], value: FileAttributes, instant: Instant, subject: Subject)
final case class CreateFile(id: Id[ProjectRef],
storage: Storage,
value: FileAttributes,
instant: Instant,
subject: Subject)
extends Command {

/**
Expand All @@ -144,13 +150,19 @@ object Command {
/**
* An intent to update a file resource.
*
* @param id the resource identifier
* @param rev the last known revision of the resource when this command was created
* @param value the file metadata
* @param instant the instant when this event was recorded
* @param id the resource identifier
* @param storage the storage where the file is going to be saved
* @param rev the last known revision of the resource when this command was created
* @param value the file metadata
* @param instant the instant when this event was recorded
* @param subject the subject which generated this event
*/
final case class UpdateFile(id: Id[ProjectRef], rev: Long, value: FileAttributes, instant: Instant, subject: Subject)
final case class UpdateFile(id: Id[ProjectRef],
storage: Storage,
rev: Long,
value: FileAttributes,
instant: Instant,
subject: Subject)
extends Command {

/**
Expand Down
Expand Up @@ -10,6 +10,7 @@ import ch.epfl.bluebrain.nexus.kg.config.Schemas.fileSchemaUri
import ch.epfl.bluebrain.nexus.kg.config.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.kg.resources.file.File.{Digest, FileAttributes}
import ch.epfl.bluebrain.nexus.kg.resources.syntax._
import ch.epfl.bluebrain.nexus.kg.storage.Storage
import ch.epfl.bluebrain.nexus.rdf.Iri.AbsoluteIri
import ch.epfl.bluebrain.nexus.rdf.syntax._
import io.circe.{Encoder, Json}
Expand Down Expand Up @@ -128,12 +129,14 @@ object Event {
* A witness that a file resource has been created.
*
* @param id the resource identifier
* @param storage the storage used to save the file
* @param attributes the metadata of the file
* @param instant the instant when this event was recorded
* @param subject the identity which generated this event
*/
final case class FileCreated(
id: Id[ProjectRef],
storage: Storage,
attributes: FileAttributes,
instant: Instant,
subject: Subject
Expand All @@ -159,13 +162,15 @@ object Event {
* A witness that a file resource has been updated.
*
* @param id the resource identifier
* @param storage the storage used to save the file
* @param rev the revision that this event generated
* @param attributes the metadata of the file
* @param instant the instant when this event was recorded
* @param subject the identity which generated this event
*/
final case class FileUpdated(
id: Id[ProjectRef],
storage: Storage,
rev: Long,
attributes: FileAttributes,
instant: Instant,
Expand All @@ -186,6 +191,7 @@ object Event {
.withDiscriminator("@type")
.copy(transformMemberNames = {
case "id" => "_resourceId"
case "storageId" => "_storageId"
case "rev" => "_rev"
case "instant" => "_instant"
case "subject" => "_subject"
Expand Down Expand Up @@ -217,6 +223,8 @@ object Event {
private implicit def subjectIdEncoder(implicit ic: IamClientConfig): Encoder[Subject] =
Encoder.encodeJson.contramap(_.id.asJson)

private implicit def storageEncoder: Encoder[Storage] = deriveEncoder[Storage]

implicit def eventsEventEncoder(implicit ic: IamClientConfig): Encoder[Event] =
Encoder.encodeJson.contramap[Event] { ev =>
deriveEncoder[Event]
Expand Down
119 changes: 30 additions & 89 deletions src/main/scala/ch/epfl/bluebrain/nexus/kg/resources/Repo.scala
Expand Up @@ -17,13 +17,14 @@ import ch.epfl.bluebrain.nexus.kg.resources.Event._
import ch.epfl.bluebrain.nexus.kg.resources.Rejection._
import ch.epfl.bluebrain.nexus.kg.resources.Repo.Agg
import ch.epfl.bluebrain.nexus.kg.resources.State.{Current, Initial}
import ch.epfl.bluebrain.nexus.kg.resources.file.File.{FileAttributes, FileDescription}
import ch.epfl.bluebrain.nexus.kg.resources.file.File.FileDescription
import ch.epfl.bluebrain.nexus.kg.storage.Storage
import ch.epfl.bluebrain.nexus.kg.storage.Storage.StorageOperations.Save
import ch.epfl.bluebrain.nexus.kg.{resources, uuid}
import ch.epfl.bluebrain.nexus.rdf.Iri.AbsoluteIri
import ch.epfl.bluebrain.nexus.sourcing.Aggregate
import ch.epfl.bluebrain.nexus.sourcing.akka.{AkkaAggregate, SourcingConfig}
import ch.epfl.bluebrain.nexus.sourcing.retry.Retry
import ch.epfl.bluebrain.nexus.kg.storage.StorageOperations
import io.circe.Json

/**
Expand Down Expand Up @@ -98,107 +99,45 @@ class Repo[F[_]: Monad](agg: Agg[F], clock: Clock, toIdentifier: ResId => String
* Creates a file resource.
*
* @param id the id of the resource
* @param storage the storage where the file is going to be saved
* @param fileDesc the file description metadata
* @param source the source of the file
* @param instant an optionally provided operation instant
* @tparam In the storage input type
* @return either a rejection or the new resource representation in the F context
*/
def createFile[In](id: ResId, fileDesc: FileDescription, source: In, instant: Instant = clock.instant)(
implicit subject: Subject,
storageOps: StorageOperations[F, In, _]): EitherT[F, Rejection, Resource] =
def createFile[In](id: ResId,
storage: Storage,
fileDesc: FileDescription,
source: In,
instant: Instant = clock.instant)(implicit subject: Subject,
saveStorage: Save[F, In]): EitherT[F, Rejection, Resource] =
EitherT
.right(storageOps.save(id, fileDesc, source))
.flatMap(attr => evaluate(id, CreateFile(id, attr, instant, subject)))
.right(storage.save.apply(id, fileDesc, source))
.flatMap(attr => evaluate(id, CreateFile(id, storage, attr, instant, subject)))

/**
* Replaces a file resource.
*
* @param id the id of the resource
* @param storage the storage where the file is going to be saved
* @param rev the optional last known revision of the resource
* @param fileDesc the file description metadata
* @param source the source of the file
* @param instant an optionally provided operation instant
* @tparam In the storage input type
* @return either a rejection or the new resource representation in the F context
*/
def updateFile[In](id: ResId, rev: Long, fileDesc: FileDescription, source: In, instant: Instant = clock.instant)(
implicit subject: Subject,
storageOps: StorageOperations[F, In, _]): EitherT[F, Rejection, Resource] =
def updateFile[In](id: ResId,
storage: Storage,
rev: Long,
fileDesc: FileDescription,
source: In,
instant: Instant = clock.instant)(implicit subject: Subject,
saveStorage: Save[F, In]): EitherT[F, Rejection, Resource] =
EitherT
.right(storageOps.save(id, fileDesc, source))
.flatMap(attr => evaluate(id, UpdateFile(id, rev, attr, instant, subject)))

/**
* Attempts to stream the file resource identified by the argument id.
*
* @param id the id of the resource.
* @param schema the optionally available schema of the resource
* @tparam Out the type for the output streaming of the file
* @return the optional streamed file in the F context
*/
def getFile[Out](id: ResId, schema: Option[Ref])(
implicit storageOps: StorageOperations[F, _, Out]): OptionT[F, (FileAttributes, Out)] =
get(id, schema) subflatMap (_.file.flatMap(at => Some(storageOps.fetch(at)).map(out => at -> out)))

/**
* Attempts to stream the file resource identified by the argument id and the revision.
*
* @param id the id of the resource
* @param rev the revision of the resource
* @param schema the optionally available schema of the resource
* @tparam Out the type for the output streaming of the file
* @return the optional streamed file in the F context
*/
def getFile[Out](id: ResId, rev: Long, schema: Option[Ref])(
implicit storageOps: StorageOperations[F, _, Out]): OptionT[F, (FileAttributes, Out)] =
get(id, rev, schema) subflatMap (_.file.flatMap(at => Some(storageOps.fetch(at)).map(out => at -> out)))

/**
* Attempts to stream the file resource identified by the argument id and the tag. The
* tag is transformed into a revision value using the latest resource tag to revision mapping.
*
* @param id the id of the resource
* @param tag the tag of the resource
* @param schema the optionally available schema of the resource
* @tparam Out the type for the output streaming of the file
* @return the optional streamed file in the F context
*/
def getFile[Out](id: ResId, tag: String, schema: Option[Ref])(
implicit storageOps: StorageOperations[F, _, Out]): OptionT[F, (FileAttributes, Out)] =
get(id, tag, schema) subflatMap (_.file.flatMap(at => Some(storageOps.fetch(at)).map(out => at -> out)))

/**
* Attempts to fetch the resource tags identified by the argument id.
*
* @param id the id of the resource
* @param schema the optionally available schema of the resource
* @return the optional streamed file in the F context
*/
def getTags(id: ResId, schema: Option[Ref]): OptionT[F, Tags] =
get(id, schema).map(_.tags)

/**
* Attempts to fetch the resource tags identified by the argument id and the revision.
*
* @param id the id of the resource
* @param rev the revision of the resource
* @param schema the optionally available schema of the resource
* @return the optional streamed file in the F context
*/
def getTags(id: ResId, rev: Long, schema: Option[Ref]): OptionT[F, Tags] =
get(id, rev, schema).map(_.tags)

/**
* Attempts to fetch the resource tags identified by the argument id and the tag.
*
* @param id the id of the resource
* @param tag the tag of the resource
* @param schema the optionally available schema of the resource
* @return the optional streamed file in the F context
*/
def getTags(id: ResId, tag: String, schema: Option[Ref]): OptionT[F, Tags] =
get(id, tag, schema).map(_.tags)
.right(storage.save.apply(id, fileDesc, source))
.flatMap(attr => evaluate(id, UpdateFile(id, storage, rev, attr, instant, subject)))

/**
* Attempts to read the resource identified by the argument id.
Expand Down Expand Up @@ -272,8 +211,10 @@ object Repo {
(state, ev) match {
case (Initial, e @ Created(id, schema, types, value, tm, ident)) =>
Current(id, e.rev, types, false, Map.empty, None, tm, tm, ident, ident, schema, value)
case (Initial, e @ FileCreated(id, file, tm, ident)) =>
Current(id, e.rev, e.types, false, Map.empty, Some(file), tm, tm, ident, ident, e.schema, Json.obj())
case (Initial, e @ FileCreated(id, storage, file, tm, ident)) =>
// format: off
Current(id, e.rev, e.types, deprecated = false, Map.empty, Some(storage -> file), tm, tm, ident, ident, e.schema, Json.obj())
// format: on
case (Initial, _) => Initial
case (c: Current, TagAdded(_, rev, targetRev, name, tm, ident)) =>
c.copy(rev = rev, tags = c.tags + (name -> targetRev), updated = tm, updatedBy = ident)
Expand All @@ -282,8 +223,8 @@ object Repo {
c.copy(rev = rev, updated = tm, updatedBy = ident, deprecated = true)
case (c: Current, Updated(_, rev, types, value, tm, ident)) =>
c.copy(rev = rev, types = types, source = value, updated = tm, updatedBy = ident)
case (c: Current, FileUpdated(_, rev, file, tm, ident)) =>
c.copy(rev = rev, file = Some(file), updated = tm, updatedBy = ident)
case (c: Current, FileUpdated(_, storage, rev, file, tm, ident)) =>
c.copy(rev = rev, file = Some(storage -> file), updated = tm, updatedBy = ident)
}

final def eval(state: State, cmd: Command): Either[Rejection, Event] = {
Expand All @@ -303,7 +244,7 @@ object Repo {

def createFile(c: CreateFile): Either[Rejection, FileCreated] =
state match {
case Initial => Right(FileCreated(c.id, c.value, c.instant, c.subject))
case Initial => Right(FileCreated(c.id, c.storage, c.value, c.instant, c.subject))
case _ => Left(ResourceAlreadyExists(c.id.ref))
}

Expand All @@ -313,7 +254,7 @@ object Repo {
case s: Current if s.rev != c.rev => Left(IncorrectRev(c.id.ref, c.rev, s.rev))
case s: Current if s.deprecated => Left(ResourceIsDeprecated(c.id.ref))
case s: Current if s.file.isEmpty => Left(NotAFileResource(c.id.ref))
case s: Current => Right(FileUpdated(s.id, s.rev + 1, c.value, c.instant, c.subject))
case s: Current => Right(FileUpdated(s.id, c.storage, s.rev + 1, c.value, c.instant, c.subject))
}

def update(c: Update): Either[Rejection, Updated] =
Expand Down
Expand Up @@ -12,6 +12,7 @@ import ch.epfl.bluebrain.nexus.kg.config.Schemas._
import ch.epfl.bluebrain.nexus.kg.config.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.kg.resources.file.File.FileAttributes
import ch.epfl.bluebrain.nexus.kg.resources.syntax._
import ch.epfl.bluebrain.nexus.kg.storage.Storage
import ch.epfl.bluebrain.nexus.rdf.Graph._
import ch.epfl.bluebrain.nexus.rdf.Iri.AbsoluteIri
import ch.epfl.bluebrain.nexus.rdf.Iri.Path._
Expand All @@ -31,7 +32,7 @@ import io.circe.Json
* @param types the collection of known types of this resource
* @param deprecated whether the resource is deprecated of not
* @param tags the collection of tag names to revisions of the resource
* @param file the optional file
* @param file the optional file metadata with the storage where the file was saved
* @param created the instant when this resource was created
* @param updated the last instant when this resource was updated
* @param createdBy the identity that created this resource
Expand All @@ -48,7 +49,7 @@ final case class ResourceF[P, S, A](
types: Set[AbsoluteIri],
deprecated: Boolean,
tags: Map[String, Long],
file: Option[FileAttributes],
file: Option[(Storage, FileAttributes)],
created: Instant,
updated: Instant,
createdBy: Identity,
Expand Down Expand Up @@ -77,15 +78,17 @@ final case class ResourceF[P, S, A](
*/
def metadata(implicit config: AppConfig, project: Project, ev: S =:= Ref): Set[Triple] = {

def triplesFor(at: FileAttributes): Set[Triple] = {
val blankDigest = Node.blank
def triplesFor(storageAndAttributes: (Storage, FileAttributes)): Set[Triple] = {
val blankDigest = Node.blank
val (storage, at) = storageAndAttributes
Set(
(blankDigest, nxv.algorithm, at.digest.algorithm),
(blankDigest, nxv.value, at.digest.value),
(node, rdf.tpe, nxv.File),
(node, nxv.bytes, at.bytes),
(node, nxv.digest, blankDigest),
(node, nxv.mediaType, at.mediaType),
(node, nxv.storageId, storage.id),
(node, nxv.filename, at.filename)
)
}
Expand Down

0 comments on commit b7a4add

Please sign in to comment.