In [1]:

# SOME IMPORTS
import os
import subprocess
import sys
import time
import multiprocessing
import random
import re


In [2]:

# SET SOME ENVIRONMENTAL VARIABLES
os.environ['PYSPARK_PYTHON'] = r"C:\Users\aleks\AppData\Local\Programs\Python\Python37\python.exe"
os.environ['PYSPARK_DRIVER_PYTHON'] = r"C:\Users\aleks\AppData\Local\Programs\Python\Python37\python.exe"
os.environ['SPARK_LOCAL_HOSTNAME'] = "localhost"
os.environ['SPARK_HOME'] = r"C:\Users\aleks\Desktop\zi\zad12\spark-2.2.1-bin-hadoop2.7"
os.environ['JAVA_HOME'] = r"C:\Program Files\Java\jdk1.8.0_251"


In [3]:

# CHECK IF FINDSPARK WORKS CORRECTLY
import findspark
findspark.init()

from pyspark import SparkContext, SparkConf


In [4]:

# START SPARK CONTEXT ON LOCAL MACHINE
sc = SparkContext("local", appName="Test")
##------------------------------------
# GO TO LOCALHOST:4040 and ....


In [5]:

# STOP SPARK CONTEXT
sc.stop()


In [6]:

# OBTAIN THE NUMBER OF LOGICAL CPUs
cpus = multiprocessing.cpu_count()
print("The number of logical CPUs is " + str(cpus))


The number of logical CPUs is 6



# Exercise 1: Compute the value of PI using Monte Carlo Simulation



This exercise is solved. Your task is to read and analyse the code.


In [7]:

# this method generates one sample point and verifies whether it is inside a circle or not.
# The input is passed via filter method, however, we do not need it here
def inside(inValue):
    x, y = random.random(), random.random()
    return x*x + y*y < 1.0


In [8]:

# This method estimates the value of PI
def computePI_MonteCarlo_v1(sc, samples, partitions):
    # Create Resilient Distributed Dataset (RDDs) containing SAMPLES elements.
    # This data is distributed (parallelized) among available nodes (here, CPUs - partitions).
    dff = sc.parallelize(range(0, samples), partitions)
    # Filter out these samples that are not inside a circle.
    # For this purpose, Inside method is run and returns
    # true/false (for each data element) with appropriate probability distribution
    # Why do we generate samples "on fly"?
    filtered = dff.filter(inside)
    # count the number of hits
    left = filtered.count()
    # Estimate the value of PI and return it
    return 4.0 * float(left) / float(samples)


In [9]:

### ESTIMATE VALUE OF PI 
samples = 10000000

print("Monte Carlo simulation for " + str(samples) + " samples")
print("True value of PI = 3.1415926535...")

## i = number of nodes (CPUs)
for i in range(1, cpus + 1):
    master = "local["+str(i)+"]" 
    sc = SparkContext(master, appName="PI_MonteCarlo")
    start_time = time.time()
    piValue = computePI_MonteCarlo_v1(sc, samples, i)
    elapsed = time.time() - start_time
    print("  Number of CPUs = %i | Time = %.4f s | Result(PI) = %.8f" % (i, elapsed, piValue))  
    sc.stop()


Monte Carlo simulation for 10000000 samples
True value of PI = 3.1415926535...
  Number of CPUs = 1 | Time = 6.3377 s | Result(PI) = 3.14211800
  Number of CPUs = 2 | Time = 4.6003 s | Result(PI) = 3.14126080
  Number of CPUs = 3 | Time = 4.3938 s | Result(PI) = 3.14134000
  Number of CPUs = 4 | Time = 4.6032 s | Result(PI) = 3.14071880
  Number of CPUs = 5 | Time = 4.6407 s | Result(PI) = 3.14128440
  Number of CPUs = 6 | Time = 4.8990 s | Result(PI) = 3.14140080



# Exercise 2: Wordcount


In [10]:

