Skip to content

Commit

Permalink
[FLINK-1320] [core] Add an off-heap variant of the managed memory
Browse files Browse the repository at this point in the history
This closes #1093
  • Loading branch information
StephanEwen committed Sep 8, 2015
1 parent 1800434 commit 655a891
Show file tree
Hide file tree
Showing 195 changed files with 14,373 additions and 3,243 deletions.
Expand Up @@ -125,9 +125,14 @@ public final class ConfigConstants {
public static final String TASK_MANAGER_MEMORY_FRACTION_KEY = "taskmanager.memory.fraction";

/**
* The key for the config parameter defining whether the memory manager allocates memory lazy.
* The fraction of off-heap memory relative to the heap size.
*/
public static final String TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY = "taskmanager.memory.lazyalloc";
public static final String TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY = "taskmanager.memory.off-heap-ratio";

/**
* The config parameter defining the memory allocation method (JVM heap or off-heap).
*/
public static final String TASK_MANAGER_MEMORY_OFF_HEAP_KEY = "taskmanager.memory.off-heap";

/**
* The config parameter defining the number of buffers used in the network stack. This defines the
Expand Down Expand Up @@ -542,6 +547,11 @@ public final class ConfigConstants {
*/
public static final float DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION = 0.7f;

/**
* The default ratio of heap to off-heap memory, when the TaskManager is started with off-heap memory.
*/
public static final float DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO = 3.0f;

/**
* Default number of buffers used in the network stack.
*/
Expand Down
@@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.memory;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

/**
* This class represents a piece of heap memory managed by Flink.
* The segment is backed by a byte array and features random put and get methods for the basic types,
* as well as compare and swap methods.
* <p>
* This class specialized byte access and byte copy calls for heap memory, while reusing the
* multi-byte type accesses and cross-segment operations from the MemorySegment.
* <p>
* Note that memory segments should usually not be allocated manually, but rather through the
* {@link MemorySegmentFactory}.
*/
public final class HeapMemorySegment extends MemorySegment {

/** An extra reference to the heap memory, so we can let byte array checks fail
* by the built-in checks automatically without extra checks */
private byte[] memory;

/**
* Creates a new memory segment that represents the data in the given byte array.
* The owner of this memory segment is null.
*
* @param memory The byte array that holds the data.
*/
HeapMemorySegment(byte[] memory) {
this(memory, null);
}

/**
* Creates a new memory segment that represents the data in the given byte array.
* The memory segment references the given owner.
*
* @param memory The byte array that holds the data.
* @param owner The owner referenced by the memory segment.
*/
HeapMemorySegment(byte[] memory, Object owner) {
super(Objects.requireNonNull(memory), owner);
this.memory = memory;
}

// -------------------------------------------------------------------------
// MemorySegment operations
// -------------------------------------------------------------------------

@Override
public void free() {
super.free();
this.memory = null;
}

@Override
public ByteBuffer wrap(int offset, int length) {
try {
return ByteBuffer.wrap(this.memory, offset, length);
}
catch (NullPointerException e) {
throw new IllegalStateException("segment has been freed");
}
}

/**
* Gets the byte array that backs this memory segment.
*
* @return The byte array that backs this memory segment, or null, if the segment has been freed.
*/
public byte[] getArray() {
return this.heapMemory;
}

// ------------------------------------------------------------------------
// Random Access get() and put() methods
// ------------------------------------------------------------------------

@Override
public final byte get(int index) {
return this.memory[index];
}

@Override
public final void put(int index, byte b) {
this.memory[index] = b;
}

@Override
public final void get(int index, byte[] dst) {
get(index, dst, 0, dst.length);
}

@Override
public final void put(int index, byte[] src) {
put(index, src, 0, src.length);
}

@Override
public final void get(int index, byte[] dst, int offset, int length) {
// system arraycopy does the boundary checks anyways, no need to check extra
System.arraycopy(this.memory, index, dst, offset, length);
}

@Override
public final void put(int index, byte[] src, int offset, int length) {
// system arraycopy does the boundary checks anyways, no need to check extra
System.arraycopy(src, offset, this.memory, index, length);
}

@Override
public final boolean getBoolean(int index) {
return this.memory[index] != 0;
}

@Override
public final void putBoolean(int index, boolean value) {
this.memory[index] = (byte) (value ? 1 : 0);
}

// -------------------------------------------------------------------------
// Bulk Read and Write Methods
// -------------------------------------------------------------------------

@Override
public final void get(DataOutput out, int offset, int length) throws IOException {
out.write(this.memory, offset, length);
}

@Override
public final void put(DataInput in, int offset, int length) throws IOException {
in.readFully(this.memory, offset, length);
}

@Override
public final void get(int offset, ByteBuffer target, int numBytes) {
// ByteBuffer performs the boundary checks
target.put(this.memory, offset, numBytes);
}

@Override
public final void put(int offset, ByteBuffer source, int numBytes) {
// ByteBuffer performs the boundary checks
source.get(this.memory, offset, numBytes);
}

// -------------------------------------------------------------------------
// Factoring
// -------------------------------------------------------------------------

/**
* A memory segment factory that produces heap memory segments. Note that this factory does not
* support to allocate off-heap memory.
*/
public static final class HeapMemorySegmentFactory implements MemorySegmentFactory.Factory {

@Override
public HeapMemorySegment wrap(byte[] memory) {
return new HeapMemorySegment(memory);
}

@Override
public HeapMemorySegment allocateUnpooledSegment(int size, Object owner) {
return new HeapMemorySegment(new byte[size], owner);
}

@Override
public HeapMemorySegment wrapPooledHeapMemory(byte[] memory, Object owner) {
return new HeapMemorySegment(memory, owner);
}

@Override
public HeapMemorySegment wrapPooledOffHeapMemory(ByteBuffer memory, Object owner) {
throw new UnsupportedOperationException(
"The MemorySegment factory was not initialized for off-heap memory.");
}

/** prevent external instantiation */
HeapMemorySegmentFactory() {}
};

public static final HeapMemorySegmentFactory FACTORY = new HeapMemorySegmentFactory();
}

0 comments on commit 655a891

Please sign in to comment.