# Using user-defined functions in Spark

- You've seen some of the power behind Spark's built-in string functions when it comes to manipulating DataFrames. However, once you reach a certain point, it becomes difficult to process the data without creating a rat's nest of function calls. This is one place where you can use user-defined functions (UDFs) to manipulate your DataFrames.

- For this exercise, we'll use our `voter_df` DataFrame, but you're going to replace the `first_name` column with the first and middle names.

- The `pyspark.sql.functions` library is available under the alias `F`. The classes from `pyspark.sql.types` are already imported.

## Instructions

- Edit the `getFirstAndMiddle()` function to return a space-separated string of names, excluding the last entry in the names list.
- Define the function as a user-defined function. It should return a string type.
- Create a new column on `voter_df` called `first_and_middle_name` using your UDF.
- Show the DataFrame.
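
Because a UDF wraps an ordinary Python function, the first instruction can be tried out in plain Python before Spark is involved. The following is a minimal sketch, assuming `names` arrives as a list of strings; the name `all_but_last` is hypothetical and is not the exercise's `getFirstAndMiddle()`.

```python
def all_but_last(names):
    """Join every entry except the last one with single spaces."""
    # names[:-1] drops the final entry, which the exercise treats as the last name
    return " ".join(names[:-1])


print(all_but_last(["Jennifer", "S.", "Gates"]))  # Jennifer S.
print(all_but_last(["Adam", "Medrano"]))          # Adam
```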

In [1]:
# Initialization
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In the two lines below, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: List whichever packages you need here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [2]:
# Entry point (Spark 2.x)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().getOrCreate()

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

sc = spark.sparkContext

In [10]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Load the CSV file
voter_df = spark.read.format('csv').options(header=True).load('file:///home/talentum/test-jupyter/P2/M1/SM16/Dataset/DallasCouncilVoters.csv')
#voter_df.show()
# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))

def getFirstAndMiddle(names):
    # Assuming the first name is always at index 0
    first_name = names[0]
    # If there are more than 2 names, join everything between the first and last as the middle name(s)
    if len(names) > 2:
        middle_name = ' '.join(names[1:-1])
        return f"{first_name} {middle_name}"
    else:
        return first_name

# Register the UDF
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

# Create a new column using your UDF
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(F.col('splits')))

# Show the DataFrame
voter_df.show(truncate=False)


+----------+-------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|
+----------+-------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|
|02/08/2017|Councilmember|       Scott Griggs|
|02/08/2017|Councilmember|   B. Adam  McGough|
|02/08/2017|Councilmember|       Lee Kleinman|
|02/08/2017|Councilmember|      Sandy Greyson|
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|
|02/08/2017|Councilmember| Rickey D. Callahan|
|01/11/2017|Councilmember|  Jennifer S. Gates|
|04/25/2018|C
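
The `\s+` pattern matters for rows such as `B. Adam  McGough` in the output above, which contains two consecutive spaces. The sketch below uses Python's `re` module as a stand-in for Spark's `F.split` (an assumption made so the example runs without a Spark session) to compare splitting on a single space with splitting on runs of whitespace.

```python
import re

name = "B. Adam  McGough"  # two spaces before the last name

# Splitting on a single space leaves an empty string where the spaces double up
single_space = name.split(" ")
# Splitting on one or more whitespace characters collapses the run
whitespace_run = re.split(r"\s+", name)

print(single_space)    # ['B.', 'Adam', '', 'McGough']
print(whitespace_run)  # ['B.', 'Adam', 'McGough']
```

With the empty entry removed, index 1 holds the middle name and the final index holds the last name, which is what the later columns rely on.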

In [20]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Load the CSV file
voter_df = spark.read.format('csv').options(header=True).load('file:///home/talentum/test-jupyter/P2/M1/SM16/Dataset/DallasCouncilVoters.csv')

voter_df = voter_df.filter(voter_df['VOTER_NAME'].isNotNull())

# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size(voter_df.splits) - 1))

# Define a UDF to concatenate first and middle names
def getFirstAndMiddle(names):
    if names is None:
        return None
    # Assuming the first name is always at index 0
    first_name = names[0]
    # If there are more than 2 names, join everything between the first and last to form the middle name(s)
    if len(names) > 2:
        middle_name = ' '.join(names[1:-1])
        return f"{first_name} {middle_name}"
    else:
        return first_name
    


# Register the UDF with correct return type
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

# Create a new column using your UDF
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(F.col('splits')))

# Show the DataFrame
voter_df.show(truncate=False)

+----------+-------------+-------------------+-----------------------+----------+---------+---------------------+
|DATE      |TITLE        |VOTER_NAME         |splits                 |first_name|last_name|first_and_middle_name|
+----------+-------------+-------------------+-----------------------+----------+---------+---------------------+
|02/08/2017|Councilmember|Jennifer S. Gates  |[Jennifer, S., Gates]  |Jennifer  |Gates    |Jennifer S.          |
|02/08/2017|Councilmember|Philip T. Kingston |[Philip, T., Kingston] |Philip    |Kingston |Philip T.            |
|02/08/2017|Mayor        |Michael S. Rawlings|[Michael, S., Rawlings]|Michael   |Rawlings |Michael S.           |
|02/08/2017|Councilmember|Adam Medrano       |[Adam, Medrano]        |Adam      |Medrano  |Adam                 |
|02/08/2017|Councilmember|Casey Thomas       |[Casey, Thomas]        |Casey     |Thomas   |Casey                |
|02/08/2017|Councilmember|Carolyn King Arnold|[Carolyn, King, Arnold]|Carolyn   |Arnold 

In [19]:
def join(lst):
    return ' '.join(lst[:-1])
join(['string','sagar', 'Z'])

'string sagar'
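
The slice-based helper and the `if`/`else` version used in the UDF cells agree whenever a name has at least two parts, but they differ for a single-part name. The sketch below restates both in plain Python under the hypothetical names `slice_version` and `branch_version`; the one-part name `"Solo"` is made up for illustration and does not come from the dataset.

```python
def slice_version(names):
    # Everything except the last entry
    return " ".join(names[:-1])


def branch_version(names):
    # First name, plus any entries strictly between the first and last
    if len(names) > 2:
        middle = " ".join(names[1:-1])
        return f"{names[0]} {middle}"
    return names[0]


for parts in (["Carolyn", "King", "Arnold"], ["Adam", "Medrano"], ["Solo"]):
    print(parts, repr(slice_version(parts)), repr(branch_version(parts)))
```

For the one-part list, `slice_version` returns an empty string while `branch_version` returns the name itself, so the choice between them depends on how single-part names should be treated.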

In [26]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Load the CSV file
voter_df = spark.read.format('csv').options(header=True).load('file:///home/talentum/test-jupyter/P2/M1/SM16/Dataset/DallasCouncilVoters.csv')

# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size(voter_df.splits) - 1))

# Define a UDF to concatenate first and middle names
def getFirstAndMiddle(names):
    if names is None:
        return None
    # Assuming the first name is always at index 0
    first_name = names[0]
    # If there are more than 2 names, join everything between the first and last to form the middle name(s)
    if len(names) > 2:
        middle_name = ' '.join(names[1:-1])
        return f"{first_name} {middle_name}"
    else:
        return first_name
    


# Register the UDF with correct return type
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

# Create a new column using your UDF
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(F.col('splits')))

# Show the DataFrame
voter_df.show(truncate=False)

+----------+-------------+-------------------+-----------------------+----------+---------+---------------------+
|DATE      |TITLE        |VOTER_NAME         |splits                 |first_name|last_name|first_and_middle_name|
+----------+-------------+-------------------+-----------------------+----------+---------+---------------------+
|02/08/2017|Councilmember|Jennifer S. Gates  |[Jennifer, S., Gates]  |Jennifer  |Gates    |Jennifer S.          |
|02/08/2017|Councilmember|Philip T. Kingston |[Philip, T., Kingston] |Philip    |Kingston |Philip T.            |
|02/08/2017|Mayor        |Michael S. Rawlings|[Michael, S., Rawlings]|Michael   |Rawlings |Michael S.           |
|02/08/2017|Councilmember|Adam Medrano       |[Adam, Medrano]        |Adam      |Medrano  |Adam                 |
|02/08/2017|Councilmember|Casey Thomas       |[Casey, Thomas]        |Casey     |Thomas   |Casey                |
|02/08/2017|Councilmember|Carolyn King Arnold|[Carolyn, King, Arnold]|Carolyn   |Arnold 

In [27]:
voter_df.filter(voter_df['VOTER_NAME'].isNotNull()).count()

44122

In [31]:
voter_df.filter(voter_df['VOTER_NAME'].isNull()).count()

503

In [30]:
voter_df = spark.read.format('csv').options(header=True).load('file:///home/talentum/test-jupyter/P2/M1/SM16/Dataset/DallasCouncilVoters.csv')
voter_df.count()

44625

In [32]:
(503/44625)*100

1.127170868347339
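
Roughly 1.13% of the rows have a null `VOTER_NAME`. For those rows the split column is also null, and a Python UDF receives `None` as its argument, which is why the UDF cells check `names is None` first. The plain-Python sketch below illustrates the failure the guard avoids; `unguarded` and `guarded` are hypothetical names, not functions from the exercise.

```python
def unguarded(names):
    # Indexing None raises TypeError
    return names[0]


def guarded(names):
    # Pass nulls through untouched instead of failing
    if names is None:
        return None
    return names[0]


try:
    unguarded(None)
except TypeError as exc:
    print("unguarded failed:", exc)

print("guarded returned:", guarded(None))

# Share of null names, using the counts shown above
null_share = 503 / 44625 * 100
print(f"{null_share:.2f}% of rows have a null VOTER_NAME")
```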

In [37]:
# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

# Define UDFs to extract first name, second name, and last name
def getFirstName(names):
    if names is None:
        return None
    return names[0]

def getSecondName(names):
    if names is None:
        return None
    if len(names) > 2:
        return names[1]
    else:
        return None

def getLastName(names):
    if names is None:
        return None
    return names[-1]

# Register the UDFs with correct return type
udfFirstName = F.udf(getFirstName, StringType())
udfSecondName = F.udf(getSecondName, StringType())
udfLastName = F.udf(getLastName, StringType())

# Create new columns using the UDFs
voter_df = voter_df.withColumn('first_name', udfFirstName(F.col('splits')))
voter_df = voter_df.withColumn('second_name', udfSecondName(F.col('splits')))
voter_df = voter_df.withColumn('last_name', udfLastName(F.col('splits')))

voter_df = voter_df.drop('DATE', 'TITLE', 'VOTER_NAME')

voter_df.show(truncate=False)

+-----------------------+----------+-----------+---------+
|splits                 |first_name|second_name|last_name|
+-----------------------+----------+-----------+---------+
|[Jennifer, S., Gates]  |Jennifer  |S.         |Gates    |
|[Philip, T., Kingston] |Philip    |T.         |Kingston |
|[Michael, S., Rawlings]|Michael   |S.         |Rawlings |
|[Adam, Medrano]        |Adam      |null       |Medrano  |
|[Casey, Thomas]        |Casey     |null       |Thomas   |
|[Carolyn, King, Arnold]|Carolyn   |King       |Arnold   |
|[Scott, Griggs]        |Scott     |null       |Griggs   |
|[B., Adam, McGough]    |B.        |Adam       |McGough  |
|[Lee, Kleinman]        |Lee       |null       |Kleinman |
|[Sandy, Greyson]       |Sandy     |null       |Greyson  |
|[Jennifer, S., Gates]  |Jennifer  |S.         |Gates    |
|[Philip, T., Kingston] |Philip    |T.         |Kingston |
|[Michael, S., Rawlings]|Michael   |S.         |Rawlings |
|[Adam, Medrano]        |Adam      |null       |Medrano 