Add PrefetchableTextFilesFirehoseFactory for cloud storage types #4193

Merged
merged 28 commits into apache:master on May 18, 2017

Conversation

jihoonson
Contributor

@jihoonson jihoonson commented Apr 21, 2017

Currently, IndexTask always wraps a given Firehose with ReplayableFirehose. The problem is that ReplayableFirehose downloads all of the data to local disk when it is initialized, which can cause out-of-disk errors. A simple possible solution is to add a max cache capacity to ReplayableFirehose, but I didn't because

  1. ReplayableFirehose first reads the whole input and stores it in the Smile format. When it reads the data back, it needs to parse the Smile-format data, which introduces additional parsing overhead.
  2. I think ReplayableFirehose is valuable as it is, because it is meaningful for truly unreliable data sources. Also, with some modifications it could be used for fault tolerance of streaming ingestion, as Kafka indexing does.

So, I added a new class, PrefetchableTextFilesFirehoseFactory, which is responsible for caching and prefetching objects on local disk. It has three key features.

  • Caching: on the first call to connect(StringInputRowParser), it caches objects on local disk up to maxCacheCapacityBytes. These cached files are NOT deleted until the process terminates, and thus can be reused by future reads.
  • Fetching: once all cached data has been read, it fetches the remaining objects to local disk and reads data from them. For performance, a prefetch technique is used: when the size of the remaining cached or fetched data is smaller than prefetchTriggerBytes, a background prefetch thread automatically starts fetching the remaining objects (a rough sketch of this trigger follows the list). A fetched object is deleted when its LineIterator is closed.
  • Retry: if an exception occurs while downloading an object, it retries up to maxFetchRetry times and then throws the exception. I think this is a temporary solution for handling the frequent exceptions that occur while accessing cloud storage, and believe we should handle them at a lower level in the future.
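
To make the prefetch trigger above concrete, here is a minimal, hypothetical sketch in Java. The class, field, and helper names are mine, not the patch's; it only illustrates the idea that a background fetch starts once locally available data falls below prefetchTriggerBytes and stops at maxFetchCapacityBytes.

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch only; does not mirror the actual PrefetchableTextFilesFirehoseFactory code.
class PrefetchSketch<T>
{
  private final List<T> objects;               // remote objects, read in order
  private final ExecutorService fetchExecutor; // background fetch thread
  private final long prefetchTriggerBytes;     // start fetching when local data drops below this
  private final long maxFetchCapacityBytes;    // upper bound on fetched-but-unread data

  private final AtomicLong remainingLocalBytes = new AtomicLong();
  private final AtomicLong fetchedBytes = new AtomicLong();
  private final AtomicBoolean fetching = new AtomicBoolean();
  private volatile int nextFetchIndex;

  PrefetchSketch(List<T> objects, ExecutorService fetchExecutor, long prefetchTriggerBytes, long maxFetchCapacityBytes)
  {
    this.objects = objects;
    this.fetchExecutor = fetchExecutor;
    this.prefetchTriggerBytes = prefetchTriggerBytes;
    this.maxFetchCapacityBytes = maxFetchCapacityBytes;
  }

  // Called whenever some locally cached or fetched data has been consumed.
  void maybeStartPrefetch()
  {
    if (remainingLocalBytes.get() < prefetchTriggerBytes && fetching.compareAndSet(false, true)) {
      fetchExecutor.submit(() -> {
        try {
          while (nextFetchIndex < objects.size() && fetchedBytes.get() < maxFetchCapacityBytes) {
            final long size = download(objects.get(nextFetchIndex++)); // copy one object to local disk
            fetchedBytes.addAndGet(size);
            remainingLocalBytes.addAndGet(size);
          }
        }
        finally {
          fetching.set(false);
        }
      });
    }
  }

  private long download(T object)
  {
    // Placeholder: a real implementation would open the object's remote stream,
    // copy it to a local file, and return the number of bytes written.
    return 0L;
  }
}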

Additionally, I refactored the file-based firehoses by adding AbstractTextFilesFirehoseFactory, and fixed a bug in FileIteratingFirehose, which didn't close its LineIterator after iterating.



@JsonProperty("blobs") AzureBlob[] blobs
) {
@JsonProperty("blobs") List<AzureBlob> blobs,
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
Contributor

These @JsonProperty annotations need to be fixed

Contributor Author

Oh, a copy-paste mistake. Fixed them. Thanks!
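
For context, the fix boils down to giving each constructor parameter its own JSON property name instead of the repeated "blobs". A rough sketch of what the corrected StaticCloudFilesFirehoseFactory signature might look like follows; the exact set of prefetch parameters in the final patch may differ:

@JsonCreator
public StaticCloudFilesFirehoseFactory(
    @JacksonInject("objectApi") CloudFilesApi cloudFilesApi,
    @JsonProperty("blobs") List<CloudFilesBlob> blobs,
    @JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
    @JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes,
    @JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes,
    @JsonProperty("fetchTimeout") Long fetchTimeout,
    @JsonProperty("maxFetchRetry") Integer maxFetchRetry
)
{
  // each Jackson property name now matches the parameter it binds to
}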


@JsonCreator
public StaticCloudFilesFirehoseFactory(
@JacksonInject("objectApi") CloudFilesApi cloudFilesApi,
@JsonProperty("blobs") CloudFilesBlob[] blobs
@JsonProperty("blobs") List<CloudFilesBlob> blobs,
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
Contributor

These @JsonProperty annotations need to be fixed

@JsonProperty("blobs") GoogleBlob[] blobs
) {
@JsonProperty("blobs") List<GoogleBlob> blobs,
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
Contributor

These @JsonProperty annotations need to be fixed


@JsonCreator
public StaticS3FirehoseFactory(
@JacksonInject("s3Client") RestS3Service s3Client,
@JsonProperty("uris") List<URI> uris
@JsonProperty("uris") List<URI> uris,
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
Contributor

These @JsonProperty annotations need to be fixed

Contributor

@gianm gianm left a comment

Didn't do a full review yet, but had some initial comments from the parts I did look at. Let me know what you think of those and I will also continue reviewing the rest.

I think this patch also won't totally solve the original problem, since running out of disk isn't just due to the replayable firehose caching everything. That's what you hit first, but once that's fixed, there are still the segments we write to disk. They'll eventually fill up the disk because we don't publish them until the very end of the task. I think it's ok to address that in a separate patch though -- it's a different problem than the one you're solving in this patch.

@@ -80,7 +80,6 @@
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import io.druid.segment.realtime.firehose.ReplayableFirehoseFactory;
Contributor

This was the only use of ReplayableFirehoseFactory (it wasn't exposed to users… just used internally). So if it's not needed anymore, you should just remove it from the code base.

Contributor Author

Removed.


@JsonCreator
public IndexIOConfig(
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("appendToExisting") @Nullable Boolean appendToExisting,
@JsonProperty("skipFirehoseCaching") @Nullable Boolean skipFirehoseCaching
Contributor

You should remove skipFirehoseCaching from the docs too (tasks.md I think).

Contributor Author

Removed.

smileMapper
);
}
final FirehoseFactory firehoseFactory = delegateFirehoseFactory;
Contributor

Might as well name delegateFirehoseFactory firehoseFactory from the start. The renaming isn't doing anything useful anymore.

Contributor Author

I forgot to rename it. Thanks.


final File baseDir = new File("lol");
baseDir.mkdir();
baseDir.deleteOnExit();
Contributor

What's this for? Could it be a @Rule TemporaryFolder instead? That's preferred for any temporary directories or files needed by tests.

Contributor Author

It was for checking that the directory exists in the constructor of LocalFirehoseFactory. Removed now.
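
For reference, the JUnit pattern the reviewer is suggesting looks roughly like this (a minimal sketch; the test class and method names are made up):

import java.io.File;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class LocalFirehoseFactoryTestSketch
{
  // JUnit creates this directory before each test and deletes it afterwards,
  // so no manual mkdir()/deleteOnExit() bookkeeping is needed.
  @Rule
  public final TemporaryFolder temporaryFolder = new TemporaryFolder();

  @Test
  public void testReadsFilesFromBaseDir() throws Exception
  {
    final File baseDir = temporaryFolder.newFolder("firehose");
    // ... write test files into baseDir and exercise the firehose ...
  }
}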

@@ -60,9 +51,18 @@ public LocalFirehoseFactory(
@JsonProperty("parser") StringInputRowParser parser
)
{
super(
FileUtils.listFiles(
Contributor

This shouldn't happen in construction. Firehose factories get constructed semi-frequently (anytime Task objects are deserialized) and that doesn't necessarily mean they will really be used. Anything that hits the disk or network should be deferred until the firehose is connected.

Contributor Author

Right. I fixed it. Thanks!

*
* @return true if the object is compressed with gzip
*/
protected abstract boolean isGzipped(ObjectType object);
Contributor

Is it necessary to have isGzipped and wrapIfNeeded? I would think that the implementations could wrap if needed, since they should know if an object is gzipped or not.

Contributor Author

Removed.

* IndexTask can read the whole data twice for determining partition specs and generating segments if the intervals of
* GranularitySpec is not specified.
*/
public abstract class PrefetcheableTextFilesFirehoseFactory<ObjectType>
Contributor

Prefetchable (spelling)

Contributor Author

Fixed.

{
if (baseDir == null) {
baseDir = Files.createTempDir();
baseDir.deleteOnExit();
Contributor

Expanding use of deleteOnExit here is a little undesirable (see #3923 where we removed others). We did use it in ReplayableFirehoseFactory but that was only used by the IndexTask. Now these firehoses are used by potentially more code.

I wonder if instead, we should either add a File temporaryDirectory parameter to connect or do a hack similar to #4069, to allow the caller to pass in an appropriate tmp directory. Something nicer than #4069 is probably better, since that approach doesn't work if the firehose is wrapped in any way.

Contributor Author

Right. I vote for passing a temporaryDirectory parameter because it works well with wrapped firehoses and the caller can manage its resources.

@@ -88,5 +104,9 @@ public void close() throws IOException
if (lineIterator != null) {
lineIterator.close();
}

if (closer != null) {
closer.close();
Contributor

Should close this even if lineIterator fails to close.

Contributor Author

Done.
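
A minimal sketch of what the reviewer is asking for, releasing the closer even when the line iterator fails to close (the actual fix may be structured differently):

@Override
public void close() throws IOException
{
  try {
    if (lineIterator != null) {
      lineIterator.close();
    }
  }
  finally {
    // Close this even if lineIterator.close() throws.
    if (closer != null) {
      closer.close();
    }
  }
}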

Long maxCacheCapacityBytes,
Long maxFetchCapacityBytes,
Long prefetchTriggerBytes,
Integer fetchTimeout,
Contributor

Generally we use longs to represent times and durations.

Contributor Author

Changed.

@jihoonson jihoonson changed the title Fix out of disk in IndexTask Add PrefetchableTextFilesFirehoseFactory for cloud storage types Apr 26, 2017
@jihoonson
Contributor Author

@gianm you're right. I changed the title. Also, I'll fix the problem of publishing segments at the very end of the index task in a follow-up PR.

}

/**
* Downloads an object. It retires downloading {@link #maxFetchRetry} times and throws that exception.
Contributor

retires -> retries

Contributor Author

Fixed. Thanks.
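
For illustration, a download-with-retry loop along the lines described in that javadoc could look like the following sketch; the download() helper and its signature are hypothetical:

// Hypothetical sketch: attempt the download, retry up to maxFetchRetry more times,
// and rethrow the last exception once the retries are exhausted.
private void downloadWithRetry(ObjectType object, File outFile) throws IOException
{
  for (int attempt = 0; ; attempt++) {
    try {
      download(object, outFile); // copies the object's remote stream into outFile
      return;
    }
    catch (IOException e) {
      if (attempt >= maxFetchRetry) {
        throw e; // no retries left; propagate the last failure
      }
      // otherwise fall through and try again
    }
  }
}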

}

try {
InputStream stream = FileUtils.openInputStream(fetchedFile);
Contributor

Should this block use the factory's openStream() method instead?

Contributor Author

AbstractTextFilesFirehoseFactory.openStream() is used for fetching data from remote storage in PrefetchableTextFilesFirehoseFactory.download(). Once a file is fetched, it is opened with FileUtils.openInputStream().

Contributor

Hm, should we remove the GZip file extension check here then, and require that implementations of openStream() output a decompressed stream?

Contributor

I think the current mechanism is fine, it saves some disk space by postponing decompression until reading from disk.

Contributor Author

Thanks, my bad. I separated openStream() into openObjectStream() and wrapObjectStream(). openObjectStream() describes how an input stream is opened for an object, and wrapObjectStream() wraps the input stream if the object is compressed.
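
Roughly, that split amounts to two abstract methods like the following sketch (signatures approximate, not copied from the patch):

import java.io.IOException;
import java.io.InputStream;

public abstract class TextFilesFirehoseFactorySketch<ObjectType>
{
  // Opens the raw stream for an object exactly as it is stored remotely (possibly compressed).
  protected abstract InputStream openObjectStream(ObjectType object) throws IOException;

  // Wraps the raw stream with a decompressor when needed, e.g. a GZIPInputStream for *.gz objects,
  // so decompression is deferred until the data is actually read.
  protected abstract InputStream wrapObjectStream(ObjectType object, InputStream stream) throws IOException;
}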

@@ -410,6 +414,8 @@ public void run()
plumber.finishJob();
}
}

FileUtils.forceDelete(firehoseTempDir);
Contributor

Should this be moved to the outer finally block? If an exception occurs after firehoseTempDir is created (normalExit is false), it looks like firehoseTempDir won't be deleted here.

Contributor Author

Right. I moved it to the finally block.

*/
public Firehose connect(T parser) throws IOException, ParseException;
Firehose connect(T parser, File temporaryDirectory) throws IOException, ParseException;
Contributor

What do you think about removing temporaryDirectory from the connect() parameters, and have the temp directory creation/cleanup be managed in the Firehoses that need it?

It looks like most Firehose implementations aside from the one returned by PrefetchableTextFilesFirehoseFactory don't need the temp dir.

Contributor Author

The life cycle of temporaryDirectory is the same as that of FirehoseFactory rather than Firehose. This is because the temporaryDirectory containing cached data should not be deleted even when a PrefetchableFirehose is closed, so the cache can be reused.

We may add abstract methods for initialization and termination to FirehoseFactory, and the implementation of PrefetchableFirehoseFactory could create and delete the temporaryDirectory in those methods. Do you think that is better?

Contributor

I see, I think it would be better to have the temp dir management contained within the PrefetchableFirehoseFactory implementation then as you mentioned.

Contributor Author

Would you tell me what kind of temp dir management you have in mind? I think the temp dir should be managed by the caller of PrefetchableFirehoseFactory.connect(), because PrefetchableFirehoseFactory cannot decide by itself when to remove the temp dir.

Contributor

I think the initialization/termination methods that you mentioned on FirehoseFactory would be a good place for the temp dir management.

Contributor Author

I realized that FirehoseFactory is in the druid api module. I think it would be better to manage temporaryDirectory internally rather than leaving the management to users.

Contributor

Think about the context where it's used. In our world, callers of firehoses are indexing tasks, which already have working directories that are in user-configurable locations and are automatically cleaned up. It makes sense to use this directory (and pass it into the firehose factory) rather than expect the firehose to set up and clean up its own temporary directories. Especially in the relatively common case where the user-configured working directory is on a filesystem with more space than the system default temp directory.

Contributor Author

Right. I changed to use a temporary directory in taskWorkDir.
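
In the task code, that change is roughly the following fragment (a sketch; the subdirectory name is arbitrary):

// Put the firehose temp directory under the task's working directory, which is
// user-configurable and cleaned up with the task, instead of the system temp dir.
final File firehoseTempDir = new File(toolbox.getTaskWorkDir(), "firehose");
FileUtils.forceMkdir(firehoseTempDir);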

*/
public Firehose connect(T parser) throws IOException, ParseException;
Firehose connect(T parser, File temporaryDirectory) throws IOException, ParseException;
Contributor

This is an interface for extensions, so for backwards compat we should keep the old method and add a default implementation for this one that just calls the old one, ignoring temporaryDirectory. But also mark the old one @Deprecated.

Contributor Author

Done.

@Override
public Firehose connect(StringInputRowParser firehoseParser, File temporaryDirectory) throws IOException
{
final List<ObjectType> objects = ImmutableList.copyOf(Preconditions.checkNotNull(initObjects(), "initObjects"));
Contributor

Maybe objects should be cached? IndexTask connects the same firehose multiple times; caching could be useful both for performance and for being consistent from run to run.

Contributor

Doesn't have to be in this PR, but, in addition to the text-based firehoses, IngestSegmentFirehose should also support some kind of run-to-run consistency. It would be an input for the index task in the case of reindexing, and it might get connected multiple times in the same task if the task wants to do multiple passes. Ideally each pass should see the same data.

Contributor Author

@jihoonson jihoonson May 10, 2017

Hmm. I agree that we should support run-to-run consistency for IngestSegmentFirehose.
However, maybe it is better to leave consistency management to users for other firehoses which read data from outside of Druid, because we cannot guarantee it if some files are removed or changed between subsequent scans.

Contributor

@gianm gianm May 10, 2017

However, maybe it is better to leave consistency management to users for other firehoses which read data from outside of Druid, because we cannot guarantee it if some files are removed or changed between subsequent scans.

By user do you mean the caller of the firehose? How would the user be able to manage consistency? It seems tough to me, since for features like S3 prefix listing, the user doesn't really know what objects are being read.

IMO the firehose is in a pretty good position to do what it can to try to get run-to-run consistency (like remembering objects and using that on subsequent connects) and to detect when it's impossible (perhaps remembering checksums of object contents, and detecting when they change).

(I do think this sort of consistency verification isn't necessary for this patch, but happy to talk through it for the future)

Contributor Author

@gianm, I cached objects. It would be good for performance anyway.

By user do you mean the caller of the firehose? How would the user be able to manage consistency? It seems tough to me, since for features like S3 prefix listing, the user doesn't really know what objects are being read.

I was thinking that Druid operators (people) can be responsible for consistency management. I think they can do so by deferring changes to their data sets until the current ingestion task completes.

IMO the firehose is in a pretty good position to do what it can to try to get run-to-run consistency (like remembering objects and using that on subsequent connects) and to detect when it's impossible (perhaps remembering checksums of object contents, and detecting when they change).

It sounds good, but it will definitely affect ingestion performance. I'm wondering whether it would be worthwhile if we can leave the responsibility for consistency management to Druid operators.

// scan yet, so we must download the whole file at once. It's still possible for the size of cached/fetched data to
// not exceed these variables by estimating the after-fetch size, but it makes us consider the case when any files
// cannot be fetched due to their large size, which makes the implementation complicated.
private long maxCacheCapacityBytes;
Contributor

Could be final, I think.

Contributor Author

Added.

// maximum retry for fetching an object from the remote site
private final int maxFetchRetry;

private volatile int nextFetchIndex;
Contributor

Comment about why this is volatile?

Contributor Author

Added a comment.

);

// fetchExecutor is responsible for background data fetching
final ExecutorService fetchExecutor = Executors.newSingleThreadExecutor();
Contributor

Could you give this thread a descriptive name?

Contributor Author

Done.
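
One common way to do that is a named ThreadFactory, for example via Guava's ThreadFactoryBuilder (a sketch; the actual name format used in the patch may differ):

// Give the background fetch thread a recognizable name for thread dumps and logs.
final ExecutorService fetchExecutor = Executors.newSingleThreadExecutor(
    new ThreadFactoryBuilder()
        .setNameFormat("firehose-fetch-%d")
        .setDaemon(true)
        .build()
);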

}
}
finally {
FileUtils.forceDelete(firehoseTempDir);
Contributor

There's no need to do this, work directories get automatically cleaned up after the task exits. It's probably better not to do this since then we'd have the option of skipping the higher-level cleanup if we want to inspect a failed task's working directory.

Contributor Author

Removed.


@JsonCreator
public StaticS3FirehoseFactory(
@JacksonInject("s3Client") RestS3Service s3Client,
@JsonProperty("uris") List<URI> uris
@JsonProperty("uris") List<URI> uris,
@JsonProperty("directories") List<URI> directories,
Contributor

Could you call this "prefixes"? S3 doesn't really have directories (see http://docs.aws.amazon.com/AmazonS3/latest/UG/FolderOperations.html) so this term is misleading.

Contributor

"prefixes" and the new prefetching parameters should be added to the docs for development/extensions-core/s3.md.

Contributor Author

Updated documents.

@@ -324,6 +326,7 @@ public String getVersion(final Interval interval)
this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, metrics);

Supplier<Committer> committerSupplier = null;
final File firehoseTempDir = Files.createTempDir();
Contributor

This should be inside the task working dir -- a subdirectory of toolbox.getTaskWorkDir() -- not system temp directory.

Contributor Author

Fixed.

@@ -163,33 +164,22 @@ public IndexIngestionSpec getIngestionSchema()
@Override
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
final File firehoseTempDir = Files.createTempDir();
Contributor

This should be inside the task working dir -- a subdirectory of toolbox.getTaskWorkDir() -- not system temp directory.

Contributor Author

Fixed.

Contributor

@dclim dclim left a comment

Looks good pending other review comments + a suggestion to look at using listObjectsChunked

final String bucket = uri.getAuthority();
final String prefix = extractS3Key(uri);
try {
final S3Object[] listed = s3Client.listObjects(bucket, prefix, null);
Contributor

Does this support common prefixes (subdirectory paths), and is this something we want to support? The docs indicate that we should use listObjectsChunked here instead, and that might be preferable anyway in case there's a really large number of items.

Contributor Author

Thanks. I changed to use listObjectsChunked().

// scan yet, so we must download the whole file at once. It's still possible for the size of cached/fetched data to
// not exceed these variables by estimating the after-fetch size, but it makes us consider the case when any files
// cannot be fetched due to their large size, which makes the implementation complicated.
private final long maxCacheCapacityBytes;
Contributor

It took me a bit of time to wrap my head around the separate cache / fetch pool implementation where the files in the cache remain until termination and the files in the fetch pool are deleted when closed. My thought was that there would be a single configurably-sized pool that would have some initial data written into it by the prefetcher and then would continue to have more data added to it by the background thread. If the entire file set fit within the pool then all the data would be completely cached (the same as the cache pool). Otherwise, once the pool started approaching the limit, files that have already been processed would begin to be evicted to make room for more pre-fetched data.

I think the main advantage of doing it this way would be that there is just one pool that needs to be tuned, and indexing could start before the entire cache is filled with data (if the amount of data exceeds the cache size).

I don't feel super strongly about it either way and realize it would be more complicated to implement, but just wondering if it's something you considered and decided against, and if so what your reasoning was.

Contributor Author

@jihoonson jihoonson May 10, 2017

@dclim thanks for your comment. The idea behind separating the cache and fetch pools is that caching and fetching have different goals. Caching is for reducing the initial latency of the firehose, while prefetching is for increasing the throughput of the firehose (it is assumed that reading directly from the remote site is slow). So, I thought the cached objects should be the first n objects (it's also assumed that objects should be read in the given order, and the order must not change if the firehose is reconnected).
Your idea about early ingestion during caching sounds good. I updated the patch.

Contributor Author

@dclim, also it would be useful if caching and prefetching can be turned on/off individually, because some users might want to use only one of the caching or prefetching features. I updated the patch to support disabling prefetching.

@jihoonson
Contributor Author

The Travis failure seems unrelated to this patch. I'll close and reopen this PR.

@jihoonson jihoonson closed this May 11, 2017
@jihoonson jihoonson reopened this May 11, 2017
@jihoonson
Contributor Author

Hmm, I'm looking at the failure.

@gianm gianm added this to the 0.10.1 milestone May 12, 2017
}
final ObjectType object = iterator.next();
try {
return IOUtils.lineIterator(
Contributor

I think you can get rid of the BufferedReader and InputStreamReader here since wrapObjectStream() returns an InputStream:

https://commons.apache.org/proper/commons-io/javadocs/api-2.5/org/apache/commons/io/IOUtils.html

"All the methods in this class that read a stream are buffered internally. This means that there is no cause to use a BufferedInputStream or BufferedReader. The default buffer size of 4K has been shown to be efficient in tests."

Contributor Author

Cool, thanks! I also removed it from PrefetchableTextFilesFirehoseFactory, because LineIterator also automatically wraps the input reader with a BufferedReader.
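
The simplified call then looks roughly like this sketch, using the openObjectStream()/wrapObjectStream() methods mentioned earlier and assuming UTF-8 input:

// commons-io buffers internally, so no BufferedReader or InputStreamReader is needed.
final LineIterator lineIterator = IOUtils.lineIterator(
    wrapObjectStream(object, openObjectStream(object)),
    StandardCharsets.UTF_8
);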

Contributor

@jon-wei jon-wei left a comment

👍 on design, had a few code level comments

@Override
public int read(char[] cbuf, int off, int len) throws IOException
{
final char[] chs = "\n".toCharArray();
Contributor

Could extract chs here and make it static

Contributor Author

Done.


assertResult(rows);
}

Contributor

Can you add a test where both cache and prefetch are enabled, but the max prefetch size is less than the max cache capacity?

Contributor Author

Added tests.

@jihoonson
Contributor Author

@jon-wei thanks. I addressed your comments.

@dclim
Contributor

dclim commented May 18, 2017

👍

Contributor

@gianm gianm left a comment

Looks good other than the FirehoseFactory comment.

* @param parser an input row parser
* @param temporaryDirectory a directory where temporary files are stored
*/
Firehose connect(T parser, File temporaryDirectory) throws IOException, ParseException;
Contributor

This one should have a default implementation too (for compatibility with pre-existing firehose impls that don't know about overriding this method). It's ok for them to both have default impls that call each other -- anyone implementing a firehose should override at least one. (it's worth calling this out in the javadoc)

Contributor Author

hmm, how about throwing an exception like NotImplementedException in the default implementation?

Contributor Author

Ah, never mind. It will break the compatibility anyway.

Contributor

It's got to call the other method; the case we're trying to support is that someone already wrote an extension for Druid 0.10.0 with a custom FirehoseFactory, and then upgrades to Druid 0.10.1. The new Druid 0.10.1 code will call connect(parser, temporaryDirectory) and unless there is a default impl that calls connect(parser), the user's extension will not work properly.

Generally, we want to keep extension compatibility for the extension points on http://druid.io/docs/latest/development/modules.html within a major version like 0.9.x, 0.10.x, etc (it's ok to break extensions from 0.9.x -> 0.10.x if there is a good reason).

Contributor Author

Right. I added a default implementation.
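
The resulting shape of the interface is roughly the following sketch (simplified; the real FirehoseFactory has additional javadoc and members):

public interface FirehoseFactory<T extends InputRowParser>
{
  // Old method, kept as a default for extensions written before this change.
  @Deprecated
  default Firehose connect(T parser) throws IOException, ParseException
  {
    return connect(parser, null);
  }

  // New method; pre-existing extensions fall back to the old connect() via this default.
  // Implementations must override at least one of the two methods.
  default Firehose connect(T parser, File temporaryDirectory) throws IOException, ParseException
  {
    return connect(parser);
  }
}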

Contributor

@gianm gianm left a comment

👍 after travis, thx @jihoonson

@gianm gianm merged commit 733dfc9 into apache:master May 18, 2017