Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/multi-stage-query/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,12 @@ The following common service properties control how durable storage behaves:
|Parameter |Default | Description |
|-------------------|----------------------------------------|----------------------|
|`druid.msq.intermediate.storage.bucket` | n/a | The bucket in S3 where you want to store intermediate files. |
| `druid.msq.intermediate.storage.chunkSize` | n/a | Optional. Defines the size of each chunk to temporarily store in `druid.msq.intermediate.storage.tempDir`. The chunk size must be between 5 MiB and 5 GiB. Druid computes the chunk size automatically if no value is provided.|
|`druid.msq.intermediate.storage.chunkSize` | 100MiB | Optional. Defines the size of each chunk to temporarily store in `druid.msq.intermediate.storage.tempDir`. The chunk size must be between 5 MiB and 5 GiB. A larger chunk size reduces the number of API calls made to durable storage; however, it requires more disk space to store the temporary chunks. Druid uses a default of 100 MiB if the value is not provided.|
|`druid.msq.intermediate.storage.enable` | true | Required. Whether to enable durable storage for the cluster.|
| `druid.msq.intermediate.storage.maxTriesOnTransientErrors` | 10 | Optional. Defines the maximum number of times to attempt S3 API calls to avoid failures due to transient errors. |
|`druid.msq.intermediate.storage.type` | `s3` if your deep storage is S3 | Required. The type of storage to use. You can either set this to `local` or `s3`. |
|`druid.msq.intermediate.storage.maxRetry` | 10 | Optional. Defines the maximum number of times to attempt S3 API calls to avoid failures due to transient errors. |
|`druid.msq.intermediate.storage.prefix` | n/a | S3 prefix to store intermediate stage results. Provide a unique value for the prefix. Don't share the same prefix between clusters. If the location includes other files or directories, then they will get cleaned up as well. |
| `druid.msq.intermediate.storage.tempDir`| | Required. Directory path on the local disk to temporarily store intermediate stage results. |
|`druid.msq.intermediate.storage.tempDir`| n/a | Required. Directory path on the local disk to temporarily store intermediate stage results. |
|`druid.msq.intermediate.storage.type` | `s3` if your deep storage is S3 | Required. The type of storage to use. You can either set this to `local` or `s3`. |

