In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [1 InRelease 14.2 kB/88.                                                                               Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [1 InRelease 48.9 kB/88.0% [2 InRelease gpgv 1,575 B] [Waiting for headers] [1 InRelease 51.8 kB/88.7 k                                                                               Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:7 http://archive.ubun

In [2]:
# Create (or reuse) the SparkSession for this notebook, named "Tokens"
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Tokens")
    .getOrCreate()
)

In [3]:
# Import Tokenizer Library
from pyspark.ml.feature import Tokenizer

In [4]:
# Build a three-row sample DataFrame with an id and a sentence per row
dataframe = spark.createDataFrame(
    [
        (0, "BCS Link has not been working today"),
        (1, "What am I doing wrong"),
        (2, "Please Update!!"),
    ],
    ["id", "sentence"],
)
dataframe.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  0|BCS Link has not ...|
|  1|What am I doing w...|
|  2|     Please Update!!|
+---+--------------------+



In [5]:
# Configure a Tokenizer that reads the "sentence" column and writes to "words"
tokenizer = Tokenizer(
    inputCol="sentence",
    outputCol="words",
)
tokenizer

Tokenizer_d1235fc7a068

In [6]:
# Apply the tokenizer to the sample data, then display the result without
# truncating long cell values
tokenized_df = tokenizer.transform(dataframe)
tokenized_df.show(truncate=False)

+---+-----------------------------------+-------------------------------------------+
|id |sentence                           |words                                      |
+---+-----------------------------------+-------------------------------------------+
|0  |BCS Link has not been working today|[bcs, link, has, not, been, working, today]|
|1  |What am I doing wrong              |[what, am, i, doing, wrong]                |
|2  |Please Update!!                    |[please, update!!]                         |
+---+-----------------------------------+-------------------------------------------+



In [7]:
# We will turn this into a 'user-defined function' (UDF) below
def word_list_length(word_list):
    """Return the number of items in ``word_list``.

    Args:
        word_list: A sized collection (for example a list of tokens), or ``None``.

    Returns:
        ``len(word_list)``, or ``None`` when ``word_list`` is ``None``.
    """
    # A null array value reaches a Python UDF as None; pass it through as a
    # null result instead of letting len(None) raise TypeError and fail the job.
    if word_list is None:
        return None
    return len(word_list)

In [8]:
# import the udf function, the col function (to select a column to be passed into the udf), and the type IntegerType that will be used in our udf to define the datatype
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

In [9]:
# Wrap word_list_length as a UDF whose declared return type is IntegerType
count_tokens = udf(word_list_length, returnType=IntegerType())

In [10]:
# Rebuild the Tokenizer ("sentence" -> "words")
tokenizer = Tokenizer(
    inputCol="sentence",
    outputCol="words",
)

# Tokenize the sample DataFrame again
tokenized_df = tokenizer.transform(dataframe)

# Append a "tokens" column computed by the count_tokens UDF from "words",
# and display the result without truncating cell values
(
    tokenized_df
    .withColumn("tokens", count_tokens(col("words")))
    .show(truncate=False)
)

+---+-----------------------------------+-------------------------------------------+------+
|id |sentence                           |words                                      |tokens|
+---+-----------------------------------+-------------------------------------------+------+
|0  |BCS Link has not been working today|[bcs, link, has, not, been, working, today]|7     |
|1  |What am I doing wrong              |[what, am, i, doing, wrong]                |5     |
|2  |Please Update!!                    |[please, update!!]                         |2     |
+---+-----------------------------------+-------------------------------------------+------+



In [11]:
# Import Stop Words library
from pyspark.ml.feature import StopWordsRemover

In [12]:
# Configure the stop-word remover: reads "words", writes the result to "filtered"
# (nothing is transformed until .transform() is called)
remover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
)

In [13]:
# Apply the remover to the tokenized DataFrame and display it untruncated
(
    remover
    .transform(tokenized_df)
    .show(truncate=False)
)

+---+-----------------------------------+-------------------------------------------+---------------------------+
|id |sentence                           |words                                      |filtered                   |
+---+-----------------------------------+-------------------------------------------+---------------------------+
|0  |BCS Link has not been working today|[bcs, link, has, not, been, working, today]|[bcs, link, working, today]|
|1  |What am I doing wrong              |[what, am, i, doing, wrong]                |[wrong]                    |
|2  |Please Update!!                    |[please, update!!]                         |[please, update!!]         |
+---+-----------------------------------+-------------------------------------------+---------------------------+