# Dummy collection 1: 3 short documents
# create RDD divided into n-paritions
def getSmallCollection_EX1(sc, partitions):
    doc1 = "Roses,are red "
    doc2 = "Roses are roses"
    doc3 = "The Sun is red."
    rdd1 = sc.parallelize\
        ([doc1, doc2, doc3], partitions)
    return rdd1



1) Dummy collection 2: ~200 documents about animals (ant.html, dog.html, panda.html, hedgehog.html, etc.). For this purpose, download www.cs.put.poznan.pl/mtomczyk/ir/lab6/pages.zip, unzip, and copy "pages" folder into your working directory.


In [44]:

def getLargeCollection_EX1(sc, partitions):
    DOCS = sc.wholeTextFiles(r"C:\Users\aleks\Desktop\zi\zad12\{pages/*}", partitions)
    rdd1 = DOCS.map(lambda x: x[1])
    return rdd1


In [12]:

# For a given text "x", this method performs simple tokenization and normalization (returns a list of terms)
def tokenizeAndNormalize(x):
    return [s.lower() for s in re.split(' |;|,|\t|\n|\.', x) if len(s) > 0]



2) Init spark context (1 core):


In [13]:

sc = SparkContext("local[1]", appName="Word_count")



3) TODO: Collect the data (getSmallCollection_EX1):


In [14]:
rdd1 = getSmallCollection_EX1(sc, 1)
# if you whish to print data stored in rdd, use print(rdd.collect())
print(rdd1.collect())

['Roses,are red ', 'Roses are roses', 'The Sun is red.']



4) TODO: Firslty, you should tokenize all documents. For this purpose use flatMap function (rdd2 = rdd1.flatMap) where you pass tokenizeAndNormalize method. There are two methods: map and flatMap. Both produce an output for each element of RDD object. The difference is that map keeps produced elements organised and flatMap puts them into a single list, e.g.: 


In [15]:

tempRDD = sc.parallelize([("a", 1), ("b", 2)])
print(tempRDD.map(lambda x: (x[0], x[1]+1)).collect())
print(tempRDD.flatMap(lambda x: (x[0], x[1]+1)).collect())


[('a', 2), ('b', 3)]
['a', 2, 'b', 3]


In [16]:
# Complete the task here (flatMap with tokenizeAndNormalize):
rdd2 = rdd1.flatMap(lambda el: tokenizeAndNormalize(el))
print(rdd2.collect())

['roses', 'are', 'red', 'roses', 'are', 'roses', 'the', 'sun', 'is', 'red']



5) TODO: Now for each term produce (term, 1). Use map (why not flatMap?) with lambda function:


In [17]:
rdd3 = rdd2.map(lambda term: (term, 1))
print(rdd3.collect())

[('roses', 1), ('are', 1), ('red', 1), ('roses', 1), ('are', 1), ('roses', 1), ('the', 1), ('sun', 1), ('is', 1), ('red', 1)]



6) TODO: Now it is time to group the results. Use groupByKey method. When any "...byKey" method is invoked, the first element of a stored object is treated as a key. When invoking this method, you should also invoke .mapValues(list) so that all corresponding values will be stored in a single list. E.g.:


In [18]:

tempRDD = sc.parallelize([("a", 1), ("a", 1)])
print(tempRDD.groupByKey().mapValues(list).collect())


[('a', [1, 1])]


In [19]:
# Complete the task here:
rdd4 = rdd3.groupByKey().mapValues(list)
print(rdd4.collect())

[('roses', [1, 1, 1]), ('are', [1, 1]), ('red', [1, 1]), ('the', [1]), ('sun', [1]), ('is', [1])]



7) TODO: Now you could use countByKey method but it returns a dictionarty. Use map function again to sum the elements of a list:


In [20]:
rdd5 = rdd4.map(lambda el: (el[0], sum(el[1])))
print(rdd5.collect())

[('roses', 3), ('are', 2), ('red', 2), ('the', 1), ('sun', 1), ('is', 1)]



