<a href="https://colab.research.google.com/github/deepavasanthkumar/spark_tips/blob/main/Python_user_defined_table_function_udtf.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 316.9/316.9 MB 3.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=5bcd4e51013412005a64be535080c705c874c908b60791b9b8f58abcfbe5b00e
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [16]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
  .master("local")
  .appName("PySpark User Defined Table Functions")
  .getOrCreate())

**Python user-defined table function (UDTF)**

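A Python user-defined table function (UDTF) is a new type of user-defined function, introduced in Apache Spark 3.5. Unlike scalar functions, which return a single result value from each call, a UDTF is invoked in the `FROM` clause of a query and returns an entire table as output. Each UDTF call can accept zero or more arguments, and each argument can be either a scalar expression or a table argument that represents an entire input table. In PySpark, a UDTF is a Python class whose `eval` method yields one tuple per output row, and the `@udtf` decorator declares the output schema through `returnType`.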
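As a sketch of the scalar-argument case, the handler below takes two arguments and yields one row per integer in the range. The class name `SquareNumbers` and the `num`/`squared` columns are illustrative assumptions. The class is left undecorated so that its `eval` can be exercised in plain Python; the trailing comment shows how it could then be wrapped with `udtf` and registered.

```python
class SquareNumbers:
    """Illustrative UDTF handler: two scalar arguments in, one output row per integer."""

    def eval(self, start: int, end: int):
        # Each yielded tuple becomes one output row with columns (num, squared).
        for num in range(start, end + 1):
            yield (num, num * num)


# Plain-Python check of the row-generating logic (no SparkSession needed).
rows = list(SquareNumbers().eval(1, 3))
print(rows)  # [(1, 1), (2, 4), (3, 9)]

# Assumed Spark usage (not run here):
#   from pyspark.sql.functions import udtf
#   square_numbers = udtf(SquareNumbers, returnType="num: int, squared: int")
#   spark.udtf.register("square_numbers", square_numbers)
#   spark.sql("SELECT * FROM square_numbers(1, 3)").show()
```

Keeping the handler undecorated and wrapping it with `udtf(...)` separately is a design choice for testability; decorating the class directly, as in the cells that follow, works the same way in Spark.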

In [17]:
from pyspark.sql.functions import udtf

@udtf(returnType="word: string")
class WordSplitter:
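    def eval(self, text: str):
        # SQL NULL arrives as None; emit no rows for it.
        if text is None:
            return
        # split() with no argument splits on runs of whitespace,
        # so repeated spaces do not produce empty words.
        for word in text.split():
            yield (word,)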

# Register the UDTF for use in Spark SQL.
spark.udtf.register("split_words", WordSplitter)

# Example: Using the UDTF in SQL.
spark.sql("SELECT * FROM split_words('hello world')").show()

+-----+
| word|
+-----+
|hello|
|world|
+-----+
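Because the handler is an ordinary Python class, its `eval` logic can be checked without Spark. The sketch below uses hypothetical class names (`NaiveSplitter`, `RobustSplitter`) to show an edge case of splitting on a single space: repeated spaces produce empty words, and a SQL `NULL` input, which reaches `eval` as `None`, would make `text.split(" ")` raise an `AttributeError`.

```python
class NaiveSplitter:
    """Hypothetical copy of a handler that splits on a single space."""

    def eval(self, text):
        for word in text.split(" "):
            yield (word.strip(),)


class RobustSplitter:
    """Hypothetical variant: skips NULL input and splits on any run of whitespace."""

    def eval(self, text):
        if text is None:  # SQL NULL reaches eval as None
            return
        for word in text.split():
            yield (word,)


naive = list(NaiveSplitter().eval("hello  world"))  # note the two spaces
robust = list(RobustSplitter().eval("hello  world"))
from_null = list(RobustSplitter().eval(None))
print(naive)      # [('hello',), ('',), ('world',)]
print(robust)     # [('hello',), ('world',)]
print(from_null)  # []

# Assumed Spark usage (not run here): wrap the undecorated class, e.g.
#   split_words = udtf(RobustSplitter, returnType="word: string")
```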



**Using the UDTF with a lateral join in SQL**

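A `LATERAL` reference lets the UDTF call use columns and aliases from the preceding items in the `FROM` clause as its inputs, so `eval` runs once for each of those rows. Below, each `text` value from the inline `VALUES` table is passed to `split_words`, and every word it yields is paired with the row it came from.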

In [18]:


spark.sql(
    "SELECT * FROM VALUES ('Hello World'), ('Apache Spark') t(text), "
    "LATERAL split_words(text)"
).show()

+------------+------+
|        text|  word|
+------------+------+
| Hello World| Hello|
| Hello World| World|
|Apache Spark|Apache|
|Apache Spark| Spark|
+------------+------+
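One way to picture the lateral join is as a nested loop: for each row on the left, call `eval` with that row's column value and append every yielded tuple to the left row's columns. The helper `lateral_join` below is a hypothetical plain-Python emulation of that behaviour, not Spark's implementation; dicts stand in for the left-hand rows, and the helper drops left rows for which `eval` yields nothing, as an inner join would.

```python
class WordSplitter:
    """Undecorated handler that splits on any run of whitespace."""

    def eval(self, text):
        if text is None:
            return
        for word in text.split():
            yield (word,)


def lateral_join(left_rows, handler, column):
    """Hypothetical emulation of `FROM t, LATERAL udtf(t.column)`."""
    out = []
    for row in left_rows:
        for produced in handler.eval(row[column]):
            # Left-side columns come first, then the UDTF's output columns.
            out.append(tuple(row.values()) + produced)
    return out


t = [{"text": "Hello World"}, {"text": "Apache Spark"}, {"text": ""}]
result = lateral_join(t, WordSplitter(), "text")
for r in result:
    print(r)
# ('Hello World', 'Hello')
# ('Hello World', 'World')
# ('Apache Spark', 'Apache')
# ('Apache Spark', 'Spark')
```

The empty-string row produces no words, so it does not appear in the result.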



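**TABLE input argument**

Besides scalar expressions, a UDTF can take an entire table as input using the `TABLE(...)` syntax. Spark then calls `eval` once per input row and passes that row as a `pyspark.sql.Row`, so the handler can read fields by name, as in `row["id"]` below.
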
In [19]:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        # Keep only rows whose id is greater than 5; each one-element tuple is one output row.
        if row["id"] > 5:
            yield (row["id"],)

spark.udtf.register("filter_udtf", FilterUDTF)

spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()

+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+
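With a `TABLE` argument, `eval` is called once per input row, so the handler instance can carry state between calls, and an optional `terminate` method can emit final rows after the last `eval`. The sketch below assumes plain dicts as stand-ins for `pyspark.sql.Row` (both support `row["id"]`); `FilterWithCount` and `run_table_udtf` are hypothetical, and the driver only approximates how Spark calls a handler for a single partition. In Spark the input may be split across several partitions, each with its own handler instance, so a count emitted from `terminate` would be per instance rather than global.

```python
class FilterWithCount:
    """Hypothetical handler: keeps ids > 5 and finally reports how many rows it saw.

    Assumed Spark return type: "id: int, rows_seen: int".
    """

    def __init__(self):
        self.seen = 0  # state shared across eval calls on this instance

    def eval(self, row):
        self.seen += 1
        if row["id"] > 5:
            yield (row["id"], None)

    def terminate(self):
        # Called once after the last eval; rows must match the same schema.
        yield (None, self.seen)


def run_table_udtf(handler_cls, rows):
    """Hypothetical single-partition driver: one instance, eval per row, then terminate."""
    handler = handler_cls()
    out = []
    for row in rows:
        out.extend(handler.eval(row))
    if hasattr(handler, "terminate"):
        out.extend(handler.terminate())
    return out


table = [{"id": i} for i in range(10)]  # stands in for TABLE(SELECT * FROM range(10))
result = run_table_udtf(FilterWithCount, table)
print(result)  # [(6, None), (7, None), (8, None), (9, None), (None, 10)]
```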



In [30]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data1 = [
    ("James", "", "Smith", "36636", "M", 1000, "Sales", 2020),
    ("Michael", "Rose", "", "40288", "M", 2000, "Operations", 2020),
    ("Robert", "", "Williams", "42114", "M", 3000, "Sales", 2020),
    ("Maria", "Anne", "Jones", "39192", "F", 4000, "Operations", 2020),
    ("Ria", "Anne", "Jones", "60000", "F", 7000, "Operations", 2020),
]

schema1 = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("annualsalary", IntegerType(), True),
    StructField("work", StringType(), True),
    StructField("year", IntegerType(), True),
])

df1 = spark.createDataFrame(data=data1, schema=schema1)
df1.show(truncate=False)
# Expose the DataFrame to SQL under the view name "sample".
df1.createOrReplaceTempView("sample")


@udtf(returnType="id: string, firstname: string, lastname: string")
class SalaryFilterUDTF:
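    def eval(self, row: Row):
        salary = row["annualsalary"]
        # annualsalary is already an integer column; NULL salaries reach eval as None,
        # so skip them instead of failing in int() or the comparison.
        if salary is not None and salary > 3000:
            yield row["id"], row["firstname"], row["lastname"]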

spark.udtf.register("salary_udtf", SalaryFilterUDTF)

spark.sql("SELECT * FROM salary_udtf(TABLE(SELECT * FROM sample))").show()

+---------+----------+--------+-----+------+------------+----------+----+
|firstname|middlename|lastname|id   |gender|annualsalary|work      |year|
+---------+----------+--------+-----+------+------------+----------+----+
|James    |          |Smith   |36636|M     |1000        |Sales     |2020|
|Michael  |Rose      |        |40288|M     |2000        |Operations|2020|
|Robert   |          |Williams|42114|M     |3000        |Sales     |2020|
|Maria    |Anne      |Jones   |39192|F     |4000        |Operations|2020|
|Ria      |Anne      |Jones   |60000|F     |7000        |Operations|2020|
+---------+----------+--------+-----+------+------------+----------+----+

+-----+---------+--------+
|   id|firstname|lastname|
+-----+---------+--------+
|39192|    Maria|   Jones|
|60000|      Ria|   Jones|
+-----+---------+--------+
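The salary filter can be checked the same way in plain Python. The sketch below uses a hypothetical `NullSafeSalaryFilter` with dicts standing in for `Row`, and adds a row with a missing salary (an assumed test case, not part of the data above) to show why a `None` check matters: `int(None)` raises a `TypeError`.

```python
class NullSafeSalaryFilter:
    """Hypothetical copy of the salary filter with an explicit NULL guard."""

    def eval(self, row):
        salary = row["annualsalary"]
        if salary is not None and salary > 3000:
            yield (row["id"], row["firstname"], row["lastname"])


rows = [
    {"id": "39192", "firstname": "Maria", "lastname": "Jones", "annualsalary": 4000},
    {"id": "42114", "firstname": "Robert", "lastname": "Williams", "annualsalary": 3000},
    {"id": "60000", "firstname": "Ria", "lastname": "Jones", "annualsalary": 7000},
    {"id": "00000", "firstname": "Test", "lastname": "Null", "annualsalary": None},  # assumed NULL case
]
handler = NullSafeSalaryFilter()
kept = [out for row in rows for out in handler.eval(row)]
print(kept)  # [('39192', 'Maria', 'Jones'), ('60000', 'Ria', 'Jones')]
```

For a pure filter-and-project like this one, plain SQL such as `SELECT id, firstname, lastname FROM sample WHERE annualsalary > 3000` returns the same rows on the sample data; a UDTF earns its place when the per-row logic is harder to express in SQL or emits several rows per input.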


