In [98]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName('UDFs_TRANFORM()').getOrCreate()


In [2]:
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]
df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



### Convertcase function for converting to udf

In [19]:
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr 

#### converting convertcase function to UDF

In [40]:
convertUDF = udf(lambda z: convertCase(z), StringType())

In [41]:
converted_df = df.select("Seqno","Name",convertUDF("Name").alias("Converted Name"))
converted_df.show()

+-----+------------+--------------+
|Seqno|        Name|Converted Name|
+-----+------------+--------------+
|    1|  john jones|   John Jones |
|    2|tracey smith| Tracey Smith |
|    3| amy sanders|  Amy Sanders |
+-----+------------+--------------+



### Uppercase function for converting to udf

In [25]:
def upperCase(str):
    return str.upper()

#### converting uppercase function to udf

In [26]:
upperCaseUDF = udf(lambda z:upperCase(z),StringType())    

In [29]:
converted_df.withColumn("Cureated Name", upperCaseUDF("Name")).show(truncate=False)

+-----+------------+--------------+-------------+
|Seqno|Name        |Converted Name|Cureated Name|
+-----+------------+--------------+-------------+
|1    |john jones  |John Jones    |JOHN JONES   |
|2    |tracey smith|Tracey Smith  |TRACEY SMITH |
|3    |amy sanders |Amy Sanders   |AMY SANDERS  |
+-----+------------+--------------+-------------+



### Using UDF on SQL
#### spark.udf.register( name_to_be_given_to_udf , function_name , return_type_from_function )

In [36]:
spark.udf.register("convertUDF", convertCase, StringType())

<function __main__.convertCase(str)>

### Creating a Temporary View of the df

In [43]:
df.createOrReplaceTempView("NAME_TABLE")

In [47]:
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE").show(truncate=False)
     
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE and where Name is not null and convertUDF(Name) like '%John%'").show(truncate=False)

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+

+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+



### null check

In [57]:
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
    ('4',None)]

df2 = spark.createDataFrame(data=data,schema=columns)
df2.show(truncate=False)
df2.createOrReplaceTempView("NAME_TABLE2")

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
|4    |NULL        |
+-----+------------+



In [62]:
spark.udf.register("nullsafeUDF", lambda str: convertCase(str) if not str is None else "NULL IS HERE" , StringType())

<function __main__.<lambda>(str)>

In [63]:
spark.sql("select nullsafeUDF(Name) from NAME_TABLE2").show(truncate=False)

spark.sql("select Seqno, nullsafeUDF(Name) as Name from NAME_TABLE2 " + " where Name is not null and nullsafeUDF(Name) like '%John%'").show(truncate=False) 

+-----------------+
|nullsafeUDF(Name)|
+-----------------+
|John Jones       |
|Tracey Smith     |
|Amy Sanders      |
|NULL IS HERE     |
+-----------------+

+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+



### transform() - without registering udfs, directly function is applied in transform()

In [74]:
simpleData = (("Java",4000,5), 
    ("Python", 4600,10),  
    ("Scala", 4100,15),   
    ("Scala", 4500,15),   
    ("PHP", 3000,20),  
  )
columns= ["CourseName", "fee", "discount"]

df3 = spark.createDataFrame(data = simpleData, schema = columns)
df3.printSchema()
df3.show(truncate=False)

root
 |-- CourseName: string (nullable = true)
 |-- fee: long (nullable = true)
 |-- discount: long (nullable = true)

+----------+----+--------+
|CourseName|fee |discount|
+----------+----+--------+
|Java      |4000|5       |
|Python    |4600|10      |
|Scala     |4100|15      |
|Scala     |4500|15      |
|PHP       |3000|20      |
+----------+----+--------+



In [79]:
from pyspark.sql.functions import upper

def to_upper_str_columns(df):
    return df.withColumn("CourseName_uppercase",upper(df.CourseName))

def reduce_price(df,reduceBy):
    return df.withColumn("new_fee",df.fee - reduceBy)

def apply_discount(df):
    return df.withColumn("discounted_fee", df.new_fee - (df.new_fee * df.discount) / 100)

In [80]:
df4 = df3.transform(to_upper_str_columns).transform(reduce_price,1000).transform(apply_discount)
df4.show(truncate=False)

+----------+----+--------+--------------------+-------+--------------+
|CourseName|fee |discount|CourseName_uppercase|new_fee|discounted_fee|
+----------+----+--------+--------------------+-------+--------------+
|Java      |4000|5       |JAVA                |3000   |2850.0        |
|Python    |4600|10      |PYTHON              |3600   |3240.0        |
|Scala     |4100|15      |SCALA               |3100   |2635.0        |
|Scala     |4500|15      |SCALA               |3500   |2975.0        |
|PHP       |3000|20      |PHP                 |2000   |1600.0        |
+----------+----+--------+--------------------+-------+--------------+



In [81]:
data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"]),
 ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"]),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"])
]
df = spark.createDataFrame(data=data,schema=["Name","Languages1","Languages2"])
df.printSchema()
df.show()

root
 |-- Name: string (nullable = true)
 |-- Languages1: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Languages2: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------+------------------+---------------+
|            Name|        Languages1|     Languages2|
+----------------+------------------+---------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|
|   Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|
+----------------+------------------+---------------+



### tranform() - another type of transform, functions are directly written in transform( columnn_name , function )

In [94]:
from pyspark.sql.functions import upper
from pyspark.sql.functions import transform

new_df = df.select(transform("Languages1", lambda x: upper(x)).alias("languages1"))
new_df.show()

+------------------+
|        languages1|
+------------------+
|[JAVA, SCALA, C++]|
|[SPARK, JAVA, C++]|
|      [CSHARP, VB]|
+------------------+



In [97]:
transformed_df = df.withColumn("languages", transform("Languages1", lambda x: upper(x)))
transformed_df.show()

+----------------+------------------+---------------+------------------+
|            Name|        Languages1|     Languages2|         languages|
+----------------+------------------+---------------+------------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|[JAVA, SCALA, C++]|
|   Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|[SPARK, JAVA, C++]|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|      [CSHARP, VB]|
+----------------+------------------+---------------+------------------+

