Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-8160][SQL]Support using external sorting to run aggregate #7423

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions;

import org.apache.spark.unsafe.PlatformDependent;
import org.apache.spark.unsafe.memory.MemoryLocation;
import org.apache.spark.unsafe.memory.TaskMemoryManager;

/**
* memory location of a <k, v> pair
*/
public class UnsafeRowLocation {

private final TaskMemoryManager memoryManager;

/**
* An index into the hash map's Long array
*/
private int pos;
/**
* True if this location points to a position where a key is defined, false otherwise
*/
private boolean isDefined;
/**
* The hashcode of the most recent key passed to caching this hashcode here allows us to
* avoid re-hashing the key when storing a value for that key.
*/
private int keyHashcode;

private final MemoryLocation keyMemoryLocation = new MemoryLocation();
private final MemoryLocation valueMemoryLocation = new MemoryLocation();
private int keyLength;
private int valueLength;

public UnsafeRowLocation(TaskMemoryManager memoryManager) {
this.memoryManager = memoryManager;
}

public UnsafeRowLocation with(long fullKeyAddress) {
this.isDefined = true;
updateAddressesAndSizes(fullKeyAddress);
return this;
}

public UnsafeRowLocation with(Object page, long offsetInPage) {
this.isDefined = true;
updateAddressesAndSizes(page, offsetInPage);
return this;
}

public UnsafeRowLocation with(int keyLength, byte[] keyArray, int valueLength,
byte[] valueArray) {
this.isDefined = true;
this.keyLength = keyLength;
keyMemoryLocation.setObjAndOffset(keyArray, PlatformDependent.BYTE_ARRAY_OFFSET);
this.valueLength = valueLength;
valueMemoryLocation.setObjAndOffset(valueArray, PlatformDependent.BYTE_ARRAY_OFFSET);
return this;
}

public UnsafeRowLocation with(int pos, int keyHashcode, boolean isDefined, long fullKeyAddress) {
this.pos = pos;
this.isDefined = isDefined;
this.keyHashcode = keyHashcode;
if (isDefined) {
updateAddressesAndSizes(fullKeyAddress);
}
return this;
}

public void updateAddressesAndSizes(long fullKeyAddress) {
updateAddressesAndSizes(
memoryManager.getPage(fullKeyAddress),
memoryManager.getOffsetInPage(fullKeyAddress));
}

private void updateAddressesAndSizes(Object page, long offsetInPage) {
long position = offsetInPage;
keyLength = (int) PlatformDependent.UNSAFE.getLong(page, position);
position += 8; // word used to store the key size
keyMemoryLocation.setObjAndOffset(page, position);
position += keyLength;
valueLength = (int) PlatformDependent.UNSAFE.getLong(page, position);
position += 8; // word used to store the key size
valueMemoryLocation.setObjAndOffset(page, position);
}

/**
* Returns true if the key is defined at this position, and false otherwise.
*/
public boolean isDefined() {
return isDefined;
}

/**
* Set whether the key is defined.
*/
public void setDefined(boolean isDefined) {
this.isDefined = isDefined;
}

/**
* Returns the hashcode of the key.
*/
public long getKeyHashcode() {
return this.keyHashcode;
}

/**
* Set the hashcode of the key.
*/
public void setKeyHashcode(int keyHashcode) {
this.keyHashcode = keyHashcode;
}

/**
* Set an index into the hash map's Long array.
*/
public void setPos(int pos) {
this.pos = pos;
}

/**
* Returns the index into the hash map's Long array.
*/
public int getPos() {
return this.pos;
}

/**
* Returns the address of the key defined at this position.
* This points to the first byte of the key data.
* Unspecified behavior if the key is not defined.
* For efficiency reasons, calls to this method always returns the same MemoryLocation object.
*/
public MemoryLocation getKeyAddress() {
assert (isDefined);
return keyMemoryLocation;
}

/**
* Returns the base object of the key.
*/
public Object getKeyBaseObject() {
assert (isDefined);
return keyMemoryLocation.getBaseObject();
}

/**
* Returns the base offset of the key.
*/
public long getKeyBaseOffset() {
assert (isDefined);
return keyMemoryLocation.getBaseOffset();
}

/**
* Returns the length of the key defined at this position.
* Unspecified behavior if the key is not defined.
*/
public int getKeyLength() {
assert (isDefined);
return keyLength;
}

/**
* Returns the address of the value defined at this position.
* This points to the first byte of the value data.
* Unspecified behavior if the key is not defined.
* For efficiency reasons, calls to this method always returns the same MemoryLocation object.
*/
public MemoryLocation getValueAddress() {
assert (isDefined);
return valueMemoryLocation;
}

/**
* Returns the base object of the value.
*/
public Object getValueBaseObject() {
assert (isDefined);
return valueMemoryLocation.getBaseObject();
}

/**
* Return the base offset of the value.
*/
public long getValueBaseOffset() {
assert (isDefined);
return valueMemoryLocation.getBaseOffset();
}

/**
* Returns the length of the value defined at this position.
* Unspecified behavior if the key is not defined.
*/
public int getValueLength() {
assert (isDefined);
return valueLength;
}
}
Loading