
3.4+ #218 (Draft)

Wants to merge 40 commits into main
Conversation

@Jolanrensen (Collaborator) commented Mar 17, 2024

Fixes #195, which is a fun read if you're interested in the process :)

This is a work-in-progress overhaul of the core parts of the library to support Spark 3.4+.

Why

  • Too much has changed in Spark 3.4+ because Spark decoupled its encoding/decoding system with Spark Connect in mind.
  • Our previous approach was hacky and forced us to publish builds against exact Spark versions to maintain bytecode-level compatibility.
  • We too would like Spark Connect support in the future :)
  • We need to keep supporting newer Spark versions.

What has changed

  • Removed the :core module entirely. No more code injected into Spark packages that can break at the bytecode level.
  • Instead, we just have a :scala-helpers module, which doesn't even depend on Spark at the moment. We just need the VarargUnwrapper class.
  • Rewrote encoding from the ground up, in pure Kotlin this time, using the power of Kotlin reflection. I took inspiration from JavaTypeInference and ScalaReflection, which, since 3.4, now build an AgnosticEncoder as a sort of intermediate step in constructing an Encoder for the data. This implementation-agnostic encoder can be turned into an actual encoder by passing it to ExpressionEncoder(), or into something entirely different (which is what makes Spark Connect possible). A small sketch of this flow follows after this list.
  • Our KotlinTypeInference.encoderFor implementation is a mix of the Java and Scala implementations, supporting Scala and Java lists, primitives, Scala Tuples, and, most importantly, Kotlin data classes.
  • One downside of having to create an AgnosticEncoder is that we are limited to the AgnosticEncoders Spark offers us. We cannot write our own (de)serializers anymore if we want to support Spark Connect. So, in order to support data classes, we need to hijack ProductEncoder.
  • Deserializing data classes with ProductEncoder works fine, but for serializing we hit a snag. In Scala, case classes have a function with the same name as each property. This assumption is used under the hood, so we need to make sure those functions exist in our data classes.
    Plus, I later found that an actual instance check is done to see whether the value is a scala.Product... It's compiler plugin time!
  • I created a Kotlin compiler plugin which, when applied to your project, can convert:
@Sparkify
data class User(
    val name: String,
    @ColumnName("test") val age: Int,
)

to

@Sparkify
data class User(
    @get:JvmName("name") val name: String,
    @get:JvmName("test") @ColumnName("test") val age: Int,
): scala.Product, Serializable {
  override fun canEqual(that: Any?): Boolean = that is User
  override fun productArity(): Int = 2
  override fun productElement(n: Int): Any? =
    if (n == 0) this.name
    else if (n == 1) this.age
    else throw IndexOutOfBoundsException()
}

satisfying both of Spark's requirements. One downside of this approach is that you now need to annotate each data class you want to encode with @Sparkify (otherwise the column names will be getName and getAge). And you cannot annotate external data classes like Pair :/ So I recommend working with tuples from now on, or making your own @Sparkify Pair (see the small sketch after this list).

  • The compiler plugin (:compiler-plugin) will be applicable to your Gradle project through the Gradle plugin (:gradle-plugin) with id("org.jetbrains.kotlinx.spark.api") version X, or in Maven with the <compilerPlugins> tag (probably).
  • The :kotlin-spark-api and :examples modules also depend on these two plugins for their tests. This is done with a Gradle trick that updates bootstrap jars and adds them to the classpath/repository.
  • Updated to Kotlin 2.0 Beta 5. You should still be able to use 1.9.23 with the compiler plugin, since it just uses IR; it does not require K2.
  • For Kotlin 2.0, just make sure you set freeCompilerArgs.add("-Xlambdas=class"), since Spark cannot serialize lambdas otherwise. If you use the Gradle plugin, this is done for you. A minimal build setup is sketched after this list.
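
Roughly, the encoding flow looks like this. This is a sketch only: the reified encoderFor overload is an assumption, only the KotlinTypeInference.encoderFor name and the ExpressionEncoder() hand-off come from the description above.

import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Imports for KotlinTypeInference and the @Sparkify User class from the example above
// are omitted; their packages aren't shown in this PR.
fun userEncoder(): ExpressionEncoder<User> {
    // Step 1: Kotlin reflection builds the implementation-agnostic description of User.
    // (A reified overload is assumed here for illustration.)
    val agnostic: AgnosticEncoder<User> = KotlinTypeInference.encoderFor<User>()

    // Step 2: Spark 3.4+ turns that into a concrete ExpressionEncoder
    // (ExpressionEncoder() in Scala). A Spark Connect implementation could
    // consume the same AgnosticEncoder instead.
    return ExpressionEncoder.apply(agnostic)
}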
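
The "make your own @Sparkify Pair" suggestion, as a tiny sketch (the SparkPair name is made up):

@Sparkify
data class SparkPair<A, B>(val first: A, val second: B)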
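
And a minimal build.gradle.kts sketch for the last two points. The plugin id is the one quoted above; the versions are placeholders, and the explicit compiler-arg block is only needed if you don't let the Gradle plugin add it for you.

// build.gradle.kts — minimal sketch, not the final setup
plugins {
    kotlin("jvm") version "1.9.23"
    id("org.jetbrains.kotlinx.spark.api") version "X" // placeholder version, see above
}

// Only needed when configuring the Kotlin 2.0 compiler yourself; the Gradle plugin
// is described above as adding this flag for you.
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {
    compilerOptions.freeCompilerArgs.add("-Xlambdas=class")
}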

TODO

  • Provide warnings for non-Sparkified classes, especially for Pair/Triple
  • Java bean as fallback encoder
  • Jupyter support
  • Finalize Jupyter support
  • UDTs for non-generic Kotlin types like Instant, LocalDateTime etc.
  • Spark Connect
  • Docs
  • Fix RddTest "Work with any number"
  • Remove streaming in favor of structured streaming, update examples

@Jolanrensen added the enhancement (New feature or request) label on Mar 17, 2024
@Jolanrensen changed the base branch from release to main on March 24, 2024 16:10
a) it can build gradle-plugin and compiler-plugin with bootstrap jars without mavenLocal.
b) bootstrap jars are updated before the actual build
@Jolanrensen (Collaborator, Author) commented:
Added encoding for kotlinx-datetime types: DatePeriod, DateTimePeriod, Instant, LocalDateTime, and LocalDate. kotlin.time.Duration is sadly not working, as it's a value class (I think that's the reason).
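
A small usage sketch of what this enables; the Event class and its fields are illustrative, not from this PR.

import kotlinx.datetime.Instant
import kotlinx.datetime.LocalDate

@Sparkify
data class Event(
    val name: String,
    val startedAt: Instant,  // kotlinx-datetime types are now encodable
    val day: LocalDate,
    // kotlin.time.Duration is not supported yet, presumably because it's a value class
)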

…sion for Jupyter. Disabled HTML renderers in favor of just outputting them as text. Notebooks can render them however they like. RDDs are converted to Datasets before rendering.
Labels: enhancement (New feature or request)
Successfully merging this pull request may close: Spark 3.4+ / -Connect support
1 participant