-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
expression aggregator #11104
expression aggregator #11104
Changes from 14 commits
151824b
7007ac4
157ddae
8167add
fe0c46f
c354453
10ef3b9
aa838bb
7188222
aa2f60d
d9bde55
0afee3f
794b5c9
55c8d1e
47bd743
362e5ec
183eab8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,15 +23,308 @@ | |
import org.apache.druid.common.config.NullHandling; | ||
import org.apache.druid.common.guava.GuavaUtils; | ||
import org.apache.druid.java.util.common.IAE; | ||
import org.apache.druid.java.util.common.ISE; | ||
import org.apache.druid.java.util.common.StringUtils; | ||
import org.apache.druid.java.util.common.UOE; | ||
|
||
import javax.annotation.Nullable; | ||
import java.nio.ByteBuffer; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
|
||
/** | ||
* Generic result holder for evaluated {@link Expr} containing the value and {@link ExprType} of the value to allow | ||
*/ | ||
public abstract class ExprEval<T> | ||
{ | ||
/** | ||
* Deserialize an expression stored in a bytebuffer, e.g. for an agg. | ||
* | ||
* This should be refactored to be consolidated with some of the standard type handling of aggregators probably | ||
*/ | ||
public static ExprEval deserialize(ByteBuffer buffer, int position) | ||
{ | ||
ExprType type = ExprType.fromByte(buffer.get(position)); | ||
int offset = position + 1; | ||
switch (type) { | ||
case LONG: | ||
if (buffer.get(offset++) == NullHandling.IS_NOT_NULL_BYTE) { | ||
return of(buffer.getLong(offset)); | ||
} | ||
return ofLong(null); | ||
case DOUBLE: | ||
if (buffer.get(offset++) == NullHandling.IS_NOT_NULL_BYTE) { | ||
return of(buffer.getDouble(offset)); | ||
} | ||
return ofDouble(null); | ||
case STRING: | ||
final int length = buffer.getInt(offset); | ||
if (length < 0) { | ||
return of(null); | ||
} | ||
final byte[] stringBytes = new byte[length]; | ||
final int oldPosition = buffer.position(); | ||
buffer.position(offset + Integer.BYTES); | ||
buffer.get(stringBytes, 0, length); | ||
buffer.position(oldPosition); | ||
return of(StringUtils.fromUtf8(stringBytes)); | ||
case LONG_ARRAY: | ||
final int longArrayLength = buffer.getInt(offset); | ||
offset += Integer.BYTES; | ||
if (longArrayLength < 0) { | ||
return ofLongArray(null); | ||
} | ||
final Long[] longs = new Long[longArrayLength]; | ||
for (int i = 0; i < longArrayLength; i++) { | ||
final byte isNull = buffer.get(offset); | ||
offset += Byte.BYTES; | ||
if (isNull == NullHandling.IS_NOT_NULL_BYTE) { | ||
longs[i] = buffer.getLong(offset); | ||
offset += Long.BYTES; | ||
} else { | ||
longs[i] = null; | ||
} | ||
} | ||
return ofLongArray(longs); | ||
case DOUBLE_ARRAY: | ||
final int doubleArrayLength = buffer.getInt(offset); | ||
offset += Integer.BYTES; | ||
if (doubleArrayLength < 0) { | ||
return ofDoubleArray(null); | ||
} | ||
final Double[] doubles = new Double[doubleArrayLength]; | ||
for (int i = 0; i < doubleArrayLength; i++) { | ||
final byte isNull = buffer.get(offset); | ||
offset += Byte.BYTES; | ||
if (isNull == NullHandling.IS_NOT_NULL_BYTE) { | ||
doubles[i] = buffer.getDouble(offset); | ||
offset += Double.BYTES; | ||
} else { | ||
doubles[i] = null; | ||
} | ||
} | ||
return ofDoubleArray(doubles); | ||
case STRING_ARRAY: | ||
final int stringArrayLength = buffer.getInt(offset); | ||
offset += Integer.BYTES; | ||
if (stringArrayLength < 0) { | ||
return ofStringArray(null); | ||
} | ||
final String[] stringArray = new String[stringArrayLength]; | ||
for (int i = 0; i < stringArrayLength; i++) { | ||
final int stringElementLength = buffer.getInt(offset); | ||
offset += Integer.BYTES; | ||
if (stringElementLength < 0) { | ||
stringArray[i] = null; | ||
} else { | ||
final byte[] stringElementBytes = new byte[stringElementLength]; | ||
final int oldPosition2 = buffer.position(); | ||
buffer.position(offset); | ||
buffer.get(stringElementBytes, 0, stringElementLength); | ||
buffer.position(oldPosition2); | ||
stringArray[i] = StringUtils.fromUtf8(stringElementBytes); | ||
offset += stringElementLength; | ||
} | ||
} | ||
return ofStringArray(stringArray); | ||
|
||
default: | ||
throw new UOE("how can this be?"); | ||
} | ||
} | ||
|
||
/** | ||
* Write an expression result to a bytebuffer | ||
* | ||
* This should be refactored to be consolidated with some of the standard type handling of aggregators probably | ||
*/ | ||
public static void serialize(ByteBuffer buffer, int position, ExprEval<?> eval, int maxSizeBytes) | ||
{ | ||
int offset = position; | ||
buffer.put(offset++, eval.type().getId()); | ||
switch (eval.type()) { | ||
case LONG: | ||
if (eval.isNumericNull()) { | ||
buffer.put(offset, NullHandling.IS_NULL_BYTE); | ||
} else { | ||
buffer.put(offset++, NullHandling.IS_NOT_NULL_BYTE); | ||
buffer.putLong(offset, eval.asLong()); | ||
} | ||
break; | ||
case DOUBLE: | ||
if (eval.isNumericNull()) { | ||
buffer.put(offset, NullHandling.IS_NULL_BYTE); | ||
} else { | ||
buffer.put(offset++, NullHandling.IS_NOT_NULL_BYTE); | ||
buffer.putDouble(offset, eval.asDouble()); | ||
} | ||
break; | ||
case STRING: | ||
final byte[] stringBytes = StringUtils.toUtf8Nullable(eval.asString()); | ||
if (stringBytes != null) { | ||
checkMaxBytes(eval.type(), 1 + Integer.BYTES + stringBytes.length, maxSizeBytes); | ||
buffer.putInt(offset, stringBytes.length); | ||
offset += Integer.BYTES; | ||
for (byte stringByte : stringBytes) { | ||
buffer.put(offset++, stringByte); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, why not There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oops, I meant to switch to doing what you suggest here.. originally I was doing all cases manually but then started to switch some over to the bulk methods but I guess didn't finish all the way |
||
} | ||
} else { | ||
checkMaxBytes(eval.type(), 1 + Integer.BYTES, maxSizeBytes); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I missed that the extra byte is for the expr type. It would be better if it is more obvious. |
||
buffer.putInt(offset, -1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggest adding a static variable for |
||
} | ||
break; | ||
case LONG_ARRAY: | ||
Long[] longs = eval.asLongArray(); | ||
|
||
if (longs == null) { | ||
checkMaxBytes(eval.type(), 1 + Integer.BYTES, maxSizeBytes); | ||
buffer.putInt(offset, -1); | ||
} else { | ||
final int sizeBytes = 1 + Integer.BYTES + (Long.BYTES * longs.length); | ||
checkMaxBytes(eval.type(), sizeBytes, maxSizeBytes); | ||
buffer.putInt(offset, longs.length); | ||
offset += Integer.BYTES; | ||
for (Long aLong : longs) { | ||
if (aLong != null) { | ||
buffer.put(offset, NullHandling.IS_NOT_NULL_BYTE); | ||
offset++; | ||
buffer.putLong(offset, aLong); | ||
offset += Long.BYTES; | ||
} else { | ||
buffer.put(offset++, NullHandling.IS_NULL_BYTE); | ||
} | ||
} | ||
} | ||
break; | ||
case DOUBLE_ARRAY: | ||
Double[] doubles = eval.asDoubleArray(); | ||
if (doubles == null) { | ||
checkMaxBytes(eval.type(), 1 + Integer.BYTES, maxSizeBytes); | ||
buffer.putInt(offset, -1); | ||
} else { | ||
final int sizeBytes = 1 + Integer.BYTES + (Double.BYTES * doubles.length); | ||
checkMaxBytes(eval.type(), sizeBytes, maxSizeBytes); | ||
buffer.putInt(offset, doubles.length); | ||
offset += Integer.BYTES; | ||
|
||
for (Double aDouble : doubles) { | ||
if (aDouble != null) { | ||
buffer.put(offset, NullHandling.IS_NOT_NULL_BYTE); | ||
offset++; | ||
buffer.putDouble(offset, aDouble); | ||
offset += Long.BYTES; | ||
} else { | ||
buffer.put(offset++, NullHandling.IS_NULL_BYTE); | ||
} | ||
} | ||
} | ||
break; | ||
case STRING_ARRAY: | ||
String[] strings = eval.asStringArray(); | ||
if (strings == null) { | ||
checkMaxBytes(eval.type(), 1 + Integer.BYTES, maxSizeBytes); | ||
buffer.putInt(offset, -1); | ||
} else { | ||
buffer.putInt(offset, strings.length); | ||
offset += Integer.BYTES; | ||
int sizeBytes = 1 + Integer.BYTES; | ||
for (String string : strings) { | ||
if (string == null) { | ||
sizeBytes += Integer.BYTES; | ||
checkMaxBytes(eval.type(), sizeBytes, maxSizeBytes); | ||
buffer.putInt(offset, -1); | ||
offset += Integer.BYTES; | ||
} else { | ||
final byte[] stringElementBytes = StringUtils.toUtf8(string); | ||
sizeBytes += Integer.BYTES + stringElementBytes.length; | ||
checkMaxBytes(eval.type(), sizeBytes, maxSizeBytes); | ||
buffer.putInt(offset, stringElementBytes.length); | ||
offset += Integer.BYTES; | ||
final int oldPosition = buffer.position(); | ||
buffer.position(offset); | ||
buffer.put(stringElementBytes, 0, stringElementBytes.length); | ||
buffer.position(oldPosition); | ||
offset += stringElementBytes.length; | ||
} | ||
} | ||
} | ||
break; | ||
default: | ||
throw new UOE("how can this be?"); | ||
} | ||
} | ||
|
||
private static void checkMaxBytes(ExprType type, int sizeBytes, int maxSizeBytes) | ||
{ | ||
if (sizeBytes > maxSizeBytes) { | ||
throw new ISE("Unable to serialize [%s], size [%s] is larger than max [%s]", type, sizeBytes, maxSizeBytes); | ||
} | ||
} | ||
|
||
/** | ||
* Selectors are not consistent in treatment of null, [], and [null], so coerce [] to [null] | ||
*/ | ||
public static Object coerceListToArray(@Nullable List<?> val) | ||
{ | ||
if (val != null && val.size() > 0) { | ||
Class coercedType = null; | ||
|
||
for (Object elem : val) { | ||
if (elem != null) { | ||
coercedType = convertType(coercedType, elem.getClass()); | ||
} | ||
} | ||
|
||
if (coercedType == Long.class || coercedType == Integer.class) { | ||
return val.stream().map(x -> x != null ? ((Number) x).longValue() : null).toArray(Long[]::new); | ||
} | ||
if (coercedType == Float.class || coercedType == Double.class) { | ||
return val.stream().map(x -> x != null ? ((Number) x).doubleValue() : null).toArray(Double[]::new); | ||
} | ||
// default to string | ||
return val.stream().map(x -> x != null ? x.toString() : null).toArray(String[]::new); | ||
} | ||
return new String[]{null}; | ||
} | ||
|
||
private static Class convertType(@Nullable Class existing, Class next) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you please add some javadoc? |
||
{ | ||
if (Number.class.isAssignableFrom(next) || next == String.class) { | ||
if (existing == null) { | ||
return next; | ||
} | ||
// string wins everything | ||
if (existing == String.class) { | ||
return existing; | ||
} | ||
if (next == String.class) { | ||
return next; | ||
} | ||
// all numbers win over Integer | ||
if (existing == Integer.class) { | ||
return next; | ||
} | ||
if (existing == Float.class) { | ||
// doubles win over floats | ||
if (next == Double.class) { | ||
return next; | ||
} | ||
return existing; | ||
} | ||
if (existing == Long.class) { | ||
if (next == Integer.class) { | ||
// long beats int | ||
return existing; | ||
} | ||
// double and float win over longs | ||
return next; | ||
} | ||
// otherwise double | ||
return Double.class; | ||
} | ||
throw new UOE("Invalid array expression type: %s", next); | ||
} | ||
|
||
public static ExprEval ofLong(@Nullable Number longValue) | ||
{ | ||
return new LongExprEval(longValue); | ||
|
@@ -118,6 +411,10 @@ public static ExprEval bestEffortOf(@Nullable Object val) | |
return new StringArrayExprEval((String[]) val); | ||
} | ||
|
||
if (val instanceof List) { | ||
return bestEffortOf(coerceListToArray((List<?>) val)); | ||
} | ||
|
||
return new StringExprEval(val == null ? null : String.valueOf(val)); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems reasonable to me to ignore
maxSizeBytes
for primitive types, but please document it at least in the javadoc.