From 7eac71f00c71bbb2c3bb9f16296181853399bc62 Mon Sep 17 00:00:00 2001 From: Nikita Klimenko Date: Fri, 5 Dec 2025 22:38:49 +0200 Subject: [PATCH 1/2] Convert StructVector into ColumnGroup instead of DataColumn> --- .../kotlinx/dataframe/io/arrowReadingImpl.kt | 18 +++++++++++++----- .../kotlinx/dataframe/io/ArrowKtTest.kt | 14 +++++--------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt index 5ba09a7598..d982e6256f 100644 --- a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt +++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt @@ -19,6 +19,7 @@ import org.apache.arrow.vector.DateMilliVector import org.apache.arrow.vector.Decimal256Vector import org.apache.arrow.vector.DecimalVector import org.apache.arrow.vector.DurationVector +import org.apache.arrow.vector.FieldVector import org.apache.arrow.vector.Float4Vector import org.apache.arrow.vector.Float8Vector import org.apache.arrow.vector.IntVector @@ -293,10 +294,16 @@ private fun List.withTypeNullable( return this to nothingType(nullable) } -private fun readField(root: VectorSchemaRoot, field: Field, nullability: NullabilityOptions): AnyBaseCol { +private fun readField(vector: FieldVector, field: Field, nullability: NullabilityOptions): AnyBaseCol { try { - val range = 0 until root.rowCount - val (list, type) = when (val vector = root.getVector(field)) { + val range = 0 until vector.valueCount + if (vector is StructVector) { + val columns = field.children.map { childField -> + readField(vector.getChild(childField.name), childField, nullability) + } + return DataColumn.createColumnGroup(field.name, columns.toDataFrame()) + } + val (list, type) = when (vector) { is VarCharVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) is LargeVarCharVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) @@ -357,8 +364,6 @@ private fun readField(root: VectorSchemaRoot, field: Field, nullability: Nullabi is TimeStampSecVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) - is StructVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) - is NullVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) else -> { @@ -371,6 +376,9 @@ private fun readField(root: VectorSchemaRoot, field: Field, nullability: Nullabi } } +private fun readField(root: VectorSchemaRoot, field: Field, nullability: NullabilityOptions): AnyBaseCol = + readField(root.getVector(field), field, nullability) + /** * Read [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format) data from existing [channel] */ diff --git a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt index 679aabae49..42ba2733e2 100644 --- a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt +++ b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt @@ -25,7 +25,6 @@ import org.apache.arrow.vector.types.pojo.Field import org.apache.arrow.vector.types.pojo.FieldType import org.apache.arrow.vector.types.pojo.Schema import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel -import org.apache.arrow.vector.util.Text import org.duckdb.DuckDBConnection import org.duckdb.DuckDBResultSet import org.jetbrains.kotlinx.dataframe.AnyFrame @@ -39,7 +38,6 @@ import org.jetbrains.kotlinx.dataframe.api.dataFrameOf import org.jetbrains.kotlinx.dataframe.api.map import org.jetbrains.kotlinx.dataframe.api.pathOf import org.jetbrains.kotlinx.dataframe.api.remove -import org.jetbrains.kotlinx.dataframe.api.toColumn import org.jetbrains.kotlinx.dataframe.exceptions.TypeConverterNotFoundException import org.junit.Assert import org.junit.Test @@ -68,13 +66,11 @@ internal class ArrowKtTest { val df = DataFrame.readArrowFeather(feather) val a by columnOf("one") val b by columnOf(2.0) - val c by listOf( - mapOf( - "c1" to Text("inner"), - "c2" to 4.0, - "c3" to 50.0, - ) as Map, - ).toColumn() + val c by columnOf( + "c1" to columnOf("inner"), + "c2" to columnOf(4.0), + "c3" to columnOf(50.0), + ) val d by columnOf("four") val expected = dataFrameOf(a, b, c, d) df shouldBe expected From d9f9569ad03c5b8735fb38c417e97b8d26d7cf99 Mon Sep 17 00:00:00 2001 From: Nikita Klimenko Date: Fri, 5 Dec 2025 22:48:30 +0200 Subject: [PATCH 2/2] Support writing ColumnGroup into Arrow as StructVector --- .../kotlinx/dataframe/io/ArrowWriterImpl.kt | 25 +++++ .../dataframe/io/arrowTypesMatching.kt | 10 ++ .../kotlinx/dataframe/io/ArrowKtTest.kt | 86 ++++++++++++++++++ .../src/test/resources/books.parquet | Bin 0 -> 5746 bytes 4 files changed, 121 insertions(+) create mode 100644 dataframe-arrow/src/test/resources/books.parquet diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowWriterImpl.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowWriterImpl.kt index 194e5dec3f..64d978c81b 100644 --- a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowWriterImpl.kt +++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowWriterImpl.kt @@ -26,6 +26,7 @@ import org.apache.arrow.vector.TinyIntVector import org.apache.arrow.vector.VarCharVector import org.apache.arrow.vector.VariableWidthVector import org.apache.arrow.vector.VectorSchemaRoot +import org.apache.arrow.vector.complex.StructVector import org.apache.arrow.vector.types.DateUnit import org.apache.arrow.vector.types.FloatingPointPrecision import org.apache.arrow.vector.types.pojo.ArrowType @@ -49,8 +50,10 @@ import org.jetbrains.kotlinx.dataframe.api.convertToShort import org.jetbrains.kotlinx.dataframe.api.convertToString import org.jetbrains.kotlinx.dataframe.api.forEachIndexed import org.jetbrains.kotlinx.dataframe.api.map +import org.jetbrains.kotlinx.dataframe.columns.ColumnGroup import org.jetbrains.kotlinx.dataframe.exceptions.CellConversionException import org.jetbrains.kotlinx.dataframe.exceptions.TypeConverterNotFoundException +import org.jetbrains.kotlinx.dataframe.indices import org.jetbrains.kotlinx.dataframe.name import org.jetbrains.kotlinx.dataframe.values import kotlin.reflect.full.isSubtypeOf @@ -72,7 +75,15 @@ internal class ArrowWriterImpl( private fun allocateVector(vector: FieldVector, size: Int, totalBytes: Long? = null) { when (vector) { is FixedWidthVector -> vector.allocateNew(size) + is VariableWidthVector -> totalBytes?.let { vector.allocateNew(it, size) } ?: vector.allocateNew(size) + + is StructVector -> { + vector.childrenFromFields.forEach { child -> + allocateVector(child, size) + } + } + else -> throw IllegalArgumentException("Can not allocate ${vector.javaClass.canonicalName}") } } @@ -138,6 +149,8 @@ internal class ArrowWriterImpl( is ArrowType.Time -> column.convertToLocalTime() + is ArrowType.Struct -> column + else -> throw NotImplementedError( "Saving ${targetFieldType.javaClass.canonicalName} is currently not implemented", @@ -277,6 +290,18 @@ internal class ArrowWriterImpl( } ?: vector.setNull(i) } + is StructVector -> { + require(column is ColumnGroup<*>) { + "StructVector expects ColumnGroup, but got ${column::class.simpleName}" + } + + column.columns().forEach { childColumn -> + infillVector(vector.getChild(childColumn.name()), childColumn) + } + + column.indices.forEach { i -> vector.setIndexDefined(i) } + } + else -> { // TODO implement other vector types from [readField] (VarBinaryVector, UIntVector, DurationVector, StructVector) and may be others (ListVector, FixedSizeListVector etc) throw NotImplementedError("Saving to ${vector.javaClass.canonicalName} is currently not implemented") diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowTypesMatching.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowTypesMatching.kt index 1e337d8d2a..2d3fa13010 100644 --- a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowTypesMatching.kt +++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowTypesMatching.kt @@ -11,6 +11,7 @@ import org.apache.arrow.vector.types.pojo.Field import org.apache.arrow.vector.types.pojo.FieldType import org.apache.arrow.vector.types.pojo.Schema import org.jetbrains.kotlinx.dataframe.AnyCol +import org.jetbrains.kotlinx.dataframe.columns.ColumnGroup import org.jetbrains.kotlinx.dataframe.typeClass import kotlin.reflect.full.isSubtypeOf import kotlin.reflect.typeOf @@ -27,6 +28,15 @@ public fun AnyCol.toArrowField(mismatchSubscriber: (ConvertingMismatch) -> Unit val columnType = column.type() val nullable = columnType.isMarkedNullable return when { + column is ColumnGroup<*> -> { + val childFields = column.columns().map { it.toArrowField(mismatchSubscriber) } + Field( + column.name(), + FieldType(nullable, ArrowType.Struct(), null), + childFields, + ) + } + columnType.isSubtypeOf(typeOf()) -> Field( column.name(), diff --git a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt index 42ba2733e2..9efcc77673 100644 --- a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt +++ b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt @@ -38,6 +38,7 @@ import org.jetbrains.kotlinx.dataframe.api.dataFrameOf import org.jetbrains.kotlinx.dataframe.api.map import org.jetbrains.kotlinx.dataframe.api.pathOf import org.jetbrains.kotlinx.dataframe.api.remove +import org.jetbrains.kotlinx.dataframe.columns.ColumnGroup import org.jetbrains.kotlinx.dataframe.exceptions.TypeConverterNotFoundException import org.junit.Assert import org.junit.Test @@ -724,4 +725,89 @@ internal class ArrowKtTest { dataFrame.rowsCount() shouldBe 900 } + + @Test + fun testColumnGroupRoundtrip() { + val original = dataFrameOf( + "outer" to columnOf("x", "y", "z"), + "inner" to columnOf( + "nested1" to columnOf("a", "b", "c"), + "nested2" to columnOf(1, 2, 3), + ), + ) + + val featherBytes = original.saveArrowFeatherToByteArray() + val fromFeather = DataFrame.readArrowFeather(featherBytes) + fromFeather shouldBe original + + val ipcBytes = original.saveArrowIPCToByteArray() + val fromIpc = DataFrame.readArrowIPC(ipcBytes) + fromIpc shouldBe original + } + + @Test + fun testNestedColumnGroupRoundtrip() { + val deeplyNested by columnOf( + "level2" to columnOf( + "level3" to columnOf(1, 2, 3), + ), + ) + val original = dataFrameOf(deeplyNested) + + val bytes = original.saveArrowFeatherToByteArray() + val restored = DataFrame.readArrowFeather(bytes) + + restored shouldBe original + } + + @Test + fun testColumnGroupWithNulls() { + val group by columnOf( + "a" to columnOf("x", null, "z"), + "b" to columnOf(1, 2, null), + ) + val original = dataFrameOf(group) + + val bytes = original.saveArrowFeatherToByteArray() + val restored = DataFrame.readArrowFeather(bytes) + + restored shouldBe original + } + + @Test + fun testReadParquetWithNestedStruct() { + val resourceUrl = testResource("books.parquet") + val resourcePath = resourceUrl.toURI().toPath() + + val df = DataFrame.readParquet(resourcePath) + + df.columnNames() shouldBe listOf("id", "title", "author", "genre", "publisher") + + val authorGroup = df["author"] as ColumnGroup<*> + authorGroup.columnNames() shouldBe listOf("id", "firstName", "lastName") + + df["id"].type() shouldBe typeOf() + df["title"].type() shouldBe typeOf() + df["genre"].type() shouldBe typeOf() + df["publisher"].type() shouldBe typeOf() + authorGroup["id"].type() shouldBe typeOf() + authorGroup["firstName"].type() shouldBe typeOf() + authorGroup["lastName"].type() shouldBe typeOf() + } + + @Test + fun testParquetNestedStructRoundtrip() { + val resourceUrl = testResource("books.parquet") + val resourcePath = resourceUrl.toURI().toPath() + + val original = DataFrame.readParquet(resourcePath) + + val featherBytes = original.saveArrowFeatherToByteArray() + val fromFeather = DataFrame.readArrowFeather(featherBytes) + fromFeather shouldBe original + + val ipcBytes = original.saveArrowIPCToByteArray() + val fromIpc = DataFrame.readArrowIPC(ipcBytes) + fromIpc shouldBe original + } } diff --git a/dataframe-arrow/src/test/resources/books.parquet b/dataframe-arrow/src/test/resources/books.parquet new file mode 100644 index 0000000000000000000000000000000000000000..0fea3a95fdbb3e8c5512b486c8f3ec567d5566c4 GIT binary patch literal 5746 zcmb7ITWlQF8QvsL;)IY(!)$iJC4(Wx6vv4ZngDf`czua4yNT;{ZctR6ojKk;^6VV< z%#6))kpcord1)o8s*TzQ(yE9)prYmhp*~bJRZ-qbQE4k4P^+dY4@i|NDj|CLzB9Yt z4Gs@g=F85^Ip@FM#f|D-CqUnQ7gK$oQlRr8}yHSCf%@ zR;XBoRobFl6G7mHI(BEJfx--gLZG#OcLb&9^s+RJfz#oN6w5eciO#`z6}`e>`bwhR zGs0I9B`O4pJj+YUsF*LxOVWc^LU&9wBC?x=V(uiIH7blJBFT3XMufk5S*r*qMOB1! zTuF`U#qM}0>kYpvt8qcXO7l7hB6@39z=Jce25bc1 z%o-JX(wcdmF$l6cG0@p8hHy}e>#Q(=5-r9h5z^M>K+2_*1)a{lV|pcIyh5KOv>q-? zkrIL7%20?LK#gXXbo>qSlM< zq^`>qVf+Di1O^T0glXaFifdgqg``-PKADZAv5N$mR{_Jwoee}NR~-a@#dK5X(~2Vq zx^O|l3Bqeo?y-cnY9&lr2fmU9!8T%I6tQ#o_pP@sHdxZS(vr4m$8mStnCO|jG*JA@ z@a#L2yHB6r^Idh@yN$iM7w$Z7u6_TdS6-QZk5vujs#zDmdEw&k)^*fH)L ze;)v-&@q5@i)k_ioCR3tC_ZcE*MKhrtW)a&aw-5^{}MoYvjFQIX)sb}0P;8uuj8Mzq>?jDmBxQTajNfgO|l;#>DGo<>X_n`L^lGsTgrQUgf!08_1ra~SBQshCOUf!TqnCvz z7r70F^=he{u$XnuoP!K;@yJY4Qy$~J?A1gaQG}A*u{$N^Ee2Mt zQ+LluxnOU{7Z#+6A0vrL(U1}Ci16~OMp{LR3SBwiTM$<0`!>X8k~kG`EHUU8WQ0b8 zvSO(6tX>K9O0=kWY}QC$FDuV-XoxuG=2C)cF0DyUpnb$VHK%=KNFpC&c8>(HM8kvj zv=&NPl^PTgVA^tnH_&=5fjJGwk!VvDtF>qZ=HXwOtX-8B}IX)L(vV=*A)Hv+`n$GS*Gn7;V^@Mh$w|fiW4QmtY^sPKef4)~AW|l#ARn!(sUaCUcB_iYuh^- z*~T62^LI?HyY@`x>Hhv#l5@9xIGlO-J1?H`PCs$vhd=-7rTf@i;IE52c>U)2?>%)> zbMR&JFL_XS1+xN%~;bNXhL|})kS>Y{Wo=YkLDli%}N0w0m{V@rR zc*2)3ABGl;CUjI;dW%@-!=Pmu@Zp?{wftjMI z5u%9^JSKug*}&X@%78Go`fwy1MFBF($IK713_TS$(p5I~(}>^@X2B3t(qUq!F?*Co znik5_EjFBNCUp`?4zg!-1A`O3I%pII?-^yP2<^6!ZM9)0RCUOy0-3*zoL(aYznJDb zv>}^5477005|UO%)BztGQJ|OIIT=X|%J}Sny;YpVR+tk~qrkc+Ic75qX&Ct+0%paX(p-`1nc1^^o@%j5)qEDb48wb%ZACbIVww*Qcr>_9ac%@y!UD zK`4`YN|*YM2a$xgZMm3GMpjTwF!yS6_m7omBMto<#$^s)>x7=RELW(Tb2|Epd+-S@06hoRw9=?gb8I;@lz@Yu+VYb z1_9N?_gySZOL4l3uB2Rciz{Mx4r=1Vz(xl{v4EDixWZ3_c09CmcE8P8_#ibrsspFo zZ(R7-Z@W9H!>7jFyGrwaJ(rHTZ@v7~OZTteb^hYtp8m-PtdW`guJ!p`7iUHM*s*YZ z9`S2m?-{xA#GTJMH~ffm+DBUyIh1z39>GAC&uzfC*ztC2yGiDAo14Q&$NJ`GInL+0 zTUEScb^G6$0Ut z&3zy58HP%X*<2Unw}V_3F_Ot%8iSaB7;y$##_6EA>N!ggk#w!Q@1wmW*b4MqNBDP= zM!03NZ<6#g1?PB+w6(i+ye0VlrtJHh`mPN2z1L5>ujWUtl`}UmEdn=_{qvY}d-h7f z8SW(9jvq_>>dx%doqZRM_5I=iiC>>rxliTY4HzLKGT95`(BQ(TGoQ)+=UCS7>&6cX zU49_9dr24!{qdk!HhM6^_fTEre!u?l-ng+OdmrpI(!=Q6JJ1^<8}Q7$!qbA1fgklg z`0(Sc1;tIxL-@GqeR5!R$!1-96_W&^y+|c0A zk->v*e