Skip to content

Commit

Permalink
Add support for reading parquet file thanks to arrow-dataset #576
Browse files Browse the repository at this point in the history
  • Loading branch information
fb64 committed Jan 31, 2024
1 parent 5f5b5d3 commit 0dd7498
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 7 deletions.
1 change: 1 addition & 0 deletions dataframe-arrow/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies {
implementation(libs.arrow.vector)
implementation(libs.arrow.format)
implementation(libs.arrow.memory)
implementation(libs.arrow.dataset)
implementation(libs.commonsCompress)
implementation(libs.kotlin.reflect)
implementation(libs.kotlin.datetimeJvm)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.jetbrains.kotlinx.dataframe.io

import org.apache.arrow.dataset.file.FileFormat
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.ipc.ArrowReader
import org.apache.commons.compress.utils.SeekableInMemoryByteChannel
Expand Down Expand Up @@ -186,3 +187,11 @@ public fun DataFrame.Companion.readArrow(
public fun ArrowReader.toDataFrame(
nullability: NullabilityOptions = NullabilityOptions.Infer
): AnyFrame = DataFrame.Companion.readArrowImpl(this, nullability)

/**
* Read [Parquet](https://parquet.apache.org/) data from existing [url] by using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html)
*/
public fun DataFrame.Companion.readParquet(
url: URL,
nullability: NullabilityOptions = NullabilityOptions.Infer
): AnyFrame = readArrowDataset(url.toString(), FileFormat.PARQUET, nullability)
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package org.jetbrains.kotlinx.dataframe.io

import org.apache.arrow.dataset.file.FileFormat
import org.apache.arrow.dataset.file.FileSystemDatasetFactory
import org.apache.arrow.dataset.jni.DirectReservationListener
import org.apache.arrow.dataset.jni.NativeMemoryPool
import org.apache.arrow.dataset.scanner.ScanOptions
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.BigIntVector
import org.apache.arrow.vector.BitVector
Expand Down Expand Up @@ -296,7 +301,7 @@ internal fun DataFrame.Companion.readArrowImpl(
add(df)
}
}
is ArrowStreamReader -> {
else -> {
val root = reader.vectorSchemaRoot
val schema = root.schema
while (reader.loadNextBatch()) {
Expand All @@ -309,3 +314,27 @@ internal fun DataFrame.Companion.readArrowImpl(
return flattened.concatKeepingSchema()
}
}

internal fun DataFrame.Companion.readArrowDataset(
fileUri: String,
fileFormat: FileFormat,
nullability: NullabilityOptions = NullabilityOptions.Infer,
): AnyFrame {
val scanOptions = ScanOptions(32768)
RootAllocator().use { allocator ->
FileSystemDatasetFactory(
allocator,
NativeMemoryPool.createListenable(DirectReservationListener.instance()),
fileFormat,
fileUri
).use { datasetFactory ->
datasetFactory.finish().use { dataset ->
dataset.newScan(scanOptions).use { scanner ->
scanner.scanBatches().use { reader ->
return readArrow(reader, nullability)
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -583,4 +583,17 @@ internal class ArrowKtTest {
val arrowStreamReader = ArrowStreamReader(ipcInputStream, RootAllocator())
arrowStreamReader.toDataFrame() shouldBe expected
}

@Test
fun testReadParquet(){
val path = testResource("test.arrow.parquet").path
val dataFrame = DataFrame.readParquet(URL("file:$path"))
dataFrame.rowsCount() shouldBe 300
assertEstimations(
exampleFrame = dataFrame,
expectedNullable = false,
hasNulls = false,
fromParquet = true
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ import kotlin.reflect.typeOf
* Assert that we have got the same data that was originally saved on example creation.
* Example generation project is currently located at https://github.com/Kopilov/arrow_example
*/
internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean, hasNulls: Boolean) {
internal fun assertEstimations(
exampleFrame: AnyFrame,
expectedNullable: Boolean,
hasNulls: Boolean,
fromParquet: Boolean = false
) {
/**
* In [exampleFrame] we get two concatenated batches. To assert the estimations, we should transform frame row number to batch row number
*/
Expand Down Expand Up @@ -129,10 +134,19 @@ internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean
assertValueOrNull(iBatch(i), element, LocalDate.ofEpochDay(iBatch(i).toLong() * 30))
}

val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?>
datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable)
datetimeCol.forEachIndexed { i, element ->
assertValueOrNull(iBatch(i), element, LocalDateTime.ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC))
if (fromParquet){
//parquet format have only one type of date: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date without time
val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDate?>
datetimeCol.type() shouldBe typeOf<LocalDate>().withNullability(expectedNullable)
datetimeCol.forEachIndexed { i, element ->
assertValueOrNull(iBatch(i), element, LocalDate.ofEpochDay(iBatch(i).toLong() * 30))
}
}else {
val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?>
datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable)
datetimeCol.forEachIndexed { i, element ->
assertValueOrNull(iBatch(i), element, LocalDateTime.ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC))
}
}

val timeSecCol = exampleFrame["time32_seconds"] as DataColumn<LocalTime?>
Expand Down
Binary file not shown.
3 changes: 2 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ sl4j = "2.0.7"
junit = "4.13.2"
kotestAsserions = "4.6.3"
jsoup = "1.14.3"
arrow = "11.0.0"
arrow = "14.0.2"
docProcessor = "0.2.3"
simpleGit = "2.0.3"

Expand Down Expand Up @@ -65,6 +65,7 @@ jsoup = { module = "org.jsoup:jsoup", version.ref = "jsoup" }
arrow-format = { group = "org.apache.arrow", name = "arrow-format", version.ref = "arrow" }
arrow-vector = { group = "org.apache.arrow", name = "arrow-vector", version.ref = "arrow" }
arrow-memory = { group = "org.apache.arrow", name = "arrow-memory-unsafe", version.ref = "arrow" }
arrow-dataset = { group = "org.apache.arrow", name = "arrow-dataset", version.ref = "arrow" }

kotlinpoet = { group = "com.squareup", name = "kotlinpoet", version.ref = "kotlinpoet" }
swagger = { group = "io.swagger.parser.v3", name = "swagger-parser", version.ref = "openapi" }
Expand Down

0 comments on commit 0dd7498

Please sign in to comment.