<a href="https://colab.research.google.com/github/Ravikumarsethu/data-engineer-handbook/blob/main/UserdefinedUDF.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType, FloatType

# Start (or reuse) the Spark session used by this notebook
spark = (
    SparkSession.builder
    .appName("Bird Trends")
    .getOrCreate()
)

# Column names, listed in the same order as the fields of each sample row
columns = ["Species", "Category", "Period", "Annual percentage change"]

# Sample rows: (species, category, observation period, annual % change)
data = [
    ("Greenfinch (Chloris chloris)", "Farmland birds", "(1970-2014)", -1.13),
    ("Siskin (Carduelis spinus)", "Woodland birds", "(1995-2014)", 2.26),
    ("European shag (Phalacrocorax artistotelis)", "Seabirds", "(1986-2014)", -2.31),
    ("Mute Swan (Cygnus olor)", "Water and wetland birds", "(1975-2014)", 1.65),
    ("Collared Dove (Streptopelia decaocto)", "Other", "(1970-2014)", 5.2),
]

# Build the DataFrame from the rows, naming the columns via `columns`
df = spark.createDataFrame(data, schema=columns)

# Define functions
def get_english_name(species):
    """Return the English name that precedes the parenthesised Latin name.

    Args:
        species: Species label such as ``"Greenfinch (Chloris chloris)"``,
            or ``None`` (how Spark hands a SQL NULL to a Python UDF).

    Returns:
        The text before the first ``"("`` with surrounding whitespace
        removed; the whole stripped string when there is no ``"("``;
        ``None`` when ``species`` is ``None``.
    """
    # Propagate NULL instead of raising TypeError inside the Spark job.
    if species is None:
        return None
    # partition() returns the full string as the head when "(" is absent,
    # so no separate "no parenthesis" branch is needed.
    return species.partition("(")[0].strip()

def get_start_year(period):
    """Extract the first year of a ``"(YYYY-YYYY)"`` period string as an int.

    Args:
        period: Period label such as ``"(1970-2014)"``; the parentheses and
            any surrounding whitespace are optional. ``None`` is accepted
            (how Spark hands a SQL NULL to a Python UDF).

    Returns:
        The integer before the first ``"-"``, or ``None`` when ``period``
        is ``None``.

    Raises:
        ValueError: If the text before the first ``"-"`` is not an integer.
    """
    # Propagate NULL instead of raising AttributeError inside the Spark job.
    if period is None:
        return None
    # Trim outer whitespace first so it cannot shield the parentheses
    # from strip("()").
    start, _, _ = period.strip().strip("()").partition("-")
    return int(start)

def get_trend(percentage):
    """Classify an annual percentage change into a trend label.

    Bands (inclusive where shown):
        percentage < -3.00            -> 'strong decline'
        -3.00 <= percentage <= -0.50  -> 'weak decline'
        -0.50 <  percentage <  0.50   -> 'no change'
        0.50 <= percentage <= 3.00    -> 'weak increase'
        percentage > 3.00             -> 'strong increase'

    Args:
        percentage: Annual percentage change as a number, or ``None``
            (how Spark hands a SQL NULL to a Python UDF).

    Returns:
        The trend label, or ``None`` when ``percentage`` is ``None`` or NaN.
    """
    import math

    # NULL has no trend. NaN fails every comparison below and would
    # otherwise fall through to 'strong increase'.
    if percentage is None or math.isnan(percentage):
        return None
    # Each test only needs its upper bound: the lower bound is already
    # guaranteed by the earlier branches not having matched.
    if percentage < -3.00:
        return 'strong decline'
    if percentage <= -0.50:
        return 'weak decline'
    if percentage < 0.50:
        return 'no change'
    if percentage <= 3.00:
        return 'weak increase'
    return 'strong increase'

# Register the UDFs under SQL-callable names. register() also returns the
# wrapped UDF, so keep those handles and reuse them with the DataFrame API
# below instead of wrapping each Python function a second time with udf().
english_name_udf = spark.udf.register("get_english_name", get_english_name, StringType())
start_year_udf = spark.udf.register("get_start_year", get_start_year, IntegerType())
trend_udf = spark.udf.register("get_trend", get_trend, StringType())

# Apply UDFs: derive one new column from each of Species, Period and
# "Annual percentage change".
df_transformed = (
    df.withColumn("English_Name", english_name_udf(df["Species"]))
    .withColumn("Start_Year", start_year_udf(df["Period"]))
    .withColumn("Trend", trend_udf(df["Annual percentage change"]))
)

# Print the result without truncating long cell values
df_transformed.show(truncate=False)


+------------------------------------------+-----------------------+-----------+------------------------+-------------+----------+---------------+
|Species                                   |Category               |Period     |Annual percentage change|English_Name |Start_Year|Trend          |
+------------------------------------------+-----------------------+-----------+------------------------+-------------+----------+---------------+
|Greenfinch (Chloris chloris)              |Farmland birds         |(1970-2014)|-1.13                   |Greenfinch   |1970      |weak decline   |
|Siskin (Carduelis spinus)                 |Woodland birds         |(1995-2014)|2.26                    |Siskin       |1995      |weak increase  |
|European shag (Phalacrocorax artistotelis)|Seabirds               |(1986-2014)|-2.31                   |European shag|1986      |weak decline   |
|Mute Swan (Cygnus olor)                   |Water and wetland birds|(1975-2014)|1.65                    |Mute Swan    