### What are UDFs?

- User-defined functions (UDFs) extend Spark SQL by letting users define their own functions for use in SQL expressions.

- UDFs are defined using programming languages like Python, Scala, or Java, and can be registered with Spark to make them available for use in SQL expressions. Once registered, UDFs can be used in the same way as built-in functions to transform or manipulate the data in a Spark DataFrame.

### Why are UDFs used?

- UDFs allow users to write custom code for specific data processing tasks that the built-in functions provided by Spark SQL do not cover. This makes them more flexible than the built-ins, although usually not faster (see the performance section at the end).

- UDFs allow users to express logic in familiar languages, reducing the human cost associated with refactoring code. For ad hoc queries, manual data cleansing, exploratory data analysis, and most operations on small or medium-sized datasets, the latency overhead of UDFs is unlikely to outweigh the cost of refactoring code.

### How to create UDFs in PySpark

### Create a Python Function

The first step in creating a UDF is writing a Python function. The snippet below defines a function that derives a flight's air time in hours from the existing `air_time` column, which is in minutes.

In [0]:
def calculate_duration_hrs(air_time):
    return round(air_time / 60,2)
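
Because the UDF will wrap an ordinary Python function, the logic can be checked locally before Spark is involved. The sketch below repeats the function and compares it with the `air_time` values that appear in the sample output of this notebook; the `samples` dictionary is only an illustration.

```python
def calculate_duration_hrs(air_time):
    # air_time is in minutes; return hours rounded to two decimals
    return round(air_time / 60, 2)

# air_time (minutes) -> expected air_time_hrs, taken from the sample output in this notebook
samples = {132: 2.2, 360: 6.0, 111: 1.85, 83: 1.38}

for minutes, expected_hours in samples.items():
    assert calculate_duration_hrs(minutes) == expected_hours
```

Note that a null `air_time` would reach the function as `None` and raise a `TypeError`; the section on null checking covers ways to handle that.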

In [0]:
%run /Users/aparna.menon@diggibyte.com/UDF_run

### Converting Python function to PySpark UDF

Wrap the function with `udf` from `pyspark.sql.functions`, passing a return type from `pyspark.sql.types`. The cells below assume that `udf`, `FloatType`, `IntegerType`, and `StringType` have already been imported, for example by the setup notebook run above.

##### Using select()

In [0]:
air_time_udf = udf(calculate_duration_hrs, FloatType())

In [0]:

df = flights.select('*',air_time_udf(flights.air_time).alias("air_time_hrs"))
df.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|air_time_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|         2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|         6.0|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|        1.85|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|        1.38|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS

##### Using withColumn()

In [0]:
def long_flight(distance):
    return 1 if distance > 1000 else 0

In [0]:
udf_long_flight = udf(long_flight, IntegerType())

In [0]:
df1 = flights.withColumn("long_flight", udf_long_flight("distance"))
df1.show(2)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+-----------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|long_flight|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+-----------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|          0|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|          1|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+-----------+
only showing top 2 rows



##### Registering predefined function

To make a Python function callable from Spark SQL, register it with the **spark.udf.register** method. It takes the name to use in SQL and the function itself, which can be a plain Python function or one already wrapped with `udf`. When a plain Python function is registered without a return type, Spark treats its result as a string.

> Syntax

````python
def func_name(param1, param2):
    # function logic
    return result

# register the UDF
spark.udf.register("func_name", func_name)

# create a temporary view for the DataFrame
df.createOrReplaceTempView("table_name")

# use the UDF in a Spark SQL expression
result_df = spark.sql("SELECT column1, column2, func_name(column3, column4) AS alias_name FROM table_name")
````

In [0]:
def concat_origin_dest(origin, dest):
    return f"{origin}-{dest}"

In [0]:
# register the UDF with Spark
spark.udf.register("concat_origin_dest", concat_origin_dest)

Out[11]: <function __main__.concat_origin_dest(origin, dest)>

In [0]:
# create a temporary view for the flights DataFrame
flights.createOrReplaceTempView("flights")

# use the UDF in a Spark SQL expression to concatenate origin and dest columns
concat_flights = spark.sql("SELECT flight,distance,air_time,concat_origin_dest(origin, dest) as origin_dest FROM flights")

# display the result
concat_flights.show()

+------+--------+--------+-----------+
|flight|distance|air_time|origin_dest|
+------+--------+--------+-----------+
|  1780|     954|     132|    SEA-LAX|
|   851|    2677|     360|    SEA-HNL|
|   755|     679|     111|    SEA-SFO|
|   344|     569|      83|    PDX-SJC|
|   522|     937|     127|    SEA-BUR|
|    48|     991|     121|    PDX-DEN|
|  1520|     543|      90|    PDX-OAK|
|   755|     679|      98|    SEA-SFO|
|   490|    1050|     135|    SEA-SAN|
|    26|    1721|     198|    SEA-ORD|
|   448|     954|     130|    SEA-LAX|
|   656|    1107|     154|    SEA-PHX|
|   608|     867|     127|    SEA-LAS|
|   121|    1448|     183|    SEA-ANC|
|   306|     679|     129|    SEA-SFO|
|  1458|     550|      90|    PDX-SFO|
|   368|     605|      76|    SEA-SMF|
|   827|    1733|     216|    SEA-MDW|
|    24|    2496|     290|    SEA-BOS|
|  3488|     817|     111|    PDX-BUR|
+------+--------+--------+-----------+
only showing top 20 rows
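
The registration above omits a return type, so Spark treats the result as a string, which suits `concat_origin_dest`. `spark.udf.register` also accepts a return type as its third argument, as the `strlen` example later in this notebook does with `"int"`. A minimal sketch, assuming an active `SparkSession` is passed in as `spark`; `register_flight_udfs` is a hypothetical helper name:

```python
def long_flight(distance):
    # 1 when the distance is above 1000, otherwise 0
    return 1 if distance > 1000 else 0

def concat_origin_dest(origin, dest):
    return f"{origin}-{dest}"

def register_flight_udfs(spark):
    """Register both functions for SQL use with explicit return types."""
    spark.udf.register("long_flight", long_flight, "int")
    spark.udf.register("concat_origin_dest", concat_origin_dest, "string")
```

After calling `register_flight_udfs(spark)`, a query such as `SELECT flight, long_flight(distance) AS long_flight FROM flights` would return an integer column rather than a string one.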



### Using the @udf Decorator

A UDF can also be created in one step with the `@udf` decorator (annotation), without a separate call to the `udf` function.

> Syntax

```python
@udf(returnType=ReturnType())
def function_name(param):
    # function logic
    return return_value
```

In [0]:
flights.groupBy("carrier").count().show()

+-------+-----+
|carrier|count|
+-------+-----+
|     UA| 1051|
|     AA|  482|
|     B6|  214|
|     DL| 1082|
|     OO| 1186|
|     F9|  181|
|     US|  367|
|     HA|   73|
|     AS| 3784|
|     VX|  186|
|     WN| 1394|
+-------+-----+



In [0]:
carrier_dict = {
  'ZW': 'Air Wisconsin',
  'AS': 'Alaska Airlines',
  'G4': 'Allegiant Air LLC',
  'AA': 'American Airlines',
  'C5': 'Champlain Air',
  'CP': 'Compass Airlines',
  'DL': 'Delta Air Lines, Inc.',
  'EM': 'Empire Airline',
  '9E': 'Endeavor Air',
  'MQ': 'Envoy Air',
  'EV': 'ExpressJet Airlines',
  'F9': 'Frontier Airlines, Inc.',
  'G7': 'GoJet Airlines',
  'HA': 'Hawaiian Airlines Inc.',
  'QX': 'Horizon Air',
  'B6': 'Jetblue Airways Corporation',
  'OH': 'Jetstream Intl',
  'YV': 'Mesa Airlines, Inc.',
  'KS': 'Penair',
  'PT': 'Piedmont Airlines',
  'YX': 'Republic Airlines',
  'OO': 'Skywest Airlines',
  'WN': 'Southwest Airlines',
  'NK': 'Spirit Airlines, Inc.',
  'AX': 'Trans State',
  'UA': 'United Airlines, Inc.'
}


In [0]:
@udf(returnType=StringType())
def convert_abbreviation(value):
    if value in carrier_dict:
        return carrier_dict[value]
    else:
        return value

In [0]:
df2 = flights.withColumn("Carrier_Name", convert_abbreviation(flights["carrier"]))
df2.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+--------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|        Carrier_Name|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+--------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|                  VX|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|     Alaska Airlines|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|                  VX|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|  Southwest Airlines|
|2014|    3| 
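
The lookup logic can be exercised without Spark, which also explains the `VX` rows in the output: `VX` has no entry in `carrier_dict`, so the abbreviation is returned unchanged. The sketch below uses a shortened copy of the dictionary and a hypothetical undecorated helper, `lookup_carrier`, that relies on `dict.get` for the fallback.

```python
# Shortened copy of carrier_dict, for illustration only
carrier_dict = {
    "AS": "Alaska Airlines",
    "WN": "Southwest Airlines",
    "DL": "Delta Air Lines, Inc.",
}

def lookup_carrier(value, mapping=carrier_dict):
    # dict.get returns its second argument when the key is missing
    return mapping.get(value, value)

print(lookup_carrier("AS"))  # Alaska Airlines
print(lookup_carrier("VX"))  # VX (no entry, so the abbreviation is kept)
```

Keeping an undecorated function like this alongside the `@udf` version makes the mapping easy to unit test.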

## Evaluation order and null checking

Spark SQL (including SQL and the DataFrame and Dataset API) does not guarantee the order of evaluation of subexpressions. In particular, the inputs of an operator or function are not necessarily evaluated left-to-right or in any other fixed order. 

If a UDF relies on short-circuiting semantics in SQL for null checking, there’s no guarantee that the null check will happen before invoking the UDF.

````python
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
````
This WHERE clause does not guarantee that the `strlen` UDF is invoked only after nulls have been filtered out. If it is called on a null, `len(None)` raises a `TypeError`.

To perform proper null checking, we recommend that you do either of the following:

- Make the UDF itself null-aware and do null checking inside the UDF itself

- Use IF or CASE WHEN expressions to do the null check and invoke the UDF in a conditional branch

````python
spark.udf.register("strlen_nullsafe", lambda s: len(s) if s is not None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1")  # ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")    # ok
````
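
The difference between the two functions is easy to see in plain Python, where a SQL null arrives as `None`. This sketch uses named functions in place of the lambdas above:

```python
def strlen(s):
    return len(s)

def strlen_nullsafe(s):
    # return -1 for null input instead of failing
    return len(s) if s is not None else -1

print(strlen_nullsafe("spark"))  # 5
print(strlen_nullsafe(None))     # -1

try:
    strlen(None)
except TypeError as err:
    # this is the failure a non-null-aware UDF hits when Spark passes it a null
    print(f"strlen(None) failed: {err}")
```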

## Performance Concerns with UDF

Python UDFs can become a bottleneck when working with large datasets in PySpark. The main concerns are:

- **Serialization and Deserialization**: Python UDFs run in Python worker processes alongside the JVM executors, so every input row is serialized from the JVM to Python and every result is serialized back. This can be slow, especially when a lot of data passes through the UDF.

- **Garbage Collection**: Converting rows between their JVM and Python representations creates many short-lived objects, which increases memory usage and garbage collection overhead.

- **Network Overhead**: The UDF and any variables it references (such as a lookup dictionary) are shipped over the network to the worker nodes that run it. This can hurt performance if the network is slow or the captured objects are large.

- **Row-at-a-Time Execution**: By default, a Python UDF is called once per row inside a single-threaded Python worker for each task, so it does not benefit from the optimized execution that built-in functions get.

To optimize performance with UDFs, you can take the following steps:

- Use built-in PySpark functions whenever possible, as they are optimized for distributed processing.

- Use the PySpark DataFrame API instead of RDDs, as it provides optimizations for performance.

- Avoid using UDFs for complex processing and instead use PySpark's higher-level APIs, such as Window, groupBy, join, etc.

- Minimize the amount of data passed to and from UDFs, as serialization and deserialization can be slow and memory-intensive.

- Consider using Pandas UDFs (also known as Vectorized UDFs), which can provide performance benefits for certain types of operations. However, they have their own limitations, such as being memory-intensive and not suitable for all types of operations.
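
As an illustration of the first recommendation, the three UDFs from this notebook can be approximated with built-in Spark SQL functions (`round`, `CASE WHEN`, `concat_ws`). This is a sketch rather than a drop-in replacement: `add_derived_columns` is a hypothetical helper, and null handling differs from the Python versions (for example, `concat_ws` skips null inputs).

```python
# Built-in SQL expressions standing in for the three Python UDFs
BUILTIN_EXPRS = [
    "round(air_time / 60, 2) AS air_time_hrs",
    "CASE WHEN distance > 1000 THEN 1 ELSE 0 END AS long_flight",
    "concat_ws('-', origin, dest) AS origin_dest",
]

def add_derived_columns(flights):
    """Return `flights` with the derived columns, computed without Python UDFs."""
    # selectExpr parses each string as a SQL expression; "*" keeps the existing columns
    return flights.selectExpr("*", *BUILTIN_EXPRS)
```

With the `flights` DataFrame from this notebook, `add_derived_columns(flights).show(5)` would display the result. Because these expressions are evaluated inside the JVM, rows do not need to be passed to a Python worker.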

#### References
- https://docs.databricks.com/udf/python.html
- https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/#pyspark-udf