Add stringLast and stringFirst aggregators extension #5789

Merged
Changes from 45 commits
47 commits
a0049fd
Add lastString and firstString aggregators extension
May 20, 2018
09338b9
Remove duplicated class
May 20, 2018
3658b0c
Move first-last-string doc page to extensions-contrib
May 20, 2018
5975952
Fix ObjectStrategy compare method
May 28, 2018
adc773b
Fix doc bad aggregatos type name
May 29, 2018
c2f3672
Create FoldingAggregatorFactory classes to fix SegmentMetadataQuery
May 31, 2018
1c3bd6a
Add getMaxStringBytes() method to support JSON serialization
Jun 1, 2018
c75c88f
Fix null pointer exception at segment creation phase when the string …
Jun 6, 2018
d671922
Control the valueSelector object class on BufferAggregators
Jun 6, 2018
6945bf9
Perform all improvements
Jun 9, 2018
7dd12e0
Add java doc on SerializablePairLongStringSerde
Jun 9, 2018
217627f
Refactor ObjectStraty compare method
Jun 9, 2018
9b68a60
Remove unused ;
Jun 9, 2018
e552f04
Add aggregateCombiner unit tests. Rename BufferAggregators unit tests
Jun 9, 2018
48325f3
Remove unused imports
Jun 9, 2018
657e8a3
Add license header
Jun 9, 2018
e8a2ded
Add class name to java doc class serde
Jun 9, 2018
4d24159
Throw exception if value is unsupported class type
Jun 9, 2018
7eeef86
Merge branch 'master' into feature-first-last-string-aggregators
andresgomezfrr Jun 9, 2018
bbf84b8
Move first-last-string extension into druid core
Jun 10, 2018
b09217c
Update druid core docs
Jun 10, 2018
834662d
Fix null pointer exception when pair->string is null
Jun 10, 2018
482e8bf
Add null control unit tests
Jun 10, 2018
046c5a9
Remove unused imports
Jun 10, 2018
9829263
Add first/last string folding aggregator on AggregatorsModule to supp…
Jun 10, 2018
023fc88
Change SerializablePairLongString to extend SerializablePair
Jun 11, 2018
e6dee78
Change vars from public to private
Jun 11, 2018
1fb6789
Convert vars to primitive type
Jun 11, 2018
42b6ca3
Clarify compare comment
Jun 11, 2018
a9f2d61
Change IllegalStateException to ISE
Jun 11, 2018
5a1643f
Remove TODO comments
Jun 11, 2018
a9a1069
Control possible null pointer exception
Jun 11, 2018
1e53094
Add @Nullable annotation
Jul 27, 2018
129d594
Remove empty line
Jul 27, 2018
a5dea44
Remove unused parameter type
Jul 27, 2018
e7992fa
Improve AggregatorCombiner javadocs
Jul 27, 2018
aa4ff30
Add filterNullValues option at StringLast and StringFirst aggregators
Jul 27, 2018
1d11d46
Add filterNullValues option at agg documentation
Jul 27, 2018
33c944f
Fix checkstyle
Jul 27, 2018
7ffebe9
Merge branch 'master' of github.com:druid-io/druid into feature-first…
Jul 27, 2018
acfbed8
Update header license
Jul 27, 2018
ff56f3e
Fix StringFirstAggregatorFactory.VALUE_COMPARATOR
Jul 30, 2018
7ff6fc3
Fix StringFirstAggregatorCombiner
Jul 30, 2018
b074c5f
Fix if condition at StringFirstAggregateCombiner
Jul 31, 2018
21dfdc1
Remove filterNullValues from string first/last aggregators
Jul 31, 2018
19b55c3
Add isReset flag in FirstAggregatorCombiner
Jul 31, 2018
f674212
Change Arrays.asList to Collections.singletonList
Jul 31, 2018
32 changes: 31 additions & 1 deletion docs/content/querying/aggregations.md
@@ -102,7 +102,7 @@ Computes and stores the sum of values as 32-bit floating point value. Similar to

### First / Last aggregator

- First and Last aggregator cannot be used in ingestion spec, and should only be specified as part of queries.
+ (Double/Float/Long) First and Last aggregator cannot be used in ingestion spec, and should only be specified as part of queries.

Note that queries with first/last aggregators on a segment created with rollup enabled will return the rolled up value, and not the last value within the raw ingested data.

@@ -178,6 +178,36 @@ Note that queries with first/last aggregators on a segment created with rollup e
}
```

#### `stringFirst` aggregator

`stringFirst` computes the metric value with the minimum timestamp, or `null` if no rows exist.

```json
{
  "type" : "stringFirst",
  "name" : <output_name>,
  "fieldName" : <metric_name>,
  "maxStringBytes" : <integer> # (optional, defaults to 1024),
  "filterNullValues" : <boolean> # (optional, defaults to false)
}
```



#### `stringLast` aggregator

`stringLast` computes the metric value with the maximum timestamp, or `null` if no rows exist.

```json
{
  "type" : "stringLast",
  "name" : <output_name>,
  "fieldName" : <metric_name>,
  "maxStringBytes" : <integer> # (optional, defaults to 1024),
  "filterNullValues" : <boolean> # (optional, defaults to false)
}
```
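
As an illustration only (this is not part of the diff), the documented semantics of `stringFirst` and `stringLast` can be sketched in plain Java. The class `StringFirstLastSketch` and its nested `TimedString` are hypothetical names invented for this sketch, not Druid classes. The `maxStringBytes` and `filterNullValues` options are not modeled, and the handling of timestamp ties (keep the row seen first) is an assumption of the sketch, not something the docs above specify.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the documented semantics; not Druid code.
final class StringFirstLastSketch
{
  /** Hypothetical stand-in for one row: a timestamp plus a string value. */
  static final class TimedString
  {
    final long timestamp;
    final String value;

    TimedString(long timestamp, String value)
    {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  /** Returns the value at the minimum timestamp, or null if there are no rows. */
  static String stringFirst(List<TimedString> rows)
  {
    TimedString best = null;
    for (TimedString row : rows) {
      // Strict comparison: on a timestamp tie the row seen first is kept (sketch assumption).
      if (best == null || row.timestamp < best.timestamp) {
        best = row;
      }
    }
    return best == null ? null : best.value;
  }

  /** Returns the value at the maximum timestamp, or null if there are no rows. */
  static String stringLast(List<TimedString> rows)
  {
    TimedString best = null;
    for (TimedString row : rows) {
      if (best == null || row.timestamp > best.timestamp) {
        best = row;
      }
    }
    return best == null ? null : best.value;
  }

  public static void main(String[] args)
  {
    List<TimedString> rows = Arrays.asList(
        new TimedString(30L, "c"),
        new TimedString(10L, "a"),
        new TimedString(20L, "b")
    );
    System.out.println(stringFirst(rows));
    System.out.println(stringLast(rows));
    System.out.println(stringFirst(Collections.<TimedString>emptyList()));
  }
}
```

Note that the value returned for an existing row can itself be `null`, which is a different case from "no rows"; the sketch does not distinguish the two in its return value.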

### JavaScript aggregator

Computes an arbitrary JavaScript function over a set of columns (both metrics and dimensions are allowed). Your
20 changes: 18 additions & 2 deletions processing/src/main/java/io/druid/jackson/AggregatorsModule.java
@@ -38,17 +38,22 @@
import io.druid.query.aggregation.LongMinAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.SerializablePairLongStringSerde;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import io.druid.query.aggregation.first.FloatFirstAggregatorFactory;
import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
import io.druid.query.aggregation.first.StringFirstAggregatorFactory;
import io.druid.query.aggregation.first.StringFirstFoldingAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.query.aggregation.hyperloglog.PreComputedHyperUniquesSerde;
import io.druid.query.aggregation.last.DoubleLastAggregatorFactory;
import io.druid.query.aggregation.last.FloatLastAggregatorFactory;
import io.druid.query.aggregation.last.LongLastAggregatorFactory;
import io.druid.query.aggregation.last.StringLastAggregatorFactory;
import io.druid.query.aggregation.last.StringLastFoldingAggregatorFactory;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.DoubleGreatestPostAggregator;
@@ -74,7 +79,14 @@ public AggregatorsModule()
}

if (ComplexMetrics.getSerdeForType("preComputedHyperUnique") == null) {
- ComplexMetrics.registerSerde("preComputedHyperUnique", new PreComputedHyperUniquesSerde(HyperLogLogHash.getDefault()));
+ ComplexMetrics.registerSerde(
+ "preComputedHyperUnique",
+ new PreComputedHyperUniquesSerde(HyperLogLogHash.getDefault())
+ );
}

if (ComplexMetrics.getSerdeForType("serializablePairLongString") == null) {
ComplexMetrics.registerSerde("serializablePairLongString", new SerializablePairLongStringSerde());
}

setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
@@ -101,9 +113,13 @@ public AggregatorsModule()
@JsonSubTypes.Type(name = "longFirst", value = LongFirstAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleFirst", value = DoubleFirstAggregatorFactory.class),
@JsonSubTypes.Type(name = "floatFirst", value = FloatFirstAggregatorFactory.class),
@JsonSubTypes.Type(name = "stringFirst", value = StringFirstAggregatorFactory.class),
@JsonSubTypes.Type(name = "stringFirstFold", value = StringFirstFoldingAggregatorFactory.class),
@JsonSubTypes.Type(name = "longLast", value = LongLastAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleLast", value = DoubleLastAggregatorFactory.class),
- @JsonSubTypes.Type(name = "floatLast", value = FloatLastAggregatorFactory.class)
+ @JsonSubTypes.Type(name = "floatLast", value = FloatLastAggregatorFactory.class),
@JsonSubTypes.Type(name = "stringLast", value = StringLastAggregatorFactory.class),
@JsonSubTypes.Type(name = "stringLastFold", value = StringLastFoldingAggregatorFactory.class)
})
public interface AggregatorFactoryMixin
{
@@ -94,6 +94,10 @@ public class AggregatorUtil
public static final byte ARRAY_OF_DOUBLES_SKETCH_T_TEST_CACHE_TYPE_ID = 0x29;
public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_STRING_CACHE_TYPE_ID = 0x2A;

// StringFirst, StringLast aggregator
public static final byte STRING_FIRST_CACHE_TYPE_ID = 0x2B;
public static final byte STRING_LAST_CACHE_TYPE_ID = 0x2C;

/**
* returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg
*
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.collections.SerializablePair;

public class SerializablePairLongString extends SerializablePair<Long, String>
{
  @JsonCreator
  public SerializablePairLongString(@JsonProperty("lhs") Long lhs, @JsonProperty("rhs") String rhs)
  {
    super(lhs, rhs);
  }
}


@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation;

import io.druid.data.input.InputRow;
import io.druid.java.util.common.StringUtils;
import io.druid.query.aggregation.first.StringFirstAggregatorFactory;
import io.druid.segment.GenericColumnSerializer;
import io.druid.segment.column.ColumnBuilder;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.ObjectStrategy;
import io.druid.segment.serde.ComplexColumnPartSupplier;
import io.druid.segment.serde.ComplexMetricExtractor;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import io.druid.segment.writeout.SegmentWriteOutMedium;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;

/**
* The SerializablePairLongStringSerde serializes a Long-String pair (SerializablePairLongString).
* The serialization structure is: Long:Integer:String
* <p>
* The class is used on first/last String aggregators to store the time and the first/last string.
* Long:Integer:String -> Timestamp:StringSize:StringData
*/
public class SerializablePairLongStringSerde extends ComplexMetricSerde
{

  private static final String TYPE_NAME = "serializablePairLongString";

  @Override
  public String getTypeName()
  {
    return TYPE_NAME;
  }

  @Override
  public ComplexMetricExtractor getExtractor()
  {
    return new ComplexMetricExtractor()
    {
      @Override
      public Class<SerializablePairLongString> extractedClass()
      {
        return SerializablePairLongString.class;
      }

      @Override
      public Object extractValue(InputRow inputRow, String metricName)
      {
        return inputRow.getRaw(metricName);
      }
    };
  }

  @Override
  public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder)
  {
    final GenericIndexed column = GenericIndexed.read(buffer, getObjectStrategy(), columnBuilder.getFileMapper());
    columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column));
  }

  @Override
  public ObjectStrategy getObjectStrategy()
  {
    return new ObjectStrategy<SerializablePairLongString>()
    {
      @Override
      public int compare(@Nullable SerializablePairLongString o1, @Nullable SerializablePairLongString o2)
      {
        return StringFirstAggregatorFactory.VALUE_COMPARATOR.compare(o1, o2);
      }

      @Override
      public Class<? extends SerializablePairLongString> getClazz()
      {
        return SerializablePairLongString.class;
      }

      @Override
      public SerializablePairLongString fromByteBuffer(ByteBuffer buffer, int numBytes)
      {
        final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();

        long lhs = readOnlyBuffer.getLong();
        int stringSize = readOnlyBuffer.getInt();

        String lastString = null;
        if (stringSize > 0) {
          byte[] stringBytes = new byte[stringSize];
          readOnlyBuffer.get(stringBytes, 0, stringSize);
          lastString = StringUtils.fromUtf8(stringBytes);
        }

        return new SerializablePairLongString(lhs, lastString);
      }

      @Override
      public byte[] toBytes(SerializablePairLongString val)
      {
        String rhsString = val.rhs;
        ByteBuffer bbuf;

        if (rhsString != null) {
          byte[] rhsBytes = StringUtils.toUtf8(rhsString);
          bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + rhsBytes.length);
          bbuf.putLong(val.lhs);
          bbuf.putInt(Long.BYTES, rhsBytes.length);
          bbuf.position(Long.BYTES + Integer.BYTES);
          bbuf.put(rhsBytes);
        } else {
          bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
          bbuf.putLong(val.lhs);
          bbuf.putInt(Long.BYTES, 0);
        }

        return bbuf.array();
      }
    };
  }

  @Override
  public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
  {
    return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
  }
}
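
To make the `Timestamp:StringSize:StringData` layout described in the class Javadoc concrete, here is a minimal standalone sketch of the same byte layout using only `java.nio`. `PairLayoutSketch` and its method names are hypothetical and exist only for this illustration; the sketch does not use Druid's `ObjectStrategy` or `StringUtils`. Like the serde above, it writes a size of 0 for a `null` string and reads a size of 0 back as `null`, so an empty string does not survive a round trip.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the Long:Integer:String layout; not Druid code.
final class PairLayoutSketch
{
  /** Encodes as 8 bytes of timestamp, 4 bytes of UTF-8 length, then the UTF-8 bytes. */
  static byte[] encode(long timestamp, String value)
  {
    // A null string is written as length 0 with no payload.
    byte[] stringBytes = value == null ? new byte[0] : value.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + stringBytes.length);
    buf.putLong(timestamp);
    buf.putInt(stringBytes.length);
    buf.put(stringBytes);
    return buf.array();
  }

  /** Reads the leading 8-byte timestamp. */
  static long decodeTimestamp(byte[] bytes)
  {
    return ByteBuffer.wrap(bytes).getLong();
  }

  /** Reads the string part; a stored length of 0 is returned as null. */
  static String decodeString(byte[] bytes)
  {
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    buf.getLong(); // skip the timestamp
    int size = buf.getInt();
    if (size <= 0) {
      return null;
    }
    byte[] stringBytes = new byte[size];
    buf.get(stringBytes);
    return new String(stringBytes, StandardCharsets.UTF_8);
  }

  public static void main(String[] args)
  {
    byte[] encoded = encode(42L, "foo");
    System.out.println(encoded.length);
    System.out.println(decodeTimestamp(encoded));
    System.out.println(decodeString(encoded));
  }
}
```

The length prefix counts UTF-8 bytes, not characters, so a non-ASCII string occupies more bytes than its `length()` suggests.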
@@ -23,8 +23,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
- import io.druid.java.util.common.StringUtils;
  import io.druid.collections.SerializablePair;
+ import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.UOE;
import io.druid.query.aggregation.AggregateCombiner;
import io.druid.query.aggregation.Aggregator;
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.first;

import io.druid.query.aggregation.ObjectAggregateCombiner;
import io.druid.segment.ColumnValueSelector;

import javax.annotation.Nullable;

public class StringFirstAggregateCombiner extends ObjectAggregateCombiner<String>
Review comment (Contributor): Please check #5789 (comment).
{
  private String firstString;

  @Override
  public void reset(ColumnValueSelector selector)
  {
    firstString = (String) selector.getObject();
  }

  @Override
  public void fold(ColumnValueSelector selector)
  {
    if (firstString == null) {
Review comment (Contributor): It looks like this is to check whether reset() has been called. But firstValue can be null even when reset() is called, because selector.getObject() can return null. I think we need a flag isReset to check this.

Reply (Contributor Author): Yeah! It is true, good point!

      firstString = (String) selector.getObject();
    }
  }

  @Nullable
  @Override
  public String getObject()
  {
    return firstString;
  }

  @Override
  public Class<String> classOfObject()
  {
    return String.class;
  }
}
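
The reviewer's point on `fold` is that a `null` field cannot distinguish "never reset" from "reset to a null value". A minimal standalone sketch of the suggested `isReset` flag follows; the commit list includes a later commit titled "Add isReset flag in FirstAggregatorCombiner", but the code below is an assumption about the shape of that fix, not a copy of it. `FirstStringCombinerSketch` and `ValueSource` are hypothetical names, with `ValueSource` standing in for Druid's `ColumnValueSelector`.

```java
// Hypothetical sketch of the isReset idea from the review; not the merged Druid code.
final class FirstStringCombinerSketch
{
  /** Hypothetical stand-in for ColumnValueSelector; only getObject() is modeled. */
  interface ValueSource
  {
    String getObject();
  }

  private String firstString;
  private boolean isReset = false;

  void reset(ValueSource selector)
  {
    firstString = selector.getObject();
    isReset = true;
  }

  void fold(ValueSource selector)
  {
    // Take a value only if no value has been recorded yet.
    // A recorded null still counts as "recorded", which the null check alone could not express.
    if (!isReset) {
      firstString = selector.getObject();
      isReset = true;
    }
  }

  String getObject()
  {
    return firstString;
  }

  public static void main(String[] args)
  {
    FirstStringCombinerSketch combiner = new FirstStringCombinerSketch();
    combiner.reset(() -> null);
    combiner.fold(() -> "later");
    System.out.println(combiner.getObject());
  }
}
```

With the original `firstString == null` check, the `fold` call in `main` would overwrite the null first value with `"later"`; with the flag, the null first value is kept.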