In [1]:
# Training split of the Databricks-hosted news20.binary sample dataset (Parquet)
NEWS20_PATH = "dbfs:/databricks-datasets/news20.binary/data-001/training"
news20 = spark.read.parquet(NEWS20_PATH)

# Mark the DataFrame for in-memory caching: the next few cells scan it repeatedly
news20.cache()

In [2]:
# Preview the first 20 rows (show() truncates long strings by default)
news20.show(n=20)

In [3]:
# Number of rows per topic.
# - Sorted by topic because groupBy() gives no ordering guarantee, so an
#   unsorted listing can change between runs.
# - truncate=False because show() cuts strings to 20 characters by default and
#   some newsgroup names (e.g. 'comp.os.ms-windows.misc') are longer.
news20.groupBy('topic').count().orderBy('topic').show(truncate=False)

In [4]:
# Number of rows per label value, sorted by label so the output order is
# stable across runs (groupBy() alone gives no ordering guarantee).
news20.groupBy('label').count().orderBy('label').show()

In [5]:
# Keep only the rows whose label is 0. The original author describes the
# discarded label-1 rows as "spam" -- NOTE(review): confirm against the dataset docs.
# The label column holds doubles, so '< 0.5' is used instead of an exact '== 0'.
news20 = news20.filter(news20.label < 0.5)
news20.cache()

In [6]:
from pyspark.sql.functions import length

# Append a 'text_length' column with the character count of each 'text' value
news20_with_lengths = news20.withColumn('text_length', length('text'))
news20_with_lengths.show()

In [7]:
# Mean text length per topic, listed from shortest to longest on average.
# groupBy().avg() names its output column 'avg(text_length)', hence the sort key.
avg_length_by_topic = news20_with_lengths.groupBy('topic').avg('text_length')
avg_length_by_topic.orderBy('avg(text_length)').show(truncate=False)

In [8]:
# Topic to analyse. Another topic previously used here: 'rec.sport.baseball'.
topic_name = 'misc.forsale'

# RDD of raw message strings for the chosen topic
topic_rows = news20.where(news20.topic == topic_name).select('text')
raw_texts = topic_rows.rdd.map(lambda row: row[0])

# Strip the mail headers: the piece before the first double space is dropped
# and the remaining pieces are re-joined with single spaces.
# NOTE(review): assumes headers are separated from the body by "  " -- confirm on the data.
texts = raw_texts.map(lambda message: " ".join(message.split("  ")[1:]))
texts.take(1)

In [9]:
# Tokenise each message body on whitespace (flatMap: one text -> many words)
words_raw = texts.flatMap(lambda text: text.split())
print(words_raw.take(7))

# Lower-case every token so e.g. "Sale" and "sale" are counted as one word
words_lower = words_raw.map(lambda word: word.lower())
print(words_lower.take(7))

# Keep only purely alphabetic tokens; numbers, punctuation and mixed tokens
# such as "e-mail" or "486dx" are dropped. The result keeps the name `words`
# because the following cells consume it.
words = words_lower.filter(lambda word: word.isalpha())
print(words.take(7))

In [10]:
# Load a stop-word list (one word per line) from the Stanford CoreNLP repository.
# NOTE(review): the URL tracks the `master` branch, so the list may change over
# time; pin a commit for fully reproducible results.
from contextlib import closing

try:
    from urllib.request import urlopen  # Python 3
except ImportError:
    from urllib2 import urlopen  # Python 2 fallback

STOPWORDS_URL = ('https://raw.githubusercontent.com/stanfordnlp/CoreNLP/master/'
                 'data/edu/stanford/nlp/patterns/surface/stopwords.txt')

# closing() guarantees the HTTP response is released (works on Python 2 and 3).
# Lines arrive as bytes on Python 3, so decode them before comparing with the
# (text) words; strip() instead of [:-1] so a last line without '\n' keeps
# its final character. Blank lines are skipped.
with closing(urlopen(STOPWORDS_URL)) as stopwords_data:
    stopwords = [line.decode('utf-8').strip().lower()
                 for line in stopwords_data
                 if line.strip()]
print(stopwords[40:50])

# Filter out those words; a frozenset gives O(1) membership tests per word
stopword_set = frozenset(stopwords)
words = words.filter(lambda word: word not in stopword_set)
print(words.take(7))

In [11]:
# Pair every word with a count of 1 so occurrences can be summed per word by key
words_with_one = words.map(lambda token: (token, 1))
print(words_with_one.take(7))

In [12]:
from operator import add

# Sum the 1s per word, then list the 20 most frequent words for the topic.
# Ties on the count are broken alphabetically; with a bare -count key, which of
# several equally frequent words appear (and in what order) depends on the
# partition layout of the reduceByKey output.
print(topic_name)
word_counts = words_with_one.reduceByKey(add)
word_counts.takeOrdered(20, key=lambda kv: (-kv[1], kv[0]))

In [13]:
# Repeats the previous cell's top-20 query.
# NOTE(review): the listing differs only if the earlier cells were re-run after
# changing `topic_name`, which is hidden state that Restart & Run All will not
# reproduce.
print(topic_name)
top_words = words_with_one.reduceByKey(add).takeOrdered(20, key=lambda pair: -pair[1])
top_words