@@ -489,6 +489,22 @@ public final class StreamStatisticNames {
public static final String STREAM_FILE_CACHE_EVICTION
= "stream_file_cache_eviction";

/**
* Bytes that were prefetched by the stream.
*/
public static final String STREAM_READ_PREFETCHED_BYTES = "stream_read_prefetched_bytes";
Contributor review comment:

Add entries in org.apache.hadoop.fs.s3a.Statistic; these are scanned and used to create the full filesystem instance stats which the input stream updates in close()


/**
* Count of failures encountered while parsing Parquet footers.
*/
public static final String STREAM_READ_PARQUET_FOOTER_PARSING_FAILED = "stream_read_parquet_footer_parsing_failed";

/**
* A cache hit occurs when the requested range can be satisfied by data already in the cache.
*/
public static final String STREAM_READ_CACHE_HIT = "stream_read_cache_hit";


private StreamStatisticNames() {
}

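The review comment above points at where these names get aggregated: once registered in org.apache.hadoop.fs.s3a.Statistic, the counters surface through the standard IOStatistics API and are merged into the filesystem instance statistics when the stream closes. Below is a minimal sketch of reading the new counters back using the key strings added in this hunk; the bucket and object path are placeholders, and it assumes the opened stream publishes IOStatistics:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;

public class ReadAalCounters {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);
    try (FSDataInputStream in = fs.open(new Path("s3a://example-bucket/data.parquet"))) {
      in.read(new byte[8192]);
      // Per-stream statistics; null when the stream type does not publish them.
      IOStatistics stats = IOStatisticsSupport.retrieveIOStatistics(in);
      if (stats != null) {
        System.out.println("prefetched bytes: "
            + stats.counters().get("stream_read_prefetched_bytes"));
        System.out.println("cache hits: "
            + stats.counters().get("stream_read_cache_hit"));
      }
    }
    // After close(), the stream's statistics are merged into the filesystem
    // instance statistics, which is why the Statistic enum entries matter.
  }
}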
@@ -81,6 +81,9 @@
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_FAILURES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_UNBUFFERED;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_CACHE_HIT;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
import static org.apache.hadoop.fs.s3a.Statistic.*;

@@ -891,7 +894,12 @@ private InputStreamStatistics(
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES,
StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED)
StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST,
StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED,
StreamStatisticNames.STREAM_READ_CACHE_HIT,
StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES,
StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED
)
.withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(),
@@ -1128,6 +1136,32 @@ public void readVectoredBytesDiscarded(int discarded) {
bytesDiscardedInVectoredIO.addAndGet(discarded);
}

@Override
public void getRequestInitiated() {
increment(ACTION_HTTP_GET_REQUEST);
}

@Override
public void headRequestInitiated() {
increment(StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST);
}

@Override
public void bytesPrefetched(long size) {
increment(STREAM_READ_PREFETCHED_BYTES, size);
}

@Override
public void footerParsingFailed() {
increment(STREAM_READ_PARQUET_FOOTER_PARSING_FAILED);
}

@Override
public void streamReadCacheHit() {
increment(STREAM_READ_CACHE_HIT);
}


@Override
public void executorAcquired(Duration timeInQueue) {
// update the duration fields in the IOStatistics.
@@ -458,9 +458,21 @@ public enum Statistic {
StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE,
"Gauge of active memory in use",
TYPE_GAUGE),
STREAM_READ_PREFETCH_BYTES(
StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES,
"Bytes prefetched by AAL stream",
TYPE_COUNTER),
STREAM_READ_PARQUET_FOOTER_PARSING_FAILED(
StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED,
"Count of Parquet footer parsing failures encountered by AAL",
TYPE_COUNTER),
STREAM_READ_CACHE_HIT(
StreamStatisticNames.STREAM_READ_CACHE_HIT,
"Count of cache hits in AAL stream",
TYPE_COUNTER),

/* Stream Write statistics */
STREAM_WRITE_EXCEPTIONS(
StreamStatisticNames.STREAM_WRITE_EXCEPTIONS,
"Count of stream write failures reported",
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a.impl.streams;

import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
Contributor review comment: nit: import ordering


/**
* Implementation of AAL's RequestCallback interface that tracks analytics operations.
*/
public class AnalyticsRequestCallback implements RequestCallback {
private final S3AInputStreamStatistics statistics;

/**
* Create a new callback instance.
* @param statistics the statistics to update
*/
public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) {
this.statistics = statistics;
}

@Override
public void onGetRequest() {
statistics.getRequestInitiated();
}

@Override
public void onHeadRequest() {
statistics.headRequestInitiated();
}

@Override
public void onBlockPrefetch(long start, long end) {
statistics.bytesPrefetched(end - start);
}

@Override
public void footerParsingFailed() {
statistics.footerParsingFailed();
}

@Override
public void onReadVectored(int numIncomingRanges, int numCombinedRanges) {
statistics.readVectoredOperationStarted(numIncomingRanges, numCombinedRanges);
}

@Override
public void onCacheHit() {
statistics.streamReadCacheHit();
}

}

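For orientation, the wiring for this callback appears in the AnalyticsStream hunk below; condensed here as a sketch. The builder method names are taken from this diff, and the final build() call is assumed from the usual builder pattern:

// Routes AAL request events (GET/HEAD, prefetch, cache hit, footer failure)
// into the stream's S3A statistics.
RequestCallback callback = new AnalyticsRequestCallback(getS3AStreamStatistics());
OpenStreamInformation info = OpenStreamInformation.builder()
    .requestCallback(callback)
    .build();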
@@ -40,6 +40,7 @@
import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;
import software.amazon.s3.analyticsaccelerator.util.RequestCallback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +73,7 @@ public AnalyticsStream(final ObjectReadParameters parameters,
final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
super(InputStreamType.Analytics, parameters);
S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();

this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(),
s3Attributes.getKey()), buildOpenStreamInformation(parameters));
getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
@@ -80,13 +82,21 @@ public AnalyticsStream(final ObjectReadParameters parameters,
@Override
public int read() throws IOException {
throwIfClosed();

getS3AStreamStatistics().readOperationStarted(getPos(), 1);

int bytesRead;
try {
bytesRead = inputStream.read();
} catch (IOException ioe) {
onReadFailure(ioe);
throw ioe;
}

if (bytesRead != -1) {
incrementBytesRead(1);
}

return bytesRead;
}

@@ -122,26 +132,41 @@ public synchronized long getPos() {
*/
public int readTail(byte[] buf, int off, int len) throws IOException {
throwIfClosed();
getS3AStreamStatistics().readOperationStarted(getPos(), len);

int bytesRead;
try {
bytesRead = inputStream.readTail(buf, off, len);
} catch (IOException ioe) {
onReadFailure(ioe);
throw ioe;
}

if (bytesRead > 0) {
incrementBytesRead(bytesRead);
}

return bytesRead;
}

@Override
public int read(byte[] buf, int off, int len) throws IOException {
throwIfClosed();

getS3AStreamStatistics().readOperationStarted(getPos(), len);

int bytesRead;
try {
bytesRead = inputStream.read(buf, off, len);
} catch (IOException ioe) {
onReadFailure(ioe);
throw ioe;
}

if (bytesRead > 0) {
incrementBytesRead(bytesRead);
}

return bytesRead;
}

@@ -177,8 +202,6 @@ public void readVectored(final List<? extends FileRange> ranges,
range.setData(result);
}

// AAL does not do any range coalescing, so input and combined ranges are the same.
this.getS3AStreamStatistics().readVectoredOperationStarted(ranges.size(), ranges.size());
inputStream.readVectored(objectRanges, allocate, release);
}

@@ -247,10 +270,13 @@ private void onReadFailure(IOException ioe) throws IOException {
}

private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) {

final RequestCallback requestCallback = new AnalyticsRequestCallback(getS3AStreamStatistics());

OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder =
OpenStreamInformation.builder()
.inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
.getInputPolicy()));
.getInputPolicy())).requestCallback(requestCallback);

if (parameters.getObjectAttributes().getETag() != null) {
openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder()
@@ -300,4 +326,16 @@ protected void throwIfClosed() throws IOException {
throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
}
}

/**
* Increment the bytes read counter if there is a stats instance
* and the number of bytes read is more than zero.
* @param bytesRead number of bytes read
*/
private void incrementBytesRead(long bytesRead) {
getS3AStreamStatistics().bytesRead(bytesRead);
if (getContext().getStats() != null && bytesRead > 0) {
getContext().getStats().incrementBytesRead(bytesRead);
}
}
}
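None of the instrumentation above fires unless the S3A client is actually using the Analytics stream. A hedged configuration sketch follows; the stream-type key and value come from the S3A stream factory work and should be verified against the target Hadoop release:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class EnableAnalyticsStream {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumed configuration key/value; verify against the release in use.
    conf.set("fs.s3a.input.stream.type", "analytics");
    FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);
    // Streams opened from this filesystem now use AnalyticsStream, so the
    // GET/HEAD, prefetched-bytes, cache-hit and footer-parse-failure
    // counters added in this PR are updated during reads.
    fs.close();
  }
}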
@@ -48,7 +48,6 @@ public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory {

private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration;
private LazyAutoCloseableReference<S3SeekableInputStreamFactory> s3SeekableInputStreamFactory;
private boolean requireCrt;

public AnalyticsStreamFactory() {
super("AnalyticsStreamFactory");
@@ -61,7 +60,6 @@ protected void serviceInit(final Configuration conf) throws Exception {
ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
this.seekableInputStreamConfiguration =
S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
this.requireCrt = false;
}

@Override
@@ -119,6 +119,32 @@ void readVectoredOperationStarted(int numIncomingRanges,
*/
void readVectoredBytesDiscarded(int discarded);

/**
* Record that an S3 GET request was initiated by the stream.
*/
void getRequestInitiated();

/**
* Record that an S3 HEAD request was initiated by the stream.
*/
void headRequestInitiated();

/**
* Record bytes prefetched by the stream.
* @param size number of bytes prefetched.
*/
void bytesPrefetched(long size);

/**
* Record a failure while parsing a Parquet footer.
*/
void footerParsingFailed();

/**
* Record a cache hit: the requested range was satisfied by data already in the cache.
*/
void streamReadCacheHit();

@Override
void close();

@@ -212,6 +212,31 @@ public void readVectoredBytesDiscarded(int discarded) {

}

@Override
public void getRequestInitiated() {

}

@Override
public void headRequestInitiated() {

}

@Override
public void bytesPrefetched(long size) {

}

@Override
public void footerParsingFailed() {

}

@Override
public void streamReadCacheHit() {

}

@Override
public void close() {
