# Lab - Introduction to Spark

The purpose of this lab is to get some experience with Spark. 


In [1]:
import re
import numpy as np
import pyspark

In [2]:
# pyspark works best with Java 8
# set the JAVA_HOME environment variable to the Java 8 path
%env JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

env: JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
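`%env` is an IPython magic, so it only works inside the notebook. When you run the same code as a plain Python script, a minimal sketch is to set the variable through `os.environ` before the `SparkContext` is created. This assumes the Java 8 path from the cell above; adjust it to your machine. The idea is that the JVM is launched when the context is created and inherits this process's environment.

```python
import os

# Path copied from the %env cell; this is an assumption about your install location.
JAVA8_HOME = "/usr/lib/jvm/java-8-openjdk-amd64"

# Must run BEFORE pyspark.SparkContext() is called: the JVM is started at that
# point and inherits the environment variables of this Python process.
os.environ["JAVA_HOME"] = JAVA8_HOME
print("JAVA_HOME =", os.environ["JAVA_HOME"])
```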


In [3]:
sc = pyspark.SparkContext()

## Completing a Spark Program

We are going to use a subset of the 20 Newsgroups dataset http://qwone.com/~jason/20Newsgroups/

The dataset comes from Usenet, where people discussed different topics in electronic "bulletin board" newsgroups.

The data file we will use is `20-news-same-line-small.txt`

Load the data into an RDD and take a peek at it.

In [4]:
corpus = sc.textFile('data/20-news-same-line-small.txt')

In [6]:
corpus.take(3)

['<doc id="20_newsgroups/comp.graphics/37261" url="" title="20_newsgroups/comp.graphics/37261"> From: lipman@oasys.dt.navy.mil (Robert Lipman) Subject: CALL FOR PRESENTATIONS: Navy SciViz/VR Seminar Date: 19 Mar 93 20:10:23 GMT Distribution: usa Lines: 65   CALL FOR PRESENTATIONS  NAVY SCIENTIFIC VISUALIZATION AND VIRTUAL REALITY SEMINAR  Tuesday, June 22, 1993  Carderock Division, Naval Surface Warfare Center formerly the David Taylor Research Center) Bethesda, Maryland  SPONSOR: NESS (Navy Engineering Software System) is sponsoring a  one-day Navy Scientific Visualization and Virtual Reality Seminar.   The purpose of the seminar is to present and exchange information for Navy-related scientific visualization and virtual reality programs,  research, developments, and applications.  PRESENTATIONS: Presentations are solicited on all aspects of  Navy-related scientific visualization and virtual reality.  All  current work, works-in-progress, and proposed work by Navy  organizations will 

The documents in this text file are stored in `XML` format, one document per line.

Each document is a `<doc>` element with the attributes `id`, `url`, and `title`.

The news text of each document is the content of its `<doc>` element.
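To make the layout concrete, here is a short sketch that applies the same `str.index` slicing used later in this lab to a single line. The sample line is a hypothetical, shortened version of a corpus line. The `id` value sits between `id="` and `" url=`, and the text sits between `"> ` and ` </doc>`.

```python
# Hypothetical, shortened line in the same layout as the corpus.
line = ('<doc id="20_newsgroups/comp.graphics/37261" url="" '
        'title="20_newsgroups/comp.graphics/37261"> From: lipman Subject: seminar </doc>')

# The id starts 4 characters after 'id="' and ends just before '" url='.
doc_id = line[line.index('id="') + 4 : line.index('" url=')]

# The text starts 3 characters after '"> ' and ends just before ' </doc>'.
text = line[line.index('"> ') + 3 : line.index(' </doc>')]

print(doc_id)  # 20_newsgroups/comp.graphics/37261
print(text)    # From: lipman Subject: seminar
```

`str.index` raises `ValueError` if a marker is missing, so this slicing assumes every line follows this exact layout.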

In [7]:
# how many news/documents/lines?
corpus.count()

1000

There are 1000 lines in this file, each corresponding to a different text document. 

The goal here is to build, as an RDD, a dictionary that maps each of the 20,000 most frequent words in the corpus to its rank. The rank is a number from 0 to 19,999: 0 for the most frequent word and 19,999 for the 20,000th most frequent. Each entry of the dictionary is a (word, rank) pair.

Take a look at the code below.

In [8]:
# each entry in validLines will be a "valid" line from the text file that should contain "id" 
validLines = corpus.filter(lambda x : 'id' in x)

# what is the type of validLines?
type(validLines)

pyspark.rdd.PipelinedRDD
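The test `'id' in x` is a plain substring check. It keeps any line that contains the letters "id" anywhere, such as "did", and not only `<doc id=...>` lines. Because every line of this file is a document, this likely makes no difference here. On messier input, however, a stray line would pass the filter and then make the `x.index('id="')` call in the next cell raise `ValueError`. The sketch below uses two made-up lines to compare the filter above with a stricter test on `'id="'`. The stricter test is a hypothetical alternative and is not what this lab uses.

```python
# Two made-up lines: a document line and a stray line that happens to contain "id".
lines = [
    '<doc id="20_newsgroups/comp.graphics/37261" url="" title="t"> some text </doc>',
    "I did not read the seminar notes",
]

loose = [x for x in lines if 'id' in x]     # same test as the validLines filter
strict = [x for x in lines if 'id="' in x]  # hypothetical stricter test

print(len(loose), len(strict))  # 2 1
```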

In [9]:
# see first 3 elements
validLines.take(3)

['<doc id="20_newsgroups/comp.graphics/37261" url="" title="20_newsgroups/comp.graphics/37261"> From: lipman@oasys.dt.navy.mil (Robert Lipman) Subject: CALL FOR PRESENTATIONS: Navy SciViz/VR Seminar Date: 19 Mar 93 20:10:23 GMT Distribution: usa Lines: 65   CALL FOR PRESENTATIONS  NAVY SCIENTIFIC VISUALIZATION AND VIRTUAL REALITY SEMINAR  Tuesday, June 22, 1993  Carderock Division, Naval Surface Warfare Center formerly the David Taylor Research Center) Bethesda, Maryland  SPONSOR: NESS (Navy Engineering Software System) is sponsoring a  one-day Navy Scientific Visualization and Virtual Reality Seminar.   The purpose of the seminar is to present and exchange information for Navy-related scientific visualization and virtual reality programs,  research, developments, and applications.  PRESENTATIONS: Presentations are solicited on all aspects of  Navy-related scientific visualization and virtual reality.  All  current work, works-in-progress, and proposed work by Navy  organizations will 

Extract the `id` attribute as the key of each line and the text inside the `<doc>` element as its value.

In [10]:
# transform into a bunch of (docID, text) pairs
keyAndText = validLines.map(lambda x : (x[x.index('id="') + 4 : x.index('" url=')], x[x.index('"> ') + 3:x.index(' </doc>')]))

# what is the type of keyAndText?
type(keyAndText)

pyspark.rdd.PipelinedRDD

In [11]:
# see first 3 elements
keyAndText.take(3)

[('20_newsgroups/comp.graphics/37261',
  "From: lipman@oasys.dt.navy.mil (Robert Lipman) Subject: CALL FOR PRESENTATIONS: Navy SciViz/VR Seminar Date: 19 Mar 93 20:10:23 GMT Distribution: usa Lines: 65   CALL FOR PRESENTATIONS  NAVY SCIENTIFIC VISUALIZATION AND VIRTUAL REALITY SEMINAR  Tuesday, June 22, 1993  Carderock Division, Naval Surface Warfare Center formerly the David Taylor Research Center) Bethesda, Maryland  SPONSOR: NESS (Navy Engineering Software System) is sponsoring a  one-day Navy Scientific Visualization and Virtual Reality Seminar.   The purpose of the seminar is to present and exchange information for Navy-related scientific visualization and virtual reality programs,  research, developments, and applications.  PRESENTATIONS: Presentations are solicited on all aspects of  Navy-related scientific visualization and virtual reality.  All  current work, works-in-progress, and proposed work by Navy  organizations will be considered.  Four types of presentations are  avail

In [12]:
# now we split the text in each (docID, text) pair into a list of words
# the regular expression replaces every non-letter character with a space before splitting,
# so punctuation, digits and unusual characters in some of the documents do not break the split
# after this, we have a data set with (docID, ["word1", "word2", "word3", ...])
regex = re.compile('[^a-zA-Z]')
keyAndListOfWords = keyAndText.map(lambda x : (str(x[0]), regex.sub(' ', x[1]).lower().split()))

# what is the type of keyAndListOfWords?
type(keyAndListOfWords)

pyspark.rdd.PipelinedRDD
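Here is a local sketch of the tokenization step on one string, with no Spark involved. The sample text is taken from the start of the first document. The regular expression `[^a-zA-Z]` turns every non-letter, including `@`, `.`, `:`, parentheses and digits, into a space. The text is then lowercased and split on whitespace.

```python
import re

# Same pattern as the cell above: anything that is not an ASCII letter.
regex = re.compile('[^a-zA-Z]')

text = "From: lipman@oasys.dt.navy.mil (Robert Lipman) Date: 19 Mar 93"
words = regex.sub(' ', text).lower().split()
print(words)
# ['from', 'lipman', 'oasys', 'dt', 'navy', 'mil', 'robert', 'lipman', 'date', 'mar']
```

The digits `19` and `93` disappear completely, which matches the word list shown for the first document.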

In [13]:
keyAndListOfWords.take(3)

[('20_newsgroups/comp.graphics/37261',
  ['from',
   'lipman',
   'oasys',
   'dt',
   'navy',
   'mil',
   'robert',
   'lipman',
   'subject',
   'call',
   'for',
   'presentations',
   'navy',
   'sciviz',
   'vr',
   'seminar',
   'date',
   'mar',
   'gmt',
   'distribution',
   'usa',
   'lines',
   'call',
   'for',
   'presentations',
   'navy',
   'scientific',
   'visualization',
   'and',
   'virtual',
   'reality',
   'seminar',
   'tuesday',
   'june',
   'carderock',
   'division',
   'naval',
   'surface',
   'warfare',
   'center',
   'formerly',
   'the',
   'david',
   'taylor',
   'research',
   'center',
   'bethesda',
   'maryland',
   'sponsor',
   'ness',
   'navy',
   'engineering',
   'software',
   'system',
   'is',
   'sponsoring',
   'a',
   'one',
   'day',
   'navy',
   'scientific',
   'visualization',
   'and',
   'virtual',
   'reality',
   'seminar',
   'the',
   'purpose',
   'of',
   'the',
   'seminar',
   'is',
   'to',
   'present',
   'and',
  

In [14]:
# now get the top 20,000 words

# first change (docID, ["word1", "word2", "word3", ...]) to ("word1", 1) ("word2", 1) ...
allWords = keyAndListOfWords.flatMap(lambda x: ((j, 1) for j in x[1]))

# now, count all of the words, giving us ("word1", 1433), ("word2", 3423423), etc.
allCounts = allWords.reduceByKey(lambda a, b: a + b)

# and get the top 20,000 words as a local Python list on the driver (top() is an action)
# each entry is a ("word1", count) pair, sorted by count in descending order
topWords = allCounts.top(20000, key=lambda x: x[1])

# what is the type of topWords?
type(topWords)

list

In [15]:
# see the first 3 elements
topWords[:3]

[('the', 8877), ('to', 5541), ('a', 5485)]
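The three Spark steps above can be mirrored on a small local list, which may help when checking what each one does. This sketch uses a hypothetical two-document toy input in the same `(docID, [words])` shape. It uses `collections.Counter` in place of `reduceByKey` and `heapq.nlargest` in place of `top`. These are local stand-ins, not how Spark runs the job.

```python
import heapq
from collections import Counter

# Hypothetical toy input shaped like keyAndListOfWords: (docID, [words]).
key_and_words = [
    ("doc1", ["the", "navy", "the", "seminar"]),
    ("doc2", ["the", "navy", "a"]),
]

# flatMap: emit (word, 1) for every word of every document.
all_words = [(w, 1) for _, words in key_and_words for w in words]

# reduceByKey(lambda a, b: a + b): add up the 1s for each word.
all_counts = Counter()
for word, one in all_words:
    all_counts[word] += one

# top(n, key=lambda x: x[1]): the n pairs with the largest counts, largest first.
top_words = heapq.nlargest(2, all_counts.items(), key=lambda x: x[1])
print(top_words)  # [('the', 3), ('navy', 2)]
```

Words with equal counts (here `seminar` and `a`, both 1) have no defined order relative to each other. The same may apply to ties in Spark's `top`.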

`topWords` is now a local list of (word, count) tuples for the 20,000 most frequent words, sorted by decreasing count. From it we want to build the dictionary as an RDD of (word, rank) pairs. The rank runs from 0 (most frequent word) to 19,999 (20,000th most frequent word).

In [16]:
# start by creating an RDD that has the numbers 0 through 19,999
# 20000 is the number of words that will be in our dictionary
twentyK = sc.parallelize(range(20000))

# what is the type of twentyK?
type(twentyK)

pyspark.rdd.PipelinedRDD

In [17]:
# see the first 3 elements
twentyK.take(3)

[0, 1, 2]

### Your Code Here
Your task is to come up with an appropriate lambda that is going to attach the correct word to each rank in `twentyK`, putting the results into an RDD.

Replace the question marks with your lambda. 

The key of the tuple you create and put into the RDD via the lambda should be the word; the value is the index (the number from 0 to 19,999). Your lambda will refer to the generated list of frequent words `topWords`.


In other words, map the parallelized range $(0, 1, 2, \ldots, 19999)$
into a set of tuples ("mostcommonword", 0), ("nextmostcommon", 1), and so on.

In [34]:
# now, we transform (0), (1), (2), ... to ("mostcommonword", 0) ("nextmostcommon", 1), ...
# the number will be the spot in the dictionary used to tell us where the word is located
# HINT: make use of topWords in the lambda that you supply
dictionary = twentyK.map(lambda rank: (topWords[rank][0], rank))

In [35]:
# finally, print out some of the dictionary, just for debugging and to get checked off
dictionary.take(10)

[('the', 0),
 ('to', 1),
 ('a', 2),
 ('and', 3),
 ('of', 4),
 ('i', 5),
 ('is', 6),
 ('in', 7),
 ('for', 8),
 ('it', 9)]
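To check the lambda's logic without a cluster, this sketch applies it to a local `range`. Here `top_words` is a stand-in for `topWords` and holds only the first three pairs printed earlier. It also shows that `enumerate` produces the same (word, rank) pairs, and that converting the pairs to a Python `dict` gives word-to-rank lookups.

```python
# Stand-in for topWords: the first three (word, count) pairs printed earlier.
top_words = [("the", 8877), ("to", 5541), ("a", 5485)]

# Same lambda as the dictionary cell, applied to a local range instead of an RDD.
dictionary = list(map(lambda rank: (top_words[rank][0], rank), range(len(top_words))))
print(dictionary)  # [('the', 0), ('to', 1), ('a', 2)]

# enumerate yields the same (word, rank) pairs without indexing into the list.
assert dictionary == [(word, rank) for rank, (word, _) in enumerate(top_words)]

# A Python dict built from the pairs maps each word to its rank.
word_to_rank = dict(dictionary)
print(word_to_rank["to"])  # 1
```

In the Spark cell, the lambda captures `topWords` in its closure, so the 20,000-entry list is shipped along with the tasks. For larger lookup lists, Spark's broadcast variables (`sc.broadcast`) are the usual alternative. This lab does not use them.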

Stop the Spark context when you are done.

In [36]:
sc.stop()  # shut down the context; create a new SparkContext before running more Spark code

Copyright ©2020 Christopher M Jermaine (cmj4@rice.edu), Risa B Myers  (rbm2@rice.edu), and Marmar Orooji (marmar.orooji@rice.edu)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.