8) TODO: It is almost done but we wish the objects to be sorted (alphabetically). You can use sortByKey method:


In [21]:
rdd6 = rdd5.sortByKey()
print(rdd6.collect())

[('are', 2), ('is', 1), ('red', 2), ('roses', 3), ('sun', 1), ('the', 1)]



9) TODO: Done. Bout it could be done in another way. Instead of grouping by key (rdd4) and counting the number of "1"s (rdd5), you could use reduceByKey method. reduceByKey "merges" all object with the same key. Similar to groupByKey, however, instead of grouping, a new value is computed by provided function, e.g.:


In [22]:

tempRDD = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
print(tempRDD.reduceByKey(lambda x, y: x + y).collect())


[('a', 4), ('b', 2)]


In [23]:
# Complete the task here. Use rdd3 object to compute rdd7.
rdd7 = rdd3.reduceByKey(lambda a, b: a + b)
print(rdd7.collect())

[('roses', 3), ('are', 2), ('red', 2), ('the', 1), ('sun', 1), ('is', 1)]



10) TODO: Sort the results:


In [24]:
rdd8 = rdd7.sortByKey()
print(rdd8.collect())

[('are', 2), ('is', 1), ('red', 2), ('roses', 3), ('sun', 1), ('the', 1)]


In [25]:

sc.stop()



11) TODO: Complete the method doWordCount (just copy your code, use groupByKey + map(sum) version; should return last rdd object):


In [26]:
def doWordCount(sc, collection, partitions):
    rdd1 = collection                                # data = collection
    rdd2 = rdd1.flatMap(tokenizeAndNormalize)      # flatMap
    rdd3 = rdd2.map(lambda term: (term, 1))          # map
    rdd4 = rdd3.groupByKey().mapValues(list)         # groupByKey
    rdd5 = rdd4.map(lambda el: (el[0], sum(el[1])))  # map (sum)
    rdd6 = rdd5.sortByKey()                          # sortByKey
    return rdd6


12) TODO: Run the script and observe the results (why is the best time for 1CPU?):


In [27]:

## i = number of nodes (CPUs). 
for i in range(1, cpus + 1):
    master = "local["+str(i)+"]" 
    sc = SparkContext(master, appName="WordCount")
    start_time = time.time()
    rdd1 = getSmallCollection_EX1(sc, i)
    computedData = doWordCount(sc, rdd1, i)
    elapsed = time.time() - start_time
    print("Number of CPUs = %i | Time = %.4f s " % (i, elapsed))  
    sc.stop()


Number of CPUs = 1 | Time = 0.0160 s 
Number of CPUs = 2 | Time = 6.5881 s 
Number of CPUs = 3 | Time = 7.6751 s 
Number of CPUs = 4 | Time = 9.1198 s 
Number of CPUs = 5 | Time = 9.9670 s 
Number of CPUs = 6 | Time = 11.5498 s 



13) TODO: Modify the above script (work on a copy, use the cell below) so that the top 3 most common words are printed. Use 1-2CPUs. computedData is an RDD object so you can use sortBy function to resort the elements. 


In [29]:

# do the task here
for i in [1,2]:
    master = "local["+str(i)+"]" 
    sc = SparkContext(master, appName="WordCount")
    start_time = time.time()
    rdd1 = getSmallCollection_EX1(sc, i)
    computedData = doWordCount(sc, rdd1, i)
    rddSort = computedData.sortBy(lambda x: -x[1])
    elapsed = time.time() - start_time
    print("Number of CPUs = %i | Time = %.4f s " % (i, elapsed))  
    ### PRINT HERE 
    sortedData = rddSort.collect()
    for i in range(0, 3): #print top 3
        print("   %i : '%s' occured %d times" % (i, sortedData[i][0], sortedData[i][1]))
    ###
    sc.stop()


Number of CPUs = 1 | Time = 0.0185 s 
   0 : 'roses' occured 3 times
   1 : 'are' occured 2 times
   2 : 'red' occured 2 times
