Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,33 @@ public abstract class AbstractInputSource implements InputSource
public InputSourceReader reader(
InputRowSchema inputRowSchema,
@Nullable InputFormat inputFormat,
File temporaryDirectory
File temporaryDirectory,
InputStats inputStats
)
{
if (needsFormat()) {
return formattableReader(
inputRowSchema,
Preconditions.checkNotNull(inputFormat, "inputFormat"),
temporaryDirectory
temporaryDirectory,
inputStats
);
} else {
return fixedFormatReader(inputRowSchema, temporaryDirectory);
return fixedFormatReader(inputRowSchema, temporaryDirectory, inputStats);
}
}

protected InputSourceReader formattableReader(
InputRowSchema inputRowSchema,
InputFormat inputFormat,
File temporaryDirectory
File temporaryDirectory,
InputStats inputStats
)
{
throw new UnsupportedOperationException("Implement this method properly if needsFormat() = true");
}

protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, File temporaryDirectory)
protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, File temporaryDirectory, InputStats inputStats)
{
throw new UnsupportedOperationException("Implement this method properly if needsFormat() = false");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input;

import com.google.common.base.Predicate;
import com.google.common.io.CountingInputStream;

import javax.annotation.Nullable;
import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

/**
* Can be used to count number of bytes read from the base InputEntity
* */
public class CountableInputEntity implements InputEntity
{
private final InputStats inputStats;
private final InputEntity baseInputEntity;

public CountableInputEntity(InputEntity baseInputEntity, InputStats inputStats)
{
this.baseInputEntity = baseInputEntity;
this.inputStats = inputStats;
}

@Nullable @Override public URI getUri()
{
return baseInputEntity.getUri();
}

@Override public InputStream open() throws IOException
{
return new BytesCountingInputStream(baseInputEntity.open(), inputStats);
}

@Override public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws IOException
{
final CleanableFile cleanableFile = baseInputEntity.fetch(temporaryDirectory, fetchBuffer);
inputStats.incrementProcessedBytes(cleanableFile.file().length());
return cleanableFile;
}

@Override public Predicate<Throwable> getRetryCondition()
{
return baseInputEntity.getRetryCondition();
}

@Override public InputEntity getBaseInputEntity()
{
return baseInputEntity;
}

static class BytesCountingInputStream extends FilterInputStream
{
private final InputStats inputStats;

/**
* Wraps another input stream, counting the number of bytes read.
*
* Similar to {@link CountingInputStream} but does not reset count on call to
* {@link CountingInputStream#reset()}
*
* @param in the input stream to be wrapped
*/
public BytesCountingInputStream(@Nullable InputStream in, InputStats inputStats)
{
super(in);
this.inputStats = inputStats;
}

@Override public int read() throws IOException
{
int result = in.read();
if (result != -1) {
inputStats.incrementProcessedBytes(1);
}
return result;
}

@Override public int read(byte[] b, int off, int len) throws IOException
{
int result = in.read(b, off, len);
if (result != -1) {
inputStats.incrementProcessedBytes(result);
}
return result;
}

@Override public long skip(long n) throws IOException
{
long result = in.skip(n);
inputStats.incrementProcessedBytes(result);
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public boolean needsFormat()
}

@Override
protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory)
protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory, InputStats inputStats)
{
return new FirehoseToInputSourceReaderAdaptor(firehoseFactory, inputRowParser, temporaryDirectory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,13 @@ interface CleanableFile extends Closeable
/**
* Fetches the input entity into the local storage.
* This method might be preferred instead of {@link #open()}, for example
*
* <p>
* - {@link InputFormat} requires expensive random access on remote storage.
* - Holding a connection until you consume the entire InputStream is expensive.
*
* @param temporaryDirectory to store temp data. This directory will be removed automatically once
* the task finishes.
* @param fetchBuffer is used to fetch remote entity into local storage.
*
* @see FileUtils#copyLarge
*/
default CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws IOException
Expand Down Expand Up @@ -125,4 +124,9 @@ default Predicate<Throwable> getRetryCondition()
{
return Predicates.alwaysFalse();
}

default InputEntity getBaseInputEntity()
{
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,5 @@ public interface InputSource
* @param inputFormat to parse data. It can be null if {@link #needsFormat()} = true
* @param temporaryDirectory to store temp data. It will be cleaned up automatically once the task is finished.
*/
InputSourceReader reader(InputRowSchema inputRowSchema, @Nullable InputFormat inputFormat, File temporaryDirectory);
InputSourceReader reader(InputRowSchema inputRowSchema, @Nullable InputFormat inputFormat, File temporaryDirectory, InputStats inputStats);
}
37 changes: 37 additions & 0 deletions core/src/main/java/org/apache/druid/data/input/InputStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input;

import java.util.concurrent.atomic.AtomicLong;

public class InputStats
{
private final AtomicLong processedBytes = new AtomicLong(0);

public void incrementProcessedBytes(long incrementByValue)
{
processedBytes.getAndAdd(incrementByValue);
}

public AtomicLong getProcessedBytes()
{
return processedBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Ints;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.CountableInputEntity;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.utils.CollectionUtils;

Expand Down Expand Up @@ -143,13 +145,15 @@ public boolean needsFormat()
protected InputSourceReader formattableReader(
InputRowSchema inputRowSchema,
InputFormat inputFormat,
@Nullable File temporaryDirectory
@Nullable File temporaryDirectory,
InputStats inputStats
)
{
return new InputEntityIteratingReader(
inputRowSchema,
inputFormat,
createSplits(inputFormat, null).flatMap(split -> split.get().stream()).map(this::createEntity).iterator(),
createSplits(inputFormat, null).flatMap(split -> split.get().stream()).map(
cloudObjectLocation -> new CountableInputEntity(createEntity(cloudObjectLocation), inputStats)).iterator(),
temporaryDirectory
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.CountableInputEntity;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.metadata.PasswordProvider;

Expand Down Expand Up @@ -105,16 +107,20 @@ public SplittableInputSource<URI> withSplit(InputSplit<URI> split)
protected InputSourceReader formattableReader(
InputRowSchema inputRowSchema,
InputFormat inputFormat,
@Nullable File temporaryDirectory
@Nullable File temporaryDirectory,
InputStats inputStats
)
{
return new InputEntityIteratingReader(
inputRowSchema,
inputFormat,
createSplits(inputFormat, null).map(split -> new HttpEntity(
split.get(),
httpAuthenticationUsername,
httpAuthenticationPasswordProvider
createSplits(inputFormat, null).map(split -> new CountableInputEntity(
new HttpEntity(
split.get(),
httpAuthenticationUsername,
httpAuthenticationPasswordProvider
),
inputStats
)).iterator(),
temporaryDirectory
);
Expand All @@ -131,8 +137,8 @@ public boolean equals(Object o)
}
HttpInputSource source = (HttpInputSource) o;
return Objects.equals(uris, source.uris) &&
Objects.equals(httpAuthenticationUsername, source.httpAuthenticationUsername) &&
Objects.equals(httpAuthenticationPasswordProvider, source.httpAuthenticationPasswordProvider);
Objects.equals(httpAuthenticationUsername, source.httpAuthenticationUsername) &&
Objects.equals(httpAuthenticationPasswordProvider, source.httpAuthenticationPasswordProvider);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.CountableInputEntity;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.java.util.common.StringUtils;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -65,13 +67,14 @@ public boolean needsFormat()
protected InputSourceReader formattableReader(
InputRowSchema inputRowSchema,
InputFormat inputFormat,
@Nullable File temporaryDirectory
@Nullable File temporaryDirectory,
InputStats inputStats
)
{
return new InputEntityIteratingReader(
inputRowSchema,
inputFormat,
Stream.of(new ByteEntity(StringUtils.toUtf8(data))).iterator(),
Stream.of(new CountableInputEntity(new ByteEntity(StringUtils.toUtf8(data)), inputStats)).iterator(),
temporaryDirectory
);
}
Expand Down
Loading