Skip to content
Permalink
Browse files
create Heap/LoserTree (#26)
* add InputsSorting interface and implementation
  • Loading branch information
corgiboygsj committed Apr 7, 2021
1 parent fb4c829 commit acd3434d7b0e6078e8fd09c54448a6578992f1eb
Show file tree
Hide file tree
Showing 9 changed files with 569 additions and 1 deletion.
@@ -0,0 +1,49 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.sort.sorting;

import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;

import com.baidu.hugegraph.util.E;

public abstract class AbstractInputsSorting<T> implements InputsSorting<T> {

protected final Iterator<T>[] sources;
protected final Comparator<? super T> comparator;

@SuppressWarnings("unchecked")
public AbstractInputsSorting(Collection<? extends Iterator<T>> sources,
Comparator<? super T> comparator) {
E.checkNotEmpty(sources, "sources");

this.sources = sources.toArray(new Iterator[0]);
this.comparator = comparator;
}

protected final int compare(T t1, T t2) {
@SuppressWarnings("unchecked")
int result = this.comparator != null ?
this.comparator.compare(t1, t2) :
((Comparable<? super T>) t1).compareTo(t2);
return result;
}
}
@@ -0,0 +1,132 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.sort.sorting;

import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;

public final class HeapInputsSorting<T> extends AbstractInputsSorting<T> {

private final Object[] data;
private int size;

public HeapInputsSorting(Collection<? extends Iterator<T>> sources) {
this(sources, null);
}

public HeapInputsSorting(Collection<? extends Iterator<T>> sources,
Comparator<? super T> comparator) {
super(sources, comparator);

this.size = sources.size();
this.data = new Object[this.size];

// Init Heap
this.constructHeap();
}

@Override
public boolean hasNext() {
return !this.isEmpty();
}

@Override
public T next() {
if (this.isEmpty()) {
throw new NoSuchElementException();
}

@SuppressWarnings("unchecked")
T top = (T) this.data[0];
Iterator<T> topSource = this.sources[0];
if (topSource.hasNext()) {
// Current element was removed, fill next element.
this.data[0] = topSource.next();
} else {
this.size--;
// Move the last input to the top when the top input is empty.
if (this.size > 0) {
this.sources[0] = this.sources[this.size];
this.data[0] = this.data[this.size];
}
}

this.adjustHeap(0);

return top;
}

private void constructHeap() {
// Init data array. Skip empty iterator.
for (int i = 0, len = this.sources.length - 1; i <= len; ) {
if (!this.sources[i].hasNext()) {
System.arraycopy(this.sources, i + 1,
this.sources, i, len - i);
this.size--;
len--;
continue;
}
this.data[i] = this.sources[i].next();
i++;
}

// Build Heap
for (int index = (this.size >> 1) - 1; index >= 0; index--) {
this.adjustHeap(index);
}
}

@SuppressWarnings("unchecked")
private void adjustHeap(int parent) {
int child;
while ((child = (parent << 1) + 1) < this.size) {
// Compare left and right child if right child exist.
if (child < this.size - 1 &&
this.compare((T) this.data[child],
(T) this.data[child + 1]) > 0) {
child++;
}
if (this.compare((T) this.data[parent], (T) this.data[child]) > 0) {
this.swap(parent, child);
parent = child;
} else {
break;
}
}
}

private void swap(int i, int j) {
// Swap data
Object dataTmp = this.data[i];
this.data[i] = this.data[j];
this.data[j] = dataTmp;

// Swap sources
Iterator<T> sourceTmp = this.sources[i];
this.sources[i] = this.sources[j];
this.sources[j] = sourceTmp;
}

private boolean isEmpty() {
return this.size <= 0;
}
}
@@ -0,0 +1,6 @@
package com.baidu.hugegraph.computer.core.sort.sorting;

import java.util.Iterator;

public interface InputsSorting<T> extends Iterator<T> {
}
@@ -0,0 +1,145 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.sort.sorting;

import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;

public final class LoserTreeInputsSorting<T> extends AbstractInputsSorting<T> {

private static final Object INFINITY_LEAF = new Object();

private final Object[] leaves;
private final int size;
private final int[] tree;

public LoserTreeInputsSorting(Collection<? extends Iterator<T>> sources) {
this(sources, null);
}

public LoserTreeInputsSorting(Collection<? extends Iterator<T>> sources,
Comparator<? super T> comparator) {
super(sources, comparator);

this.size = sources.size();
this.leaves = new Object[this.size];
this.tree = new int[this.size];

this.constructLoserTree();
}

@Override
public boolean hasNext() {
return !this.isEmpty();
}

@Override
public T next() {
if (this.isEmpty()) {
throw new NoSuchElementException();
}

int winnerIndex = this.tree[0];

@SuppressWarnings("unchecked")
T winner = (T) this.leaves[winnerIndex];

this.fill(winnerIndex);
this.adjust(winnerIndex);

return winner;
}

private boolean isEmpty() {
return this.leaves[this.tree[0]] == INFINITY_LEAF;
}

private void constructLoserTree() {
// Init leaves
for (int i = 0; i < this.sources.length; i++) {
if (this.sources[i].hasNext()) {
this.leaves[i] = this.sources[i].next();
} else {
this.leaves[i] = INFINITY_LEAF;
}
}

// Init tree to winner
int winner = 0;
for (int i = 0; i < this.size; i++) {
if (this.beat(i, winner)) {
winner = i;
}
}
Arrays.fill(this.tree, winner);

// Init loser tree
for (int i = this.size - 1; i >= 0; i--) {
this.adjust(i);
}
}

private void fill(int index) {
Iterator<T> source = this.sources[index];
if (source.hasNext()) {
this.leaves[index] = source.next();
} else {
this.leaves[index] = INFINITY_LEAF;
}
}

private void adjust(int index) {
int t = (this.size + index) >> 1;
while (t > 0) {
// Save loser
if (this.beat(this.tree[t], index)) {
int temp = this.tree[t];
this.tree[t] = index;
index = temp;
}
t = t >> 1;
}
this.tree[0] = index;
}

/**
* Judge whether index1 can beat index2
* if order by asc. smaller one is winner. bigger one is loser.
*/
private boolean beat(int index1, int index2) {
@SuppressWarnings("unchecked")
T t1 = (T) this.leaves[index1];
@SuppressWarnings("unchecked")
T t2 = (T) this.leaves[index2];

if (t1 == INFINITY_LEAF) {
return false;
}
if (t2 == INFINITY_LEAF) {
return true;
}

int result = this.compare(t1, t2);
return result <= 0;
}
}
@@ -33,6 +33,7 @@
import com.baidu.hugegraph.computer.core.graph.GraphTestSuite;
import com.baidu.hugegraph.computer.core.input.InputTestSuite;
import com.baidu.hugegraph.computer.core.io.IOTestSuite;
import com.baidu.hugegraph.computer.core.sort.SortTestSuite;
import com.baidu.hugegraph.computer.core.worker.WorkerTestSuite;
import com.baidu.hugegraph.config.OptionSpace;
import com.baidu.hugegraph.util.Log;
@@ -48,7 +49,8 @@
IOTestSuite.class,
BspTestSuite.class,
InputTestSuite.class,
WorkerTestSuite.class
WorkerTestSuite.class,
SortTestSuite.class
})
public class UnitTestSuite {

@@ -0,0 +1,32 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.sort;

import org.junit.runner.RunWith;
import org.junit.runners.Suite;

import com.baidu.hugegraph.computer.core.sort.sorting.InputsSortingTest;

@RunWith(Suite.class)
@Suite.SuiteClasses({
InputsSortingTest.class
})
public class SortTestSuite {
}

0 comments on commit acd3434

Please sign in to comment.