Number of CPUs = 2 | Time = 12.8349 s 
   0 : 'roses' occured 3 times
   1 : 'are' occured 2 times
   2 : 'red' occured 2 times



14) TODO: Repeat the experiment for 1-2CPUs and for 2nd collection (much larger). Compare computation times and print the top 20 most common words. Are the results (the most frequent words) similar to the list of english stop words? Why is the difference in time not as big as in "PI" example?


In [52]:

# do the task here
sc.stop()
for i in [1,2]:
    master = "local["+str(i)+"]" 
    sc = SparkContext(master, appName="WordCount")
    start_time = time.time()
    rdd1 = getLargeCollection_EX1(sc, i)
    computedData = doWordCount(sc, rdd1, i)
    elapsed = time.time() - start_time
    print("Number of CPUs = %i | Time = %.4f s " % (i, elapsed))  
    ### PRINT HERE 
    rddSort = computedData.sortBy(lambda x: -x[1])
    sortedData = rddSort.collect()
    for j in range(0, 20):
        print("   %i : '%s' occured %d times" % (j, sortedData[j][0], sortedData[j][1]))
    ###
    sc.stop()


Number of CPUs = 1 | Time = 0.0982 s 
   0 : 'the' occured 3027 times
   1 : 'and' occured 1910 times
   2 : 'of' occured 1553 times
   3 : 'in' occured 1165 times
   4 : 'are' occured 1031 times
   5 : 'to' occured 962 times
   6 : 'a' occured 769 times
   7 : 'is' occured 622 times
   8 : 'as' occured 560 times
   9 : 'species' occured 558 times
   10 : 'they' occured 370 times
   11 : 'for' occured 362 times
   12 : 'with' occured 352 times
   13 : 'have' occured 344 times
   14 : 'their' occured 326 times
   15 : 'or' occured 306 times
   16 : 'from' occured 269 times
   17 : 'by' occured 244 times
   18 : 'on' occured 230 times
   19 : 'which' occured 214 times
Number of CPUs = 2 | Time = 7.4487 s 
   0 : 'the' occured 3027 times
   1 : 'and' occured 1910 times
   2 : 'of' occured 1553 times
   3 : 'in' occured 1165 times
   4 : 'are' occured 1031 times
   5 : 'to' occured 962 times
   6 : 'a' occured 769 times
   7 : 'is' occured 622 times
   8 : 'as' occured 560 times
   9 : 'sp


# Exercise 3: Inverted Index + Word Count



In this exercise you are asked to construct inverted index in the following form: (term, the number of doccuments in which the term occurs , sorted list of docIDs]. For instance: [...,("roses", 2, [0, 1]),...] -> term "roses" occurs in two documents: termIDs = 0 and 1. The "get...Collection" methods are slightly modified. Both return: rdd object, list of the names of the documents, and a dictionary (docID -> document name):


In [53]:

def getSmallCollection_EX2(sc, partitions):
    doc1 = "Roses,are red "
    doc2 = "Roses are roses"
    doc3 = "The Sun in red."
    rdd1 = sc.parallelize([doc1, doc2, doc3], partitions)
    docNames = ["doc1", "doc2", "doc3"]
    docIDs = {0: docNames[0], 1: docNames[1], 2: docNames[2]}
    return rdd1, docNames, docIDs


In [54]:

def getLargeCollection_EX2(sc, partitions):
    DOCS = sc.wholeTextFiles(r"C:\Users\aleks\Desktop\zi\zad12\{pages/*}", partitions)
    rdd1 = DOCS.map(lambda x: x[1])
    rdd2 = DOCS.map(lambda x: x[0])
    docNames = rdd2.collect()
    docIDs = [i for i in range(0, len(docNames))]
    return rdd1, docNames, docIDs



TODO: do the task and verify the results using the small collection.


In [55]:

