Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FLINK-1296] [runtime] Add better paged disk I/O readers / writers
- Loading branch information
1 parent
7610588
commit 996d404
Showing
18 changed files
with
1,872 additions
and
258 deletions.
There are no files selected for viewing
148 changes: 148 additions & 0 deletions
148
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Original file line | Diff line number | Diff line change |
---|---|---|---|
@@ -0,0 +1,148 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one | |||
* or more contributor license agreements. See the NOTICE file | |||
* distributed with this work for additional information | |||
* regarding copyright ownership. The ASF licenses this file | |||
* to you under the Apache License, Version 2.0 (the | |||
* "License"); you may not use this file except in compliance | |||
* with the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
|
|||
package org.apache.flink.runtime.io.disk; | |||
|
|||
import static com.google.common.base.Preconditions.checkNotNull; | |||
import static com.google.common.base.Preconditions.checkArgument; | |||
|
|||
import java.io.EOFException; | |||
import java.io.IOException; | |||
import java.util.List; | |||
|
|||
import org.apache.flink.core.memory.MemorySegment; | |||
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; | |||
import org.apache.flink.runtime.memorymanager.AbstractPagedInputView; | |||
import org.apache.flink.runtime.memorymanager.MemoryManager; | |||
import org.apache.flink.runtime.util.MathUtils; | |||
|
|||
/** | |||
* A {@link org.apache.flink.core.memory.DataInputView} that is backed by a {@link BlockChannelReader}, | |||
* making it effectively a data input stream. The view reads it data in blocks from the underlying channel. | |||
* The view can read data that has been written by a {@link FileChannelOutputView}, or that was written in blocks | |||
* in another fashion. | |||
*/ | |||
public class FileChannelInputView extends AbstractPagedInputView { | |||
|
|||
private final BlockChannelReader reader; | |||
|
|||
private final MemoryManager memManager; | |||
|
|||
private final List<MemorySegment> memory; | |||
|
|||
private final int sizeOfLastBlock; | |||
|
|||
private int numRequestsRemaining; | |||
|
|||
private int numBlocksRemaining; | |||
|
|||
// -------------------------------------------------------------------------------------------- | |||
|
|||
public FileChannelInputView(BlockChannelReader reader, MemoryManager memManager, List<MemorySegment> memory, int sizeOfLastBlock) throws IOException { | |||
super(0); | |||
|
|||
checkNotNull(reader); | |||
checkNotNull(memManager); | |||
checkNotNull(memory); | |||
checkArgument(!reader.isClosed()); | |||
checkArgument(memory.size() > 0); | |||
|
|||
this.reader = reader; | |||
this.memManager = memManager; | |||
this.memory = memory; | |||
this.sizeOfLastBlock = sizeOfLastBlock; | |||
|
|||
try { | |||
final long channelLength = reader.getSize(); | |||
final int segmentSize = memManager.getPageSize(); | |||
|
|||
this.numBlocksRemaining = MathUtils.checkedDownCast(channelLength / segmentSize); | |||
if (channelLength % segmentSize != 0) { | |||
this.numBlocksRemaining++; | |||
} | |||
|
|||
this.numRequestsRemaining = numBlocksRemaining; | |||
|
|||
for (int i = 0; i < memory.size(); i++) { | |||
sendReadRequest(memory.get(i)); | |||
} | |||
|
|||
advance(); | |||
} | |||
catch (IOException e) { | |||
memManager.release(memory); | |||
throw e; | |||
} | |||
} | |||
|
|||
public void close() throws IOException { | |||
close(false); | |||
} | |||
|
|||
public void closeAndDelete() throws IOException { | |||
close(true); | |||
} | |||
|
|||
private void close(boolean deleteFile) throws IOException { | |||
try { | |||
clear(); | |||
if (deleteFile) { | |||
reader.closeAndDelete(); | |||
} else { | |||
reader.close(); | |||
} | |||
} finally { | |||
synchronized (memory) { | |||
memManager.release(memory); | |||
memory.clear(); | |||
} | |||
} | |||
} | |||
|
|||
@Override | |||
protected MemorySegment nextSegment(MemorySegment current) throws IOException { | |||
// check for end-of-stream | |||
if (numBlocksRemaining <= 0) { | |||
reader.close(); | |||
throw new EOFException(); | |||
} | |||
|
|||
// send a request first. if we have only a single segment, this same segment will be the one obtained in the next lines | |||
if (current != null) { | |||
sendReadRequest(current); | |||
} | |||
|
|||
// get the next segment | |||
numBlocksRemaining--; | |||
return reader.getNextReturnedSegment(); | |||
} | |||
|
|||
@Override | |||
protected int getLimitForSegment(MemorySegment segment) { | |||
return numBlocksRemaining > 0 ? segment.size() : sizeOfLastBlock; | |||
} | |||
|
|||
private void sendReadRequest(MemorySegment seg) throws IOException { | |||
if (numRequestsRemaining > 0) { | |||
reader.readBlock(seg); | |||
numRequestsRemaining--; | |||
} else { | |||
memManager.release(seg); | |||
} | |||
} | |||
} |
144 changes: 144 additions & 0 deletions
144
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Original file line | Diff line number | Diff line change |
---|---|---|---|
@@ -0,0 +1,144 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one | |||
* or more contributor license agreements. See the NOTICE file | |||
* distributed with this work for additional information | |||
* regarding copyright ownership. The ASF licenses this file | |||
* to you under the Apache License, Version 2.0 (the | |||
* "License"); you may not use this file except in compliance | |||
* with the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
|
|||
package org.apache.flink.runtime.io.disk; | |||
|
|||
import static com.google.common.base.Preconditions.checkArgument; | |||
import static com.google.common.base.Preconditions.checkNotNull; | |||
|
|||
import java.io.IOException; | |||
import java.util.List; | |||
|
|||
import org.apache.flink.core.memory.MemorySegment; | |||
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter; | |||
import org.apache.flink.runtime.memorymanager.AbstractPagedOutputView; | |||
import org.apache.flink.runtime.memorymanager.MemoryManager; | |||
|
|||
/** | |||
* A {@link org.apache.flink.core.memory.DataOutputView} that is backed by a {@link BlockChannelWriter}, making it effectively a data output | |||
* stream. The view writes it data in blocks to the underlying channel. | |||
*/ | |||
public class FileChannelOutputView extends AbstractPagedOutputView { | |||
|
|||
private final BlockChannelWriter writer; // the writer to the channel | |||
|
|||
private final MemoryManager memManager; | |||
|
|||
private final List<MemorySegment> memory; | |||
|
|||
private int numBlocksWritten; | |||
|
|||
private int bytesInLatestSegment; | |||
|
|||
// -------------------------------------------------------------------------------------------- | |||
|
|||
public FileChannelOutputView(BlockChannelWriter writer, MemoryManager memManager, List<MemorySegment> memory, int segmentSize) throws IOException { | |||
super(segmentSize, 0); | |||
|
|||
checkNotNull(writer); | |||
checkNotNull(memManager); | |||
checkNotNull(memory); | |||
checkArgument(!writer.isClosed()); | |||
|
|||
this.writer = writer; | |||
this.memManager = memManager; | |||
this.memory = memory; | |||
|
|||
|
|||
for (MemorySegment next : memory) { | |||
writer.getReturnQueue().add(next); | |||
} | |||
|
|||
// move to the first page | |||
advance(); | |||
} | |||
|
|||
// -------------------------------------------------------------------------------------------- | |||
|
|||
/** | |||
* Closes this output, writing pending data and releasing the memory. | |||
* | |||
* @throws IOException Thrown, if the pending data could not be written. | |||
*/ | |||
public void close() throws IOException { | |||
close(false); | |||
} | |||
|
|||
/** | |||
* Closes this output, writing pending data and releasing the memory. | |||
* | |||
* @throws IOException Thrown, if the pending data could not be written. | |||
*/ | |||
public void closeAndDelete() throws IOException { | |||
close(true); | |||
} | |||
|
|||
private void close(boolean delete) throws IOException { | |||
try { | |||
// send off set last segment, if we have not been closed before | |||
MemorySegment current = getCurrentSegment(); | |||
if (current != null) { | |||
writeSegment(current, getCurrentPositionInSegment()); | |||
} | |||
|
|||
clear(); | |||
if (delete) { | |||
writer.closeAndDelete(); | |||
} else { | |||
writer.close(); | |||
} | |||
} | |||
finally { | |||
memManager.release(memory); | |||
} | |||
} | |||
|
|||
// -------------------------------------------------------------------------------------------- | |||
|
|||
/** | |||
* Gets the number of blocks written by this output view. | |||
* | |||
* @return The number of blocks written by this output view. | |||
*/ | |||
public int getBlockCount() { | |||
return numBlocksWritten; | |||
} | |||
|
|||
/** | |||
* Gets the number of bytes written in the latest memory segment. | |||
* | |||
* @return The number of bytes written in the latest memory segment. | |||
*/ | |||
public int getBytesInLatestSegment() { | |||
return bytesInLatestSegment; | |||
} | |||
|
|||
@Override | |||
protected MemorySegment nextSegment(MemorySegment current, int posInSegment) throws IOException { | |||
if (current != null) { | |||
writeSegment(current, posInSegment); | |||
} | |||
return writer.getNextReturnedSegment(); | |||
} | |||
|
|||
private void writeSegment(MemorySegment segment, int writePosition) throws IOException { | |||
writer.writeBlock(segment); | |||
numBlocksWritten++; | |||
bytesInLatestSegment = writePosition; | |||
} | |||
} |
Oops, something went wrong.