In [1]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
spark_version = 'spark-3.0.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Get:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:5 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:11 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Get:14 http://security.ubuntu.com/ubuntu bionic-security/universe amd64 Packages

In [2]:
 # Start Spark session
from pyspark.sql import SparkSession

# Obtain a session for the "StopWords" application, reusing an existing one if present
spark = (
    SparkSession.builder
    .appName("StopWords")
    .getOrCreate()
)

In [4]:
# Two sample rows whose `raw` column already holds a list of word tokens
sentenceData = spark.createDataFrame(
    data=[
        (0, ['Big', 'data', 'is', 'super', 'powerful']),
        (1, ['This', 'is', 'going', 'to', 'be', 'epic']),
    ],
    schema=['id', 'raw'],
)

# Print the rows without shortening long cell values
sentenceData.show(truncate=False)

+---+--------------------------------+
|id |raw                             |
+---+--------------------------------+
|0  |[Big, data, is, super, powerful]|
|1  |[This, is, going, to, be, epic] |
+---+--------------------------------+



In [5]:
# remove stop words
# Import stop words library
from pyspark.ml.feature import StopWordsRemover

In [6]:
# Configure a stop-word filter: reads the `raw` token lists, writes `filtered`
remover = StopWordsRemover(
    inputCol="raw",
    outputCol="filtered",
)

In [7]:
# Apply the filter to the sample rows and print the result at full width
(
    remover
    .transform(sentenceData)
    .show(truncate=False)
)

+---+--------------------------------+----------------------------+
|id |raw                             |filtered                    |
+---+--------------------------------+----------------------------+
|0  |[Big, data, is, super, powerful]|[Big, data, super, powerful]|
|1  |[This, is, going, to, be, epic] |[going, epic]               |
+---+--------------------------------+----------------------------+



In [9]:
# Three untokenized example sentences keyed by `id`
df = spark.createDataFrame(
    data=[
        (0, 'Spark is great.'),
        (1, 'We are learning Spark.'),
        (2, 'Spark is better than Hadoop no doibt'),
    ],
    schema=['id', 'sentence'],
)

# Print the rows without shortening long cell values
df.show(truncate=False)

+---+------------------------------------+
|id |sentence                            |
+---+------------------------------------+
|0  |Spark is great.                     |
|1  |We are learning Spark.              |
|2  |Spark is better than Hadoop no doibt|
+---+------------------------------------+



In [16]:
# Next, we'll import the udf function, the col function to select a column to be passed into a function, 
# and the type IntegerType that will be used in our udf to define the data type of the output
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

# import tokenizer library for NLP
from pyspark.ml.feature import Tokenizer


In [17]:
# Plain Python helper; it is wrapped as a Spark UDF in a later cell so it can
# be applied to a column of token lists.
def word_list_length(word_list):
    """Return the number of items in ``word_list``.

    Args:
        word_list: A sized collection of tokens (e.g. a list of words), or
            ``None`` when the value is missing.

    Returns:
        ``len(word_list)``, or ``None`` when ``word_list`` is ``None`` so that
        a missing value stays missing instead of raising ``TypeError``.
    """
    if word_list is None:
        return None
    return len(word_list)

In [18]:
# Wrap the length helper as a Spark UDF whose result column is an integer
count_tokens = udf(
    f=word_list_length,
    returnType=IntegerType(),
)

In [19]:
# Tokenizer that reads `sentence` and writes the resulting word list to `words`
tokenizer = Tokenizer(outputCol="words", inputCol="sentence")

# Add the `words` column to the example sentences
tokenized_df = tokenizer.transform(df)

# Append a `tokens` column holding the word count per row and print at full width
(
    tokenized_df
    .withColumn('tokens', count_tokens(col('words')))
    .show(truncate=False)
)

+---+------------------------------------+--------------------------------------------+------+
|id |sentence                            |words                                       |tokens|
+---+------------------------------------+--------------------------------------------+------+
|0  |Spark is great.                     |[spark, is, great.]                         |3     |
|1  |We are learning Spark.              |[we, are, learning, spark.]                 |4     |
|2  |Spark is better than Hadoop no doibt|[spark, is, better, than, hadoop, no, doibt]|7     |
+---+------------------------------------+--------------------------------------------+------+