def doInvertedIndex(sc, collection, partitions):
    rdd1 = collection
    rdd2 = rdd1.zipWithIndex()
    rdd3 = rdd2.map(lambda x: (x[1], tokenizeAndNormalize(x[0])))
    rdd4 = rdd3.flatMapValues(lambda x: x)
    rdd5 = rdd4.map(lambda x: (x[1], x[0]))
    rdd6 = rdd5.groupByKey().mapValues(list)
    rdd7 = rdd6.map(lambda x: (x[0], len(set(x[1])), list(set(x[1]))))
    return rdd7



12) Run the following script and verify the results.


In [56]:

## i = number of nodes (CPUs). 
#Why the best time is for 1CPU???
for i in [1,2]:
    master = "local["+str(i)+"]" 
    sc = SparkContext(master, appName="InvertedIndex")
    start_time = time.time()
    rdd1, docNames, docIDs = getSmallCollection_EX2(sc, i)
    computedData = doInvertedIndex(sc, rdd1, i)
    rddSort = computedData.sortBy(lambda x: -x[1])
    elapsed = time.time() - start_time
    print("Number of CPUs = %i | Time = %.4f s " % (i, elapsed))  
    ### PRINT HERE 
    sortedData = rddSort.collect()
    for i in range(0, 5): #print top 3
        print("   %i : '%s' occured in %i documents" % (i, sortedData[i][0], sortedData[i][1]))
    ###
    sc.stop()


Number of CPUs = 1 | Time = 0.0130 s 
   0 : 'roses' occured in 2 documents
   1 : 'are' occured in 2 documents
   2 : 'red' occured in 2 documents
   3 : 'the' occured in 1 documents
   4 : 'sun' occured in 1 documents
Number of CPUs = 2 | Time = 9.1841 s 
   0 : 'roses' occured in 2 documents
   1 : 'are' occured in 2 documents
   2 : 'red' occured in 2 documents
   3 : 'sun' occured in 1 documents
   4 : 'in' occured in 1 documents



12) Run the following script and verify if it is faster for 2 cores. Lastly, compare the obtained results with the results of exercise 2 (word count). Are the rankings corellated?


In [57]:

## i = number of nodes (CPUs). 
#Why the best time is for 1CPU???
for i in [1,2]:
    master = "local["+str(i)+"]" 
    sc = SparkContext(master, appName="InvertedIndex")
    start_time = time.time()
    rdd1, docNames, docIDs = getLargeCollection_EX2(sc, i)
    computedData = doInvertedIndex(sc, rdd1, i)
    rddSort = computedData.sortBy(lambda x: -x[1])
    elapsed = time.time() - start_time
    print("Number of CPUs = %i | Time = %.4f s " % (i, elapsed))  
    ### PRINT HERE 
    sortedData = rddSort.collect()
    for i in range(0, 20): #print top 3
        print("   %i : '%s' occured in %i documents" % (i, sortedData[i][0], sortedData[i][1]))
    ###
    sc.stop()


Number of CPUs = 1 | Time = 4.4048 s 
   0 : 'the' occured in 206 documents
   1 : 'of' occured in 204 documents
   2 : 'and' occured in 199 documents
   3 : 'in' occured in 194 documents
   4 : 'a' occured in 189 documents
   5 : 'are' occured in 189 documents
   6 : 'to' occured in 187 documents
   7 : 'is' occured in 180 documents
   8 : 'species' occured in 179 documents
   9 : 'as' occured in 155 documents
   10 : 'with' occured in 143 documents
   11 : 'for' occured in 141 documents
   12 : 'or' occured in 138 documents
   13 : 'they' occured in 131 documents
   14 : 'from' occured in 128 documents
   15 : 'their' occured in 127 documents
   16 : 'have' occured in 123 documents
   17 : 'which' occured in 116 documents
   18 : 'family' occured in 116 documents
   19 : 'by' occured in 115 documents
Number of CPUs = 2 | Time = 14.9198 s 
   0 : 'the' occured in 206 documents
   1 : 'of' occured in 204 documents
   2 : 'and' occured in 199 documents
   3 : 'in' occured in 194 documen