Emit metrics for S3UploadThreadPool (#16616)
* Emit metrics for S3UploadThreadPool

* Address review comments

* Revert unnecessary formatting change

* Revert unnecessary formatting change in metrics.md file

* Address review comments

* Add metric for task duration

* Minor fix in metrics.md

* Add s3Key and uploadId in the log message

* Address review comments

* Create new instance of ServiceMetricEvent.Builder for thread safety

* Address review comments

* Address review comments
Akshat-Jain committed Jun 21, 2024
1 parent 35709de commit cd438b1
Showing 8 changed files with 104 additions and 31 deletions.
13 changes: 13 additions & 0 deletions docs/operations/metrics.md
@@ -508,6 +508,19 @@ These metrics are only available if the `OshiSysMonitor` module is included.
 |`sys/tcpv4/out/rsts`|Total "out reset" packets sent to reset the connection||Generally 0|
 |`sys/tcpv4/retrans/segs`|Total segments re-transmitted||Varies|
 
+
+## S3 multi-part upload
+
+These metrics are available only if the `druid-s3-extensions` module is included and one of the following features is in use: MSQ export to S3, or durable intermediate storage on S3.
+
+|Metric|Description|Dimensions|Normal value|
+|------|-----------|----------|------------|
+|`s3/upload/part/queueSize`|Number of items currently waiting in the queue to be uploaded to S3. Each item in the queue corresponds to a single part in a multi-part upload.||Varies|
+|`s3/upload/part/queuedTime`|Milliseconds a single item (or part) spends in the queue before its upload to S3 begins.|`uploadId`, `partNumber`|Varies|
+|`s3/upload/part/time`|Milliseconds taken to upload a single part of a multi-part upload to S3.|`uploadId`, `partNumber`|Varies|
+|`s3/upload/total/time`|Milliseconds taken to upload all parts of a multi-part upload to S3.|`uploadId`|Varies|
+|`s3/upload/total/bytes`|Total bytes uploaded to S3 during a multi-part upload.|`uploadId`|Varies|
+
 ## Cgroup
 
 These metrics are available on operating systems with the cgroup kernel feature. All the values are derived by reading from `/sys/fs/cgroup`.
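For orientation before the per-file diffs: a minimal sketch of emitting one metric in this family through Druid's `ServiceMetricEvent` API. It uses only calls that appear in the diffs below; the `emitter` parameter and the literal values are illustrative stand-ins, not code from this commit.

```java
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

class UploadMetricsExample
{
  // Emit the total bytes of a finished multi-part upload, dimensioned by its
  // (made-up) upload id.
  static void emitTotalBytes(ServiceEmitter emitter)
  {
    ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder()
        .setDimension("uploadId", "upload-1234");  // illustrative id
    emitter.emit(builder.setMetric("s3/upload/total/bytes", 104_857_600L));
  }
}
```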

---
RetryableS3OutputStream.java
@@ -32,6 +32,7 @@
 import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.storage.s3.S3Utils;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
 
@@ -69,6 +70,11 @@
  */
 public class RetryableS3OutputStream extends OutputStream
 {
+  // Metric related constants.
+  private static final String METRIC_PREFIX = "s3/upload/total/";
+  private static final String METRIC_TOTAL_UPLOAD_TIME = METRIC_PREFIX + "time";
+  private static final String METRIC_TOTAL_UPLOAD_BYTES = METRIC_PREFIX + "bytes";
+
   private static final Logger LOG = new Logger(RetryableS3OutputStream.class);
 
   private final S3OutputConfig config;
@@ -208,14 +214,20 @@ public void close() throws IOException
       org.apache.commons.io.FileUtils.forceDelete(chunkStorePath);
       LOG.info("Deleted chunkStorePath[%s]", chunkStorePath);
 
-      // This should be emitted as a metric
-      long totalChunkSize = (currentChunk.id - 1) * chunkSize + currentChunk.length();
+      final long totalBytesUploaded = (currentChunk.id - 1) * chunkSize + currentChunk.length();
+      final long totalUploadTimeMillis = pushStopwatch.elapsed(TimeUnit.MILLISECONDS);
       LOG.info(
-          "Pushed total [%d] parts containing [%d] bytes in [%d]ms.",
+          "Pushed total [%d] parts containing [%d] bytes in [%d]ms for s3Key[%s], uploadId[%s].",
           futures.size(),
-          totalChunkSize,
-          pushStopwatch.elapsed(TimeUnit.MILLISECONDS)
+          totalBytesUploaded,
+          totalUploadTimeMillis,
+          s3Key,
+          uploadId
       );
+
+      final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder().setDimension("uploadId", uploadId);
+      uploadManager.emitMetric(builder.setMetric(METRIC_TOTAL_UPLOAD_TIME, totalUploadTimeMillis));
+      uploadManager.emitMetric(builder.setMetric(METRIC_TOTAL_UPLOAD_BYTES, totalBytesUploaded));
     });
 
     try (Closer ignored = closer) {
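One commit bullet reads "Create new instance of ServiceMetricEvent.Builder for thread safety". The hazard, sketched below under the assumption that `Builder` is a plain mutable accumulator: if concurrent upload threads shared one `Builder`, interleaved `setDimension`/`setMetric` calls could emit one part's value with another part's dimensions. A fresh, thread-confined `Builder` per emission sidesteps this. The `emitter` parameter and values are illustrative.

```java
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

class PerEmissionBuilderExample
{
  // Safe pattern: each task constructs its own Builder, so no state is
  // mutated across threads.
  static Runnable emitPartTime(ServiceEmitter emitter, String uploadId, int partNumber, long millis)
  {
    return () -> {
      ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder()  // fresh instance per task
          .setDimension("uploadId", uploadId)
          .setDimension("partNumber", partNumber);
      emitter.emit(builder.setMetric("s3/upload/part/time", millis));
    };
  }
}
```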

---
S3UploadManager.java
@@ -25,17 +25,21 @@
 import com.google.inject.Inject;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.storage.s3.S3Utils;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
 import org.apache.druid.utils.RuntimeInfo;
 
 import java.io.File;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * This class manages uploading files to S3 in chunks, while ensuring that the
@@ -44,18 +48,34 @@
 @ManageLifecycle
 public class S3UploadManager
 {
+  // Metric related constants.
+  private static final String METRIC_PREFIX = "s3/upload/part/";
+  private static final String METRIC_PART_QUEUED_TIME = METRIC_PREFIX + "queuedTime";
+  private static final String METRIC_QUEUE_SIZE = METRIC_PREFIX + "queueSize";
+  private static final String METRIC_PART_UPLOAD_TIME = METRIC_PREFIX + "time";
+
   private final ExecutorService uploadExecutor;
+  private final ServiceEmitter emitter;
 
   private static final Logger log = new Logger(S3UploadManager.class);
 
+  // For metrics regarding uploadExecutor.
+  private final AtomicInteger executorQueueSize = new AtomicInteger(0);
+
   @Inject
-  public S3UploadManager(S3OutputConfig s3OutputConfig, S3ExportConfig s3ExportConfig, RuntimeInfo runtimeInfo)
+  public S3UploadManager(
+      S3OutputConfig s3OutputConfig,
+      S3ExportConfig s3ExportConfig,
+      RuntimeInfo runtimeInfo,
+      ServiceEmitter emitter
+  )
   {
     int poolSize = Math.max(4, runtimeInfo.getAvailableProcessors());
     int maxNumChunksOnDisk = computeMaxNumChunksOnDisk(s3OutputConfig, s3ExportConfig);
     this.uploadExecutor = createExecutorService(poolSize, maxNumChunksOnDisk);
     log.info("Initialized executor service for S3 multipart upload with pool size [%d] and work queue capacity [%d]",
         poolSize, maxNumChunksOnDisk);
+    this.emitter = emitter;
   }
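The constructor sizes the pool at `max(4, availableProcessors)` and bounds the work queue at `maxNumChunksOnDisk`, so waiting chunk files cannot accumulate on disk without limit. `createExecutorService` itself is outside this diff, so the sketch below is an assumed shape built from plain `java.util.concurrent`, not the actual implementation (Druid's `Execs` utilities, imported above, may behave differently, e.g. blocking on submit rather than rejecting when the queue is full):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedUploadExecutorSketch
{
  // Fixed pool of poolSize threads; at most queueCapacity tasks may wait.
  // With this JDK executor a full queue makes submit() throw
  // RejectedExecutionException, which is one reason a blocking-submit
  // executor can be preferable for backpressure.
  static ExecutorService create(int poolSize, int queueCapacity)
  {
    return new ThreadPoolExecutor(
        poolSize,                                  // core pool size
        poolSize,                                  // max pool size (fixed)
        0L, TimeUnit.MILLISECONDS,                 // no keep-alive for a fixed pool
        new LinkedBlockingQueue<>(queueCapacity)   // bounded work queue
    );
  }
}
```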

   /**
@@ -87,25 +107,36 @@ public Future<UploadPartResult> queueChunkForUpload(
       S3OutputConfig config
   )
   {
-    return uploadExecutor.submit(() -> RetryUtils.retry(
-        () -> {
-          log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber, uploadId);
-          UploadPartResult uploadPartResult = uploadPartIfPossible(
-              s3Client,
-              uploadId,
-              config.getBucket(),
-              key,
-              chunkNumber,
-              chunkFile
-          );
-          if (!chunkFile.delete()) {
-            log.warn("Failed to delete chunk [%s]", chunkFile.getAbsolutePath());
-          }
-          return uploadPartResult;
-        },
-        S3Utils.S3RETRY,
-        config.getMaxRetry()
-    ));
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    executorQueueSize.incrementAndGet();
+    return uploadExecutor.submit(() -> {
+      final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
+      emitMetric(metricBuilder.setMetric(METRIC_QUEUE_SIZE, executorQueueSize.decrementAndGet()));
+      metricBuilder.setDimension("uploadId", uploadId).setDimension("partNumber", chunkNumber);
+      emitMetric(metricBuilder.setMetric(METRIC_PART_QUEUED_TIME, stopwatch.millisElapsed()));
+      stopwatch.restart();
+
+      return RetryUtils.retry(
+          () -> {
+            log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber, uploadId);
+            UploadPartResult uploadPartResult = uploadPartIfPossible(
+                s3Client,
+                uploadId,
+                config.getBucket(),
+                key,
+                chunkNumber,
+                chunkFile
+            );
+            if (!chunkFile.delete()) {
+              log.warn("Failed to delete chunk [%s]", chunkFile.getAbsolutePath());
+            }
+            emitMetric(metricBuilder.setMetric(METRIC_PART_UPLOAD_TIME, stopwatch.millisElapsed()));
+            return uploadPartResult;
+          },
+          S3Utils.S3RETRY,
+          config.getMaxRetry()
+      );
+    });
   }
 
 @VisibleForTesting
@@ -149,4 +180,8 @@ public void stop()
     uploadExecutor.shutdown();
   }
 
+  protected void emitMetric(ServiceMetricEvent.Builder builder)
+  {
+    emitter.emit(builder);
+  }
 }
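The `protected` visibility of `emitMetric` leaves a test seam: a subclass can capture builders instead of emitting them. A hypothetical sketch of that seam (the PR's own tests inject a `StubServiceEmitter` rather than overriding this hook; `outputConfig`, `exportConfig`, `runtimeInfo`, and `emitter` are assumed to be in scope with the types the new constructor expects):

```java
// Hypothetical test double that records metric builders instead of emitting them.
List<ServiceMetricEvent.Builder> captured = new ArrayList<>();
S3UploadManager recording = new S3UploadManager(outputConfig, exportConfig, runtimeInfo, emitter)
{
  @Override
  protected void emitMetric(ServiceMetricEvent.Builder builder)
  {
    captured.add(builder);  // record instead of emitting
  }
};
```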

---
@@ -32,6 +32,7 @@
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.StartupInjectorBuilder;
 import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.query.DruidProcessingConfigTest;
 import org.apache.druid.storage.StorageConnector;
 import org.apache.druid.storage.StorageConnectorModule;
@@ -158,7 +159,8 @@ public void configure(Binder binder)
         new S3UploadManager(
             new S3OutputConfig("bucket", "prefix", EasyMock.mock(File.class), new HumanReadableBytes("5MiB"), 1),
             new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1, null),
-            new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0))
+            new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0),
+            new StubServiceEmitter())
         )
     );
 

---
@@ -33,6 +33,7 @@
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.java.util.common.IOE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.query.DruidProcessingConfigTest;
 import org.apache.druid.storage.s3.NoopServerSideEncryption;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
@@ -105,7 +106,8 @@ public int getMaxRetry()
     s3UploadManager = new S3UploadManager(
         new S3OutputConfig("bucket", "prefix", EasyMock.mock(File.class), new HumanReadableBytes("5MiB"), 1),
         new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1, null),
-        new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0));
+        new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0),
+        new StubServiceEmitter());
   }
 
   @Test

---
@@ -32,6 +32,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.query.DruidProcessingConfigTest;
 import org.apache.druid.storage.StorageConnector;
 import org.apache.druid.storage.s3.NoopServerSideEncryption;
@@ -90,7 +91,8 @@ public void setup()
       storageConnector = new S3StorageConnector(s3OutputConfig, service, new S3UploadManager(
           s3OutputConfig,
           new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1, null),
-          new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0)));
+          new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0),
+          new StubServiceEmitter()));
     }
     catch (IOException e) {
       throw new RuntimeException(e);

---
S3UploadManagerTest.java
@@ -22,6 +22,7 @@
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
 import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.query.DruidProcessingConfigTest;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
 import org.apache.druid.utils.RuntimeInfo;
@@ -43,14 +44,16 @@ public class S3UploadManagerTest
   private S3UploadManager s3UploadManager;
   private S3OutputConfig s3OutputConfig;
   private S3ExportConfig s3ExportConfig;
+  private StubServiceEmitter serviceEmitter;
 
   @Before
   public void setUp()
   {
     s3OutputConfig = new S3OutputConfig("bucket", "prefix", EasyMock.mock(File.class), new HumanReadableBytes("100MiB"), 1);
     s3ExportConfig = new S3ExportConfig("tempDir", new HumanReadableBytes("200MiB"), 1, null);
+    serviceEmitter = new StubServiceEmitter();
     final RuntimeInfo runtimeInfo = new DruidProcessingConfigTest.MockRuntimeInfo(8, 0, 0);
-    s3UploadManager = new S3UploadManager(s3OutputConfig, s3ExportConfig, runtimeInfo);
+    s3UploadManager = new S3UploadManager(s3OutputConfig, s3ExportConfig, runtimeInfo, serviceEmitter);
   }
 
   @Test
@@ -75,6 +78,10 @@ public void testQueueChunkForUpload() throws Exception
     UploadPartResult futureResult = result.get();
     Assert.assertEquals(chunkId, futureResult.getPartNumber());
     Assert.assertEquals("etag", futureResult.getETag());
+
+    serviceEmitter.verifyEmitted("s3/upload/part/queuedTime", 1);
+    serviceEmitter.verifyEmitted("s3/upload/part/queueSize", 1);
+    serviceEmitter.verifyEmitted("s3/upload/part/time", 1);
   }
 
   @Test

---
StubServiceEmitter.java
@@ -26,9 +26,9 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Test implementation of {@link ServiceEmitter} that collects emitted metrics
@@ -38,7 +38,7 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifier
 {
   private final List<Event> events = new ArrayList<>();
   private final List<AlertEvent> alertEvents = new ArrayList<>();
-  private final Map<String, List<ServiceMetricEvent>> metricEvents = new HashMap<>();
+  private final ConcurrentHashMap<String, List<ServiceMetricEvent>> metricEvents = new ConcurrentHashMap<>();
 
   public StubServiceEmitter()
   {
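The `HashMap` to `ConcurrentHashMap` swap matters because `StubServiceEmitter` now records metrics from multiple threads: parts are emitted on the manager's upload pool, no longer only on the test thread. Below is a sketch of the kind of grouping such a map protects, assuming the stub groups events by metric name; the body of its `emit` method is outside this diff, so the exact code is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

class ConcurrentGroupingSketch
{
  final ConcurrentHashMap<String, List<String>> eventsByMetric = new ConcurrentHashMap<>();

  // computeIfAbsent is atomic on ConcurrentHashMap, so concurrent calls from
  // upload threads cannot lose the first event recorded under a new metric
  // name; with a plain HashMap the same interleaving could corrupt the map.
  void record(String metricName, String event)
  {
    eventsByMetric
        .computeIfAbsent(metricName, name -> Collections.synchronizedList(new ArrayList<>()))
        .add(event);
  }
}
```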
