In [1]:
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext()

In [2]:
from nltk.corpus import stopwords

In [3]:
#Download Illiad Odyssey
# !wget http://classics.mit.edu/Homer/iliad.mb.txt
# !wget http://classics.mit.edu/Homer/odyssey.mb.txt

In [4]:
def filter_stop_words(word):
    from nltk.corpus import stopwords
    english_stop_words = stopwords.words("english")
    return word not in english_stop_words

def load_text(text_path):
    # Split text in words
    # Remove empty word artefacts
    # Remove stop words ('I', 'you', 'a', 'the', ...)
    vocabulary = sc.textFile(text_path)\
        .flatMap(lambda lines: lines.lower().split())\
        .flatMap(lambda word: word.split("."))\
        .flatMap(lambda word: word.split(","))\
        .flatMap(lambda word: word.split("!"))\
        .flatMap(lambda word: word.split("?"))\
        .flatMap(lambda word: word.split("'"))\
        .flatMap(lambda word: word.split("\""))\
        .filter(lambda word: word is not None and len(word) > 0)\
        .filter(filter_stop_words)

    # Count the total number of words in the text
    word_count = vocabulary.count()

    # Compute the frequency of each word: frequency = #appearances/#word_count
    word_freq = vocabulary.map(lambda word: (word, 1))\
        .reduceByKey(lambda count1, count2: count1 + count2)\
        .map(lambda tupl: (tupl[0], tupl[1]/float(word_count))) #(word, count) -> (word, count/float(word_count)

    return word_freq

In [5]:
import time
tic= time.time()

iliad = load_text('./iliad.mb.txt')
odyssey = load_text('./odyssey.mb.txt')

# Join the two datasets and compute the difference in frequency
# Note that we need to write (freq or 0) because some words do not appear
# in one of the two books. Thus, some frequencies are equal to None after
# the full outer join.
def freq_diff(tupl):
    (word, (freq1, freq2)) = tupl
    return (word, (freq2 or 0) - (freq1 or 0))



join_words = iliad.fullOuterJoin(odyssey)\
     .map(lambda tupl: freq_diff(tupl))

# 10 words that get a boost in frequency in the sequel
emerging_words = join_words.takeOrdered(10, lambda rec: -rec[1])

# 10 words that get a decrease in frequency in the sequel
disappearing_words = join_words.takeOrdered(10, lambda rec: rec[1])

# Print results
for word, freq_diff in emerging_words:
    print("%.2f" % (freq_diff*10000), word)
for word, freq_diff in disappearing_words[::-1]:
    print("%.2f" % (freq_diff*10000), word)

tac= time.time()
print("Task Duration: {}s".format(round(tac-tic,1)))
    
# WAIT in order to inspect all jobs at http://localhost:4040/
input("press ENTER to exit")

92.52 ulysses
53.63 house
48.33 telemachus
43.06 suitors
36.68 tell
33.47 ship
33.35 one
31.94 home
26.73 said
25.97 got
-28.72 jove
-31.46 horses
-40.66 fight
-44.56 spear
-47.24 ships
-54.71 achilles
-61.74 achaeans
-65.52 hector
-72.71 trojans
-89.71 son
Task Duration: 257.4s
press ENTER to exit


''

**When inspecting Process in SPARK WEB UI, we can see that we step 2 and 3 take all the time. We performed actions 2 times, one during count and another during takeOrdered so RDD is computed twice : this can be avoided by using persistance (persist() or cache() operation )**

# Optimization of Operations

In [6]:
def load_text_optimized(text_path):
    # Split text in words
    # Remove empty word artefacts
    # Remove stop words ('I', 'you', 'a', 'the', ...)
    vocabulary = sc.textFile(text_path)\
        .flatMap(lambda lines: lines.lower().split())\
        .flatMap(lambda word: word.split("."))\
        .flatMap(lambda word: word.split(","))\
        .flatMap(lambda word: word.split("!"))\
        .flatMap(lambda word: word.split("?"))\
        .flatMap(lambda word: word.split("'"))\
        .flatMap(lambda word: word.split("\""))\
        .filter(lambda word: word is not None and len(word) > 0)\
        .filter(filter_stop_words).cache()

    # Count the total number of words in the text
    word_count = vocabulary.count()

    # Compute the frequency of each word: frequency = #appearances/#word_count
    word_freq = vocabulary.map(lambda word: (word, 1))\
        .reduceByKey(lambda count1, count2: count1 + count2)\
        .map(lambda tupl: (tupl[0], tupl[1]/float(word_count))) #(word, count) -> (word, count/float(word_count)

    return word_freq

