Add stringLast and stringFirst aggregators extension #5789
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
--- | ||
layout: doc_page | ||
--- | ||
|
||
# First/Last String Module | ||
|
||
To use these aggregators, make sure you [include](../../operations/including-extensions.html) the extension in your config file: | ||
|
||
``` | ||
druid.extensions.loadList=["druid-first-last-string"] | ||
``` | ||
|
||
## First String aggregator | ||
|
||
`stringFirst` computes the metric value with the minimum timestamp, or `null` if no rows exist. | ||
|
||
```json | ||
{ | ||
"type" : "stringFirst", | ||
"name" : <output_name>, | ||
"fieldName" : <metric_name>, | ||
"maxStringBytes" : <integer> | ||
} | ||
``` | ||
|
||
## Last String aggregator | ||
|
||
```json | ||
{ | ||
"type" : "stringLast", | ||
"name" : <output_name>, | ||
"fieldName" : <metric_name>, | ||
"maxStringBytes" : <integer> | ||
} | ||
``` | ||
|
||
`stringLast` computes the metric value with the maximum timestamp, or `null` if no rows exist. | ||
|
||
|
||
|
||
Note: The default value of `maxStringBytes` is 1024. |
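The semantics described in this doc page can be illustrated with a minimal Java sketch. It is not the extension's implementation: the `Row` type and the `stringFirst`/`stringLast` helper methods are hypothetical names introduced here, ties between equal timestamps are resolved arbitrarily (first seen wins), and `maxStringBytes` truncation is deliberately not modeled.

```java
import java.util.Arrays;
import java.util.List;

public class StringFirstLastSketch
{
  /** Hypothetical stand-in for one row: a timestamp and a string metric value. */
  public static final class Row
  {
    public final long timestamp;
    public final String value;

    public Row(long timestamp, String value)
    {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  /** Returns the value of the row with the minimum timestamp, or null if there are no rows. */
  public static String stringFirst(List<Row> rows)
  {
    Row best = null;
    for (Row row : rows) {
      if (best == null || row.timestamp < best.timestamp) {
        best = row;
      }
    }
    return best == null ? null : best.value;
  }

  /** Returns the value of the row with the maximum timestamp, or null if there are no rows. */
  public static String stringLast(List<Row> rows)
  {
    Row best = null;
    for (Row row : rows) {
      if (best == null || row.timestamp > best.timestamp) {
        best = row;
      }
    }
    return best == null ? null : best.value;
  }

  public static void main(String[] args)
  {
    List<Row> rows = Arrays.asList(new Row(30L, "c"), new Row(10L, "a"), new Row(20L, "b"));
    System.out.println(stringFirst(rows)); // value at timestamp 10
    System.out.println(stringLast(rows));  // value at timestamp 30
  }
}
```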
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
~ Druid - a distributed column store. | ||
~ Copyright 2012 - 2015 Metamarkets Group Inc. | ||
~ | ||
~ Licensed under the Apache License, Version 2.0 (the "License"); | ||
~ you may not use this file except in compliance with the License. | ||
~ You may obtain a copy of the License at | ||
~ | ||
~ http://www.apache.org/licenses/LICENSE-2.0 | ||
~ | ||
~ Unless required by applicable law or agreed to in writing, software | ||
~ distributed under the License is distributed on an "AS IS" BASIS, | ||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
~ See the License for the specific language governing permissions and | ||
~ limitations under the License. | ||
--> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<groupId>io.druid.extensions.contrib</groupId> | ||
<artifactId>druid-first-last-string</artifactId> | ||
<name>druid-first-last-string</name> | ||
<description>druid-first-last-string</description> | ||
|
||
<parent> | ||
<groupId>io.druid</groupId> | ||
<artifactId>druid</artifactId> | ||
<version>0.13.0-SNAPSHOT</version> | ||
<relativePath>../../pom.xml</relativePath> | ||
</parent> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>io.druid</groupId> | ||
<artifactId>druid-processing</artifactId> | ||
<version>${project.parent.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.druid</groupId> | ||
<artifactId>druid-sql</artifactId> | ||
<version>${project.parent.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<!-- Tests --> | ||
<dependency> | ||
<groupId>io.druid</groupId> | ||
<artifactId>druid-processing</artifactId> | ||
<version>${project.parent.version}</version> | ||
<scope>test</scope> | ||
<type>test-jar</type> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.druid</groupId> | ||
<artifactId>druid-sql</artifactId> | ||
<version>${project.parent.version}</version> | ||
<type>test-jar</type> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.druid</groupId> | ||
<artifactId>druid-server</artifactId> | ||
<version>${project.parent.version}</version> | ||
<scope>test</scope> | ||
<type>test-jar</type> | ||
</dependency> | ||
<dependency> | ||
<groupId>junit</groupId> | ||
<artifactId>junit</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.easymock</groupId> | ||
<artifactId>easymock</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
</dependencies> | ||
|
||
</project> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation; | ||
|
||
import com.fasterxml.jackson.databind.Module; | ||
import com.fasterxml.jackson.databind.module.SimpleModule; | ||
import com.google.common.collect.ImmutableList; | ||
import com.google.inject.Binder; | ||
import io.druid.initialization.DruidModule; | ||
import io.druid.query.aggregation.first.StringFirstAggregatorFactory; | ||
import io.druid.query.aggregation.first.StringFirstFoldingAggregatorFactory; | ||
import io.druid.query.aggregation.last.StringLastAggregatorFactory; | ||
import io.druid.query.aggregation.last.StringLastFoldingAggregatorFactory; | ||
import io.druid.segment.serde.ComplexMetrics; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
*/ | ||
public class FirstLastStringDruidModule implements DruidModule | ||
{ | ||
|
||
@Override | ||
public List<? extends Module> getJacksonModules() | ||
{ | ||
return ImmutableList.of( | ||
new SimpleModule("FirstLastStringModule").registerSubtypes( | ||
StringLastAggregatorFactory.class, | ||
StringLastFoldingAggregatorFactory.class, | ||
StringFirstAggregatorFactory.class, | ||
StringFirstFoldingAggregatorFactory.class | ||
) | ||
); | ||
} | ||
|
||
@Override | ||
public void configure(Binder binder) | ||
{ | ||
if (ComplexMetrics.getSerdeForType("serializablePairLongString") == null) { | ||
ComplexMetrics.registerSerde("serializablePairLongString", new SerializablePairSerde()); | ||
} | ||
} | ||
} |
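The `configure` method above registers the serde only when no serde exists yet for the type name, and it repeats the string literal `"serializablePairLongString"` that the serde class also returns. A minimal sketch of that register-if-absent pattern with a single shared constant might look as follows. The registry here is a hypothetical stand-in built on `ConcurrentHashMap`; it is not Druid's `ComplexMetrics` class, and `TYPE_NAME` and `registerIfAbsent` are names assumed for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SerdeRegistrySketch
{
  /** Shared constant so that the module and the serde agree on one type name. */
  public static final String TYPE_NAME = "serializablePairLongString";

  // Hypothetical registry; a stand-in for illustration, not Druid's ComplexMetrics.
  private static final ConcurrentMap<String, Object> SERDES = new ConcurrentHashMap<>();

  /** Registers the serde only if the type name is unused; returns true if this call registered it. */
  public static boolean registerIfAbsent(String typeName, Object serde)
  {
    // putIfAbsent is atomic, so the check and the registration cannot interleave.
    return SERDES.putIfAbsent(typeName, serde) == null;
  }

  /** Returns the registered serde, or null if none is registered for the type name. */
  public static Object getSerdeForType(String typeName)
  {
    return SERDES.get(typeName);
  }

  public static void main(String[] args)
  {
    Object first = new Object();
    Object second = new Object();
    System.out.println(registerIfAbsent(TYPE_NAME, first));   // true: nothing was registered yet
    System.out.println(registerIfAbsent(TYPE_NAME, second));  // false: the first serde is kept
    System.out.println(getSerdeForType(TYPE_NAME) == first);  // true
  }
}
```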
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation; | ||
|
||
import io.druid.collections.SerializablePair; | ||
import io.druid.data.input.InputRow; | ||
import io.druid.segment.GenericColumnSerializer; | ||
import io.druid.segment.column.ColumnBuilder; | ||
import io.druid.segment.data.GenericIndexed; | ||
import io.druid.segment.data.ObjectStrategy; | ||
import io.druid.segment.serde.ComplexColumnPartSupplier; | ||
import io.druid.segment.serde.ComplexMetricExtractor; | ||
import io.druid.segment.serde.ComplexMetricSerde; | ||
import io.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer; | ||
import io.druid.segment.writeout.SegmentWriteOutMedium; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.nio.charset.StandardCharsets; | ||
|
||
public class SerializablePairSerde extends ComplexMetricSerde | ||
{ | ||
|
||
public SerializablePairSerde() | ||
Review comment: Unnecessary class constructor. |
||
{ | ||
|
||
} | ||
|
||
@Override | ||
public String getTypeName() | ||
{ | ||
return "serializablePairLongString"; | ||
Review comment: Can we define this string as a static variable somewhere and use it? |
||
} | ||
|
||
@Override | ||
public ComplexMetricExtractor getExtractor() | ||
{ | ||
return new ComplexMetricExtractor() | ||
{ | ||
@Override | ||
public Class<SerializablePair> extractedClass() | ||
{ | ||
return SerializablePair.class; | ||
} | ||
|
||
@Override | ||
public Object extractValue(InputRow inputRow, String metricName) | ||
{ | ||
return inputRow.getRaw(metricName); | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder) | ||
{ | ||
final GenericIndexed column = GenericIndexed.read(buffer, getObjectStrategy(), columnBuilder.getFileMapper()); | ||
columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column)); | ||
} | ||
|
||
@Override | ||
public ObjectStrategy getObjectStrategy() | ||
{ | ||
return new ObjectStrategy<SerializablePair>() | ||
Review comment: Please specify type parameters.
Reply: I will create a new class. |
||
{ | ||
@Override | ||
public int compare(SerializablePair o1, SerializablePair o2) | ||
Review comment: Please specify type parameters. |
||
{ | ||
Integer comparation = 0; | ||
Review comment: Can be primitive. |
||
|
||
if ((Long) o1.lhs > (Long) o2.lhs) { | ||
Review comment: Don't have to cast if type parameters are set. |
||
comparation = 1; | ||
} else if ((Long) o1.lhs < (Long) o2.lhs) { | ||
comparation = -1; | ||
} | ||
|
||
if (comparation == 0) { | ||
if (o1.rhs != null && o2.rhs != null) { | ||
if (o1.rhs.equals(o2.rhs)) { | ||
Review comment: Would you elaborate more on this? I don't understand the logic below. Should it be:
    if (o1.rhs != null && o2.rhs != null) {
      if (o1.rhs.equals(o2.rhs)) {
        comparation = 0;
      } else {
        comparation = -1;
      }
    } |
||
comparation = 0; | ||
} else { | ||
comparation = -1; | ||
} | ||
} else if (o1.rhs != null) { | ||
comparation = 1; | ||
} else if (o2.rhs != null) { | ||
comparation = -1; | ||
} else { | ||
comparation = 0; | ||
} | ||
} | ||
|
||
return comparation; | ||
} | ||
|
||
@Override | ||
public Class<? extends SerializablePair> getClazz() | ||
{ | ||
return SerializablePair.class; | ||
} | ||
|
||
@Override | ||
public SerializablePair<Long, String> fromByteBuffer(ByteBuffer buffer, int numBytes) | ||
{ | ||
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); | ||
|
||
Long lhs = readOnlyBuffer.getLong(); | ||
Integer stringSize = readOnlyBuffer.getInt(); | ||
|
||
String lastString = null; | ||
if (stringSize > 0) { | ||
byte[] stringBytes = new byte[stringSize]; | ||
Review comment: Please use |
||
readOnlyBuffer.get(stringBytes, 0, stringSize); | ||
lastString = new String(stringBytes, StandardCharsets.UTF_8); | ||
} | ||
|
||
return new SerializablePair<>(lhs, lastString); | ||
} | ||
|
||
@Override | ||
public byte[] toBytes(SerializablePair val) | ||
Review comment: Please specify type parameters. |
||
{ | ||
String rhsString = (String) val.rhs; | ||
Review comment: Don't have to cast if type parameters are set. |
||
ByteBuffer bbuf; | ||
|
||
|
||
if (rhsString != null) { | ||
byte[] rhsBytes = rhsString.getBytes(StandardCharsets.UTF_8); | ||
Review comment: Please use |
||
bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + rhsBytes.length); | ||
bbuf.putLong((Long) val.lhs); | ||
bbuf.putInt(Long.BYTES, rhsBytes.length); | ||
bbuf.position(Long.BYTES + Integer.BYTES); | ||
bbuf.put(rhsBytes); | ||
} else { | ||
bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES); | ||
bbuf.putLong((Long) val.lhs); | ||
bbuf.putInt(Long.BYTES, 0); | ||
} | ||
|
||
return bbuf.array(); | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column) | ||
{ | ||
return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy()); | ||
} | ||
} |
Review comment: Would you please add some Javadoc for this class? It should describe the value types being serialized and deserialized, and where this class is used.
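Several of the review comments above (type parameters, primitive locals, the unclear comparison logic, and the missing Javadoc) could be addressed along the lines of the following sketch. It is one possible shape under stated assumptions, not the change the author ultimately made: `LongStringPairSketch` and its nested `Pair` are hypothetical names standing in for a typed `SerializablePair<Long, String>`, and the tie-break on the string (null first, then natural order) is an assumption chosen so that the comparison is antisymmetric. The byte layout mirrors the diff: an 8-byte timestamp, a 4-byte length, then the UTF-8 bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Sketch of a codec for (long timestamp, String value) pairs, as used by
 * first/last string aggregation. Layout: [long lhs][int length][UTF-8 bytes].
 */
public class LongStringPairSketch
{
  /** Hypothetical typed pair; stands in for SerializablePair<Long, String>. */
  public static final class Pair
  {
    public final long lhs;
    public final String rhs;

    public Pair(long lhs, String rhs)
    {
      this.lhs = lhs;
      this.rhs = rhs;
    }
  }

  /** Orders by timestamp first, then by string with null sorting before any non-null value. */
  public static int compare(Pair o1, Pair o2)
  {
    int byTimestamp = Long.compare(o1.lhs, o2.lhs);
    if (byTimestamp != 0) {
      return byTimestamp;
    }
    if (o1.rhs == null) {
      return o2.rhs == null ? 0 : -1;
    }
    if (o2.rhs == null) {
      return 1;
    }
    return o1.rhs.compareTo(o2.rhs);
  }

  /** Encodes the pair; a null string is written as length 0, as in the diff above. */
  public static byte[] toBytes(Pair val)
  {
    byte[] rhsBytes = val.rhs == null ? new byte[0] : val.rhs.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + rhsBytes.length);
    buf.putLong(val.lhs);
    buf.putInt(rhsBytes.length);
    buf.put(rhsBytes);
    return buf.array();
  }

  /** Decodes a pair; length 0 decodes to a null string. */
  public static Pair fromBytes(byte[] bytes)
  {
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    long lhs = buf.getLong();
    int size = buf.getInt();
    if (size <= 0) {
      return new Pair(lhs, null);
    }
    byte[] stringBytes = new byte[size];
    buf.get(stringBytes);
    return new Pair(lhs, new String(stringBytes, StandardCharsets.UTF_8));
  }

  public static void main(String[] args)
  {
    Pair decoded = fromBytes(toBytes(new Pair(42L, "druid")));
    System.out.println(decoded.lhs + " " + decoded.rhs);
    System.out.println(compare(new Pair(1L, "a"), new Pair(1L, "b")) < 0);
    System.out.println(compare(new Pair(1L, "b"), new Pair(1L, "a")) > 0);
  }
}
```

Because a null string and an empty string both encode as length 0, this layout (like the one in the diff) cannot distinguish them after a round trip; an empty string decodes as null.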