Dynamically invoking withSpark in dockerized Ktor web app throws UnsupportedFileSystemException #219

Open

@tyknkd

Description

When withSpark is invoked dynamically (here, inside a request handler) in a dockerized Ktor web app, an org.apache.hadoop.fs.UnsupportedFileSystemException is thrown.

Expected behavior: No exception is thrown.

A GitHub repo reproducing the issue is here.

Broadcast.kt (from kotlin-spark-api example)

import org.jetbrains.kotlinx.spark.api.broadcast
import org.jetbrains.kotlinx.spark.api.map
import org.jetbrains.kotlinx.spark.api.withSpark
import java.io.Serializable

object Broadcast {
    data class SomeClass(val a: IntArray, val b: Int) : Serializable

    fun broadcast(): MutableList<Int> {
        lateinit var result: MutableList<Int>
        withSpark(master = "local") {
            val broadcastVariable = spark.broadcast(SomeClass(a = intArrayOf(5, 6), b = 3))
            result = listOf(1, 2, 3, 4, 5)
                .toDS()
                .map {
                    val receivedBroadcast = broadcastVariable.value
                    it + receivedBroadcast.a.first()
                }
                .collectAsList()
            println(result)
        }
        return result
    }
}

Routing.kt

fun Application.configureRouting() {
    routing {
        get("/") {
            val list = Broadcast.broadcast()
            call.respondText(list.toString())
        }
    }
}
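
For context, a minimal sketch of the Ktor entry point assumed here, matching the Netty engine and port 8888 seen in the log output below; the actual module wiring in the repo may differ:

import io.ktor.server.application.Application
import io.ktor.server.engine.embeddedServer
import io.ktor.server.netty.Netty

// Assumed entry point: Netty on 0.0.0.0:8888, matching the log output and
// the port mapping in compose.yaml below.
fun main() {
    embeddedServer(Netty, port = 8888, host = "0.0.0.0", module = Application::module)
        .start(wait = true)
}

fun Application.module() {
    configureRouting()
}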

Dockerfile

# syntax=docker/dockerfile:1
FROM eclipse-temurin:11-jre-jammy AS jre-jammy-spark
RUN curl https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3-scala2.13.tgz -o spark.tgz && \
    tar -xf spark.tgz && \
    mv spark-3.3.2-bin-hadoop3-scala2.13 /opt/spark && \
    rm spark.tgz
ENV SPARK_HOME="/opt/spark"
ENV PATH="${PATH}:/opt/spark/bin:/opt/spark/sbin"

FROM gradle:8.4-jdk11 AS gradle-build
COPY --chown=gradle:gradle . /home/gradle/src
WORKDIR /home/gradle/src
RUN gradle buildFatJar --no-daemon

FROM jre-jammy-spark AS app
RUN mkdir -p /app
COPY --from=gradle-build /home/gradle/src/build/libs/*-all.jar /app/app.jar
ENTRYPOINT ["java","-jar","/app/app.jar"]
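
The buildFatJar task used above is provided by the Ktor Gradle plugin, which bundles the application and all dependencies (including Spark and Hadoop) into a single JAR. A minimal sketch of the build script this implies; the plugin versions, main class, and the kotlin-spark-api coordinate are assumptions and may not match the linked repo:

plugins {
    kotlin("jvm") version "1.9.23"
    id("io.ktor.plugin") version "2.3.9"
}

application {
    mainClass.set("com.example.ApplicationKt") // hypothetical entry point
}

dependencies {
    implementation("io.ktor:ktor-server-core")
    implementation("io.ktor:ktor-server-netty")
    // Spark 3.3.2 / Scala 2.13, matching the distribution installed in the Dockerfile
    implementation("org.apache.spark:spark-sql_2.13:3.3.2")
    implementation("org.jetbrains.kotlinx.spark:kotlin-spark-api_3.3.2_2.13:1.2.4")
}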

compose.yaml

services:
  app:
    build: .
    ports:
      - 8888:8888

In a shell, run:

$ docker compose up

Then, open http://localhost:8888 in a browser.

An org.apache.hadoop.fs.UnsupportedFileSystemException will be thrown:

app-1  | 2024-04-09 10:26:26.484 [main] INFO  ktor.application - Autoreload is disabled because the development mode is off.
app-1  | 2024-04-09 10:26:26.720 [main] INFO  ktor.application - Application started in 0.261 seconds.
app-1  | 2024-04-09 10:26:26.816 [DefaultDispatcher-worker-1] INFO  ktor.application - Responding at http://0.0.0.0:8888
app-1  | WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.
app-1  | Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
app-1  | 2024-04-09 10:27:07.885 [eventLoopGroupProxy-4-1] WARN  o.a.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
app-1  | WARNING: An illegal reflective access operation has occurred
app-1  | WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/app/app.jar) to constructor java.nio.DirectByteBuffer(long,int)
app-1  | WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
app-1  | WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
app-1  | WARNING: All illegal access operations will be denied in a future release
app-1  | 2024-04-09 10:27:10.066 [eventLoopGroupProxy-4-1] WARN  o.a.spark.sql.internal.SharedState - URL.setURLStreamHandlerFactory failed to set FsUrlStreamHandlerFactory
app-1  | 2024-04-09 10:27:10.071 [eventLoopGroupProxy-4-1] WARN  o.a.spark.sql.internal.SharedState - Cannot qualify the warehouse path, leaving it unqualified.
app-1  | org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
app-1  | 	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3585)
app-1  | 	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3608)
app-1  | 	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
app-1  | 	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3712)
app-1  | 	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3663)
app-1  | 	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:557)
app-1  | 	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
app-1  | 	at org.apache.spark.sql.internal.SharedState$.qualifyWarehousePath(SharedState.scala:282)
app-1  | 	at org.apache.spark.sql.internal.SharedState.liftedTree1$1(SharedState.scala:80)
app-1  | 	at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:79)
app-1  | 	at org.apache.spark.sql.SparkSession.$anonfun$sharedState$1(SparkSession.scala:143)
app-1  | 	at scala.Option.getOrElse(Option.scala:201)
app-1  | 	at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:143)
app-1  | 	at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:142)
app-1  | 	at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:162)
app-1  | 	at scala.Option.getOrElse(Option.scala:201)
app-1  | 	at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:160)
app-1  | 	at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:157)
app-1  | 	at org.apache.spark.sql.SparkSession.$anonfun$new$3(SparkSession.scala:117)
app-1  | 	at scala.Option.map(Option.scala:242)
app-1  | 	at org.apache.spark.sql.SparkSession.$anonfun$new$1(SparkSession.scala:117)
app-1  | 	at org.apache.spark.sql.internal.SQLConf$.get(SQLConf.scala:230)
app-1  | 	at org.apache.spark.sql.catalyst.SerializerBuildHelper$.nullOnOverflow(SerializerBuildHelper.scala:29)
app-1  | 	at org.apache.spark.sql.catalyst.SerializerBuildHelper$.createSerializerForJavaBigDecimal(SerializerBuildHelper.scala:158)
app-1  | 	at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:549)
app-1  | 	at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
app-1  | 	at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
app-1  | 	at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
app-1  | 	at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
app-1  | 	at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:448)
app-1  | 	at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerForType$1(ScalaReflection.scala:437)
app-1  | 	at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
app-1  | 	at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
app-1  | 	at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
app-1  | 	at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
app-1  | 	at org.apache.spark.sql.catalyst.ScalaReflection$.serializerForType(ScalaReflection.scala:429)
app-1  | 	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:55)
app-1  | 	at org.apache.spark.sql.Encoders$.DECIMAL(Encoders.scala:100)
app-1  | 	at org.apache.spark.sql.Encoders.DECIMAL(Encoders.scala)
app-1  | 	at org.jetbrains.kotlinx.spark.api.EncodingKt.<clinit>(Encoding.kt:87)
app-1  | 	at com.example.plugins.Broadcast.broadcast(Broadcast.kt:62)
app-1  | 	at com.example.plugins.RoutingKt$configureRouting$1$1.invokeSuspend(Routing.kt:10)
app-1  | 	at com.example.plugins.RoutingKt$configureRouting$1$1.invoke(Routing.kt)
app-1  | 	at com.example.plugins.RoutingKt$configureRouting$1$1.invoke(Routing.kt)
app-1  | 	at io.ktor.server.routing.Route$buildPipeline$1$1.invokeSuspend(Route.kt:116)
app-1  | 	at io.ktor.server.routing.Route$buildPipeline$1$1.invoke(Route.kt)
app-1  | 	at io.ktor.server.routing.Route$buildPipeline$1$1.invoke(Route.kt)
app-1  | 	at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
app-1  | 	at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
app-1  | 	at io.ktor.util.pipeline.SuspendFunctionGun.execute$ktor_utils(SuspendFunctionGun.kt:98)
app-1  | 	at io.ktor.util.pipeline.Pipeline.execute(Pipeline.kt:77)
app-1  | 	at io.ktor.server.routing.Routing$executeResult$$inlined$execute$1.invokeSuspend(Pipeline.kt:478)
app-1  | 	at io.ktor.server.routing.Routing$executeResult$$inlined$execute$1.invoke(Pipeline.kt)
app-1  | 	at io.ktor.server.routing.Routing$executeResult$$inlined$execute$1.invoke(Pipeline.kt)
app-1  | 	at io.ktor.util.debug.ContextUtilsKt.initContextInDebugMode(ContextUtils.kt:17)
app-1  | 	at io.ktor.server.routing.Routing.executeResult(Routing.kt:190)
app-1  | 	at io.ktor.server.routing.Routing.interceptor(Routing.kt:64)
app-1  | 	at io.ktor.server.routing.Routing$Plugin$install$1.invokeSuspend(Routing.kt:140)
app-1  | 	at io.ktor.server.routing.Routing$Plugin$install$1.invoke(Routing.kt)
app-1  | 	at io.ktor.server.routing.Routing$Plugin$install$1.invoke(Routing.kt)
app-1  | 	at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
app-1  | 	at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
app-1  | 	at io.ktor.server.engine.BaseApplicationEngineKt$installDefaultTransformationChecker$1.invokeSuspend(BaseApplicationEngine.kt:124)
app-1  | 	at io.ktor.server.engine.BaseApplicationEngineKt$installDefaultTransformationChecker$1.invoke(BaseApplicationEngine.kt)
app-1  | 	at io.ktor.server.engine.BaseApplicationEngineKt$installDefaultTransformationChecker$1.invoke(BaseApplicationEngine.kt)
app-1  | 	at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
app-1  | 	at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
app-1  | 	at io.ktor.util.pipeline.SuspendFunctionGun.execute$ktor_utils(SuspendFunctionGun.kt:98)
app-1  | 	at io.ktor.util.pipeline.Pipeline.execute(Pipeline.kt:77)
app-1  | 	at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1$invokeSuspend$$inlined$execute$1.invokeSuspend(Pipeline.kt:478)
app-1  | 	at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1$invokeSuspend$$inlined$execute$1.invoke(Pipeline.kt)
app-1  | 	at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1$invokeSuspend$$inlined$execute$1.invoke(Pipeline.kt)
app-1  | 	at io.ktor.util.debug.ContextUtilsKt.initContextInDebugMode(ContextUtils.kt:17)
app-1  | 	at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1.invokeSuspend(DefaultEnginePipeline.kt:123)
app-1  | 	at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1.invoke(DefaultEnginePipeline.kt)
app-1  | 	at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1.invoke(DefaultEnginePipeline.kt)
app-1  | 	at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
app-1  | 	at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
app-1  | 	at io.ktor.util.pipeline.SuspendFunctionGun.execute$ktor_utils(SuspendFunctionGun.kt:98)
app-1  | 	at io.ktor.util.pipeline.Pipeline.execute(Pipeline.kt:77)
app-1  | 	at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1$invokeSuspend$$inlined$execute$1.invokeSuspend(Pipeline.kt:478)
app-1  | 	at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1$invokeSuspend$$inlined$execute$1.invoke(Pipeline.kt)
app-1  | 	at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1$invokeSuspend$$inlined$execute$1.invoke(Pipeline.kt)
app-1  | 	at io.ktor.util.debug.ContextUtilsKt.initContextInDebugMode(ContextUtils.kt:17)
app-1  | 	at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1.invokeSuspend(NettyApplicationCallHandler.kt:140)
app-1  | 	at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1.invoke(NettyApplicationCallHandler.kt)
app-1  | 	at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1.invoke(NettyApplicationCallHandler.kt)
app-1  | 	at kotlinx.coroutines.intrinsics.UndispatchedKt.startCoroutineUndispatched(Undispatched.kt:44)
app-1  | 	at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:112)
app-1  | 	at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:126)
app-1  | 	at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch(Builders.common.kt:56)
app-1  | 	at kotlinx.coroutines.BuildersKt.launch(Unknown Source)
app-1  | 	at io.ktor.server.netty.NettyApplicationCallHandler.handleRequest(NettyApplicationCallHandler.kt:41)
app-1  | 	at io.ktor.server.netty.NettyApplicationCallHandler.channelRead(NettyApplicationCallHandler.kt:33)
app-1  | 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
app-1  | 	at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
app-1  | 	at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:425)
app-1  | 	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
app-1  | 	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
app-1  | 	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
app-1  | 	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
app-1  | 	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
app-1  | 	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
app-1  | 	at io.ktor.server.netty.EventLoopGroupProxy$Companion.create$lambda$1$lambda$0(NettyApplicationEngine.kt:296)
app-1  | 	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
app-1  | 	at java.base/java.lang.Thread.run(Unknown Source)
app-1  | [6, 7, 8, 9, 10]
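
Note that the job itself still completes: Spark logs the exception while qualifying the warehouse path, continues with it unqualified, and the final log line prints the expected result [6, 7, 8, 9, 10]. A "No FileSystem for scheme" error like this is commonly caused by fat-JAR assembly overwriting the META-INF/services/org.apache.hadoop.fs.FileSystem registrations contributed by the various Hadoop artifacts. A possible (untested) workaround is to register the local filesystem explicitly through Spark's Hadoop configuration:

import org.jetbrains.kotlinx.spark.api.withSpark

// Hypothetical workaround: map the "file" scheme to Hadoop's LocalFileSystem
// explicitly, so Spark does not rely on the service-loader entry that may
// have been clobbered during fat-JAR assembly.
withSpark(
    props = mapOf("spark.hadoop.fs.file.impl" to "org.apache.hadoop.fs.LocalFileSystem"),
    master = "local"
) {
    // ... same body as in Broadcast.broadcast() above
}

Alternatively, if the fat JAR is built with the Shadow plugin (or a task that exposes its configuration), calling mergeServiceFiles() on the ShadowJar task should address the root cause by merging the service registrations instead of overwriting them.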
