In [23]:
# Check env vars: list every environment variable line that contains
# "SPARK" or "PYTHON", as seen by the shell spawned from this kernel.
!env | grep -e "SPARK" -e "PYTHON"

PYSPARK_DRIVER_PYTHON=/Users/c11309a/.local/share/rtx/installs/python/3.10/bin/python
PYSPARK_PYTHON=/Users/c11309a/.local/share/rtx/installs/python/3.10/bin/python
PYTHONPATH=/Users/c11309a/Tools/spark-3.3.4-bin-hadoop3/python:/Users/c11309a/Tools/spark-3.3.4-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip:/Users/c11309a/Tools/spark-3.3.4-bin-hadoop3/python/lib/pyspark.zip:/Users/c11309a/Tools/spark-3.3.4-bin-hadoop3/python:/Users/c11309a/Tools/spark-3.3.4-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip:/Users/c11309a/Tools/spark-3.3.4-bin-hadoop3/python/lib/*.zip:
SPARK_HOME=/Users/c11309a/Tools/spark-3.3.4-bin-hadoop3
PYTHONUNBUFFERED=1
PYTHONIOENCODING=utf-8
PYDEVD_IPYTHON_COMPATIBLE_DEBUGGING=1
SPARK_AUTH_SOCKET_TIMEOUT=15
SPARK_BUFFER_SIZE=65536


# Basic UDF

In [24]:
# Create a spark session
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[4]")            # local mode, "local[4]" master URL
    .appName("learn_dataframes")
    .getOrCreate()
)

# Keep a handle on the session's SparkContext
sc = spark.sparkContext

# Last expression of the cell: display the session object
spark

In [25]:
# Create a udf
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType

def get_first_letter(word):
    """Return the first character of ``word``.

    Args:
        word: The string to inspect. May be ``None`` or empty.

    Returns:
        A one-character string, or ``None`` when ``word`` is ``None`` or
        the empty string.
    """
    # Guard first: indexing None raises TypeError and indexing "" raises
    # IndexError, either of which would abort the whole job when this
    # function runs as a UDF over a column containing such values.
    if not word:
        return None
    return word[0]

# Wrap the plain Python function as a UDF whose result column is a string
get_first_letter_udf = udf(f=get_first_letter, returnType=StringType())

In [26]:
# Create a dataframe
from pyspark.sql.functions import col

df = spark.createDataFrame(
    # One single-field row per word (duplicates are intentional)
    data=[(word,) for word in ("cat", "elephant", "rat", "rat", "cat")],
    schema=["word"],
)

# Print the rows to confirm the frame's contents
df.show()

[Stage 4:>                                                          (0 + 1) / 1]

+--------+
|    word|
+--------+
|     cat|
|elephant|
|     rat|
|     rat|
|     cat|
+--------+



                                                                                

In [27]:
# Use the udf to add a column to the dataframe
(
    df
    .withColumn("first_letter", get_first_letter_udf("word"))
    .show()
)

+--------+------------+
|    word|first_letter|
+--------+------------+
|     cat|           c|
|elephant|           e|
|     rat|           r|
|     rat|           r|
|     cat|           c|
+--------+------------+



# Retrieve multiple fields from UDF

In [28]:
# Create a dataframe of full addresses, each holding street, city, state, and zip in a single column
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df = spark.createDataFrame(
    data=[
        (address,)
        for address in (
            "123 Main St, Buffalo NY 14201",
            "456 Pine St, Bellingham WA 98226",
            "789 Maple St, Sacramento CA 94203",
        )
    ],
    schema=["full_address"],
)

# truncate=False so the long address strings are printed in full
df.show(truncate=False)

+---------------------------------+
|full_address                     |
+---------------------------------+
|123 Main St, Buffalo NY 14201    |
|456 Pine St, Bellingham WA 98226 |
|789 Maple St, Sacramento CA 94203|
+---------------------------------+



In [29]:
# Create a udf to parse the full_address column into a struct with street, city, state, zip
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

def parse_address(full_address):
    """Split a ``"<street>, <city> <state> <zip>"`` string into its parts.

    The string is split from the right, so the city may contain spaces
    ("New York") and the street may itself contain ", " ("5 Elm St, Apt 2").

    Args:
        full_address: The combined address string, or ``None``.

    Returns:
        A ``(street, city, state, zip)`` tuple of strings, or ``None`` when
        ``full_address`` is ``None``.

    Raises:
        ValueError: If the text has no ", " separator or fewer than three
            space-separated tokens after it.
    """
    # A missing value passes through as a missing result instead of
    # failing with AttributeError on .rsplit().
    if full_address is None:
        return None

    # The last ", " separates the street from "<city> <state> <zip>".
    street, city_state_zip = full_address.rsplit(", ", 1)
    # The final two tokens are state and zip; whatever precedes them is the city.
    city, state, zip_code = city_state_zip.rsplit(" ", 2)
    return (street, city, state, zip_code)

# The UDF returns a struct; every component is declared as a nullable string
parse_address_udf = udf(
    f=parse_address,
    returnType=StructType(
        [
            StructField(field_name, StringType(), True)
            for field_name in ("street", "city", "state", "zip")
        ]
    ),
)

# Parse each full_address into an "address" struct, then pull the struct's
# four fields out as separate top-level columns.
# NOTE: this rebinds `df` to a frame without a full_address column, so the
# cell cannot simply be re-run without first re-running the cell that builds df.
df = (
    df
    .withColumn("address", parse_address_udf("full_address"))
    .select(
        "address.street",
        "address.city",
        "address.state",
        "address.zip",
    )
)

df.show(truncate=False)

+------------+----------+-----+-----+
|street      |city      |state|zip  |
+------------+----------+-----+-----+
|123 Main St |Buffalo   |NY   |14201|
|456 Pine St |Bellingham|WA   |98226|
|789 Maple St|Sacramento|CA   |94203|
+------------+----------+-----+-----+



# Parse FWF file using a dynamic spec

In [30]:
# Read the YAML file with the fixed-width field spec
import yaml

with open("data/fwf_fields.yaml", "r") as f:
    # safe_load parses with the SafeLoader; only the 'fields' entry is kept
    fwf_field_spec = yaml.safe_load(f)['fields']

# Last expression of the cell: display the loaded spec
fwf_field_spec

[{'name': 'id', 'length': 5, 'type': 'integer'},
 {'name': 'name', 'length': 10, 'type': 'string'},
 {'name': 'age', 'length': 3, 'type': 'integer'}]

In [31]:
# Build dynamic schema with metadata for length from spec
from typing import List
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DataType

# Get the data type from string value
def get_type(type: str) -> DataType:
    """Translate a type name from the field spec into a Spark data type.

    Args:
        type: Type name as written in the spec.

    Returns:
        ``IntegerType()`` for ``"integer"``; ``StringType()`` for
        ``"string"`` and for any other value.
    """
    # Only "integer" needs special treatment; "string" and every
    # unrecognised name share the StringType fallback.
    if type == "integer":
        return IntegerType()
    return StringType()

# Build the schema, adding metadata for length
schema = StructType(
    [
        StructField(
            spec["name"],
            get_type(spec["type"]),
            True,
            {"length": spec["length"]},  # field width travels with the schema as metadata
        )
        for spec in fwf_field_spec
    ]
)

print(schema)
print(schema[0].metadata)

StructType([StructField('id', IntegerType(), True), StructField('name', StringType(), True), StructField('age', IntegerType(), True)])
{'length': 5}


In [32]:
# Create function factory to parse a line of text based on given schema
from typing import List, Tuple

def parse_line_factory(schema: StructType):
    """Build a parser for fixed-width text lines laid out according to ``schema``.

    Fields are read left to right in ``schema.fields`` order. Each field's
    width is taken from the ``"length"`` entry of its metadata.

    Args:
        schema: Struct whose fields each carry ``metadata["length"]``.

    Returns:
        A function ``parse_line(line)`` that returns a list with one value
        per field: the stripped text of the field's slice, converted with
        ``int`` when the field's data type is ``IntegerType``. A blank
        integer field yields ``None``. A ``None`` line yields ``None``.

    Raises:
        KeyError: (from ``parse_line``) if a field has no ``"length"`` metadata.
        ValueError: (from ``parse_line``) if a non-blank integer field does
            not hold a valid integer.
    """

    def parse_line(line: str) -> list:
        # A missing line gives a missing result instead of a TypeError on slicing.
        if line is None:
            return None

        values = []
        start = 0
        for field in schema.fields:
            # Slice this field's fixed-width window and drop the padding.
            end = start + field.metadata['length']
            text = line[start:end].strip()
            start = end

            if isinstance(field.dataType, IntegerType):
                # int("") raises ValueError, so a blank integer field maps to None.
                values.append(int(text) if text else None)
            else:
                values.append(text)

        return values

    return parse_line

# Smoke-test the generated parser on one sample line
parse_line_factory(schema=schema)(line="1    John Doe  025")

[1, 'John Doe', 25]

In [33]:
# Create a udf to parse a line of text based on given schema
from pyspark.sql.functions import udf

# The schema drives the parser and is also declared as the UDF's return type
parse_line_udf = udf(f=parse_line_factory(schema), returnType=schema)

In [34]:
# Create a dataframe from the fixed width file data/people.txt
df = spark.read.format("text").load("data/people.txt")
df.show(truncate=False)

+------------------+
|value             |
+------------------+
|1    John Doe  025|
|2    Jane Doe  030|
|3    Jim Smith 035|
|4    Ann Brown 040|
|5    Tom Davis 045|
+------------------+



In [35]:
# Use the udf to add complex column to the dataframe with parsed values
from pyspark.sql.functions import col

df = df.withColumn(
    "parsed",
    parse_line_udf("value"),  # the text reader's single column is named "value"
)
df.show(truncate=False)

+------------------+------------------+
|value             |parsed            |
+------------------+------------------+
|1    John Doe  025|{1, John Doe, 25} |
|2    Jane Doe  030|{2, Jane Doe, 30} |
|3    Jim Smith 035|{3, Jim Smith, 35}|
|4    Ann Brown 040|{4, Ann Brown, 40}|
|5    Tom Davis 045|{5, Tom Davis, 45}|
+------------------+------------------+



In [36]:
# Now create a df with just the parsed values
# "parsed.*" expands the struct's fields into top-level columns
df = df.select("parsed.*")
df.show(truncate=False)

+---+---------+---+
|id |name     |age|
+---+---------+---+
|1  |John Doe |25 |
|2  |Jane Doe |30 |
|3  |Jim Smith|35 |
|4  |Ann Brown|40 |
|5  |Tom Davis|45 |
+---+---------+---+



In [37]:
# Now do all of that in 1 shot
(
    spark.read.format("text")
    .load("data/people.txt")
    .withColumn("parsed", parse_line_udf("value"))
    .select("parsed.*")
    .show(truncate=False)
)

+---+---------+---+
|id |name     |age|
+---+---------+---+
|1  |John Doe |25 |
|2  |Jane Doe |30 |
|3  |Jim Smith|35 |
|4  |Ann Brown|40 |
|5  |Tom Davis|45 |
+---+---------+---+

