# Lab 7: MapReduce and Spark RDDs
David Morais  
Guillem Cabré

---


In this session you will:

- learn the basics of Spark's RDD API
- implement MapReduce algorithms for inverted indices and Pagerank, and reason about their relative efficiency

# 1. Getting started with Spark

### Install Spark locally
The `pyspark` package includes a local mode, so no cluster is needed.
After starting a Spark job, you may want to have a look at the Spark web UI at http://localhost:4040/jobs/.

In [None]:
!pip3 install pyspark --user

### Start Spark session

In [34]:
from pyspark.sql import SparkSession

# local[*] runs locally using all available cores; replace with local[1] to use only one core
spark = SparkSession.builder.master("local[*]").config("spark.driver.memory", "8g").getOrCreate()           
sc = spark.sparkContext
sc.setLogLevel("ERROR")
print("Using %i cores" % sc.defaultParallelism)

Using 14 cores


### Load text into an RDD
A Spark RDD (resilient distributed dataset) is a fault-tolerant collection of elements that can be partitioned across multiple cluster nodes and processed in parallel.

RDDs are immutable (cannot be modified), but we can use an RDD to derive another RDD from it.

In [2]:
from pprint import pprint

# Simple example: we pretend this is a large file split into many chunks
text = """A mapper mapped as much data as any mapper could map,
but the reducer reduced the mapped data faster than any mapper could map.
The mapper mapped and the reducer reduced until results were produced.
"""
lines = sc.parallelize(text.splitlines())

# Each worker will receive one or more lines.
# Now we start a Spark job to print a few of them for verification purposes
pprint(lines.take(10))


['A mapper mapped as much data as any mapper could map,',
 'but the reducer reduced the mapped data faster than any mapper could map.',
 'The mapper mapped and the reducer reduced until results were produced.']


# 2. Counting word frequencies

### Tokenization

In [3]:
import re
regex = re.compile(r"[0-9]+|[a-zà-ÿ]+'?[a-zà-ÿ']*", re.IGNORECASE)

def tokens(s):
    return [m.group(0).lower() for m in regex.finditer(s)]

In [4]:
print(tokens("Hola, carabola, ¿qué tal? Ñam! Ío48"))

['hola', 'carabola', 'qué', 'tal', 'ñam', 'ío', '48']


## METHOD 1 (classic MapReduce without combiner)

### Mapper
- For each line, we want to output one (word, 1) pair for each word in the line.
- This is an iteration over elements that produces several key-value pairs per element.
- We cannot write code that runs directly on the workers; instead, this iteration is expressed with Spark's `flatMap` function.
- By contrast, Spark's `map` function produces exactly one output per input element.
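The difference between `map` and `flatMap` can be mimicked in plain Python, without Spark. This is a minimal sketch of the semantics only; `map_like`, `flat_map_like` and `simple_tokens` are hypothetical helpers (the tokenizer here just splits on whitespace), not part of PySpark:

```python
from itertools import chain

def simple_tokens(s):
    # Hypothetical stand-in for the notebook's tokens(): lowercase + whitespace split
    return s.lower().split()

def map_like(f, elements):
    # map semantics: exactly one output per input element
    return [f(x) for x in elements]

def flat_map_like(f, elements):
    # flatMap semantics: f returns an iterable and the results are concatenated
    return list(chain.from_iterable(f(x) for x in elements))

def to_pairs(line):
    # The word-count mapper: one (word, 1) pair per word
    return [(w, 1) for w in simple_tokens(line)]

toy_lines = ["a mapper maps", "the reducer reduces"]

nested = map_like(to_pairs, toy_lines)     # 2 elements, each one a list of pairs
flat = flat_map_like(to_pairs, toy_lines)  # 6 (word, 1) pairs
print(nested)
print(flat)
```

With `map` we would get one list of pairs per line, which is not what the shuffle expects; `flatMap` yields the individual key-value pairs.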

In [5]:
def mapper(line):
    return ( (word, 1) for word in tokens(line) )

words1 = lines.flatMap(mapper)

- Spark uses lazy evaluation: no data has been processed yet. 
- It has only built a plan on how to transform `lines` into `words1`.
- Actions such as `count` or `take` trigger actual computation jobs.
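Lazy evaluation can be illustrated with a toy class that only records transformations and runs them when an action is called. This is a hypothetical model (`LazyPipeline` is not a Spark class) and says nothing about how Spark actually schedules jobs:

```python
class LazyPipeline:
    """Hypothetical toy model of lazy evaluation (not Spark's implementation)."""

    def __init__(self, source, steps=()):
        self.source = source        # the input data
        self.steps = list(steps)    # recorded transformations: the "plan"

    def flat_map(self, f):
        # Transformation: returns a NEW pipeline with one more step; nothing runs yet
        return LazyPipeline(self.source, self.steps + [f])

    def collect(self):
        # Action: only now is the plan executed, step by step
        data = list(self.source)
        for f in self.steps:
            data = [y for x in data for y in f(x)]
        return data

calls = []

def split_logged(line):
    calls.append(line)              # record when the function actually runs
    return line.split()

pipe = LazyPipeline(["a b", "c"]).flat_map(split_logged)
print(len(calls))       # 0: building the plan did not touch the data
print(pipe.collect())   # ['a', 'b', 'c']
print(len(calls))       # 2: the action triggered the computation
```

Note that `flat_map` leaves the original pipeline untouched and returns a new one, mirroring how RDD transformations derive new RDDs.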

In [6]:
print("%i words" % words1.count())
pprint(words1.take(10))

35 words
[('a', 1),
 ('mapper', 1),
 ('mapped', 1),
 ('as', 1),
 ('much', 1),
 ('data', 1),
 ('as', 1),
 ('any', 1),
 ('mapper', 1),
 ('could', 1)]


### Shuffle
- In the classical MapReduce framework the shuffle happens automatically, but in Spark it must be requested explicitly.
- Here it is done with the `groupByKey` function.
- This can be very costly when some words are very common: since we have not specified a combiner, every single (word, 1) pair is sent through the shuffle.
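What the shuffle does conceptually can be sketched in plain Python: every (key, value) pair is routed to a reducer chosen by hashing its key, and the values are grouped per key there. This is a simplified model; `shuffle` is a hypothetical helper, and Spark's partitioner and data transfer work differently in detail:

```python
from collections import defaultdict

def shuffle(partitions, num_reducers):
    """Toy model of groupByKey: route each (key, value) pair to the reducer
    hash(key) % num_reducers and group the values per key."""
    reducers = [defaultdict(list) for _ in range(num_reducers)]
    pairs_sent = 0
    for part in partitions:
        for key, value in part:
            reducers[hash(key) % num_reducers][key].append(value)
            pairs_sent += 1         # without a combiner, every pair crosses the network
    return reducers, pairs_sent

mapper_output = [
    [("the", 1), ("mapper", 1), ("the", 1)],   # partition 0
    [("the", 1), ("reducer", 1)],              # partition 1
]
reducers, sent = shuffle(mapper_output, num_reducers=2)
grouped = {k: v for r in reducers for k, v in r.items()}
print(grouped["the"], sent)   # [1, 1, 1] 5
```

All occurrences of the same key end up at the same reducer, which is what lets the reducer see the whole list of values for a word.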

In [7]:
words1_shuffled = words1.groupByKey()

The grouped values in the shuffled RDD are returned as iterables (`ResultIterable` objects), not as lists.
For viewing purposes only, we may materialize them into Python lists:

In [8]:
pprint([ (w, list(l)) for (w, l) in words1_shuffled.take(20) ])

[('but', [1]),
 ('could', [1, 1]),
 ('as', [1, 1]),
 ('produced', [1]),
 ('much', [1]),
 ('were', [1]),
 ('any', [1, 1]),
 ('map', [1, 1]),
 ('reduced', [1, 1]),
 ('reducer', [1, 1]),
 ('than', [1]),
 ('the', [1, 1, 1, 1]),
 ('until', [1]),
 ('results', [1]),
 ('a', [1]),
 ('mapper', [1, 1, 1, 1]),
 ('mapped', [1, 1, 1]),
 ('data', [1, 1]),
 ('faster', [1]),
 ('and', [1])]


### Reducer
- For each (word, L), return (word, sum(L)).
- This is just another `map` in the functional-programming sense.

In [9]:
def reducer(tup):
    word, L = tup[0], tup[1]
    return (word, sum(L))

counts = words1_shuffled.map(reducer)

In [10]:
pprint(counts.takeOrdered(20, key=lambda x:-x[1]))

[('the', 4),
 ('mapper', 4),
 ('mapped', 3),
 ('could', 2),
 ('as', 2),
 ('any', 2),
 ('map', 2),
 ('reduced', 2),
 ('reducer', 2),
 ('data', 2),
 ('but', 1),
 ('produced', 1),
 ('much', 1),
 ('were', 1),
 ('than', 1),
 ('until', 1),
 ('results', 1),
 ('a', 1),
 ('faster', 1),
 ('and', 1)]


### Putting it all together
The entire pipeline could have been written much more succinctly by using anonymous functions:

In [11]:
counts = (lines
    .flatMap(lambda line:( (word, 1) for word in tokens(line) ))        # mapper
    .groupByKey()                                                       # shuffle
    .map(lambda tup:(tup[0], sum(tup[1])))                              # reducer
)

In [12]:
pprint(counts.takeOrdered(20, key=lambda x:-x[1]))

[('the', 4),
 ('mapper', 4),
 ('mapped', 3),
 ('could', 2),
 ('as', 2),
 ('any', 2),
 ('map', 2),
 ('reduced', 2),
 ('reducer', 2),
 ('data', 2),
 ('but', 1),
 ('produced', 1),
 ('much', 1),
 ('were', 1),
 ('than', 1),
 ('until', 1),
 ('results', 1),
 ('a', 1),
 ('faster', 1),
 ('and', 1)]


## METHOD 2 (combiner for associative, commutative reducers)
- In Method 1 we didn't specify a combiner. Spark is designed around the idea that shuffles are expensive and should move as little data as possible.
- Many reduce operations that arise in practice (sums, counts, maxima) are associative and commutative.
- If so, reducers do not need to receive an entire list; it suffices to know how to combine two tuples $(key, value_1)$ and $(key, value_2)$ into another tuple $(key, value_3)$. Combiners can apply this same function locally on each mapper's output, before the shuffle.
- Thus, we use Spark's `reduceByKey`, passing as a parameter the function that sums two values.
- **This is the preferred way** whenever possible, as it corresponds to using a combiner and significantly reduces the amount of disk storage and network communication.
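The effect of a combiner can be sketched in plain Python: each partition pre-aggregates its own pairs with the same `+` used by the reducer, so fewer pairs are shuffled while the final counts stay the same. This is a conceptual model; `combine` and `reduce_all` are hypothetical helpers, not Spark internals:

```python
from collections import Counter

def combine(partition):
    # Combiner: pre-aggregate (word, count) pairs locally inside one partition,
    # using the same associative and commutative function as the reducer (+)
    local = Counter()
    for word, c in partition:
        local[word] += c
    return list(local.items())

def reduce_all(partitions):
    # Final reduce after the shuffle: add up the (partial) counts of each word
    total = Counter()
    for part in partitions:
        for word, c in part:
            total[word] += c
    return dict(total)

parts = [
    [("the", 1), ("mapper", 1), ("the", 1), ("the", 1)],
    [("the", 1), ("reducer", 1)],
]
combined = [combine(p) for p in parts]

pairs_without = sum(len(p) for p in parts)     # 6 pairs shuffled
pairs_with = sum(len(p) for p in combined)     # 4 pairs shuffled
print(pairs_without, pairs_with)
print(reduce_all(parts) == reduce_all(combined))  # True: same final counts
```

The savings grow with the number of repeated keys per partition, which is exactly the case of very common words.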

In [13]:
counts = (lines
    .flatMap(lambda line:( (word, 1) for word in tokens(line) ))    # mapper
    .reduceByKey(lambda a, b: a + b)                                # reducer and combiner; shuffles much less data
)

In [14]:
pprint(counts.takeOrdered(20, key=lambda x:-x[1]))

[('the', 4),
 ('mapper', 4),
 ('mapped', 3),
 ('could', 2),
 ('as', 2),
 ('any', 2),
 ('map', 2),
 ('reduced', 2),
 ('reducer', 2),
 ('data', 2),
 ('but', 1),
 ('produced', 1),
 ('much', 1),
 ('were', 1),
 ('than', 1),
 ('until', 1),
 ('results', 1),
 ('a', 1),
 ('faster', 1),
 ('and', 1)]


## Last step: from absolute counts to frequencies
We can first compute the total number of words:

In [15]:
num_words = counts.map(lambda tup:tup[1]).sum()
print("%i total words" % num_words)
print("%i different words" % counts.count())

35 total words
20 different words


- Instead of Spark's `sum()` method, we could have used `reduce()` to compute the sum.
- `reduceByKey` would not be suitable here, since there are no keys anymore.

In [16]:
print(counts.map(lambda tup:tup[1]).reduce(lambda a,b:a+b))

35
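`reduce()` first reduces each partition locally and then combines the partial results, so the function passed to it must be associative and commutative; otherwise the result depends on how the data happens to be partitioned. A plain-Python sketch under that simplified model (`partitioned_reduce` is a hypothetical helper, and Spark's actual partitioning differs):

```python
from functools import reduce
from operator import add, sub

def partitioned_reduce(f, data, num_parts):
    """Hypothetical model of RDD.reduce: reduce each partition locally,
    then reduce the partial results."""
    size = -(-len(data) // num_parts)                  # ceiling division
    chunks = [data[i:i + size] for i in range(0, len(data), size)]
    partials = [reduce(f, chunk) for chunk in chunks]
    return reduce(f, partials)

values = [4, 1, 2, 2, 1]

# Addition is associative and commutative: partitioning does not matter
print([partitioned_reduce(add, values, p) for p in (1, 2, 3)])  # [10, 10, 10]
# Subtraction is not: the result changes with the number of partitions
print([partitioned_reduce(sub, values, p) for p in (1, 2, 3)])  # [-2, 0, 2]
```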


- Now each worker can compute frequencies by dividing by `num_words`.
- Note that `num_words` is captured in the closure of the lambda and shipped to the workers with every task. This is fine for a small integer, but if we wanted each machine to receive the value once and cache it for all tasks, we would use `sc.broadcast()` instead.
- Finally, we print the five most frequent words, along with their frequencies.

In [17]:
freqs = counts.map(lambda tup:(tup[0], tup[1] / num_words))
pprint(freqs.takeOrdered(5, key=lambda x:-x[1]))

[('the', 0.11428571428571428),
 ('mapper', 0.11428571428571428),
 ('mapped', 0.08571428571428572),
 ('could', 0.05714285714285714),
 ('as', 0.05714285714285714)]


In [18]:
# Using broadcast:
num_words_b = sc.broadcast(num_words)
freqs = counts.map(lambda tup:(tup[0], tup[1] / num_words_b.value))
pprint(freqs.takeOrdered(5, key=lambda x: -x[1]))

[('the', 0.11428571428571428),
 ('mapper', 0.11428571428571428),
 ('mapped', 0.08571428571428572),
 ('could', 0.05714285714285714),
 ('as', 0.05714285714285714)]


# 3. Inverted index with MapReduce

## Loading the dataset
- We read the CSV with Wikipedia titles from the last lab and turn it into an RDD of (doc_id, title string) pairs.
- We use `repartition` to specify the desired number of partitions.
- `cache` keeps the partitions in the workers' memory once they have been computed. This avoids rereading the data from disk each time we apply a different computation to the same dataset.

In [None]:
titles_rdd = (spark.read.csv("enwiki-2013-names.csv", header=True, escape="\\")
              .na.fill({"name": ""})
              .rdd
              .map(lambda row: (int(row.node_id), row.name))
              .repartition(6)
              .cache()
             )

In [23]:
print("%i titles" % titles_rdd.count())
titles_rdd.take(10)

4206785 titles


[(30, 'DAT (newspaper)'),
 (31, 'Ularbek Baitailaq'),
 (32, 'Altyn Tamyr'),
 (33, 'Tortinshi Bilik'),
 (34, 'Middle Yangchenghu Road Station'),
 (35, 'North Qimen Main Street Station'),
 (36, 'Xujiang Road Station'),
 (37, 'Laodong Road Station'),
 (38, 'Line 2, Suzhou Rail Transit'),
 (39, 'Middle Chunshenhu Road Station')]

In [24]:
# While developing your code, you may want to test it against a smaller dataset
if False:
    titles_rdd = sc.parallelize(zip(range(len(text.splitlines())), text.splitlines()))

---

**Exercise 1:**  
Using RDDs, write a function to build an inverted index for `titles_rdd`. It should output an RDD containing all `(term, posting list)` pairs. Each non-stopword must appear exactly once in the output and the posting list has to be sorted (hint: `sortByKey()`).

You may use the code below to determine a set of stopwords.

In [25]:
# Find stopwords
counts = (titles_rdd
    .flatMap(lambda tup:( (word, 1) for word in tokens(tup[1]) ))        # mapper
    .reduceByKey(lambda a, b: a + b)                                     # reducer and combiner
)        
stopwords_freq = counts.takeOrdered(40, key=lambda x:-x[1])
pprint(stopwords_freq)




[('of', 312846),
 ('u', 281808),
 ('the', 195580),
 ('in', 94147),
 ('list', 80441),
 ('80', 75900),
 ('and', 63075),
 ('â', 62175),
 ('de', 43194),
 ('county', 43105),
 ('a', 40734),
 ('school', 39807),
 ('john', 38042),
 ('93', 37596),
 ('disambiguation', 37508),
 ('station', 35116),
 ('album', 35029),
 ('district', 34713),
 ('new', 31946),
 ('film', 31256),
 ('river', 29937),
 ('at', 27697),
 ('season', 27545),
 ('national', 26785),
 ('for', 24153),
 ('s', 22295),
 ('st', 22012),
 ('song', 22005),
 ('united', 21919),
 ('william', 21387),
 ('football', 21340),
 ('ã', 21338),
 ('c', 21292),
 ('state', 21233),
 ('9', 20205),
 ('high', 19954),
 ('world', 19588),
 ('8', 18919),
 ('n', 18789),
 ('south', 18716)]



In [28]:
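stopwords = set([w for (w, _) in stopwords_freq])
stopwords_b = sc.broadcast(stopwords)

# Named build_inverted_index so that the result below does not shadow the function
def build_inverted_index(titles_rdd):
    return (titles_rdd
        # mapper: one (term, doc_id) pair per non-stopword in each title
        .flatMap(lambda tup: ((word, tup[0]) for word in tokens(tup[1]) if word not in stopwords_b.value))
        .distinct()                 # keep each (term, doc_id) pair only once
        .groupByKey()               # shuffle: collect the doc ids of each term
        .mapValues(sorted)          # sorted posting list
        .sortByKey()                # terms in lexicographic order
    )

inverted_index = build_inverted_index(titles_rdd)

pprint(inverted_index.take(10))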



[('0',
  [16132,
   20373,
   25453,
   26960,
   47727,
   53848,
   58830,
   59606,
   59704,
   59741,
   60712,
   60907,
   60909,
   61094,
   62645,
   62832,
   63052,
   63055,
   63108,
   63176,
   64158,
   65074,
   67091,
   67472,
   68205,
   69550,
   70456,
   71142,
   71155,
   71305,
   71614,
   71639,
   71817,
   71928,
   71966,
   72465,
   72972,
   74059,
   74485,
   75249,
   75878,
   79635,
   79636,
   79637,
   79926,
   80908,
   81184,
   82222,
   83251,
   83819,
   85247,
   87046,
   87598,
   87601,
   87603,
   89614,
   89853,
   90264,
   91553,
   92046,
   93301,
   95482,
   100380,
   101499,
   101845,
   105183,
   105936,
   106749,
   107001,
   107208,
   107729,
   107891,
   108481,
   110822,
   110884,
   114772,
   116355,
   117621,
   120088,
   120584,
   120772,
   121867,
   122188,
   123571,
   124710,
   124752,
   124938,
   125120,
   125327,
   144164,
   144224,
   144855,
   147826,
   188010,
   249172,
   251615,


In [29]:
postings = inverted_index.lookup("computer")    # a list with a single element: the posting list
count_computer = len(postings[0])
print("The word 'computer' appears in %i titles" % count_computer)


The word 'computer' appears in 1649 titles
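To check the logic on a tiny input without a cluster, the same map, distinct, group and sort steps can be reproduced in plain Python. This is a sketch; `toy_inverted_index`, `toy_docs` and `toy_regex` are hypothetical names (the regex copies the notebook's tokenizer pattern):

```python
import re
from collections import defaultdict

# Same pattern as the notebook's tokens()
toy_regex = re.compile(r"[0-9]+|[a-zà-ÿ]+'?[a-zà-ÿ']*", re.IGNORECASE)

def toy_inverted_index(docs, stop):
    """Plain-Python sketch of map -> distinct -> groupByKey -> sort.
    docs: list of (doc_id, text) pairs; stop: set of stopwords."""
    # map + distinct: a set of (term, doc_id) pairs, stopwords dropped
    pairs = {(m.group(0).lower(), doc_id)
             for doc_id, text in docs
             for m in toy_regex.finditer(text)
             if m.group(0).lower() not in stop}
    # shuffle: group doc ids by term
    postings = defaultdict(list)
    for term, doc_id in pairs:
        postings[term].append(doc_id)
    # sorted posting lists, terms in lexicographic order (like sortByKey)
    return sorted((term, sorted(ids)) for term, ids in postings.items())

toy_docs = [(0, "The mapper mapped"), (1, "the reducer reduced the data"), (2, "Mapper data")]
idx = toy_inverted_index(toy_docs, stop={"the"})
print(idx)
```

Each term appears once and every posting list is sorted, which are the two requirements of Exercise 1.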



# 4. Pagerank with MapReduce

## Load graph as an RDD

In [32]:
edges_rdd = (sc.textFile("enwiki-2013.txt")
    .filter(lambda s: s and not s.startswith('#'))     # skip comments and empty lines
    .map(lambda s: tuple(map(int, s.split())))          # "u v" -> (u, v)
    .cache()
)

In [33]:
m = edges_rdd.count()
n = max(titles_rdd.count(), edges_rdd.map(max).max() + 1)
print("Loaded graph: %i vertices, %i directed edges" % (n, m))



Loaded graph: 4206785 vertices, 101311613 directed edges



---

**Exercise 2:**  
Using RDDs, write a function to compute Pagerank with damping. We assume that each worker has $\Theta(n)$ memory (enough to store a full Pagerank or degree vector, but not enough to store the complete adjacency matrix).

Some subtasks you will need to solve with Spark are:
- Compute an RDD with the out-degree of all nodes.
- Compute the probabilities resulting from a single random step in the graph, given the current `pr` vector.

The rest (stopping, damping and teleportation, etc.) is handled by the driver (main program).
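The same split between "Spark" work (out-degrees, one random step summed over edges) and driver work (damping, teleportation, dead ends, stopping) can be sketched in plain Python and NumPy on a toy graph. This is a sketch under stated assumptions, not a reference solution: `toy_pagerank` is a hypothetical name, teleportation is uniform, and dead-end nodes spread their Pagerank uniformly over all nodes:

```python
import numpy as np
from collections import defaultdict

def toy_pagerank(edges, num_nodes, damping=0.8, tol=1e-10, max_iters=1000):
    """Plain-Python sketch (no Spark). edges: list of (src, dst) pairs."""
    # Subtask 1 (a reduceByKey in Spark): out-degree of every node
    outdeg = np.zeros(num_nodes)
    for src, _ in edges:
        outdeg[src] += 1
    sinks = np.where(outdeg == 0)[0]
    outdeg[sinks] = 1                   # avoid division by zero; sinks have no edges

    pr = np.full(num_nodes, 1.0 / num_nodes)
    for _ in range(max_iters):
        # Subtask 2 (map + reduceByKey in Spark): edge u->v sends pr[u]/outdeg[u] to v
        contrib = defaultdict(float)
        for u, v in edges:
            contrib[v] += pr[u] / outdeg[u]
        # Driver: uniform teleportation, damping and redistribution of sink mass
        new_pr = np.full(num_nodes, (1.0 - damping) / num_nodes)
        for v, c in contrib.items():
            new_pr[v] += damping * c
        new_pr += damping * pr[sinks].sum() / num_nodes
        converged = np.abs(new_pr - pr).sum() <= tol   # L1 convergence check
        pr = new_pr
        if converged:
            break
    return pr

# Edges 0->1, 0->2, 1->2, 1->3, 2->0; node 3 has no out-links (a dead end)
toy_edges = [(0, 1), (0, 2), (1, 2), (1, 3), (2, 0)]
toy_pr = toy_pagerank(toy_edges, num_nodes=4)
print(toy_pr, toy_pr.sum())
```

Because the sink mass is redistributed rather than lost, the vector keeps summing to 1 at every iteration.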
    
Additional **questions to be discussed in the report**:

1. The teleportation logic could be simplified if we computed the Google matrix in advance. What would happen then?
2. Compare with your sequential solution from lab 6. Are they the same? Which one is faster? Why do you think that's the case?
3. Suppose we precomputed an RDD with the list of incoming neighbors for each vertex using `groupByKey` and used it in the main loop, instead of summing contributions of individual edges. Is this approach always faster or can it become problematic?
4. What if $n$ were too big to store the entire Pagerank/degree vectors in memory? What would you do?

## Code skeleton

In [36]:
import numpy as np

# Given the outdegrees rdd, returns the list of dead-end nodes, 
# and a modified out-degree vector where their degree is 1.
def get_sinks(outdeg_rdd):
    # Collect the out degrees into the driver program's memory
    outdeg = np.zeros(n)
    for (key, val) in outdeg_rdd.collect():
        outdeg[key] = val

    sinks = np.where(outdeg == 0)[0]
    print("%i dead-end nodes" % len(sinks))
    outdeg[sinks] = 1                            # avoid division by zero
    return sinks, outdeg

In [42]:
def compute_outdegrees_rdd(edges_rdd):
    # Map each edge (u, v) to (u, 1) and add up the ones per source vertex
    return edges_rdd.map(lambda e: (e[0], 1)).reduceByKey(lambda a, b: a + b)

def pagerank(edges_rdd, n, damping, teleport=None, tol=1e-5, max_iters=100):
    pr = np.full(n, 1.0 / n)
    teleport = np.full(n, 1.0 / n) if teleport is None else teleport

    # Out-degrees and dead-end nodes (sinks); sinks get degree 1 to avoid division by zero
    outdeg_rdd = compute_outdegrees_rdd(edges_rdd)
    sinks, outdeg = get_sinks(outdeg_rdd)

    for it in range(max_iters):
        pr_old = pr

        # Total Pagerank held by dead-end nodes
        pr_nowhere = pr[sinks].sum()

        # Broadcast pr / outdeg so that each worker can compute edge contributions
        pr_div_b = sc.broadcast(pr / outdeg)

        # Each edge u->v contributes pr[u] / outdeg[u] to v
        contribs_rdd = (edges_rdd
                        .map(lambda e: (e[1], pr_div_b.value[e[0]]))
                        .reduceByKey(lambda a, b: a + b))

        # Build the new pr vector on the driver
        pr = (1.0 - damping) * teleport                  # teleportation part (a new array)
        contribs = contribs_rdd.collect()                # at most n (vertex, sum) pairs
        if contribs:
            idx, vals = zip(*contribs)
            pr[np.array(idx)] += damping * np.array(vals)    # keys are unique after reduceByKey
        pr += damping * pr_nowhere / n                   # sinks spread their mass uniformly

        pr_div_b.unpersist()
        # L1 convergence check
        if np.linalg.norm(pr - pr_old, 1) <= tol:
            break

    return pr

pr = pagerank(edges_rdd, n, damping=0.8)

7891 dead-end nodes


In [43]:
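def show_topk(pr, titles_rdd, topk=20):
    print("Top %i nodes in order of decreasing pagerank:" % topk)
    top = np.argsort(-pr)[:topk]
    top_set = set(int(x) for x in top)        # fast membership test on the workers
    # A single Spark job fetches all the titles (instead of one lookup() job per node)
    names = titles_rdd.filter(lambda tup: tup[0] in top_set).collectAsMap()
    pprint([ (pr[x], names.get(int(x))) for x in top ])

show_topk(pr, titles_rdd)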

Top 20 nodes in order of decreasing pagerank:



[(np.float64(0.002891854286111481), 'United States'),
 (np.float64(0.0011754621947177308), 'France'),
 (np.float64(0.0010746008971443495), 'United Kingdom'),
 (np.float64(0.0010556081158133823), 'Germany'),
 (np.float64(0.0009193363223472411), 'List of sovereign states'),
 (np.float64(0.0008928837831747925), 'Animal'),
 (np.float64(0.0008841357586706798), 'England'),
 (np.float64(0.000879936064691018), 'Canada'),
 (np.float64(0.0008698032880566212), 'Association football'),
 (np.float64(0.0008358925723167322), 'World War II'),
 (np.float64(0.0008249085040551729), 'India'),
 (np.float64(0.0007061146046282517), 'Australia'),
 (np.float64(0.00066199068792812), 'Italy'),
 (np.float64(0.0006510532417520937), 'Japan'),
 (np.float64(0.0006178383349073968), 'English language'),
 (np.float64(0.0005799734724271947), 'China'),
 (np.float64(0.0005443237140697482), 'London'),
 (np.float64(0.000517325093601549), 'Russia'),
 (np.float64(0.0005086988937511213), 'Poland'),
 (np.float64(0.00048877457593

# 5. Rules of delivery

- To be solved in _pairs_.

- Submit the **report** as a PDF file. Make sure it has **both your names, date, and title**. Include your **code** in your submission (.py or .ipynb).

- No plagiarism; don't discuss your work with other teams. You can ask others for help with simple things, such as recalling a Python instruction or module, but nothing too specific to the session.

- If you feel you are spending much more time than the rest of your classmates, ask us for help. Questions can be asked either in person or by email, and you'll never be penalized for asking questions, no matter how stupid they look in retrospect.

- Write a short report listing the solutions to the exercises proposed. Include things like the important parts of your implementation (data structures used for representing objects, algorithms used, etc). You are welcome to add conclusions and findings that depart from what we asked you to do. We encourage you to discuss the difficulties you find; this lets us give you help and also improve the lab session for future editions.



- Submit your work through the [raco](http://www.fib.upc.edu/en/serveis/raco.html); see date at the raco's submissions page.