Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Refactoring PrefetchableTextFilesFirehoseFactory (#4836)
* Refactoring prefetchable firehose

* Fix to read cache when prefetch is disabled

* More tests

* Cleanup codes

* Add Fetcher

* Fix test failure

* Count file size

* Fix test

* rename generic parameter

* address comments

* address comments

* reuse buffer

* move Execs to java-util

* use execs

* Fix build
  • Loading branch information
jihoonson authored and leventov committed Oct 14, 2017
1 parent f51f346 commit 8d99028
Show file tree
Hide file tree
Showing 91 changed files with 1,075 additions and 703 deletions.
2 changes: 1 addition & 1 deletion api/src/main/java/io/druid/data/input/FirehoseFactory.java
Expand Up @@ -21,7 +21,7 @@

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.PrefetchableTextFilesFirehoseFactory;
import io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import io.druid.guice.annotations.ExtensionPoint;
import io.druid.java.util.common.parsers.ParseException;

Expand Down
Expand Up @@ -41,22 +41,22 @@
* This is an abstract class for firehose factory for making firehoses reading text files.
* It provides an unified {@link #connect(StringInputRowParser, File)} implementation for its subclasses.
*
* @param <ObjectType> object type representing input data
* @param <T> object type representing input data
*/
public abstract class AbstractTextFilesFirehoseFactory<ObjectType>
public abstract class AbstractTextFilesFirehoseFactory<T>
implements FirehoseFactory<StringInputRowParser>
{
private static final Logger LOG = new Logger(AbstractTextFilesFirehoseFactory.class);

private List<ObjectType> objects;
private List<T> objects;

@Override
public Firehose connect(StringInputRowParser firehoseParser, File temporaryDirectory) throws IOException
{
if (objects == null) {
objects = ImmutableList.copyOf(Preconditions.checkNotNull(initObjects(), "initObjects"));
}
final Iterator<ObjectType> iterator = objects.iterator();
final Iterator<T> iterator = objects.iterator();
return new FileIteratingFirehose(
new Iterator<LineIterator>()
{
Expand All @@ -72,7 +72,7 @@ public LineIterator next()
if (!hasNext()) {
throw new NoSuchElementException();
}
final ObjectType object = iterator.next();
final T object = iterator.next();
try {
return IOUtils.lineIterator(wrapObjectStream(object, openObjectStream(object)), Charsets.UTF_8);
}
Expand All @@ -97,7 +97,7 @@ public LineIterator next()
*
* @return a collection of initialized objects.
*/
protected abstract Collection<ObjectType> initObjects() throws IOException;
protected abstract Collection<T> initObjects() throws IOException;

/**
* Open an input stream from the given object. If the object is compressed, this method should return a byte stream
Expand All @@ -109,7 +109,7 @@ public LineIterator next()
*
* @throws IOException
*/
protected abstract InputStream openObjectStream(ObjectType object) throws IOException;
protected abstract InputStream openObjectStream(T object) throws IOException;

/**
* Wrap the given input stream if needed. The decompression logic should be applied to the given stream if the object
Expand All @@ -120,5 +120,5 @@ public LineIterator next()
* @return
* @throws IOException
*/
protected abstract InputStream wrapObjectStream(ObjectType object, InputStream stream) throws IOException;
protected abstract InputStream wrapObjectStream(T object, InputStream stream) throws IOException;
}

0 comments on commit 8d99028

Please sign in to comment.