# The Classic Example of Counting Words

Let us revisit the classic map-reduce example: counting the words in one or more books.

## If we install findspark we can use that utility to help us connect to Spark

In [0]:
# Everything behaves a lot like it does in a Jupyter notebook
print("Hello World")

In [0]:
# Databricks has already set up Spark for us, so we can get our
# SparkContext connection (sc) using the following command

sc = spark.sparkContext  # the existing session already holds the active SparkContext

# We will also need some data; make sure to upload your copies of the "books" into DBFS.

### Let's create a utility function for stripping out non-alpha characters from our data

In [0]:
def strip(s):
    """Remove every non-alphabetic character from the string s."""
    return ''.join(filter(str.isalpha, s))
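
To see what this helper keeps and what it drops, here is a small check in plain Python (no Spark needed). The sample tokens are made up for illustration. Note that `str.isalpha` rejects digits and apostrophes as well as punctuation, so a token such as "don't" becomes "dont" and a purely numeric token becomes an empty string.

```python
def strip(s):
    """Remove every non-alphabetic character from the string s."""
    return ''.join(filter(str.isalpha, s))

# Hypothetical sample tokens, chosen to show the edge cases
samples = ["Hello,", "don't", "2nd", "1234", "end."]

# Apply the helper to each token, as tokens.map(strip) would do on an RDD
cleaned = [strip(t) for t in samples]
print(cleaned)
```

The empty string produced by the numeric token is the reason the pipeline needs a filter step after the map.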

### Now we start to build the analysis pipeline that will process our data

In [0]:
# Make sure to check that your path is correct for the data files
books = sc.textFile("/FileStore/tables/books/*.txt")  

tokens = books.flatMap(lambda line: line.split())
stripped = tokens.map(strip)
notempty = stripped.filter(lambda w: len(w)>0)
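
As a rough mental model, the three transformations above can be imitated on an ordinary Python list. This is only a local sketch: the two sample lines are invented stand-ins for the book files, and Spark evaluates its transformations lazily across partitions, whereas the lists here are built eagerly on one machine.

```python
from itertools import chain

def strip(s):
    """Remove every non-alphabetic character from the string s."""
    return ''.join(filter(str.isalpha, s))

# Hypothetical stand-in for the lines that sc.textFile would read
lines = ["The quick brown fox,", "jumps over -- the lazy dog!"]

# flatMap: split every line, then flatten the per-line lists into one sequence
tokens = list(chain.from_iterable(line.split() for line in lines))

# map: apply strip to every token
stripped = [strip(t) for t in tokens]

# filter: drop tokens that became empty (here, the "--")
notempty = [w for w in stripped if len(w) > 0]
print(notempty)
```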

### Your turn

You need three more transformations:
1. Map the words to lower case.
2. Convert the words into (key, value) pairs, where the key is the word and the value is its starting count of 1.
3. Reduce by key, adding up the counts as you go.

Make sure that your final variable is called wordcount so the rest of the code works.

In [0]:
# Now map the words to lower case

# Next, convert the words into (k, v) pairs, where the key is the word and the value is its starting count of 1

# Next, reduce by key, adding up the counts as you go

In [0]:
lower = notempty.map(lambda w: w.lower())            # lowercase every word
mapped = lower.map(lambda w: (w, 1))                 # map every word to a (word, 1) key-value pair
wordcount = mapped.reduceByKey(lambda x, y: x + y)   # add up the counts for each key
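
To make the reduce step less of a black box, here is a single-machine imitation of what reducing by key does to (word, 1) pairs. The helper `reduce_by_key` is hypothetical and written only for this illustration; it is not part of Spark, which performs the same combining per partition and then across partitions.

```python
def reduce_by_key(pairs, func):
    """Hypothetical local imitation of reduceByKey: combine values that share a key."""
    totals = {}
    for key, value in pairs:
        if key in totals:
            # Key seen before: merge the running value with the new one
            totals[key] = func(totals[key], value)
        else:
            # First occurrence of this key: start from its value
            totals[key] = value
    return totals

# Invented sample words standing in for the notempty RDD
words = ["The", "cat", "saw", "the", "other", "cat"]

lower = [w.lower() for w in words]   # lowercase every word
mapped = [(w, 1) for w in lower]     # pair every word with a count of 1
wordcount = reduce_by_key(mapped, lambda x, y: x + y)
print(wordcount)
```

Lowercasing before pairing matters: without it, "The" and "the" would be counted under two separate keys.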

In [0]:
for k,v in wordcount.collect():
    print(k,v)

### What if we wanted to arrange the words in descending order of frequency?

In [0]:
reorder = wordcount.map(lambda p: (p[1], p[0]))   # swap each (word, count) pair to (count, word)
sort = reorder.sortByKey(False)    # sort by count in descending order (ascending=False)

for k,v in sort.collect():
    print(k,v)
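
The swap-then-sort idea can be tried out locally with Python's built-in `sorted`. The counts below are invented for illustration. The second variant shows an alternative that skips the swap by sorting on the count directly; PySpark's RDD API offers a comparable `sortBy` method that takes a key function, though the sketch here stays in plain Python.

```python
# Hypothetical (word, count) pairs standing in for wordcount.collect()
counts = [("the", 12), ("whale", 7), ("a", 9), ("sea", 7)]

# Same idea as the cell above: swap to (count, word), then sort descending.
# Python compares whole tuples, so ties on count are broken by the word;
# Spark's sortByKey compares only the key.
swapped = sorted(((c, w) for w, c in counts), reverse=True)

# Alternative: keep the (word, count) shape and sort on the count itself
by_count = sorted(counts, key=lambda p: p[1], reverse=True)

for count, word in swapped:
    print(count, word)
```

Keeping the (word, count) shape avoids having to remember that the tuples were flipped when reading the results later.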