
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning">
</div>



# User-Defined Functions

Databricks recommends using native functions whenever possible. While UDFs are a great way to extend the functionality of Spark SQL, their use requires transferring data between Python and Spark, which in turn requires serialization. This drastically slows down queries.
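To make the serialization cost concrete, here is a minimal sketch in plain Python. It is not Spark's actual worker protocol: the helper `apply_with_roundtrip` is hypothetical and simply pickles a batch of values before and after applying a function, mimicking the extra hops that a Python UDF adds compared with a native function.

```python
import pickle


def f_to_celsius(f):
    """Convert a Fahrenheit reading to Celsius."""
    return (f - 32) * (5 / 9)


def apply_with_roundtrip(values, func):
    """Hypothetical stand-in for shipping a batch to a Python worker and back."""
    payload = pickle.dumps(values)       # engine -> Python worker
    batch = pickle.loads(payload)        # worker deserializes the batch
    results = [func(v) for v in batch]   # the actual UDF work
    return pickle.loads(pickle.dumps(results))  # worker -> engine


temps = [32.0, 212.0, 98.6]
direct = [f_to_celsius(t) for t in temps]
shipped = apply_with_roundtrip(temps, f_to_celsius)

# Same answers either way; the round trip only adds work.
assert shipped == direct
```

The results are identical, so everything done in `pickle.dumps` and `pickle.loads` is pure overhead that a native function avoids.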

But sometimes UDFs are necessary. They can be an especially powerful tool for ML or NLP use cases, which may not have a native Spark equivalent.

Run the next cell to set up the lesson.

## REQUIRED - SELECT CLASSIC COMPUTE

Before executing cells in this notebook, please select your classic compute cluster in the lab. Be aware that **Serverless** is enabled by default. If you use Serverless, errors will be returned when setting compute runtime properties.

Follow these steps to select the classic compute cluster:

1. Navigate to the top-right of this notebook and click the drop-down menu to select your cluster. By default, the notebook will use **Serverless**.

1. If your cluster is available, select it and continue to the next cell. If the cluster is not shown:

  - In the drop-down, select **More**.

  - In the **Attach to an existing compute resource** pop-up, select the first drop-down. You will see a unique cluster name in that drop-down. Please select that cluster.

**NOTE:** If your cluster has terminated, you might need to restart it in order to select it. To do this:

1. Right-click on **Compute** in the left navigation pane and select *Open in new tab*.

1. Find the triangle icon to the right of your compute cluster name and click it.

1. Wait a few minutes for the cluster to start.

1. Once the cluster is running, complete the steps above to select your cluster.

## A. Classroom Setup

Run the following cell to configure your working environment for this course. It will also set your default catalog to your unique **labuser** catalog, and the default schema to **default**. All tables will be read from and written to this location.

**NOTE:** The `DA` object is only used in Databricks Academy courses and is not available outside of these courses. It will dynamically reference the information needed to run the course.

In [0]:
%run ./Includes/Classroom-Setup-5

Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


Set the spark.sql.autoBroadcastJoinThreshold and spark.databricks.adaptive.autoBroadcastJoinThreshold configs to the defaults.


0,1
Course Catalog:,
Your Schema:,


Let's check the current catalog and schema.

In [0]:
%sql
SELECT current_catalog(), current_schema()

current_catalog(),current_schema()
labuser10305556_1747289640,default


## B. Disable Caching

Run the following cell to set a Spark configuration variable that disables disk caching.

Turning disk caching off prevents Databricks from storing cloud storage files after the first query. This makes the effect of the optimizations more apparent by ensuring that files are always pulled from cloud storage for each query.

For more information, see [Optimize performance with caching on Databricks](https://docs.databricks.com/en/optimizations/disk-cache.html#optimize-performance-with-caching-on-databricks).

**NOTE:** This will not work in Serverless. Please use classic compute to turn off caching. If you're using Serverless, an error will be returned.

In [0]:
spark.conf.set('spark.databricks.io.cache.enabled', False)


## C. Generate Data

Let's generate the data we will use in this demo. For this, we'll synthesize telemetry data representing temperature readings. This time, however, we're only going to generate 60 readings and create a table named **device_data**.


In [0]:
from pyspark.sql.functions import *

## Drop the table if it exists
spark.sql('DROP TABLE IF EXISTS device_data')

## Create the table: 60 rows generated in a single partition
(spark
 .range(0, 60, 1, 1)
 .select(
     'id',
     (col('id') % 1000).alias('device_id'),
     (rand() * 100).alias('temperature_F')
 )
 .write
 .saveAsTable('device_data')
)

## Display the table
display(spark.sql('SELECT * FROM device_data'))

id,device_id,temperature_F
0,0,81.76517340123696
1,1,59.30974226989637
2,2,14.394093014542664
3,3,12.256443341818713
4,4,44.850999810820646
5,5,15.252715264985817
6,6,38.82216054596802
7,7,63.77493447871602
8,8,15.204999365208316
9,9,98.60251468306058


## D. Python UDF
Create and use a Python UDF in two ways.
- Computationally Expensive Python UDF
- Parallelizing a Python UDF through Repartitioning

### D1. Computationally Expensive Python UDF

For the sake of experimentation, let's implement a function that converts Fahrenheit to Celsius. Notice that we're inserting a one-second sleep to simulate a computationally expensive operation within our UDF. Let's try it.

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time

## Create the Python UDF
@udf("double")
def F_to_Celsius(f):
    # Let's pretend some fancy math takes one second per row
    time.sleep(1)
    return (f - 32) * (5/9)

spark.sql('DROP TABLE IF EXISTS celsius')

## Prep the data
celsius_df = (spark
              .table('device_data')
              .withColumn("celsius", F_to_Celsius(col('temperature_F')))
            )

## Create the table
(celsius_df
 .write
 .mode('overwrite')
 .saveAsTable('celsius')
)

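Run the code to view how many partitions were used for the query. Notice that only **1 partition** was used: the table is small enough to be read as a single partition, and Spark has no way of knowing that the UDF is expensive. As a result, all 60 rows are processed by a single task on one core, which slows down the query.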


In [0]:
print(f'Total number of cores across all executors in the cluster: {spark.sparkContext.defaultParallelism}')
print(f'The number of partitions in the underlying RDD of a dataframe: {celsius_df.rdd.getNumPartitions()}')

Total number of cores across all executors in the cluster: 4
The number of partitions in the underlying RDD of a dataframe: 1


Explain the Spark execution plan. Notice that the **BatchEvalPython** node indicates that a Python UDF is being used, and that Photon does not support this node.


In [0]:
celsius_df.explain()

== Physical Plan ==
*(2) Project [id#6894L, device_id#6895L, temperature_F#6896, pythonUDF0#8735 AS celsius#6901]
+- BatchEvalPython [F_to_Celsius(temperature_F#6896)#6900], [pythonUDF0#8735]
   +- *(1) ColumnarToRow
      +- PhotonResultStage
         +- PhotonScan parquet labuser10305556_1747289640.default.device_data[id#6894L,device_id#6895L,temperature_F#6896] DataFilters: [], DictionaryFilters: [], Format: parquet, Location: PreparedDeltaFileIndex(1 paths)[s3://unity-catalogs-us-west-2/metastore/4018693-root/89cf4a04-20a..., OptionalDataFilters: [], PartitionFilters: [], ReadSchema: struct<id:bigint,device_id:bigint,temperature_F:double>, RequiredDataFilters: []


== Photon Explanation ==
Photon does not fully support the query because:
		Unsupported node: BatchEvalPython [F_to_Celsius(temperature_F#6896)#6900], [pythonUDF0#8735].

Reference node:
	BatchEvalPython [F_to_Celsius(temperature_F#6896)#6900], [pythonUDF0#8735]

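As a small illustration of reading such a plan programmatically, the sketch below scans plan text for operator names that signal Python UDF evaluation. The helper `find_python_udf_nodes` is hypothetical, and `sample_plan` is an abbreviated, hand-written excerpt with made-up expression IDs rather than real output.

```python
# Operator names that signal Python UDF evaluation in a physical plan.
# BatchEvalPython appears in the plan above; ArrowEvalPython is the node
# typically shown for Arrow-based (pandas) UDFs.
PYTHON_UDF_NODES = ("BatchEvalPython", "ArrowEvalPython")


def find_python_udf_nodes(plan_text):
    """Return the Python UDF operator names found in a plan string."""
    return [node for node in PYTHON_UDF_NODES if node in plan_text]


# Abbreviated, illustrative plan text (not real output).
sample_plan = """\
*(2) Project [id#1L, pythonUDF0#5 AS celsius#4]
+- BatchEvalPython [F_to_Celsius(temperature_F#3)#2], [pythonUDF0#5]
   +- *(1) ColumnarToRow
"""

print(find_python_udf_nodes(sample_plan))  # ['BatchEvalPython']
```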


#### Summary
That took approximately one minute. This may seem surprising: the work amounts to about 60 seconds of computation (60 rows at one second each), yet the cluster has 4 cores. Shouldn't it take significantly less time?

The answer is yes, it should. The problem is that Spark doesn't know the computation is expensive, so it hasn't divided the work into tasks that can run in parallel. You can see this by watching the single task chug away while the cell runs, and by visiting the Spark UI.
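The arithmetic behind that expectation can be sketched as follows. This is an idealized estimate only: the helper `estimated_seconds` is hypothetical, and it assumes rows are split evenly across partitions and ignores scheduling, serialization, and write overhead.

```python
import math


def estimated_seconds(num_rows, seconds_per_row, num_partitions, num_cores):
    """Idealized wall-clock estimate for a per-row cost.

    Rows are assumed to be split evenly across partitions, and at most
    `num_cores` partitions (tasks) run at the same time.
    """
    rows_per_partition = math.ceil(num_rows / num_partitions)
    waves = math.ceil(num_partitions / num_cores)
    return waves * rows_per_partition * seconds_per_row


# 60 rows at one second each on a 4-core cluster
print(estimated_seconds(60, 1, num_partitions=1, num_cores=4))  # 60
print(estimated_seconds(60, 1, num_partitions=4, num_cores=4))  # 15
```

With a single partition, three of the four cores sit idle; with one partition per core, the same work would ideally finish in about a quarter of the time.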

### D2. Parallelizing a Python UDF through Repartitioning

Repartitioning is the answer in this case. *We* know that this computation is expensive and should span across all 4 cores, so we can explicitly repartition the DataFrame:


In [0]:
# Repartition across the number of cores in your cluster
num_cores = 4

@udf("double")
def F_to_Celsius(f):
    # Let's pretend some fancy math takes one second per row
    time.sleep(1)
    return (f - 32) * (5/9)

spark.sql('DROP TABLE IF EXISTS celsius')

celsius_df_cores = (spark.table('device_data')
                    .repartition(num_cores) # <-- HERE
                    .withColumn("celsius", F_to_Celsius(col('temperature_F')))
             )

(celsius_df_cores
 .write
 .mode('overwrite')
 .saveAsTable('celsius')
)

Run the code to view how many partitions are being used for the query. Notice that 4 partitions (tasks) are being used to execute the code in parallel.


In [0]:
print(f'Total number of cores across all executors in the cluster: {spark.sparkContext.defaultParallelism}')
print(f'The number of partitions in the underlying RDD of a dataframe: {celsius_df_cores.rdd.getNumPartitions()}')

Total number of cores across all executors in the cluster: 4
The number of partitions in the underlying RDD of a dataframe: 4


#### Summary
Repartitioning the DataFrame before applying an expensive UDF is a generally recommended best practice, because it ensures that the UDF runs in parallel across the cluster.
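Rather than hard-coding the core count, the partition count could be derived from the cluster itself. The sketch below assumes that `spark.sparkContext.defaultParallelism` reflects the total number of cores, as printed in the cells above; the helper name `repartition_to_cores` is hypothetical.

```python
def repartition_to_cores(df, spark):
    """Repartition `df` so that it has one partition per available core.

    Assumes `spark.sparkContext.defaultParallelism` reports the total
    core count of the cluster (classic compute).
    """
    num_cores = spark.sparkContext.defaultParallelism
    return df.repartition(num_cores)


# Usage in the notebook (requires an active Spark session):
# celsius_df_cores = (
#     repartition_to_cores(spark.table('device_data'), spark)
#     .withColumn("celsius", F_to_Celsius(col('temperature_F')))
# )
```

Keep in mind that `repartition` triggers a shuffle, so it pays off only when the per-row work is expensive enough to outweigh that cost.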

## E. SQL UDFs

The ability to create user-defined functions in Python and Scala is convenient since it allows you to extend functionality in the language of your choice. As far as optimization is concerned, however, it's important to know that SQL is generally the best choice, for a couple of reasons:
- SQL UDFs require less data serialization
- The Catalyst optimizer can operate within SQL UDFs

Let's see this in action now by comparing the performance of a SQL UDF to its Python counterpart.

First let's redefine the Python UDF from before, this time without the delay, so we can compare raw performance.
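The cell for this step does not appear here. A minimal sketch of what it might look like is given below, assuming the same conversion as `F_to_Celsius` without the `time.sleep(1)` call. The names `f_to_celsius_no_delay` and `build_celsius_python_df` are hypothetical, and the Spark imports are kept inside the function so that the plain conversion can be checked without a Spark session.

```python
def f_to_celsius_no_delay(f):
    """Same conversion as F_to_Celsius, minus the artificial sleep."""
    return (f - 32) * (5 / 9)


def build_celsius_python_df(spark):
    """Apply the conversion to device_data as a Python UDF.

    Requires an active Spark session; imports are local so that the
    plain function above stays usable without Spark installed.
    """
    from pyspark.sql.functions import col, udf

    f_to_celsius_udf = udf(f_to_celsius_no_delay, "double")
    return (spark
            .table('device_data')
            .withColumn("celsius", f_to_celsius_udf(col('temperature_F'))))


# Usage in the notebook:
# build_celsius_python_df(spark).write.mode('overwrite').saveAsTable('celsius')
```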

Now let's perform the equivalent operation through a SQL UDF.

In [0]:
%sql

-- Create the same function
DROP FUNCTION IF EXISTS farh_to_cels;

CREATE FUNCTION farh_to_cels (farh DOUBLE)
  RETURNS DOUBLE RETURN ((farh - 32) * 5/9);


-- Use the function to create the table
DROP TABLE IF EXISTS celsius_sql;

CREATE OR REPLACE TABLE celsius_sql AS
SELECT farh_to_cels(temperature_F) as Farh_to_cels_convert 
FROM device_data;


-- View the data
SELECT * 
FROM celsius_sql;

Farh_to_cels_convert
27.647318556242755
15.172079038831315
-9.781059436365188
-10.968642587878492
7.139444339344802
-9.30404707500788
3.790089192204454
17.65274137706446
-9.3305559082176
37.00139704614477
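As a sanity check, the formula can be applied by hand to a few of the readings shown earlier. The sketch below mirrors the SQL UDF body in plain Python and compares it with the values above, assuming both outputs list their rows in the same order (which the numbers are consistent with).

```python
import math


def farh_to_cels(farh):
    """Plain-Python mirror of the SQL UDF body: (farh - 32) * 5/9."""
    return (farh - 32) * 5 / 9


# (temperature_F, Farh_to_cels_convert) pairs copied from the outputs above
samples = [
    (81.76517340123696, 27.647318556242755),
    (59.30974226989637, 15.172079038831315),
    (98.60251468306058, 37.00139704614477),
]

for temp_f, expected_c in samples:
    # Compare with a tolerance to allow for floating-point rounding
    assert math.isclose(farh_to_cels(temp_f), expected_c, rel_tol=1e-9)
```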


Explain the query plan with the SQL UDF. Notice that the SQL UDF is inlined into the plan as a plain expression, so the query is fully supported by Photon and is more performant.

In [0]:
%sql
EXPLAIN 
SELECT farh_to_cels(temperature_F) as Farh_to_cels_convert 
FROM device_data

plan
== Physical Plan ==
*(1) ColumnarToRow
+- PhotonResultStage
   +- PhotonProject [(((temperature_F#10348 - 32.0) * 5.0) / 9.0) AS Farh_to_cels_convert#10327]
      +- PhotonScan parquet labuser10305556_1747289640.default.device_data[temperature_F#10348] DataFilters: [], DictionaryFilters: [], Format: parquet, Location: PreparedDeltaFileIndex(1 paths)[s3://unity-catalogs-us-west-2/metastore/4018693-root/89cf4a04-20a..., OptionalDataFilters: [], PartitionFilters: [], ReadSchema: struct, RequiredDataFilters: []


== Photon Explanation ==
The query is fully supported by Photon.


## Summary

Actual times depend on a number of factors. On average, however, the SQL UDF will perform better than its Python equivalent, often significantly better. The reason is that a SQL UDF is expressed in Spark's built-in functions, which the optimizer can inline and execute natively, whereas a Python UDF must be evaluated in a separate Python process.

If you are using a UDF in your Spark job, refactoring your code to use native Spark APIs or functions whenever possible will lead to the best performance and efficiency gains.

If you must use UDFs due to strong dependencies on external libraries, you should parallelize your code and repartition your DataFrame to match the number of CPU cores in your cluster to achieve the best level of parallelization.

When using Python UDFs, consider using **Apache Arrow**-optimized Python UDFs instead, as they improve the efficiency of data exchange between the Spark runtime and the UDF process. [Learn more about Arrow-optimized Python UDFs](https://www.databricks.com/blog/arrow-optimized-python-udfs-apache-sparktm-35).
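A minimal sketch of opting in to Arrow optimization for a single UDF follows. It assumes Apache Spark 3.5 or later (or a Databricks Runtime that includes it), where `udf` accepts the `useArrow` argument. The helper name `make_arrow_udf` is hypothetical, and the Spark import is kept inside the function so that the conversion itself can be checked without a Spark session.

```python
def f_to_celsius(f):
    """Convert a Fahrenheit reading to Celsius."""
    return (f - 32) * (5 / 9)


def make_arrow_udf():
    """Wrap the conversion as an Arrow-optimized Python UDF.

    Assumes Apache Spark 3.5+, where `udf` accepts `useArrow`.
    """
    from pyspark.sql.functions import udf

    return udf(f_to_celsius, "double", useArrow=True)


# Usage in the notebook (requires an active Spark session):
# F_to_Celsius_arrow = make_arrow_udf()
# spark.table('device_data').withColumn(
#     "celsius", F_to_Celsius_arrow(col('temperature_F')))
```

Alternatively, setting the Spark configuration `spark.sql.execution.pythonUDF.arrow.enabled` to `true` enables the optimization for Python UDFs across the session.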


&copy; 2025 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the 
<a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/><a href="https://databricks.com/privacy-policy">Privacy Policy</a> | 
<a href="https://databricks.com/terms-of-use">Terms of Use</a> | 
<a href="https://help.databricks.com/">Support</a>