Skip to content

Commit

Permalink
Merge pull request #2765 from navis/invalid-encode-nullstring
Browse files Browse the repository at this point in the history
Null string is encoded as "null" in incremental index
  • Loading branch information
gianm committed Apr 1, 2016
2 parents abc53f5 + f0e55f5 commit 23d66e5
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.metamx.common.IAE;
Expand All @@ -38,7 +39,6 @@
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.NewSpatialDimensionSchema;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
Expand Down Expand Up @@ -101,7 +101,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
@Override
public String apply(final Object o)
{
return String.valueOf(o);
return o == null ? null : String.valueOf(o);
}
};

Expand All @@ -114,8 +114,9 @@ public Long apply(final Object o)
return null;
}
if (o instanceof String) {
String s = (String) o;
try {
return Long.valueOf((String) o);
return s.isEmpty() ? null : Long.valueOf(s);
}
catch (NumberFormatException nfe) {
throw new ParseException(nfe, "Unable to parse value[%s] as long in column: ", o);
Expand All @@ -137,8 +138,9 @@ public Float apply(final Object o)
return null;
}
if (o instanceof String) {
String s = (String) o;
try {
return Float.valueOf((String) o);
return s.isEmpty() ? null : Float.valueOf(s);
}
catch (NumberFormatException nfe) {
throw new ParseException(nfe, "Unable to parse value[%s] as float in column: ", o);
Expand Down Expand Up @@ -355,6 +357,9 @@ public int lookupId(String name)
private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
private final List<DimDim> dimValues;

// looks like this ordering needs to be configurable
private final Ordering<Comparable> ordering = Ordering.natural().nullsFirst();

private final AtomicInteger numEntries = new AtomicInteger();

// This is modified on add() in a critical section.
Expand Down Expand Up @@ -714,7 +719,7 @@ private int[] getDimVals(final DimDim dimLookup, final List<Comparable> dimValue
}

Comparable[] dimArray = dimValues.toArray(new Comparable[dimValues.size()]);
Arrays.sort(dimArray);
Arrays.sort(dimArray, ordering);

final int[] retVal = new int[dimArray.length];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,19 @@
import com.metamx.common.ISE;
import io.druid.collections.StupidPool;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.FloatDimensionSchema;
import io.druid.data.input.impl.LongDimensionSchema;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.CloserRule;
import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.CloserRule;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -66,6 +72,25 @@ public IncrementalIndexTest(IndexCreator IndexCreator)
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
DimensionsSpec dimensions = new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("string"),
new FloatDimensionSchema("float"),
new LongDimensionSchema("long")
), null, null
);
AggregatorFactory[] metrics = {
new FilteredAggregatorFactory(
new CountAggregatorFactory("cnt"),
new SelectorDimFilter("billy", "A")
)
};
final IncrementalIndexSchema schema = new IncrementalIndexSchema(
0,
QueryGranularity.MINUTE,
dimensions,
metrics
);
return Arrays.asList(
new Object[][]{
{
Expand All @@ -74,20 +99,9 @@ public static Collection<?> constructorFeeder() throws IOException
@Override
public IncrementalIndex createIndex()
{
return new OnheapIncrementalIndex(
0,
QueryGranularity.MINUTE,
new AggregatorFactory[]{
new FilteredAggregatorFactory(
new CountAggregatorFactory("cnt"),
new SelectorDimFilter("billy", "A")
)
},
1000
);
return new OnheapIncrementalIndex(schema, true, 1000);
}
}

},
{
new IndexCreator()
Expand All @@ -96,16 +110,7 @@ public IncrementalIndex createIndex()
public IncrementalIndex createIndex()
{
return new OffheapIncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{
new FilteredAggregatorFactory(
new CountAggregatorFactory("cnt"),
new SelectorDimFilter("billy", "A")
)
},
1000000,
new StupidPool<ByteBuffer>(
schema, true, true, true, 1000000, new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
Expand Down Expand Up @@ -144,7 +149,7 @@ public void testDuplicateDimensions() throws IndexSizeExceededException
}

@Test(expected = ISE.class)
public void testDuplicateDimensionsFirstOccurance() throws IndexSizeExceededException
public void testDuplicateDimensionsFirstOccurrence() throws IndexSizeExceededException
{
IncrementalIndex index = closer.closeLater(indexCreator.createIndex());
index.add(
Expand Down Expand Up @@ -182,4 +187,26 @@ public void controlTest() throws IndexSizeExceededException
)
);
}

/**
 * Adds a single row whose "string", "float" and "long" dimensions each carry a
 * real value, a null and an empty string, then reads the first row back from
 * the index and checks the raw dimension values: the string dimension is
 * expected to hold two empty strings followed by "A", and each numeric
 * dimension is expected to hold two nulls followed by the real value.
 */
@Test
public void testNullDimensionTransform() throws IndexSizeExceededException
{
  final IncrementalIndex<?> index = closer.closeLater(indexCreator.createIndex());

  // Timestamp one millisecond before the current time.
  final long timestamp = new DateTime().minus(1).getMillis();

  final MapBasedInputRow inputRow = new MapBasedInputRow(
      timestamp,
      Lists.newArrayList("string", "float", "long"),
      ImmutableMap.<String, Object>of(
          "string", Arrays.asList("A", null, ""),
          "float", Arrays.asList(Float.MAX_VALUE, null, ""),
          "long", Arrays.asList(Long.MIN_VALUE, null, "")
      )
  );
  index.add(inputRow);

  // Only one row was added, so the first row of the iterator is the one under test.
  final Row row = index.iterator().next();

  final String[] expectedStrings = {"", "", "A"};
  final Float[] expectedFloats = {null, null, Float.MAX_VALUE};
  final Long[] expectedLongs = {null, null, Long.MIN_VALUE};

  Assert.assertArrayEquals(expectedStrings, (Object[]) row.getRaw("string"));
  Assert.assertArrayEquals(expectedFloats, (Object[]) row.getRaw("float"));
  Assert.assertArrayEquals(expectedLongs, (Object[]) row.getRaw("long"));
}
}

0 comments on commit 23d66e5

Please sign in to comment.