From b13849082155590a1c5fe424f06d3da0a085c881 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 5 Mar 2020 16:35:37 +0300 Subject: [PATCH 1/2] Introduce interoperability with java.io --- core/commonMain/src/kotlinx/io/Input.kt | 7 +- .../src/kotlinx/io/InputOperations.kt | 1 - .../io/{Inputs.kt => Inputs.common.kt} | 0 .../src/kotlinx/io/LimitingInputTest.kt | 1 - core/commonTest/src/kotlinx/io/TestUtils.kt | 6 +- core/jvmMain/src/kotlinx/io/Inputs.kt | 16 ++ core/jvmMain/src/kotlinx/io/Outputs.kt | 16 ++ .../jvmMain/src/kotlinx/io/internal/Atomic.kt | 16 -- .../src/kotlinx/io/internal/JavaInterop.kt | 92 +++++++++ .../src/kotlinx/io/pool/DefaultPool.kt | 4 +- .../{InputOutputTest.kt => CustomPoolTest.kt} | 4 +- .../jvmTest/src/kotlinx/io/InputStreamTest.kt | 191 ++++++++++++++++++ .../src/kotlinx/io/OutputStreamTest.kt | 111 ++++++++++ 13 files changed, 436 insertions(+), 29 deletions(-) rename core/commonMain/src/kotlinx/io/{Inputs.kt => Inputs.common.kt} (100%) create mode 100644 core/jvmMain/src/kotlinx/io/Inputs.kt create mode 100644 core/jvmMain/src/kotlinx/io/Outputs.kt delete mode 100644 core/jvmMain/src/kotlinx/io/internal/Atomic.kt create mode 100644 core/jvmMain/src/kotlinx/io/internal/JavaInterop.kt rename core/jvmTest/src/kotlinx/io/{InputOutputTest.kt => CustomPoolTest.kt} (97%) create mode 100644 core/jvmTest/src/kotlinx/io/InputStreamTest.kt create mode 100644 core/jvmTest/src/kotlinx/io/OutputStreamTest.kt diff --git a/core/commonMain/src/kotlinx/io/Input.kt b/core/commonMain/src/kotlinx/io/Input.kt index cf93ebcf3..8efb5d45d 100644 --- a/core/commonMain/src/kotlinx/io/Input.kt +++ b/core/commonMain/src/kotlinx/io/Input.kt @@ -146,13 +146,12 @@ public abstract class Input : Closeable { } /** - * Reads the available content in current [Input] to the [destination] buffer. + * Reads the available content in the current [Input] to the [destination] buffer. * - * If no bytes are available in the input, [fill] method will be called directly on - * the [destination] buffer without an extra copy. + * If no bytes are available, [fill] method will be called directly on the [destination] buffer without an extra copy. * Otherwise, available bytes are copied to the destination. * - * @return number of bytes written in the [destination]. + * @return number of bytes written to the [destination]. */ public fun readAvailableTo( destination: Buffer, diff --git a/core/commonMain/src/kotlinx/io/InputOperations.kt b/core/commonMain/src/kotlinx/io/InputOperations.kt index ded301b9a..a5395e2ca 100644 --- a/core/commonMain/src/kotlinx/io/InputOperations.kt +++ b/core/commonMain/src/kotlinx/io/InputOperations.kt @@ -147,7 +147,6 @@ public fun Input.discardExact(count: Int): Int { return count } - /** * Reads a [Byte] from this Input. * diff --git a/core/commonMain/src/kotlinx/io/Inputs.kt b/core/commonMain/src/kotlinx/io/Inputs.common.kt similarity index 100% rename from core/commonMain/src/kotlinx/io/Inputs.kt rename to core/commonMain/src/kotlinx/io/Inputs.common.kt diff --git a/core/commonTest/src/kotlinx/io/LimitingInputTest.kt b/core/commonTest/src/kotlinx/io/LimitingInputTest.kt index 79ad0b90b..af9357599 100644 --- a/core/commonTest/src/kotlinx/io/LimitingInputTest.kt +++ b/core/commonTest/src/kotlinx/io/LimitingInputTest.kt @@ -54,5 +54,4 @@ class LimitingInputTest { assertTrue(closed) } - private fun StringInput(str: String) = ByteArrayInput(str.encodeToByteArray()) } diff --git a/core/commonTest/src/kotlinx/io/TestUtils.kt b/core/commonTest/src/kotlinx/io/TestUtils.kt index 036382944..1b1467c33 100644 --- a/core/commonTest/src/kotlinx/io/TestUtils.kt +++ b/core/commonTest/src/kotlinx/io/TestUtils.kt @@ -3,7 +3,7 @@ package kotlinx.io import kotlin.test.* fun assertArrayEquals(expected: ByteArray, actual: ByteArray) { - assertEquals(expected.size, actual.size) + assertEquals(expected.size, actual.size, "Expected array lengths to be equal") assertEquals(expected.toHexString(), actual.toHexString()) } @@ -17,4 +17,6 @@ internal fun Bytes.useInput(block: Input.() -> Unit) { } finally { close() } -} \ No newline at end of file +} + +public fun StringInput(string: String) = ByteArrayInput(string.encodeToByteArray()) diff --git a/core/jvmMain/src/kotlinx/io/Inputs.kt b/core/jvmMain/src/kotlinx/io/Inputs.kt new file mode 100644 index 000000000..d1b236492 --- /dev/null +++ b/core/jvmMain/src/kotlinx/io/Inputs.kt @@ -0,0 +1,16 @@ +package kotlinx.io + +import kotlinx.io.internal.* +import java.io.* + +/** + * Returns an [InputStream] that uses the current [Input] as an underlying source of data. + * Closing the resulting [InputStream] will close the input. + */ +public fun Input.asInputStream(): InputStream = InputStreamFromInput(this) + +/** + * Returns an [Input] that uses the current [InputStream] as an underlying source of data. + * Closing the resulting [Input] will close the input stream. + */ +public fun InputStream.asInput(): Input = InputFromInputStream(this) diff --git a/core/jvmMain/src/kotlinx/io/Outputs.kt b/core/jvmMain/src/kotlinx/io/Outputs.kt new file mode 100644 index 000000000..1370b3924 --- /dev/null +++ b/core/jvmMain/src/kotlinx/io/Outputs.kt @@ -0,0 +1,16 @@ +package kotlinx.io + +import kotlinx.io.internal.* +import java.io.* + +/** + * Returns an [OutputStream] that uses the current [Output] as the destination. + * Closing the resulting [OutputStream] will close the input. + */ +public fun Output.asOutputStream(): OutputStream = OutputStreamFromOutput(this) + +/** + * Returns an [Output] that uses the current [OutputStream] as the destination. + * Closing the resulting [Output] will close the input stream. + */ +public fun OutputStream.asOutput(): Output = OutputFromOutputStream(this) diff --git a/core/jvmMain/src/kotlinx/io/internal/Atomic.kt b/core/jvmMain/src/kotlinx/io/internal/Atomic.kt deleted file mode 100644 index 87fe32483..000000000 --- a/core/jvmMain/src/kotlinx/io/internal/Atomic.kt +++ /dev/null @@ -1,16 +0,0 @@ -package kotlinx.io.internal - -import java.util.concurrent.atomic.* -import kotlin.reflect.* - -internal inline fun longUpdater(p: KProperty1): AtomicLongFieldUpdater { - return AtomicLongFieldUpdater.newUpdater(Owner::class.java, p.name) -} - -internal fun getIOIntProperty(name: String, default: Int): Int { - return try { - System.getProperty("kotlinx.io.$name") - } catch (e: SecurityException) { - null - }?.toIntOrNull() ?: default -} diff --git a/core/jvmMain/src/kotlinx/io/internal/JavaInterop.kt b/core/jvmMain/src/kotlinx/io/internal/JavaInterop.kt new file mode 100644 index 000000000..d6a5e1a0b --- /dev/null +++ b/core/jvmMain/src/kotlinx/io/internal/JavaInterop.kt @@ -0,0 +1,92 @@ +package kotlinx.io.internal + +import kotlinx.io.* +import kotlinx.io.buffer.* +import java.io.* + +internal class InputStreamFromInput(private val input: Input) : InputStream() { + override fun read(): Int { + if (input.exhausted()) { + return -1 + } + return input.readByte().toInt() and 0xFF + } + + override fun read(b: ByteArray): Int { + if (b.isEmpty()) return 0 + val result = input.readAvailableTo(bufferOf(b)) + if (result == 0) return -1 + return result + } + + override fun read(b: ByteArray, off: Int, len: Int): Int { + if (len == 0) return 0 + val result = input.readAvailableTo(bufferOf(b), off, off + len) + if (result == 0) return -1 + return result + } + + override fun close() { + input.close() + } +} + +internal class InputFromInputStream(private val inputStream: InputStream) : Input() { + override fun closeSource() { + inputStream.close() + } + + override fun fill(buffer: Buffer, startIndex: Int, endIndex: Int): Int { + // Zero-copy attempt + if (buffer.buffer.hasArray()) { + return inputStream.read(buffer.buffer.array(), startIndex, endIndex - startIndex) + .coerceAtLeast(0) // -1 when IS is closed + } + + for (i in startIndex until endIndex) { + val byte = inputStream.read() + if (byte == -1) return (i - startIndex) + buffer[i] = byte.toByte() + } + return endIndex - startIndex + } +} + +internal class OutputStreamFromOutput(private val output: Output) : OutputStream() { + override fun write(b: Int) { + output.writeByte(b.toByte()) + } + + override fun write(b: ByteArray) { + output.writeBuffer(bufferOf(b)) + } + + override fun write(b: ByteArray, off: Int, len: Int) { + output.writeBuffer(bufferOf(b), off, off + len) + } + + override fun flush() { + output.flush() + } + + override fun close() { + output.close() + } +} + +internal class OutputFromOutputStream(private val outputStream: OutputStream) : Output() { + + override fun closeSource() { + outputStream.close() + } + + override fun flush(source: Buffer, startIndex: Int, endIndex: Int) { + if (source.buffer.hasArray()) { + return outputStream.write(source.buffer.array(), startIndex, endIndex - startIndex) + } + + for (i in startIndex until endIndex) { + outputStream.write(source[i].toInt()) + } + } +} diff --git a/core/jvmMain/src/kotlinx/io/pool/DefaultPool.kt b/core/jvmMain/src/kotlinx/io/pool/DefaultPool.kt index 093fecaab..9eec60668 100755 --- a/core/jvmMain/src/kotlinx/io/pool/DefaultPool.kt +++ b/core/jvmMain/src/kotlinx/io/pool/DefaultPool.kt @@ -1,6 +1,5 @@ package kotlinx.io.pool -import kotlinx.io.internal.* import java.util.concurrent.atomic.* private const val MULTIPLIER = 4 @@ -92,7 +91,6 @@ actual abstract class DefaultPool actual constructor(actual final overr } companion object { - // todo: replace with atomicfu, remove companion object - private val Top = longUpdater(DefaultPool<*>::top) + private val Top = AtomicLongFieldUpdater.newUpdater(DefaultPool::class.java, DefaultPool<*>::top.name) } } diff --git a/core/jvmTest/src/kotlinx/io/InputOutputTest.kt b/core/jvmTest/src/kotlinx/io/CustomPoolTest.kt similarity index 97% rename from core/jvmTest/src/kotlinx/io/InputOutputTest.kt rename to core/jvmTest/src/kotlinx/io/CustomPoolTest.kt index 2237a54bd..64476fab0 100644 --- a/core/jvmTest/src/kotlinx/io/InputOutputTest.kt +++ b/core/jvmTest/src/kotlinx/io/CustomPoolTest.kt @@ -4,7 +4,7 @@ package kotlinx.io import kotlinx.io.buffer.* import kotlin.test.* -class InputOutputTestJvm { +class CustomPoolTest { @Test fun testCustomPools() { @@ -38,4 +38,4 @@ class InputOutputTestJvm { input.readAvailableTo(output) output.flush() } -} \ No newline at end of file +} diff --git a/core/jvmTest/src/kotlinx/io/InputStreamTest.kt b/core/jvmTest/src/kotlinx/io/InputStreamTest.kt new file mode 100644 index 000000000..44c40441a --- /dev/null +++ b/core/jvmTest/src/kotlinx/io/InputStreamTest.kt @@ -0,0 +1,191 @@ +package kotlinx.io + +import kotlinx.io.buffer.* +import kotlinx.io.text.* +import org.junit.Test +import java.io.* +import kotlin.io.DEFAULT_BUFFER_SIZE +import kotlin.random.* +import kotlin.test.* + +class InputStreamTest { + // I as IS + @Test + fun testInputAsInputStream() { + val input = StringInput("1\n2") + val lines = input.asInputStream().bufferedReader().readLines() + assertEquals(listOf("1", "2"), lines) + } + + @Test + fun testInputAsInputStreamBufferBoundary() { + val baseline = "1".repeat(DEFAULT_BUFFER_SIZE + 1) + "\n" + "2".repeat(DEFAULT_BUFFER_SIZE + 1) + val input = StringInput(baseline) + val lines = input.asInputStream().bufferedReader().readLines() + assertEquals(baseline.split("\n"), lines) + } + + @Test + fun testInputAsInputStreamNegativeValues() { + val baseline = byteArrayOf(1, -1, -127, 0, -125, 127, -42, -1, 3) + val input = ByteArrayInput(baseline) + val stream = input.asInputStream() + val result = ByteArray(baseline.size) + stream.read(result) + assertEquals(-1, stream.read()) + assertArrayEquals(baseline, result) + } + + @Test + fun testEmptyInputAsInputStream() { + val input = ByteArrayInput(byteArrayOf()).asInputStream() + assertEquals(-1, input.read()) + } + + @Test + fun testInputAsInputStreamClose() { + var closed = false + val input = object : Input() { + override fun fill(buffer: Buffer, startIndex: Int, endIndex: Int): Int { + return 0 + } + + override fun closeSource() { + closed = true + } + } + val inputStream = input.asInputStream() + assertEquals(-1, inputStream.read()) + assertFalse(closed) + inputStream.close() + assertTrue(closed) + } + + @Test + fun testInputAsInputStreamRangeRead() { + val inputStream = StringInput("abcdefgh").asInputStream() + val bis = ByteArrayInputStream("abcdefgh".toByteArray()) + val array = ByteArray(100) + val bisArray = ByteArray(100) + run { + assertEquals(bis.read(bisArray, 0, 0), inputStream.read(array, 0, 0)) + assertArrayEquals(bisArray, array) + } + + run { + assertEquals(bis.read(bisArray, 0, 1), inputStream.read(array, 0, 1)) + assertArrayEquals(bisArray, array) + } + + run { + assertEquals(bis.read(bisArray, 1, 2), inputStream.read(array, 1, 2)) + assertArrayEquals(bisArray, array) + } + + run { + assertEquals(bis.read(bisArray, 5, 90), inputStream.read(array, 5, 90)) + assertArrayEquals(bisArray, array) + } + } + + // IS as I + + @Test + fun testInputStreamAsInput() { + val inputStream = ByteArrayInputStream("1\n2".toByteArray()) + val lines = inputStream.asInput().readUtf8Lines() + assertEquals(listOf("1", "2"), lines) + } + + @Test + fun testInputStreamAsInputBufferBoundary() { + val baseline = "1".repeat(DEFAULT_BUFFER_SIZE + 1) + "\n" + "2".repeat(DEFAULT_BUFFER_SIZE + 1) + val input = ByteArrayInputStream(baseline.toByteArray()) + val lines = input.asInput().readUtf8Lines() + assertEquals(baseline.split("\n"), lines) + } + + @Test + fun testInputStreamAsInputNegativeValues() { + val baseline = byteArrayOf(1, -1, -127, 0, -125, 127, -42, -1, 3) + val inputStream = ByteArrayInputStream(baseline) + val input = inputStream.asInput() + val result = input.readByteArray() + assertTrue(input.exhausted()) + assertArrayEquals(baseline, result) + } + + @Test + fun testEmptyInputStreamAsInput() { + val input = ByteArrayInputStream(byteArrayOf()).asInput() + assertTrue(input.exhausted()) + } + + @Test + fun testInputStreamAsInputClose() { + var closed = false + val inputStream = object : InputStream() { + override fun read(): Int { + return -1 + } + + override fun close() { + closed = true + } + } + val input = inputStream.asInput() + assertTrue(input.exhausted()) + assertFalse(closed) + input.close() + assertTrue(closed) + } + + private val bytesSize = 1024 * 1024 + + @Test + fun testInputAsInputStreamRandomDataTest() { + val content = Random.nextBytes(bytesSize) + val inputStream = ByteArrayInput(content).asInputStream() + val result = ByteArrayOutputStream() + loop@ while (true) { + when (Random.nextBoolean()) { + true -> { + val b = inputStream.read() + if (b == -1) break@loop + result.write(b) + } + false -> { + val array = ByteArray(Random.nextInt(1, DEFAULT_BUFFER_SIZE * 2)) + val offset = Random.nextInt(array.size) + val length = Random.nextInt(array.size - offset + 1) + val read = inputStream.read(array, offset, length) + if (read == -1) break@loop + result.write(array, offset, read) + } + } + } + assertArrayEquals(content, result.toByteArray()) + } + + @Test + fun testInputStreamAsInputRandomDataTest() { + val content = Random.nextBytes(bytesSize) + val input = ByteArrayInputStream(content).asInput() + val result = ByteArrayOutputStream() + while (!input.exhausted()) { + when (Random.nextBoolean()) { + true -> { + result.write(input.readByte().toInt()) + } + false -> { + val array = ByteArray(Random.nextInt(1, DEFAULT_BUFFER_SIZE * 2)) + val offset = Random.nextInt(array.size) + val length = Random.nextInt(array.size - offset + 1) + val read = input.readAvailableTo(bufferOf(array), offset, offset + length) + result.write(array, offset, read) + } + } + } + assertArrayEquals(content, result.toByteArray()) + } +} diff --git a/core/jvmTest/src/kotlinx/io/OutputStreamTest.kt b/core/jvmTest/src/kotlinx/io/OutputStreamTest.kt new file mode 100644 index 000000000..e1754750f --- /dev/null +++ b/core/jvmTest/src/kotlinx/io/OutputStreamTest.kt @@ -0,0 +1,111 @@ +package kotlinx.io + +import kotlinx.io.buffer.* +import kotlinx.io.text.* +import org.junit.Test +import java.io.* +import kotlin.io.DEFAULT_BUFFER_SIZE +import kotlin.test.* + +class OutputStreamTest { + + @Test + fun testOutputAsOutputStream() { + val output = ByteArrayOutput() + val writer = output.asOutputStream().bufferedWriter() + writer.write("a\n") + writer.write("b") + writer.close() + assertEquals("a\nb", output.toByteArray().decodeToString()) + } + + @Test + fun testOutputAsOutputStreamBufferBoundary() { + val baseline = "1".repeat(DEFAULT_BUFFER_SIZE + 1) + "\n" + "2".repeat(DEFAULT_BUFFER_SIZE + 1) + val output = ByteArrayOutput() + val writer = output.asOutputStream().bufferedWriter() + writer.write(baseline) + writer.close() + assertEquals(baseline, output.toByteArray().decodeToString()) + } + + @Test + fun testOutputAsOutputStreamNegativeValues() { + val baseline = byteArrayOf(1, -1, -127, 0, -125, 127, -42, -1, 3) + val output = ByteArrayOutput() + val stream = output.asOutputStream() + stream.write(baseline) + stream.close() + assertArrayEquals(baseline, output.toByteArray()) + } + + @Test + fun testOutputAsOutputStreamClose() { + var closed = false + val output = object : Output() { + override fun flush(source: Buffer, startIndex: Int, endIndex: Int) { + + } + + override fun closeSource() { + closed = true + } + } + val outputStream = output.asOutputStream() + outputStream.write(1) + assertFalse(closed) + outputStream.close() + assertTrue(closed) + } + + // IS as I + + @Test + fun testOutputStreamAsOutput() { + val outputStream = ByteArrayOutputStream() + val output = outputStream.asOutput() + output.writeUtf8String("1\n2") + output.close() + assertEquals("1\n2", outputStream.toByteArray().decodeToString()) + } + + @Test + fun testOutputStreamAsOutputBufferBoundary() { + val baseline = "1".repeat(DEFAULT_BUFFER_SIZE + 1) + "\n" + "2".repeat(DEFAULT_BUFFER_SIZE + 1) + val outputStream = ByteArrayOutputStream() + val output = outputStream.asOutput() + output.writeUtf8String(baseline) + output.close() + assertEquals(baseline, outputStream.toByteArray().decodeToString()) + } + + @Test + fun testOutputStreamAsOutputNegativeValues() { + val baseline = byteArrayOf(1, -1, -127, 0, -125, 127, -42, -1, 3) + val outputStream = ByteArrayOutputStream() + val output = outputStream.asOutput() + output.writeByteArray(baseline) + output.close() + assertArrayEquals(baseline, outputStream.toByteArray()) + } + + + @Test + fun testOutputStreamAsOutputClose() { + var closed = false + val outputStream = object : OutputStream() { + override fun write(b: Int) { + + } + + override fun close() { + closed = true + } + } + val output = outputStream.asOutput() + output.writeByte(1) + assertFalse(closed) + output.close() + assertTrue(closed) + } +} From 137c8c01d7ce4536597f2e95b2828496d4c3205b Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 6 Mar 2020 12:19:20 +0300 Subject: [PATCH 2/2] ~ --- TODO.md | 1 + core/jvmMain/src/kotlinx/io/internal/JavaInterop.kt | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/TODO.md b/TODO.md index a9d6c716d..fc07803f1 100644 --- a/TODO.md +++ b/TODO.md @@ -8,6 +8,7 @@ - Implementation - Index preconditions - Prototype `PipedOutput` + - Benchmark overhead `java.io` integration - Test - Verify pool has no leaks - Documentation diff --git a/core/jvmMain/src/kotlinx/io/internal/JavaInterop.kt b/core/jvmMain/src/kotlinx/io/internal/JavaInterop.kt index d6a5e1a0b..b5ad2e773 100644 --- a/core/jvmMain/src/kotlinx/io/internal/JavaInterop.kt +++ b/core/jvmMain/src/kotlinx/io/internal/JavaInterop.kt @@ -39,8 +39,8 @@ internal class InputFromInputStream(private val inputStream: InputStream) : Inpu override fun fill(buffer: Buffer, startIndex: Int, endIndex: Int): Int { // Zero-copy attempt if (buffer.buffer.hasArray()) { - return inputStream.read(buffer.buffer.array(), startIndex, endIndex - startIndex) - .coerceAtLeast(0) // -1 when IS is closed + val result = inputStream.read(buffer.buffer.array(), startIndex, endIndex - startIndex) + return result.coerceAtLeast(0) // -1 when IS is closed } for (i in startIndex until endIndex) {