In [11]:
import time
tic= time.time()

iliad = load_text_optimized('./iliad.mb.txt')
odyssey = load_text_optimized('./odyssey.mb.txt')

# Join the two datasets and compute the difference in frequency
# Note that we need to write (freq or 0) because some words do not appear
# in one of the two books. Thus, some frequencies are equal to None after
# the full outer join.
def freq_diff(tupl):
    (word, (freq1, freq2)) = tupl
    return (word, (freq2 or 0) - (freq1 or 0))


# Another Optimization : We cache join.words
join_words = iliad.fullOuterJoin(odyssey)\
     .map(lambda tupl: freq_diff(tupl))

# 10 words that get a boost in frequency in the sequel
emerging_words = join_words.takeOrdered(10, lambda rec: -rec[1])

# 10 words that get a decrease in frequency in the sequel
disappearing_words = join_words.takeOrdered(10, lambda rec: rec[1])

# Print results
for word, freq_diff in emerging_words:
    print("%.2f" % (freq_diff*10000), word)
for word, freq_diff in disappearing_words[::-1]:
    print("%.2f" % (freq_diff*10000), word)

tac= time.time()
print("Task Duration: {}s".format(round(tac-tic,1)))
    
# WAIT in order to inspect all jobs at http://localhost:4040/
input("press ENTER to exit")

92.52 ulysses
53.63 house
48.33 telemachus
43.06 suitors
36.68 tell
33.47 ship
33.35 one
31.94 home
26.73 said
25.97 got
-28.72 jove
-31.46 horses
-40.66 fight
-44.56 spear
-47.24 ships
-54.71 achilles
-61.74 achaeans
-65.52 hector
-72.71 trojans
-89.71 son
Task Duration: 145.0s
press ENTER to exit


''

In [12]:
(257-145)/257*100

43.57976653696498

**We gained 43% on time by just caching the intermediate results**

**We can also optimize the time used to look at the stop_words**, as they are defined as a list and program has to read them all to find the good one. 

**One solution is to use a Hash table (dictionary or set).
Let's use a set**

# Optimization via hash table

In [13]:
# New filter_stop_table function using hash table saved as variable
from nltk.corpus import stopwords
english_stop_words = set(stopwords.words("english"))


def filter_stop_words(word):
    return word not in english_stop_words

In [14]:
import time
tic= time.time()

iliad = load_text_optimized('./iliad.mb.txt')
odyssey = load_text_optimized('./odyssey.mb.txt')

# Join the two datasets and compute the difference in frequency
# Note that we need to write (freq or 0) because some words do not appear
# in one of the two books. Thus, some frequencies are equal to None after
# the full outer join.
def freq_diff(tupl):
    (word, (freq1, freq2)) = tupl
    return (word, (freq2 or 0) - (freq1 or 0))


# Another Optimization : We cache join.words
join_words = iliad.fullOuterJoin(odyssey)\
     .map(lambda tupl: freq_diff(tupl))

# 10 words that get a boost in frequency in the sequel
emerging_words = join_words.takeOrdered(10, lambda rec: -rec[1])

# 10 words that get a decrease in frequency in the sequel
disappearing_words = join_words.takeOrdered(10, lambda rec: rec[1])

# Print results
for word, freq_diff in emerging_words:
    print("%.2f" % (freq_diff*10000), word)
for word, freq_diff in disappearing_words[::-1]:
    print("%.2f" % (freq_diff*10000), word)

tac= time.time()
print("Task Duration: {}s".format(round(tac-tic,1)))
    
# WAIT in order to inspect all jobs at http://localhost:4040/
input("press ENTER to exit")

92.52 ulysses
53.63 house
48.33 telemachus
43.06 suitors
36.68 tell
33.47 ship
33.35 one
31.94 home
26.73 said
25.97 got
-28.72 jove
-31.46 horses
-40.66 fight
-44.56 spear
-47.24 ships
-54.71 achilles
-61.74 achaeans
-65.52 hector
-72.71 trojans
-89.71 son
Task Duration: 33.7s
press ENTER to exit


''

In [15]:
(257-33)/257*100

87.15953307392996

**Duration reached 33s ! We gain 87% of the time thanks to the hash table trick !** 