# Using user-defined functions in Spark

- You've seen some of the power behind Spark's built-in string functions when it comes to manipulating DataFrames. However, once you reach a certain point, it becomes difficult to process the data without creating a rat's nest of function calls. This is one place where you can use user-defined functions (UDFs) to manipulate your DataFrames.

- For this exercise, we'll use our `voter_df` DataFrame, but you're going to replace the `first_name` column with the first and middle names.

- The `pyspark.sql.functions` library is available under the alias `F`. The classes from `pyspark.sql.types` are already imported.

## Instructions

- Edit the `getFirstAndMiddle()` function to return a space-separated string of every entry in the names list except the last.
- Define the function as a user-defined function. It should return a string type.
- Create a new column on `voter_df` called `first_and_middle_name` using your UDF.
- Show the DataFrame.
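
Because the body of a UDF is ordinary Python, its logic can be checked on plain lists before it is registered with Spark. The sketch below assumes that `re.split` with the pattern `\s+` is a fair stand-in for the `F.split` call used later in the notebook; the helper name `get_first_and_middle` and the `samples` list are illustrative, with the names taken from the output shown further down.

```python
import re

def get_first_and_middle(names):
    """Join every entry except the last with single spaces."""
    return ' '.join(names[:-1])

# Names copied from the notebook output; re.split stands in for F.split here (assumption)
samples = ["Jennifer S. Gates", "Adam Medrano", "B. Adam  McGough"]

# Split each name on runs of whitespace, then drop the last token and rejoin
results = {name: get_first_and_middle(re.split(r'\s+', name)) for name in samples}

for name, first_and_middle in results.items():
    print(f"{name!r} -> {first_and_middle!r}")
```

Checking the function this way keeps the Spark-specific part down to the single `F.udf(...)` registration call.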

In [1]:
# Initialization
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In the two lines below, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: List whichever packages you need here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [2]:
# Entry point for Spark 2.x
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().getOrCreate()

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

sc = spark.sparkContext

In [44]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Load the CSV file
voter_df = spark.read.format('csv').options(Header=True).load('file:///home/talentum/test-jupyter/P3/M2/SM3/Dataset/DallasCouncilVoters.csv.gz')

voter_df = voter_df.filter(voter_df.VOTER_NAME.isNotNull())

# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))

def getFirstAndMiddle(names):
    # Return a space separated string of names
    return ' '.join(names[:-1])


# Define the method as a UDF
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

# Create a new column using your UDF
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits))

# Show the DataFrame
voter_df.show()


+----------+-------------+-------------------+--------------------+----------+---------+---------------------+
|      DATE|        TITLE|         VOTER_NAME|              splits|first_name|last_name|first_and_middle_name|
+----------+-------------+-------------------+--------------------+----------+---------+---------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|  Jennifer|    Gates|          Jennifer S.|
|02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...|    Philip| Kingston|            Philip T.|
|02/08/2017|        Mayor|Michael S. Rawlings|[Michael, S., Raw...|   Michael| Rawlings|           Michael S.|
|02/08/2017|Councilmember|       Adam Medrano|     [Adam, Medrano]|      Adam|  Medrano|                 Adam|
|02/08/2017|Councilmember|       Casey Thomas|     [Casey, Thomas]|     Casey|   Thomas|                Casey|
|02/08/2017|Councilmember|Carolyn King Arnold|[Carolyn, King, A...|   Carolyn|   Arnold|         Carolyn King|
|

In [58]:
voter_df = spark.read.format('csv').options(Header=True).load('file:///home/talentum/test-jupyter/P3/M2/SM3/Dataset/DallasCouncilVoters.csv.gz')

voter_df.filter(voter_df.VOTER_NAME.isNull()).count()

# voter_df = voter_df.filter(voter_df.VOTER_NAME.isNotNull())


503

In [59]:
voter_df.filter(voter_df.VOTER_NAME.isNotNull()).count()

44122
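
These counts (503 null names against 44122 non-null ones) show why the `isNotNull()` filter comes before the UDF: `F.split` returns null for a null input, and Spark passes a null value to a Python UDF as `None`, which cannot be sliced. As a minimal sketch, assuming you wanted to keep the null rows instead of filtering them out, a guarded version of the function could look like the following. The name `get_first_and_middle_safe` is hypothetical and not part of the exercise.

```python
def get_first_and_middle_safe(names):
    # Hypothetical None-safe variant: a null input reaches a Python UDF as None
    if names is None:
        return None
    return ' '.join(names[:-1])

# Demonstrate that the unguarded slice fails on None
missing = None
try:
    ' '.join(missing[:-1])
    unguarded_failed = False
except TypeError:
    # 'NoneType' object is not subscriptable
    unguarded_failed = True

print(get_first_and_middle_safe(None))
print(get_first_and_middle_safe(['Adam', 'Medrano']))
```

Returning `None` from the guarded function would leave the new column null for those rows rather than failing the job.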

In [None]:
voter_df.filter(voter_df.VOTER_NAME.isNull())

In [11]:
voter_df.printSchema()

root
 |-- DATE: string (nullable = true)
 |-- TITLE: string (nullable = true)
 |-- VOTER_NAME: string (nullable = true)
 |-- splits: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- first_and_middle_name: string (nullable = true)



In [15]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.column import Column

voter_df = spark.read.format('csv').options(Header=True).load('file:///home/talentum/test-jupyter/P3/M2/SM3/Dataset/DallasCouncilVoters.csv.gz')

voter_df = voter_df.filter(voter_df.VOTER_NAME.isNotNull())

voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

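# Note: this body uses Column methods (getItem, F.when, F.size). Inside a Python UDF
# the argument is a plain Python list, so these calls are not available there.
def fullSeperateName(splits):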
    return [splits.getItem(0),
           F.when(voter_df.splits.getItem(1) == splits.getItem(F.size('splits') - 1), '').otherwise(splits.getItem(1)),
            splits.getItem(F.size(splits) - 1)
           ]

#     # Middle name
#     voter_df = voter_df.withColumn('Second_name', F.when(voter_df.splits.getItem(1) == voter_df.splits.getItem(F.size('splits') - 1), '').otherwise(voter_df.splits.getItem(1)))
    
#     # Get the last entry of the splits list and create a column called last_name
#     voter_df = voter_df.withColumn('Last_name', voter_df.splits.getItem(F.size('splits') - 1))

In [16]:
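# This raises the AssertionError shown below: ArrayType expects a DataType instance
# such as StringType(), not the Column class.
udfFullSeperateName = F.udf(fullSeperateName, ArrayType(Column))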

# Create a new column using your UDF
voter_df = voter_df.withColumn({'first_name', udfFullSeperateName(voter_df.splits)[0],
                               'second_name', udfFullSeperateName(voter_df.splits)[1],
                               'last_name', udfFullSeperateName(voter_df.splits)[2]})

# Show the DataFrame
voter_df.show()


AssertionError: elementType <class 'pyspark.sql.column.Column'> should be an instance of <class 'pyspark.sql.types.DataType'>

In [9]:
type(udfFullSeperateName(voter_df.splits))

pyspark.sql.column.Column
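
The `Column` type seen here describes the UDF call when the query is being built. When the function body actually runs, it receives the row's value as a plain Python list, which has no `getItem` method. The sketch below is one possible plain-Python version of the attempt above, under the assumption that a name with fewer than three parts has no middle name. The name `full_separate_name` is illustrative, and a list of strings like this would correspond to a return type of `ArrayType(StringType())` rather than `ArrayType(Column)`.

```python
def full_separate_name(splits):
    # splits is a plain Python list here, so use indexing instead of Column.getItem
    first = splits[0]
    # Assumption: only names with three or more parts carry a middle name
    second = splits[1] if len(splits) > 2 else ''
    last = splits[-1]
    return [first, second, last]

# A Python list has no getItem attribute, which is why the Column-based body cannot run per row
list_has_get_item = hasattr([], 'getItem')

print(full_separate_name(['Jennifer', 'S.', 'Gates']))
print(full_separate_name(['Adam', 'Medrano']))
```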

In [5]:
voter_df.show()


+----------+-------------+-------------------+--------------------+
|      DATE|        TITLE|         VOTER_NAME|              splits|
+----------+-------------+-------------------+--------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|
|02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...|
|02/08/2017|        Mayor|Michael S. Rawlings|[Michael, S., Raw...|
|02/08/2017|Councilmember|       Adam Medrano|     [Adam, Medrano]|
|02/08/2017|Councilmember|       Casey Thomas|     [Casey, Thomas]|
|02/08/2017|Councilmember|Carolyn King Arnold|[Carolyn, King, A...|
|02/08/2017|Councilmember|       Scott Griggs|     [Scott, Griggs]|
|02/08/2017|Councilmember|   B. Adam  McGough| [B., Adam, McGough]|
|02/08/2017|Councilmember|       Lee Kleinman|     [Lee, Kleinman]|
|02/08/2017|Councilmember|      Sandy Greyson|    [Sandy, Greyson]|
|02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|
|02/08/2017|Councilmember| Philip T. Kingston|[P

In [12]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Load the data
voter_df = spark.read.format('csv').options(header=True).load('file:///home/talentum/test-jupyter/P3/M2/SM3/Dataset/DallasCouncilVoters.csv.gz')

# Filter out rows with null VOTER_NAME
voter_df = voter_df.filter(voter_df.VOTER_NAME.isNotNull())

# Split the VOTER_NAME into separate parts
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

# Define the UDF schema
name_schema = StructType([
    StructField("first_name", StringType(), True),
    StructField("second_name", StringType(), True),
    StructField("last_name", StringType(), True)
])

# Define the UDF
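def fullSeperateName(splits):
    # splits arrives as a plain Python list; names with fewer than
    # three parts have no middle name, so return '' for them
    second_name = splits[1] if len(splits) > 2 else ''
    return (splits[0], second_name, splits[-1])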

udfFullSeperateName = F.udf(fullSeperateName, name_schema)

# Create new columns using the UDF
voter_df = voter_df.withColumn('name_struct', udfFullSeperateName(voter_df.splits))
voter_df = voter_df.select(
    '*',
    F.col('name_struct.first_name').alias('first_name'),
    F.col('name_struct.second_name').alias('second_name'),
    F.col('name_struct.last_name').alias('last_name')
).drop('name_struct')

# Show the DataFrame
voter_df.show()


+----------+-------------+-------------------+--------------------+----------+-----------+---------+
|      DATE|        TITLE|         VOTER_NAME|              splits|first_name|second_name|last_name|
+----------+-------------+-------------------+--------------------+----------+-----------+---------+
|02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|  Jennifer|         S.|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...|    Philip|         T.| Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|[Michael, S., Raw...|   Michael|         S.| Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|     [Adam, Medrano]|      Adam|           |  Medrano|
|02/08/2017|Councilmember|       Casey Thomas|     [Casey, Thomas]|     Casey|           |   Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|[Carolyn, King, A...|   Carolyn|       King|   Arnold|
|02/08/2017|Councilmember|       Scott Griggs|     [Scott, Griggs]|     Scott|           | 