Skip to content

Commit

Permalink
Fix I3316: javalib FileChannel append behavior now matches a JVM (sca…
Browse files Browse the repository at this point in the history
  • Loading branch information
LeeTibbert committed Jul 3, 2023
1 parent d51279a commit 014f0ad
Show file tree
Hide file tree
Showing 3 changed files with 305 additions and 62 deletions.
31 changes: 15 additions & 16 deletions javalib/src/main/scala/java/nio/channels/FileChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,21 @@ object FileChannel {
): FileChannel = {
import StandardOpenOption._

if (options.contains(APPEND) && options.contains(TRUNCATE_EXISTING)) {
throw new IllegalArgumentException(
"APPEND + TRUNCATE_EXISTING not allowed"
)
}
val appending = options.contains(APPEND)
val writing = options.contains(WRITE) || appending

if (appending) {
if (options.contains(TRUNCATE_EXISTING)) {
throw new IllegalArgumentException(
"APPEND + TRUNCATE_EXISTING not allowed"
)
}

if (options.contains(APPEND) && options.contains(READ)) {
throw new IllegalArgumentException("READ + APPEND not allowed")
if (options.contains(READ)) {
throw new IllegalArgumentException("READ + APPEND not allowed")
}
}

val writing = options.contains(WRITE) || options.contains(APPEND)

val mode = new StringBuilder("r")
if (writing) mode.append("w")

Expand All @@ -127,21 +130,17 @@ object FileChannel {
val raf = tryRandomAccessFile(path.toString, mode.toString)

try {
if (writing && options.contains(TRUNCATE_EXISTING)) {
if (writing && options.contains(TRUNCATE_EXISTING))
raf.setLength(0L)
}

if (writing && options.contains(APPEND)) {
raf.seek(raf.length())
}

new FileChannelImpl(
raf.getFD(),
Some(path.toFile()),
deleteFileOnClose =
options.contains(StandardOpenOption.DELETE_ON_CLOSE),
openForReading = true,
openForWriting = writing
openForWriting = writing,
openForAppending = appending
)
} catch {
case e: Throwable =>
Expand Down
179 changes: 134 additions & 45 deletions javalib/src/main/scala/java/nio/channels/FileChannelImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,40 @@ private[java] final class FileChannelImpl(
file: Option[File],
deleteFileOnClose: Boolean,
openForReading: Boolean,
openForWriting: Boolean
openForWriting: Boolean,
openForAppending: Boolean = false
) extends FileChannel {
/* Note:
* Channels are described in the Java documentation as thread-safe.
* This implementation is, most patently _not_ thread-safe.
* Use with only one thread accessing the channel, even for READS.
*/

if (openForAppending)
seekEOF() // so a position() before first APPEND write() matches JVM.

private def ensureOpen(): Unit =
if (!isOpen()) throw new ClosedChannelException()

private def seekEOF(): Unit = {
if (isWindows) {
SetFilePointerEx(
fd.handle,
distanceToMove = 0,
newFilePointer = null,
moveMethod = FILE_END
)
} else {
val pos = unistd.lseek(fd.fd, 0, stdio.SEEK_END);
if (pos < 0)
throwPosixException("lseek")
}
}

private def throwPosixException(functionName: String): Unit = {
if (!isWindows) {
val errnoString = fromCString(string.strerror(errno))
throw new IOException(s"${functionName} failed: ${errnoString}")
throw new IOException("${functionName} failed: ${errnoString}")
}
}

Expand Down Expand Up @@ -176,15 +204,28 @@ private[java] final class FileChannelImpl(
MappedByteBufferImpl(mode, position, size.toInt, fd)
}

override def position(offset: Long): FileChannel = {
// change position, even in APPEND mode. Use _carefully_.
private def compelPosition(offset: Long): FileChannel = {
if (isWindows)
FileApi.SetFilePointerEx(
fd.handle,
offset,
null,
FILE_BEGIN
)
else unistd.lseek(fd.fd, offset.toSize, stdio.SEEK_SET)
else {
val pos = unistd.lseek(fd.fd, offset.toSize, stdio.SEEK_SET)
if (pos < 0)
throwPosixException("lseek")
}

this
}

override def position(offset: Long): FileChannel = {
if (!openForAppending)
compelPosition(offset)

this
}

Expand All @@ -199,7 +240,10 @@ private[java] final class FileChannelImpl(
)
!filePointer
} else {
unistd.lseek(fd.fd, 0, stdio.SEEK_CUR).toLong
val pos = unistd.lseek(fd.fd, 0, stdio.SEEK_CUR).toLong
if (pos < 0)
throwPosixException("lseek")
pos
}

override def read(
Expand Down Expand Up @@ -434,51 +478,20 @@ private[java] final class FileChannelImpl(

}

// Always check, to avoid moon-shot corner cases.
if (currentPosition > newSize)
position(newSize)
compelPosition(newSize)

this
}

override def write(
buffers: Array[ByteBuffer],
offset: Int,
length: Int
): Long = {
ensureOpen()
var i = 0
while (i < length) {
write(buffers(offset + i))
i += 1
}
i
}

override def write(buffer: ByteBuffer, pos: Long): Int = {
ensureOpen()
position(pos)
val srcPos: Int = buffer.position()
val srcLim: Int = buffer.limit()
val lim = math.abs(srcLim - srcPos)
val bytes = if (buffer.hasArray()) {
buffer.array()
} else {
val bytes = new Array[Byte](lim)
buffer.get(bytes)
bytes
}
write(bytes, 0, lim)
buffer.position(srcPos + lim)
lim
}

override def write(src: ByteBuffer): Int =
write(src, position())

private def ensureOpen(): Unit =
if (!isOpen()) throw new ClosedChannelException()

/* 2023-07-02 NOTE: This method is BROKEN! It should be returning
* an Int number of bytes written. It detects errors but not
* partial writes. Bad dog!
*
* Fix 'writeByteBuffer()' after this methods gets fixed.
* The former should return the actual number of bytes written
* on partial writes.
*/
private[java] def write(
buffer: Array[Byte],
offset: Int,
Expand Down Expand Up @@ -515,6 +528,82 @@ private[java] final class FileChannelImpl(
}
}

private def writeByteBuffer(src: ByteBuffer): Int = {
val srcPos = src.position()
val srcLim = src.limit()
val nBytes = srcLim - srcPos // number of bytes in range.

val (arr, offset) = if (src.hasArray()) {
(src.array(), srcPos)
} else {
val ba = new Array[Byte](nBytes)
src.get(ba, srcPos, nBytes)
(ba, 0)
}

write(arr, offset, nBytes)

src.position(srcPos + nBytes)

/* 2023-07-02 NOTE: This return is BROKEN! It does not handle
* partial OS writes. Fix after/when the 'write(arr, offset, nBytes)'
* method gets fixed to return a value.
*/
nBytes // BUGGY
}

/* 2023-07-02 NOTE: This method is BROKEN! It should be returning
* an Long number of bytes written. Instead it is wrongly returning
* 'i' the number of buffers written. At least here the return type is
* correct.
*/

override def write(
buffers: Array[ByteBuffer],
offset: Int,
length: Int
): Long = {
// write(ByteBuffer) will call ensureOpen(), saveCPU cycles by no call here
var i = 0
while (i < length) {
write(buffers(offset + i))
i += 1
}
i
}

/* Write to absolute position, do not change current position.
*
* Understanding "does not change current position" when the channel
* has been opened requires some mind_bending/understanding.
*
* "Current position" when file has been opened for APPEND is
* a logical place, End of File (EOF), not an absolute number.
* When APPEND mode changes the position it reports as "current" to the
* new EOF rather than stashed position, according to JVM is is not
* really changing the "current position".
*/
override def write(src: ByteBuffer, pos: Long): Int = {
ensureOpen()
val stashPosition = position()
compelPosition(pos)

val nBytesWritten = writeByteBuffer(src)

if (!openForAppending)
compelPosition(stashPosition)
else
seekEOF()

nBytesWritten
}

// Write relative to current position (SEEK_CUR) or, for APPEND, SEEK_END.
override def write(src: ByteBuffer): Int = {
ensureOpen()
writeByteBuffer(src)
}

/* The Scala Native implementation of FileInputStream#available delegates
* to this method. This method now implements "available()" as described in
* the Java description of FileInputStream#available. So the delegator
Expand Down

0 comments on commit 014f0ad

Please sign in to comment.