* Spark provides a robust set of pre-defined functions as part of `pyspark.sql.functions`.
* However, they might not fulfill all our requirements.
* At times, we might have to develop custom UDFs (User Defined Functions) for scenarios such as these:
    * No built-in function is available for the row-level transformation we need.
    * The transformation can only be expressed by chaining multiple functions, which compromises the readability of the code.
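To make the readability trade-off concrete, here is the `YYYY-MM-DD` to `YYYYMMDD` conversion used later in this section written two ways: as a chain of built-ins and as a plain Python function that could be registered as a UDF. This is a sketch; the helper names `date_convert_builtin` and `date_convert` are illustrative. When a built-in chain exists it is usually the faster option, because it runs inside the JVM where the optimizer can see it, while a Python UDF ships every row to a Python worker process and back.

```python
def date_convert_builtin(col_name):
    """Built-in version: substring -> regexp_replace -> cast (needs pyspark at call time)."""
    from pyspark.sql import functions as F  # imported lazily so this file loads without Spark
    # substring positions in Spark are 1-based: take the first 10 characters
    return F.regexp_replace(F.substring(F.col(col_name), 1, 10), '-', '').cast('int')


def date_convert(d):
    """Python version of the same logic, suitable for spark.udf.register."""
    if d is None:  # nulls reach a Python UDF as None; keep them null
        return None
    return int(d[:10].replace('-', ''))


print(date_convert('2013-11-03'))  # 20131103
```

In a notebook, `orders.select(date_convert_builtin('order_date').alias('order_date'))` should yield the same values as the UDF-based select shown further down.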

Here are the steps we need to follow to develop and use Spark User Defined Functions.
  * Develop the required logic using Python as the programming language.
  * Register the function using `spark.udf.register` and assign the returned value to a variable.
  * The variable can be used as part of Data Frame APIs such as `select`, `filter`, etc.
  * When we register, we register with a name. That name can be used as part of `selectExpr` or in Spark SQL queries run with `spark.sql`.
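The four steps above, sketched end to end. This assumes an active `SparkSession` named `spark` and an `orders` DataFrame whose `order_date` column is a string, as built later in this section; the wrapper `run_steps` is a hypothetical name used only so the sketch can be imported without Spark.

```python
def date_convert(d):
    # Step 1: plain Python logic; guard against nulls, which arrive as None
    if d is None:
        return None
    return int(d[:10].replace('-', ''))


def run_steps(spark, orders):
    # Step 2: register under a SQL name and keep the returned callable.
    # 'int' is a DDL type string; without it the result would be typed as string.
    dc = spark.udf.register('date_convert', date_convert, 'int')

    # Step 3: use the variable in Data Frame APIs
    orders.select(dc('order_date').alias('order_date')).show()
    orders.filter(dc('order_date') == 20140101).show()

    # Step 4: use the registered name in selectExpr and in spark.sql
    orders.selectExpr('date_convert(order_date) AS order_date').show()
    orders.createOrReplaceTempView('orders')
    spark.sql('SELECT date_convert(order_date) AS order_date FROM orders').show()
```

Keeping the logic in a named function (rather than a lambda) also makes it easy to test on plain strings before it runs on the cluster.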

In [0]:
help(spark.udf.register)

Help on method register in module pyspark.sql.udf:

register(name, f, returnType=None) method of pyspark.sql.udf.UDFRegistration instance
    Register a Python function (including lambda function) or a user-defined function
    as a SQL function.
    
    .. versionadded:: 1.3.1
    
    Parameters
    ----------
    name : str,
        name of the user-defined function in SQL statements.
    f : function, :meth:`pyspark.sql.functions.udf` or :meth:`pyspark.sql.functions.pandas_udf`
        a Python function, or a user-defined function. The user-defined function can
        be either row-at-a-time or vectorized. See :meth:`pyspark.sql.functions.udf` and
        :meth:`pyspark.sql.functions.pandas_udf`.
    returnType : :class:`pyspark.sql.types.DataType` or str, optional
        the return type of the registered user-defined function. The value can
        be either a :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
        `returnType` can be optionally speci
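Per the docstring above (cut off in the notebook output), `returnType` can be a `DataType` object or a DDL-formatted string, and `f` can be a plain Python function or an existing UDF. A sketch of the three forms, assuming an active `spark` session; the helper `trim_or_none` and the SQL names `trim_a`, `trim_b` and `trim_c` are hypothetical. When `f` is already a UDF, its return type travels with it, so `returnType` is left out; PySpark rejects a `returnType` in that case.

```python
def trim_or_none(s):
    # Plain Python logic shared by all three registrations
    return s.strip() if s is not None else None


def register_forms(spark):
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    # 1. Python function + DDL-formatted type string
    spark.udf.register('trim_a', trim_or_none, 'string')
    # 2. Python function + DataType object
    spark.udf.register('trim_b', trim_or_none, StringType())
    # 3. Existing UDF: the return type comes from the UDF, so returnType is omitted
    trim_udf = udf(trim_or_none, StringType())
    return spark.udf.register('trim_c', trim_udf)
```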

##### Spark UDFs as part of Spark Data Frames

In [0]:
orders = spark.read.parquet('/user/root/retail_db_parquet').drop('year','month','day_of_month')

In [0]:
orders.show()

+--------+----------+-----------------+---------------+
|order_id|order_date|order_customer_id|    orderstatus|
+--------+----------+-----------------+---------------+
|   15793|2013-11-03|             6471|       COMPLETE|
|   15794|2013-11-03|             5323|     PROCESSING|
|   15795|2013-11-03|            10096|         CLOSED|
|   15796|2013-11-03|            11665|       COMPLETE|
|   15797|2013-11-03|             6249|PENDING_PAYMENT|
|   15798|2013-11-03|            10736|       COMPLETE|
|   15799|2013-11-03|             5475|       COMPLETE|
|   15800|2013-11-03|             7417|     PROCESSING|
|   15801|2013-11-03|             4021|       COMPLETE|
|   15802|2013-11-03|             2284|         CLOSED|
|   15803|2013-11-03|             1015|     PROCESSING|
|   15804|2013-11-03|              208|       COMPLETE|
|   15805|2013-11-03|             3708|        PENDING|
|   15806|2013-11-03|            10239|PENDING_PAYMENT|
|   15807|2013-11-03|             4709|PENDING_P

In [0]:
orders.dtypes

Out[11]: [('order_id', 'int'),
 ('order_date', 'date'),
 ('order_customer_id', 'bigint'),
 ('orderstatus', 'string')]

In [0]:
orders = orders.withColumn('order_date',orders.order_date.cast('string'))

In [0]:
orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_customer_id: long (nullable = true)
 |-- orderstatus: string (nullable = true)
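The cast to string is needed only because the lambda used below slices a string. Python UDFs receive `DateType` values as `datetime.date` objects, so an alternative is to skip the cast and read the date parts directly. A sketch; `date_to_int`, `register_date_to_int` and the SQL name `date_to_int` are hypothetical.

```python
import datetime


def date_to_int(d):
    # DateType values reach a Python UDF as datetime.date; nulls as None
    if d is None:
        return None
    return d.year * 10000 + d.month * 100 + d.day


def register_date_to_int(spark):
    # Intended for the original DateType column, before the cast to string
    return spark.udf.register('date_to_int', date_to_int, 'int')


print(date_to_int(datetime.date(2013, 11, 3)))  # 20131103
```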



In [0]:
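# Register under the SQL name 'date_convert'; 'int' sets the return type (the default is string)
dc = spark.udf.register('date_convert', lambda d: int(d[:10].replace('-', '')), 'int')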

In [0]:
dc

Out[15]: <function __main__.<lambda>(d)>

In [0]:
orders.select(dc('order_date').alias('order_date')).show()

+----------+
|order_date|
+----------+
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
+----------+
only showing top 20 rows



In [0]:
orders.filter(dc('order_date') == 20140101).show()

+--------+----------+-----------------+---------------+
|order_id|order_date|order_customer_id|    orderstatus|
+--------+----------+-----------------+---------------+
|   25876|2014-01-01|             3414|PENDING_PAYMENT|
|   25877|2014-01-01|             5549|PENDING_PAYMENT|
|   25878|2014-01-01|             9084|        PENDING|
|   25879|2014-01-01|             5118|        PENDING|
|   25880|2014-01-01|            10146|       CANCELED|
|   25881|2014-01-01|             3205|PENDING_PAYMENT|
|   25882|2014-01-01|             4598|       COMPLETE|
|   25883|2014-01-01|            11764|        PENDING|
|   25884|2014-01-01|             7904|PENDING_PAYMENT|
|   25885|2014-01-01|             7253|        PENDING|
|   25886|2014-01-01|             8195|     PROCESSING|
|   25887|2014-01-01|            10062|        PENDING|
|   25888|2014-01-01|             6735|       COMPLETE|
|   25889|2014-01-01|            10045|       COMPLETE|
|   25890|2014-01-01|             2581|        P

In [0]:
orders.\
  groupBy(dc('order_date').alias('order_date')).\
  count().\
  withColumnRenamed('count','order_count').\
  show()

+----------+-----------+
|order_date|order_count|
+----------+-----------+
|  20140413|        117|
|  20130919|        206|
|  20140303|        266|
|  20140410|        252|
|  20140512|        246|
|  20140711|        138|
|  20140530|        102|
|  20140202|        192|
|  20140310|        235|
|  20130809|        125|
|  20130817|        253|
|  20131015|        174|
|  20140114|        209|
|  20140505|        171|
|  20140709|        150|
|  20131029|        128|
|  20140130|        254|
|  20130824|        265|
|  20140610|        137|
|  20130913|        103|
+----------+-----------+
only showing top 20 rows



##### Spark UDFs as part of Spark SQL

In [0]:
# 'int' sets the return type; without it Spark types the result as string
dc = spark.udf.register('date_convert', lambda d: int(d[:10].replace('-', '')), 'int')

In [0]:
orders.selectExpr('date_convert(order_date) as order_date').show()

+----------+
|order_date|
+----------+
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
|  20131103|
+----------+
only showing top 20 rows



In [0]:
orders.createOrReplaceTempView('orders')

In [0]:
spark.sql('show tables').show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |   orders|       true|
+--------+---------+-----------+



In [0]:
spark.sql('''
    select o.* ,date_convert(order_date) as order_date_as_int
    from orders o
      ''').show()

+--------+----------+-----------------+---------------+-----------------+
|order_id|order_date|order_customer_id|    orderstatus|order_date_as_int|
+--------+----------+-----------------+---------------+-----------------+
|   15793|2013-11-03|             6471|       COMPLETE|         20131103|
|   15794|2013-11-03|             5323|     PROCESSING|         20131103|
|   15795|2013-11-03|            10096|         CLOSED|         20131103|
|   15796|2013-11-03|            11665|       COMPLETE|         20131103|
|   15797|2013-11-03|             6249|PENDING_PAYMENT|         20131103|
|   15798|2013-11-03|            10736|       COMPLETE|         20131103|
|   15799|2013-11-03|             5475|       COMPLETE|         20131103|
|   15800|2013-11-03|             7417|     PROCESSING|         20131103|
|   15801|2013-11-03|             4021|       COMPLETE|         20131103|
|   15802|2013-11-03|             2284|         CLOSED|         20131103|
|   15803|2013-11-03|             1015

In [0]:
spark.sql('''
    select o.* ,date_convert(order_date) as order_date_as_int
    from orders o
    where date_convert(order_date) = 20140101
      ''').show()

+--------+----------+-----------------+---------------+-----------------+
|order_id|order_date|order_customer_id|    orderstatus|order_date_as_int|
+--------+----------+-----------------+---------------+-----------------+
|   25876|2014-01-01|             3414|PENDING_PAYMENT|         20140101|
|   25877|2014-01-01|             5549|PENDING_PAYMENT|         20140101|
|   25878|2014-01-01|             9084|        PENDING|         20140101|
|   25879|2014-01-01|             5118|        PENDING|         20140101|
|   25880|2014-01-01|            10146|       CANCELED|         20140101|
|   25881|2014-01-01|             3205|PENDING_PAYMENT|         20140101|
|   25882|2014-01-01|             4598|       COMPLETE|         20140101|
|   25883|2014-01-01|            11764|        PENDING|         20140101|
|   25884|2014-01-01|             7904|PENDING_PAYMENT|         20140101|
|   25885|2014-01-01|             7253|        PENDING|         20140101|
|   25886|2014-01-01|             8195
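The query above repeats `date_convert(order_date)` in both the select list and the `WHERE` clause. One way to write the call only once, sketched here, is to compute it in a subquery and filter on the alias. This assumes `date_convert` is registered and the `orders` temp view exists; `QUERY` and `orders_2014_01_01` are hypothetical names.

```python
# Compute the UDF once in a subquery, then filter on its alias
QUERY = '''
    SELECT *
    FROM (
        SELECT o.*, date_convert(order_date) AS order_date_as_int
        FROM orders o
    ) t
    WHERE order_date_as_int = 20140101
'''


def orders_2014_01_01(spark):
    return spark.sql(QUERY)
```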

In [0]:
spark.sql('''
    select date_convert(order_date) as order_date,count(*) as order_count
    from orders o
    group by 1
      ''').show()

+----------+-----------+
|order_date|order_count|
+----------+-----------+
|  20140413|        117|
|  20130919|        206|
|  20140303|        266|
|  20140410|        252|
|  20140512|        246|
|  20140711|        138|
|  20140530|        102|
|  20140202|        192|
|  20140310|        235|
|  20130809|        125|
|  20130817|        253|
|  20131015|        174|
|  20140114|        209|
|  20140505|        171|
|  20140709|        150|
|  20131029|        128|
|  20140130|        254|
|  20130824|        265|
|  20140610|        137|
|  20130913|        103|
+----------+-----------+
only showing top 20 rows
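`group by 1` refers to the first item in the select list; Spark resolves such ordinals by default (`spark.sql.groupByOrdinal`, and `spark.sql.orderByOrdinal` for `ORDER BY`). The grouped rows come back in no guaranteed order, so adding an `ORDER BY`, as in this sketch, sorts the counts by date. It assumes the `orders` view and `date_convert` UDF registered above; `DAILY_COUNTS` and `daily_order_counts` are hypothetical names.

```python
# Same aggregation as above, sorted by the converted date.
# GROUP BY 1 / ORDER BY 1 refer to the first item in the select list.
DAILY_COUNTS = '''
    SELECT date_convert(order_date) AS order_date, count(*) AS order_count
    FROM orders
    GROUP BY 1
    ORDER BY 1
'''


def daily_order_counts(spark):
    # Assumes the orders temp view and the date_convert UDF are registered
    return spark.sql(DAILY_COUNTS)
```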



##### Cleansing Data using Spark UDFs

In [0]:
import pandas as pd

In [0]:
courses = {'course_id': ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10'],
           'course_name': ['Mastering SQL', 'Streaming Pipelines - Python', 'Head First Python',
                           'Designing Data-Intensive Applications', 'Distributed Systems', 'Database Internals',
                           'Art of Immutable Architecture', 'Graph Databases', 'Building MicroServices',
                           'Kubernetes Patterns'],
           'course_author': ['Mike Jack', 'Bob Davis', 'Elvis Presley', 'Martin Kleppmann', 'Sukumar Ghosh',
                             'Alex Petrov',
                             'Michael L. Perry', 'Ian Robinson', 'Sam Newman', 'Rolan Hub'],
           'course_status': ['   published   ', '   inactive   ', '\\N', 'published  ', '\\N', '   inactive',
                             'published   ', '\\N', '  inactive ', 'published   '],
           'course_published_dt': ['2020-07-08', '2020-03-10', '\\N', '2021-02-27', '\\N', '2021-05-14',
                                   '2021-04-18', '\\N',
                                   '2020-12-15', '2021-07-11']}

courses_df = spark.createDataFrame(pd.DataFrame(courses))

In [0]:
users = {'user_id': ['1001', '1002', '1003', '1004', '1005', '1006'],
         'user_name': ['BenJohnson   ', '  Halley Battles ', '  Laura Anderson  ', '  Rolanda Garza ',
                       'Angela Fox  ', 'Kerl Goldinger '],
         'user_email': ['benjohn@gmail.com', '\\N', '\\N', 'garza.roland@gmail.com', 'nshaiary@aol.com',
                        'k.gold@live.com1'],
         'user_gender': ['Male', 'Male', 'Female', 'Male', 'Female', 'Male']}

users_df = spark.createDataFrame(pd.DataFrame(users))

In [0]:
from pyspark.sql import Row

In [0]:
course_enrolments = {'course_id': ['3', '5', '8', '5', '6', '8', '7', '3'],
                     'user_id': ['1001', '1001', '1003', '1003', '1005', '1006', '1001', '1001'],
                     'enrollment_id': ['9010', '9020', '9030', '9040', '9050', '9060', '9070', '9080'],
                     'grade': ['A', '\\N', 'A', '\\N', 'B', 'C', '\\N', 'A'],
                     'department': ['AI  ', 'ML', '  CS', '  DS', '  AI', 'ML', '  CS', 'DS  ']}

course_enrolments_df = spark.createDataFrame(pd.DataFrame(course_enrolments))

In [0]:
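def data_cleanse(c):
    # Trim surrounding whitespace and turn the literal '\N' marker into a real null.
    # Values that are already null arrive as None and are passed through.
    if c is None:
        return None
    c = c.strip()
    return c if c != '\\N' else None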

In [0]:
data_cleanse = spark.udf.register('data_cleanse',data_cleanse)

In [0]:
courses_df.show()

+---------+--------------------+----------------+---------------+-------------------+
|course_id|         course_name|   course_author|  course_status|course_published_dt|
+---------+--------------------+----------------+---------------+-------------------+
|        1|       Mastering SQL|       Mike Jack|   published   |         2020-07-08|
|        2|Streaming Pipelin...|       Bob Davis|    inactive   |         2020-03-10|
|        3|   Head First Python|   Elvis Presley|             \N|                 \N|
|        4|Designing Data-In...|Martin Kleppmann|    published  |         2021-02-27|
|        5| Distributed Systems|   Sukumar Ghosh|             \N|                 \N|
|        6|  Database Internals|     Alex Petrov|       inactive|         2021-05-14|
|        7|Art of Immutable ...|Michael L. Perry|   published   |         2021-04-18|
|        8|     Graph Databases|    Ian Robinson|             \N|                 \N|
|        9|Building MicroSer...|      Sam Newman|     

In [0]:
from pyspark.sql.functions import col

In [0]:
courses_df.select(
      data_cleanse(col('course_id')).alias('course_id'),
      data_cleanse(col('course_status')).alias('course_status'),
).show()

+---------+-------------+
|course_id|course_status|
+---------+-------------+
|        1|    published|
|        2|     inactive|
|        3|         null|
|        4|    published|
|        5|         null|
|        6|     inactive|
|        7|    published|
|        8|         null|
|        9|     inactive|
|       10|    published|
+---------+-------------+
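The select above cleanses two columns by hand. Because the callable returned by `spark.udf.register` accepts column names as strings, a list comprehension over `df.columns` can apply it to every column of `courses_df`, `users_df` or `course_enrolments_df`. A sketch; `cleanse_all` is a hypothetical helper, and it assumes every column is a string, which holds for the three DataFrames built here from string-only pandas frames.

```python
def cleanse_all(df, cleanse):
    """Apply a registered cleansing UDF to every column, keeping the column names."""
    return df.select([cleanse(c).alias(c) for c in df.columns])


# Usage in the notebook (assumes data_cleanse was registered above):
# cleanse_all(courses_df, data_cleanse).show()
# cleanse_all(users_df, data_cleanse).show()
# cleanse_all(course_enrolments_df, data_cleanse).show()
```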



In [0]:
courses_df.createOrReplaceTempView('courses')

In [0]:
spark.sql('''
  select data_cleanse(course_id) as course_id,
  data_cleanse(course_status) as course_status
  from courses
  ''').show()

+---------+-------------+
|course_id|course_status|
+---------+-------------+
|        1|    published|
|        2|     inactive|
|        3|         null|
|        4|    published|
|        5|         null|
|        6|     inactive|
|        7|    published|
|        8|         null|
|        9|     inactive|
|       10|    published|
+---------+-------------+
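To keep the cleansed values available for later SQL, one option is a temporary view defined by the same kind of `SELECT`. A sketch that generates the statement from a column list; `cleansed_view_sql` and the view name `courses_clean` are hypothetical, and it assumes `data_cleanse` is registered and `courses` is a temp view.

```python
def cleansed_view_sql(view, source, columns):
    # Backticks protect column names that are not plain identifiers
    select_list = ',\n  '.join(f'data_cleanse(`{c}`) AS `{c}`' for c in columns)
    return f'CREATE OR REPLACE TEMPORARY VIEW {view} AS\nSELECT\n  {select_list}\nFROM {source}'


# Usage in the notebook:
# spark.sql(cleansed_view_sql('courses_clean', 'courses', courses_df.columns))
# spark.sql('SELECT * FROM courses_clean').show()
```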

