diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index b74a7d52186d..68719f6d8aaf 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -41,11 +41,13 @@ import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER; import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_COUNT_HEADER; import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_DIRECTIVE_HEADER; +import static org.apache.hadoop.ozone.s3.util.S3Utils.hasMultiChunksPayload; import static org.apache.hadoop.ozone.s3.util.S3Utils.stripQuotes; import static org.apache.hadoop.ozone.s3.util.S3Utils.validateSignatureHeader; import static org.apache.hadoop.ozone.s3.util.S3Utils.wrapInQuotes; import com.google.common.collect.ImmutableMap; +import com.google.common.io.FileBackedOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -198,12 +200,23 @@ Response handlePutRequest(ObjectRequestContext context, String keyPath, InputStr final long startNanos = context.getStartNanos(); String copyHeader = null; + FileBackedOutputStream spooledBody = null; MultiDigestInputStream multiDigestInputStream = null; try { OzoneVolume volume = context.getVolume(); OzoneBucket bucket = context.getBucket(); final String lengthHeader = getHeaders().getHeaderString(HttpHeaders.CONTENT_LENGTH); + final String rawAmzContentSha256Header = getHeaders().getHeaderString(S3Consts.X_AMZ_CONTENT_SHA256); + final boolean hasMultiChunksUpload = + rawAmzContentSha256Header != null && hasMultiChunksPayload(rawAmzContentSha256Header); + boolean hasCalculatedLength = false; long length = lengthHeader != null ? Long.parseLong(lengthHeader) : 0; + if (lengthHeader == null && body != null && !hasMultiChunksUpload) { + spooledBody = new FileBackedOutputStream(32); + length = IOUtils.copyLarge(body, spooledBody, new byte[getIOBufferSize(0)]); + body = spooledBody.asByteSource().openStream(); + hasCalculatedLength = true; + } if (uploadID != null && !uploadID.equals("")) { if (getHeaders().getHeaderString(COPY_SOURCE_HEADER) == null) { @@ -245,8 +258,10 @@ Response handlePutRequest(ObjectRequestContext context, String keyPath, InputStr getHeaders().getHeaderString(S3Consts.DECODED_CONTENT_LENGTH_HEADER); boolean hasAmzDecodedLengthZero = amzDecodedLength != null && Long.parseLong(amzDecodedLength) == 0; + boolean hasKnownZeroLength = hasAmzDecodedLengthZero || + (length == 0 && (lengthHeader != null || hasCalculatedLength || body == null)); if (canCreateDirectory && - (length == 0 || hasAmzDecodedLengthZero) && + hasKnownZeroLength && StringUtils.endsWith(keyPath, "/") ) { context.setAction(S3GAction.CREATE_DIRECTORY); @@ -343,6 +358,9 @@ customMetadata, tags, multiDigestInputStream, getHeaders(), if (multiDigestInputStream != null) { multiDigestInputStream.resetDigests(); } + if (spooledBody != null) { + spooledBody.reset(); + } } } diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java index 15398577b58b..a5ff089a7f72 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java @@ -44,6 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.mock; @@ -54,6 +55,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -63,10 +65,12 @@ import java.util.Base64; import java.util.Map; import java.util.stream.Stream; +import javax.ws.rs.HttpMethod; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MultivaluedHashMap; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; +import javax.xml.bind.DatatypeConverter; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; @@ -246,6 +250,48 @@ void testPutObjectWithSignedChunks() throws Exception { assertEquals(15, keyDetails.getDataSize()); } + @Test + void testPutObjectWithSignedChunksWithoutContentLengthDoesNotCalculateTransferLength() throws Exception { + String chunkedContent = "0a;chunk-signature=signature\r\n" + + "1234567890\r\n" + + "05;chunk-signature=signature\r\n" + + "abcde\r\n"; + String keyName = "streaming-object/"; + when(objectEndpoint.getContext().getMethod()).thenReturn(HttpMethod.PUT); + when(headers.getHeaderString(HttpHeaders.CONTENT_LENGTH)).thenReturn(null); + when(headers.getHeaderString(X_AMZ_CONTENT_SHA256)) + .thenReturn(STREAMING_AWS4_HMAC_SHA256_PAYLOAD); + when(headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER)) + .thenReturn("15"); + + try (MockedStatic mocked = mockStatic(IOUtils.class, CALLS_REAL_METHODS); + InputStream body = new ByteArrayInputStream(chunkedContent.getBytes(StandardCharsets.UTF_8))) { + mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class), any(byte[].class))) + .thenThrow(new AssertionError("streaming payload should not be buffered for length calculation")); + + assertSucceeds(() -> objectEndpoint.put(FSO_BUCKET_NAME, keyName, body)); + } + + OzoneKeyDetails keyDetails = assertKeyContent(fsoBucket, keyName, "1234567890abcde"); + assertThat(keyDetails.isFile()).as("object").isTrue(); + assertEquals(15, keyDetails.getDataSize()); + } + + @Test + void testPutObjectWithChunkedTransferEncoding() throws Exception { + when(objectEndpoint.getContext().getMethod()).thenReturn(HttpMethod.PUT); + when(headers.getHeaderString(HttpHeaders.CONTENT_LENGTH)).thenReturn(null); + when(headers.getHeaderString("Transfer-Encoding")).thenReturn("chunked"); + when(headers.getHeaderString(X_AMZ_CONTENT_SHA256)).thenReturn(sha256Hex(CONTENT)); + + try (InputStream body = new ByteArrayInputStream(CONTENT.getBytes(StandardCharsets.UTF_8))) { + assertSucceeds(() -> objectEndpoint.put(BUCKET_NAME, KEY_NAME, body)); + } + + OzoneKeyDetails keyDetails = assertKeyContent(bucket, KEY_NAME, CONTENT); + assertEquals(CONTENT.length(), keyDetails.getDataSize()); + } + @Test public void testPutObjectMessageDigestResetDuringException() { MessageDigest messageDigest = mock(MessageDigest.class); @@ -493,6 +539,21 @@ void testDirectoryCreation() throws Exception { assertThat(key.isFile()).as("directory").isFalse(); } + @Test + void testDirectoryCreationWithChunkedTransferEncoding() throws Exception { + final String path = "dir/"; + when(objectEndpoint.getContext().getMethod()).thenReturn(HttpMethod.PUT); + when(headers.getHeaderString(HttpHeaders.CONTENT_LENGTH)).thenReturn(null); + when(headers.getHeaderString("Transfer-Encoding")).thenReturn("chunked"); + + try (InputStream body = new ByteArrayInputStream(new byte[0])) { + assertSucceeds(() -> objectEndpoint.put(FSO_BUCKET_NAME, path, body)); + } + + OzoneKeyDetails key = fsoBucket.getKey(path); + assertThat(key.isFile()).as("directory").isFalse(); + } + @Test void testDirectoryCreationOverFile() throws Exception { // GIVEN @@ -672,4 +733,10 @@ private Response putObject(String bucketName, String keyName) throws IOException private Response putObject(String content) throws IOException, OS3Exception { return put(objectEndpoint, BUCKET_NAME, KEY_NAME, content); } + + private static String sha256Hex(String content) throws NoSuchAlgorithmException { + MessageDigest digest = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + byte[] contentDigest = digest.digest(content.getBytes(StandardCharsets.UTF_8)); + return DatatypeConverter.printHexBinary(contentDigest).toLowerCase(); + } }