diff --git a/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index 0239c2e2..8be2385a 100644 --- a/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -180,6 +180,22 @@ inline fun KeyValueGroupedDataset.reduc reduceGroups(ReduceFunction(func)) .map { t -> t._1 to t._2 } +@JvmName("takeKeysTuple2") +inline fun Dataset>.takeKeys(): Dataset = map { it._1() } + +inline fun Dataset>.takeKeys(): Dataset = map { it.first } + +@JvmName("takeKeysArity2") +inline fun Dataset>.takeKeys(): Dataset = map { it._1 } + +@JvmName("takeValuesTuple2") +inline fun Dataset>.takeValues(): Dataset = map { it._2() } + +inline fun Dataset>.takeValues(): Dataset = map { it.second } + +@JvmName("takeValuesArity2") +inline fun Dataset>.takeValues(): Dataset = map { it._2 } + inline fun KeyValueGroupedDataset.flatMapGroups( noinline func: (key: K, values: Iterator) -> Iterator ): Dataset = flatMapGroups( @@ -238,6 +254,8 @@ inline fun Dataset<*>.to(): Dataset = `as`(encoder()) inline fun Dataset.forEach(noinline func: (T) -> Unit) = foreach(ForeachFunction(func)) +inline fun Dataset.forEachPartition(noinline func: (Iterator) -> Unit) = foreachPartition(ForeachPartitionFunction(func)) + /** * It's hard to call `Dataset.debugCodegen` from kotlin, so here is utility for that */ @@ -718,6 +736,16 @@ inline fun col(column: KProperty1): TypedColumn Dataset.invoke(column: KProperty1): TypedColumn = col(column) +/** + * Allows to sort data class dataset on one or more of the properties of the data class. 
+ * ```kotlin + * val sorted: Dataset = unsorted.sort(YourClass::a) + * val sorted2: Dataset = unsorted.sort(YourClass::a, YourClass::b) + * ``` + */ +fun Dataset.sort(col: KProperty1, vararg cols: KProperty1): Dataset = + sort(col.name, *cols.map { it.name }.toTypedArray()) + /** * Alternative to [Dataset.show] which returns source dataset. * Useful for debug purposes when you need to view content of a dataset as an intermediate operation diff --git a/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt index 11d7d5cc..51be5aec 100644 --- a/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt +++ b/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt @@ -437,11 +437,43 @@ class ApiTest : ShouldSpec({ b.count() shouldBe 1 } + should("Allow simple forEachPartition in datasets") { + val dataset = dsOf( + SomeClass(intArrayOf(1, 2, 3), 1), + SomeClass(intArrayOf(4, 3, 2), 1), + ) + dataset.forEachPartition { + it.forEach { + it.b shouldBe 1 + } + } + } + should("Have easier access to keys and values for key/value datasets") { + val dataset: Dataset<SomeClass> = dsOf( + SomeClass(intArrayOf(1, 2, 3), 1), + SomeClass(intArrayOf(4, 3, 2), 1), + ) + .groupByKey { it.b } + .reduceGroups(func = { a, b -> SomeClass(a.a + b.a, a.b) }) + .takeValues() + + dataset.count() shouldBe 1 + } + should("Be able to sort datasets with property reference") { + val dataset: Dataset<SomeClass> = dsOf( + SomeClass(intArrayOf(1, 2, 3), 2), + SomeClass(intArrayOf(4, 3, 2), 1), + ) + // sort() returns a new Dataset and leaves the receiver unsorted, so assert on the returned value + val sortedByB = dataset.sort(SomeClass::b) + sortedByB.takeAsList(1).first().b shouldBe 1 + val sortedByAThenB = dataset.sort(SomeClass::a, SomeClass::b) + sortedByAThenB.takeAsList(1).first().b shouldBe 2 + } } } }) - data class DataClassWithTuple(val tuple: T) diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt 
b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index 8b5e5142..a103bb43 100644 --- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -176,6 +176,23 @@ inline fun KeyValueGroupedDataset.reduc reduceGroups(ReduceFunction(func)) .map { t -> t._1 to t._2 } +@JvmName("takeKeysTuple2") +inline fun Dataset>.takeKeys(): Dataset = map { it._1() } + +inline fun Dataset>.takeKeys(): Dataset = map { it.first } + +@JvmName("takeKeysArity2") +inline fun Dataset>.takeKeys(): Dataset = map { it._1 } + +@JvmName("takeValuesTuple2") +inline fun Dataset>.takeValues(): Dataset = map { it._2() } + +inline fun Dataset>.takeValues(): Dataset = map { it.second } + +@JvmName("takeValuesArity2") +inline fun Dataset>.takeValues(): Dataset = map { it._2 } + + inline fun KeyValueGroupedDataset.flatMapGroups( noinline func: (key: K, values: Iterator) -> Iterator ): Dataset = flatMapGroups( @@ -234,6 +251,8 @@ inline fun Dataset<*>.to(): Dataset = `as`(encoder()) inline fun Dataset.forEach(noinline func: (T) -> Unit) = foreach(ForeachFunction(func)) +inline fun Dataset.forEachPartition(noinline func: (Iterator) -> Unit) = foreachPartition(ForeachPartitionFunction(func)) + /** * It's hard to call `Dataset.debugCodegen` from kotlin, so here is utility for that */ @@ -714,6 +733,16 @@ inline fun col(column: KProperty1): TypedColumn Dataset.invoke(column: KProperty1): TypedColumn = col(column) +/** + * Allows to sort data class dataset on one or more of the properties of the data class. + * ```kotlin + * val sorted: Dataset = unsorted.sort(YourClass::a) + * val sorted2: Dataset = unsorted.sort(YourClass::a, YourClass::b) + * ``` + */ +fun Dataset.sort(col: KProperty1, vararg cols: KProperty1): Dataset = + sort(col.name, *cols.map { it.name }.toTypedArray()) + /** * Alternative to [Dataset.show] which returns source dataset. 
* Useful for debug purposes when you need to view content of a dataset as an intermediate operation diff --git a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt index 9e887236..823f968f 100644 --- a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt +++ b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt @@ -477,6 +477,39 @@ class ApiTest : ShouldSpec({ ) dataset.show() } + should("Allow simple forEachPartition in datasets") { + val dataset = dsOf( + SomeClass(intArrayOf(1, 2, 3), 1), + SomeClass(intArrayOf(4, 3, 2), 1), + ) + dataset.forEachPartition { + it.forEach { + it.b shouldBe 1 + } + } + } + should("Have easier access to keys and values for key/value datasets") { + val dataset: Dataset<SomeClass> = dsOf( + SomeClass(intArrayOf(1, 2, 3), 1), + SomeClass(intArrayOf(4, 3, 2), 1), + ) + .groupByKey { it.b } + .reduceGroups(func = { a, b -> SomeClass(a.a + b.a, a.b) }) + .takeValues() + + dataset.count() shouldBe 1 + } + should("Be able to sort datasets with property reference") { + val dataset: Dataset<SomeClass> = dsOf( + SomeClass(intArrayOf(1, 2, 3), 2), + SomeClass(intArrayOf(4, 3, 2), 1), + ) + // sort() returns a new Dataset and leaves the receiver unsorted, so assert on the returned value + val sortedByB = dataset.sort(SomeClass::b) + sortedByB.takeAsList(1).first().b shouldBe 1 + val sortedByAThenB = dataset.sort(SomeClass::a, SomeClass::b) + sortedByAThenB.takeAsList(1).first().b shouldBe 2 + } } } })