Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-9517][SQL] BytesToBytesMap should encode data the same way as UnsafeExternalSorter #7845

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 31 additions & 27 deletions core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.unsafe.map;

import java.io.IOException;
import java.lang.Override;
import java.lang.UnsupportedOperationException;
import java.util.Iterator;
Expand Down Expand Up @@ -212,7 +211,7 @@ public BytesToBytesMap(
*/
public int numElements() { return numElements; }

private static final class BytesToBytesMapIterator implements Iterator<Location> {
public static final class BytesToBytesMapIterator implements Iterator<Location> {

private final int numRecords;
private final Iterator<MemoryBlock> dataPagesIterator;
Expand All @@ -222,7 +221,8 @@ private static final class BytesToBytesMapIterator implements Iterator<Location>
private Object pageBaseObject;
private long offsetInPage;

BytesToBytesMapIterator(int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc) {
private BytesToBytesMapIterator(
int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc) {
this.numRecords = numRecords;
this.dataPagesIterator = dataPagesIterator;
this.loc = loc;
Expand All @@ -244,13 +244,13 @@ public boolean hasNext() {

@Override
public Location next() {
int keyLength = (int) PlatformDependent.UNSAFE.getLong(pageBaseObject, offsetInPage);
if (keyLength == END_OF_PAGE_MARKER) {
int totalLength = PlatformDependent.UNSAFE.getInt(pageBaseObject, offsetInPage);
if (totalLength == END_OF_PAGE_MARKER) {
advanceToNextPage();
keyLength = (int) PlatformDependent.UNSAFE.getLong(pageBaseObject, offsetInPage);
totalLength = PlatformDependent.UNSAFE.getInt(pageBaseObject, offsetInPage);
}
loc.with(pageBaseObject, offsetInPage);
offsetInPage += 8 + 8 + keyLength + loc.getValueLength();
offsetInPage += 8 + totalLength;
currentRecordNumber++;
return loc;
}
Expand All @@ -269,7 +269,7 @@ public void remove() {
* If any other lookups or operations are performed on this map while iterating over it, including
* `lookup()`, the behavior of the returned iterator is undefined.
*/
public Iterator<Location> iterator() {
public BytesToBytesMapIterator iterator() {
return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc);
}

Expand Down Expand Up @@ -352,15 +352,18 @@ private void updateAddressesAndSizes(long fullKeyAddress) {
taskMemoryManager.getOffsetInPage(fullKeyAddress));
}

private void updateAddressesAndSizes(Object page, long keyOffsetInPage) {
long position = keyOffsetInPage;
keyLength = (int) PlatformDependent.UNSAFE.getLong(page, position);
position += 8; // word used to store the key size
keyMemoryLocation.setObjAndOffset(page, position);
position += keyLength;
valueLength = (int) PlatformDependent.UNSAFE.getLong(page, position);
position += 8; // word used to store the key size
valueMemoryLocation.setObjAndOffset(page, position);
private void updateAddressesAndSizes(final Object page, final long keyOffsetInPage) {
long position = keyOffsetInPage;
final int totalLength = PlatformDependent.UNSAFE.getInt(page, position);
position += 4;
keyLength = PlatformDependent.UNSAFE.getInt(page, position);
position += 4;
valueLength = totalLength - keyLength;

keyMemoryLocation.setObjAndOffset(page, position);

position += keyLength;
valueMemoryLocation.setObjAndOffset(page, position);
}

Location with(int pos, int keyHashcode, boolean isDefined) {
Expand Down Expand Up @@ -478,7 +481,7 @@ public boolean putNewKey(
// the key address instead of storing the absolute address of the value, the key and value
// must be stored in the same memory page.
// (4 byte total length of key + value) (4 byte key length) (key) (value)
final long requiredSize = 8 + keyLengthBytes + 8 + valueLengthBytes;
final long requiredSize = 8 + keyLengthBytes + valueLengthBytes;

// --- Figure out where to insert the new record ---------------------------------------------

Expand Down Expand Up @@ -508,7 +511,7 @@ public boolean putNewKey(
// There wasn't enough space in the current page, so write an end-of-page marker:
final Object pageBaseObject = currentDataPage.getBaseObject();
final long lengthOffsetInPage = currentDataPage.getBaseOffset() + pageCursor;
PlatformDependent.UNSAFE.putLong(pageBaseObject, lengthOffsetInPage, END_OF_PAGE_MARKER);
PlatformDependent.UNSAFE.putInt(pageBaseObject, lengthOffsetInPage, END_OF_PAGE_MARKER);
}
final long memoryGranted = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryGranted != pageSizeBytes) {
Expand All @@ -535,37 +538,38 @@ public boolean putNewKey(
long insertCursor = dataPageInsertOffset;

// Compute all of our offsets up-front:
final long keySizeOffsetInPage = insertCursor;
insertCursor += 8; // word used to store the key size
final long totalLengthOffset = insertCursor;
insertCursor += 4;
final long keyLengthOffset = insertCursor;
insertCursor += 4;
final long keyDataOffsetInPage = insertCursor;
insertCursor += keyLengthBytes;
final long valueSizeOffsetInPage = insertCursor;
insertCursor += 8; // word used to store the value size
final long valueDataOffsetInPage = insertCursor;
insertCursor += valueLengthBytes; // word used to store the value size

PlatformDependent.UNSAFE.putInt(dataPageBaseObject, totalLengthOffset,
keyLengthBytes + valueLengthBytes);
PlatformDependent.UNSAFE.putInt(dataPageBaseObject, keyLengthOffset, keyLengthBytes);
// Copy the key
PlatformDependent.UNSAFE.putLong(dataPageBaseObject, keySizeOffsetInPage, keyLengthBytes);
PlatformDependent.copyMemory(
keyBaseObject, keyBaseOffset, dataPageBaseObject, keyDataOffsetInPage, keyLengthBytes);
// Copy the value
PlatformDependent.UNSAFE.putLong(dataPageBaseObject, valueSizeOffsetInPage, valueLengthBytes);
PlatformDependent.copyMemory(valueBaseObject, valueBaseOffset, dataPageBaseObject,
valueDataOffsetInPage, valueLengthBytes);

// --- Update bookkeeping data structures ----------------------------------------------------

if (useOverflowPage) {
// Store the end-of-page marker at the end of the data page
PlatformDependent.UNSAFE.putLong(dataPageBaseObject, insertCursor, END_OF_PAGE_MARKER);
PlatformDependent.UNSAFE.putInt(dataPageBaseObject, insertCursor, END_OF_PAGE_MARKER);
} else {
pageCursor += requiredSize;
}

numElements++;
bitset.set(pos);
final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
dataPage, keySizeOffsetInPage);
dataPage, totalLengthOffset);
longArray.set(pos * 2, storedKeyAddress);
longArray.set(pos * 2 + 1, keyHashcode);
updateAddressesAndSizes(storedKeyAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,21 @@ public void insertRecord(
sorter.insertRecord(recordAddress, prefix);
}

/**
* Write a record to the sorter. The record is supplied as two separate parts, each described by
* a base object, a base offset and a length in bytes.
*
* NOTE(review): presumably the two parts are meant to be stored back to back as a single record
* (e.g. a key followed by its value) -- confirm once the method is implemented.
*
* NOTE(review): the method body is currently empty, so calling this overload has no effect and
* the supplied record is not handed to the underlying sorter.
*
* @param recordBaseObject1 base object of the first part of the record
* @param recordBaseOffset1 offset of the first part relative to {@code recordBaseObject1}
* @param lengthInBytes1 length of the first part, in bytes
* @param recordBaseObject2 base object of the second part of the record
* @param recordBaseOffset2 offset of the second part relative to {@code recordBaseObject2}
* @param lengthInBytes2 length of the second part, in bytes
* @param prefix prefix value associated with the record; the single-part overload forwards its
*               prefix to the in-memory sorter together with the record address
* @throws IOException declared by the signature; nothing in the current (empty) body throws it
*/
public void insertRecord(
Object recordBaseObject1,
long recordBaseOffset1,
int lengthInBytes1,
Object recordBaseObject2,
long recordBaseOffset2,
int lengthInBytes2,
long prefix) throws IOException {
// NOTE(review): intentionally or not, this is a no-op stub; no record is inserted.

}

public UnsafeSorterIterator getSortedIterator() throws IOException {
final UnsafeSorterIterator inMemoryIterator = sorter.getSortedIterator();
int numIteratorsToMerge = spillWriters.size() + (inMemoryIterator.hasNext() ? 1 : 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,17 +243,17 @@ public void iteratorTest() throws Exception {
@Test
public void iteratingOverDataPagesWithWastedSpace() throws Exception {
final int NUM_ENTRIES = 1000 * 1000;
final int KEY_LENGTH = 16;
final int KEY_LENGTH = 24;
final int VALUE_LENGTH = 40;
final BytesToBytesMap map = new BytesToBytesMap(
taskMemoryManager, shuffleMemoryManager, NUM_ENTRIES, PAGE_SIZE_BYTES);
// Each record will take 8 + 8 + 16 + 40 = 72 bytes of space in the data page. Our 64-megabyte
// Each record will take 8 + 24 + 40 = 72 bytes of space in the data page. Our 64-megabyte
// pages won't be evenly-divisible by records of this size, which will cause us to waste some
// space at the end of the page. This is necessary in order for us to take the end-of-page
// marker handling branch in iterator().
try {
for (int i = 0; i < NUM_ENTRIES; i++) {
final long[] key = new long[] { i, i }; // 2 * 8 = 16 bytes
final long[] key = new long[] { i, i, i }; // 3 * 8 = 24 bytes
final long[] value = new long[] { i, i, i, i, i }; // 5 * 8 = 40 bytes
final BytesToBytesMap.Location loc = map.lookup(
key,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution;

import java.io.IOException;

import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.unsafe.KVIterator;

/**
* Abstract base class for a sorter over key/value pairs in which both the key and the value are
* {@link UnsafeRow}s. Pairs are added one at a time with {@link #insert(UnsafeRow, UnsafeRow)}
* and read back through the {@link KVIterator} returned by {@link #sort()}.
*
* NOTE(review): the ordering applied by {@code sort()} and whether the inserted rows are copied
* are not specified here; they are left to concrete subclasses -- confirm against implementations.
*/
public abstract class UnsafeKeyValueSorter {

/**
* Adds a key/value pair to this sorter.
*
* @param key the key row of the pair
* @param value the value row of the pair
*/
public abstract void insert(UnsafeRow key, UnsafeRow value);

/**
* Returns an iterator over the key/value pairs held by this sorter.
*
* @return a {@link KVIterator} yielding {@link UnsafeRow} keys and values
* @throws IOException if the implementation fails while producing the iterator
*/
public abstract KVIterator<UnsafeRow, UnsafeRow> sort() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

package org.apache.spark.sql.execution;

import java.io.IOException;
import java.util.Iterator;

import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
Expand All @@ -28,6 +25,7 @@
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.KVIterator;
import org.apache.spark.unsafe.PlatformDependent;
import org.apache.spark.unsafe.map.BytesToBytesMap;
import org.apache.spark.unsafe.memory.MemoryLocation;
Expand Down Expand Up @@ -156,54 +154,55 @@ public UnsafeRow getAggregationBuffer(InternalRow groupingKey) {
return currentAggregationBuffer;
}

/**
* Mutable pair object returned by {@link UnsafeFixedWidthAggregationMap#iterator()}.
*/
public static class MapEntry {
private MapEntry() { };
public final UnsafeRow key = new UnsafeRow();
public final UnsafeRow value = new UnsafeRow();
}

/**
* Returns an iterator over the keys and values in this map.
*
* For efficiency, each call returns the same object.
*/
public Iterator<MapEntry> iterator() {
return new Iterator<MapEntry>() {
public KVIterator<UnsafeRow, UnsafeRow> iterator() {
return new KVIterator<UnsafeRow, UnsafeRow>() {

private final BytesToBytesMap.BytesToBytesMapIterator mapLocationIterator = map.iterator();
private final UnsafeRow key = new UnsafeRow();
private final UnsafeRow value = new UnsafeRow();

private final MapEntry entry = new MapEntry();
private final Iterator<BytesToBytesMap.Location> mapLocationIterator = map.iterator();
@Override
public boolean next() {
if (mapLocationIterator.hasNext()) {
final BytesToBytesMap.Location loc = mapLocationIterator.next();
final MemoryLocation keyAddress = loc.getKeyAddress();
final MemoryLocation valueAddress = loc.getValueAddress();
key.pointTo(
keyAddress.getBaseObject(),
keyAddress.getBaseOffset(),
groupingKeySchema.length(),
loc.getKeyLength()
);
value.pointTo(
valueAddress.getBaseObject(),
valueAddress.getBaseOffset(),
aggregationBufferSchema.length(),
loc.getValueLength()
);
return true;
} else {
return false;
}
}

@Override
public boolean hasNext() {
return mapLocationIterator.hasNext();
public UnsafeRow getKey() {
return key;
}

@Override
public MapEntry next() {
final BytesToBytesMap.Location loc = mapLocationIterator.next();
final MemoryLocation keyAddress = loc.getKeyAddress();
final MemoryLocation valueAddress = loc.getValueAddress();
entry.key.pointTo(
keyAddress.getBaseObject(),
keyAddress.getBaseOffset(),
groupingKeySchema.length(),
loc.getKeyLength()
);
entry.value.pointTo(
valueAddress.getBaseObject(),
valueAddress.getBaseOffset(),
aggregationBufferSchema.length(),
loc.getValueLength()
);
return entry;
public UnsafeRow getValue() {
return value;
}

@Override
public void remove() {
throw new UnsupportedOperationException();
public void close() {
// Do nothing.
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,21 +287,26 @@ case class GeneratedAggregate(
new Iterator[InternalRow] {
private[this] val mapIterator = aggregationMap.iterator()
private[this] val resultProjection = resultProjectionBuilder()
private[this] var _hasNext = mapIterator.next()

def hasNext: Boolean = mapIterator.hasNext
def hasNext: Boolean = _hasNext

def next(): InternalRow = {
val entry = mapIterator.next()
val result = resultProjection(joinedRow(entry.key, entry.value))
if (hasNext) {
result
if (_hasNext) {
val result = resultProjection(joinedRow(mapIterator.getKey, mapIterator.getValue))
_hasNext = mapIterator.next()
if (_hasNext) {
result
} else {
// This is the last element in the iterator, so let's free the buffer. Before we do,
// though, we need to make a defensive copy of the result so that we don't return an
// object that might contain dangling pointers to the freed memory
val resultCopy = result.copy()
aggregationMap.free()
resultCopy
}
} else {
// This is the last element in the iterator, so let's free the buffer. Before we do,
// though, we need to make a defensive copy of the result so that we don't return an
// object that might contain dangling pointers to the freed memory
val resultCopy = result.copy()
aggregationMap.free()
resultCopy
throw new java.util.NoSuchElementException
}
}
}
Expand Down
Loading