Skip to content

Commit

Permalink
[SPARK-23103][CORE] Ensure correct sort order for negative values in …
Browse files Browse the repository at this point in the history
…LevelDB.

The code was sorting "0" as "less than" negative values, which is
incorrect. The fix is simple; most of the changes are the added test and
related cleanup.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #20284 from vanzin/SPARK-23103.

(cherry picked from commit aa3a127)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
  • Loading branch information
Marcelo Vanzin authored and squito committed Jan 19, 2018
1 parent 4b79514 commit d0cb198
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ byte[] toKey(Object value, byte prefix) {
byte[] key = new byte[bytes * 2 + 2];
long longValue = ((Number) value).longValue();
key[0] = prefix;
key[1] = longValue > 0 ? POSITIVE_MARKER : NEGATIVE_MARKER;
key[1] = longValue >= 0 ? POSITIVE_MARKER : NEGATIVE_MARKER;

for (int i = 0; i < key.length - 2; i++) {
int masked = (int) ((longValue >>> (4 * i)) & 0xF);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ default BaseComparator reverse() {
private static final BaseComparator NATURAL_ORDER = (t1, t2) -> t1.key.compareTo(t2.key);
private static final BaseComparator REF_INDEX_ORDER = (t1, t2) -> t1.id.compareTo(t2.id);
private static final BaseComparator COPY_INDEX_ORDER = (t1, t2) -> t1.name.compareTo(t2.name);
private static final BaseComparator NUMERIC_INDEX_ORDER = (t1, t2) -> t1.num - t2.num;
// Orders entries by their numeric "num" field. Integer.compare is used rather than
// subtraction (which can overflow for values of opposite sign) and rather than
// Integer.valueOf(...).compareTo(...) (which boxes a value on every comparison).
private static final BaseComparator NUMERIC_INDEX_ORDER =
    (t1, t2) -> Integer.compare(t1.num, t2.num);
private static final BaseComparator CHILD_INDEX_ORDER = (t1, t2) -> t1.child.compareTo(t2.child);

/**
Expand Down Expand Up @@ -112,7 +114,8 @@ public void setup() throws Exception {
t.key = "key" + i;
t.id = "id" + i;
t.name = "name" + RND.nextInt(MAX_ENTRIES);
t.num = RND.nextInt(MAX_ENTRIES);
// Force one item to have an integer value of zero to test the fix for SPARK-23103.
t.num = (i != 0) ? (int) RND.nextLong() : 0;
t.child = "child" + (i % MIN_ENTRIES);
allEntries.add(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.apache.commons.io.FileUtils;
import org.iq80.leveldb.DBIterator;
Expand Down Expand Up @@ -74,11 +76,7 @@ public void testReopenAndVersionCheckDb() throws Exception {

@Test
public void testObjectWriteReadDelete() throws Exception {
CustomType1 t = new CustomType1();
t.key = "key";
t.id = "id";
t.name = "name";
t.child = "child";
CustomType1 t = createCustomType1(1);

try {
db.read(CustomType1.class, t.key);
Expand Down Expand Up @@ -106,17 +104,9 @@ public void testObjectWriteReadDelete() throws Exception {

@Test
public void testMultipleObjectWriteReadDelete() throws Exception {
CustomType1 t1 = new CustomType1();
t1.key = "key1";
t1.id = "id";
t1.name = "name1";
t1.child = "child1";

CustomType1 t2 = new CustomType1();
t2.key = "key2";
t2.id = "id";
t2.name = "name2";
t2.child = "child2";
CustomType1 t1 = createCustomType1(1);
CustomType1 t2 = createCustomType1(2);
t2.id = t1.id;

db.write(t1);
db.write(t2);
Expand All @@ -142,11 +132,7 @@ public void testMultipleObjectWriteReadDelete() throws Exception {

@Test
public void testMultipleTypesWriteReadDelete() throws Exception {
CustomType1 t1 = new CustomType1();
t1.key = "1";
t1.id = "id";
t1.name = "name1";
t1.child = "child1";
CustomType1 t1 = createCustomType1(1);

IntKeyType t2 = new IntKeyType();
t2.key = 2;
Expand Down Expand Up @@ -188,10 +174,7 @@ public void testMultipleTypesWriteReadDelete() throws Exception {
public void testMetadata() throws Exception {
assertNull(db.getMetadata(CustomType1.class));

CustomType1 t = new CustomType1();
t.id = "id";
t.name = "name";
t.child = "child";
CustomType1 t = createCustomType1(1);

db.setMetadata(t);
assertEquals(t, db.getMetadata(CustomType1.class));
Expand All @@ -202,11 +185,7 @@ public void testMetadata() throws Exception {

@Test
public void testUpdate() throws Exception {
CustomType1 t = new CustomType1();
t.key = "key";
t.id = "id";
t.name = "name";
t.child = "child";
CustomType1 t = createCustomType1(1);

db.write(t);

Expand All @@ -222,13 +201,7 @@ public void testUpdate() throws Exception {
@Test
public void testSkip() throws Exception {
for (int i = 0; i < 10; i++) {
CustomType1 t = new CustomType1();
t.key = "key" + i;
t.id = "id" + i;
t.name = "name" + i;
t.child = "child" + i;

db.write(t);
db.write(createCustomType1(i));
}

KVStoreIterator<CustomType1> it = db.view(CustomType1.class).closeableIterator();
Expand All @@ -240,6 +213,36 @@ public void testSkip() throws Exception {
assertFalse(it.hasNext());
}

/**
 * Writes entries whose numeric field spans negative values, zero and positive values,
 * then checks that iterating the "int" index returns them in ascending numeric order.
 */
@Test
public void testNegativeIndexValues() throws Exception {
  List<Integer> expected = Arrays.asList(-100, -50, 0, 50, 100);

  // A plain loop lets any exception from db.write() propagate through this method's own
  // "throws Exception" clause, so no try/catch rewrapping inside a lambda is needed.
  for (int value : expected) {
    db.write(createCustomType1(value));
  }

  // Read the values back through the "int" index, preserving iteration order.
  List<Integer> results = StreamSupport
    .stream(db.view(CustomType1.class).index("int").spliterator(), false)
    .map(e -> e.num)
    .collect(Collectors.toList());

  assertEquals(expected, results);
}

/**
 * Builds a CustomType1 for tests: each string field is a fixed label followed by
 * {@code i}, and the {@code num} field is {@code i} itself.
 *
 * @param i value used as the numeric field and as the suffix of every string field
 * @return the populated instance
 */
private CustomType1 createCustomType1(int i) {
  CustomType1 entry = new CustomType1();
  entry.num = i;
  entry.child = "child" + i;
  entry.name = "name" + i;
  entry.id = "id" + i;
  entry.key = "key" + i;
  return entry;
}

private int countKeys(Class<?> type) throws Exception {
byte[] prefix = db.getTypeInfo(type).keyPrefix();
int count = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -894,15 +894,19 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
val dropped = stages.drop(1).head

// Cache some quantiles by calling AppStatusStore.taskSummary(). For quantiles to be
// calculcated, we need at least one finished task.
// calculated, we need at least one finished task. The code in AppStatusStore uses
// `executorRunTime` to detect valid tasks, so that metric needs to be updated in the
// task end event.
time += 1
val task = createTasks(1, Array("1")).head
listener.onTaskStart(SparkListenerTaskStart(dropped.stageId, dropped.attemptId, task))

time += 1
task.markFinished(TaskState.FINISHED, time)
val metrics = TaskMetrics.empty
metrics.setExecutorRunTime(42L)
listener.onTaskEnd(SparkListenerTaskEnd(dropped.stageId, dropped.attemptId,
"taskType", Success, task, null))
"taskType", Success, task, metrics))

new AppStatusStore(store)
.taskSummary(dropped.stageId, dropped.attemptId, Array(0.25d, 0.50d, 0.75d))
Expand Down

0 comments on commit d0cb198

Please sign in to comment.