# Big Data Analytics with PySpark

**Aim**: Hands-on lab showing how to use PySpark to predict the authorship of a set of books (either Jane Austen or Shakespeare), by clustering based on book content. 

Note: this notebook has been specifically written to be run on [Google Colab](https://colab.research.google.com/notebooks/welcome.ipynb). You may get errors if you use other IPython environments (e.g. Jupyter Notebook).

**What you'll learn**: The basics of data pre-processing and ML (in this case k-means clustering) using PySpark.

**So what**: PySpark is one of the most widely used frameworks for processing and analysing big data, i.e. data with these characteristics:

![alt text](https://www.zarantech.com/blog/wp-content/uploads/2016/09/Bigdata-4Vs.png)

When it comes to these kinds of data (increasingly common nowadays), traditional tools (e.g. pandas, SQL) start to fall over, and PySpark really comes into its own.

**PySpark key concepts**:
* *What is it*: PySpark is the Python API for [Apache Spark](https://spark.apache.org), an open-source analytics engine for large-scale data processing.  
* *RDD*: Resilient Distributed Dataset. The basic data object in Spark: data is split into chunks (partitions), allowing parallel processing across a compute cluster (essential for big data). RDDs can hold structured or unstructured data (also essential).
* *MapReduce*: Key technique for transforming data. First you map each data point to one or more (key, value) pairs. Then you reduce, combining all values that share the same key. The textbook example is a word count over a large text corpus, e.g.:
![alt text](https://d1jnx9ba8s6j9r.cloudfront.net/blog/wp-content/uploads/2016/11/MapReduce-Way-MapReduce-Tutorial-Edureka.png)
* *DAG*: Directed Acyclic Graph. A graph of RDDs recording the chain of transformations applied to them. It enables lazy evaluation: Spark only runs transformations when an action (e.g. `count()` or `take()`) needs a result, at which point it executes all the recorded steps leading up to it.
* *How it works under the hood*:
![alt text](https://d2h0cx97tjks2p.cloudfront.net/blogs/wp-content/uploads/sites/2/2017/08/Internals-of-job-execution-in-spark.jpg)
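The map, shuffle and reduce steps described above can be sketched in plain Python, without Spark, to make the data flow explicit. This is a single-machine illustration only, and the function names are hypothetical: in Spark the shuffle moves pairs between partitions on different machines, and `reduceByKey` also combines values within each partition before shuffling.

```python
from collections import defaultdict
from functools import reduce


def map_phase(lines):
    """Map: emit a (word, 1) pair for every word in every line."""
    for line in lines:
        for word in line.lower().split():
            yield (word, 1)


def shuffle_phase(pairs):
    """Shuffle: group together all values that share the same key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(groups):
    """Reduce: combine each key's values, here by adding them up."""
    return {key: reduce(lambda a, b: a + b, values) for key, values in groups.items()}


lines = ["the cat sat", "the dog sat", "the end"]
word_counts = reduce_phase(shuffle_phase(map_phase(lines)))
print(word_counts)  # {'the': 3, 'cat': 1, 'sat': 2, 'dog': 1, 'end': 1}
```

The PySpark cells later in the lab express the same idea with `flatMap`, `map` and `reduceByKey`, leaving the shuffle to Spark.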


**Key Analysis Steps**:
1.   **Setting up PySpark** on Google Colab
2.   **Vectorising books**, by extracting 'stop word' frequency vectors for each book 
3.   **Performing k-means clustering** on vector representations to separate books into two groups (hopefully reflective of authorship)
4.   **Vectorising books again**, this time extracting 'all word' frequency vectors, then performing feature hashing to reduce them to fixed-width vectors
5.   **Performing k-means clustering again**, to see if new vector representations allow for more accurate grouping of books by author



## Setting up PySpark
Installing PySpark and its key dependencies

In [1]:
#install pyspark and altair libraries
!pip install pyspark
!pip install altair

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
     |████████████████████████████████| 215.7MB 53kB/s 
Collecting py4j==0.10.7
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
     |████████████████████████████████| 204kB 47.3MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130387 sha256=65b42571c15815fd416e45c8cee08451637316b8b27f9ce6f05c5d3e677446bb
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4


In [2]:
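#Make sure Google Colab is running Java 8 (Spark 2.4 needs it to work correctly)
# -y answers apt's confirmation prompt, which cannot be answered interactively from a notebook cell
!sudo apt-get purge -y openjdk-\* icedtea-\* icedtea6-\*
!sudo apt-get install -y openjdk-8-jre-headless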

Reading package lists... Done
Building dependency tree       
Reading state information... Done
Note, selecting 'openjdk-9-jre-headless' for glob 'openjdk-*'
Note, selecting 'openjdk-8-jdk' for glob 'openjdk-*'
Note, selecting 'openjdk-8-jre' for glob 'openjdk-*'
Note, selecting 'openjdk-6-jre' for glob 'openjdk-*'
Note, selecting 'openjdk-6-jre-headless' for glob 'openjdk-*'
Note, selecting 'openjdk-11-demo' for glob 'openjdk-*'
Note, selecting 'openjdk-8-demo' for glob 'openjdk-*'
Note, selecting 'openjdk-11-source' for glob 'openjdk-*'
Note, selecting 'openjdk-8-jre-dcevm' for glob 'openjdk-*'
Note, selecting 'openjdk-11-jre-headless' for glob 'openjdk-*'
Note, selecting 'openjdk-11-dbg' for glob 'openjdk-*'
Note, selecting 'openjdk-11-doc' for glob 'openjdk-*'
Note, selecting 'openjdk-8-jdk-headless' for glob 'openjdk-*'
Note, selecting 'openjdk-7-jre-headless' for glob 'openjdk-*'
Note, selecting 'openjdk-8-jre-zero' for glob 'openjdk-*'
Note, selecting 'openjdk-8-source' for glob

In [3]:
import pyspark
# get a spark context
sc = pyspark.SparkContext.getOrCreate()
print(sc)
# and a spark session
spark = pyspark.sql.SparkSession.builder.getOrCreate()
print(spark)

<SparkContext master=local[*] appName=pyspark-shell>
<pyspark.sql.session.SparkSession object at 0x7f0281194828>


## Reading in Data
Make sure you've saved and unzipped all lab files onto your machine. Then run the code below and, when prompted, navigate to the data folder and select all the data files (i.e. books) to upload them to Google Colab.

In [5]:
#upload data files from your local machine
from google.colab import files
uploaded = files.upload()

Saving emma.txt to emma.txt
Saving hamlet.txt to hamlet.txt
Saving henry_V.txt to henry_V.txt
Saving julius_cesar.txt to julius_cesar.txt
Saving king_lear.txt to king_lear.txt
Saving lady_susan.txt to lady_susan.txt
Saving macbeth.txt to macbeth.txt
Saving mansfield_park.txt to mansfield_park.txt
Saving merchant_of_venice.txt to merchant_of_venice.txt
Saving midsummer.txt to midsummer.txt
Saving northanger_abbey.txt to northanger_abbey.txt
Saving othello.txt to othello.txt
Saving persuasion.txt to persuasion.txt
Saving prideandpredjudice.txt to prideandpredjudice.txt
Saving richard_III.txt to richard_III.txt
Saving romeo_and_juliet.txt to romeo_and_juliet.txt
Saving senseandsensibility.txt to senseandsensibility.txt
Saving tempest.txt to tempest.txt


## Example of MapReduce Framework 

In [0]:
#define function that lower-cases a word and removes a trailing 's' (a crude way to merge plurals)
def stripFinalS(word):
    wordl = word.lower()  # lower case
    if len(wordl) > 1 and wordl[-1] == 's':  # check for words ending in 's'
        return wordl[:-1]  # remove final letter
    else:
        return wordl  # return unchanged
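Because `stripFinalS` drops *any* final 's', it also truncates words that are not plurals, as the outputs further down show ('ha', 'thi'). A quick standalone check, using sample words chosen for illustration:

```python
def stripFinalS(word):
    """Lower-case a word and drop one trailing 's' (a crude plural heuristic)."""
    wordl = word.lower()
    if len(wordl) > 1 and wordl[-1] == 's':
        return wordl[:-1]
    return wordl


samples = ['Works', 'laws', 'This', 'has', 'is', 's', 'Hamlet']
print([stripFinalS(w) for w in samples])
# ['work', 'law', 'thi', 'ha', 'i', 's', 'hamlet']
```

One side effect worth knowing: 'is' becomes 'i', so the counts for the stop word 'i' used later in the lab also include occurrences of 'is'.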

In [7]:
#example of MapReduce framework
import re

# read text as RDD
linesRDD = sc.textFile('hamlet.txt') 
print('linesRDD:',linesRDD.take(4),'\n')

# split words, break lists
wordsRDD = linesRDD.flatMap(lambda line: re.split(r'\W+', line)) # raw string avoids an invalid-escape warning
print('wordsRDD:',wordsRDD.take(20),'\n')

# filter empty words out
wordsFilteredRDD = wordsRDD.filter(lambda word: len(word)>0)
print('wordsFilteredRDD:',wordsFilteredRDD.take(20),'\n')

# lower case, strip final 's', and map each word to a count of 1 so the counts add up to word totals when we reduce
words1RDD = wordsFilteredRDD.map(lambda word: (stripFinalS(word),1))
print('words1RDD:',words1RDD.take(10),'\n')

# reduce and add up counts to get overall word count
wordCountRDD = words1RDD.reduceByKey(lambda wc1,wc2: wc1+wc2)
print('wordCountRDD:',wordCountRDD.take(10),'\n')

# remove rare words
freqWordsRDD = wordCountRDD.filter(lambda x:  x[1] >= 3 ) 
print('freqWordsRDD:',freqWordsRDD.take(10),'\n')

# collect 10 most frequent words
output = freqWordsRDD.sortBy(lambda x: -x[1]).take(10)
print('Most frequent words:')
for (word, count) in output: # iterate over (w,c) pairs
    print("%s: %i" % (word, count)) #  … and print

linesRDD: ['Project Gutenberg Etext of Hamlet by Shakespeare', "PG has multiple editions of William Shakespeare's Complete Works", '', ''] 

wordsRDD: ['Project', 'Gutenberg', 'Etext', 'of', 'Hamlet', 'by', 'Shakespeare', 'PG', 'has', 'multiple', 'editions', 'of', 'William', 'Shakespeare', 's', 'Complete', 'Works', '', '', 'Copyright'] 

wordsFilteredRDD: ['Project', 'Gutenberg', 'Etext', 'of', 'Hamlet', 'by', 'Shakespeare', 'PG', 'has', 'multiple', 'editions', 'of', 'William', 'Shakespeare', 's', 'Complete', 'Works', 'Copyright', 'laws', 'are'] 

words1RDD: [('project', 1), ('gutenberg', 1), ('etext', 1), ('of', 1), ('hamlet', 1), ('by', 1), ('shakespeare', 1), ('pg', 1), ('ha', 1), ('multiple', 1)] 

wordCountRDD: [('project', 36), ('gutenberg', 27), ('of', 733), ('shakespeare', 7), ('multiple', 3), ('edition', 7), ('s', 251), ('work', 20), ('copyright', 9), ('law', 11)] 

freqWordsRDD: [('project', 36), ('gutenberg', 27), ('of', 733), ('shakespeare', 7), ('multiple', 3), ('edition',

## Extracting Word Frequency Vectors
Converting books into vector representations, based on 'stop word' frequency counts.

### Reading the corpus of books into PySpark as a single RDD of (filename, content) pairs

In [8]:
#extracting word frequency vectors from documents

#create an RDD with wholeTextFiles - reads all the files as (file,document) pairs
ft_RDD = sc.wholeTextFiles('/content')

#check number of partitions of RDD
print("partitions: ", ft_RDD.getNumPartitions(),'\n')

#check number of elements in RDD (i.e. number of whole text files)
print("elements: ", ft_RDD.count(),'\n')

#show example elements
ft_RDD.take(2)

partitions:  2 

elements:  18 



[('file:/content/richard_III.txt',
  '\ufeffThis Etext file is presented by Project Gutenberg, in\r\ncooperation with World Library, Inc., from their Library of the\r\nFuture and Shakespeare CDROMS.  Project Gutenberg often releases\r\nEtexts that are NOT placed in the Public Domain!!\r\n\r\n*This Etext has certain copyright implications you should read!*\r\n\r\n<<THIS ELECTRONIC VERSION OF THE COMPLETE WORKS OF WILLIAM\r\nSHAKESPEARE IS COPYRIGHT 1990-1993 BY WORLD LIBRARY, INC., AND IS\r\nPROVIDED BY PROJECT GUTENBERG WITH PERMISSION.  ELECTRONIC AND\r\nMACHINE READABLE COPIES MAY BE DISTRIBUTED SO LONG AS SUCH COPIES\r\n(1) ARE FOR YOUR OR OTHERS PERSONAL USE ONLY, AND (2) ARE NOT\r\nDISTRIBUTED OR USED COMMERCIALLY.  PROHIBITED COMMERCIAL\r\nDISTRIBUTION INCLUDES BY ANY SERVICE THAT CHARGES FOR DOWNLOAD\r\nTIME OR FOR MEMBERSHIP.>>\r\n\r\n*Project Gutenberg is proud to cooperate with The World Library*\r\nin the presentation of The Complete Works of William Shakespeare\r\nfor your 
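`wholeTextFiles` returns one (filename, content) pair per file, rather than one element per line like `textFile`. A rough single-machine equivalent with `pathlib` is sketched below; the helper name `whole_text_files` and the sample files are hypothetical, and it returns plain local paths rather than Spark's `file:` URIs. Note that the content printed above starts with '\ufeff', a UTF-8 byte-order mark that is kept in the decoded text.

```python
import tempfile
from pathlib import Path


def whole_text_files(directory):
    """Return (path, content) pairs for every regular file directly inside
    `directory`, mirroring the (filename, content) shape of sc.wholeTextFiles."""
    pairs = []
    for path in sorted(Path(directory).iterdir()):
        if path.is_file():
            # 'utf-8' keeps a leading byte-order mark as '\ufeff';
            # 'utf-8-sig' would strip it.
            pairs.append((str(path), path.read_text(encoding='utf-8')))
    return pairs


with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, 'hamlet.txt').write_text('\ufeffTo be, or not to be', encoding='utf-8')
    Path(tmp, 'emma.txt').write_text('Emma Woodhouse, handsome', encoding='utf-8')
    pairs = whole_text_files(tmp)

for path, content in pairs:
    print(Path(path).name, repr(content[:12]))
```

Either way, each whole book becomes a single element, which is why `flatMap` is needed next to break it into words.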

### Convert (filename, content) pairs into (filename, word) pairs (Map)

In [0]:
#define function which maps a (filename,content) pair to a list of (filename,word) pairs for each document
def splitFileWords(filenameContent):
    f, c = filenameContent  # unpack the input tuple
    fwLst = []  # the new list for (filename,word) tuples
    wLst = re.split(r'\W+', c)  # split the content into a word list
    for w in wLst:  # iterate through the list
        if len(w) > 0:  # skip empty strings produced by the split
            w = stripFinalS(w)  # lower case and strip final 's'
            fwLst.append((f, w))  # append (filename,word) to the output list
    return fwLst  # return a list of (filename,word) tuples

In [10]:
#apply function on RDD
fw_RDD = ft_RDD.flatMap(splitFileWords)
fw_RDD.take(5)

[('file:/content/richard_III.txt', 'thi'),
 ('file:/content/richard_III.txt', 'etext'),
 ('file:/content/richard_III.txt', 'file'),
 ('file:/content/richard_III.txt', 'i'),
 ('file:/content/richard_III.txt', 'presented')]

### Convert filenames to book names

In [11]:
# just take filename, drop path and extension for readability
fw_RDD = fw_RDD.map(lambda ft: (re.split(r'[/.]', ft[0])[-2], ft[1]))
fw_RDD.take(5)

[('richard_III', 'thi'),
 ('richard_III', 'etext'),
 ('richard_III', 'file'),
 ('richard_III', 'i'),
 ('richard_III', 'presented')]
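The regex keeps the second-to-last piece after splitting on '/' and '.', which works for these filenames but would cut a name that contains an extra dot. An alternative using only the standard library's `os.path` is sketched below; the helper `book_name` and the filename `st.john.txt` are hypothetical.

```python
import os.path
import re


def book_name(path):
    """Drop the directory and the final extension from a file path."""
    return os.path.splitext(os.path.basename(path))[0]


for path in ['file:/content/richard_III.txt', 'file:/content/st.john.txt']:
    print(book_name(path), re.split(r'[/.]', path)[-2])
# richard_III richard_III
# st.john john
```

In the notebook it could be used as `fw_RDD.map(lambda fw: (book_name(fw[0]), fw[1]))`.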

In [12]:
#filter out empty strings, and the words 'project', 'gutenberg' and 'ebook', which come from the Project Gutenberg boilerplate rather than the books themselves
fw_RDD = fw_RDD.filter(lambda fw: len(fw[1])>0 and fw[1] not in ['project','gutenberg', 'ebook'])  
fw_RDD.take(5)

[('richard_III', 'thi'),
 ('richard_III', 'etext'),
 ('richard_III', 'file'),
 ('richard_III', 'i'),
 ('richard_III', 'presented')]

### Show basic summary information about the data

In [13]:
#let's get some basic info about our documents
print("a) Library size")
print("Number of documents: ",ft_RDD.count(),'\n')

print("b) Vocabulary size")
w_RDD = fw_RDD.map(lambda fw: fw[1])
w_RDDu = w_RDD.distinct()
print('Total vocabulary size: ',w_RDDu.count(),'\n')

print("c) Words per book")
from operator import add
f1_RDD = fw_RDD.map(lambda fw: (fw[0],1)) # map (f,w) to (f,1)
fc_RDD = f1_RDD.reduceByKey(add)
print('Words per book: ',fc_RDD.take(4),'\n')

print("d) Vocabulary per book")
fw_RDDu = fw_RDD.distinct() # get unique (f,w) pairs - i.e. every word only once per file.
f1_RDDu = fw_RDDu.map(lambda fw: (fw[0],1)) # map (f,w) to (f,1)
fcu_RDD = f1_RDDu.reduceByKey(add)
print('Vocabulary per book: ',fcu_RDD.take(4),'\n')

print("e) Average occurrences of distinct words in each book (i.e. words/vocab per book)")
f_wv_RDD = fc_RDD.join(fcu_RDD) # join the two RDDs to get (f,(w,v)) tuples
print("file, (word, vocab): ",f_wv_RDD.take(4)) 
# Resolve nested tuples in the lambda to get (filename,words/vocab) tuples
f_awo_RDD = f_wv_RDD.map(lambda f_wv: (f_wv[0],round(f_wv[1][0]/f_wv[1][1],2)))
print('Average word occurrences: ',f_awo_RDD.take(4))

a) Library size
Number of documents:  18 

b) Vocabulary size
Total vocabulary size:  23369 

c) Words per book
Words per book:  [('henry_V', 29984), ('northanger_abbey', 78044), ('midsummer', 19852), ('prideandpredjudice', 125872)] 

d) Vocabulary per book
Vocabulary per book:  [('henry_V', 4816), ('northanger_abbey', 5511), ('midsummer', 3449), ('prideandpredjudice', 6182)] 

e) Average occurrences of distinct words in each book (i.e. words/vocab per book)
file, (word, vocab):  [('henry_V', (29984, 4816)), ('northanger_abbey', (78044, 5511)), ('midsummer', (19852, 3449)), ('prideandpredjudice', (125872, 6182))]
Average word occurrences:  [('henry_V', 6.23), ('northanger_abbey', 14.16), ('midsummer', 5.76), ('prideandpredjudice', 20.36)]
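Step e) joins two keyed datasets: for every book present in both, `join` pairs the word total with the vocabulary size. A dictionary-based sketch of the same inner join, using the four totals printed above (the `join` helper is hypothetical), reproduces the ratios:

```python
# Totals copied from the outputs above (first four books only).
words_per_book = {'henry_V': 29984, 'northanger_abbey': 78044,
                  'midsummer': 19852, 'prideandpredjudice': 125872}
vocab_per_book = {'henry_V': 4816, 'northanger_abbey': 5511,
                  'midsummer': 3449, 'prideandpredjudice': 6182}


def join(left, right):
    """Inner join of two keyed mappings: key -> (left_value, right_value),
    keeping only keys present on both sides (like RDD.join)."""
    return {k: (left[k], right[k]) for k in left if k in right}


joined = join(words_per_book, vocab_per_book)
ratios = {book: round(w / v, 2) for book, (w, v) in joined.items()}
print(ratios)
# {'henry_V': 6.23, 'northanger_abbey': 14.16, 'midsummer': 5.76, 'prideandpredjudice': 20.36}
```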


### Filter data to include only stop words

In [14]:
stopwlst = ['the','a','in','of','on','at','for','by','i','you','me'] # stopword list
fw_RDD2 = fw_RDD.filter(lambda x: x[1] in stopwlst) # filter, keeping only stopwords
fw_RDD2.take(5)

[('richard_III', 'i'),
 ('richard_III', 'by'),
 ('richard_III', 'in'),
 ('richard_III', 'of'),
 ('richard_III', 'the')]

### Count stop word occurrences in each book (Reduce)

In [15]:
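# create ((filename, stopword), 0) pairs for every book and stop word, so no vector is missing an element
# (distinct() first, so each book contributes one set of zeros rather than one set per word)
fsw_0_RDD = fw_RDD.keys().distinct().flatMap(lambda f: [((f,sw),0) for sw in stopwlst])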
print(fsw_0_RDD.take(3),'\n') 

#map (f,w) to ((f,w),1) to allow for counting of words
fw_1_RDD = fw_RDD2.map(lambda x: (x,1)) 
print(fw_1_RDD.take(3),'\n')

#union in the stopwords with count 0
fw_10_RDD = fw_1_RDD.union(fsw_0_RDD)
print(fw_10_RDD.take(3),'\n')

#reduce by adding up word values to get word counts per book
fw_c_RDD = fw_10_RDD.reduceByKey(lambda wc1,wc2:wc1+wc2)
print(fw_c_RDD.take(3))

[(('richard_III', 'the'), 0), (('richard_III', 'a'), 0), (('richard_III', 'in'), 0)] 

[(('richard_III', 'i'), 1), (('richard_III', 'by'), 1), (('richard_III', 'in'), 1)] 

[(('richard_III', 'i'), 1), (('richard_III', 'by'), 1), (('richard_III', 'in'), 1)] 

[(('richard_III', 'by'), 210), (('richard_III', 'the'), 1065), (('richard_III', 'you'), 422)]
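The union with zero-count pairs matters because a book in which some stop word never appears would otherwise produce a shorter vector, and vector positions would no longer line up across books. A plain-Python sketch with `collections.Counter` (the helper `stopword_counts` and the sample sentence are made up for illustration):

```python
from collections import Counter

stopwlst = ['the', 'a', 'in', 'of', 'on', 'at', 'for', 'by', 'i', 'you', 'me']


def stopword_counts(words, pad_with_zeros):
    """Count stop words; optionally give every stop word an entry, even if it is 0."""
    counts = Counter(w for w in words if w in stopwlst)
    if pad_with_zeros:
        # plays the role of the union with ((f, sw), 0) pairs
        for sw in stopwlst:
            counts[sw] += 0
    return dict(counts)


words = 'the king of the land sat at the gate'.split()
print(len(stopword_counts(words, pad_with_zeros=False)))  # 3
print(len(stopword_counts(words, pad_with_zeros=True)))   # 11
```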


### Restructure (key, value) pairs (Map)

In [0]:
#create function to map the ((filename,word),count) pairs to (filename, [(word, count)]) pairs.
def reGrpLst(fw_c): # we get a nested tuple
    fw,c = fw_c
    f,w = fw
    return (f,[(w,c)]) # return (f,[(w,c)]) structure.

In [17]:
#apply function
f_wcL_RDD = fw_c_RDD.map(reGrpLst) 
f_wcL_RDD.take(5)

[('richard_III', [('by', 210)]),
 ('richard_III', [('the', 1065)]),
 ('richard_III', [('you', 422)]),
 ('richard_III', [('a', 547)]),
 ('henry_V', [('of', 789)])]

### Convert data into a single list of word counts per book (Reduce)

In [18]:
#reduce by producing list of (word,wordcount) pairs for each document using reduceByKey
f_wcL2_RDD = f_wcL_RDD.reduceByKey(lambda wc1,wc2: wc1 + wc2)
f_wcL2_RDD.take(2)

[('persuasion',
  [('at', 533),
   ('by', 418),
   ('a', 2404),
   ('the', 3329),
   ('you', 628),
   ('of', 2570),
   ('in', 1389),
   ('i', 1522),
   ('for', 708),
   ('on', 396),
   ('me', 188)]),
 ('lady_susan',
  [('at', 161),
   ('the', 784),
   ('by', 152),
   ('you', 353),
   ('a', 611),
   ('of', 787),
   ('i', 1106),
   ('in', 402),
   ('for', 262),
   ('me', 200),
   ('on', 140)])]

In [19]:
#sort the lists
f_wcLsort_RDD = f_wcL2_RDD.map(lambda f_wcL: (f_wcL[0], sorted(f_wcL[1],key=lambda x: x[0])))
f_wcLsort_RDD.take(2)

[('persuasion',
  [('a', 2404),
   ('at', 533),
   ('by', 418),
   ('for', 708),
   ('i', 1522),
   ('in', 1389),
   ('me', 188),
   ('of', 2570),
   ('on', 396),
   ('the', 3329),
   ('you', 628)]),
 ('lady_susan',
  [('a', 611),
   ('at', 161),
   ('by', 152),
   ('for', 262),
   ('i', 1106),
   ('in', 402),
   ('me', 200),
   ('of', 787),
   ('on', 140),
   ('the', 784),
   ('you', 353)])]

### Remove words, leaving only word count numbers (i.e. vector representations)

In [20]:
#remove the words and convert the numbers to floats
f_wVec_RDD = f_wcLsort_RDD.map(lambda f_wc: (f_wc[0],[float(c) for (w,c) in f_wc[1]]))
f_wVec_RDD.take(2)

[('persuasion',
  [2404.0,
   533.0,
   418.0,
   708.0,
   1522.0,
   1389.0,
   188.0,
   2570.0,
   396.0,
   3329.0,
   628.0]),
 ('lady_susan',
  [611.0,
   161.0,
   152.0,
   262.0,
   1106.0,
   402.0,
   200.0,
   787.0,
   140.0,
   784.0,
   353.0])]
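Sorting before stripping the words is what makes these vectors comparable: `reduceByKey` gives no ordering guarantee within the concatenated lists (compare the `persuasion` and `lady_susan` lists before sorting), so without the sort, position j could mean a different word in each book. The regroup, concatenate, sort and strip steps in plain Python, on made-up data for two hypothetical books `b1` and `b2`:

```python
# Hypothetical ((book, word), count) pairs, in arbitrary order as after reduceByKey.
fw_c = [(('b1', 'the'), 5), (('b2', 'a'), 1), (('b1', 'a'), 2), (('b2', 'the'), 7)]

# Regroup to book -> [(word, count), ...]; list concatenation plays the role of
# reduceByKey(lambda wc1, wc2: wc1 + wc2).
grouped = {}
for (book, word), count in fw_c:
    grouped[book] = grouped.get(book, []) + [(word, count)]

# Sort each list by word so position j refers to the same word in every vector,
# then keep only the counts as floats.
vectors = {book: [float(c) for _, c in sorted(wc, key=lambda x: x[0])]
           for book, wc in grouped.items()}
print(vectors)  # {'b1': [2.0, 5.0], 'b2': [1.0, 7.0]}
```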

## Clustering
Using k-means clustering on vector representations to separate books into two groups, hopefully reflecting authorship: Jane Austen's works in one cluster and Shakespeare's works in the other.

In [21]:
#cluster vectors and see if clusters reflect authorship
from math import sqrt

from pyspark.mllib.clustering import KMeans

wVec_RDD = f_wVec_RDD.map(lambda f_wcl: f_wcl[1]) # strip the filenames

# Build the model (cluster the data); with random initialisation and no seed,
# the cluster labels (0/1) can differ from run to run
clusterModel = KMeans.train(wVec_RDD, 2, maxIterations=10, initializationMode="random")

# Assign the files to the clusters
fc_RDD = f_wVec_RDD.map(lambda fv: (fv[0],clusterModel.predict(fv[1])))
for s in fc_RDD.collect():
    print(s)

# Evaluate clustering. Note: because error() takes the square root for each point, this
# sums Euclidean distances to the assigned centre rather than squared distances; drop the
# sqrt (or use clusterModel.computeCost(wVec_RDD)) for the true Within Set Sum of Squared Errors
def error(point):
    center = clusterModel.centers[clusterModel.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = wVec_RDD.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

('persuasion', 0)
('lady_susan', 1)
('othello', 1)
('merchant_of_venice', 1)
('macbeth', 1)
('richard_III', 1)
('emma', 0)
('julius_cesar', 1)
('tempest', 1)
('mansfield_park', 0)
('henry_V', 1)
('northanger_abbey', 0)
('midsummer', 1)
('prideandpredjudice', 0)
('king_lear', 1)
('romeo_and_juliet', 1)
('senseandsensibility', 0)
('hamlet', 1)
Within Set Sum of Squared Error = 15118.000047182637
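`KMeans.train` runs the iterations internally. For intuition, here is a minimal single-machine version of Lloyd's algorithm, the standard k-means iteration, with random initialisation as in the cell above (MLlib's default initialisation, k-means||, differs). It is a sketch, not MLlib's implementation, and the toy points are made up. It also returns the within-set sum of *squared* errors, i.e. without the per-point square root.

```python
import random


def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))


def kmeans(points, k, iterations=10, seed=0):
    """Lloyd's algorithm: alternate assigning each point to its nearest centre
    and moving each centre to the mean of the points assigned to it."""
    rng = random.Random(seed)
    centres = [list(p) for p in rng.sample(points, k)]  # random initialisation
    for _ in range(iterations):
        clusters = [[] for _ in range(k)]
        for p in points:
            nearest = min(range(k), key=lambda j: squared_distance(p, centres[j]))
            clusters[nearest].append(p)
        for j, members in enumerate(clusters):
            if members:  # keep the old centre if a cluster ends up empty
                centres[j] = [sum(col) / len(members) for col in zip(*members)]
    labels = [min(range(k), key=lambda j: squared_distance(p, centres[j])) for p in points]
    wssse = sum(squared_distance(p, centres[lab]) for p, lab in zip(points, labels))
    return labels, centres, wssse


points = [[1.0, 1.0], [1.5, 2.0], [8.0, 8.0], [9.0, 8.5]]
labels, centres, wssse = kmeans(points, k=2)
print(labels, wssse)  # wssse is 1.25 for these points
```

Because k-means uses Euclidean distance, raw counts make longer books sit further from shorter ones; dividing each vector by its total (word proportions instead of counts) is one option if book length should not drive the grouping.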


## Alternative Approach: Feature Hashing
One book was misclassified: Lady Susan, a Jane Austen work, ended up in the same cluster as the Shakespeare plays. Let's see if we can improve the results by vectorising books based on all word counts (not just stop words).

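**Key challenge**: counting every word gives very long, sparse vectors (i.e. mostly zeros), with one entry per vocabulary word (23,369 here), or vectors of varying length if each book only lists its own words. ML algorithms generally don't like either.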

**Solution**: Feature hashing. A hash function maps each word to one of N buckets and adds the word's count to that bucket, giving a much smaller, dense vector of fixed width N.
![alt text](https://miro.medium.com/max/2060/1*dMMWR6IETmDMkeu9-j51dQ.png)

The main issue is feature collisions: different words can be mapped to the same bucket, so their counts get merged and the data is distorted. This highlights a common trade-off in big data analytics: sacrificing some accuracy for processing convenience.

In [22]:
def hashing_vectorizer(word_count_list, N):
    v = [0] * N  # create fixed size vector of 0s
    for word, count in word_count_list:  # unpack (word, count) tuples
        # Python's built-in hash() of a str is randomised per process unless
        # PYTHONHASHSEED is fixed, so bucket assignments may not be reproducible
        h = hash(word)  # get hash value
        v[h % N] = v[h % N] + count  # add count to the word's bucket
    return v  # return hashed word vector

from operator import add

N = 10  # number of hash buckets, i.e. the fixed vector width

# we use fw_RDD from the beginning with all the words, not just stopwords
fw_1_RDD = fw_RDD.map(lambda x: (x,1))  # change (f,w) to ((f,w),1)
fw_c_RDD = fw_1_RDD.reduceByKey(add)  # count each (f,w) pair, as above
f_wcL_RDD = fw_c_RDD.map(reGrpLst)  # regroup to (f,[(w,c)]), as above
f_wcL2_RDD = f_wcL_RDD.reduceByKey(add)  # concatenate into [(w,c), ... ,(w,c)] lists per file
f_hwVec_RDD = f_wcL2_RDD.map(lambda f_wc: (f_wc[0],hashing_vectorizer(f_wc[1],N)))
print(f_hwVec_RDD.take(3))

[('henry_V', [3179, 3045, 2431, 2814, 3030, 3252, 2144, 4707, 1821, 3561]), ('northanger_abbey', [10493, 8084, 5098, 7615, 7151, 8377, 5797, 12660, 3987, 8782]), ('midsummer', [2039, 2084, 1604, 2005, 2024, 2244, 1637, 2750, 1253, 2212])]
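Python's built-in `hash()` for strings depends on the process's hash seed, so the bucket for a given word is not guaranteed to be the same across sessions or machines. One way to make the vectors reproducible is a deterministic hash such as `zlib.crc32`. This is a swapped-in choice, not what the lab uses, and the bucket values it produces will differ from the output above. The function name is hypothetical.

```python
import zlib


def stable_hashing_vectorizer(word_count_list, N):
    """Feature hashing with a deterministic hash: each word's count is added
    to bucket crc32(word) % N, giving a fixed-width vector of length N."""
    v = [0] * N
    for word, count in word_count_list:
        h = zlib.crc32(word.encode('utf-8'))  # same value in every Python process
        v[h % N] += count
    return v


vec = stable_hashing_vectorizer([('the', 3), ('king', 2), ('lear', 1)], N=10)
print(len(vec), sum(vec))  # 10 6
```

Since it is a plain top-level function, it could replace `hashing_vectorizer` in the `f_hwVec_RDD` line unchanged. The total count is always preserved; collisions only change how it is spread over the buckets.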


## Perform clustering again on new vector representations, to see if results improve

In [23]:
hwVec_RDD = f_hwVec_RDD.map(lambda f_wcl: f_wcl[1]) # strip the filenames

# Build the model (cluster the data)
hClusterModel = KMeans.train(hwVec_RDD, 2, maxIterations=10, initializationMode="random")

# Assign the files to the clusters
fhc_RDD = f_hwVec_RDD.map(lambda fv: (fv[0],hClusterModel.predict(fv[1])))
for s in fhc_RDD.collect():
    print(s)

# Evaluate clustering (as before, the sqrt makes this a sum of distances, not squared distances)
def error(point):
    center = hClusterModel.centers[hClusterModel.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

hWSSSE = hwVec_RDD.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Hash Vectors: Within Set Sum of Squared Error = " + str(hWSSSE))

('henry_V', 0)
('northanger_abbey', 1)
('midsummer', 0)
('prideandpredjudice', 1)
('persuasion', 1)
('king_lear', 0)
('romeo_and_juliet', 0)
('senseandsensibility', 1)
('lady_susan', 0)
('othello', 0)
('merchant_of_venice', 0)
('macbeth', 0)
('richard_III', 0)
('emma', 1)
('julius_cesar', 0)
('tempest', 0)
('hamlet', 0)
('mansfield_park', 1)
Hash Vectors: Within Set Sum of Squared Error = 77009.02390583832


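The grouping is identical to the stop-word result: the only misclassification is again 'Lady Susan', so the hashed all-word vectors did not improve accuracy here. (The two error values can't be compared directly, since the vectors are on very different scales.) Still, not bad for an unsupervised method such as clustering.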
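The groupings above are checked by eye. A small, hypothetical helper can make the check explicit: it labels each cluster with the majority author of its books and lists the books that disagree. The Austen titles come from the uploaded corpus, and the cluster ids are copied from the two printed outputs; inside the notebook, `dict(fc_RDD.collect())` and `dict(fhc_RDD.collect())` would give the equivalent mappings for the current run.

```python
from collections import Counter

# The seven Jane Austen works in the corpus; the other eleven books are Shakespeare plays.
AUSTEN = {'emma', 'lady_susan', 'mansfield_park', 'northanger_abbey',
          'persuasion', 'prideandpredjudice', 'senseandsensibility'}


def author(book):
    return 'austen' if book in AUSTEN else 'shakespeare'


def misclassified(assignments):
    """Label each cluster with its majority author, then return (sorted) the
    books whose author differs from their cluster's majority."""
    majority = {}
    for cluster in set(assignments.values()):
        votes = Counter(author(b) for b, c in assignments.items() if c == cluster)
        majority[cluster] = votes.most_common(1)[0][0]
    return sorted(b for b, c in assignments.items() if author(b) != majority[c])


# Cluster ids copied from the stop-word and hashed-vector outputs.
stopword_clusters = {'persuasion': 0, 'lady_susan': 1, 'othello': 1, 'merchant_of_venice': 1,
                     'macbeth': 1, 'richard_III': 1, 'emma': 0, 'julius_cesar': 1,
                     'tempest': 1, 'mansfield_park': 0, 'henry_V': 1, 'northanger_abbey': 0,
                     'midsummer': 1, 'prideandpredjudice': 0, 'king_lear': 1,
                     'romeo_and_juliet': 1, 'senseandsensibility': 0, 'hamlet': 1}
hashed_clusters = {'henry_V': 0, 'northanger_abbey': 1, 'midsummer': 0, 'prideandpredjudice': 1,
                   'persuasion': 1, 'king_lear': 0, 'romeo_and_juliet': 0,
                   'senseandsensibility': 1, 'lady_susan': 0, 'othello': 0,
                   'merchant_of_venice': 0, 'macbeth': 0, 'richard_III': 0, 'emma': 1,
                   'julius_cesar': 0, 'tempest': 0, 'hamlet': 0, 'mansfield_park': 1}

print(misclassified(stopword_clusters))  # ['lady_susan']
print(misclassified(hashed_clusters))    # ['lady_susan']
```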

Next steps? Look at supervised learning methods, which are much better suited to classification tasks when labelled examples are available.

Hope you enjoyed the lab! You now know the basics of how to perform analytics on big data using PySpark!