Skip to content
Permalink
Browse files
Add store/sort module (#37)
* add hgkvFile/hgkvDir module
* add EntriesUtil.java
* add SorterTest.java
* move file verification to HgkvDir
* rebuild HgkvFile readFooter and fix HgkvDir#open
* add sort module
* implement InnerSortFlusher and OuterSortFlusher
* add KvEntry#numSubEntries
  • Loading branch information
corgiboygsj committed May 31, 2021
1 parent bfb8826 commit df5c2bb588549aa153be83157414d896f30638cd
Showing 88 changed files with 6,296 additions and 185 deletions.
@@ -43,6 +43,10 @@ public static int hashBytes(byte[] bytes, int offset, int length) {
return hash;
}

/**
 * Compares two byte arrays lexicographically over their entire contents.
 *
 * Calls the offset-based overload directly with zero offsets instead of
 * chaining through the intermediate length-only overload.
 *
 * @param bytes1 the first array to compare
 * @param bytes2 the second array to compare
 * @return a negative, zero, or positive value as the first array is less
 *         than, equal to, or greater than the second
 */
public static int compare(byte[] bytes1, byte[] bytes2) {
    return compare(bytes1, 0, bytes1.length, bytes2, 0, bytes2.length);
}

/**
 * Compares the leading {@code length1} bytes of {@code bytes1} with the
 * leading {@code length2} bytes of {@code bytes2} by delegating to the
 * offset-based overload with both offsets fixed at 0.
 *
 * @param bytes1  the first array to compare
 * @param length1 number of bytes of {@code bytes1} to compare, from index 0
 * @param bytes2  the second array to compare
 * @param length2 number of bytes of {@code bytes2} to compare, from index 0
 * @return a negative, zero, or positive value per the underlying comparison
 */
public static int compare(byte[] bytes1, int length1,
byte[] bytes2, int length2) {
return compare(bytes1, 0, length1, bytes2, 0, length2);
@@ -147,6 +147,14 @@ public static synchronized ComputerOptions instance() {
false
);

// Upper bound on the number of edges of one vertex stored in a single
// KvEntry, per the option description below.
// NOTE(review): presumably vertices with more edges are split across
// multiple entries during input loading — confirm against the loader code.
public static final ConfigOption<Integer> INPUT_MAX_EDGES_IN_ONE_VERTEX =
new ConfigOption<>(
"input.max_edges_in_one_vertex",
"The number of edges of a vertex in kvEntry.",
positiveInt(),
500
);

public static final ConfigOption<Integer> VERTEX_AVERAGE_DEGREE =
new ConfigOption<>(
"computer.vertex_average_degree",
@@ -548,4 +556,37 @@ public static synchronized ComputerOptions instance() {
positiveInt(),
90
);

// Maximum size in bytes of a single hgkv file; defaults to 4 GB.
public static final ConfigOption<Long> HGKV_MAX_FILE_SIZE =
new ConfigOption<>(
"hgkv.max_file_size",
"The max number of bytes in each hgkv-file.",
positiveInt(),
Bytes.GB * 4
);

// Maximum size in bytes of one data block inside an hgkv file;
// defaults to 64 KB.
// NOTE(review): field name HGKV_DATABLOCK_SIZE vs. key
// "hgkv.max_data_block_size" — consider renaming for consistency.
public static final ConfigOption<Long> HGKV_DATABLOCK_SIZE =
new ConfigOption<>(
"hgkv.max_data_block_size",
"The max byte size of hgkv-file data block.",
positiveInt(),
Bytes.KB * 64
);

// Maximum number of files merged in a single merge pass; defaults to 200.
public static final ConfigOption<Integer> HGKV_MERGE_FILES_NUM =
new ConfigOption<>(
"hgkv.max_merge_files",
"The max number of files to merge at one time.",
positiveInt(),
200
);

// Directory holding the temporary files produced while merging;
// defaults to /tmp/hgkv.
public static final ConfigOption<String> HGKV_TEMP_DIR =
new ConfigOption<>(
"hgkv.temp_file_dir",
"This folder is used to store temporary files, temporary " +
"files will be generated during the file merging process.",
disallowEmpty(),
"/tmp/hgkv"
);
}
@@ -26,6 +26,7 @@

import com.baidu.hugegraph.computer.core.common.Constants;
import com.baidu.hugegraph.computer.core.util.BytesUtil;
import com.baidu.hugegraph.exception.NotSupportException;
import com.baidu.hugegraph.testutil.Whitebox;
import com.baidu.hugegraph.util.E;

@@ -77,6 +78,9 @@ public void readFully(byte[] b, int off, int len) throws IOException {

@Override
public void seek(long position) throws IOException {
if (position == this.position()) {
return;
}
long bufferStart = this.fileOffset - this.limit();
if (position >= bufferStart && position < this.fileOffset) {
super.seek(position - bufferStart);
@@ -168,17 +172,45 @@ public BufferedFileInput duplicate() throws IOException {
/**
 * Compares {@code length} bytes of this input starting at {@code offset}
 * with {@code otherLength} bytes of {@code other} starting at
 * {@code otherOffset}.
 *
 * Fast path: when a requested range is already inside the in-memory
 * buffer (range end <= limit()), it is compared directly from the buffer
 * without any file access. Otherwise the range is read from file,
 * restoring the previous position afterwards.
 *
 * NOTE(review): the buffered-range check compares the file offset against
 * limit(), which assumes the buffered window starts at file offset 0 —
 * confirm the intended semantics of {@code offset} here.
 *
 * The diff-scrape of this hunk had retained the removed pre-change lines
 * interleaved with the added ones, leaving unreachable statements after an
 * early return; this body is the coherent post-change method only.
 *
 * @param other must be a BufferedFileInput
 * @return a negative, zero, or positive value per BytesUtil.compare
 * @throws NotSupportException if {@code other} is not a BufferedFileInput
 * @throws IOException if reading either input fails
 */
@Override
public int compare(long offset, long length, RandomAccessInput other,
                   long otherOffset, long otherLength) throws IOException {
    assert other != null;
    if (other.getClass() != BufferedFileInput.class) {
        throw new NotSupportException("BufferedFileInput must be compare " +
                                      "with BufferedFileInput");
    }

    BufferedFileInput otherInput = (BufferedFileInput) other;
    if ((offset + length) <= this.limit()) {
        if ((otherOffset + otherLength) <= otherInput.limit()) {
            // Both ranges are buffered: compare in memory, no I/O.
            return BytesUtil.compare(this.buffer(), (int) offset,
                                     (int) length, otherInput.buffer(),
                                     (int) otherOffset, (int) otherLength);
        } else {
            // Only this side is buffered: materialize the other range,
            // restoring its position afterwards.
            long otherOldPosition = other.position();
            other.seek(otherOffset);
            byte[] bytes = other.readBytes((int) otherLength);
            other.seek(otherOldPosition);
            return BytesUtil.compare(this.buffer(), (int) offset,
                                     (int) length, bytes, 0,
                                     bytes.length);
        }
    } else {
        // This side is not buffered: read it from file and restore position.
        long oldPosition = this.position();
        this.seek(offset);
        byte[] bytes1 = this.readBytes((int) length);
        this.seek(oldPosition);

        if ((otherOffset + otherLength) <= otherInput.limit()) {
            return BytesUtil.compare(bytes1, 0, bytes1.length,
                                     otherInput.buffer(), (int) otherOffset,
                                     (int) otherLength);
        } else {
            long otherOldPosition = other.position();
            other.seek(otherOffset);
            byte[] bytes2 = other.readBytes((int) otherLength);
            other.seek(otherOldPosition);
            return BytesUtil.compare(bytes1, 0, bytes1.length,
                                     bytes2, 0, bytes2.length);
        }
    }
}
}
@@ -28,7 +28,6 @@
import com.baidu.hugegraph.computer.core.common.Constants;
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.util.CoderUtil;
import com.baidu.hugegraph.testutil.Whitebox;
import com.baidu.hugegraph.util.E;

import sun.misc.Unsafe;
@@ -213,7 +212,7 @@ public long skip(long bytesToSkip) throws IOException {
public void write(RandomAccessInput input, long offset, long length)
throws IOException {
if (UnsafeBytesInput.class == input.getClass()) {
byte[] buffer = Whitebox.getInternalState(input, "buffer");
byte[] buffer = ((UnsafeBytesInput) input).buffer();
this.write(buffer, (int) offset, (int) length);
} else {
input.seek(offset);
@@ -0,0 +1,112 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.sort;

import java.io.IOException;
import java.util.List;

import com.baidu.hugegraph.computer.core.io.RandomAccessInput;
import com.baidu.hugegraph.computer.core.sort.flusher.InnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.OuterSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.PeekableIterator;
import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.KvEntry;

public interface Sorter {

/**
 * Sorts the given buffer by increasing order of key; each key appears at
 * most once in the flushed output.
 * The input buffer format is:
 * | key1 length | key1 | value1 length | value1 |
 * | key2 length | key2 | value2 length | value2 |
 * | key1 length | key1 | value3 length | value3 |
 * and so on.
 * If a key occurs several times, its values are combined by the flusher.
 *
 * @param input the input buffer
 * @param flusher the flusher that combines values of the same key
 * @throws Exception if sorting or flushing fails
 */
void sortBuffer(RandomAccessInput input, InnerSortFlusher flusher)
throws Exception;

/**
 * Merges the buffers by increasing order of key and writes the result to
 * {@code output}.
 * Each buffer in the list is itself sorted by increasing key.
 * There are two supported buffer formats:
 * 1.
 * | key1 length | key1 | value1 length | value1 |
 * | key2 length | key2 | value2 length | value2 |
 * and so on, with keys in increasing order in each buffer.
 * 2.
 * | key1 length | key1 | value1 length | sub-entry count |
 * | sub-key1 length | sub-key1 | sub-value1 length | sub-value1 |
 * | sub-key2 length | sub-key2 | sub-value2 length | sub-value2 |
 * and so on, with keys in increasing order in each buffer and sub-keys in
 * increasing order within a key-value pair.
 *
 * @param inputBuffers the input buffer list
 * @param flusher the flusher that combines values of the same key
 * @param output the location the merged result is written to
 * @param withSubKv true for buffer format 2, false for format 1
 * @throws Exception if merging or flushing fails
 */
void mergeBuffers(List<RandomAccessInput> inputBuffers,
OuterSortFlusher flusher, String output,
boolean withSubKv) throws Exception;

/**
 * Merges n inputs into m outputs, where n is the size of {@code inputs}
 * and m is the size of {@code outputs}.
 * Each input file is sorted by increasing key.
 * There are two supported input formats:
 * 1.
 * | key1 length | key1 | value1 length | value1 |
 * | key2 length | key2 | value2 length | value2 |
 * and so on, with keys in increasing order in each input.
 * 2.
 * | key1 length | key1 | value1 length | sub-entry count |
 * | sub-key1 length | sub-key1 | sub-value1 length | sub-value1 |
 * | sub-key2 length | sub-key2 | sub-value2 length | sub-value2 |
 * and so on, with sub-keys in increasing order within a key-value pair.
 *
 * The outputs use the same format as the inputs.
 * For example, with 100 inputs and 10 outputs, this method merges the
 * 100 inputs into 10 outputs.
 * The inputs should be distributed to the outputs as evenly as possible.
 * One way: sort the inputs by size in descending order, then assign each
 * input in turn to the output currently holding the fewest inputs, which
 * keeps the imbalance between outputs small.
 *
 * @param inputs the input file list
 * @param flusher the flusher that combines values of the same key
 * @param outputs the locations the merged results are written to
 * @param withSubKv true for input format 2, false for format 1
 * @throws Exception if merging or flushing fails
 */
void mergeInputs(List<String> inputs, OuterSortFlusher flusher,
List<String> outputs, boolean withSubKv) throws Exception;

/**
 * Returns an iterator over the <key, value> pairs of the given inputs,
 * in increasing order of key.
 *
 * @param inputs the input file list
 * @param withSubKv true for input format 2, false for format 1
 * @return a peekable iterator over the merged entries
 * @throws IOException if the inputs cannot be read
 */
PeekableIterator<KvEntry> iterator(List<String> inputs, boolean withSubKv)
throws IOException;
}

0 comments on commit df5c2bb

Please sign in to comment.