In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *

import os
# Tell PySpark which Python interpreter to launch (must be set before the session starts).
os.environ.update(PYSPARK_PYTHON="/usr/local/bin/python3.6")

In [2]:
import sys
import getpass
# Name the Spark application after the OS user running the notebook, then
# create (or reuse) a Hive-enabled session submitted to the "default" YARN queue.
username = getpass.getuser()
appname = f"{username}_app"
spark = (
    SparkSession.builder
    .appName(appname)
    .config("spark.yarn.queue", "default")
    .enableHiveSupport()
    .getOrCreate()
)

In [26]:
import urllib.request
# Download the Project Gutenberg text into the notebook's working directory.
# The URL is a plain string (the former f-prefix had no placeholders) and uses
# HTTPS, like the stop-word download further down, so the file is not fetched
# over an unauthenticated connection.
urllib.request.urlretrieve(
    "https://www.gutenberg.org/files/100/100-0.txt",
    "shakespeare.txt",
)

('shakespeare.txt', <http.client.HTTPMessage at 0x7f39a90e7320>)

In [5]:
! hadoop fs -put shakespeare.txt

In [6]:
# Read the uploaded file as a DataFrame with one row per line of text.
# The relative path resolves against the current user's HDFS home directory,
# i.e. the same location `hadoop fs -put shakespeare.txt` writes to, instead of
# a hard-coded /user/zaur that only works for one account.
# NOTE(review): assumes Spark and the hadoop CLI share the same default filesystem.
textDf = spark.read.text("shakespeare.txt")

In [7]:
# Confirm the text reader produced a single string column named `value`.
textDf.printSchema()

root
 |-- value: string (nullable = true)



In [9]:
# Normalise case so that "The" and "the" end up as the same word later on.
from pyspark.sql import functions as F
textLowerDf = textDf.select(
    F.lower(textDf["value"]).alias("words_lower")
)

In [10]:
# Preview the lower-cased lines, then the raw lines, to eyeball the transformation.
textLowerDf.show()
textDf.show()

+--------------------+
|         words_lower|
+--------------------+
|project gutenberg...|
|                    |
|this ebook is for...|
|most other parts ...|
|whatsoever.  you ...|
|of the project gu...|
|www.gutenberg.org...|
|have to check the...|
|         this ebook.|
|                    |
|                    |
|title: the comple...|
|                    |
|author: william s...|
|                    |
|release date: jan...|
|last updated: aug...|
|                    |
|   language: english|
|                    |
+--------------------+
only showing top 20 rows

+--------------------+
|               value|
+--------------------+
|Project Gutenberg...|
|                    |
|This eBook is for...|
|most other parts ...|
|whatsoever.  You ...|
|of the Project Gu...|
|www.gutenberg.org...|
|have to check the...|
|         this ebook.|
|                    |
|                    |
|Title: The Comple...|
|                    |
|Author: William S...|
|                    |
|Release

In [11]:
# Check the column produced by the lower-casing step.
textLowerDf.printSchema()

root
 |-- words_lower: string (nullable = true)



In [12]:
# Break every line into tokens on single spaces; each row becomes an array of strings.
textSplitDf = textLowerDf.select(
    F.split(textLowerDf["words_lower"], " ").alias("words_split")
)

In [13]:
# Inspect the schema after splitting: `words_split` is an array of strings per line.
textSplitDf.printSchema()

root
 |-- words_split: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [14]:
# Repeats the schema print of the preceding cell (same DataFrame, same output).
textSplitDf.printSchema()

root
 |-- words_split: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [15]:
# Preview the per-line token arrays (show() prints only the first 20 rows).
textSplitDf.show()