In [23]:
# Stop-word filter for tokenizer output: reads `words`, writes `filtered`
remover2 = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
)

In [28]:
# Filter stop words out of the tokenized sentences and print at full width
(
    remover2
    .transform(tokenized_df)
    .show(truncate=False)
)

+---+------------------------------------+--------------------------------------------+------------------------------+
|id |sentence                            |words                                       |filtered                      |
+---+------------------------------------+--------------------------------------------+------------------------------+
|0  |Spark is great.                     |[spark, is, great.]                         |[spark, great.]               |
|1  |We are learning Spark.              |[we, are, learning, spark.]                 |[learning, spark.]            |
|2  |Spark is better than Hadoop no doibt|[spark, is, better, than, hadoop, no, doibt]|[spark, better, hadoop, doibt]|
+---+------------------------------------+--------------------------------------------+------------------------------+



Tokenize and remove stop words all in one step below.

In [29]:
# Fresh copy of the three example sentences for the combined tokenize + filter step
df2 = spark.createDataFrame(
    data=[
        (0, 'Spark is great.'),
        (1, 'We are learning Spark.'),
        (2, 'Spark is better than Hadoop no doibt'),
    ],
    schema=['id', 'sentence'],
)

# Print the rows without shortening long cell values
df2.show(truncate=False)

+---+------------------------------------+
|id |sentence                            |
+---+------------------------------------+
|0  |Spark is great.                     |
|1  |We are learning Spark.              |
|2  |Spark is better than Hadoop no doibt|
+---+------------------------------------+



In [31]:
# Tokenizer that reads `sentence` and writes the resulting word list to `words`
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
# Add the `words` column to df2
tokenized_df2 = tokenizer.transform(df2)
# Append a `tokens` column with the per-row word count and print at full width
tokenized_df2.withColumn('tokens', count_tokens(col('words'))).show(truncate= False)
# Remove stop words from the frame tokenized in THIS cell (tokenized_df2), not
# the tokenized_df left over from an earlier cell, so the cell depends only on
# its own input df2.
remover2.transform(tokenized_df2).show(truncate= False)

+---+------------------------------------+--------------------------------------------+------+
|id |sentence                            |words                                       |tokens|
+---+------------------------------------+--------------------------------------------+------+
|0  |Spark is great.                     |[spark, is, great.]                         |3     |
|1  |We are learning Spark.              |[we, are, learning, spark.]                 |4     |
|2  |Spark is better than Hadoop no doibt|[spark, is, better, than, hadoop, no, doibt]|7     |
+---+------------------------------------+--------------------------------------------+------+

+---+------------------------------------+--------------------------------------------+------------------------------+
|id |sentence                            |words                                       |filtered                      |
+---+------------------------------------+--------------------------------------------+---------

In [38]:
# Tokenizer that reads `sentence` and writes the resulting word list to `words`
tokenizer = Tokenizer(outputCol="words", inputCol="sentence")

# Add the `words` column to df2
tokenized_df2 = tokenizer.transform(df2)

# Stop-word filter for the tokenizer output: reads `words`, writes `filtered`
remover3 = StopWordsRemover(outputCol="filtered", inputCol="words")

# First table: tokens plus a `tokens` column with the per-row word count
(
    tokenized_df2
    .withColumn('tokens', count_tokens(col('words')))
    .show(truncate=False)
)

# Second table: the same tokenized rows with stop words filtered out
(
    remover3
    .transform(tokenized_df2)
    .show(truncate=False)
)


+---+------------------------------------+--------------------------------------------+------+
|id |sentence                            |words                                       |tokens|
+---+------------------------------------+--------------------------------------------+------+
|0  |Spark is great.                     |[spark, is, great.]                         |3     |
|1  |We are learning Spark.              |[we, are, learning, spark.]                 |4     |
|2  |Spark is better than Hadoop no doibt|[spark, is, better, than, hadoop, no, doibt]|7     |
+---+------------------------------------+--------------------------------------------+------+

+---+------------------------------------+--------------------------------------------+------------------------------+
|id |sentence                            |words                                       |filtered                      |
+---+------------------------------------+--------------------------------------------+---------