Commit 1526158
Support metadata for multipart upload
chetanmeh committed Mar 26, 2018
1 parent e7a1c38 commit 1526158
Showing 7 changed files with 134 additions and 88 deletions.
10 changes: 8 additions & 2 deletions src/main/scala/io/findify/s3mock/provider/FileProvider.scala
@@ -90,11 +90,12 @@ class FileProvider(dir:String) extends Provider with LazyLogging {
     GetObjectData(file.byteArray, meta)
   }
 
-  override def putObjectMultipartStart(bucket:String, key:String):InitiateMultipartUploadResult = {
+  override def putObjectMultipartStart(bucket:String, key:String, metadata: ObjectMetadata):InitiateMultipartUploadResult = {
     val id = Math.abs(Random.nextLong()).toString
     val bucketFile = File(s"$dir/$bucket")
     if (!bucketFile.exists) throw NoSuchBucketException(bucket)
     File(s"$dir/.mp/$bucket/$key/$id/.keep").createIfNotExists(createParents = true)
+    metadataStore.put(bucket, key, metadata)
     logger.debug(s"starting multipart upload for s3://$bucket/$key")
     InitiateMultipartUploadResult(bucket, key, id)
   }
@@ -116,8 +117,13 @@ class FileProvider(dir:String) extends Provider with LazyLogging {
     val data = parts.fold(Array[Byte]())(_ ++ _)
     file.writeBytes(data.toIterator)
     File(s"$dir/.mp/$bucket/$key").delete()
+    val hash = file.md5
+    metadataStore.get(bucket, key).foreach {m =>
+      m.setContentMD5(hash)
+      m.setLastModified(org.joda.time.DateTime.now().toDate)
+    }
     logger.debug(s"completed multipart upload for s3://$bucket/$key")
-    CompleteMultipartUploadResult(bucket, key, file.md5)
+    CompleteMultipartUploadResult(bucket, key, hash)
   }
 
   override def copyObject(sourceBucket: String, sourceKey: String, destBucket: String, destKey: String, newMeta: Option[ObjectMetadata] = None): CopyObjectResult = {
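Note: both providers write through a metadataStore that is not itself part of this diff. A minimal sketch of the interface these hunks appear to assume; the class name and backing map are hypothetical, and only the put(bucket, key, metadata) and get(bucket, key) calls come from the diff:

    import com.amazonaws.services.s3.model.ObjectMetadata
    import scala.collection.concurrent.TrieMap

    // In-memory metadata store keyed by (bucket, key); a sketch only,
    // not the actual s3mock implementation.
    class SketchMetadataStore {
      private val store = TrieMap.empty[(String, String), ObjectMetadata]

      def put(bucket: String, key: String, meta: ObjectMetadata): Unit =
        store.put((bucket, key), meta)

      def get(bucket: String, key: String): Option[ObjectMetadata] =
        store.get((bucket, key))
    }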
10 changes: 8 additions & 2 deletions src/main/scala/io/findify/s3mock/provider/InMemoryProvider.scala
@@ -101,11 +101,12 @@ class InMemoryProvider extends Provider with LazyLogging {
       }
     }
 
-  override def putObjectMultipartStart(bucket: String, key: String): InitiateMultipartUploadResult = {
+  override def putObjectMultipartStart(bucket: String, key: String, metadata: ObjectMetadata): InitiateMultipartUploadResult = {
     bucketDataStore.get(bucket) match {
       case Some(_) =>
         val id = Math.abs(Random.nextLong()).toString
         multipartTempStore.putIfAbsent(id, new mutable.TreeSet)
+        metadataStore.put(bucket, key, metadata)
         logger.debug(s"starting multipart upload for s3://$bucket/$key")
         InitiateMultipartUploadResult(bucket, key, id)
       case None => throw NoSuchBucketException(bucket)
@@ -128,7 +129,12 @@ class InMemoryProvider extends Provider with LazyLogging {
         bucketContent.keysInBucket.put(key, KeyContents(DateTime.now, completeBytes))
         multipartTempStore.remove(uploadId)
         logger.debug(s"completed multipart upload for s3://$bucket/$key")
-        CompleteMultipartUploadResult(bucket, key, DigestUtils.md5Hex(completeBytes))
+        val hash = DigestUtils.md5Hex(completeBytes)
+        metadataStore.get(bucket, key).foreach {m =>
+          m.setContentMD5(hash)
+          m.setLastModified(org.joda.time.DateTime.now().toDate)
+        }
+        CompleteMultipartUploadResult(bucket, key, hash)
       case None => throw NoSuchBucketException(bucket)
     }
   }
2 changes: 1 addition & 1 deletion src/main/scala/io/findify/s3mock/provider/Provider.scala
@@ -18,7 +18,7 @@ trait Provider {
   def createBucket(name:String, bucketConfig:CreateBucketConfiguration):CreateBucket
   def putObject(bucket:String, key:String, data:Array[Byte], metadata: ObjectMetadata):Unit
   def getObject(bucket:String, key:String): GetObjectData
-  def putObjectMultipartStart(bucket:String, key:String):InitiateMultipartUploadResult
+  def putObjectMultipartStart(bucket:String, key:String, metadata: ObjectMetadata):InitiateMultipartUploadResult
   def putObjectMultipartPart(bucket:String, key:String, partNumber:Int, uploadId:String, data:Array[Byte]):Unit
   def putObjectMultipartComplete(bucket:String, key:String, uploadId:String, request:CompleteMultipartUpload):CompleteMultipartUploadResult
   def deleteObject(bucket:String, key:String):Unit
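Note: with the trait extended, metadata now travels with the multipart-start call at the provider level. A minimal sketch of driving the trait directly, against the InMemoryProvider shown above; the bucket is deliberately missing here, so the call fails with NoSuchBucketException, mirroring the guard in both implementations:

    import com.amazonaws.services.s3.model.ObjectMetadata
    import io.findify.s3mock.provider.InMemoryProvider

    import scala.util.Try

    object MultipartStartSketch extends App {
      val provider = new InMemoryProvider()
      val meta = new ObjectMetadata()
      meta.setContentType("application/json")
      meta.addUserMetadata("origin", "sketch")

      // New three-argument form introduced by this commit; the provider
      // stores the metadata and stamps Content-MD5 plus Last-Modified
      // once putObjectMultipartComplete later runs.
      // No bucket was created, so this prints Failure(NoSuchBucketException).
      println(Try(provider.putObjectMultipartStart("bucket", "key", meta)))
    }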
75 changes: 75 additions & 0 deletions src/main/scala/io/findify/s3mock/route/MetadataUtil.scala
@@ -0,0 +1,75 @@
package io.findify.s3mock.route

import java.lang.Iterable
import java.util

import akka.http.javadsl.model.HttpHeader
import akka.http.scaladsl.model.HttpRequest
import com.amazonaws.AmazonClientException
import com.amazonaws.services.s3.Headers
import com.amazonaws.services.s3.internal.ServiceUtils
import com.amazonaws.services.s3.model.ObjectMetadata
import com.amazonaws.util.{DateUtils, StringUtils}
import com.typesafe.scalalogging.LazyLogging

import scala.collection.JavaConverters._

object MetadataUtil extends LazyLogging {

  def populateObjectMetadata(request: HttpRequest): ObjectMetadata = {
    val metadata = new ObjectMetadata()
    val ignoredHeaders: util.HashSet[String] = new util.HashSet[String]()
    ignoredHeaders.add(Headers.DATE)
    ignoredHeaders.add(Headers.SERVER)
    ignoredHeaders.add(Headers.REQUEST_ID)
    ignoredHeaders.add(Headers.EXTENDED_REQUEST_ID)
    ignoredHeaders.add(Headers.CLOUD_FRONT_ID)
    ignoredHeaders.add(Headers.CONNECTION)

    val headers: Iterable[HttpHeader] = request.getHeaders()
    for (header <- headers.asScala) {
      var key: String = header.name()
      if (StringUtils.beginsWithIgnoreCase(key, Headers.S3_USER_METADATA_PREFIX)) {
        key = key.substring(Headers.S3_USER_METADATA_PREFIX.length)
        metadata.addUserMetadata(key, header.value())
      }
      // else if (ignoredHeaders.contains(key)) {
      //   ignore...
      // }
      else if (key.equalsIgnoreCase(Headers.LAST_MODIFIED)) try
        metadata.setHeader(key, ServiceUtils.parseRfc822Date(header.value()))

      catch {
        case pe: Exception => logger.warn("Unable to parse last modified date: " + header.value(), pe)
      }
      else if (key.equalsIgnoreCase(Headers.CONTENT_LENGTH)) try
        metadata.setHeader(key, java.lang.Long.parseLong(header.value()))

      catch {
        case nfe: NumberFormatException => throw new AmazonClientException("Unable to parse content length. Header 'Content-Length' has corrupted data" + nfe.getMessage, nfe)
      }
      else if (key.equalsIgnoreCase(Headers.ETAG)) metadata.setHeader(key, ServiceUtils.removeQuotes(header.value()))
      else if (key.equalsIgnoreCase(Headers.EXPIRES)) try
        metadata.setHttpExpiresDate(DateUtils.parseRFC822Date(header.value()))

      catch {
        case pe: Exception => logger.warn("Unable to parse http expiration date: " + header.value(), pe)
      }
      // else if (key.equalsIgnoreCase(Headers.EXPIRATION)) new ObjectExpirationHeaderHandler[ObjectMetadata]().handle(metadata, response)
      // else if (key.equalsIgnoreCase(Headers.RESTORE)) new ObjectRestoreHeaderHandler[ObjectRestoreResult]().handle(metadata, response)
      // else if (key.equalsIgnoreCase(Headers.REQUESTER_CHARGED_HEADER)) new S3RequesterChargedHeaderHandler[S3RequesterChargedResult]().handle(metadata, response)
      else if (key.equalsIgnoreCase(Headers.S3_PARTS_COUNT)) try
        metadata.setHeader(key, header.value().toInt)

      catch {
        case nfe: NumberFormatException => throw new AmazonClientException("Unable to parse part count. Header x-amz-mp-parts-count has corrupted data" + nfe.getMessage, nfe)
      }
      else metadata.setHeader(key, header.value())
    }

    if (metadata.getContentType == null) {
      metadata.setContentType(request.entity.getContentType.toString)
    }
    metadata
  }
}
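Note: a quick way to see the header mapping above in action is to feed populateObjectMetadata a hand-built akka-http request. The header values here are arbitrary; the x-amz-meta- prefix (Headers.S3_USER_METADATA_PREFIX) becomes user metadata, while other headers land in the raw metadata map:

    import akka.http.scaladsl.model.HttpRequest
    import akka.http.scaladsl.model.headers.RawHeader
    import io.findify.s3mock.route.MetadataUtil

    object MetadataUtilSketch extends App {
      val request = HttpRequest(headers = List(
        RawHeader("x-amz-meta-owner", "tests"),
        RawHeader("Content-Encoding", "gzip")
      ))
      val meta = MetadataUtil.populateObjectMetadata(request)
      println(meta.getUserMetadata.get("owner")) // tests
      println(meta.getContentEncoding)           // gzip
    }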
64 changes: 1 addition & 63 deletions src/main/scala/io/findify/s3mock/route/PutObject.scala
@@ -1,26 +1,17 @@
 package io.findify.s3mock.route
 
-import java.lang.Iterable
-import java.util
-
-import akka.http.javadsl.model.HttpHeader
 import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
 import akka.http.scaladsl.server.Directives._
 import akka.stream.Materializer
 import akka.stream.scaladsl.Sink
 import akka.util.ByteString
-import com.amazonaws.AmazonClientException
-import com.amazonaws.services.s3.Headers
-import com.amazonaws.services.s3.internal.ServiceUtils
 import com.amazonaws.services.s3.model.ObjectMetadata
-import com.amazonaws.util.{DateUtils, StringUtils}
 import com.typesafe.scalalogging.LazyLogging
 import io.findify.s3mock.S3ChunkedProtocolStage
 import io.findify.s3mock.error.{InternalErrorException, NoSuchBucketException}
 import io.findify.s3mock.provider.Provider
 import org.apache.commons.codec.digest.DigestUtils
 
-import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
 
 /**
@@ -95,60 +86,7 @@ case class PutObject(implicit provider:Provider, mat:Materializer) extends LazyLogging {
   }
 
   private def populateObjectMetadata(request: HttpRequest, bytes: Array[Byte]): ObjectMetadata = {
-    val metadata = new ObjectMetadata()
-    val ignoredHeaders: util.HashSet[String] = new util.HashSet[String]()
-    ignoredHeaders.add(Headers.DATE)
-    ignoredHeaders.add(Headers.SERVER)
-    ignoredHeaders.add(Headers.REQUEST_ID)
-    ignoredHeaders.add(Headers.EXTENDED_REQUEST_ID)
-    ignoredHeaders.add(Headers.CLOUD_FRONT_ID)
-    ignoredHeaders.add(Headers.CONNECTION)
-
-    val headers: Iterable[HttpHeader] = request.getHeaders()
-    for (header <- headers.asScala) {
-      var key: String = header.name()
-      if (StringUtils.beginsWithIgnoreCase(key, Headers.S3_USER_METADATA_PREFIX)) {
-        key = key.substring(Headers.S3_USER_METADATA_PREFIX.length)
-        metadata.addUserMetadata(key, header.value())
-      }
-      // else if (ignoredHeaders.contains(key)) {
-      //   ignore...
-      // }
-      else if (key.equalsIgnoreCase(Headers.LAST_MODIFIED)) try
-        metadata.setHeader(key, ServiceUtils.parseRfc822Date(header.value()))
-
-      catch {
-        case pe: Exception => logger.warn("Unable to parse last modified date: " + header.value(), pe)
-      }
-      else if (key.equalsIgnoreCase(Headers.CONTENT_LENGTH)) try
-        metadata.setHeader(key, java.lang.Long.parseLong(header.value()))
-
-      catch {
-        case nfe: NumberFormatException => throw new AmazonClientException("Unable to parse content length. Header 'Content-Length' has corrupted data" + nfe.getMessage, nfe)
-      }
-      else if (key.equalsIgnoreCase(Headers.ETAG)) metadata.setHeader(key, ServiceUtils.removeQuotes(header.value()))
-      else if (key.equalsIgnoreCase(Headers.EXPIRES)) try
-        metadata.setHttpExpiresDate(DateUtils.parseRFC822Date(header.value()))
-
-      catch {
-        case pe: Exception => logger.warn("Unable to parse http expiration date: " + header.value(), pe)
-      }
-      // else if (key.equalsIgnoreCase(Headers.EXPIRATION)) new ObjectExpirationHeaderHandler[ObjectMetadata]().handle(metadata, response)
-      // else if (key.equalsIgnoreCase(Headers.RESTORE)) new ObjectRestoreHeaderHandler[ObjectRestoreResult]().handle(metadata, response)
-      // else if (key.equalsIgnoreCase(Headers.REQUESTER_CHARGED_HEADER)) new S3RequesterChargedHeaderHandler[S3RequesterChargedResult]().handle(metadata, response)
-      else if (key.equalsIgnoreCase(Headers.S3_PARTS_COUNT)) try
-        metadata.setHeader(key, header.value().toInt)
-
-      catch {
-        case nfe: NumberFormatException => throw new AmazonClientException("Unable to parse part count. Header x-amz-mp-parts-count has corrupted data" + nfe.getMessage, nfe)
-      }
-      else metadata.setHeader(key, header.value())
-    }
-
-    if (metadata.getContentType == null) {
-      metadata.setContentType(request.entity.getContentType.toString)
-    }
-    metadata.getRawMetadata
+    val metadata = MetadataUtil.populateObjectMetadata(request)
     metadata.setContentMD5(DigestUtils.md5Hex(bytes))
     metadata
   }
43 changes: 23 additions & 20 deletions src/main/scala/io/findify/s3mock/route/PutObjectMultipartStart.scala
@@ -15,27 +15,30 @@ import scala.util.{Failure, Success, Try}
   */
 case class PutObjectMultipartStart(implicit provider:Provider) extends LazyLogging {
   def route(bucket:String, path:String) = post {
-    parameter('uploads) { mp =>
-      complete {
-        logger.info(s"multipart upload start to $bucket/$path")
-        Try(provider.putObjectMultipartStart(bucket, path)) match {
-          case Success(result) =>
-            HttpResponse(
-              StatusCodes.OK,
-              entity = HttpEntity(
-                ContentTypes.`application/octet-stream`, result.toXML.toString().getBytes(StandardCharsets.UTF_8)
-              )
-            )
-          case Failure(e: NoSuchBucketException) =>
-            HttpResponse(
-              StatusCodes.NotFound,
-              entity = e.toXML.toString()
-            )
-          case Failure(t) =>
-            HttpResponse(
-              StatusCodes.InternalServerError,
-              entity = InternalErrorException(t).toXML.toString()
-            )
-        }
-      }
-    }
+    extractRequest { request =>
+      parameter('uploads) { mp =>
+        complete {
+          val metadata = MetadataUtil.populateObjectMetadata(request)
+          logger.info(s"multipart upload start to $bucket/$path")
+          Try(provider.putObjectMultipartStart(bucket, path, metadata)) match {
+            case Success(result) =>
+              HttpResponse(
+                StatusCodes.OK,
+                entity = HttpEntity(
+                  ContentTypes.`application/octet-stream`, result.toXML.toString().getBytes(StandardCharsets.UTF_8)
+                )
+              )
+            case Failure(e: NoSuchBucketException) =>
+              HttpResponse(
+                StatusCodes.NotFound,
+                entity = e.toXML.toString()
+              )
+            case Failure(t) =>
+              HttpResponse(
+                StatusCodes.InternalServerError,
+                entity = InternalErrorException(t).toXML.toString()
+              )
+          }
+        }
+      }
+    }
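Note: on the wire, the route now reads metadata headers off the initiate request itself. A sketch of exercising it with a raw akka-http client, assuming an s3mock instance listening on localhost:8001; the host, port, bucket, and key are assumptions, not values from this diff:

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._
    import akka.http.scaladsl.model.headers.RawHeader
    import akka.stream.ActorMaterializer

    object InitiateMultipartSketch extends App {
      implicit val system: ActorSystem = ActorSystem()
      implicit val mat: ActorMaterializer = ActorMaterializer()
      import system.dispatcher

      // POST /bucket/key?uploads with an x-amz-meta-* header; the response
      // entity is the InitiateMultipartUploadResult XML produced above.
      val request = HttpRequest(
        method = HttpMethods.POST,
        uri = "http://localhost:8001/getput/foo4?uploads",
        headers = List(RawHeader("x-amz-meta-metamaic", "maic"))
      )
      Http().singleRequest(request).foreach(resp => println(resp.status))
    }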
18 changes: 18 additions & 0 deletions src/test/scala/io/findify/s3mock/MultipartUploadTest.scala
@@ -84,5 +84,23 @@
     exc.getStatusCode shouldBe 404
     exc.getErrorCode shouldBe "NoSuchBucket"
   }
+
+  it should "upload multipart with metadata" in {
+    s3.createBucket("getput")
+    val metadata: ObjectMetadata = new ObjectMetadata()
+    metadata.setContentType("application/json")
+    metadata.addUserMetadata("metamaic", "maic")
+    val init = s3.initiateMultipartUpload(new InitiateMultipartUploadRequest("getput", "foo4", metadata))
+    val p1 = s3.uploadPart(new UploadPartRequest().withBucketName("getput").withPartSize(10).withKey("foo4").withPartNumber(1).withUploadId(init.getUploadId).withInputStream(new ByteArrayInputStream("hellohello".getBytes())))
+    val p2 = s3.uploadPart(new UploadPartRequest().withBucketName("getput").withPartSize(10).withKey("foo4").withPartNumber(2).withUploadId(init.getUploadId).withInputStream(new ByteArrayInputStream("worldworld".getBytes())))
+    val result = s3.completeMultipartUpload(new CompleteMultipartUploadRequest("getput", "foo4", init.getUploadId, List(p1.getPartETag, p2.getPartETag).asJava))
+    result.getKey shouldBe "foo4"
+    val s3Object = s3.getObject("getput", "foo4")
+    getContent(s3Object) shouldBe "hellohelloworldworld"
+
+    val actualMetadata: ObjectMetadata = s3Object.getObjectMetadata
+    actualMetadata.getContentType shouldBe "application/json"
+    actualMetadata.getUserMetadata.get("metamaic") shouldBe "maic"
+  }
 }
 }
