
User-defined functions crash on Spark DataFrames created directly, but not on ones created from pandas-on-Spark DataFrames #55882

@IMarvinTPA

Description


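I am trying to apply Python user-defined functions (UDFs) to Spark DataFrames, but on Windows the Python worker crashes when the UDF runs.
Whether it crashes depends on how the DataFrame was created: a DataFrame converted from pandas-on-Spark works, while one built directly with createDataFrame crashes.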

#%%
import pyspark.pandas as ppd
import pyspark.sql as ss
import pyspark.sql.functions as psFunc
import pyspark.sql.types as sst

mySpark = (
    ss.SparkSession.builder.appName('test')
    .config("spark.sql.catalogImplementation", "hive")
    .config("spark.sql.legacy.createHiveTableByDefault", "false")
    .config("spark.sql.repl.eagerEval.enabled", "true")
    .config("spark.sql.sources.default", "parquet")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")  # Also crashes without this setting.
    .config("spark.sql.ansi.enabled", "false")
    .config("spark.python.worker.faulthandler.enabled", "true")
    .config("spark.sql.execution.pyspark.udf.faulthandler.enabled", "true")
    # .config("spark.executor.memory", "6g")
    # .config("spark.executor.memoryOverhead", "1g")
    # .config("spark.delta.catalog.update.enabled", "false")
    .enableHiveSupport()
    .getOrCreate()
)


DECIMAL_STR_LIST = ["1",
                    "2.2",
                    "3.3",
                    "44.4",
                    "555.55",
                    "654321.12",
                    "7.75",
                    "-800000.8",
                    "None",
                    "1000"]

def noChange(val: str) -> str:
    return str(val)

# %%
# Test using a Spark DataFrame created from a pandas-on-Spark DataFrame. This does NOT crash.
cols = {"aDecimal82": DECIMAL_STR_LIST}
df2 = ppd.DataFrame(cols)
sparkDf = df2.to_spark()

ncUdfFunc = psFunc.udf(noChange, sst.StringType())

newCol = ncUdfFunc(psFunc.col("aDecimal82")).alias("aDecimal82")

newDf = sparkDf.select([newCol])

print(newDf)

# %%
# Test using a Spark DataFrame created directly with createDataFrame. This DOES crash.
cols = {"aDecimal82": DECIMAL_STR_LIST}
# Transpose the column dict into rows: one single-element row per value.
cols2 = list(map(list, zip(*cols.values())))
sparkDf = mySpark.createDataFrame(cols2, ["aDecimal82"])

ncUdfFunc = psFunc.udf(noChange, sst.StringType())

newCol = ncUdfFunc(psFunc.col("aDecimal82")).alias("aDecimal82")

newDf = sparkDf.select([newCol])

print(newDf)
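
In the second cell, the rows passed to createDataFrame are meant to hold the values of DECIMAL_STR_LIST. Iterating a dict yields its keys, so unpacking the dict itself transposes the characters of the column name rather than the data; unpacking cols.values() gives one single-element row per value. The plain-Python sketch below (no Spark needed; the shortened list is only illustrative) shows the difference:

```python
cols = {"aDecimal82": ["1", "2.2", "None"]}

# Iterating a dict yields its keys, so unpacking it transposes the
# characters of the column *name*, one character per row.
rows_from_keys = [list(row) for row in zip(*cols)]

# Unpacking the dict's values transposes the column *data* instead,
# giving one single-element row per value.
rows_from_values = [list(row) for row in zip(*cols.values())]

print(rows_from_keys[:3])  # [['a'], ['D'], ['e']]
print(rows_from_values)    # [['1'], ['2.2'], ['None']]
```

The value-based rows are what mySpark.createDataFrame(rows_from_values, ["aDecimal82"]) expects for a single string column.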

Python Version: 3.12.10
Pandas Version: 2.2.3
Spark Version: 4.2.0-preview5
numpy Version: 2.1.3
pyArrow Version: 19.0.1
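
The versions above can be gathered in one step with a small standard-library script, which also records the operating system (relevant here because the crash is reported on Windows). This is a sketch: collect_versions is a hypothetical helper name, and the package names are assumed PyPI distribution names.

```python
import importlib.metadata
import platform


def collect_versions(packages=("pandas", "pyspark", "numpy", "pyarrow")):
    """Return interpreter, OS and package versions for a bug report."""
    info = {
        "Python": platform.python_version(),
        "OS": platform.platform(),
    }
    for name in packages:
        try:
            info[name] = importlib.metadata.version(name)
        except importlib.metadata.PackageNotFoundError:
            # The distribution is not installed in this environment.
            info[name] = "not installed"
    return info


if __name__ == "__main__":
    for key, value in collect_versions().items():
        print(f"{key} Version: {value}")
```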

The final part of the error message:
Py4JJavaError: An error occurred while calling o207.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 1 times, most recent failure: Lost task 1.0 in stage 5.0 (TID 36) (machine_name executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
Many intermediate errors follow, including:
Caused by: java.io.IOException: An established connection was aborted by the software in your host machine
at java.base/sun.nio.ch.SocketDispatcher.write0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:54)
at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:132)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:76)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:53)
at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:532)
at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.writeAdditionalInputToPythonWorker(PythonRunner.scala:975)
at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.read(PythonRunner.scala:879)
at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)
at java.base/java.io.DataInputStream.readInt(DataInputStream.java:393)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:1053)
... 45 more
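
The trace only shows the JVM side: its write to the Python worker's socket was aborted, which is consistent with the top-level "Python worker exited unexpectedly (crashed)" message. As a sanity check that the UDF body itself does not raise, noChange can be called directly in plain Python on the same inputs. This sketch copies noChange and DECIMAL_STR_LIST from the reproduction above; it exercises only the function, not Spark's worker process.

```python
DECIMAL_STR_LIST = ["1", "2.2", "3.3", "44.4", "555.55",
                    "654321.12", "7.75", "-800000.8", "None", "1000"]


def noChange(val: str) -> str:
    return str(val)


# Call the UDF body once per value, in-process (no Spark worker involved).
results = [noChange(v) for v in DECIMAL_STR_LIST]

# Every value is already a string, so each one should round-trip unchanged.
assert results == DECIMAL_STR_LIST
print(f"{len(results)} values processed without error")
```

If this runs cleanly, the failure is more likely in the worker process or its connection to the JVM than in the function's logic.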
