-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add stringLast and stringFirst aggregators extension #5789
Merged
jihoonson
merged 47 commits into
apache:master
from
wizzie-io:feature-first-last-string-aggregators
Aug 1, 2018
Merged
Changes from all commits
Commits
Show all changes
47 commits
Select commit
Hold shift + click to select a range
a0049fd
Add lastString and firstString aggregators extension
09338b9
Remove duplicated class
3658b0c
Move first-last-string doc page to extensions-contrib
5975952
Fix ObjectStrategy compare method
adc773b
Fix doc bad aggregatos type name
c2f3672
Create FoldingAggregatorFactory classes to fix SegmentMetadataQuery
1c3bd6a
Add getMaxStringBytes() method to support JSON serialization
c75c88f
Fix null pointer exception at segment creation phase when the string …
d671922
Control the valueSelector object class on BufferAggregators
6945bf9
Perform all improvements
7dd12e0
Add java doc on SerializablePairLongStringSerde
217627f
Refactor ObjectStraty compare method
9b68a60
Remove unused ;
e552f04
Add aggregateCombiner unit tests. Rename BufferAggregators unit tests
48325f3
Remove unused imports
657e8a3
Add license header
e8a2ded
Add class name to java doc class serde
4d24159
Throw exception if value is unsupported class type
7eeef86
Merge branch 'master' into feature-first-last-string-aggregators
andresgomezfrr bbf84b8
Move first-last-string extension into druid core
b09217c
Update druid core docs
834662d
Fix null pointer exception when pair->string is null
482e8bf
Add null control unit tests
046c5a9
Remove unused imports
9829263
Add first/last string folding aggregator on AggregatorsModule to supp…
023fc88
Change SerializablePairLongString to extend SerializablePair
e6dee78
Change vars from public to private
1fb6789
Convert vars to primitive type
42b6ca3
Clarify compare comment
a9f2d61
Change IllegalStateException to ISE
5a1643f
Remove TODO comments
a9a1069
Control possible null pointer exception
1e53094
Add @Nullable annotation
129d594
Remove empty line
a5dea44
Remove unused parameter type
e7992fa
Improve AggregatorCombiner javadocs
aa4ff30
Add filterNullValues option at StringLast and StringFirst aggregators
1d11d46
Add filterNullValues option at agg documentation
33c944f
Fix checkstyle
7ffebe9
Merge branch 'master' of github.com:druid-io/druid into feature-first…
acfbed8
Update header license
ff56f3e
Fix StringFirstAggregatorFactory.VALUE_COMPARATOR
7ff6fc3
Fix StringFirstAggregatorCombiner
b074c5f
Fix if condition at StringFirstAggregateCombiner
21dfdc1
Remove filterNullValues from string first/last aggregators
19b55c3
Add isReset flag in FirstAggregatorCombiner
f674212
Change Arrays.asList to Collections.singletonList
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
35 changes: 35 additions & 0 deletions
35
processing/src/main/java/io/druid/query/aggregation/SerializablePairLongString.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation; | ||
|
||
import com.fasterxml.jackson.annotation.JsonCreator; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import io.druid.collections.SerializablePair; | ||
|
||
public class SerializablePairLongString extends SerializablePair<Long, String> | ||
{ | ||
@JsonCreator | ||
public SerializablePairLongString(@JsonProperty("lhs") Long lhs, @JsonProperty("rhs") String rhs) | ||
{ | ||
super(lhs, rhs); | ||
} | ||
} | ||
|
||
|
146 changes: 146 additions & 0 deletions
146
processing/src/main/java/io/druid/query/aggregation/SerializablePairLongStringSerde.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation; | ||
|
||
import io.druid.data.input.InputRow; | ||
import io.druid.java.util.common.StringUtils; | ||
import io.druid.query.aggregation.first.StringFirstAggregatorFactory; | ||
import io.druid.segment.GenericColumnSerializer; | ||
import io.druid.segment.column.ColumnBuilder; | ||
import io.druid.segment.data.GenericIndexed; | ||
import io.druid.segment.data.ObjectStrategy; | ||
import io.druid.segment.serde.ComplexColumnPartSupplier; | ||
import io.druid.segment.serde.ComplexMetricExtractor; | ||
import io.druid.segment.serde.ComplexMetricSerde; | ||
import io.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer; | ||
import io.druid.segment.writeout.SegmentWriteOutMedium; | ||
|
||
import javax.annotation.Nullable; | ||
import java.nio.ByteBuffer; | ||
|
||
/** | ||
* The SerializablePairLongStringSerde serializes a Long-String pair (SerializablePairLongString). | ||
* The serialization structure is: Long:Integer:String | ||
* <p> | ||
* The class is used on first/last String aggregators to store the time and the first/last string. | ||
* Long:Integer:String -> Timestamp:StringSize:StringData | ||
*/ | ||
public class SerializablePairLongStringSerde extends ComplexMetricSerde | ||
{ | ||
|
||
private static final String TYPE_NAME = "serializablePairLongString"; | ||
|
||
@Override | ||
public String getTypeName() | ||
{ | ||
return TYPE_NAME; | ||
} | ||
|
||
@Override | ||
public ComplexMetricExtractor getExtractor() | ||
{ | ||
return new ComplexMetricExtractor() | ||
{ | ||
@Override | ||
public Class<SerializablePairLongString> extractedClass() | ||
{ | ||
return SerializablePairLongString.class; | ||
} | ||
|
||
@Override | ||
public Object extractValue(InputRow inputRow, String metricName) | ||
{ | ||
return inputRow.getRaw(metricName); | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder) | ||
{ | ||
final GenericIndexed column = GenericIndexed.read(buffer, getObjectStrategy(), columnBuilder.getFileMapper()); | ||
columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column)); | ||
} | ||
|
||
@Override | ||
public ObjectStrategy getObjectStrategy() | ||
{ | ||
return new ObjectStrategy<SerializablePairLongString>() | ||
{ | ||
@Override | ||
public int compare(@Nullable SerializablePairLongString o1, @Nullable SerializablePairLongString o2) | ||
{ | ||
return StringFirstAggregatorFactory.VALUE_COMPARATOR.compare(o1, o2); | ||
} | ||
|
||
@Override | ||
public Class<? extends SerializablePairLongString> getClazz() | ||
{ | ||
return SerializablePairLongString.class; | ||
} | ||
|
||
@Override | ||
public SerializablePairLongString fromByteBuffer(ByteBuffer buffer, int numBytes) | ||
{ | ||
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); | ||
|
||
long lhs = readOnlyBuffer.getLong(); | ||
int stringSize = readOnlyBuffer.getInt(); | ||
|
||
String lastString = null; | ||
if (stringSize > 0) { | ||
byte[] stringBytes = new byte[stringSize]; | ||
readOnlyBuffer.get(stringBytes, 0, stringSize); | ||
lastString = StringUtils.fromUtf8(stringBytes); | ||
} | ||
|
||
return new SerializablePairLongString(lhs, lastString); | ||
} | ||
|
||
@Override | ||
public byte[] toBytes(SerializablePairLongString val) | ||
{ | ||
String rhsString = val.rhs; | ||
ByteBuffer bbuf; | ||
|
||
if (rhsString != null) { | ||
byte[] rhsBytes = StringUtils.toUtf8(rhsString); | ||
bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + rhsBytes.length); | ||
bbuf.putLong(val.lhs); | ||
bbuf.putInt(Long.BYTES, rhsBytes.length); | ||
bbuf.position(Long.BYTES + Integer.BYTES); | ||
bbuf.put(rhsBytes); | ||
} else { | ||
bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES); | ||
bbuf.putLong(val.lhs); | ||
bbuf.putInt(Long.BYTES, 0); | ||
} | ||
|
||
return bbuf.array(); | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column) | ||
{ | ||
return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy()); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
60 changes: 60 additions & 0 deletions
60
processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregateCombiner.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation.first; | ||
|
||
import io.druid.query.aggregation.ObjectAggregateCombiner; | ||
import io.druid.segment.ColumnValueSelector; | ||
|
||
import javax.annotation.Nullable; | ||
|
||
public class StringFirstAggregateCombiner extends ObjectAggregateCombiner<String> | ||
{ | ||
private String firstString; | ||
private boolean isReset = false; | ||
|
||
@Override | ||
public void reset(ColumnValueSelector selector) | ||
{ | ||
firstString = (String) selector.getObject(); | ||
isReset = true; | ||
} | ||
|
||
@Override | ||
public void fold(ColumnValueSelector selector) | ||
{ | ||
if (!isReset) { | ||
firstString = (String) selector.getObject(); | ||
isReset = true; | ||
} | ||
} | ||
|
||
@Nullable | ||
@Override | ||
public String getObject() | ||
{ | ||
return firstString; | ||
} | ||
|
||
@Override | ||
public Class<String> classOfObject() | ||
{ | ||
return String.class; | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please check #5789 (comment).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I add some unit tests:
https://github.com/wizzie-io/druid/blob/9829263e7344b551904ec9eba5aab732d121e311/processing/src/test/java/io/druid/query/aggregation/first/StringFirstAggregationTest.java#L151
https://github.com/wizzie-io/druid/blob/9829263e7344b551904ec9eba5aab732d121e311/processing/src/test/java/io/druid/query/aggregation/last/StringLastAggregationTest.java#L151