This fixes arbitrary gran spec breaking
fjy committed Mar 17, 2015
1 parent 36b4c6a commit bfe10bd
Showing 4 changed files with 97 additions and 7 deletions.
17 changes: 17 additions & 0 deletions docs/content/Ingestion.md
@@ -152,12 +152,29 @@ This is a special variation of the JSON ParseSpec that lower cases all the column names

## GranularitySpec

The default granularity spec is `uniform`.

### Uniform Granularity Spec

This spec is used to generate segments with uniform intervals.

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | string | The type of granularity spec. | no (default == 'uniform') |
| segmentGranularity | string | The granularity to create segments at. | no (default == 'DAY') |
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| intervals | string list | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
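
For concreteness, here is a minimal sketch of constructing the uniform spec programmatically. The class names are taken from elsewhere in this commit, but the three-argument `(segmentGranularity, queryGranularity, intervals)` constructor and the `Granularity`/`QueryGranularity` packages are assumptions, so treat this as illustrative rather than canonical:

```java
import com.metamx.common.Granularity;                    // assumed package for segment granularity
import io.druid.granularity.QueryGranularity;            // assumed package for query granularity
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.joda.time.Interval;

import java.util.Arrays;

class UniformGranularitySpecExample
{
  // Daily segments; queryGranularity NONE keeps row timestamps un-truncated;
  // the data intervals cover calendar year 2014.
  static UniformGranularitySpec dailyOver2014()
  {
    return new UniformGranularitySpec(
        Granularity.DAY,
        QueryGranularity.NONE,
        Arrays.asList(new Interval("2014-01-01/2015-01-01"))
    );
  }
}
```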

### Arbitrary Granularity Spec

This spec is used to generate segments with arbitrary intervals (it tries to create evenly sized segments). This spec is not supported for real-time processing.

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | string | The type of granularity spec. Set this to `arbitrary` to use this spec. | no (default == 'uniform') |
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| intervals | string list | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
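
A matching sketch for the arbitrary spec, using the same constructor call that appears in the new `testWithArbitraryGranularity` test further down in this commit (note there is no segment granularity argument, only a query granularity and an explicit interval list); package names other than `ArbitraryGranularitySpec` are assumptions:

```java
import io.druid.granularity.QueryGranularity;            // assumed package
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.joda.time.Interval;

import java.util.Arrays;

class ArbitraryGranularitySpecExample
{
  // Minute query granularity; segment boundaries are derived from the data
  // within the single explicit interval rather than from a fixed segmentGranularity.
  static ArbitraryGranularitySpec minuteOver2014()
  {
    return new ArbitraryGranularitySpec(
        QueryGranularity.MINUTE,
        Arrays.asList(new Interval("2014/2015"))
    );
  }
}
```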

# IO Config

Real-time Ingestion: See [Real-time ingestion](Realtime-ingestion.html).
@@ -70,10 +70,10 @@ public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException

final Path betaInput = new Path(getInputPath());
final FileSystem fs = betaInput.getFileSystem(job.getConfiguration());
- final Granularity segmentGranularity = ((UniformGranularitySpec) config.getGranularitySpec()).getSegmentGranularity();
+ final Granularity segmentGranularity = config.getGranularitySpec().getSegmentGranularity();

- Map<DateTime, Long> inputModifiedTimes = new TreeMap<DateTime, Long>(
-     Comparators.inverse(Comparators.<Comparable>comparable())
+ Map<DateTime, Long> inputModifiedTimes = new TreeMap<>(
+     Comparators.inverse(Comparators.comparable())
);

for (FileStatus status : FSSpideringIterator.spiderIterable(fs, betaInput)) {
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@@ -205,9 +206,13 @@ private SortedSet<Interval> getDataIntervals() throws IOException
try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) {
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
- Interval interval = granularitySpec.getSegmentGranularity()
-                                    .bucket(new DateTime(inputRow.getTimestampFromEpoch()));
- retVal.add(interval);
+ DateTime dt = new DateTime(inputRow.getTimestampFromEpoch());
+ Optional<Interval> interval = granularitySpec.bucketInterval(dt);
+ if (interval.isPresent()) {
+   retVal.add(interval.get());
+ } else {
+   throw new ISE("Unable to find a matching interval for [%s]", dt);
+ }
}
}
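
The important change above is that the task no longer assumes a segment granularity exists; it asks the spec for the bucket directly and fails loudly when a row falls outside every configured interval. As a rough, hypothetical sketch (not the actual Druid implementation), a spec backed by an explicit interval list can implement `bucketInterval` along these lines; the `Optional.absent()` branch is exactly the case the new `ISE` guards against:

```java
import com.google.common.base.Optional;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.util.List;

// Hypothetical sketch of an interval-list-backed bucket lookup;
// names and structure are illustrative only.
class IntervalListBucketer
{
  private final List<Interval> intervals;

  IntervalListBucketer(List<Interval> intervals)
  {
    this.intervals = intervals;
  }

  Optional<Interval> bucketInterval(DateTime dt)
  {
    for (Interval interval : intervals) {
      if (interval.contains(dt)) {
        return Optional.of(interval);
      }
    }
    return Optional.absent();  // caller decides whether this is an error
  }
}
```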

@@ -36,6 +36,7 @@
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
@@ -113,6 +114,73 @@ public void testDeterminePartitions() throws Exception
new DefaultObjectMapper()
);

final List<DataSegment> segments = runTask(indexTask);

Assert.assertEquals(2, segments.size());
}

@Test
public void testWithArbitraryGranularity() throws Exception
{
File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();

File tmpFile = File.createTempFile("druid", "index", tmpDir);
tmpFile.deleteOnExit();

PrintWriter writer = new PrintWriter(tmpFile);
writer.println("2014-01-01T00:00:10Z,a,1");
writer.println("2014-01-01T01:00:20Z,b,1");
writer.println("2014-01-01T02:00:30Z,c,1");
writer.close();

IndexTask indexTask = new IndexTask(
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
"test",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto"
),
new DimensionsSpec(
Arrays.asList("ts"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
new ArbitraryGranularitySpec(
QueryGranularity.MINUTE,
Arrays.asList(new Interval("2014/2015"))
)
),
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(
tmpDir,
"druid*",
null
)
),
null
),
new DefaultObjectMapper()
);

List<DataSegment> segments = runTask(indexTask);

Assert.assertEquals(1, segments.size());
}

private final List<DataSegment> runTask(final IndexTask indexTask) throws Exception
{
final List<DataSegment> segments = Lists.newArrayList();

indexTask.run(
@@ -156,6 +224,6 @@ public DataSegment push(File file, DataSegment segment) throws IOException
)
);

- Assert.assertEquals(2, segments.size());
+ return segments;
}
}
