In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.3.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.39)] [0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.39)] [                                                                               Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
Hit:3 http://archive.ubuntu.com/ubuntu focal InRelease
Get:4 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
Get:5 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
Get:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease [18.1 kB]
Get:7 http://archive.ubuntu.com/ubuntu focal-backports InRelease [108 kB]
Hit:8 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Hit:9 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu focal InRelease
Get:10 http://archive.ubuntu.com/ubuntu focal-updates/main 

In [2]:
# Create (or reuse) the Spark session used by the rest of the notebook
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.appName("DataFrameFunctions")
spark = session_builder.getOrCreate()

In [3]:
# import Tokenizer 
from pyspark.ml.feature import Tokenizer

In [13]:
# Build a small demo DataFrame: one (id, sentence) row per example
sample_rows = [
    (0, "Spark is great"),
    (1, "Learning Spark is fun"),
    (2, "Spark is better"),
]
column_names = ["id", "sentence"]

dataframe = spark.createDataFrame(sample_rows, column_names)
dataframe.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  0|      Spark is great|
|  1|Learning Spark is...|
|  2|     Spark is better|
+---+--------------------+



In [14]:
# Configure a Tokenizer that reads the "sentence" column and writes to "words"
tokenizer = Tokenizer(
    inputCol="sentence",
    outputCol="words",
)
# Bare expression so the notebook displays the tokenizer's repr
tokenizer

Tokenizer_2760f563ec06

In [15]:
# Run the tokenizer over the sample DataFrame and print every column untruncated
tokenized_df = tokenizer.transform(dataset=dataframe)
tokenized_df.show(truncate=False)

+---+---------------------+--------------------------+
|id |sentence             |words                     |
+---+---------------------+--------------------------+
|0  |Spark is great       |[spark, is, great]        |
|1  |Learning Spark is fun|[learning, spark, is, fun]|
|2  |Spark is better      |[spark, is, better]       |
+---+---------------------+--------------------------+



User-defined functions (UDFs) are functions created by the user to add custom output columns.



In [16]:
# Create a function to return the length of a list
def word_list_length(word_list):
    """Return the number of items in ``word_list``.

    Args:
        word_list: A sized collection of tokens (e.g. a list of words), or None.

    Returns:
        The length of ``word_list``, or None when ``word_list`` is None.
    """
    # When this is wrapped as a Spark UDF, a null column value arrives as None;
    # returning None keeps the result null instead of raising TypeError in len().
    if word_list is None:
        return None
    return len(word_list)

In [17]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

In [18]:
# Wrap word_list_length as a Spark UDF whose declared return type is IntegerType
count_tokens = udf(f=word_list_length, returnType=IntegerType())

In [19]:
# Build the tokenizer (sentence -> words)
tokenizer = Tokenizer(outputCol="words", inputCol="sentence")

# Tokenize the sample sentences
tokenized_df = tokenizer.transform(dataframe)

# Add a "tokens" column produced by the count_tokens UDF, then show without truncation
word_counts = count_tokens(col("words"))
tokenized_df.withColumn("tokens", word_counts).show(truncate=False)

+---+---------------------+--------------------------+------+
|id |sentence             |words                     |tokens|
+---+---------------------+--------------------------+------+
|0  |Spark is great       |[spark, is, great]        |3     |
|1  |Learning Spark is fun|[learning, spark, is, fun]|4     |
|2  |Spark is better      |[spark, is, better]       |3     |
+---+---------------------+--------------------------+------+

