Remove "granularity" from IngestSegmentFirehose. (apache#4110)
It wasn't doing anything useful (the sequences were being concatenated, and
cursor.getTime() wasn't being called) and it defaulted to Granularities.NONE.
Changing it to Granularities.ALL gave me a 400x performance boost on a
small dataset I was reindexing (2m27s down to 365ms). Most of that came from
avoiding the creation of many unnecessary column selectors.
gianm committed May 16, 2017
1 parent d7fa069 commit 02e4220
Showing 11 changed files with 10 additions and 50 deletions.
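
The mechanics behind that speedup, as a toy sketch rather than Druid code: with Granularities.NONE the firehose asked the storage adapter for one cursor per time bucket, which for un-truncated timestamps can mean one cursor per distinct timestamp, and each cursor pays the fixed cost of building its column selectors; Granularities.ALL yields a single cursor per segment, so the selectors are built once. Every name below is illustrative.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy cost model, not Druid code: scanWithCursor() stands in for
// StorageAdapter.makeCursors() handing back a cursor whose column
// selectors must be constructed from scratch.
public class CursorCostSketch
{
  private static int selectorSetups = 0;

  private static void scanWithCursor(List<Long> rows)
  {
    selectorSetups++; // fixed setup cost paid once per cursor
    for (long row : rows) {
      // per-row work is identical in both scenarios
    }
  }

  public static void main(String[] args)
  {
    List<Long> rows = new ArrayList<>();
    for (long t = 0; t < 100_000; t++) {
      rows.add(t); // one row per distinct timestamp
    }

    // NONE-style: every distinct timestamp gets its own cursor.
    selectorSetups = 0;
    for (long row : rows) {
      scanWithCursor(Collections.singletonList(row));
    }
    System.out.println("NONE-style selector setups: " + selectorSetups); // 100000

    // ALL-style: one cursor over the whole segment.
    selectorSetups = 0;
    scanWithCursor(rows);
    System.out.println("ALL-style selector setups: " + selectorSetups); // 1
  }
}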
1 change: 0 additions & 1 deletion docs/content/ingestion/update-existing-data.md
@@ -52,7 +52,6 @@ Here is what goes inside `ingestionSpec`:
|dataSource|String|Druid dataSource name from which you are loading the data.|yes|
|intervals|List|A list of strings representing ISO-8601 Intervals.|yes|
|segments|List|List of segments from which to read data; by default it is obtained automatically. You can obtain the list of segments to put here by making a POST query to the coordinator at /druid/coordinator/v1/metadata/datasources/segments?full with the list of intervals specified in the request payload, e.g. ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]. You may want to provide this list manually to ensure that the segments read are exactly the same as they were at the time of task submission; the task will fail if the list provided by the user does not match the state of the database when the task actually runs.|no|
-|granularity|String|Defines the granularity of the query while loading data. Default value is "none". See [Granularities](../querying/granularities.html).|no|
|filter|JSON|See [Filters](../querying/filters.html)|no|
|dimensions|Array of String|Name of dimension columns to load. By default, the list will be constructed from parseSpec. If parseSpec does not have an explicit list of dimensions then all the dimension columns present in stored data will be read.|no|
|metrics|Array of String|Name of metric columns to load. By default, the list will be constructed from the "name" of all the configured aggregators.|no|
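
With the granularity row gone, a minimal ingestionSpec needs only the required fields plus whatever optional ones apply. A sketch for orientation, with made-up dataSource, interval, and filter values:

"ingestionSpec": {
  "dataSource": "wikipedia",
  "intervals": ["2014-10-20T00:00:00.000Z/2014-10-27T00:00:00.000Z"],
  "filter": {
    "type": "selector",
    "dimension": "page",
    "value": "Druid"
  }
}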
DatasourceIngestionSpec.java
@@ -24,7 +24,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.druid.common.utils.JodaUtils;
- import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.query.filter.DimFilter;
import io.druid.timeline.DataSegment;
@@ -38,7 +37,6 @@ public class DatasourceIngestionSpec
private final List<Interval> intervals;
private final List<DataSegment> segments;
private final DimFilter filter;
- private final Granularity granularity;
private final List<String> dimensions;
private final List<String> metrics;
private final boolean ignoreWhenNoSegments;
@@ -50,7 +48,6 @@ public DatasourceIngestionSpec(
@JsonProperty("intervals") List<Interval> intervals,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("filter") DimFilter filter,
@JsonProperty("granularity") Granularity granularity,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("metrics") List<String> metrics,
@JsonProperty("ignoreWhenNoSegments") boolean ignoreWhenNoSegments
@@ -77,8 +74,6 @@ public DatasourceIngestionSpec(
this.segments = segments;

this.filter = filter;
- this.granularity = granularity == null ? Granularities.NONE : granularity;
-
this.dimensions = dimensions;
this.metrics = metrics;

@@ -109,12 +104,6 @@ public DimFilter getFilter()
return filter;
}

- @JsonProperty
- public Granularity getGranularity()
- {
- return granularity;
- }
-
@JsonProperty
public List<String> getDimensions()
{
@@ -141,7 +130,6 @@ public DatasourceIngestionSpec withDimensions(List<String> dimensions)
intervals,
segments,
filter,
- granularity,
dimensions,
metrics,
ignoreWhenNoSegments
@@ -156,7 +144,6 @@ public DatasourceIngestionSpec withMetrics(List<String> metrics)
intervals,
segments,
filter,
- granularity,
dimensions,
metrics,
ignoreWhenNoSegments
@@ -171,7 +158,6 @@ public DatasourceIngestionSpec withQueryGranularity(Granularity granularity)
intervals,
segments,
filter,
- granularity,
dimensions,
metrics,
ignoreWhenNoSegments
@@ -186,7 +172,6 @@ public DatasourceIngestionSpec withIgnoreWhenNoSegments(boolean ignoreWhenNoSegm
intervals,
segments,
filter,
- granularity,
dimensions,
metrics,
ignoreWhenNoSegments
@@ -220,9 +205,6 @@ public boolean equals(Object o)
if (filter != null ? !filter.equals(that.filter) : that.filter != null) {
return false;
}
- if (!granularity.equals(that.granularity)) {
- return false;
- }
if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) {
return false;
}
@@ -237,7 +219,6 @@ public int hashCode()
result = 31 * result + intervals.hashCode();
result = 31 * result + (segments != null ? segments.hashCode() : 0);
result = 31 * result + (filter != null ? filter.hashCode() : 0);
- result = 31 * result + granularity.hashCode();
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
result = 31 * result + (metrics != null ? metrics.hashCode() : 0);
result = 31 * result + (ignoreWhenNoSegments ? 1 : 0);
@@ -252,7 +233,6 @@ public String toString()
", intervals=" + intervals +
", segments=" + segments +
", filter=" + filter +
", granularity=" + granularity +
", dimensions=" + dimensions +
", metrics=" + metrics +
", ignoreWhenNoSegments=" + ignoreWhenNoSegments +
DatasourceRecordReader.java
@@ -26,7 +26,6 @@
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
-
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.MapBasedRow;
@@ -111,8 +110,7 @@ public WindowedStorageAdapter apply(WindowedDataSegment segment)
adapters,
spec.getDimensions(),
spec.getMetrics(),
- spec.getFilter(),
- spec.getGranularity()
+ spec.getFilter()
);

}
BatchDeltaIngestionTest.java
@@ -322,8 +322,7 @@ private void testIngestion(
ImmutableList.of(new WindowedStorageAdapter(adapter, windowedDataSegment.getInterval())),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
- null,
- Granularities.NONE
+ null
);

List<InputRow> rows = Lists.newArrayList();
HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java
@@ -92,7 +92,7 @@ public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithJustDatasourcePat
PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper,
null,
- new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, null, false),
+ new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, false),
null
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
@@ -119,7 +119,6 @@ public void testupdateSegmentListIfDatasourcePathSpecWithMatchingUserSegments()
null,
null,
null,
- null,
false
),
null
@@ -148,7 +147,6 @@ public void testupdateSegmentListThrowsExceptionWithUserSegmentsMismatch() throw
null,
null,
null,
- null,
false
),
null
@@ -174,7 +172,6 @@ public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithJustDatasourcePat
null,
null,
null,
- null,
false
),
null
@@ -206,7 +203,6 @@ public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithMultiplePathSpec(
null,
null,
null,
- null,
false
),
null
DatasourceIngestionSpecTest.java
@@ -22,7 +22,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
- import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.TestHelper;
import io.druid.timeline.DataSegment;
@@ -49,7 +48,6 @@ public void testSingleIntervalSerde() throws Exception
null,
null,
new SelectorDimFilter("dim", "value", null),
- Granularities.DAY,
Lists.newArrayList("d1", "d2"),
Lists.newArrayList("m1", "m2", "m3"),
false
@@ -86,7 +84,6 @@ public void testMultiIntervalSerde() throws Exception
null,
null,
null,
- null,
false
);

@@ -133,7 +130,6 @@ public void testMultiIntervalSerde() throws Exception
)
),
new SelectorDimFilter("dim", "value", null),
- Granularities.DAY,
Lists.newArrayList("d1", "d2"),
Lists.newArrayList("m1", "m2", "m3"),
true
@@ -156,7 +152,7 @@ public void testOldJsonDeserialization() throws Exception
DatasourceIngestionSpec actual = MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class);

Assert.assertEquals(
new DatasourceIngestionSpec("test", Interval.parse("2014/2015"), null, null, null, null, null, null, false),
new DatasourceIngestionSpec("test", Interval.parse("2014/2015"), null, null, null, null, null, false),
actual
);
}
DatasourceRecordReaderTest.java
@@ -67,7 +67,6 @@ public void testSanity() throws Exception
null,
null,
null,
- null,
segment.getDimensions(),
segment.getMetrics(),
false
DatasourcePathSpecTest.java
@@ -79,7 +79,6 @@ public DatasourcePathSpecTest()
null,
null,
null,
- null,
false
);

IngestSegmentFirehoseFactory.java
@@ -38,7 +38,6 @@
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.task.NoopTask;
- import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.filter.DimFilter;
import io.druid.segment.IndexIO;
@@ -274,12 +273,9 @@ public WindowedStorageAdapter apply(final PartitionChunk<DataSegment> input)
)
);

- return new IngestSegmentFirehose(adapters, dims, metricsList, dimFilter, Granularities.NONE);
+ return new IngestSegmentFirehose(adapters, dims, metricsList, dimFilter);
}
- catch (IOException e) {
- throw Throwables.propagate(e);
- }
- catch (SegmentLoadingException e) {
+ catch (IOException | SegmentLoadingException e) {
throw Throwables.propagate(e);
}

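
The catch-block change above is incidental cleanup: Java 7 multi-catch collapses two handlers that did the same thing into one. A self-contained sketch of the idiom, assuming Guava's Throwables on the classpath; the SegmentLoadingException here is a stand-in, not Druid's real class:

import com.google.common.base.Throwables;
import java.io.IOException;

public class MultiCatchSketch
{
  // Stand-in for io.druid.segment.loading.SegmentLoadingException.
  static class SegmentLoadingException extends Exception {}

  static void loadSegments() throws IOException, SegmentLoadingException
  {
    throw new IOException("simulated failure");
  }

  public static void main(String[] args)
  {
    try {
      loadSegments();
    }
    // One handler for both checked exceptions instead of two duplicated blocks.
    catch (IOException | SegmentLoadingException e) {
      throw Throwables.propagate(e);
    }
  }
}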
IngestSegmentFirehose.java
@@ -26,7 +26,7 @@
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
- import io.druid.java.util.common.granularity.Granularity;
+ import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.guava.Yielder;
@@ -59,8 +59,7 @@ public IngestSegmentFirehose(
final List<WindowedStorageAdapter> adapters,
final List<String> dims,
final List<String> metrics,
- final DimFilter dimFilter,
- final Granularity granularity
+ final DimFilter dimFilter
)
{
Sequence<InputRow> rows = Sequences.concat(
@@ -77,7 +76,7 @@ public Sequence<InputRow> apply(WindowedStorageAdapter adapter)
Filters.toFilter(dimFilter),
adapter.getInterval(),
VirtualColumns.EMPTY,
- granularity,
+ Granularities.ALL,
false
), new Function<Cursor, Sequence<InputRow>>()
{
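
Callers now pass four arguments instead of five and drain the firehose as before. A condensed, hypothetical caller-side sketch; the setup of adapters, dims, metricsList, and dimFilter is elided, mirroring the factory call site above:

// Hypothetical usage fragment; assumes adapters, dims, metricsList and
// dimFilter are prepared as in IngestSegmentFirehoseFactory above.
final IngestSegmentFirehose firehose =
    new IngestSegmentFirehose(adapters, dims, metricsList, dimFilter);
while (firehose.hasMore()) {
  final InputRow row = firehose.nextRow();
  // hand each row to the reindexing task
}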
IngestSegmentFirehoseTest.java
@@ -72,8 +72,7 @@ public void testSanity() throws Exception
ImmutableList.of(wsa, wsa),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
- null,
- Granularities.NONE
+ null
);

int count = 0;
