
Get file size from S3 after upload + fix upload #4952

Merged · merged 5 commits · May 14, 2024

Changes from 1 commit
@@ -0,0 +1,10 @@
package ch.epfl.bluebrain.nexus.delta.kernel

object Hex {

  /**
    * Convert the array of bytes to a string of the hexadecimal values
    */
  def valueOf(value: Array[Byte]): String = value.map("%02x".format(_)).mkString
[Review comment · Contributor Author] Common shared utility method
}
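For reference, what the helper produces (a hypothetical REPL sketch, not part of the PR):

import ch.epfl.bluebrain.nexus.delta.kernel.Hex
import java.security.MessageDigest

// Each byte becomes two lowercase, zero-padded hex characters.
Hex.valueOf(Array[Byte](0x0f, 0x42, 0x07))
// "0f4207"

// The intended use case: rendering message digests.
Hex.valueOf(MessageDigest.getInstance("SHA-256").digest("hello".getBytes("UTF-8")))
// "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"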

This file was deleted.

@@ -555,7 +555,7 @@ final class Files(
  private[files] def updateAttributes(f: FileState, storage: Storage): IO[Unit] = {
    val attr = f.attributes
    for {
-     _       <- IO.raiseWhen(f.attributes.digest.computed)(DigestAlreadyComputed(f.id))
+     _       <- IO.raiseUnless(f.attributes.digest == Digest.NotComputedDigest)(DigestAlreadyComputed(f.id))
      newAttr <- fetchAttributes(storage, attr, f.id)
      mediaType = attr.mediaType orElse Some(newAttr.mediaType)
      command   = UpdateFileAttributes(f.id, f.project, mediaType, newAttr.bytes, newAttr.digest, f.rev, f.updatedBy)
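Note the semantics shift here: raiseWhen only rejected files whose digest was already a ComputedDigest, while raiseUnless rejects anything other than NotComputedDigest, so files in the new NoDigest state are refused too. A minimal sketch of the new guard (hypothetical error value standing in for DigestAlreadyComputed):

import cats.effect.IO

// IO.raiseUnless(cond)(e) raises e whenever cond is false.
def guard(digest: Digest): IO[Unit] =
  IO.raiseUnless(digest == Digest.NotComputedDigest)(
    new IllegalStateException("digest already computed") // stand-in for DigestAlreadyComputed(f.id)
  )

// guard(Digest.NotComputedDigest) -> IO.unit, attributes can still be updated
// guard(ComputedDigest(...)) or guard(Digest.none) -> raises, nothing to recompute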
@@ -8,7 +8,7 @@ import io.circe.{Encoder, JsonObject}
  * A digest value
  */
sealed trait Digest extends Product with Serializable {
-  def computed: Boolean
+  def computed: Boolean = this != Digest.NotComputedDigest
}

object Digest {
@@ -21,19 +21,23 @@ object Digest {
    * @param value
    *   the actual value of the digest of the file
    */
-  final case class ComputedDigest(algorithm: DigestAlgorithm, value: String) extends Digest {
-    override val computed: Boolean = true
-  }
+  final case class ComputedDigest(algorithm: DigestAlgorithm, value: String) extends Digest

-  /**
-    * A digest that does not yield a value because it is still being computed
-    */
-  final case object NotComputedDigest extends Digest {
-    override val computed: Boolean = false
-  }
+  final case object NoDigest extends Digest
[Review comment · @imsdu (Contributor Author), May 8, 2024] This is slightly different from the other one (NotComputedDigest) where we were expecting it to be computed in the future
+  val none: Digest = NoDigest
+
+  /**
+    * A digest that does not yield a value because it is still being computed
+    */
+  final case object NotComputedDigest extends Digest

  implicit val digestEncoder: Encoder.AsObject[Digest] = Encoder.encodeJsonObject.contramapObject {
    case ComputedDigest(algorithm, value) => JsonObject("_algorithm" -> algorithm.asJson, "_value" -> value.asJson)
    case NotComputedDigest                => JsonObject("_value" -> "".asJson)
+   case NoDigest                         => JsonObject("_value" -> "".asJson)
  }
}
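To make the two "empty" digest states concrete, an assumed REPL sketch against the encoder above (circe syntax in scope; exact algorithm rendering assumed):

import io.circe.syntax._

val computed: Digest = ComputedDigest(DigestAlgorithm.SHA256, "2cf24d…")
computed.asJsonObject
// JsonObject("_algorithm" -> "SHA-256", "_value" -> "2cf24d…")

// NoDigest (never expected) and NotComputedDigest (still pending) are
// indistinguishable on the wire; the distinction only matters internally.
(Digest.none: Digest).asJsonObject
// JsonObject("_value" -> "")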
@@ -451,7 +451,7 @@ object FileEvent {
  private object FileExtraFields {
    def fromEvent(event: FileEvent): FileExtraFields =
      event match {
-       case c: FileCreated if c.attributes.digest.computed =>
+       case c: FileCreated if c.attributes.digest != Digest.NotComputedDigest =>
          FileExtraFields(
            c.storage.iri,
            c.storageType,
@@ -460,7 +460,7 @@
            c.attributes.mediaType,
            Some(c.attributes.origin)
          )
        case c: FileCreated =>
          FileExtraFields(
            c.storage.iri,
            c.storageType,
@@ -469,7 +469,7 @@
            None,
            Some(c.attributes.origin)
          )
-       case u: FileUpdated if u.attributes.digest.computed =>
+       case u: FileUpdated if u.attributes.digest != Digest.NotComputedDigest =>
          FileExtraFields(
            u.storage.iri,
            u.storageType,
@@ -478,7 +478,7 @@
            u.attributes.mediaType,
            Some(u.attributes.origin)
          )
        case u: FileUpdated =>
          FileExtraFields(
            u.storage.iri,
            u.storageType,
@@ -487,7 +487,7 @@
            None,
            Some(u.attributes.origin)
          )
        case fau: FileAttributesUpdated =>
          FileExtraFields(
            fau.storage.iri,
            fau.storageType,
@@ -496,15 +496,15 @@
            fau.mediaType,
            Some(FileAttributesOrigin.Storage)
          )
        case fcmu: FileCustomMetadataUpdated =>
          FileExtraFields(fcmu.storage.iri, fcmu.storageType, None, None, None, None)
        case fta: FileTagAdded =>
          FileExtraFields(fta.storage.iri, fta.storageType, None, None, None, None)
        case ftd: FileTagDeleted =>
          FileExtraFields(ftd.storage.iri, ftd.storageType, None, None, None, None)
        case fd: FileDeprecated =>
          FileExtraFields(fd.storage.iri, fd.storageType, None, None, None, None)
        case fud: FileUndeprecated =>
          FileExtraFields(fud.storage.iri, fud.storageType, None, None, None, None)
      }
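The rule applied throughout the dispatch above, condensed into a hypothetical helper (not in the PR): size, digest and media type are forwarded only once the digest is known.

def knownDigestFields(digest: Digest, bytes: Long): (Option[Long], Option[Digest]) =
  digest match {
    case Digest.NotComputedDigest => (None, None)               // still pending, omit metadata
    case known                    => (Some(bytes), Some(known)) // ComputedDigest, or now NoDigest
  }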

@@ -1,7 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations

import akka.http.scaladsl.model.{StatusCodes, Uri}
-import cats.data.NonEmptyList
import ch.epfl.bluebrain.nexus.delta.kernel.error.Rejection
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageType
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
@@ -138,6 +137,14 @@ object StorageFileRejection {
        s"File cannot be saved because it already exists on path '$path'."
      )

+    /**
+      * Rejection returned when a file cannot be saved because the content length is not provided
+      */
+    final case object ContentLengthIsMissing
+        extends SaveFileRejection(
+          s"Content length must be supplied."
+        )

[Review comment · Contributor Author] When doing a put with a stream and asking for a checksum, it seems that the content length is needed
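For background on that comment: with a streamed body, the SDK cannot buffer the payload to measure it, so a checksummed put needs the size up front. A sketch of the kind of request involved (assumed AWS SDK v2 API, not code from this PR):

import software.amazon.awssdk.services.s3.model.{ChecksumAlgorithm, PutObjectRequest}

// A put that asks S3 to verify a SHA-256 checksum; for a streamed body the
// SDK needs an explicit content length to frame the upload.
def putRequest(bucket: String, key: String, contentLength: Long): PutObjectRequest =
  PutObjectRequest.builder()
    .bucket(bucket)
    .key(key)
    .contentLength(contentLength)
    .checksumAlgorithm(ChecksumAlgorithm.SHA256)
    .build()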

    /**
      * Rejection returned when a storage cannot save a file due to an unexpected reason
      */
@@ -196,11 +203,6 @@ object StorageFileRejection {
  sealed abstract class RegisterFileRejection(loggedDetails: String) extends StorageFileRejection(loggedDetails)

  object RegisterFileRejection {
-    final case class MissingS3Attributes(missingAttributes: NonEmptyList[String])
-        extends RegisterFileRejection(s"Missing attributes from S3: ${missingAttributes.toList.mkString(", ")}")
-
-    final case class InvalidContentType(received: String)
-        extends RegisterFileRejection(s"Invalid content type returned from S3: $received")

    final case class InvalidPath(path: Uri.Path)
        extends RegisterFileRejection(s"An S3 path must contain at least the filename. Path was $path")
@@ -7,6 +7,7 @@ import akka.stream.scaladsl.{FileIO, Sink}
import akka.util.ByteString
import cats.effect.IO
import cats.implicits._
+import ch.epfl.bluebrain.nexus.delta.kernel.Hex
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.ComputedDigest
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client
@@ -79,7 +80,7 @@ final class DiskStorageSaveFile(implicit as: ActorSystem, uuidf: UUIDF) {
        digest.update(currentBytes.asByteBuffer)
        digest
      }
-      .mapMaterializedValue(_.map(dig => ComputedDigest(algorithm, dig.digest.map("%02x".format(_)).mkString)))
+      .mapMaterializedValue(_.map(dig => ComputedDigest(algorithm, Hex.valueOf(dig.digest))))
}

object DiskStorageSaveFile {
@@ -0,0 +1,32 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3

import akka.http.scaladsl.model.ContentType
import ch.epfl.bluebrain.nexus.delta.kernel.Hex
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.ComputedDigest
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
import software.amazon.awssdk.services.s3.model.HeadObjectResponse

import java.util.Base64

case class HeadObject(fileSize: Long, contentType: Option[ContentType], digest: Digest)

object HeadObject {
  def apply(response: HeadObjectResponse): HeadObject = {
    val contentType = Option(response.contentType()).flatMap { value =>
      // It is highly likely for S3 to return an erroneous value here
      ContentType.parse(value).toOption
    }
    val digestValue = Option(response.checksumSHA256).map { encodedChecksum =>
      Hex.valueOf(Base64.getDecoder.decode(encodedChecksum))
    }
    val digest      = digestValue.fold(Digest.none) { value =>
      ComputedDigest(DigestAlgorithm.SHA256, value)
    }
    HeadObject(
      response.contentLength(),
      contentType,
      digest
    )
  }
}

[Review comment · Contributor Author] Smart constructor to add more value to the class
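One detail worth noting: S3 returns checksumSHA256 base64-encoded, while Nexus stores digests as lowercase hex, hence the decode-and-re-encode above. A small self-contained illustration (hypothetical values, not from the PR):

import ch.epfl.bluebrain.nexus.delta.kernel.Hex
import java.security.MessageDigest
import java.util.Base64

// What S3 would report as checksumSHA256 for a body of "hello" (base64)…
val s3Checksum = Base64.getEncoder.encodeToString(
  MessageDigest.getInstance("SHA-256").digest("hello".getBytes("UTF-8"))
)
// …and the hex form that ends up in ComputedDigest:
val hexDigest = Hex.valueOf(Base64.getDecoder.decode(s3Checksum))
// "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"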
@@ -8,24 +8,18 @@ import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin
-import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{Digest, FileStorageMetadata}
-import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileStorageMetadata
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.StorageNotAccessible
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.FetchFileRejection.UnexpectedFetchError
-import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.RegisterFileRejection.InvalidContentType
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations.S3FileMetadata
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
-import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.HeadObject
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
-import org.apache.commons.codec.binary.Hex

import java.net.URLDecoder
import java.nio.charset.StandardCharsets.UTF_8
-import java.util.Base64
import scala.concurrent.duration.DurationInt
-import scala.util.{Failure, Success, Try}

trait S3FileOperations {
  def checkBucketExists(bucket: String): IO[Unit]
@@ -98,51 +92,21 @@ object S3FileOperations {
        log.error(e)(s"Failed fetching required attributes for S3 file registration. Bucket $bucket and path $path")
      }

-  private def parseContentType(raw: Option[String]): IO[Option[ContentType]] = {
-    raw match {
-      case Some(value) =>
-        ContentType.parse(value) match {
-          case Left(_)      => IO.raiseError(InvalidContentType(value))
-          case Right(value) => IO.pure(Some(value))
-        }
-      case None         => IO.none
-    }
-  }
-
  private def mkS3Metadata(path: Uri.Path, resp: HeadObject)(implicit
      uuidf: UUIDF
  ) =
    for {
-     uuid        <- uuidf()
-     contentType <- parseContentType(resp.contentType)
-     checksum    <- checksumFrom(resp)
+     uuid <- uuidf()
    } yield S3FileMetadata(
-     contentType,
+     resp.contentType,
      FileStorageMetadata(
        uuid,
        resp.fileSize,
-       checksum,
+       resp.digest,
        FileAttributesOrigin.External,
        Uri(path.toString()),
        path
      )
    )
-
- private def checksumFrom(response: HeadObject) = IO.fromOption {
-   response.sha256Checksum
-     .map { checksum =>
-       Try {
-         Base64.getDecoder.decode(checksum)
-       } match {
-         case Failure(_)            => Digest.NotComputedDigest
-         case Success(decodedValue) =>
-           Digest.ComputedDigest(
-             DigestAlgorithm.default,
-             Hex.encodeHexString(decodedValue)
-           )
-       }
-     }
- }(new IllegalArgumentException("Missing checksum"))

}
@@ -10,12 +10,10 @@ import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client
-import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{Digest, FileStorageMetadata}
-import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileStorageMetadata
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.SaveFileRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
-import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.UploadMetadata
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
import fs2.Stream

@@ -41,19 +39,21 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient, locationGenerato

  private def storeFile(
      bucket: String,
-     path: Uri,
+     location: Uri,
      uuid: UUID,
      entity: BodyPartEntity
  ): IO[FileStorageMetadata] = {
    val fileData: Stream[IO, Byte] = convertStream(entity.dataBytes)
-   val key                        = path.toString()
+   val key                        = location.toString()
    (for {
-     _              <- log(bucket, key, s"Checking for object existence")
-     _              <- validateObjectDoesNotExist(bucket, key)
-     _              <- log(bucket, key, s"Beginning upload")
-     uploadMetadata <- s3StorageClient.uploadFile(fileData, bucket, key)
-     _              <- log(bucket, key, s"Finished upload. Digest: ${uploadMetadata.checksum}")
-     attr            = fileMetadata(path, uuid, uploadMetadata)
+     _             <- log(bucket, key, s"Checking for object existence")
+     _             <- validateObjectDoesNotExist(bucket, key)
+     _             <- log(bucket, key, s"Beginning upload")
+     contentLength <- IO.fromOption(entity.contentLengthOption)(ContentLengthIsMissing)
+     (duration, _) <- s3StorageClient.uploadFile(fileData, bucket, key, contentLength).timed
+     _             <- log(bucket, key, s"Finished upload for $location after ${duration.toSeconds} seconds.")
+     headResponse  <- s3StorageClient.headObject(bucket, key)
+     attr           = fileMetadata(location, uuid, headResponse)
    } yield attr)
      .onError(e => logger.error(e)("Unexpected error when storing file"))
      .adaptError { err => UnexpectedSaveError(key, err.getMessage) }
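This is the heart of the PR: size and digest now come from a HEAD request made after the upload, instead of metadata assembled client-side while streaming. A sketch of what such a call could look like (assumed cats-effect wrapper over AWS SDK v2, not the PR's actual S3StorageClient):

import cats.effect.IO
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.{HeadObjectRequest, HeadObjectResponse}

// After the streamed put completes, HEAD the key and treat the response as
// the source of truth for content length and checksum.
def headObject(client: S3AsyncClient, bucket: String, key: String): IO[HeadObjectResponse] =
  IO.fromCompletableFuture(IO.delay {
    client.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build())
  })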
@@ -62,12 +62,12 @@
  private def fileMetadata(
      location: Uri,
      uuid: UUID,
-     uploadMetadata: UploadMetadata
+     headResponse: HeadObject
  ): FileStorageMetadata =
    FileStorageMetadata(
      uuid = uuid,
-     bytes = uploadMetadata.fileSize,
-     digest = Digest.ComputedDigest(DigestAlgorithm.SHA256, uploadMetadata.checksum),
+     bytes = headResponse.fileSize,
+     digest = headResponse.digest,
      origin = Client,
      location = location,
      path = location.path