Merge pull request #1681 from gianm/ingest-segment-overlapping
Fix overlapping segments in IngestSegmentFirehose, DatasourceInputFormat
himanshug committed Aug 28, 2015
2 parents 1b0b1d5 + 414a6fb commit ceaa49e
Showing 19 changed files with 1,038 additions and 266 deletions.
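
The core of the fix: instead of handing the raw list of used segments to the datasource path spec, the segment list is now run through a VersionedIntervalTimeline so that overlapping segments overshadowed by newer versions are dropped, and each surviving segment is paired with the exact interval (window) it is visible for. Downstream, the input format and record reader carry that window along as a WindowedDataSegment and only read rows inside it. Below is a minimal sketch of that selection step, assembled from the code in this diff; the class and method names of the sketch itself are illustrative and not part of the commit.

import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import io.druid.indexer.hadoop.WindowedDataSegment;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;

import java.util.List;

public class OverlapResolutionSketch
{
  public static List<WindowedDataSegment> resolveOverlaps(
      final List<DataSegment> usedSegments,
      final Interval interval
  )
  {
    // Newer segment versions overshadow older ones once added to the timeline.
    final VersionedIntervalTimeline<String, DataSegment> timeline =
        new VersionedIntervalTimeline<>(Ordering.natural());
    for (DataSegment segment : usedSegments) {
      timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
    }

    // lookup() returns only the visible chunks per sub-interval, so a segment that is
    // partially overshadowed comes back paired with the narrower window it still covers.
    final List<WindowedDataSegment> windowedSegments = Lists.newArrayList();
    for (TimelineObjectHolder<String, DataSegment> holder : timeline.lookup(interval)) {
      for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
        windowedSegments.add(new WindowedDataSegment(chunk.getObject(), holder.getInterval()));
      }
    }
    return windowedSegments;
  }
}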
@@ -20,11 +20,17 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.hadoop.WindowedDataSegment;
import io.druid.indexer.path.UsedSegmentLister;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IngestionSpec;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;

import java.io.IOException;
import java.util.List;
@@ -116,25 +122,40 @@ public static HadoopIngestionSpec updateSegmentListIfDatasourcePathSpecIsUsed(

Map<String, Object> pathSpec = spec.getIOConfig().getPathSpec();
Map<String, Object> datasourcePathSpec = null;
if(pathSpec.get(type).equals(dataSource)) {
if (pathSpec.get(type).equals(dataSource)) {
datasourcePathSpec = pathSpec;
} else if(pathSpec.get(type).equals(multi)) {
} else if (pathSpec.get(type).equals(multi)) {
List<Map<String, Object>> childPathSpecs = (List<Map<String, Object>>) pathSpec.get(children);
for(Map<String, Object> childPathSpec : childPathSpecs) {
for (Map<String, Object> childPathSpec : childPathSpecs) {
if (childPathSpec.get(type).equals(dataSource)) {
datasourcePathSpec = childPathSpec;
break;
}
}
}

if (datasourcePathSpec != null) {
Map<String, Object> ingestionSpecMap = (Map<String, Object>) datasourcePathSpec.get(ingestionSpec);
DatasourceIngestionSpec ingestionSpecObj = jsonMapper.convertValue(ingestionSpecMap, DatasourceIngestionSpec.class);
DatasourceIngestionSpec ingestionSpecObj = jsonMapper.convertValue(
ingestionSpecMap,
DatasourceIngestionSpec.class
);
List<DataSegment> segmentsList = segmentLister.getUsedSegmentsForInterval(
ingestionSpecObj.getDataSource(),
ingestionSpecObj.getInterval()
);
datasourcePathSpec.put(segments, segmentsList);
VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
for (DataSegment segment : segmentsList) {
timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
}
final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = timeline.lookup(ingestionSpecObj.getInterval());
final List<WindowedDataSegment> windowedSegments = Lists.newArrayList();
for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {
for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
windowedSegments.add(new WindowedDataSegment(chunk.getObject(), holder.getInterval()));
}
}
datasourcePathSpec.put(segments, windowedSegments);
}

return spec;
@@ -20,7 +20,6 @@
package io.druid.indexer.hadoop;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
@@ -56,9 +55,9 @@ public List<InputSplit> getSplits(JobContext context) throws IOException, Interr
Configuration conf = context.getConfiguration();

String segmentsStr = Preconditions.checkNotNull(conf.get(CONF_INPUT_SEGMENTS), "No segments found to read");
List<DataSegment> segments = HadoopDruidIndexerConfig.jsonMapper.readValue(
List<WindowedDataSegment> segments = HadoopDruidIndexerConfig.jsonMapper.readValue(
segmentsStr,
new TypeReference<List<DataSegment>>()
new TypeReference<List<WindowedDataSegment>>()
{
}
);
@@ -75,31 +74,31 @@ public List<InputSplit> getSplits(JobContext context) throws IOException, Interr
//are combined appropriately
Collections.sort(
segments,
new Comparator<DataSegment>()
new Comparator<WindowedDataSegment>()
{
@Override
public int compare(DataSegment s1, DataSegment s2)
public int compare(WindowedDataSegment s1, WindowedDataSegment s2)
{
return Long.compare(s1.getSize(), s2.getSize());
return Long.compare(s1.getSegment().getSize(), s2.getSegment().getSize());
}
}
);
}

List<InputSplit> splits = Lists.newArrayList();

List<DataSegment> list = new ArrayList<>();
List<WindowedDataSegment> list = new ArrayList<>();
long size = 0;

for (DataSegment segment : segments) {
if (size + segment.getSize() > maxSize && size > 0) {
for (WindowedDataSegment segment : segments) {
if (size + segment.getSegment().getSize() > maxSize && size > 0) {
splits.add(new DatasourceInputSplit(list));
list = Lists.newArrayList();
size = 0;
}

list.add(segment);
size += segment.getSize();
size += segment.getSegment().getSize();
}

if (list.size() > 0) {
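
For split planning only the element type changes: splits now hold WindowedDataSegments and sizes are read through getSegment().getSize(). Since the hunk above is cut off, here is a sketch of the complete size-capped grouping as it behaves after this change; the wrapper class and method names are illustrative, and maxSize stands for the configured maximum split size.

import com.google.common.collect.Lists;
import io.druid.indexer.hadoop.DatasourceInputSplit;
import io.druid.indexer.hadoop.WindowedDataSegment;
import org.apache.hadoop.mapreduce.InputSplit;

import java.util.List;

public class SplitGroupingSketch
{
  public static List<InputSplit> group(final List<WindowedDataSegment> segments, final long maxSize)
  {
    final List<InputSplit> splits = Lists.newArrayList();
    List<WindowedDataSegment> current = Lists.newArrayList();
    long size = 0;

    for (WindowedDataSegment segment : segments) {
      // Close the current split when adding this segment would exceed maxSize.
      if (size + segment.getSegment().getSize() > maxSize && size > 0) {
        splits.add(new DatasourceInputSplit(current));
        current = Lists.newArrayList();
        size = 0;
      }
      current.add(segment);
      size += segment.getSegment().getSize();
    }

    // Emit the trailing, partially filled split.
    if (!current.isEmpty()) {
      splits.add(new DatasourceInputSplit(current));
    }
    return splits;
  }
}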
@@ -19,30 +19,28 @@

package io.druid.indexer.hadoop;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

import javax.validation.constraints.NotNull;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;

public class DatasourceInputSplit extends InputSplit implements Writable
{
private List<DataSegment> segments = null;
private List<WindowedDataSegment> segments = null;

//required for deserialization
public DatasourceInputSplit()
{
}

public DatasourceInputSplit(@NotNull List<DataSegment> segments)
public DatasourceInputSplit(@NotNull List<WindowedDataSegment> segments)
{
Preconditions.checkArgument(segments != null && segments.size() > 0, "no segments");
this.segments = segments;
@@ -52,8 +50,8 @@ public DatasourceInputSplit(@NotNull List<DataSegment> segments)
public long getLength() throws IOException, InterruptedException
{
long size = 0;
for (DataSegment segment : segments) {
size += segment.getSize();
for (WindowedDataSegment segment : segments) {
size += segment.getSegment().getSize();
}
return size;
}
@@ -64,7 +62,7 @@ public String[] getLocations() throws IOException, InterruptedException
return new String[]{};
}

public List<DataSegment> getSegments()
public List<WindowedDataSegment> getSegments()
{
return segments;
}
@@ -80,7 +78,7 @@ public void readFields(DataInput in) throws IOException
{
segments = HadoopDruidIndexerConfig.jsonMapper.readValue(
in.readUTF(),
new TypeReference<List<DataSegment>>()
new TypeReference<List<WindowedDataSegment>>()
{
}
);
@@ -36,9 +36,8 @@
import io.druid.segment.IndexIO;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.timeline.DataSegment;
import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -70,18 +69,18 @@ public void initialize(InputSplit split, final TaskAttemptContext context) throw
{
spec = readAndVerifyDatasourceIngestionSpec(context.getConfiguration(), HadoopDruidIndexerConfig.jsonMapper);

List<DataSegment> segments = ((DatasourceInputSplit) split).getSegments();
List<WindowedDataSegment> segments = ((DatasourceInputSplit) split).getSegments();

List<StorageAdapter> adapters = Lists.transform(
List<WindowedStorageAdapter> adapters = Lists.transform(
segments,
new Function<DataSegment, StorageAdapter>()
new Function<WindowedDataSegment, WindowedStorageAdapter>()
{
@Override
public StorageAdapter apply(DataSegment segment)
public WindowedStorageAdapter apply(WindowedDataSegment segment)
{
try {
logger.info("Getting storage path for segment [%s]", segment.getIdentifier());
Path path = new Path(JobHelper.getURIFromSegment(segment));
logger.info("Getting storage path for segment [%s]", segment.getSegment().getIdentifier());
Path path = new Path(JobHelper.getURIFromSegment(segment.getSegment()));

logger.info("Fetch segment files from [%s]", path);

@@ -96,7 +95,10 @@ public StorageAdapter apply(DataSegment segment)
indexes.add(index);
numRows += index.getNumRows();

return new QueryableIndexStorageAdapter(index);
return new WindowedStorageAdapter(
new QueryableIndexStorageAdapter(index),
segment.getInterval()
);
}
catch (IOException ex) {
throw Throwables.propagate(ex);
@@ -110,7 +112,6 @@ public StorageAdapter apply(DataSegment segment)
spec.getDimensions(),
spec.getMetrics(),
spec.getFilter(),
spec.getInterval(),
spec.getGranularity()
);

@@ -0,0 +1,89 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.indexer.hadoop;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;

import java.util.Objects;

public class WindowedDataSegment
{
private final DataSegment segment;
private final Interval interval;

@JsonCreator
public WindowedDataSegment(
@JsonProperty("segment") final DataSegment segment,
@JsonProperty("interval") final Interval interval
)
{
this.segment = segment;
this.interval = interval;
}

@JsonProperty
public DataSegment getSegment()
{
return segment;
}

@JsonProperty
public Interval getInterval()
{
return interval;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WindowedDataSegment that = (WindowedDataSegment) o;
return Objects.equals(segment, that.segment) &&
Objects.equals(interval, that.interval);
}

@Override
public int hashCode()
{
return Objects.hash(segment, interval);
}

@Override
public String toString()
{
return "WindowedDataSegment{" +
"segment=" + segment +
", interval=" + interval +
'}';
}

public static WindowedDataSegment of(final DataSegment segment)
{
return new WindowedDataSegment(segment, segment.getInterval());
}
}
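
WindowedDataSegment.of(segment) covers the convenience case where the window is the segment's full interval, while the two-argument constructor is what the timeline-based code above uses when a segment is only partially visible. A small usage sketch under those assumptions (the wrapper class name is illustrative):

import io.druid.indexer.hadoop.WindowedDataSegment;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;

public class WindowedDataSegmentExample
{
  // Full-interval window: equivalent to new WindowedDataSegment(segment, segment.getInterval()).
  public static WindowedDataSegment fullWindow(final DataSegment segment)
  {
    return WindowedDataSegment.of(segment);
  }

  // Narrowed window, e.g. the portion of the segment not overshadowed by a newer version.
  public static WindowedDataSegment partialWindow(final DataSegment segment, final Interval visibleInterval)
  {
    return new WindowedDataSegment(segment, visibleInterval);
  }
}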