+--------------------+
|         words_split|
+--------------------+
|[project, gutenbe...|
|                  []|
|[this, ebook, is,...|
|[most, other, par...|
|[whatsoever., , y...|
|[of, the, project...|
|[www.gutenberg.or...|
|[have, to, check,...|
|      [this, ebook.]|
|                  []|
|                  []|
|[title:, the, com...|
|                  []|
|[author:, william...|
|                  []|
|[release, date:, ...|
|[last, updated:, ...|
|                  []|
|[language:, english]|
|                  []|
+--------------------+
only showing top 20 rows



In [16]:
# Flatten the token arrays: one output row per array element, in a column named `word`.
textExplodedDf = textSplitDf.select(
    F.explode(textSplitDf["words_split"]).alias("word")
)

In [18]:
# Preview the one-word-per-row result; blank tokens are still present at this stage.
textExplodedDf.show()

+------------+
|        word|
+------------+
|     project|
| gutenberg’s|
|         the|
|    complete|
|       works|
|          of|
|     william|
|shakespeare,|
|          by|
|     william|
| shakespeare|
|            |
|        this|
|       ebook|
|          is|
|         for|
|         the|
|         use|
|          of|
|      anyone|
+------------+
only showing top 20 rows



In [19]:
# Discard tokens that are empty once leading whitespace is stripped.
textExplodedDf = textExplodedDf.filter(
    F.ltrim(textExplodedDf["word"]) != ""
)

In [20]:
# Re-check the preview after the blank-token filter.
textExplodedDf.show()

+------------+
|        word|
+------------+
|     project|
| gutenberg’s|
|         the|
|    complete|
|       works|
|          of|
|     william|
|shakespeare,|
|          by|
|     william|
| shakespeare|
|        this|
|       ebook|
|          is|
|         for|
|         the|
|         use|
|          of|
|      anyone|
|    anywhere|
+------------+
only showing top 20 rows



In [21]:
# Reduce each token to its first run of lowercase ASCII letters, which strips
# surrounding punctuation ("shakespeare," -> "shakespeare").
# regexp_extract yields "" for tokens that contain no letters at all (numbers,
# "***" separators, ...); drop those rows so an empty "word" is never counted.
textExplodedDf = (
    textExplodedDf
    .select(F.regexp_extract(F.col("word"), "[a-z]+", 0).alias("word"))
    .where(F.col("word") != "")
)

In [22]:
# Preview the words after the regex extraction has stripped punctuation.
textExplodedDf.show()

+-----------+
|       word|
+-----------+
|    project|
|  gutenberg|
|        the|
|   complete|
|      works|
|         of|
|    william|
|shakespeare|
|         by|
|    william|
|shakespeare|
|       this|
|      ebook|
|         is|
|        for|
|        the|
|        use|
|         of|
|     anyone|
|   anywhere|
+-----------+
only showing top 20 rows



In [23]:
# Tally occurrences per word and list the most frequent words first.
textWordCounts = (
    textExplodedDf
    .groupBy("word")
    .count()
    .orderBy(F.desc("count"))
)

In [24]:
# Show the 20 most frequent words (stop words have not been removed yet).
textWordCounts.show()

+----+-----+
|word|count|
+----+-----+
| the|30207|
| and|28402|
|   i|23870|
|  to|21274|
|  of|18833|
|   a|16266|
| you|14676|
|  my|13180|
|  in|12347|
|that|12225|
|  is| 9912|
| not| 9078|
|with| 8537|
|  me| 8285|
| for| 8283|
|  it| 8234|
| his| 7583|
|  be| 7407|
|  he| 7280|
|this| 7184|
+----+-----+
only showing top 20 rows



In [33]:
import urllib.request
# Fetch the stop-word list and save it next to the notebook as stopwords.txt.
urllib.request.urlretrieve(
    "https://raw.githubusercontent.com/farooq-teqniqly/blog-pyspark-for-noobs-part-2/master/stopwords.txt",
    filename="stopwords.txt",
)

('stopwords.txt', <http.client.HTTPMessage at 0x7f39a92cd9b0>)

In [36]:
!hadoop fs -rmr stopwords.txt
!hadoop fs -put stopwords.txt

rmr: DEPRECATED: Please use '-rm -r' instead.
Deleted stopwords.txt


In [37]:
# Load the stop words as a single `word` column, lower-cased and with trailing
# whitespace removed so they compare equal to the extracted words.
# The relative path resolves against the current user's HDFS home directory,
# i.e. where `hadoop fs -put stopwords.txt` uploads to, rather than a
# hard-coded /user/zaur that only works for one account.
# NOTE(review): assumes Spark and the hadoop CLI share the same default filesystem.
stopWordsDf = (
    spark.read.text("stopwords.txt")
    .select(F.rtrim(F.lower(F.col("value"))).alias("word"))
)

In [38]:
# Anti-join: keep only the counted words that have no match in the stop-word list.
textWordCounts = textWordCounts.join(
    stopWordsDf,
    on="word",
    how="leftanti",
)

In [40]:
# Spot check: count the rows left for the stop word "able" (0 means it was removed).
textWordCounts.filter(textWordCounts["word"] == "able").count()

0

In [34]:
# Print the downloaded stop-word list (the whole local file, one entry per line).
!cat stopwords.txt

a
a's
able
about
above
according
accordingly
across
actually
after
afterwards
again
against
ain't
all
allow
allows
almost
alone
along
already
also
although
always
am
among
amongst
an
and
another
any
anybody
anyhow
anyone
anything
anyway
anyways
anywhere
apart
appear
appreciate
appropriate
are
aren't
around
as
aside
ask
asking
associated
at
available
away
awfully
b
be
became
because
become
becomes
becoming
been
before
beforehand
behind
being
believe
below
beside
besides
best
better
between
beyond
both
brief
but
by
c
c'mon
c's
came
can
can't
cannot
cant
cause
causes
certain
certainly
changes
clearly
co
com
come
comes
concerning
consequently
consider
considering
contain
containing
contains
corresponding
could
couldn't
course
currently
d
definitely
described
despite
did
didn't
different
do
does
doesn't
doing
don't
done
down
downwards
during
e
each
edu
eg
eight
either
else
elsewhere
enough
entirely
especially
et
etc
even
ever
every
everybody
everyone
everything
everywhere
ex
exactly
example