Skip to content

Commit

Permalink
[SPARK-23762][SQL] UTF8StringBuffer uses MemoryBlock
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

This PR tries to use `MemoryBlock` in `UTF8StringBuffer`. In general, there are two advantages to use `MemoryBlock`.

1. Has clean API calls rather than using a Java array or `PlatformMemory`
2. Improve runtime performance of memory access instead of using `Object`.

## How was this patch tested?

Added `UTF8StringBufferSuite`

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #20871 from kiszk/SPARK-23762.
  • Loading branch information
kiszk authored and cloud-fan committed Apr 12, 2018
1 parent 6a2289e commit 0b19122
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 21 deletions.
Expand Up @@ -19,6 +19,8 @@

import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.types.UTF8String;

/**
Expand All @@ -29,50 +31,41 @@ public class UTF8StringBuilder {

private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;

private byte[] buffer;
private int cursor = Platform.BYTE_ARRAY_OFFSET;
private ByteArrayMemoryBlock buffer;
private int length = 0;

public UTF8StringBuilder() {
// Since initial buffer size is 16 in `StringBuilder`, we set the same size here
this.buffer = new byte[16];
this.buffer = new ByteArrayMemoryBlock(16);
}

// Grows the buffer by at least `neededSize`
private void grow(int neededSize) {
if (neededSize > ARRAY_MAX - totalSize()) {
if (neededSize > ARRAY_MAX - length) {
throw new UnsupportedOperationException(
"Cannot grow internal buffer by size " + neededSize + " because the size after growing " +
"exceeds size limitation " + ARRAY_MAX);
}
final int length = totalSize() + neededSize;
if (buffer.length < length) {
int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
final byte[] tmp = new byte[newLength];
Platform.copyMemory(
buffer,
Platform.BYTE_ARRAY_OFFSET,
tmp,
Platform.BYTE_ARRAY_OFFSET,
totalSize());
final int requestedSize = length + neededSize;
if (buffer.size() < requestedSize) {
int newLength = requestedSize < ARRAY_MAX / 2 ? requestedSize * 2 : ARRAY_MAX;
final ByteArrayMemoryBlock tmp = new ByteArrayMemoryBlock(newLength);
MemoryBlock.copyMemory(buffer, tmp, length);
buffer = tmp;
}
}

private int totalSize() {
return cursor - Platform.BYTE_ARRAY_OFFSET;
}

public void append(UTF8String value) {
grow(value.numBytes());
value.writeToMemory(buffer, cursor);
cursor += value.numBytes();
value.writeToMemory(buffer.getByteArray(), length + Platform.BYTE_ARRAY_OFFSET);
length += value.numBytes();
}

public void append(String value) {
append(UTF8String.fromString(value));
}

public UTF8String build() {
return UTF8String.fromBytes(buffer, 0, totalSize());
return UTF8String.fromBytes(buffer.getByteArray(), 0, length);
}
}
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions.codegen

import org.apache.spark.SparkFunSuite
import org.apache.spark.unsafe.types.UTF8String

class UTF8StringBuilderSuite extends SparkFunSuite {

test("basic test") {
val sb = new UTF8StringBuilder()
assert(sb.build() === UTF8String.EMPTY_UTF8)

sb.append("")
assert(sb.build() === UTF8String.EMPTY_UTF8)

sb.append("abcd")
assert(sb.build() === UTF8String.fromString("abcd"))

sb.append(UTF8String.fromString("1234"))
assert(sb.build() === UTF8String.fromString("abcd1234"))

// expect to grow an internal buffer
sb.append(UTF8String.fromString("efgijk567890"))
assert(sb.build() === UTF8String.fromString("abcd1234efgijk567890"))
}
}

0 comments on commit 0b19122

Please sign in to comment.