In [1]:
# Initialization: point this Python process at the local Spark installation
# and make the PySpark / Py4J libraries bundled with it importable.
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"

# Interpreters for the Spark workers (PYSPARK_PYTHON) and the driver
# (PYSPARK_DRIVER_PYTHON); switch both to /usr/bin/python2.7 to use Python 2.
# NOTE(review): workers are pinned to python3.6 but the driver to the generic
# python3 — confirm /usr/bin/python3 is also 3.6, since PySpark expects the
# worker and driver Python versions to match.
os.environ.update({
    "PYSPARK_PYTHON": "/usr/bin/python3.6",
    "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3",
})

# Prepend the bundled archives to the import path. Each insert goes to the
# front, so pyspark.zip ends up ahead of the Py4J zip.
for lib_archive in ("/py4j-0.10.7-src.zip", "/pyspark.zip"):
    sys.path.insert(0, os.environ["PYLIB"] + lib_archive)

# Packages passed to spark-submit via --packages (spark-xml and spark-avro,
# Scala 2.11 builds). Edit the comma-separated list to add or drop packages;
# presumably this must be set before the SparkSession is created — confirm.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'

In [2]:
# Entry point (Spark 2.x): one SparkSession with Hive support enabled.
# To run on YARN, add .master("yarn") to the builder chain below.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Spark SQL basic example")
    .enableHiveSupport()
    .getOrCreate()
)

# The SparkContext owned by the session above, for RDD-level APIs.
sc = spark.sparkContext

In [3]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Load the raw voter records (gzipped CSV with a header row).
# NOTE(review): absolute local path — consider moving it to a config cell.
VOTER_CSV_PATH = 'file:///home/talentum/test-jupyter/P3/M4/sm16/Dataset/DallasCouncilVoters.csv.gz'
voter_df = spark.read.format('csv').options(header=True).load(VOTER_CSV_PATH)

# Drop rows whose VOTER_NAME is null or blank (empty after trimming spaces):
# they carry no name to split, and a blank name would otherwise reach the
# name-splitting UDF as an array with no real tokens.
voter_df = voter_df.filter(
    F.col('VOTER_NAME').isNotNull() & (F.trim(F.col('VOTER_NAME')) != '')
)

# Tokenise VOTER_NAME on runs of whitespace. Trimming surrounding spaces first
# avoids an empty leading/trailing token, and the raw string keeps '\s' from
# being parsed as an (invalid, deprecated) Python string escape.
voter_df = voter_df.withColumn('splits', F.split(F.trim(voter_df.VOTER_NAME), r'\s+'))

# Return type of the name-splitting UDF: a struct of three nullable string
# fields, declared in the same order the UDF emits its tuple.
name_schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in ("first_name", "second_name", "last_name")
    ]
)

# Plain-Python name splitter, wrapped as a Spark UDF further down.
def fullSeperateName(splits):
    """Split a tokenised voter name into (first, middle, last) parts.

    Parameters
    ----------
    splits : list of str or None
        Name tokens, e.g. a whitespace split of VOTER_NAME.

    Returns
    -------
    tuple
        ``(first_name, second_name, last_name)`` in ``name_schema`` order:

        * ``(None, None, None)`` when there are no non-empty tokens;
        * ``(token, '', '')`` for a single-token name;
        * otherwise the first and last tokens, with every token in between
          joined by a single space as ``second_name`` (``''`` when there is
          none), so no part of a long name is dropped.
    """
    # Empty strings show up when the source text has leading/trailing
    # whitespace; they are not name parts.
    tokens = [token for token in (splits or []) if token]
    if not tokens:
        return (None, None, None)
    if len(tokens) == 1:
        return (tokens[0], '', '')
    # Decide by token count rather than by comparing values: a value check
    # like ``splits[1] == splits[-1]`` loses a real middle name that happens
    # to equal the surname (e.g. "John Smith Smith").
    return (tokens[0], ' '.join(tokens[1:-1]), tokens[-1])

# Wrap the Python splitter as a Spark UDF whose result is a name_schema struct.
udfFullSeperateName = F.udf(fullSeperateName, name_schema)

# Attach the parsed struct, flatten its three fields into top-level columns,
# then discard the helper columns and the original full-name column.
voter_df = (
    voter_df
    .withColumn('name_struct', udfFullSeperateName(voter_df['splits']))
    .select(
        '*',
        F.col('name_struct.first_name').alias('First_name'),
        F.col('name_struct.second_name').alias('Second_name'),
        F.col('name_struct.last_name').alias('Last_name'),
    )
    .drop('name_struct', 'splits', 'VOTER_NAME')
)

# Display the first rows of the result.
voter_df.show()


+----------+-------------+----------+-----------+---------+
|      DATE|        TITLE|First_name|Second_name|Last_name|
+----------+-------------+----------+-----------+---------+
|02/08/2017|Councilmember|  Jennifer|         S.|    Gates|
|02/08/2017|Councilmember|    Philip|         T.| Kingston|
|02/08/2017|        Mayor|   Michael|         S.| Rawlings|
|02/08/2017|Councilmember|      Adam|           |  Medrano|
|02/08/2017|Councilmember|     Casey|           |   Thomas|
|02/08/2017|Councilmember|   Carolyn|       King|   Arnold|
|02/08/2017|Councilmember|     Scott|           |   Griggs|
|02/08/2017|Councilmember|        B.|       Adam|  McGough|
|02/08/2017|Councilmember|       Lee|           | Kleinman|
|02/08/2017|Councilmember|     Sandy|           |  Greyson|
|02/08/2017|Councilmember|  Jennifer|         S.|    Gates|
|02/08/2017|Councilmember|    Philip|         T.| Kingston|
|02/08/2017|        Mayor|   Michael|         S.| Rawlings|
|02/08/2017|Councilmember|      Adam|   