Skip to content
Permalink
Browse files
Fix EntriesUtil#entryFromInput and some improve (#58)
  • Loading branch information
corgiboygsj committed Jun 4, 2021
1 parent 8c2dbec commit f524ec2d777f9eddb1ee890b4bfba7e2a67109a1
Showing 20 changed files with 303 additions and 193 deletions.
@@ -41,9 +41,10 @@ public interface Sorter {
* If some key exists several times, combine the values.
* @param input The input buffer.
* @param flusher The flusher for the same key.
* @param withSubKv True if the subKv entries need to be sorted.
*/
void sortBuffer(RandomAccessInput input, InnerSortFlusher flusher)
throws Exception;
void sortBuffer(RandomAccessInput input, InnerSortFlusher flusher,
boolean withSubKv) throws Exception;

/**
* Merge the buffers by increasing order of key.
@@ -66,7 +67,7 @@ void sortBuffer(RandomAccessInput input, InnerSortFlusher flusher)
* @param inputBuffers The input buffer list.
* @param flusher The flusher for the same key.
* @param output Sort result output location.
* @param withSubKv Buffer format 2 is true, Buffer format 1 is false.
* @param withSubKv True if the subKv entries need to be sorted.
*/
void mergeBuffers(List<RandomAccessInput> inputBuffers,
OuterSortFlusher flusher, String output,
@@ -99,7 +100,7 @@ void mergeBuffers(List<RandomAccessInput> inputBuffers,
* @param inputs The input file list.
* @param flusher The flusher for the same key.
* @param outputs Sort result output locations.
* @param withSubKv Buffer format 2 is true, Buffer format 1 is false.
* @param withSubKv True if the subKv entries need to be sorted.
*/
void mergeInputs(List<String> inputs, OuterSortFlusher flusher,
List<String> outputs, boolean withSubKv) throws Exception;
@@ -42,9 +42,9 @@
import com.baidu.hugegraph.computer.core.store.hgkvfile.file.reader.HgkvDirReaderImpl;
import com.baidu.hugegraph.computer.core.store.hgkvfile.file.reader.HgkvDirReader;
import com.baidu.hugegraph.computer.core.store.hgkvfile.file.reader.HgkvDir4SubKvReaderImpl;
import com.baidu.hugegraph.computer.core.store.hgkvfile.buffer.EntriesInput;
import com.baidu.hugegraph.computer.core.store.hgkvfile.buffer.KvEntriesInput;
import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.KvEntry;
import com.baidu.hugegraph.computer.core.store.hgkvfile.buffer.EntriesSubKvInput;
import com.baidu.hugegraph.computer.core.store.hgkvfile.buffer.KvEntriesWithFirstSubKvInput;
import com.baidu.hugegraph.computer.core.store.hgkvfile.buffer.EntryIterator;
import com.baidu.hugegraph.computer.core.store.hgkvfile.file.select.DisperseEvenlySelector;
import com.baidu.hugegraph.computer.core.store.hgkvfile.file.select.InputFilesSelector;
@@ -59,9 +59,9 @@ public SorterImpl(Config config) {
}

@Override
public void sortBuffer(RandomAccessInput input, InnerSortFlusher flusher)
throws Exception {
try (EntryIterator entries = new EntriesInput(input)) {
public void sortBuffer(RandomAccessInput input, InnerSortFlusher flusher,
boolean withSubKv) throws Exception {
try (EntryIterator entries = new KvEntriesInput(input, withSubKv)) {
InputSorter sorter = new JavaInputSorter();
flusher.flush(sorter.sort(entries));
}
@@ -75,11 +75,11 @@ public void mergeBuffers(List<RandomAccessInput> inputs,
try {
if (withSubKv) {
entries = inputs.stream()
.map(EntriesSubKvInput::new)
.map(KvEntriesWithFirstSubKvInput::new)
.collect(Collectors.toList());
} else {
entries = inputs.stream()
.map(EntriesInput::new)
.map(KvEntriesInput::new)
.collect(Collectors.toList());
}
} finally {
@@ -102,7 +102,8 @@ public void flush(EntryIterator entries, HgkvDirBuilder writer)
// Write kvEntry to file.
RandomAccessInput input = EntriesUtil.inputFromOutput(
this.output);
writer.write(EntriesUtil.entryFromInput(input, true, true));
writer.write(EntriesUtil.kvEntryFromInput(input, true,
true));
this.output.seek(0);

if (current == null) {
@@ -26,24 +26,24 @@
import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.KvEntry;
import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.EntriesUtil;

public class EntriesInput implements EntryIterator {
public class KvEntriesInput implements EntryIterator {

private final RandomAccessInput input;
private final boolean useInlinePointer;
private final boolean withSubKv;
private final RandomAccessInput userAccessInput;

public EntriesInput(RandomAccessInput input, boolean useInlinePointer) {
public KvEntriesInput(RandomAccessInput input, boolean withSubKv) {
this.input = input;
this.useInlinePointer = useInlinePointer;
this.withSubKv = withSubKv;
try {
this.userAccessInput = this.input.duplicate();
} catch (IOException e) {
throw new ComputerException(e.getMessage(), e);
}
}

public EntriesInput(RandomAccessInput input) {
this(input, true);
public KvEntriesInput(RandomAccessInput input) {
this(input, false);
}

@Override
@@ -57,8 +57,8 @@ public boolean hasNext() {

@Override
public KvEntry next() {
return EntriesUtil.entryFromInput(this.input, this.userAccessInput,
this.useInlinePointer, false);
return EntriesUtil.kvEntryFromInput(this.input, this.userAccessInput,
true, this.withSubKv);
}

@Override
@@ -25,21 +25,16 @@
import com.baidu.hugegraph.iterator.CIter;
import com.baidu.hugegraph.iterator.MapperIterator;

public class EntriesSubKvInput implements EntryIterator {
public class KvEntriesWithFirstSubKvInput implements EntryIterator {

private final CIter<KvEntry> entries;

public EntriesSubKvInput(RandomAccessInput input,
boolean useInlinePointer) {
public KvEntriesWithFirstSubKvInput(RandomAccessInput input) {
this.entries = new MapperIterator<>(
new EntriesInput(input, useInlinePointer),
new KvEntriesInput(input),
EntriesUtil::kvEntryWithFirstSubKv);
}

public EntriesSubKvInput(RandomAccessInput input) {
this(input, true);
}

@Override
public boolean hasNext() {
return this.entries.hasNext();
@@ -0,0 +1,74 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.store.hgkvfile.buffer;

import java.io.IOException;
import java.util.NoSuchElementException;

import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.io.RandomAccessInput;
import com.baidu.hugegraph.computer.core.io.UnsafeBytesInput;
import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.EntriesUtil;
import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.KvEntry;

/**
 * Iterates over the sub key-value entries stored inside the value of a
 * parent {@link KvEntry}.
 *
 * <p>The parent's value bytes are read as an int sub-entry count followed by
 * the sub-entries; every call to {@link #next()} decodes one sub-entry via
 * {@code EntriesUtil.subKvEntryFromInput} and decrements the remaining count.
 */
public class SubKvEntriesInput implements EntryIterator {

    /** Sequential reader over the parent entry's value bytes. */
    private final RandomAccessInput input;
    /**
     * Duplicate of {@link #input}; passed to the decoder as its
     * {@code userAccessInput} argument.
     */
    private final RandomAccessInput useAccessInput;
    /** Number of sub-entries that have not been returned yet. */
    private long size;
    /** Forwarded unchanged to the decoder on every {@link #next()} call. */
    private final boolean useInlinePointer;

    /**
     * Wraps the value bytes of {@code kvEntry} and reads the leading int as
     * the number of sub-entries.
     *
     * @param kvEntry          Parent entry whose value holds the sub-entries.
     * @param useInlinePointer Passed through to
     *                         {@code EntriesUtil.subKvEntryFromInput}.
     * @throws ComputerException if reading the value bytes fails.
     */
    public SubKvEntriesInput(KvEntry kvEntry, boolean useInlinePointer) {
        try {
            this.input = new UnsafeBytesInput(kvEntry.value().bytes());
            // Duplicate before consuming the count header.
            this.useAccessInput = this.input.duplicate();
            this.size = this.input.readInt();
            this.useInlinePointer = useInlinePointer;
        } catch (IOException e) {
            throw new ComputerException(e.getMessage(), e);
        }
    }

    /**
     * Same as {@link #SubKvEntriesInput(KvEntry, boolean)} with
     * {@code useInlinePointer} set to {@code true}.
     *
     * @param kvEntry Parent entry whose value holds the sub-entries.
     */
    public SubKvEntriesInput(KvEntry kvEntry) {
        this(kvEntry, true);
    }

    /**
     * @return {@code true} while the remaining sub-entry count is positive.
     */
    @Override
    public boolean hasNext() {
        return this.size > 0;
    }

    /**
     * Decodes and returns the next sub-entry.
     *
     * @return The next sub key-value entry.
     * @throws NoSuchElementException if no sub-entries remain.
     */
    @Override
    public KvEntry next() {
        if (!this.hasNext()) {
            throw new NoSuchElementException();
        }

        this.size--;
        return EntriesUtil.subKvEntryFromInput(this.input, this.useAccessInput,
                                               this.useInlinePointer);
    }

    /**
     * Closes both underlying inputs.
     *
     * @throws Exception if closing either input fails.
     */
    @Override
    public void close() throws Exception {
        // The duplicate must be closed even when closing the primary fails,
        // otherwise it would be leaked.
        try {
            this.input.close();
        } finally {
            this.useAccessInput.close();
        }
    }
}
@@ -20,86 +20,19 @@
package com.baidu.hugegraph.computer.core.store.hgkvfile.entry;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.io.RandomAccessInput;
import com.baidu.hugegraph.computer.core.io.UnsafeBytesInput;
import com.baidu.hugegraph.computer.core.io.UnsafeBytesOutput;
import com.baidu.hugegraph.computer.core.store.hgkvfile.buffer.EntriesInput;
import com.baidu.hugegraph.computer.core.store.hgkvfile.buffer.EntryIterator;
import com.baidu.hugegraph.computer.core.store.hgkvfile.buffer.SubKvEntriesInput;

public final class EntriesUtil {

private static final int DEFAULT_CAPACITY = 100000;

/**
 * Drains all entries from {@code input} into a list.
 *
 * <p>The list is pre-sized with {@code DEFAULT_CAPACITY}; the
 * {@code EntriesInput} wrapper created over {@code input} is closed before
 * returning.
 *
 * @param input The input to read entries from.
 * @return Every entry produced by iterating {@code input}, in read order.
 * @throws IOException declared by the signature; propagated from reading.
 */
public static List<KvEntry> readInput(RandomAccessInput input)
                                      throws IOException {
    List<KvEntry> entries = new ArrayList<>(DEFAULT_CAPACITY);
    try (EntriesInput reader = new EntriesInput(input)) {
        while (reader.hasNext()) {
            KvEntry entry = reader.next();
            entries.add(entry);
        }
    }
    return entries;
}

/**
 * Creates an iterator over the sub key-value entries of {@code entry}.
 *
 * @param entry The entry to iterate sub-entries of.
 * @return A new {@code SubKvIterator} built from {@code entry}.
 */
public static EntryIterator subKvIterFromEntry(KvEntry entry) {
    EntryIterator subKvIter = new SubKvIterator(entry);
    return subKvIter;
}

/**
 * Iterates over the sub key-value entries stored inside the value of a
 * parent {@link KvEntry}.
 *
 * <p>The value bytes are read as an int sub-entry count followed by the
 * sub-entries; each {@link #next()} call decodes one of them via
 * {@code EntriesUtil.entryFromInput}.
 */
private static class SubKvIterator implements EntryIterator {

    /** Sequential reader over the parent entry's value bytes. */
    private final RandomAccessInput input;
    /**
     * Duplicate of {@link #input}; passed to the decoder as its
     * {@code userAccessInput} argument.
     */
    private final RandomAccessInput useAccessInput;
    /** Number of sub-entries that have not been returned yet. */
    private long size;
    /** Forwarded unchanged to the decoder on every {@link #next()} call. */
    private final boolean useInlinePointer;

    /**
     * Wraps the value bytes of {@code kvEntry} and reads the leading int as
     * the number of sub-entries.
     *
     * @param kvEntry          Parent entry whose value holds the sub-entries.
     * @param useInlinePointer Passed through to
     *                         {@code EntriesUtil.entryFromInput}.
     * @throws ComputerException if reading the value bytes fails.
     */
    public SubKvIterator(KvEntry kvEntry, boolean useInlinePointer) {
        try {
            this.input = new UnsafeBytesInput(kvEntry.value().bytes());
            // Duplicate before consuming the count header.
            this.useAccessInput = this.input.duplicate();
            this.size = this.input.readInt();
            this.useInlinePointer = useInlinePointer;
        } catch (IOException e) {
            throw new ComputerException(e.getMessage(), e);
        }
    }

    /**
     * Same as {@link #SubKvIterator(KvEntry, boolean)} with
     * {@code useInlinePointer} set to {@code true}.
     *
     * @param kvEntry Parent entry whose value holds the sub-entries.
     */
    public SubKvIterator(KvEntry kvEntry) {
        this(kvEntry, true);
    }

    /**
     * @return {@code true} while the remaining sub-entry count is positive.
     */
    @Override
    public boolean hasNext() {
        return this.size > 0;
    }

    /**
     * Decodes and returns the next sub-entry.
     *
     * @return The next sub key-value entry.
     * @throws NoSuchElementException if no sub-entries remain.
     */
    @Override
    public KvEntry next() {
        if (!this.hasNext()) {
            throw new NoSuchElementException();
        }

        this.size--;
        return EntriesUtil.entryFromInput(this.input, this.useAccessInput,
                                          this.useInlinePointer, true);
    }

    /**
     * Closes both underlying inputs.
     *
     * @throws Exception if closing either input fails.
     */
    @Override
    public void close() throws Exception {
        // The duplicate must be closed even when closing the primary fails,
        // otherwise it would be leaked.
        try {
            this.input.close();
        } finally {
            this.useAccessInput.close();
        }
    }
}

public static KvEntry entryFromInput(RandomAccessInput input,
RandomAccessInput userAccessInput,
boolean useInlinePointer,
boolean valueWithSubKv) {
public static KvEntry kvEntryFromInput(RandomAccessInput input,
RandomAccessInput userAccessInput,
boolean useInlinePointer,
boolean valueWithSubKv) {
try {
if (useInlinePointer) {
return inlinePointerKvEntry(input, valueWithSubKv);
@@ -112,10 +45,10 @@ public static KvEntry entryFromInput(RandomAccessInput input,
}
}

public static KvEntry entryFromInput(RandomAccessInput input,
boolean useInlinePointer,
boolean valueWithSubKv) {
return entryFromInput(input, input, useInlinePointer, valueWithSubKv);
public static KvEntry kvEntryFromInput(RandomAccessInput input,
boolean useInlinePointer,
boolean valueWithSubKv) {
return kvEntryFromInput(input, input, useInlinePointer, valueWithSubKv);
}

private static KvEntry cachedPointerKvEntry(
@@ -176,13 +109,46 @@ private static KvEntry inlinePointerKvEntry(RandomAccessInput input,
return new DefaultKvEntry(key, value, numSubEntries);
}

/**
 * Decodes one sub key-value entry from {@code input}.
 *
 * <p>The entry is read as an int key length, the key bytes, an int value
 * length and the value bytes. With {@code useInlinePointer} the bytes are
 * copied into {@code InlinePointer}s; otherwise {@code CachedPointer}s are
 * created over {@code userAccessInput} at the current positions and the
 * bytes are skipped in {@code input}.
 *
 * @param input            Input positioned at the start of a sub-entry.
 * @param userAccessInput  Input that the cached pointers refer to.
 * @param useInlinePointer True to copy bytes, false to reference them.
 * @return The decoded entry.
 * @throws ComputerException if an {@link IOException} occurs while reading.
 */
public static KvEntry subKvEntryFromInput(RandomAccessInput input,
                                          RandomAccessInput userAccessInput,
                                          boolean useInlinePointer) {
    try {
        if (!useInlinePointer) {
            // Reference the bytes in place and step over them.
            int keySize = input.readInt();
            Pointer keyPointer = new CachedPointer(userAccessInput,
                                                   input.position(),
                                                   keySize);
            input.skip(keySize);

            int valueSize = input.readInt();
            Pointer valuePointer = new CachedPointer(userAccessInput,
                                                     input.position(),
                                                     valueSize);
            input.skip(valueSize);

            return new DefaultKvEntry(keyPointer, valuePointer);
        }

        // Copy the bytes out of the input.
        int keySize = input.readInt();
        byte[] rawKey = input.readBytes(keySize);
        Pointer keyPointer = new InlinePointer(rawKey);

        int valueSize = input.readInt();
        byte[] rawValue = input.readBytes(valueSize);
        Pointer valuePointer = new InlinePointer(rawValue);

        return new DefaultKvEntry(keyPointer, valuePointer);
    } catch (IOException cause) {
        throw new ComputerException(cause.getMessage(), cause);
    }
}

/**
 * Decodes one sub key-value entry, using {@code input} both as the reader
 * and as the {@code userAccessInput} of the three-argument overload.
 *
 * @param input            Input positioned at the start of a sub-entry.
 * @param useInlinePointer Forwarded to the three-argument overload.
 * @return The decoded entry.
 */
public static KvEntry subKvEntryFromInput(RandomAccessInput input,
                                          boolean useInlinePointer) {
    KvEntry subKvEntry = subKvEntryFromInput(input, input,
                                             useInlinePointer);
    return subKvEntry;
}

public static KvEntryWithFirstSubKv kvEntryWithFirstSubKv(KvEntry entry) {
try {
RandomAccessInput input = new UnsafeBytesInput(
entry.value().bytes());
// Skip sub-entry size
input.skip(Integer.BYTES);
KvEntry firstSubKv = EntriesUtil.entryFromInput(input, true, true);
KvEntry firstSubKv = EntriesUtil.subKvEntryFromInput(input, true);

return new KvEntryWithFirstSubKv(entry.key(), entry.value(),
firstSubKv);
@@ -194,4 +160,8 @@ public static KvEntryWithFirstSubKv kvEntryWithFirstSubKv(KvEntry entry) {
/**
 * Creates an {@code UnsafeBytesInput} from the buffer of {@code output}.
 *
 * <p>The output's current position is passed as the second constructor
 * argument. NOTE(review): presumably this bounds the readable bytes to
 * what has been written so far; confirm against UnsafeBytesInput.
 *
 * @param output The output whose buffer is wrapped.
 * @return A new input constructed from {@code output.buffer()} and
 *         {@code output.position()}.
 */
public static UnsafeBytesInput inputFromOutput(UnsafeBytesOutput output) {
return new UnsafeBytesInput(output.buffer(), output.position());
}

/**
 * Creates an iterator over the sub key-value entries of {@code entry}.
 *
 * @param entry The entry to iterate sub-entries of.
 * @return A new {@code SubKvEntriesInput} built from {@code entry}.
 */
public static EntryIterator subKvIterFromEntry(KvEntry entry) {
    EntryIterator subKvIter = new SubKvEntriesInput(entry);
    return subKvIter;
}
}

0 comments on commit f524ec2

Please sign in to comment.