***
***
# Core Spark Fundamentals

***
***

## What comes with Apache Spark

* Spark is a distributed computing framework
* Core Spark contains all the basic functionality of Spark
  * Contains RDD class and all action/transformation methods which operate on RDDs
* Other modules in Spark sit on top of Core Spark
  * __Spark SQL__ introduces a DataFrame class which acts as a schema-based RDD allowing for SQL syntax for working with data
  * __Spark Streaming__ adds a streaming context object which can be used to do Spark operations on batches of streaming data
  * __MLlib__ adds a collection of common machine learning algorithms that can be used with RDDs
  * __GraphX__ introduces node and edge classes and a set of common algorithms for analyzing graphs of data

<img src="figs/spark-stack.png">
#### (spark.apache.org)


***

## Let's play with some data!
To start, let's use some features of Core Spark.

***

### Import Moby Dick
This will be the first data set we use with Spark.


In [None]:
# sc is the SparkContext provided by the pyspark shell / notebook environment
mobyDick_RDD = sc.textFile("./data/MobyDick.txt")
print("Number of lines in Moby Dick = %d" % mobyDick_RDD.count())
mobyDick_RDD.take(3)

### Flat map
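We imported Moby Dick with _textFile()_, which creates an RDD of strings where each element is a single line of the text file. Let's create a new RDD in which each element is a single word of the text. _flatMap()_ does this: the function we pass returns a list of words for each line, and Spark flattens all of those lists into one RDD of words, whereas _map()_ would produce an RDD with one list per line.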

<img src="figs/flatMap_map.png">

In [None]:
words_RDD = mobyDick_RDD.flatMap(lambda x: x.split(" "))
print("Number of words in Moby Dick = %d" % words_RDD.count())
words_RDD.take(7)
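The difference between _map()_ and _flatMap()_ can be checked without a cluster. The sketch below mimics both on a plain Python list; it is a local analogue, not Spark itself, and the three sample lines are made up for illustration. It also shows that splitting an empty line with `split(" ")` yields `['']`, which is one source of the empty-string "words" dealt with later.

```python
from itertools import chain

# A few made-up lines standing in for the elements of mobyDick_RDD
lines = ["Call me Ishmael.", "", "Some years ago"]

# map: exactly one output per input line, so the result is a list of lists
mapped = [line.split(" ") for line in lines]

# flatMap: each line may yield zero or more words, and the results are flattened
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)       # [['Call', 'me', 'Ishmael.'], [''], ['Some', 'years', 'ago']]
print(flat_mapped)  # ['Call', 'me', 'Ishmael.', '', 'Some', 'years', 'ago']
```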

***
### Map
Let's make sure that the data is _clean_ by removing all non-alphabetic characters and making all the words lowercase. To do this, let's create a transformation function that maps a string to a lowercase string containing only the letters a-z. We will then apply a map to words_RDD using this transformation function.

In [None]:
import re
regex = re.compile('[^a-zA-Z]')
def word_transform(word):
    clean_word = regex.sub('', word).lower()
    return clean_word

s1 = "te4st8 w0ord!."
print(s1 + " -> " + word_transform(s1))

In [None]:
clean_words_RDD = words_RDD.map(word_transform)
print("Number of words in Moby Dick = %d" % clean_words_RDD.count())
clean_words_RDD.take(7)
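Like Python's built-in `map`, _map()_ on an RDD produces exactly one output element per input element, so the word count is unchanged by the cleaning step. A quick local check with made-up tokens (a sketch that re-creates the same cleaning rule as `word_transform`) also shows that tokens with no letters at all, such as numbers or dashes, become empty strings too.

```python
import re

# Same cleaning rule as word_transform above: drop non-letters, then lowercase
NON_ALPHA = re.compile('[^a-zA-Z]')

def word_transform(word):
    return NON_ALPHA.sub('', word).lower()

tokens = ["Call", "me", "Ishmael.", "", "1851", "--"]
cleaned = list(map(word_transform, tokens))

print(cleaned)                      # ['call', 'me', 'ishmael', '', '', '']
print(len(cleaned) == len(tokens))  # True: map is one-to-one
```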

***
### Filter
It looks like we have made some progress on cleaning the data set, but we still have some empty strings ('') left over from blank lines, repeated spaces, and tokens that contained no letters. Let's filter those out of our data set. We also have lots of common words that are pretty uninteresting (it, the, a, ...). These are stop words, and we can filter them out as well. Let's do that using an anonymous (lambda) function. A filter function must return True if a word should be kept in the new data set, or False if it should be filtered out.

<img src="figs/filter.png">

In [None]:
def word_reader(filename):
    # Return the file's lines as a list of lowercase strings (one stop word per line)
    with open(filename) as f:
        return [line.strip().lower() for line in f]

stop_words = word_reader("./data/stopwords.txt")
stop_words.append('')
print(stop_words[0:7])

In [None]:
clean_filtered_words_RDD = clean_words_RDD.filter(lambda word: word not in stop_words)
print("Number of words in Moby Dick = %d" % clean_filtered_words_RDD.count())
clean_filtered_words_RDD.take(7)

In [None]:
print("We just filtered %d stop words from the text!" % (clean_words_RDD.count() - clean_filtered_words_RDD.count()))
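The filter predicate is an ordinary Python function, so it can be tried locally first. A sketch, assuming a small hypothetical stop-word set: storing stop words in a `set` rather than a list makes each `word not in ...` check constant-time on average instead of a linear scan, which matters because the predicate runs once per word. On a cluster, one option is to wrap the set in a broadcast variable, e.g. `bc = sc.broadcast(set(stop_words))` and filter with `lambda word: word not in bc.value`, so the set is shipped to each executor once rather than with every task.

```python
# Hypothetical, tiny stop-word set; the notebook reads the real list from ./data/stopwords.txt
example_stop_words = {"the", "a", "it", "of", ""}

def keep(word):
    # Same contract as the predicate passed to RDD.filter: True keeps the word, False drops it
    return word not in example_stop_words

cleaned_words = ["call", "me", "ishmael", "", "the", "whale"]
kept = list(filter(keep, cleaned_words))
print(kept)  # ['call', 'me', 'ishmael', 'whale']
```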

***
### Map-reduce

We now have an RDD that contains a nice list of clean words from Moby Dick. Let's see how common each word is by mapping every word to a (word, 1) key-value pair and then using _reduceByKey()_ to sum the values for each word key.

In [None]:
word_count_RDD = clean_filtered_words_RDD.map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y)
print("Number of unique clean words in Moby Dick = %d" % word_count_RDD.count())
word_count_RDD.take(7)
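What _reduceByKey()_ computes can be sketched on a single machine with a hypothetical helper `reduce_by_key` (not part of Spark). Spark may combine values within each partition first and then merge the partial results, so the function passed to _reduceByKey()_ should be associative and commutative, as addition is; the local loop below just folds the values in order.

```python
from collections import Counter

def reduce_by_key(pairs, func):
    """Hypothetical local analogue of RDD.reduceByKey: merge values per key with func."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

words = ["whale", "sea", "whale", "ship", "whale", "sea"]
pairs = [(w, 1) for w in words]               # the map step: word -> (word, 1)
counts = reduce_by_key(pairs, lambda x, y: x + y)

print(counts)                     # {'whale': 3, 'sea': 2, 'ship': 1}
print(counts == Counter(words))   # True
```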

***
### SortBy
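_sortBy()_ sorts an RDD by an arbitrary key function, which makes it easy to sort non-trivial elements such as our (word, count) tuples. Here we sort by the count (the second element of each tuple) in descending order.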

In [None]:
sorted_word_count_RDD = word_count_RDD.sortBy(lambda x: x[1], ascending=False)
sorted_word_count_RDD.take(7)
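The same key-function idea in plain Python, using made-up counts: `sorted` with `key=` and `reverse=True` mirrors `sortBy(lambda x: x[1], ascending=False)`. When only the top few words are wanted, a partial selection avoids sorting everything; locally that is `heapq.nlargest`, and on an RDD the analogous calls should be `top(7, key=lambda x: x[1])` or `takeOrdered(7, key=lambda x: -x[1])`.

```python
import heapq

counts = {"sea": 2, "whale": 3, "ship": 1}

# Sort (word, count) pairs by count, largest first, like sortBy(lambda x: x[1], ascending=False)
ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
print(ranked)  # [('whale', 3), ('sea', 2), ('ship', 1)]

# When only the top few are needed, heapq.nlargest avoids a full sort
top2 = heapq.nlargest(2, counts.items(), key=lambda kv: kv[1])
print(top2)    # [('whale', 3), ('sea', 2)]
```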

***
### Plotting word frequency
Awesome! We now know how frequently unique words occur in Moby Dick. Let's visualize this with a word cloud. First we collect the RDD from Spark back to the driver, which creates a local Python data structure from the distributed RDD (so it should only be done when the result fits in the driver's memory). We could return the sorted words as a list of tuples using _collect()_, or as a Python dict using _collectAsMap()_. We will use the list method for now.

In [None]:
sorted_word_count = sorted_word_count_RDD.collect()
sorted_word_count[0:7]
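Recent versions of the `wordcloud` package expect `generate_from_frequencies` to receive a dict that maps each word to its frequency. Converting a collected list of (word, count) tuples is a one-liner, as this small sketch with made-up pairs shows; it gives the same kind of mapping that _collectAsMap()_ would return directly.

```python
# Made-up (word, count) pairs in the shape returned by collect()
example_pairs = [("whale", 3), ("sea", 2), ("ship", 1)]

# dict() turns the list of pairs into a word -> count mapping
word_freqs = dict(example_pairs)
print(word_freqs["whale"])  # 3
print(list(word_freqs))     # ['whale', 'sea', 'ship'] (dicts keep insertion order in Python 3.7+)
```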

In [None]:
%matplotlib inline
import os
import numpy as np
import matplotlib.pyplot as plt

try:
    from wordcloud import WordCloud
except ImportError:
    # wordcloud is missing: install it with the provided script, then retry
    os.system("./data/install_word_cloud.sh")
    from wordcloud import WordCloud
# Pillow is a wordcloud dependency; it replaces scipy.misc.imread, which SciPy removed
from PIL import Image

whale_mask = np.array(Image.open("./figs/whale.png"))
wc = WordCloud(background_color="black", max_words=2000,
               font_path='./figs/Verdana.ttf', mask=whale_mask,
               width=1600, height=800)
# Recent wordcloud versions expect a dict mapping each word to its frequency
wc.generate_from_frequencies(dict(sorted_word_count))

# store to file
wc.to_file("./figs/whale_text.png")

# show
plt.imshow(wc)
plt.axis("off")
fig = plt.gcf()
fig.set_size_inches(16, 9)
plt.show()

***
## Summary
We have accomplished the following during this session:
* Imported a txt file as an RDD where each element is a line in the txt file
* Did a flat map to get an RDD where each element is a word in the txt file
* Did a map transformation to make all words lowercase and strip non-alphabetic characters
* Filtered out empty strings and stop words
* Did a map-reduce to make (key, value) elements of the RDD where key is a word in the txt file and value is the number of times that word occurred in the txt file
* Sorted the (key, value) RDD using sortBy with a custom key function
* Collected the (key, value) RDD as a list of tuples to the Python driver
* Plotted a word cloud using the list of (key, value) tuples

Many of these steps can be performed at once by chaining transformations together and ending with an action (here _collect()_). For example, we can directly obtain the final (key, value) list as follows:

In [None]:
sorted_word_count_easy = sc.textFile("./data/MobyDick.txt").\
                            flatMap(lambda x: x.split(" ")).\
                            map(lambda x: word_transform(x)).\
                            filter(lambda word: word not in stop_words).\
                            map(lambda x: (x,1)).\
                            reduceByKey(lambda x,y: x+y).\
                            sortBy(lambda x: x[1], ascending=False).\
                            collect()

In [None]:
sorted_word_count_easy[0:7]

In [None]:
sorted_word_count[0:7]
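For testing the pipeline's logic on a tiny input before running it on a cluster, the same chain can be mirrored in plain Python. This is a sketch, not Spark: `local_word_count` is a hypothetical helper, and the stop-word set and sample lines are made up. Note that ties in the counts may come out in a different order from Spark's _sortBy()_, since Python's `sorted` is stable while a distributed sort makes no such promise.

```python
import re
from collections import Counter

NON_ALPHA = re.compile('[^a-zA-Z]')
EXAMPLE_STOP_WORDS = {"the", "a", "of", "and", ""}   # hypothetical, tiny stop-word set

def local_word_count(lines, stop_words=EXAMPLE_STOP_WORDS):
    """Single-machine mirror of the chained RDD pipeline above (a sketch, not Spark)."""
    words = (w for line in lines for w in line.split(" "))      # flatMap
    cleaned = (NON_ALPHA.sub('', w).lower() for w in words)     # map
    kept = (w for w in cleaned if w not in stop_words)          # filter
    counts = Counter(kept)                                      # map to (w, 1) + reduceByKey
    return sorted(counts.items(), key=lambda kv: kv[1], reverse=True)  # sortBy + collect

result = local_word_count(["The whale, the whale!", "A sea of whales and the sea."])
print(result)  # [('whale', 2), ('sea', 2), ('whales', 1)]
```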

***
***
## More word clouds for entire books
***
### Function that takes a text file and plots its word cloud

In [None]:
def word_plotter(text_file, mask_file=""):
    """Count the words in text_file with Spark, save a word cloud image to ./figs/ and display it.

    If mask_file is given, words are only drawn where that image is not pure white."""
    import os
    import numpy as np
    import matplotlib.pyplot as plt

    try:
        from wordcloud import WordCloud
    except ImportError:
        os.system("./data/install_word_cloud.sh")
        from wordcloud import WordCloud
    # Pillow (a wordcloud dependency) replaces scipy.misc.imread, which SciPy removed
    from PIL import Image

    # Only pass a mask array when a mask image was supplied
    mask = np.array(Image.open(mask_file)) if mask_file != "" else None
    wc = WordCloud(background_color="black", max_words=2000,
                   font_path='./figs/Verdana.ttf', mask=mask,
                   width=1600, height=800)

    # word -> count dict; no sort is needed since the word cloud only uses the counts
    dat = sc.textFile(text_file).\
                      flatMap(lambda x: x.split(" ")).\
                      map(word_transform).\
                      filter(lambda word: word not in stop_words).\
                      map(lambda x: (x,1)).\
                      reduceByKey(lambda x,y: x+y).\
                      collectAsMap()
    wc.generate_from_frequencies(dat)

    # store to file, e.g. ./data/SleepyHollow.txt -> ./figs/SleepyHollow_wordCloud.png
    base_name = os.path.splitext(os.path.basename(text_file))[0]
    wc.to_file("./figs/" + base_name + "_wordCloud.png")

    # show
    plt.imshow(wc)
    plt.axis("off")
    fig = plt.gcf()
    fig.set_size_inches(16, 9)
    plt.show()

***
### Sleepy Hollow

In [None]:
word_plotter("./data/SleepyHollow.txt")

***
### Treasure Island

In [None]:
word_plotter("./data/TreasureIsland.txt")

***
### The Republic

In [None]:
word_plotter("./data/Republic.txt")

***
***
# Additional Exercises With Spark
## Using Scrabble scores to analyze book complexity

***
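### Letter point values

These values match the Words With Friends letter values; standard English Scrabble scores several letters differently (for example, B = 3 and J = 8).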

In [None]:
LP = {'a':1,'b':4,'c':4,'d':2,'e':1,'f':4,'g':3,\
      'h':3,'i':1,'j':10,'k':5,'l':2,'m':4,'n':2,\
      'o':1,'p':4,'q':10,'r':1,'s':1,'t':1,'u':2,\
      'v':5,'w':4,'x':8,'y':3,'z':10}

***
### Function which takes a string (word) and returns total point value

In [None]:
def word_points(word):
    """Return the total point value (int) of word (string).

    Characters that are not in LP are reported and contribute 0 points."""
    points = 0
    for letter in word:
        try:
            points += LP[letter.lower()]
        except KeyError:
            print("%s not in LP" % letter)
    return points

# should return 4
word_points("test")
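A more compact variant, offered as a sketch with a hypothetical name `word_points_compact`, sums the letter values with `dict.get` and a default of 0. Unlike `word_points`, it scores unknown characters silently instead of printing a warning, which is harmless here because `word_transform` already strips everything except a-z. The worked sums in the comments follow the LP values above.

```python
# Same letter values as the LP dict above
LP = {'a':1,'b':4,'c':4,'d':2,'e':1,'f':4,'g':3,
      'h':3,'i':1,'j':10,'k':5,'l':2,'m':4,'n':2,
      'o':1,'p':4,'q':10,'r':1,'s':1,'t':1,'u':2,
      'v':5,'w':4,'x':8,'y':3,'z':10}

def word_points_compact(word):
    """Sum letter values; characters missing from LP silently score 0."""
    return sum(LP.get(letter, 0) for letter in word.lower())

print(word_points_compact("test"))     # 4  (1 + 1 + 1 + 1)
print(word_points_compact("whale"))    # 11 (4 + 3 + 1 + 2 + 1)
print(word_points_compact("Ishmael"))  # 13 (1 + 1 + 3 + 4 + 1 + 1 + 2)
```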

***
### Function that takes a txt filename and returns its average Scrabble score (float)
#### Average Scrabble score defined as [total score of all non-stop words in the book] / [number of non-stop words in the book]

In [None]:
%matplotlib inline
from scipy.stats import norm
import matplotlib.pyplot as plt

def average_scrabble_score(book_title, text_file):
    # RDD of point values: one entry per cleaned, non-stop word in the book
    rd0 = sc.textFile(text_file).\
                      flatMap(lambda x: x.split(" ")).\
                      map(word_transform).\
                      filter(lambda word: word not in stop_words).\
                      map(word_points)

    # Cache rd0, since count(), reduce() and collect() below each reuse it
    rd0.cache()
    N_words = rd0.count()
    N_points = rd0.reduce(lambda x,y: x+y)
    Ave_Score = float(N_points)/float(N_words)

    # Histogram of word point values with a fitted normal curve overlaid
    all_points = rd0.collect()
    n, bins, patches = plt.hist(all_points, 50, density=True, facecolor='green', alpha=0.5)
    (mu, sigma) = norm.fit(all_points)
    # mlab.normpdf was removed from matplotlib; scipy's norm.pdf computes the same curve
    y = norm.pdf(bins, mu, sigma)
    plt.plot(bins, y, 'r--', linewidth=2)

    plt.xlabel('Word point value', fontsize=16)
    plt.ylabel('Normalized Frequency', fontsize=16)
    plt.title("%s Average score %2.2f" % (book_title, Ave_Score))
    plt.show()

    return Ave_Score

ret = average_scrabble_score("Moby Dick", "./data/MobyDick.txt")
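For a normal distribution, `scipy.stats.norm.fit` returns the maximum-likelihood estimates, which have closed forms: the fitted mean is the sample mean, so the red curve's mu equals the book's average score, and the fitted sigma is the standard deviation computed with N (not N - 1) in the denominator. A small NumPy check with made-up point values illustrates this:

```python
import numpy as np

# Made-up per-word point values standing in for rd0.collect()
points = np.array([4, 11, 13, 2, 5])

average = points.sum() / points.size                  # [total points] / [number of words]
mu_hat = points.mean()                                # maximum-likelihood mean of a normal fit
sigma_hat = np.sqrt(((points - mu_hat) ** 2).mean())  # maximum-likelihood std (divides by N)

print(average, mu_hat)       # 7.0 7.0
print(round(sigma_hat, 4))   # 4.2426
```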

***
### Run the function on a collection of books

In [None]:
# A negative year means BC (approximate)
Books = {"Moby Dick":                     {"fid": "./data/MobyDick.txt",       "year": 1851},
         "Treasure Island":               {"fid": "./data/TreasureIsland.txt", "year": 1883},
         "The Republic":                  {"fid": "./data/Republic.txt",       "year": -380},
         "Legend of Sleepy Hollow":       {"fid": "./data/SleepyHollow.txt",   "year": 1820},
         "Pride and Prejudice":           {"fid": "./data/PridePrejudice.txt", "year": 1813},
         "War and Peace":                 {"fid": "./data/WarAndPeace.txt",    "year": 1869},
         "Julius Caesar":                 {"fid": "./data/JuliusCaesar.txt",   "year": 1599},
         }

# Score each book; report (rather than stop on) any book that fails, e.g. a missing file
for book, info in Books.items():
    try:
        info["score"] = average_scrabble_score(book, info["fid"])
    except Exception as err:
        print("%s failed!! (%s)" % (book, err))
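The `Books` dictionary records a publication year for each book, but the loop above only stores the scores. One possible next step, sketched below with a hypothetical helper `scores_by_year`, is to list the scores in chronological order. The example entries are placeholders that only show the input and output shape; they are not results.

```python
def scores_by_year(books):
    """Return (year, title, score) rows sorted by publication year.

    Books without a "score" entry (for example, ones whose scoring failed) are skipped."""
    rows = [(info["year"], title, info["score"])
            for title, info in books.items() if "score" in info]
    return sorted(rows)

# Placeholder entries with made-up scores, only to show the input/output shape
example_books = {
    "Book B": {"year": 1900, "score": 6.5},
    "Book A": {"year": -380, "score": 7.25},
    "Book C": {"year": 1850},            # no score: skipped
}
for year, title, score in scores_by_year(example_books):
    print("%5d  %-10s %.2f" % (year, title, score))
```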