# Word count

Word count has been called the "Hello World" of big data. This makes sense because counting occurrences of words comes down to one **map** and one **reduce** operation. It turns out that surprisingly many of the more complex types of analysis we wish to perform on data can be broken down into operations very similar to word count.
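To make the map/reduce claim concrete, here is a minimal pure-Python sketch with no Spark involved. The two input lines are made up for illustration. The map step turns every word into a `(word, 1)` pair, and the reduce step folds those pairs into per-word totals.

```python
from functools import reduce

# A tiny in-memory stand-in for a distributed dataset (illustrative input only).
lines = ["to be or not to be", "that is the question"]

# Map step: emit a (word, 1) pair for every word on every line.
pairs = [(word, 1) for line in lines for word in line.split(" ")]


# Reduce step: fold the pairs into a dict, adding up the 1s per word.
def add_pair(counts, pair):
    word, one = pair
    counts[word] = counts.get(word, 0) + one
    return counts


word_counts = reduce(add_pair, pairs, {})
print(word_counts)
# {'to': 2, 'be': 2, 'or': 1, 'not': 1, 'that': 1, 'is': 1, 'the': 1, 'question': 1}
```

Spark distributes exactly these two kinds of steps across the nodes of a cluster. Locally, a single sequential `reduce` is enough.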

Let us start with a small dataset of Shakespeare quotes:

```python
shakespeare_data = spark.sparkContext.parallelize([
    "all that glitters is not gold",
    "love all, trust a few, do wrong to none",
    "to be or not to be that is the question",
    "my kingdom for a horse",
    "the world is a stage",
    "is this a dagger that I see before me",
    "nothing will come of nothing",
])
```

In Spark, word count takes only a few lines of code:

```python
shakespeare_data.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()
```

```
[('glitters', 1),
 ('is', 4),
 ('gold', 1),
 ('love', 1),
 ('do', 1),
 ('none', 1),
 ('question', 1),
 ('world', 1),
 ('stage', 1),
 ('this', 1),
 ('before', 1),
 ('of', 1),
 ('all', 1),
 ('that', 3),
 ('not', 2),
 ('all,', 1),
 ('trust', 1),
 ('a', 4),
 ('few,', 1),
 ('wrong', 1),
 ('to', 3),
 ('be', 2),
 ('or', 1),
 ('the', 2),
 ('my', 1),
 ('kingdom', 1),
 ('for', 1),
 ('horse', 1),
 ('dagger', 1),
 ('I', 1),
 ('see', 1),
 ('me', 1),
 ('nothing', 2),
 ('will', 1),
 ('come', 1)]
```

Let us break this down operation by operation:

- **flatMap(lambda line: line.split(" "))**: The input is an RDD of quote lines. It has 7 elements because there are 7 quotes. Each line is split on spaces into words, and the output is an RDD of individual words. This output RDD has 49 elements, one for each word. Had we used **map()** instead of **flatMap()**, we would have had an output RDD of 7 elements, each of them a list of words. This is the difference between **flatMap()** and **map()**: **flatMap()** flattens the lists returned by the function into a single RDD of elements.
- **map(lambda word: (word, 1))**: Each of the 49 input words is turned into a tuple of the word and the integer 1. This prepares the data for the next operation, which is the reduce.
- **reduceByKey(lambda a, b: a + b)**: The input tuples are interpreted as (key, value) pairs, and all values with the same key are combined by addition. We do not control the order or grouping in which the values are combined, so the reduce function must give the same result no matter how the values are ordered or grouped. Formally, the operation must be commutative and associative (see https://en.wikipedia.org/wiki/Commutative_property and https://en.wikipedia.org/wiki/Associative_property).
- **collect()**: The previous operations are transformations, which Spark evaluates lazily. **collect()** is an action, so it triggers their parallel execution on the nodes of the cluster. The resulting dataset of word counts is small enough to be collected to the client, which in our case is the master node of the Spark cluster.
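The behaviour of these operations can be mimicked locally on the same seven quotes with plain Python lists. This is only an analogue, not Spark itself. The `reduce_by_key` helper is hypothetical and combines values sequentially. The last lines show why the reduce function must be associative and commutative: addition gives the same total whichever way the values are grouped, but subtraction does not.

```python
from functools import reduce
from operator import add, sub

# The same seven quotes, as a plain Python list (a local stand-in for the RDD).
quotes = [
    "all that glitters is not gold",
    "love all, trust a few, do wrong to none",
    "to be or not to be that is the question",
    "my kingdom for a horse",
    "the world is a stage",
    "is this a dagger that I see before me",
    "nothing will come of nothing",
]

# Like map(): exactly one output element per input line, each a list of words.
mapped = [line.split(" ") for line in quotes]

# Like flatMap(): the per-line lists are flattened into one element per word.
flat_mapped = [word for line in quotes for word in line.split(" ")]

print(len(mapped), len(flat_mapped))  # 7 49


def reduce_by_key(pairs, func):
    """Hypothetical local helper mimicking RDD.reduceByKey on (key, value) pairs."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


counts = reduce_by_key([(word, 1) for word in flat_mapped], add)
print(counts["is"], counts["a"], counts["that"])  # 4 4 3

# Spark may combine a key's values in different groupings, for example within each
# partition first and then across partitions. Compare two groupings of 1, 2, 3, 4:
values = [1, 2, 3, 4]
print(reduce(add, values), add(add(1, 2), add(3, 4)))  # 10 10
print(reduce(sub, values), sub(sub(1, 2), sub(3, 4)))  # -8 0
```

With subtraction the answer depends on how the values happen to be split up, which is exactly what **reduceByKey** cannot promise to keep fixed.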

We can add a few more operations to sort the words by number of occurrences and retrieve only the ten most common words:

```python
shakespeare_data.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .map(lambda x: (x[1], x)) \
    .sortByKey(False) \
    .map(lambda x: x[1]) \
    .take(10)
```

```
[('is', 4),
 ('a', 4),
 ('that', 3),
 ('to', 3),
 ('not', 2),
 ('be', 2),
 ('the', 2),
 ('nothing', 2),
 ('glitters', 1),
 ('gold', 1)]
```

Let us also explain the additional operations:

- **map(lambda x: (x[1], x))**: The input of this **map** operation consists of (word, count) tuples. The output consists of tuples of the count and the input tuple, (count, (word, count)), so that the count becomes the key.
- **sortByKey(False)**: As the name suggests, this sorts the input tuples by key. The boolean argument (`ascending`) determines whether we sort in ascending or descending order. We are interested in the most common words, so we pass False to sort in descending order.
- **map(lambda x: x[1])**: The **sortByKey** operation keeps the keys, so we use a **map** to remove them and keep only the (word, count) tuples.
- **take(10)**: We retrieve the first 10 elements of the sorted RDD to the client.
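The swap, sort, and swap-back steps can be mimicked locally in plain Python. This is a sketch on a small, hand-picked subset of the (word, count) pairs, not Spark itself. Python's `sorted` is stable, so pairs with equal counts keep their input order here.

```python
# A few (word, count) pairs like those produced by reduceByKey (subset chosen for illustration).
word_count_pairs = [("glitters", 1), ("is", 4), ("that", 3), ("a", 4), ("not", 2)]

# Like map(lambda x: (x[1], x)): make the count the key.
keyed = [(pair[1], pair) for pair in word_count_pairs]

# Like sortByKey(False): sort on the key only, in descending order.
keyed_sorted = sorted(keyed, key=lambda kv: kv[0], reverse=True)

# Like map(lambda x: x[1]): drop the key again.
ranked = [pair for _, pair in keyed_sorted]

# Like take(3): keep the first three elements.
top3 = ranked[:3]
print(top3)  # [('is', 4), ('a', 4), ('that', 3)]
```

In Spark the order among words with equal counts may differ from this stable local sort. The PySpark RDD also offers `sortBy(keyfunc, ascending=False)` and `takeOrdered(num, key=...)`, for example `takeOrdered(10, key=lambda x: -x[1])`. Both sort on the count directly, without moving it into the key position first.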


------------------

# Assignment

Seven Shakespeare quotes are hardly "big data", so let us try a slightly larger dataset. Below we load a dataset consisting of all the New Year's speeches of the Queen of Denmark from 1972 to 2018.

Try to find the 10 most used words across all of these speeches.

What are the counts of the words "prins" (prince) and "prinsesse" (princess)? 

For that last question it may be useful to look at the **filter()** function (https://spark.apache.org/docs/2.2.0/api/python/pyspark.html#pyspark.RDD).

```python
nytaar = spark.sparkContext.textFile("gs://big-data-course-datasets/nytaar/").cache()
```
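As a hint of what **filter()** does, here is a pure-Python analogue using Python's built-in `filter`. It runs on a few Shakespeare (word, count) pairs rather than on the speeches, and the `wanted` set is chosen only for illustration. An RDD's `filter` likewise keeps the elements for which the given function returns True, so the same predicate could be passed to it on an RDD of (word, count) pairs.

```python
# (word, count) pairs for the last Shakespeare quote, as reduceByKey would produce them.
pairs = [("nothing", 2), ("will", 1), ("come", 1), ("of", 1)]

# Words we want to look up (illustrative choice).
wanted = {"nothing", "come"}

# Keep only the pairs whose word is in the wanted set.
selected = list(filter(lambda pair: pair[0] in wanted, pairs))
print(selected)  # [('nothing', 2), ('come', 1)]
```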