
Create splits of multiple files for parallel indexing #9360

Merged: 14 commits merged into apache:master on Feb 25, 2020

Conversation

@jihoonson (Contributor) commented on Feb 13, 2020

Description

Currently, the Parallel task creates a sub task per input file. This can be inefficient when you have lots of small files, because each task carries overhead for scheduling, JVM startup, etc.

This PR adds a new MaxSizeSplitHintSpec and allows the Parallel task to create splits of multiple files. If a split contains only one file, that file can be larger than the configured maxSize; otherwise, the total size of the files in the same split cannot exceed maxSize. This means that if you have a very large file, there will still be only one task processing it. This could be addressed in the future by creating multiple splits for the same file, each referencing a disjoint part of it.

This PR changes the default splitHintSpec from none to MaxSizeSplitHintSpec.
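
To make the grouping behavior concrete, here is a minimal, self-contained Java sketch of the idea (illustrative only, not the actual Druid implementation; files are represented just by their sizes in bytes):

```java
import java.util.ArrayList;
import java.util.List;

// Groups "files" (represented by their sizes) into splits whose total size stays
// within maxSplitSize. A single file larger than maxSplitSize still forms its own
// one-file split, matching the behavior described above.
public class SplitGroupingSketch
{
  public static List<List<Long>> group(List<Long> fileSizes, long maxSplitSize)
  {
    List<List<Long>> splits = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentSize = 0;
    for (long size : fileSizes) {
      if (!current.isEmpty() && currentSize + size > maxSplitSize) {
        splits.add(current);        // current split is full; start a new one
        current = new ArrayList<>();
        currentSize = 0;
      }
      current.add(size);
      currentSize += size;
    }
    if (!current.isEmpty()) {
      splits.add(current);
    }
    return splits;
  }

  public static void main(String[] args)
  {
    long mb = 1024L * 1024L;
    // With maxSplitSize = 500MB: the two 200MB files share a split, while 700MB and 100MB each get their own.
    System.out.println(group(List.of(200 * mb, 200 * mb, 700 * mb, 100 * mb), 500 * mb));
  }
}
```

Note that the 700MB file above is not broken up; splitting a single large file across tasks is the future work mentioned in the description.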


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths.
  • added integration tests.
  • been tested in a test Druid cluster.

@sthetland left a comment

Added doc review.

docs/ingestion/native-batch.md (Outdated)
doesn't depend on other external systems like Hadoop. The `index_parallel` task is a supervisor task which orchestrates
the whole indexing process. It splits the input data and and issues worker tasks
to the Overlord which actually process the assigned input split and create segments.
Once a worker task successfully processes all assigned input split, it reports the generated segment list to the supervisor task.


It’s a little unclear to me who's doing what in this. Is the following accurate/clearer?

“The index_parallel task is a supervisor task that orchestrates the indexing process. The task splits input data for processing by Overlord worker tasks, which process the input splits assigned to them and create segments from the input. Once a worker task successfully processes all assigned input splits, it reports the generated segment list to the supervisor task.”

If not, for a lighter edit, maybe just clarify that it's the worker tasks more specifically, rather than the overlord, that is processing input splits (if that's the case).

jihoonson (Contributor, Author):

Thanks for taking a look!

If not, for a lighter edit, maybe just clarify that it's the worker tasks more specifically, rather than the overlord, that is processing input splits (if that's the case).

This is correct. I tried to make it more clear.

The Parallel task (type `index_parallel`) is a task for parallel batch indexing. This task only uses Druid’s resources and
doesn’t depend on other external systems like Hadoop. The `index_parallel` task is a supervisor task that orchestrates
the whole indexing process. The supervisor task splits the input data and creates worker tasks to process those splits.
The created worker tasks are issued to the Overlord so that they can be scheduled and run on MiddleManagers or Indexers.
Once a worker task successfully processes the assigned input split, it reports the generated segment list to the supervisor task.
The supervisor task periodically checks the status of worker tasks. If one of them fails, it retries the failed task
until the number of retries reaches the configured limit. If all worker tasks succeed, it publishes the reported segments at once and finalizes ingestion.

docs/ingestion/native-batch.md (Outdated)
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should always be `maxSize`.|none|yes|
|maxSplitSize|Maximum number of bytes of input files to process in a single task. If a single file is larger than this number, it will be processed by itself in a single task (splitting a large file is not supported yet).|500MB|no|


Could this match the wording used below, so:
"....in a single task. (Files are never split across tasks.)"

jihoonson (Contributor, Author):

👍 I added "yet" at the end of the sentence since we may want to split files across tasks in the future.

import java.util.Objects;
import java.util.stream.Stream;

public class SpecificFilesLocalInputSource extends AbstractInputSource implements SplittableInputSource<List<File>>
Member:

If it isn't too much trouble, it seems like this would be better to just be a part of LocalInputSource to be more consistent with the cloud file input sources, rather than introducing a new type. Though if it is needlessly complicated then is probably fine as is.

jihoonson and others added 3 commits February 19, 2020 22:34
Co-Authored-By: sthetland <steve.hetland@imply.io>
Co-Authored-By: sthetland <steve.hetland@imply.io>
this.files = files;

if (baseDir == null && CollectionUtils.isNullOrEmpty(files)) {
throw new IAE("Either one of baseDir or files should be specified");
Member:

Is this better to accept both baseDir + filter and explicit files list, or should you specify one or the other exclusively?

If you think accepting both is better then this exception message should probably say 'At least one of ...' instead of 'Either one of'.

jihoonson (Contributor, Author):

Oops, thanks. I'm not sure why we can't have both at the same time, as long as we don't process the same file more than once. It would be more aligned with the cloud input sources though. (Also, why do we do this?)

Member:

Yeah, I actually think it probably would be better to allow both uris and prefixes in the cloud file input sources and any others that match this pattern. Not sure why we only allow one or the other currently.
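
As a side note on the thread above, a minimal sketch of what a combined constructor could look like, accepting both a baseDir/filter pair and an explicit file list, with the reworded validation message (a simplified stand-in, not the exact merged code):

```java
import java.io.File;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for the local input source constructor discussed above.
// Validation only requires that at least one of baseDir or files is present,
// so baseDir + filter and an explicit file list can be used together.
public class LocalInputSourceSketch
{
  private final File baseDir;   // may be null
  private final String filter;  // may be null
  private final Set<File> files;

  public LocalInputSourceSketch(File baseDir, String filter, Set<File> files)
  {
    if (baseDir == null && (files == null || files.isEmpty())) {
      // "At least one of", since the two options are no longer mutually exclusive
      throw new IllegalArgumentException("At least one of baseDir or files should be specified");
    }
    this.baseDir = baseDir;
    this.filter = filter;
    this.files = files == null
                 ? Collections.emptySet()
                 : Collections.unmodifiableSet(new HashSet<>(files));
  }
}
```

The merged LocalInputSource constructor shown later in this conversation takes the same three properties (baseDir, filter, files).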

current.add(peeking);
splitSize += size;
peeking = null;
} else if (splitSize + size < maxSplitSize) {
Contributor:

Looks like the `splitSize + size < maxSplitSize` and `current.isEmpty()` branches can be combined

jihoonson (Contributor, Author):

Ah good catch. Fixed.


@JsonCreator
public LocalInputSource(
@JsonProperty("baseDir") File baseDir,
@JsonProperty("filter") String filter
@JsonProperty("filter") String filter,
@JsonProperty("files") Set<File> files
Contributor:

Can you add this new property to the LocalInputSource property docs?

jihoonson (Contributor, Author):

Oops, added.

@@ -48,23 +48,23 @@
public InputEntityIteratingReader(
InputRowSchema inputRowSchema,
InputFormat inputFormat,
Stream<InputEntity> sourceStream,
Iterator<? extends InputEntity> sourceStream,
Contributor:

nit: could call this sourceIterator

jihoonson (Contributor, Author):

Fixed.

}

public InputEntityIteratingReader(
InputRowSchema inputRowSchema,
InputFormat inputFormat,
CloseableIterator<InputEntity> sourceIterator,
CloseableIterator<? extends InputEntity> sourceIterator,
Contributor:

nit: could call this sourceCloseableIterator

jihoonson (Contributor, Author):

Fixed.


@JsonCreator
public LocalInputSource(
@JsonProperty("baseDir") File baseDir,
@JsonProperty("filter") String filter
@JsonProperty("filter") String filter,
@JsonProperty("files") Set<File> files
Contributor:

Can add a @Nullable here

jihoonson (Contributor, Author):

Added.

sizeInLong = sizeInBigInteger.longValueExact();
}
catch (ArithmeticException e) {
sizeInLong = Long.MAX_VALUE;
Contributor:

Should this propagate the exception instead? If we get an object with a byte size that can't be stored in a long, something seems very wrong

jihoonson (Contributor, Author):

The length of a Google Storage object is an unsigned long (https://cloud.google.com/storage/docs/json_api/v1/objects#resource-representations), so it may not fit in a signed long. I think it's better to keep working instead of failing. Added a warning log about the exception.
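
For illustration, a small self-contained sketch of the clamping behavior described here (System.err stands in for the actual warning log, and the method name is made up):

```java
import java.math.BigInteger;

public class ObjectSizeSketch
{
  // Cloud storage may report object sizes as unsigned 64-bit integers, so the value
  // can exceed Long.MAX_VALUE. Rather than failing, clamp to Long.MAX_VALUE and warn.
  static long toLongSize(BigInteger sizeInBigInteger)
  {
    try {
      return sizeInBigInteger.longValueExact();
    }
    catch (ArithmeticException e) {
      System.err.println("Size [" + sizeInBigInteger + "] is out of long range; using Long.MAX_VALUE instead");
      return Long.MAX_VALUE;
    }
  }

  public static void main(String[] args)
  {
    System.out.println(toLongSize(BigInteger.valueOf(123L)));                              // 123
    System.out.println(toLongSize(BigInteger.valueOf(2).pow(64).subtract(BigInteger.ONE))); // clamped to Long.MAX_VALUE
  }
}
```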

retryPolicyFactory,
dataSource,
interval,
splitHintSpec == null ? new SegmentsSplitHintSpec(null) : splitHintSpec
Contributor:

Since it would get converted into a MaxSizeSplitHintSpec in createSplit, could this create a MaxSizeSplitHintSpec directly? (Does this also mean SegmentsSplitHintSpec is deprecated?)

jihoonson (Contributor, Author):

Changed to create MaxSizeSplitHintSpec directly.

Does this also mean SegmentsSplitHintSpec is deprecated?

Good question. MaxSizeSplitHintSpec and SegmentsSplitHintSpec work exactly the same for now, but I think SegmentsSplitHintSpec can be further optimized in the future. Added a comment about the future improvement.

* If there is only one file in the split, its size can be larger than {@link #maxSplitSize}.
* If there are two or more files in the split, their total size cannot be larger than {@link #maxSplitSize}.
*/
public class MaxSizeSplitHintSpec implements SplitHintSpec
Contributor:

I think you should make spec classes pure data objects (or beans). Adding methods like split to them makes them complicated and adds logic that makes them hard to version in the future. We should think of data objects as literals, not as objects with business logic.

jihoonson (Contributor, Author):

Good point. I agree it is a better structure, but the problem is that too many classes do this kind of thing, especially on the ingestion side. I don't think it's possible to apply the suggested design to all of them anytime soon. Also, I think it's better to promote SQL for ingestion as well, so that Druid users don't have to worry about API changes.

{
return new Iterator<List<T>>()
{
private T peeking;
Contributor:

I think you can simplify the logic of the next method below if you initialize peeking to inputIterator.next(), and only set peeking to null when inputIterator.hasNext() is false. In your next() below, you would just keep shifting values from inputIterator into current after each iteration as long as there are more inputs.

jihoonson (Contributor, Author):

I don't understand how that would work. `peeking` keeps the last input fetched from the underlying iterator, because whether it gets added depends on the total size of the inputs already in the current list. If the last fetched input was not added, it must be returned in the following next() call.
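
To illustrate the role of `peeking`, here is a minimal, self-contained version of the splitting iterator (simplified: elements stand in for file sizes, and this is not the exact code under review):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SplittingIteratorSketch
{
  // Groups longs (acting as file sizes) into lists whose total stays within maxSplitSize.
  // 'peeking' holds an element fetched from the underlying iterator that did not fit into
  // the current split; it must be emitted at the start of the next split.
  static Iterator<List<Long>> split(Iterator<Long> inputIterator, long maxSplitSize)
  {
    return new Iterator<List<Long>>()
    {
      private Long peeking;

      @Override
      public boolean hasNext()
      {
        return peeking != null || inputIterator.hasNext();
      }

      @Override
      public List<Long> next()
      {
        final List<Long> current = new ArrayList<>();
        long splitSize = 0;
        while (peeking != null || inputIterator.hasNext()) {
          if (peeking == null) {
            peeking = inputIterator.next();
          }
          final long size = peeking;
          if (current.isEmpty() || splitSize + size <= maxSplitSize) {
            current.add(peeking);  // fits, or is the first element of a new split
            splitSize += size;
            peeking = null;        // consumed
          } else {
            break;                 // does not fit; keep it in 'peeking' for the next call
          }
        }
        return current;
      }
    };
  }

  public static void main(String[] args)
  {
    Iterator<List<Long>> splits = split(List.of(200L, 200L, 700L, 100L).iterator(), 500L);
    while (splits.hasNext()) {
      System.out.println(splits.next()); // [200, 200], then [700], then [100]
    }
  }
}
```

This sketch also folds the `current.isEmpty()` case into the size check, along the lines of the earlier comment about combining the two branches.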

{
return Objects.hash(maxSplitSize);
}
}
Contributor:

equals and hashCode need unit tests

jihoonson (Contributor, Author):

Added.
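
For the equals/hashCode tests requested here (and for the other classes below), a minimal JUnit 4 sketch; the constructor usage is an assumption based on `new MaxSizeSplitHintSpec(null)` elsewhere in this PR, and the same pattern applies to LocalInputSource and friends:

```java
import org.junit.Assert;
import org.junit.Test;

public class MaxSizeSplitHintSpecTest
{
  @Test
  public void testEqualsAndHashCode()
  {
    // Hypothetical constructor usage: a single nullable maxSplitSize argument.
    MaxSizeSplitHintSpec spec1 = new MaxSizeSplitHintSpec(1000L);
    MaxSizeSplitHintSpec spec2 = new MaxSizeSplitHintSpec(1000L);
    MaxSizeSplitHintSpec spec3 = new MaxSizeSplitHintSpec(2000L);

    // Equal field values imply equal objects and equal hash codes.
    Assert.assertEquals(spec1, spec2);
    Assert.assertEquals(spec1.hashCode(), spec2.hashCode());
    // Different field values imply inequality.
    Assert.assertNotEquals(spec1, spec3);
  }
}
```

A library such as EqualsVerifier could also be used to check the full equals/hashCode contract in a single call.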

@@ -56,6 +57,12 @@ public long getMaxInputSegmentBytesPerTask()
return maxInputSegmentBytesPerTask;
}

@Override
public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor)
Contributor:

Seems like this method really doesn't belong here if not all subclasses or implementations need it? Or should this class be abstract instead?

jihoonson (Contributor, Author):

Added a comment about it.

}

@Override
public int hashCode()
{
return Objects.hash(baseDir, filter);
return Objects.hash(baseDir, filter, files);
}
Contributor:

equals and hashCode need unit tests for maintainability.

jihoonson (Contributor, Author):

Added.

public int hashCode()
{
return Objects.hash(segmentId, intervals);
}
Contributor:

Tests for equals and hashCode, please.

jihoonson (Contributor, Author):

Added.

@jon-wei merged commit 3bc7ae7 into apache:master on Feb 25, 2020
@jihoonson added this to the 0.18.0 milestone on Mar 26, 2020