# Homework 2

Here we will implement a map-reduce algorithm in spark for counting the number of words from a given set of documents.
Let's start by properly initializing a SparkContext object.

In [1]:
from pyspark import SparkConf, SparkContext

config = SparkConf().setAppName('Homework 2').setMaster('local')

sc = SparkContext(conf=config)

#### Loading the Dataset
Here we load the dataset and split it in a number of partitions. As a rule of thumb: The higher the number of partitions, the better the parallelism. One should nonetheless be aware that each parition have some overhead and thus it would be suboptimal to create a big number of partitions.

Since i'm running on a *4-core* machine i'll be partition the RDD in 8 parts.

In [2]:
docs = sc.textFile('dataset.txt').repartition(8)

## Trivial Algorithm
In this naive algorithm we do the following:
- Take each word from each document
- Create a new key-value pair for each word with a value 1
- Collect the pairs by key and sum their values

In [3]:
words = docs.flatMap(lambda document: document.split(" "))\
    .map(lambda word: (word,1))\
    .reduceByKey(lambda x,y: x+y)

print('The number of different words are: ', words.count())

The number of different words are:  15


## Improved Word Count v1

In this improvement we trade memory efficiency for computational complexity.
The improvement is done by creating key-value pairs from each document that instead of having a form:

    (word, 1)
now have like this:

    (word, number of occurrences of the word in the i-th document)
We then collect them by key and find the total number of occurences by summing the values in a similar way of the one implemented in the naive algorithm.

For achieving this result, instead of a lambda function, a more articulated function have been defined *partSum*, which is built with the help of the *howmany* function.
 

In [85]:
import numpy as np


def howmany(word, x):
    tot=0
    for i in range(len(x)):
        if word == x[i]: tot+=1
    return (word, tot)

def partSum(x):
    parS = []
    checked = []
    for word in x: 
        if not word in checked: 
            parS.append(howmany(word,x))
            checked.append(word)       
    return parS

words_improved1 = docs.map(lambda doc: (doc.split())).flatMap(partSum).reduceByKey(lambda x,y: x+y)

print('The number of different words are: ', words_improved1.count())


The number of different words are:  15


## Improved Word Count v2

we need to go deeper

In [185]:
from random import randint


N = docs.count()
partitionSize = round(np.sqrt(N))

print(partitionSize)


def partSumBucket(x):
    parS = []
    checked = []
    for word in x: 
        if not word in checked: 
            parS.append( (randint(0, partitionSize), howmany(word,x)) )
            checked.append(word)       
    return parS
 
    
def howmany2(word, x):
    tot=0
    checked = []
    for i in range(len(x)):
        if word == x[i]: tot+=1
    return (word, tot)



def secondSum(keyValIter):
    parS = []
    checked = []
    for keyval1 in keyValIter:
        word1 = keyval1[0]
        val = keyval1[1]
        parSum = val
        if not word1 in checked:        
            for keyval2 in keyValIter:
                if word1 == keyval2[0]: parSum += keyval2[1]
            parS.append( word1, parSum)
            checked.append(word1)      
    return parS
    
i=0
words_improved2 = docs.map(lambda doc: (doc.split())).flatMap( partSumBucket ).groupByKey().mapValues(secondSum)

print(words_improved2.collect())










2.0


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 344.0 failed 1 times, most recent failure: Lost task 0.0 in stage 344.0 (TID 1415, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/anaconda/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/anaconda/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/anaconda/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/anaconda/lib/python3.6/site-packages/pyspark/rdd.py", line 1979, in <lambda>
    map_values_fn = lambda kv: (kv[0], f(kv[1]))
  File "<ipython-input-185-a9191136b100>", line 39, in secondSum
TypeError: append() takes exactly one argument (2 given)

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor39.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/anaconda/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/anaconda/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/anaconda/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/anaconda/lib/python3.6/site-packages/pyspark/rdd.py", line 1979, in <lambda>
    map_values_fn = lambda kv: (kv[0], f(kv[1]))
  File "<ipython-input-185-a9191136b100>", line 39, in secondSum
TypeError: append() takes exactly one argument (2 given)

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [154]:
def partSum(x):
    parS = []
    checked = []
    for word in x: 
        if not word in checked: 
            parS.append( (randint(0, partitionSize),howmany(word,x)) )
            checked.append(word)       
    return parS
                        
print(partSum(ab)[1][1][1])

2


In [145]:
ab[2]

'de'