In addition to the common service properties, there are certain properties that you configure on the Overlord specifically to clean up intermediate files:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.google.common.io.CountingOutputStream;
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
Expand Down Expand Up @@ -203,9 +202,6 @@ private void pushCurrentChunk() throws IOException
try {
if (chunk.length() > 0) {
resultsSize += chunk.length();
if (resultsSize > config.getMaxResultsSize()) {
throw new IOE("Exceeded max results size [%s]", config.getMaxResultsSize());
}

pushStopwatch.start();
pushResults.add(push(chunk));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ public class S3OutputConfig
{
public static final long S3_MULTIPART_UPLOAD_MIN_PART_SIZE_BYTES = 5L * 1024 * 1024;
public static final long S3_MULTIPART_UPLOAD_MAX_PART_SIZE_BYTES = 5L * 1024 * 1024 * 1024L;
private static final int S3_MULTIPART_UPLOAD_MAX_NUM_PARTS = 10_000;
public static final long S3_MULTIPART_UPLOAD_MIN_OBJECT_SIZE_BYTES = 5L * 1024 * 1024;
public static final long S3_MULTIPART_UPLOAD_MAX_OBJECT_SIZE_BYTES = 5L * 1024 * 1024 * 1024 * 1024;

@JsonProperty
private String bucket;
Expand All @@ -48,10 +45,7 @@ public class S3OutputConfig

@Nullable
@JsonProperty
private HumanReadableBytes chunkSize;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the documentation also.
The release notes should mention that the defaults have changed and the value is removed.


@JsonProperty
private HumanReadableBytes maxResultsSize = new HumanReadableBytes("100MiB");
private HumanReadableBytes chunkSize = new HumanReadableBytes("100MiB");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need :

 public static final long S3_MULTIPART_UPLOAD_MIN_OBJECT_SIZE_BYTES = 5L * 1024*1024;
 public static final long S3_MULTIPART_UPLOAD_MAX_OBJECT_SIZE_BYTES = 5L * 1024 * 1024 * 1024 * 1024;


/**
* Max number of tries for each upload.
Expand All @@ -65,11 +59,10 @@ public S3OutputConfig(
@JsonProperty(value = "prefix", required = true) String prefix,
@JsonProperty(value = "tempDir", required = true) File tempDir,
@JsonProperty("chunkSize") HumanReadableBytes chunkSize,
@JsonProperty("maxResultsSize") HumanReadableBytes maxResultsSize,
@JsonProperty("maxRetry") Integer maxRetry
)
{
this(bucket, prefix, tempDir, chunkSize, maxResultsSize, maxRetry, true);
this(bucket, prefix, tempDir, chunkSize, maxRetry, true);
}

@VisibleForTesting
Expand All @@ -80,8 +73,6 @@ protected S3OutputConfig(
@Nullable
HumanReadableBytes chunkSize,
@Nullable
HumanReadableBytes maxResultsSize,
@Nullable
Integer maxRetry,
boolean validation
)
Expand All @@ -92,9 +83,6 @@ protected S3OutputConfig(
if (chunkSize != null) {
this.chunkSize = chunkSize;
}
if (maxResultsSize != null) {
this.maxResultsSize = maxResultsSize;
}
if (maxRetry != null) {
this.maxRetry = maxRetry;
}
Expand All @@ -116,21 +104,9 @@ private void validateFields()
);
}

// check result size which relies on the s3 multipart upload limits.
// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html for more details.
if (maxResultsSize.getBytes() < S3_MULTIPART_UPLOAD_MIN_OBJECT_SIZE_BYTES
|| maxResultsSize.getBytes() > S3_MULTIPART_UPLOAD_MAX_OBJECT_SIZE_BYTES) {
throw new IAE(
"maxResultsSize[%d] should be >= [%d] and <= [%d] bytes",
maxResultsSize.getBytes(),
S3_MULTIPART_UPLOAD_MIN_OBJECT_SIZE_BYTES,
S3_MULTIPART_UPLOAD_MAX_OBJECT_SIZE_BYTES
);
}

//check results size and chunk size are compatible.
if (chunkSize != null) {
validateChunkSize(maxResultsSize.getBytes(), chunkSize.getBytes());
validateChunkSize(chunkSize.getBytes());
}
}

Expand All @@ -151,38 +127,16 @@ public File getTempDir()

public Long getChunkSize()
{
return chunkSize == null ? computeMinChunkSize(getMaxResultsSize()) : chunkSize.getBytes();
}

public long getMaxResultsSize()
{
return maxResultsSize.getBytes();
return chunkSize.getBytes();
}

/**
 * Returns the maximum number of attempts for each S3 upload call before
 * giving up on transient errors.
 *
 * @return the configured retry limit (defaults elsewhere in this config if not set)
 */
public int getMaxRetry()
{
  return maxRetry;
}


/**
 * Computes the smallest usable chunk (part) size for a given total results size,
 * honoring the S3 multipart-upload limits: an upload may contain at most
 * {@code S3_MULTIPART_UPLOAD_MAX_NUM_PARTS} (10,000) parts, and each part must be
 * at least {@code S3_MULTIPART_UPLOAD_MIN_PART_SIZE_BYTES} (5 MiB).
 *
 * @param maxResultsSize total number of bytes the upload must be able to hold
 * @return the larger of (maxResultsSize / 10,000, rounded up) and the 5 MiB minimum part size
 */
public static long computeMinChunkSize(long maxResultsSize)
{
  // ceil division so that maxNumParts * chunkSize >= maxResultsSize
  return Math.max(
      (long) Math.ceil(maxResultsSize / (double) S3_MULTIPART_UPLOAD_MAX_NUM_PARTS),
      S3_MULTIPART_UPLOAD_MIN_PART_SIZE_BYTES
  );
}

private static void validateChunkSize(long maxResultsSize, long chunkSize)
private static void validateChunkSize(long chunkSize)
{
if (S3OutputConfig.computeMinChunkSize(maxResultsSize) > chunkSize) {
throw new IAE(
"chunkSize[%d] is too small for maxResultsSize[%d]. chunkSize should be at least [%d]",
chunkSize,
maxResultsSize,
S3OutputConfig.computeMinChunkSize(maxResultsSize)
);
}
if (S3_MULTIPART_UPLOAD_MAX_PART_SIZE_BYTES < chunkSize) {
throw new IAE(
"chunkSize[%d] should be smaller than [%d]",
Expand All @@ -206,14 +160,13 @@ public boolean equals(Object o)
&& bucket.equals(that.bucket)
&& prefix.equals(that.prefix)
&& tempDir.equals(that.tempDir)
&& Objects.equals(chunkSize, that.chunkSize)
&& maxResultsSize.equals(that.maxResultsSize);
&& Objects.equals(chunkSize, that.chunkSize);
}

@Override
public int hashCode()
{
return Objects.hash(bucket, prefix, tempDir, chunkSize, maxResultsSize, maxRetry);
return Objects.hash(bucket, prefix, tempDir, chunkSize, maxRetry);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,10 @@ public S3StorageConnectorProvider(
@JsonProperty(value = "prefix", required = true) String prefix,
@JsonProperty(value = "tempDir", required = true) File tempDir,
@JsonProperty("chunkSize") HumanReadableBytes chunkSize,
@JsonProperty("maxResultsSize") HumanReadableBytes maxResultsSize,
@JsonProperty("maxRetry") Integer maxRetry
)
{
super(bucket, prefix, tempDir, chunkSize, maxResultsSize, maxRetry);
super(bucket, prefix, tempDir, chunkSize, maxRetry);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public class RetryableS3OutputStreamTest


private S3OutputConfig config;
private long maxResultsSize;
private long chunkSize;

@Before
Expand All @@ -78,7 +77,6 @@ public void setup() throws IOException
"TEST",
tempDir,
HumanReadableBytes.valueOf(chunkSize),
HumanReadableBytes.valueOf(maxResultsSize),
2,
false
)
Expand All @@ -95,12 +93,6 @@ public Long getChunkSize()
return chunkSize;
}

@Override
public long getMaxResultsSize()
{
return maxResultsSize;
}

@Override
public int getMaxRetry()
{
Expand All @@ -112,7 +104,6 @@ public int getMaxRetry()
@Test
public void testWriteAndHappy() throws IOException
{
maxResultsSize = 1000;
chunkSize = 10;
ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES);
try (RetryableS3OutputStream out = new RetryableS3OutputStream(
Expand All @@ -135,7 +126,6 @@ public void testWriteAndHappy() throws IOException
@Test
public void testWriteSizeLargerThanConfiguredMaxChunkSizeShouldSucceed() throws IOException
{
maxResultsSize = 1000;
chunkSize = 10;
ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES * 3);
try (RetryableS3OutputStream out = new RetryableS3OutputStream(
Expand All @@ -158,7 +148,6 @@ public void testWriteSizeLargerThanConfiguredMaxChunkSizeShouldSucceed() throws
@Test
public void testWriteSmallBufferShouldSucceed() throws IOException
{
maxResultsSize = 1000;
chunkSize = 128;
try (RetryableS3OutputStream out = new RetryableS3OutputStream(
config,
Expand All @@ -175,43 +164,11 @@ public void testWriteSmallBufferShouldSucceed() throws IOException
s3.assertCompleted(chunkSize, 600);
}

@Test
public void testHitResultsSizeLimit() throws IOException
{
maxResultsSize = 50;
ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES);
try (RetryableS3OutputStream out = new RetryableS3OutputStream(
config,
s3,
path,
false
)) {
for (int i = 0; i < 14; i++) {
bb.clear();
bb.putInt(i);
out.write(bb.array());
}

Assert.assertThrows(
"Exceeded max results size [50]",
IOException.class,
() -> {
bb.clear();
bb.putInt(14);
out.write(bb.array());
}
);
}

s3.assertCancelled();
}

@Test
public void testSuccessToUploadAfterRetry() throws IOException
{
final TestAmazonS3 s3 = new TestAmazonS3(1);

maxResultsSize = 1000;
chunkSize = 10;
ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES);
try (RetryableS3OutputStream out = new RetryableS3OutputStream(
Expand All @@ -236,7 +193,6 @@ public void testFailToUploadAfterRetries() throws IOException
{
final TestAmazonS3 s3 = new TestAmazonS3(3);

maxResultsSize = 1000;
ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES);
try (RetryableS3OutputStream out = new RetryableS3OutputStream(
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.storage.s3.output;

import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.IAE;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand All @@ -38,53 +37,9 @@ public class S3OutputConfigTest
private static String PREFIX = "PREFIX";
private static int MAX_RETRY_COUNT = 0;


@Test
public void testTooSmallChunkSize() throws IOException
{
long maxResultsSize = 100_000_000_000L;
long chunkSize = 9000_000L;

expectedException.expect(IAE.class);
expectedException.expectMessage(
"chunkSize[9000000] is too small for maxResultsSize[100000000000]. chunkSize should be at least [10000000]"
);
new S3OutputConfig(
BUCKET,
PREFIX,
temporaryFolder.newFolder(),
HumanReadableBytes.valueOf(chunkSize),
HumanReadableBytes.valueOf(maxResultsSize),
MAX_RETRY_COUNT,
true
);
}

@Test
public void testTooSmallChunkSizeMaxResultsSizeIsNotRetionalToMaxPartNum() throws IOException
{
long maxResultsSize = 274_877_906_944L;
long chunkSize = 2_7487_790;

expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage(
"chunkSize[27487790] is too small for maxResultsSize[274877906944]. chunkSize should be at least [27487791]"
);
new S3OutputConfig(
BUCKET,
PREFIX,
temporaryFolder.newFolder(),
HumanReadableBytes.valueOf(chunkSize),
HumanReadableBytes.valueOf(maxResultsSize),
MAX_RETRY_COUNT,
true
);
}

@Test
public void testTooLargeChunkSize() throws IOException
{
long maxResultsSize = 1024L * 1024 * 1024 * 1024;
long chunkSize = S3OutputConfig.S3_MULTIPART_UPLOAD_MAX_PART_SIZE_BYTES + 1;

expectedException.expect(IllegalArgumentException.class);
Expand All @@ -96,49 +51,8 @@ public void testTooLargeChunkSize() throws IOException
PREFIX,
temporaryFolder.newFolder(),
HumanReadableBytes.valueOf(chunkSize),
HumanReadableBytes.valueOf(maxResultsSize),
MAX_RETRY_COUNT,
true
);
}

@Test
public void testResultsTooLarge() throws IOException
{
long maxResultsSize = S3OutputConfig.S3_MULTIPART_UPLOAD_MAX_OBJECT_SIZE_BYTES + 1;

expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage(
"maxResultsSize[5497558138881] should be >= "
);
new S3OutputConfig(
BUCKET,
PREFIX,
temporaryFolder.newFolder(),
null,
HumanReadableBytes.valueOf(maxResultsSize),
MAX_RETRY_COUNT,
true
);
}

@Test
public void testResultsTooSmall() throws IOException
{
long maxResultsSize = S3OutputConfig.S3_MULTIPART_UPLOAD_MIN_OBJECT_SIZE_BYTES - 1;
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage(
"maxResultsSize[5242879] should be >= "
);
new S3OutputConfig(
BUCKET,
PREFIX,
temporaryFolder.newFolder(),
null,
HumanReadableBytes.valueOf(maxResultsSize),
MAX_RETRY_COUNT,
true
);
}

}
Loading