
Add compaction task #4985

Merged: 21 commits merged into apache:master on Nov 4, 2017

Conversation

@jihoonson (Contributor) commented Oct 20, 2017

Part of #4479.

This patch introduces a new task type, CompactionTask. The reason for introducing a new task type instead of reusing the existing IndexTask + IngestSegmentFirehose is to keep the task spec as simple as possible, because it will be submitted by humans as well as by coordinators. As a result, I removed most of the unnecessary parameters from the task spec.

This task type is essentially a factory that generates an IndexTask spec to do the compaction work. An example of this task spec from the integration test is:

```json
{
  "type" : "compact",
  "dataSource" : "wikipedia_index_test",
  "interval" : "2013-08-31/2013-09-02"
}
```

This compaction task compacts all wikipedia_index_test segments of the 2013-08-31/2013-09-02 interval.

When CompactionTask.run() is called, it internally generates an IndexTask spec for the given dataSource and interval. The generated index task spec includes all dimensions and metrics of the segments in the given interval. The segments of the given interval should have the same queryGranularity and rollup flag.

The generated IndexTask spec for the above compaction task is:

```json
{
  "type" : "index",
  "id" : "compaction_wikipedia_index_test_2017-10-20T01:33:54.420Z",
  "resource" : {
    "availabilityGroup" : "compaction_wikipedia_index_test_2017-10-20T01:33:54.420Z",
    "requiredCapacity" : 1
  },
  "spec" : {
    "dataSchema" : {
      "dataSource" : "wikipedia_index_test",
      "parser" : {
        "type" : "noop"
      },
      "metricsSpec" : [ {
        "type" : "doubleSum",
        "name" : "added",
        "fieldName" : "added",
        "expression" : null
      }, {
        "type" : "longSum",
        "name" : "count",
        "fieldName" : "count",
        "expression" : null
      }, {
        "type" : "doubleSum",
        "name" : "deleted",
        "fieldName" : "deleted",
        "expression" : null
      }, {
        "type" : "doubleSum",
        "name" : "delta",
        "fieldName" : "delta",
        "expression" : null
      } ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : {
          "type" : "period",
          "period" : "P2D",
          "timeZone" : "UTC",
          "origin" : null
        },
        "queryGranularity" : "SECOND",
        "rollup" : true,
        "intervals" : [ "2013-08-31T00:00:00.000Z/2013-09-02T00:00:00.000Z" ]
      }
    },
    "ioConfig" : {
      "type" : "index",
      "firehose" : {
        "type" : "ingestSegment",
        "dataSource" : "wikipedia_index_test",
        "interval" : "2013-08-31T00:00:00.000Z/2013-09-02T00:00:00.000Z",
        "filter" : null,
        "dimensions" : [ "robot", "continent", "country", "city", "newPage", "unpatrolled", "namespace", "anonymous", "language", "page", "region", "user" ],
        "metrics" : [ "deleted", "added", "count", "delta" ]
      },
      "appendToExisting" : false
    },
    "tuningConfig" : {
      "type" : "index",
      "targetPartitionSize" : 5000000,
      "maxRowsInMemory" : 75000,
      "maxTotalRows" : 20000000,
      "numShards" : null,
      "indexSpec" : {
        "bitmap" : {
          "type" : "concise"
        },
        "dimensionCompression" : "lz4",
        "metricCompression" : "lz4",
        "longEncoding" : "longs"
      },
      "maxPendingPersists" : 0,
      "buildV9Directly" : true,
      "forceExtendableShardSpecs" : false,
      "forceGuaranteedRollup" : false,
      "reportParseExceptions" : false,
      "publishTimeout" : 0
    }
  },
  "context" : null,
  "groupId" : "compaction_wikipedia_index_test_2017-10-20T01:33:54.420Z",
  "dataSource" : "wikipedia_index_test"
}
```


@jihoonson (Contributor Author):

I'll add a doc soon.

@jihoonson (Contributor Author):

Added doc.

@@ -79,7 +79,7 @@ A sample ingest firehose spec is shown below -
|interval|A String representing ISO-8601 Interval. This defines the time range to fetch the data over.|yes|
|dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no|
|metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
|filter| See [Filters](../querying/filters.html)|yes|
|filter| See [Filters](../querying/filters.html)|no|
Contributor:

Good catch.

@@ -104,7 +104,7 @@ Tasks can have different default priorities depening on their types. Here are a
|---------|----------------|
|Realtime index task|75|
|Batch index task|50|
|Merge/Append task|25|
|Merge/Append/Compation task|25|
Contributor:

Compaction (spelling)

Contributor Author:

Fixed.

}
```

### Compaction Task

Compaction tasks merge all segments of the given interval. The syntax is:
Contributor:

I think this should include a segmentGranularity too. Unless your idea is that the interval specified should just be one segment's worth of interval, in which case, that should be said in the docs.

Contributor Author:

Yeah, all the segments of the interval are always merged into a single segment. I added the below statement.

This compaction task merges all segments of the interval 2017-01-01/2018-01-01 into a single segment.

Contributor:

I suggest adding two more sentences:

To merge each day's worth of data into a separate segment, you can submit multiple "compact" tasks, one for each day. They will run in parallel.

Contributor Author:

Added.

For example, its `firehose` is always the [ingestSegmentSpec](./firehose.html) and `dimensionsSpec` and `metricsSpec`
always include all dimensions and metrics of the input segments.

Note that all input segments should have the same `queryGranularity` and `rollup`. See [Segment Metadata Queries](../querying/segmentmetadataquery.html#analysistypes) for more details.
@gianm (Contributor), Oct 25, 2017:

What happens if they don't have consistent queryGranularity and rollup? (Docs should say and it should hopefully be reasonable, since this situation may happen in real life.)

Contributor Author:

Good point. It threw an exception before, but now it automatically checks the input segments and sets rollup only if it is set for all of them.
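
A minimal sketch of that behavior (illustrative only, not the exact PR code): the generated spec enables rollup only when every input segment reports rollup in its metadata.

```java
import java.util.List;

// Illustrative sketch: decide the rollup flag of the generated index task from the
// rollup flags reported by the input segments' metadata.
static boolean computeRollup(List<Boolean> rollupFlagsOfInputs)
{
  boolean rollup = true;
  for (Boolean flag : rollupFlagsOfInputs) {
    // A missing (null) flag is treated as "not rolled up", so the output stays correct.
    rollup &= flag != null && flag;
  }
  return rollup;
}
```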


This compaction task merges _all segments_ of the interval `2017-01-01/2018-01-01`.

A compaction task internally generates an indexTask spec for performing compaction work with some fixed parameters.
Contributor:

Probably more clear:

generates an "index" task spec

Contributor Author:

Done.

// convert to combining aggregators
final AggregatorFactory[] combiningAggregators;
if (mergedAggregators == null) {
combiningAggregators = null;
Contributor:

Probably need to throw an exception here. I think if we actually go through with a dataSchema that has null aggregators, it will just drop all the metrics while compacting, which seems like a bad idea.

Contributor Author:

Good point. I added a check.
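
A sketch of that guard, under the assumption that the merged aggregators come back null when the input segments have no common metric definition (the method and message here are illustrative):

```java
import com.google.common.base.Preconditions;
import io.druid.query.aggregation.AggregatorFactory;

// Illustrative sketch: fail fast instead of building a dataSchema with null aggregators,
// which would silently drop every metric during compaction.
static AggregatorFactory[] toCombiningAggregators(AggregatorFactory[] mergedAggregators)
{
  Preconditions.checkState(
      mergedAggregators != null,
      "Failed to merge the aggregators of the input segments"
  );
  final AggregatorFactory[] combining = new AggregatorFactory[mergedAggregators.length];
  for (int i = 0; i < mergedAggregators.length; i++) {
    combining[i] = mergedAggregators[i].getCombiningFactory();
  }
  return combining;
}
```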

);
final Map<DataSegment, File> segmentFileMap = pair.lhs;
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = pair.rhs;
final List<String> dimensions = IngestSegmentFirehoseFactory.getUniqueDimensions(
Contributor:

I don't think this is necessary. IngestSegmentFirehoseFactory will include all dimensions if the dimensions parameter passed to the constructor is null.

Contributor Author:

Removed.

Contributor Author:

CompactionTask finds the unique set of dimensions to generate the DimensionsSpec. I added this line again to make sure the dimensions in the generated DimensionsSpec are the ones used in IngestSegmentFirehose.

timelineSegments,
new NoopInputRowParser(null)
);
final List<String> metrics = IngestSegmentFirehoseFactory.getUniqueMetrics(timelineSegments);
Contributor:

I don't think this is necessary either -- same reason. IngestSegmentFirehoseFactory should include all metrics if null is passed in.

Contributor Author:

Removed.

Contributor Author:

Similar here. CompactionTask finds the unique set of aggregators. I added this line again to make sure the aggregators in the generated DataSchema are the ones used in IngestSegmentFirehose.

}

// find granularity spec
final GranularitySpec granularitySpec = new UniformGranularitySpec(
Contributor:

I think using ArbitraryGranularitySpec would be simpler. It's designed to index specific intervals.
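
For reference, a sketch of what that could look like; the constructor shape and package names are recalled from the codebase of that era and should be treated as approximate, and the argument values mirror the example spec above:

```java
import io.druid.java.util.common.granularity.Granularities;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import org.joda.time.Interval;

import java.util.Collections;

// Illustrative: ArbitraryGranularitySpec only needs the query granularity, the rollup flag,
// and the exact interval being compacted -- no segmentGranularity bucketing is involved.
final GranularitySpec granularitySpec = new ArbitraryGranularitySpec(
    Granularities.SECOND,  // queryGranularity taken from the input segments
    true,                  // rollup, if all input segments were rolled up
    Collections.singletonList(Interval.parse("2013-08-31T00:00:00Z/2013-09-02T00:00:00Z"))
);
```

Compared with UniformGranularitySpec, there is no segmentGranularity to choose, which fits a task whose output interval is exactly the compaction interval.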

Contributor Author:

Changed.


return new DataSchema(
dataSource,
ImmutableMap.of("type", "noop"),
Contributor:

I think this will not work right with regard to numeric dimensions.

This data schema will essentially tell the index task to use metrics from combiningAggregators (which is good, assuming it's computed properly) and to auto-detect dimensions. But dimension auto-detection basically just treats everything that is not an input to an aggregator as a string. It won't retain the types they had in the original segment if it was a long or float dimension for example.

Contributor Author:

Thank you for the good point. I thought about this, and the only feasible solution looks like allowing a user-defined dimensionsSpec in the compactionTask spec until we store dimension data types somewhere. Does that make sense?

Contributor Author:

Changed to accept an optional dimensionsSpec.

@gianm (Contributor), Oct 26, 2017:

We can do better than that, by examining what types the existing dimension columns are. storageAdapter.getColumnCapabilities(column).getType() is the way to do that.
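
A sketch of that approach; the class and constructor names are recalled from the codebase around this time and may not match the final patch exactly:

```java
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.FloatDimensionSchema;
import io.druid.data.input.impl.LongDimensionSchema;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.ValueType;

import java.util.ArrayList;
import java.util.List;

// Illustrative: derive a typed DimensionsSpec from an existing segment's storage adapter
// instead of asking the user, so long/float dimensions keep their original types.
static DimensionsSpec dimensionsSpecOf(StorageAdapter adapter)
{
  final List<DimensionSchema> schemas = new ArrayList<>();
  for (String dim : adapter.getAvailableDimensions()) {
    final ValueType type = adapter.getColumnCapabilities(dim).getType();
    if (type == ValueType.LONG) {
      schemas.add(new LongDimensionSchema(dim));
    } else if (type == ValueType.FLOAT) {
      schemas.add(new FloatDimensionSchema(dim));
    } else {
      schemas.add(new StringDimensionSchema(dim));
    }
  }
  return new DimensionsSpec(schemas, null, null);
}
```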

@jihoonson (Contributor Author):

@gianm thank you for the quick review!



return new DataSchema(
dataSource,
dimensionsSpec == null ? ImmutableMap.of("type", "noop")
Contributor:

It would be more type-safe to do something like jsonMapper.convertValue(parser, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT), on a parser you create using a normal constructor.
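
A sketch of that suggestion; NoopInputRowParser appears in the patch and JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT is the constant named in the comment, but the exact package paths here are recalled and may differ:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.data.input.impl.NoopInputRowParser;
import io.druid.java.util.common.jackson.JacksonUtils;

import java.util.Map;

// Illustrative: construct the parser with its normal constructor, then convert it to the
// Map form that DataSchema expects, rather than hand-writing ImmutableMap.of("type", "noop").
static Map<String, Object> noopParserMap(ObjectMapper jsonMapper)
{
  return jsonMapper.convertValue(
      new NoopInputRowParser(null),
      JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
  );
}
```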

Contributor Author:

Thanks. Changed.

@JsonProperty("resource") final TaskResource taskResource,
@JsonProperty("dataSource") final String dataSource,
@JsonProperty("interval") final Interval interval,
@JsonProperty("dimensionsSpec") final DimensionsSpec dimensionsSpec,
Contributor:

Rather than asking the user to include this, it could be determined by looking at the dimensions and column types from each segment's StorageAdapter. The methods are getAvailableDimensions and getColumnCapabilities.

Contributor Author:

Thanks. I changed.

{
final BiMap<String, Integer> uniqueMetrics = HashBiMap.create();

// Here, we try to retain the order of metrics as they were specified since the order of metrics may be
Contributor:

Unlike dimensions, order of metrics doesn't matter for performance. Dimension order matters because it affects sorting of the rows and can be used to improve locality (rows are sorted by time first, but rows within the same time bucket are sorted by dimensions, in order). But metric order doesn't affect sorting.

Contributor Author:

Right. Metric order currently doesn't affect performance. However, we're going to handle dimensions and metrics in the same way, so I guess this will be needed in the future. Do you think it's better to add it later?

BTW, I updated the comments here.

always include all dimensions and metrics of the input segments.

Note that the output segment is rolled up only when `rollup` is set for all input segments.
See [Segment Metadata Queries](../querying/segmentmetadataquery.html#analysistypes) for more details about `rollup`.
Contributor:

Can you add a reference to http://druid.io/docs/latest/design/index.html#roll-up here, and add a note next to the SegmentMetadataQuery link explaining that the query can be used to determine whether a segment was created with rollup or not?

The "design" link has a more substantial explanation of what rollup is, and the extra note about SegmentMetadataQuery would make it clearer what it's used for.


// Here, we try to retain the order of dimensions as they were specified since the order of dimensions may be
// optimized for performance.
// Dimensions are extracted from the most recent segments to the older ones because recent segments are likely to be queried more
Contributor:

I think this would also have the effect of giving recent segments precedence in terms of what type each dimension has (for example, if an older segment stored a dimension as String but newer ones store it as Long). Can you mention the ordering and type precedence in the docs somewhere?
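
A sketch of the newest-first collection the excerpt above refers to (illustrative; the actual logic lives in the getUniqueDimensions/getUniqueMetrics helpers): because a name keeps the position it had the first time it is seen, more recent segments win both ordering and, by extension, type precedence.

```java
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;

import java.util.ArrayList;
import java.util.List;

// Illustrative: walk per-segment dimension lists from newest to oldest; the first occurrence
// of a name fixes its position, so recent segments take precedence over older ones.
static List<String> uniqueDimensions(List<List<String>> dimensionListsNewestFirst)
{
  final BiMap<String, Integer> uniqueDims = HashBiMap.create();
  int index = 0;
  for (List<String> segmentDims : dimensionListsNewestFirst) {
    for (String dim : segmentDims) {
      if (!uniqueDims.containsKey(dim)) {
        uniqueDims.put(dim, index++);
      }
    }
  }
  // Rebuild the ordered list through the inverse view of the BiMap.
  final List<String> ordered = new ArrayList<>(uniqueDims.size());
  for (int i = 0; i < uniqueDims.size(); i++) {
    ordered.add(uniqueDims.inverse().get(i));
  }
  return ordered;
}
```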

Contributor:

One question that leads to: would it be useful to allow users some control over ordering and types?

I'm thinking that maybe that overcomplicates the Compaction Task which is meant to be simple to express, and users can manually write a full batch task if they have column ordering/type requirements for the final compacted segments, but would like to see what your thoughts are on that.

Contributor Author:

I think this would also have the effect giving recent segments precedence in terms of what type each dimension has

Good point. Added doc.

One question that leads to, would it be useful to allow users some control over ordering and types?

I think it's a good idea. Users can write a full batch spec for full control, but someone might want a little more control with the compaction task because it's simple. I added this feature and a unit test for it.

@jon-wei (Contributor) commented Oct 28, 2017:

@jihoonson Had a few comments related to adding things to docs, rest LGTM. Can you also add a test that includes compacting segments with different dimension orders/types?

@jihoonson (Contributor Author):

@jon-wei thank you for the review. I changed CompactionTaskTest to test different dimension orders and types.

@jihoonson (Contributor Author) commented Oct 31, 2017:

I added segments as a new parameter to CompactionTask. This is not documented because it's intended to be used only by coordinators.

@jihoonson (Contributor Author):

@jon-wei @gianm do you have more comments?

@gianm (Contributor) commented Nov 4, 2017:

The latest changes look good to me.

@gianm merged commit 5f3c863 into apache:master on Nov 4, 2017
@jihoonson (Contributor Author):

@gianm thank you!

@jon-wei added this to the 0.12.0 milestone on Jan 5, 2018
@Gauravshah:

@jihoonson since removing dimensions is supported, is there any reason we didn't include the metricsSpec? It would be useful to be able to go to a different granularity after compaction, for example from minute to hour after some duration has passed.

@jihoonson (Contributor Author):

@Gauravshah, yes we can also add metricSpec if needed. Are you interested in making a PR for it?

@Gauravshah:

@jihoonson sure, I can try taking a stab at it. I don't know Druid internals well, though. I will start working on it after 4 weeks.

@jihoonson (Contributor Author):

@Gauravshah sounds great. Thanks!
