![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png)

<p style="font-size:20px;font-weight: bold;color:blue">"Digital College Paris" </p>
<p style="font-size:15px;font-weight: bold;color:blue">Big Data - MBA IT -- Instructor: B. Men</p>

**Sources**: These labs synthesize and *build on* labs from several sources: 
- A series of MOOCs from Berkeley and Databricks (Creative Commons licences), namely
   - [Introduction to Apache Spark](https://courses.edx.org/courses/course-v1:BerkeleyX+CS105x+1T2016/info)
   - [Big data Analysis with Apache Spark](https://courses.edx.org/courses/course-v1:BerkeleyX+CS110x+2T2016/info)
   - [Distributed Machine Learning with Apache Spark](https://courses.edx.org/courses/course-v1:BerkeleyX+CS120x+2T2016/info)
   - [Introduction to Big Data with Apache Spark](https://courses.edx.org/courses/BerkeleyX/CS100.1x/1T2015/info)
   - [Scalable Machine Learning](https://courses.edx.org/courses/BerkeleyX/CS190.1x/1T2015/info)
- [Apache Spark & Python (pySpark) tutorials for Big Data Analysis and Machine Learning](https://github.com/jadianes/spark-py-notebooks) (Apache License, Version 2.0)

We have kept the lab text in English so that the labs can be reused in international sections. 

<p style="font-size:35px;font-weight: bold;"> Lab 1 :  Word Count with PySpark</p>

This lab will enable us to develop a simple word count application.  The volume of unstructured text in existence is growing dramatically, and Spark is an excellent tool for analyzing this type of data.  In this lab, we will write code that calculates the most common words in the [Complete Works of William Shakespeare](http://www.gutenberg.org/ebooks/100) retrieved from [Project Gutenberg](http://www.gutenberg.org/wiki/Main_Page).  This could also be scaled to find the most common words on the Internet.

**During this lab we will cover:**

- **Part 1:** Word Count with RDDs -- You will experiment with basic RDDs, then build a pair RDD and use it to count words. Finally, you will learn how to clean and prepare the RDD and build an application that counts the words in a file. 

- **Part 2:** A short tutorial on Spark DataFrames (introduced in Spark 1.3).

- **Part 3:** Word Count with DataFrames -- You will follow the same steps as in Part 1 to develop a word counting application and apply it to a file. 


**Exercises** will include an explanation of what is expected, followed by code cells where one cell will have one or more `<FILL IN>` sections.  The cell that needs to be modified will have `# TODO: Replace <FILL IN> with appropriate code` on its first line.  Once the `<FILL IN>` sections are updated and the code is run, the test cell can then be run to verify the correctness of your solution.  The last code cell before the next markdown section will contain the tests.

Note that, for reference, you can look up the details of the relevant methods in [Spark's Python API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)

##  Prerequisites : Spark Context configuration 

<p>If you cannot create the SparkContext and SQLContext objects as explained in your course:<br/>
Remove the comment markers in the following code and set the "spark_path" variable to the location and name of Spark on your computer.<br/>
If you are using Colaboratory, execute the cells below: they install Java and Spark in the notebook and set up the environment.</p>

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [21]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

import findspark
findspark.init()
import sys

from pyspark.sql import SparkSession, SQLContext
spark = SparkSession.builder.appName("Digital College Paris").master("local[*]").getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

# Part 1 : Word Count with RDDs  

##  1. Creating a base RDD and pair RDDs 

We will first explore creating a base RDD with `parallelize` and using pair RDDs to count words.


**(1a) Create a base RDD**

We'll start by generating a base RDD by using a Python list and the `sc.parallelize` method.  Then we'll print out the type of the base RDD.


In [23]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
# Print out the type of wordsRDD
print(type(wordsRDD))

<class 'pyspark.rdd.RDD'>


**(1b) Pluralize and test**

Let's use a `map()` transformation to add the letter 's' to each string in the base RDD we just created. We'll define a Python function that returns the word with an 's' at the end of the word.  Please replace `<FILL IN>` with your solution.  If you have trouble, the next cell has the solution.  After you have defined `makePlural`, you can run the third cell, which contains a test.  If your implementation is correct, it will print `1 test passed.`

This is the general form that exercises will take, except that no example solution will be provided. 


In [24]:
# TODO: Replace <FILL IN> with appropriate code
def makePlural(word):
    """Adds an 's' to `word`.

    Note:
        This is a simple function that only adds an 's'.  No attempt is made to follow proper
        pluralization rules.

    Args:
        word (str): A string.

    Returns:
        str: A string with 's' added to it.
    """
    return word + 's'

print(makePlural('cat'))

cats


In [25]:
# One way of completing the function
def makePlural(word):
    return word + 's'
  
print(makePlural('cat'))

cats


In [26]:
# Load in the testing code and check to see if your answer is correct
# If incorrect it will report back '1 test failed' for each failed test
# Make sure to rerun any cell you change before trying the test again
from databricks_test_helper import Test
# TEST Pluralize and test (1b)
Test.assertEquals(makePlural('rat'), 'rats', 'incorrect result: makePlural does not add an s')

1 test passed.


**(1c) Apply `makePlural` to the base RDD**

Now pass each item in the base RDD into a [map()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.map) transformation that applies the `makePlural()` function to each element. Then call the [collect()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.collect) action to see the transformed RDD.


In [27]:
# TODO: Replace <FILL IN> with appropriate code
pluralRDD = wordsRDD.map(makePlural)
print(pluralRDD.collect())

['cats', 'elephants', 'rats', 'rats', 'cats']


In [28]:
# TEST Apply makePlural to the base RDD(1c)
Test.assertEquals(pluralRDD.collect(), ['cats', 'elephants', 'rats', 'rats', 'cats'],
                  'incorrect values for pluralRDD')

1 test passed.


**(1d) Pass a `lambda` function to `map`**

Let's create the same RDD using a `lambda` function.


In [29]:
# TODO: Replace <FILL IN> with appropriate code
# The lambda does the work inline instead of wrapping makePlural
pluralLambdaRDD = wordsRDD.map(lambda x: x + 's')
print(pluralLambdaRDD.collect())

['cats', 'elephants', 'rats', 'rats', 'cats']


In [30]:
# TEST Pass a lambda function to map (1d)
Test.assertEquals(pluralLambdaRDD.collect(), ['cats', 'elephants', 'rats', 'rats', 'cats'],
                  'incorrect values for pluralLambdaRDD (1d)')

1 test passed.


**(1e) Length of each word**

Now use `map()` and a `lambda` function to return the number of characters in each word.  We'll `collect` this result directly into a variable.


In [31]:
# TODO: Replace <FILL IN> with appropriate code
pluralLengths = (pluralRDD
                 .map(lambda x:len(x))
                 .collect())
print(pluralLengths)

[4, 9, 4, 4, 4]


In [32]:
# TEST Length of each word (1e)
Test.assertEquals(pluralLengths, [4, 9, 4, 4, 4],
                  'incorrect values for pluralLengths')

1 test passed.


**(1f) Pair RDDs**

The next step in writing our word counting program is to create a new type of RDD, called a pair RDD. A pair RDD is an RDD where each element is a pair tuple `(k, v)` where `k` is the key and `v` is the value. In this example, we will create a pair consisting of `('<word>', 1)` for each word element in the RDD.

We can create the pair RDD using the `map()` transformation with a `lambda` function.

In [33]:
# TODO: Replace <FILL IN> with appropriate code
wordPairs = wordsRDD.map(lambda x:(x,1))
print(wordPairs.collect())

[('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]


In [34]:
# TEST Pair RDDs (1f)
Test.assertEquals(wordPairs.collect(),
                  [('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)],
                  'incorrect value for wordPairs')

1 test passed.


## 2. Counting with pair RDDs 

Now, let's count the number of times a particular word appears in the RDD. There are multiple ways to perform the counting, but some are much less efficient than others.

A naive approach would be to `collect()` all of the elements and count them in the driver program. While this approach could work for small datasets, we want an approach that will work for any size dataset including terabyte- or petabyte-sized datasets. In addition, performing all of the work in the driver program is slower than performing it in parallel in the workers. For these reasons, we will use data parallel operations.

**(2a) `groupByKey()` approach**


An approach you might first consider (we'll see shortly that there are better ways) is based on using the [groupByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey) transformation. As the name implies, the `groupByKey()` transformation groups all the elements of the RDD with the same key into a single list in one of the partitions. There are two problems with using `groupByKey()`:
  + The operation requires a lot of data movement to move all the values into the appropriate partitions.
  + The lists can be very large. Consider a word count of English Wikipedia: the lists for common words (e.g., the, a, etc.) would be huge and could exhaust the available memory in a worker.
 
Use `groupByKey()` to generate a pair RDD of type `('word', iterator)`.
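
To make the grouping concrete, here is a minimal plain-Python sketch (not Spark code) of what grouping by key produces for the five pairs used above. The helper name `group_by_key` is hypothetical, and the sketch runs on a single machine, so it leaves out the data movement between partitions that makes the real transformation expensive.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Plain-Python imitation of grouping (key, value) pairs by key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)  # every value for a key ends up in one list
    return dict(groups)

word_pairs = [('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]
grouped = group_by_key(word_pairs)
for key in sorted(grouped):
    print('{0}: {1}'.format(key, grouped[key]))
```

Note that the list for each key grows with the number of occurrences of that key, which is the memory problem described above.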

In [35]:
# TODO: Replace <FILL IN> with appropriate code
# Note that groupByKey requires no parameters
wordsGrouped = wordPairs.groupByKey()
for key, value in wordsGrouped.collect():
    print('{0}: {1}'.format(key, list(value)))

cat: [1, 1]
elephant: [1]
rat: [1, 1]


In [36]:
# TEST groupByKey() approach (2a)
Test.assertEquals(sorted(wordsGrouped.mapValues(lambda x: list(x)).collect()),
                  [('cat', [1, 1]), ('elephant', [1]), ('rat', [1, 1])],
                  'incorrect value for wordsGrouped')

1 test passed.


**(2b) Use `groupByKey()` to obtain the counts**

<p>Using the `groupByKey()` transformation creates an RDD containing 3 elements, each of which is a pair of a word and a Python iterator.<br/>
Now sum the iterator using a `map()` transformation.  The result should be a pair RDD consisting of (word, count) pairs.</p>



In [37]:
# TODO: Replace <FILL IN> with appropriate code
wordCountsGrouped = wordsGrouped.map(lambda x:(x[0],sum(x[1])))
print(wordCountsGrouped.collect())

[('cat', 2), ('elephant', 1), ('rat', 2)]


In [38]:
# TEST Use groupByKey() to obtain the counts (2b)
Test.assertEquals(sorted(wordCountsGrouped.collect()),
                  [('cat', 2), ('elephant', 1), ('rat', 2)],
                  'incorrect value for wordCountsGrouped')

1 test passed.


**(2c) Counting using `reduceByKey`**

A better approach is to start from the pair RDD and then use the [reduceByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey) transformation to create a new pair RDD. The `reduceByKey()` transformation gathers together pairs that have the same key and applies the function provided to two values at a time, iteratively reducing all of the values to a single value. `reduceByKey()` operates by applying the function first within each partition on a per-key basis and then across the partitions, allowing it to scale efficiently to large datasets.
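
The two phases described above can be sketched in plain Python (this is an illustration of the idea, not Spark's implementation). The helper names `combine` and `reduce_by_key` are hypothetical, and the split of the five pairs into two partitions is an assumption made for the example.

```python
def combine(pairs, func):
    """Reduce the values of each key in an iterable of (key, value) pairs."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

def reduce_by_key(partitions, func):
    # Phase 1: reduce inside each partition; one value per key survives.
    partials = [combine(partition, func) for partition in partitions]
    # Phase 2: reduce the small per-partition results across partitions.
    all_partials = [item for partial in partials for item in partial.items()]
    return combine(all_partials, func)

partitions = [
    [('cat', 1), ('elephant', 1)],
    [('rat', 1), ('rat', 1), ('cat', 1)],
]
counts = reduce_by_key(partitions, lambda x, y: x + y)
print(sorted(counts.items()))
```

Because each partition forwards at most one value per key, far less data has to move between partitions than when every individual pair is shipped, as in the grouping approach.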


In [39]:
# TODO: Replace <FILL IN> with appropriate code
# Note that reduceByKey takes in a function that accepts two values and returns a single value
wordCounts = wordPairs.reduceByKey(lambda x,y:x+y)
print(wordCounts.collect())

[('cat', 2), ('elephant', 1), ('rat', 2)]


In [40]:
# TEST Counting using reduceByKey (2c)
Test.assertEquals(sorted(wordCounts.collect()), [('cat', 2), ('elephant', 1), ('rat', 2)],
                  'incorrect value for wordCounts')

1 test passed.


**(2d) All together**

The expert version of the code performs the `map()` to a pair RDD, the `reduceByKey()` transformation, and the `collect()` action in one statement.


In [41]:
# TODO: Replace <FILL IN> with appropriate code
wordCountsCollected = (wordsRDD
                       .map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
                       .collect())
print(wordCountsCollected)

[('cat', 2), ('elephant', 1), ('rat', 2)]


In [42]:
# TEST All together (2d)
Test.assertEquals(sorted(wordCountsCollected), [('cat', 2), ('elephant', 1), ('rat', 2)],
                  'incorrect value for wordCountsCollected')

1 test passed.


##  3. Finding unique words and a mean value 

**(3a) Unique words**

Calculate the number of unique words in `wordsRDD`.  You can use other RDDs that you have already created to make this easier.


In [43]:
# TODO: Replace <FILL IN> with appropriate code
uniqueWords = wordsRDD.distinct().count()
print(uniqueWords)

3


In [44]:
# TEST Unique words (3a)
Test.assertEquals(uniqueWords, 3, 'incorrect count of uniqueWords')

1 test passed.


**(3b) Mean using `reduce`**

<p>Find the mean number of occurrences per unique word in `wordCounts`.<br/>

First `map()` the pair RDD `wordCounts`, which consists of (key, value) pairs, to an RDD of values. Then use a `reduce()` action to sum the counts, and divide the sum by the number of unique words.</p>
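
The same map-then-reduce pattern can be tried on an ordinary Python list with `functools.reduce`. This is only a local sketch; the list `word_counts` is a hand-written stand-in for the contents of the pair RDD.

```python
from functools import reduce

# Hand-written stand-in for the (word, count) pairs
word_counts = [('cat', 2), ('elephant', 1), ('rat', 2)]

# Like map(): keep only the value of each pair
values = [count for _, count in word_counts]

# Like reduce(): fold the values two at a time into a single number
total = reduce(lambda x, y: x + y, values)

mean = total / len(word_counts)
print(total)
print(round(mean, 2))
```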


In [45]:
# TODO: Replace <FILL IN> with appropriate code
totalCount = (wordCounts
              .map(lambda x: x[1])
              .reduce(lambda x, y: x + y))
average = totalCount / float(uniqueWords)
print(totalCount)
print(round(average, 2))

5
1.67


In [46]:
# TEST Mean using reduce (3b)
Test.assertEquals(round(average, 2), 1.67, 'incorrect value of average')

1 test passed.


## 4.  Apply word count to a file

In this section we will finish developing our word count application.  We'll have to build the `wordCount` function, deal with real world problems like capitalization and punctuation, load in our data source, and compute the word count on the new data.


**(4a) `wordCount` function**

First, define a function for word counting.  You should reuse the techniques that have been covered in earlier parts of this lab.  This function should take in an RDD that is a list of words like `wordsRDD` and return a pair RDD that has all of the words and their associated counts.


In [47]:
# TODO: Replace <FILL IN> with appropriate code
def wordCount(wordListRDD):
    """Creates a pair RDD with word counts from an RDD of words.

    Args:
        wordListRDD (RDD of str): An RDD consisting of words.

    Returns:
        RDD of (str, int): An RDD consisting of (word, count) tuples.
    """
    return wordListRDD.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
print(wordCount(wordsRDD).collect())

[('cat', 2), ('elephant', 1), ('rat', 2)]


In [48]:
# TEST wordCount function (4a)
Test.assertEquals(sorted(wordCount(wordsRDD).collect()),
                  [('cat', 2), ('elephant', 1), ('rat', 2)],
                  'incorrect definition for wordCount function')

1 test passed.


**(4b) Capitalization and punctuation**

Real world files are more complicated than the data we have been using in this lab. Some of the issues we have to address are:

  +  Words should be counted independent of their capitalization (e.g., Spark and spark should be counted as the same word).
  +  All punctuation should be removed.
  +  Any leading or trailing spaces on a line should be removed.
 
Define the function `removePunctuation` that converts all text to lower case, removes any punctuation, and removes leading and trailing spaces.  Use the Python [re](https://docs.python.org/3/library/re.html) module to remove any text that is not a letter, number, or space. Reading `help(re.sub)` might be useful.
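
As a warm-up for `re.sub`, here is a small sketch on two unrelated tasks (the patterns and sample strings are made up for illustration). `re.sub(pattern, replacement, string)` returns a copy of `string` in which every match of `pattern` is replaced.

```python
import re

# Replace every run of digits with a single '#'.
masked = re.sub(r'[0-9]+', '#', 'Room 101, floor 3')

# Collapse every run of whitespace into one space.
collapsed = re.sub(r'\s+', ' ', 'too    many   spaces')

print(masked)
print(collapsed)
```

Writing the pattern as a raw string (`r'...'`) keeps Python from interpreting backslashes before the regular expression engine sees them.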


In [49]:
# TODO: Replace <FILL IN> with appropriate code
import re
def removePunctuation(text):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only spaces, letters, and numbers should be retained.  Other characters should be
        eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed after
        punctuation is removed.

    Args:
        text (str): A string.

    Returns:
        str: The cleaned up string.
    """
    # Drop every character that is neither a word character nor whitespace, and
    # also drop underscores (which \w would keep); then strip and lower-case.
    return re.sub(r'([^\w\s]|_)', '', text).strip().lower()
    
print(removePunctuation('Hi, you!'))
print(removePunctuation(' No under_score!'))

hi you
no underscore


In [50]:
# TEST Capitalization and punctuation (4b)
Test.assertEquals(removePunctuation(" The Elephant's 4 cats. "),
                  'the elephants 4 cats',
                  'incorrect definition for removePunctuation function')

1 test passed.


**(4c) Load a text file**

For the next part of this lab, we will use the [Complete Works of William Shakespeare](http://www.gutenberg.org/ebooks/100) from [Project Gutenberg](http://www.gutenberg.org/wiki/Main_Page). To convert a text file into an RDD, we use the `SparkContext.textFile()` method. We also apply the recently defined `removePunctuation()` function using a `map()` transformation to strip out the punctuation and change all text to lowercase.  Since the file is large we use `take(15)`, so that we only print 15 lines.


In [51]:
# Just run this code
import os.path
baseDir = os.path.join('..')
inputPath = os.path.join('/content/drive/My Drive/Colab Notebooks/data', 'shakespeare.txt')
# inputPath is absolute, so os.path.join() discards baseDir and fileName equals inputPath
fileName = os.path.join(baseDir, inputPath)

shakespeareRDD = (sc
                  .textFile(fileName, 8)
                  .map(removePunctuation))
print('\n'.join(shakespeareRDD
                .zipWithIndex()  # to (line, lineNum)
                .map(lambda x: '{}:{}'.format(x[1],x[0]))  # to 'lineNum: line'
                .take(15)))

0:1609
1:
2:the sonnets
3:
4:by william shakespeare
5:
6:
7:
8:1
9:from fairest creatures we desire increase
10:that thereby beautys rose might never die
11:but as the riper should by time decease
12:his tender heir might bear his memory
13:but thou contracted to thine own bright eyes
14:feedst thy lights flame with selfsubstantial fuel


**(4d) Words from lines**

Before we can use the `wordCount()` function, we have to address two issues with the format of the RDD:

  +  The first issue is that we need to split each line by its spaces.
  +  The second issue is that we need to filter out empty lines.
 
Apply a transformation that will split each element of the RDD by its spaces. For each element of the RDD, you should apply Python's string [split()](https://docs.python.org/3/library/stdtypes.html#str.split) method. You might think that a `map()` transformation is the way to do this, but think about what the result of the `split()` call will be. Use a different transformation.
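
To see why the shape of the result matters, compare a map-like and a flatMap-like operation on a plain Python list. This is a local sketch, not Spark code; the three sample lines are taken from the cleaned text shown earlier.

```python
from itertools import chain

lines = ['the sonnets', '', 'by william shakespeare']

# map-like: one output element per input element, so we get a list of lists.
mapped = [line.split(' ') for line in lines]

# flatMap-like: the per-line lists are concatenated into one flat list of words.
flat_mapped = list(chain.from_iterable(line.split(' ') for line in lines))

print(mapped)
print(flat_mapped)
```

The empty line yields an empty string in the flat list, which is why empty elements still have to be filtered out afterwards.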

In [52]:
# TODO: Replace <FILL IN> with appropriate code
shakespeareWordsRDD = shakespeareRDD.flatMap(lambda x:x.split(' '))
shakespeareWordCount = shakespeareWordsRDD.count()
print(shakespeareWordsRDD.top(5))
print(shakespeareWordCount)

['zwaggerd', 'zounds', 'zounds', 'zounds', 'zounds']
927631


In [53]:
# TEST Words from lines (4d)
# This test allows for leading spaces to be removed either before or after
# punctuation is removed.
Test.assertTrue(shakespeareWordCount == 927631 or shakespeareWordCount == 928908,
                'incorrect value for shakespeareWordCount')
Test.assertEquals(shakespeareWordsRDD.top(5),
                  [u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds'],
                  'incorrect value for shakespeareWordsRDD')

1 test passed.
1 test passed.


**(4e) Remove empty elements**

The next step is to filter out the empty elements.  Remove all entries where the word is `''`.


In [54]:
# TODO: Replace <FILL IN> with appropriate code
shakeWordsRDD = shakespeareWordsRDD.filter(lambda x:x!='')
shakeWordCount = shakeWordsRDD.count()
print(shakeWordCount)

882996


In [55]:
# TEST Remove empty elements (4e)
Test.assertEquals(shakeWordCount, 882996, 'incorrect value for shakeWordCount')

1 test passed.


**(4f) Count the vocabulary size**

<p> How many different words (vocabulary size) are there in Shakespeare's vocabulary? </p>

In [56]:
# Let us get the size of Shakespeare's complete vocabulary
shakespare_vocab_size = shakeWordsRDD.distinct().count()
print("Shakespeare vocabulary contains %s words" % shakespare_vocab_size)

Shakespeare vocabulary contains 28147 words


In [57]:
# TEST Count the vocabulary size (4f)
Test.assertEquals(shakespare_vocab_size, 28147, 'incorrect value for shakespare_vocab_size')

1 test passed.


**(4g) Get the top words**

We now have an RDD that is only words.  Next, let's apply the `wordCount()` function to produce a list of word counts. We can view the top 15 words by using the `takeOrdered()` action; however, since the elements of the RDD are pairs, we need a custom sort function that sorts using the value part of the pair.

<p>You'll notice that many of the words are common English words. These are called stopwords. In a later lab, we will see how to eliminate them from the results.<br/>
Use the `wordCount()` function and `takeOrdered()` to obtain the fifteen most common words and their counts.</p>
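
`takeOrdered()` returns elements in ascending order of the key, so negating the count puts the largest counts first. The same trick can be tried locally with `heapq.nsmallest`, used here as a plain-Python analogue; the sample pairs are made up.

```python
import heapq

# Made-up (word, count) pairs for illustration
word_counts = [('cat', 2), ('elephant', 1), ('rat', 5), ('dog', 3)]

# Smallest by negated count is the same as largest by count.
top2 = heapq.nsmallest(2, word_counts, key=lambda wc: -wc[1])
print(top2)
```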



In [58]:
# TODO: Replace <FILL IN> with appropriate code
top15WordsAndCounts = wordCount(shakeWordsRDD).takeOrdered(15, key = lambda x: -x[1])
print('\n'.join(map(lambda wc: '{0}: {1}'.format(wc[0], wc[1]), top15WordsAndCounts)))

the: 27361
and: 26028
i: 20681
to: 19150
of: 17463
a: 14593
you: 13615
my: 12481
in: 10956
that: 10890
is: 9134
not: 8497
with: 7771
me: 7769
it: 7678


In [59]:
# TEST Get the top words (4g)
Test.assertEquals(top15WordsAndCounts,
                  [(u'the', 27361), (u'and', 26028), (u'i', 20681), (u'to', 19150), (u'of', 17463),
                   (u'a', 14593), (u'you', 13615), (u'my', 12481), (u'in', 10956), (u'that', 10890),
                   (u'is', 9134), (u'not', 8497), (u'with', 7771), (u'me', 7769), (u'it', 7678)],
                  'incorrect value for top15WordsAndCounts')

1 test passed.


# Part 2 : Short tutorial on DataFrames and chaining together transformations and actions

## 1. Working with your first DataFrames

In Spark, we first create a base [DataFrame](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame). We can then apply one or more transformations to that base DataFrame. *A DataFrame is immutable, so once it is created, it cannot be changed.* As a result, each transformation creates a new DataFrame. Finally, we can apply one or more actions to the DataFrames.

> Note that Spark uses lazy evaluation, so transformations are not actually executed until an action occurs.

We will perform several exercises to obtain a better understanding of DataFrames:
* Create a Python collection of 10,000 people 
* Create a Spark DataFrame from that collection
* Subtract one from each value using `select`
* Perform action `collect` to view results
* Perform action `count` to view counts
* Apply transformation `filter` and view results with `collect`
* Learn about lambda functions
* Explore how lazy evaluation works and the debugging challenges that it introduces

A DataFrame consists of a series of `Row` objects; each `Row` object has a set of named columns. You can think of a DataFrame as modeling a table, though the data source being processed does not have to be a table.

More formally, a DataFrame must have a _schema_, which means it must consist of columns, each of which has a _name_ and a _type_. Some data sources have schemas built into them. Examples include RDBMS databases, Parquet files, and NoSQL databases like Cassandra. Other data sources don't have computer-readable schemas, but you can often apply a schema programmatically.

## 2. Create a Python collection of 10,000 people

We will use a third-party Python testing library called [Faker](https://pypi.python.org/pypi/fake-factory/0.5.3) to create a collection of fake person records.

In [60]:
!pip install Faker --user



<b style="color:red">After successfully installing the library, open the "Runtime" menu ("Exécution" in the French interface) and select "Restart runtime..." ("Redémarrer l'environnement d'exécution...").</b>

You have to recreate the SparkContext and SQLContext, so run the following cell again:

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

import findspark
findspark.init()
import sys

from pyspark.sql import SparkSession, SQLContext
spark = SparkSession.builder.appName("Digital College Paris").master("local[*]").getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
#re-importing the Test library
from databricks_test_helper import Test

In [0]:
from faker import Factory
fake = Factory.create('fr_FR')
fake.seed(4321)

We're going to use this factory to create a collection of randomly generated people records. In the next section, we'll turn that collection into a DataFrame. We'll use the Spark `Row` class,
because that will help us define the Spark DataFrame schema. There are other ways to define schemas, though; see
the Spark Programming Guide's discussion of [schema inference](http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection) for more information. (For instance,
we could also use a Python `namedtuple`.)
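
As a minimal sketch of the `namedtuple` alternative, the record below carries its column names with it. The type name `Person` is hypothetical, and the field names are an assumption that mirrors the column names used for the DataFrame later in this lab.

```python
from collections import namedtuple

# Hypothetical record type; field names mirror the DataFrame columns used later
Person = namedtuple('Person', ['last_name', 'first_name', 'ssn', 'occupation', 'age'])

p = Person('Ruiz', 'Michelle', '561-47-8114', 'Agent de transit', 34)

print(p.last_name, p.age)  # fields are accessible by name
print(p._fields)           # the column names travel with the record
```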

In [0]:
# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row

def fake_entry():
    name = fake.name().split()
    return (name[1], name[0], fake.ssn(), fake.job(),
            abs(2016 - fake.date_time().year) + 1)
# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
    for _ in range(times):
        yield func(*args, **kwargs)

In [64]:
for a in repeat(3, fake_entry): print(a)

('Girard', 'Guillaume', '510-29-0746', 'Ingénieur réservoir', 20)
('Leveque', 'Pauline', '018-27-3493', 'Ingénieur en informatique', 18)
('Guichard', 'Philippine', '566-11-5493', "Chargé d'études naturalistes", 31)


In [65]:
data = list(repeat(10000, fake_entry))
print(data)

[('Ruiz', 'Michelle', '561-47-8114', 'Agent de transit', 34), ('Lebreton', 'Alex', '658-13-3841', 'Architecte produit industriel', 7), ('Lombard', 'Sophie', '981-29-0464', 'Chercheur en biologie', 38), ('Bonnin', 'Élise', '035-58-3904', 'Encadreur', 30), ('Valentin', 'Tristan', '358-19-7650', 'Mécatronicien', 38), ('de', 'Margaud', '143-41-2672', 'Ingénieur chimiste en développement analytique', 2), ('Mercier', 'Martine-Thérèse', '938-12-8151', 'Officier de la marine marchande', 12), ('Peron', 'Antoine', '385-65-0441', 'Directeur de création', 36), ('Chartier', 'Lucas', '356-40-3015', 'Électromécanicien ', 35), ('Marchand', 'Émilie', '427-32-8671', 'Assistant', 2), ('Loiseau', 'Christophe', '987-30-8626', 'Chercheur en physique   ', 3), ('Fabre-Marin', 'Thérèse', '175-24-9412', 'Ingénieur process aval', 16), ('Laine', 'Théophile', '412-10-7314', 'Ingénieur réservoir', 47), ('Blanchard', 'Odette', '893-14-5482', 'Optronicien', 8), ('Bertrand', 'Gilles-Yves', '365-88-5286', 'Développeur 

`data` is just a normal Python list containing Python tuple objects. The first item in the list can be inspected with `data[0]`.

## 3. Distributed data and using a collection to create a DataFrame

In Spark, datasets are represented as a list of entries, where the list is broken up into many different partitions that are each stored on a different machine.  Each partition holds a unique subset of the entries in the list.  Spark calls datasets that it stores "Resilient Distributed Datasets" (RDDs). Even DataFrames are ultimately represented as RDDs, with additional meta-data.
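
The idea of breaking a list into partitions that each hold a unique subset of the entries can be sketched in plain Python. The helper `partition` is hypothetical, and the way Spark actually assigns entries to partitions may differ from this simple contiguous slicing.

```python
def partition(entries, num_partitions):
    """Split a list into contiguous slices of nearly equal size."""
    size, extra = divmod(len(entries), num_partitions)
    parts, start = [], 0
    for i in range(num_partitions):
        # The first `extra` partitions receive one additional entry.
        end = start + size + (1 if i < extra else 0)
        parts.append(entries[start:end])
        start = end
    return parts

parts = partition(list(range(10)), 4)
print(parts)
```

Every entry appears in exactly one slice, so concatenating the slices in order gives back the original list.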

<img src="http://spark-mooc.github.io/web-assets/images/cs105x/diagram-3b.png" style="width: 900px; float: right; margin: 5px"/>

One of the defining features of Spark, compared to other data analytics frameworks (e.g., Hadoop), is that it can keep data in memory rather than on disk.  This allows Spark applications to run much more quickly, because they are not slowed down by repeatedly reading data from disk.
The figure to the right illustrates how Spark breaks a list of data entries into partitions that are each stored in memory on a worker.


To create the DataFrame, we'll use `sqlContext.createDataFrame()`, and we'll pass our array of data in as an argument to that function. Spark will create a new set of input data based on data that is passed in.  A DataFrame requires a _schema_, which is a list of columns, where each column has a name and a type. Our list of data has elements with types (mostly strings, but one integer). We'll supply the rest of the schema and the column names as the second argument to `createDataFrame()`.
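
To illustrate what "a list of columns, where each column has a name and a type" means, here is a plain-Python sketch that checks a tuple against such a list. It is not how Spark represents or verifies schemas; the `schema` list and the helper `conforms` are hypothetical.

```python
# Hypothetical schema: one (column name, Python type) pair per column
schema = [('last_name', str), ('first_name', str), ('ssn', str),
          ('occupation', str), ('age', int)]

def conforms(record, schema):
    """True if the record has one value of the expected type per column."""
    return (len(record) == len(schema) and
            all(isinstance(value, col_type)
                for value, (_, col_type) in zip(record, schema)))

good = ('Ruiz', 'Michelle', '561-47-8114', 'Agent de transit', 34)
bad = ('Ruiz', 'Michelle', '561-47-8114', 'Agent de transit', 'thirty-four')

print(conforms(good, schema))
print(conforms(bad, schema))
```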

Let's view the help for `createDataFrame()`.

In [66]:
help(sqlContext.createDataFrame)

Help on method createDataFrame in module pyspark.sql.context:

createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True) method of pyspark.sql.context.SQLContext instance
    Creates a :class:`DataFrame` from an :class:`RDD`, a list or a :class:`pandas.DataFrame`.
    
    When ``schema`` is a list of column names, the type of each column
    will be inferred from ``data``.
    
    When ``schema`` is ``None``, it will try to infer the schema (column names and types)
    from ``data``, which should be an RDD of :class:`Row`,
    or :class:`namedtuple`, or :class:`dict`.
    
    When ``schema`` is :class:`pyspark.sql.types.DataType` or a datatype string it must match
    the real data, or an exception will be thrown at runtime. If the given schema is not
    :class:`pyspark.sql.types.StructType`, it will be wrapped into a
    :class:`pyspark.sql.types.StructType` as its only field, and the field name will be "value",
    each record will also be wrapped into a tuple, wh

In [0]:
dataDF = sqlContext.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age'))

Let's take a look at the DataFrame's schema and some of its rows.

In [68]:
dataDF.printSchema()

root
 |-- last_name: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- ssn: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- age: long (nullable = true)



How many partitions will the DataFrame be split into?

In [69]:
dataDF.rdd.getNumPartitions()

2

## 4. Subtract one from each value using _select_

So far, we've created a distributed DataFrame that is split into many partitions, where each partition is stored on a single machine in our cluster.  Let's look at what happens when we do a basic operation on the dataset.  Many useful data analysis operations can be specified as "do something to each item in the dataset".  These data-parallel operations are convenient because each item in the dataset can be processed individually: the operation on one entry doesn't affect the operations on any of the other entries.  Therefore, Spark can parallelize the operation.

One of the most common DataFrame operations is `select()`, and it works more or less like a SQL `SELECT` statement: You can select specific columns from the DataFrame, and you can even use `select()` to create _new_ columns with values that are derived from existing column values. We can use `select()` to create a new column that decrements the value of the existing `age` column.

`select()` is a _transformation_. It returns a new DataFrame that captures both the previous DataFrame and the operation to add to the query (`select`, in this case). But it does *not* actually execute anything on the cluster. When transforming DataFrames, we are building up a _query plan_. That query plan will be optimized, implemented (in terms of RDDs), and executed by Spark _only_ when we call an action (see Lazy Evaluation process).

In [0]:
# Transform dataDF through a select transformation and rename the newly created '(age -1)' column to 'age'
# Because select is a transformation and Spark uses lazy evaluation, no jobs, stages,
# or tasks will be launched when we run this code.
subDF = dataDF.select('last_name', 'first_name', 'ssn', 'occupation', (dataDF.age - 1).alias('age'))
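
To make the idea of lazy evaluation concrete, here is a minimal plain-Python analogy. It is not Spark code: `LazyTable` and `minus_one` are hypothetical names invented for this illustration, and a real query plan is much richer than a tuple of functions. The sketch only shows the principle that a transformation records work and an action triggers it.

```python
# Plain-Python analogy of lazy evaluation (NOT Spark code; all names are hypothetical).
class LazyTable:
    """Stand-in for a DataFrame: holds the rows plus a plan of pending steps."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = tuple(plan)

    def select(self, fn):
        # Transformation: return a new object with a longer plan. Nothing runs yet.
        return LazyTable(self._rows, self._plan + (fn,))

    def collect(self):
        # Action: only now is every recorded step applied to every row.
        result = []
        for row in self._rows:
            for step in self._plan:
                row = step(row)
            result.append(row)
        return result


calls = []  # records each time the per-row function really executes


def minus_one(age):
    calls.append(age)
    return age - 1


ages = LazyTable([34, 7, 38])
sub = ages.select(minus_one)  # builds the plan only
print(calls)                  # [] : nothing has been computed so far

result = sub.collect()        # the action runs the plan
print(result)                 # [33, 6, 37]
print(calls)                  # [34, 7, 38]
```

Note that `ages` itself is never modified: like a DataFrame, each transformation returns a new object that remembers where it came from.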

##  5.  Use _collect_ or _show_ to view results

<img src="http://spark-mooc.github.io/web-assets/images/cs105x/diagram-3d.png" style="height:700px;float:right"/>

To see a list of elements decremented by one, we need to create a new list on the driver from the data distributed in the executor nodes.  To do this we can call the `collect()` method on our DataFrame.  `collect()` is often used after a filter or another transformation that reduces the data, so that only a *small* amount of data is returned to the driver.  This matters because the data returned to the driver must fit into the driver's available memory.  If not, the driver will crash.

The `collect()` method is the first action operation that we have encountered.  Action operations cause Spark to perform the (lazy) transformation operations that are required to compute the values returned by the action.  In our example, this means that tasks will now be launched to perform the `createDataFrame`, `select`, and `collect` operations.

In the diagram, the dataset is broken into four partitions, so four `collect()` tasks are launched. Each task collects the entries in its partition and sends the result to the driver, which creates a list of the values, as shown in the figure.

Now let's look at a few rows with `take()` and then run `collect()` on `subDF`.

In [71]:
subDF.take(3)

[Row(last_name='Ruiz', first_name='Michelle', ssn='561-47-8114', occupation='Agent de transit', age=33),
 Row(last_name='Lebreton', first_name='Alex', ssn='658-13-3841', occupation='Architecte produit industriel', age=6),
 Row(last_name='Lombard', first_name='Sophie', ssn='981-29-0464', occupation='Chercheur en biologie', age=37)]

In [72]:
# Let's collect the data
results = subDF.collect()
print (results)

[Row(last_name='Ruiz', first_name='Michelle', ssn='561-47-8114', occupation='Agent de transit', age=33), Row(last_name='Lebreton', first_name='Alex', ssn='658-13-3841', occupation='Architecte produit industriel', age=6), Row(last_name='Lombard', first_name='Sophie', ssn='981-29-0464', occupation='Chercheur en biologie', age=37), Row(last_name='Bonnin', first_name='Élise', ssn='035-58-3904', occupation='Encadreur', age=29), Row(last_name='Valentin', first_name='Tristan', ssn='358-19-7650', occupation='Mécatronicien', age=37), Row(last_name='de', first_name='Margaud', ssn='143-41-2672', occupation='Ingénieur chimiste en développement analytique', age=1), Row(last_name='Mercier', first_name='Martine-Thérèse', ssn='938-12-8151', occupation='Officier de la marine marchande', age=11), Row(last_name='Peron', first_name='Antoine', ssn='385-65-0441', occupation='Directeur de création', age=35), Row(last_name='Chartier', first_name='Lucas', ssn='356-40-3015', occupation='Électromécanicien ', age

In [73]:
subDF.show() # look at parameters

+-----------+---------------+-----------+--------------------+---+
|  last_name|     first_name|        ssn|          occupation|age|
+-----------+---------------+-----------+--------------------+---+
|       Ruiz|       Michelle|561-47-8114|    Agent de transit| 33|
|   Lebreton|           Alex|658-13-3841|Architecte produi...|  6|
|    Lombard|         Sophie|981-29-0464|Chercheur en biol...| 37|
|     Bonnin|          Élise|035-58-3904|           Encadreur| 29|
|   Valentin|        Tristan|358-19-7650|       Mécatronicien| 37|
|         de|        Margaud|143-41-2672|Ingénieur chimist...|  1|
|    Mercier|Martine-Thérèse|938-12-8151|Officier de la ma...| 11|
|      Peron|        Antoine|385-65-0441|Directeur de créa...| 35|
|   Chartier|          Lucas|356-40-3015|  Électromécanicien | 34|
|   Marchand|         Émilie|427-32-8671|           Assistant|  1|
|    Loiseau|     Christophe|987-30-8626|Chercheur en phys...|  2|
|Fabre-Marin|        Thérèse|175-24-9412|Ingénieur process...|

## 6. Use _count_ to get total

One of the most basic jobs that we can run is counting the number of elements in a DataFrame with the `count()` action. Since `select()` creates a new DataFrame with the same number of elements as the starting DataFrame, we expect that applying `count()` to each DataFrame will return the same result.

<img src="http://spark-mooc.github.io/web-assets/images/cs105x/diagram-3e.png" style="height:750px;float:right"/>

Note that because `count()` is an action operation, if we had not already performed an action with `collect()`, then Spark would now perform the transformation operations when we executed `count()`.

Each task counts the entries in its partition and sends the result to your SparkContext, which adds up all of the counts. The figure on the right shows what would happen if we ran `count()` on a small example dataset with just four partitions.

In [74]:
print (dataDF.count())
print (subDF.count())

10000
10000
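
The difference between what `collect()` and `count()` send back to the driver can be sketched in plain Python. This is only an analogy under simplifying assumptions: the "cluster" is a list of four lists, and the function names mirror the Spark actions but are ordinary Python functions written for this illustration.

```python
# Plain-Python sketch (NOT Spark): a dataset stored as a list of partitions.
partitions = [[33, 6, 37], [29, 37], [1, 11, 35], [34, 1]]


def collect(parts):
    """Each partition hands over all of its rows; the driver concatenates them."""
    rows = []
    for part in parts:
        rows.extend(part)
    return rows


def count(parts):
    """Each partition reports a single number; the driver adds the numbers up."""
    partial_counts = [len(part) for part in parts]  # one "task" per partition
    return sum(partial_counts)


print(collect(partitions))  # [33, 6, 37, 29, 37, 1, 11, 35, 34, 1]
print(count(partitions))    # 10
```

With `collect()` the driver receives every row, so its memory use grows with the data; with `count()` it receives one integer per partition, whatever the size of the dataset.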


## 7. Apply transformation _filter_ and view results with _collect_

Next, we'll create a new DataFrame that only contains the people whose ages are less than 10. To do this, we'll use the `filter()` transformation. (You can also use `where()`, an alias for `filter()`, if you prefer something more SQL-like). The `filter()` method is a transformation operation that creates a new DataFrame from the input DataFrame, keeping only values that match the filter expression.

The figure shows how this might work on the small four-partition dataset.

<img src="http://spark-mooc.github.io/web-assets/images/cs105x/diagram-3f.png" style="height:700px;float:right"/>


To view the filtered rows (ages less than 10), we need to bring data from the executor nodes back to the driver.  The `collect()` method would return a list that contains all of the elements in this filtered DataFrame to the driver program; in the cell below we use `show()` to print the first rows and `count()` to get the total number of matching rows.

In [75]:
filteredDF = subDF.filter(subDF.age < 10)
filteredDF.show(truncate=False)
filteredDF.count()

+-------------+---------------+-----------+----------------------------------------------+---+
|last_name    |first_name     |ssn        |occupation                                    |age|
+-------------+---------------+-----------+----------------------------------------------+---+
|Lebreton     |Alex           |658-13-3841|Architecte produit industriel                 |6  |
|de           |Margaud        |143-41-2672|Ingénieur chimiste en développement analytique|1  |
|Marchand     |Émilie         |427-32-8671|Assistant                                     |1  |
|Loiseau      |Christophe     |987-30-8626|Chercheur en physique                         |2  |
|Blanchard    |Odette         |893-14-5482|Optronicien                                   |7  |
|Texier       |David          |096-41-2299|Responsable de formation                      |2  |
|Fernandes    |Aurélie        |006-81-0101|Encadreur                                     |2  |
|Blanc        |Véronique      |135-14-0051|Enseign

2647

## 8. Additional DataFrame actions

Let's investigate some additional actions:

* [first()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.first)
* [take()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.take)

One useful thing to do when we have a new dataset is to look at the first few entries to obtain a rough idea of what information is available.  In Spark, we can do that using actions like `first()`, `take()`, and `show()`. Note that for the `first()` and `take()` actions, the elements that are returned depend on how the DataFrame is *partitioned*.

Instead of using the `collect()` action, we can use the `take(n)` action to return the first _n_ elements of the DataFrame. The `first()` action returns the first element of a DataFrame, and is equivalent to `take(1)[0]`.

In [76]:
print ("first: {0}\n".format(filteredDF.first()))
print ("Four of them: {0}\n".format(filteredDF.take(4)))

first: Row(last_name='Lebreton', first_name='Alex', ssn='658-13-3841', occupation='Architecte produit industriel', age=6)

Four of them: [Row(last_name='Lebreton', first_name='Alex', ssn='658-13-3841', occupation='Architecte produit industriel', age=6), Row(last_name='de', first_name='Margaud', ssn='143-41-2672', occupation='Ingénieur chimiste en développement analytique', age=1), Row(last_name='Marchand', first_name='Émilie', ssn='427-32-8671', occupation='Assistant', age=1), Row(last_name='Loiseau', first_name='Christophe', ssn='987-30-8626', occupation='Chercheur en physique   ', age=2)]
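
Why the result of `first()` and `take()` depends on partitioning can be illustrated with a small plain-Python sketch. It assumes, for the sake of the analogy, that rows are read partition by partition in order; the helper functions below are written for this illustration and are not Spark's implementation.

```python
from itertools import chain, islice


def take(parts, n):
    """Return the first n rows, scanning the partitions in order."""
    return list(islice(chain.from_iterable(parts), n))


def first(parts):
    """Equivalent to take(1)[0], as described in the text."""
    return take(parts, 1)[0]


# The same four rows, laid out in two different ways.
layout_a = [["Lebreton", "de"], ["Marchand", "Loiseau"]]
layout_b = [["Marchand", "Loiseau"], ["Lebreton", "de"]]

print(take(layout_a, 3))  # ['Lebreton', 'de', 'Marchand']
print(first(layout_a))    # Lebreton
print(first(layout_b))    # Marchand
```

Both layouts hold the same data, yet `first()` differs. If a specific row is needed, sort or filter first rather than relying on the layout.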



## 9. Additional DataFrame transformations

**_orderBy_**

[`orderBy()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.orderBy) allows you to sort a DataFrame by one or more columns, producing a new DataFrame.

For example, let's get the five oldest people in the original (unfiltered) DataFrame. We can use the `orderBy()` transformation. `orderBy()` takes one or more columns, either as _names_ (strings) or as `Column` objects. To get a `Column` object, we use one of two notations on the DataFrame:

* Pandas-style notation: `filteredDF.age`
* Subscript notation: `filteredDF['age']`

Both of those syntaxes return a `Column`, which has additional methods like `desc()` (for sorting in descending order) or `asc()` (for sorting in ascending order, which is the default).

Here are some examples:

```
dataDF.orderBy(dataDF['age'])  # sort by age in ascending order; returns a new DataFrame
dataDF.orderBy(dataDF.last_name.desc()) # sort by last name in descending order
```

In [77]:
# Get the five oldest people in the list. To do that, sort by age in descending order.
dataDF.orderBy(dataDF.age.desc()).take(5)

[Row(last_name='Le', first_name='Benjamin', ssn='779-72-0132', occupation='Éducateur', age=47),
 Row(last_name='Barbe', first_name='Tristan', ssn='683-32-8410', occupation='Agent de développement local', age=47),
 Row(last_name='Fleury-Pascal', first_name='Tristan', ssn='749-23-4305', occupation='Secrétaire médical', age=47),
 Row(last_name='Laine', first_name='Théophile', ssn='412-10-7314', occupation='Ingénieur réservoir', age=47),
 Row(last_name='Hamon', first_name='Jeannine-Camille', ssn='160-85-8720', occupation='Enseignant humanitaire', age=47)]
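
All five rows returned above have the same age, so the sort key alone does not say which of the 47-year-olds come first. The plain-Python sketch below (not Spark code) shows the same situation with a handful of rows taken from this lab's data, and how a secondary key makes the result fully determined. `Person` is a hypothetical helper type introduced only for the example.

```python
from collections import namedtuple

# Hypothetical row type for the illustration.
Person = namedtuple("Person", ["last_name", "age"])

people = [
    Person("Ruiz", 34),
    Person("Le", 47),
    Person("Lebreton", 7),
    Person("Barbe", 47),
    Person("Lombard", 38),
]

# Sort by age, descending: ties keep whatever order the input happened to have.
oldest = sorted(people, key=lambda p: p.age, reverse=True)[:2]
print([p.last_name for p in oldest])       # ['Le', 'Barbe']

# Sort by age descending, then last name ascending: ties are now resolved by name.
oldest_by_name = sorted(people, key=lambda p: (-p.age, p.last_name))[:2]
print([p.last_name for p in oldest_by_name])  # ['Barbe', 'Le']
```

In PySpark, `orderBy()` accepts several columns, so a call such as `dataDF.orderBy(dataDF.age.desc(), dataDF.last_name)` plays the role of the second sort.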

**_distinct_ and _dropDuplicates_**

[`distinct()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.distinct) filters out duplicate rows, and it considers all columns. Since our data is completely randomly generated (by `fake-factory`), it's extremely unlikely that there are any duplicate rows:

In [78]:
print (dataDF.count())
print (dataDF.distinct().count())

10000
10000
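
What "considers all columns" means can be sketched in plain Python with tuples standing in for rows (this is not Spark code, and the rows are made up for the example). The second half imitates deduplicating on a subset of columns, which is what `dropDuplicates()` does when given a list of column names; which row survives in Spark is not something this sketch claims to reproduce.

```python
# Made-up rows: (last_name, first_name, age)
rows = [
    ("Ruiz", "Michelle", 34),
    ("Ruiz", "Michelle", 34),  # exact duplicate of the first row
    ("Ruiz", "Michelle", 35),  # differs in one column only
    ("Ruiz", "Alex", 34),
]

# distinct(): two rows are duplicates only if every column matches.
distinct_rows = set(rows)
print(len(rows), len(distinct_rows))  # 4 3

# Deduplicating on the last_name column only: one row per last name is kept
# (here the first one encountered, an arbitrary choice made for this sketch).
by_last_name = {}
for row in rows:
    by_last_name.setdefault(row[0], row)
print(list(by_last_name.values()))    # [('Ruiz', 'Michelle', 34)]
```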


**_drop_**

[`drop()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.drop) is like the opposite of `select()`: Instead of selecting specific columns from a DataFrame, it drops a specified column from a DataFrame.

Here's a simple use case: Suppose you're reading from a 1,000-column CSV file, and you have to get rid of five of the columns. Instead of selecting 995 of the columns, it's easier just to drop the five you don't want.

**_groupBy_**

[`groupBy()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy) is one of the most powerful transformations. It allows you to perform aggregations on a DataFrame.

Unlike other DataFrame transformations, `groupBy()` does _not_ return a DataFrame. Instead, it returns a special [GroupedData](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData) object that contains various aggregation functions.

The most commonly used aggregation function is [count()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.count),
but there are others (like [sum()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.sum), [max()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.max), and [avg()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.avg)).

These aggregation functions typically create a new column and return a new DataFrame.

In [79]:
dataDF.groupBy('occupation').count().show(truncate=False)

+------------------------------------------------------------+-----+
|occupation                                                  |count|
+------------------------------------------------------------+-----+
|Chef de station de traitement des eaux                      |19   |
|Officier de l'armée de l'air                                |17   |
|Iconographe                                                 |19   |
|Microtechnicien                                             |22   |
|Technicien céramiste                                        |14   |
|Officier de marine                                          |19   |
|Architecte                                                  |17   |
|Mécanicien d'entretien d'avion                              |17   |
|Chargé d'études ressources humaines                         |14   |
|Secrétaire de rédaction                                     |15   |
|Acousticien                                                 |8    |
|Secrétaire d'édition             
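
The two-step shape of `groupBy()` followed by an aggregation can be mimicked in plain Python: first collect the values of each group, then reduce each group to one row. The sketch below is an analogy only, and the `(occupation, age)` pairs are made up for the example.

```python
from collections import defaultdict

# Made-up (occupation, age) pairs.
rows = [
    ("Encadreur", 30),
    ("Assistant", 2),
    ("Encadreur", 3),
    ("Assistant", 4),
    ("Architecte", 40),
]

# Step 1, the "groupBy": gather the ages that belong to each occupation.
groups = defaultdict(list)
for occupation, age in rows:
    groups[occupation].append(age)

# Step 2, the aggregation: one result row per group, with new computed columns.
summary = {
    occupation: {"count": len(ages), "max": max(ages), "avg": sum(ages) / len(ages)}
    for occupation, ages in groups.items()
}

for occupation in sorted(summary):
    print(occupation, summary[occupation])
```

The intermediate `groups` dictionary corresponds loosely to the `GroupedData` object: it is not a table yet, and only becomes one once an aggregation is applied.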

# Part 3 : WordCount with DataFrames

**_Create a DataFrame_**

We'll start by generating a base DataFrame by using a Python list of tuples and the `sqlContext.createDataFrame` method.  Then we'll print out the type and schema of the DataFrame.  The Python API has several examples for using the [`createDataFrame` method](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext.createDataFrame).

In [80]:
wordsDF = sqlContext.createDataFrame([('cat',), ('elephant',), ('rat',), ('rat',), ('cat', )], ['word'])
wordsDF.show()
print (type(wordsDF))
wordsDF.printSchema()

+--------+
|    word|
+--------+
|     cat|
|elephant|
|     rat|
|     rat|
|     cat|
+--------+

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- word: string (nullable = true)



## 1. Counting with Spark SQL and DataFrames 

Now, let's count the number of times a particular word appears in the 'word' column. There are multiple ways to perform the counting, but some are much less efficient than others.

A naive approach would be to call `collect` on all of the elements and count them in the driver program. While this approach could work for small datasets, we want an approach that will work for any size dataset including terabyte- or petabyte-sized datasets. In addition, performing all of the work in the driver program is slower than performing it in parallel in the workers. For these reasons, we will use data parallel operations.

**_Using `groupBy` and `count`_**

Using DataFrames, we can perform aggregations by grouping the data using the [`groupBy` function](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy) on the DataFrame.  Using `groupBy` returns a [`GroupedData` object](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData) and we can use the functions available for `GroupedData` to aggregate the groups.  For example, we can call `avg` or `count` on a `GroupedData` object to obtain the average of the values in the groups or the number of occurrences in the groups, respectively.

To find the counts of words, group by the words and then use the [`count` function](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.count) to find the number of times that words occur.

In [81]:
# TODO: Replace <FILL IN> with appropriate code
wordCountsDF = (wordsDF.groupBy('word').count())
wordCountsDF.show()

+--------+-----+
|    word|count|
+--------+-----+
|     rat|    2|
|     cat|    2|
|elephant|    1|
+--------+-----+



In [82]:
# TEST groupBy and count (2a)
Test.assertEquals(sorted(wordCountsDF.collect()), [('cat', 2), ('elephant', 1), ('rat', 2)],
                 'incorrect counts for wordCountsDF')

1 test passed.
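
The contrast between the naive "collect everything, then count" approach and a data-parallel count can be sketched in plain Python. This is a simplified picture, not a description of Spark's actual execution plan: the two inner lists stand in for partitions, and each partition is counted independently before the small partial results are merged.

```python
from collections import Counter

# The five words of wordsDF, split across two pretend partitions.
partitions = [["cat", "elephant", "rat"], ["rat", "cat"]]

# Naive approach: move every word to one place, then count there.
all_words = [word for part in partitions for word in part]
naive_counts = Counter(all_words)

# Data-parallel approach: count inside each partition, then merge the partial counts.
partial_counts = [Counter(part) for part in partitions]
merged_counts = sum(partial_counts, Counter())

print(sorted(merged_counts.items()))  # [('cat', 2), ('elephant', 1), ('rat', 2)]
print(merged_counts == naive_counts)  # True
```

Both approaches give the same answer, but in the second one only the per-partition summaries travel, and their size depends on the number of distinct words rather than on the number of rows.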


## 2. Finding unique words and a mean value 

**_Unique words_**

Calculate the number of unique words in `wordsDF`.  You can use other DataFrames that you have already created to make this easier.



In [83]:
# TODO: Replace <FILL IN> with appropriate code
uniqueWordsCount = wordCountsDF.count()
print(uniqueWordsCount)

3


In [84]:
# TEST Unique words 
Test.assertEquals(uniqueWordsCount, 3, 'incorrect count of unique words')

1 test passed.


##  Apply word count to a file 

In this section we will finish developing our word count application.  We'll have to build the `wordCount` function, deal with real world problems like capitalization and punctuation, load in our data source, and compute the word count on the new data.

**The `wordCount` function**

First, define a function for word counting.  You should reuse the techniques that have been covered in earlier parts of this lab.  This function should take in a DataFrame that is a list of words like `wordsDF` and return a DataFrame that has all of the words and their associated counts.

In [85]:
# TODO: Replace <FILL IN> with appropriate code
def wordCount(wordListDF):
    """Creates a DataFrame with word counts.

    Args:
        wordListDF (DataFrame of str): A DataFrame consisting of one string column called 'word'.

    Returns:
        DataFrame of (str, int): A DataFrame containing 'word' and 'count' columns.
    """
    return wordListDF.groupBy('word').count()

wordCount(wordsDF).show()

+--------+-----+
|    word|count|
+--------+-----+
|     rat|    2|
|     cat|    2|
|elephant|    1|
+--------+-----+



In [86]:
# TEST wordCount function (4a)
Test.assertEquals(sorted(wordCount(wordsDF).collect()),
                  [('cat', 2), ('elephant', 1), ('rat', 2)],
                  'incorrect definition for wordCountDF function')

1 test passed.


**_Capitalization and punctuation_**

Real world files are more complicated than the data we have been using in this lab. Some of the issues we have to address are:
  + Words should be counted independent of their capitalization (e.g., Spark and spark should be counted as the same word).
  + All punctuation should be removed.
  + Any leading or trailing spaces on a line should be removed.

Define the function `removePunctuation` that converts all text to lower case, removes any punctuation, and removes leading and trailing spaces.  

To complete this task, use the following built-in PySpark functions:

- [`regexp_replace`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.regexp_replace) to remove any text that is not a letter, number, or space. If you are unfamiliar with regular expressions, refer to the `removePunctuation` function defined previously in this notebook. You may also want to review [this tutorial](https://developers.google.com/edu/python/regular-expressions) from Google.  Also, [this website](https://regex101.com/#python) is a great resource for debugging your regular expression.
- `trim` and `lower`, both found in [pyspark.sql.functions](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions), to remove leading and trailing spaces and to convert the text to lower case.

> Note that you shouldn't use any RDD operations or need to create custom user defined functions (udfs) to accomplish this task

In [87]:
# TODO: Replace <FILL IN> with appropriate code
from pyspark.sql.functions import regexp_replace, trim, col, lower
def removePunctuation(column):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only spaces, letters, and numbers should be retained.  Other characters should be
        eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed after
        punctuation is removed.  The word-character class used in the pattern below also
        matches the underscore, so underscores are kept as well.

    Args:
        column (Column): A Column containing a sentence.

    Returns:
        Column: A Column with the clean-up operations applied.
    """
    # Raw string so the regex escapes reach Spark unchanged; the pattern itself is the same.
    return lower(trim(regexp_replace(column, r"[^\w\s\d]*", "")))

sentenceDF = sqlContext.createDataFrame([('Hi, you!',),
                                         (' No under_score!',),
                                         (' *      Remove punctuation then spaces  * ',)], ['sentence'])
sentenceDF.show(truncate=False)
(sentenceDF
 .select(removePunctuation(col('sentence')))
 .show(truncate=False))

+------------------------------------------+
|sentence                                  |
+------------------------------------------+
|Hi, you!                                  |
| No under_score!                          |
| *      Remove punctuation then spaces  * |
+------------------------------------------+

+---------------------------------------------------+
|lower(trim(regexp_replace(sentence, [^\w\s\d]*, )))|
+---------------------------------------------------+
|hi you                                             |
|no under_score                                     |
|remove punctuation then spaces                     |
+---------------------------------------------------+
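
The behaviour of the pattern can be checked without a cluster using Python's `re` module. The sketch below is an approximation, not the Spark function: `remove_punctuation` is a hypothetical plain-Python counterpart, `re.ASCII` is used on the assumption that it brings Python's `\w` close to the default behaviour of the Java regex engine that Spark relies on, and `strip(" ")` stands in for `trim`.

```python
import re


def remove_punctuation(text, pattern=r"[^\w\s\d]*"):
    """Plain-Python approximation of lower(trim(regexp_replace(column, pattern, '')))."""
    cleaned = re.sub(pattern, "", text, flags=re.ASCII)
    return cleaned.strip(" ").lower()


samples = ["Hi, you!", " No under_score!", " *      Remove punctuation then spaces  * "]
for sample in samples:
    print(repr(remove_punctuation(sample)))
# 'hi you'
# 'no under_score'
# 'remove punctuation then spaces'

# \w also matches '_', so the underscore survives. An explicit character class
# removes it as well:
strict = remove_punctuation(" No under_score!", pattern=r"[^a-zA-Z0-9\s]")
print(repr(strict))  # 'no underscore'
```

Whether underscores should count as punctuation is a design choice; the stricter pattern follows the rule "only spaces, letters, and numbers are retained" literally.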



**_Load a text file_**

For the next part of this lab, we will use the [Complete Works of William Shakespeare](http://www.gutenberg.org/ebooks/100) from [Project Gutenberg](http://www.gutenberg.org/wiki/Main_Page). To convert a text file into a DataFrame, we use the `sqlContext.read.text()` method. We also apply the recently defined `removePunctuation()` function using a `select()` transformation to strip out the punctuation and change all text to lower case.  Since the file is large we use `show(15)`, so that we only print 15 lines.

In [88]:
import os.path
baseDir = os.path.join('..')
inputPath = os.path.join('/content/drive/My Drive/Colab Notebooks/data', 'shakespeare.txt')
fileName = os.path.join(baseDir, inputPath)

shakespeareDF = sqlContext.read.text(fileName).select(removePunctuation(col('value')))
shakespeareDF.show(15, truncate=False)

+-------------------------------------------------+
|lower(trim(regexp_replace(value, [^\w\s\d]*, ))) |
+-------------------------------------------------+
|1609                                             |
|                                                 |
|the sonnets                                      |
|                                                 |
|by william shakespeare                           |
|                                                 |
|                                                 |
|                                                 |
|1                                                |
|from fairest creatures we desire increase        |
|that thereby beautys rose might never die        |
|but as the riper should by time decease          |
|his tender heir might bear his memory            |
|but thou contracted to thine own bright eyes     |
|feedst thy lights flame with selfsubstantial fuel|
+-------------------------------------------------+
only showing

**_Words from lines_**

Before we can use the `wordCount()` function, we have to address two issues with the format of the DataFrame:
  + The first issue is that we need to split each line by its spaces.
  + The second issue is that we need to filter out empty lines or words.

Apply a transformation that will split each 'sentence' in the DataFrame by its spaces, and then transform from a DataFrame that contains lists of words into a DataFrame with each word in its own row.  To accomplish these two tasks you can use the `split` and `explode` functions found in [pyspark.sql.functions](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions).

Once you have a DataFrame with one word per row you can apply the [DataFrame operation `where`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.where) to remove the rows that contain ''.

> Note that `shakeWordsDF` should be a DataFrame with one column named `word`.

In [93]:
# TODO: Replace <FILL IN> with appropriate code
from pyspark.sql.functions import split, explode

shakeWordsDF = (shakespeareDF
                .select(explode(split(shakespeareDF.columns[0]," ")).alias('word')).where("word!=''"))
shakeWordsDF.show()
shakeWordsDFCount = shakeWordsDF.count()
print(shakeWordsDFCount)

+-----------+
|       word|
+-----------+
|       1609|
|        the|
|    sonnets|
|         by|
|    william|
|shakespeare|
|          1|
|       from|
|    fairest|
|  creatures|
|         we|
|     desire|
|   increase|
|       that|
|    thereby|
|    beautys|
|       rose|
|      might|
|      never|
|        die|
+-----------+
only showing top 20 rows

882996


In [90]:
# TEST Remove empty elements (4d)
Test.assertEquals(shakeWordsDF.count(), 882996, 'incorrect value for shakeWordCount')
Test.assertEquals(shakeWordsDF.columns, ['word'], "shakeWordsDF should only contain the Column 'word'")

1 test passed.
1 test passed.
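
Why the final `where` filter is needed becomes clear when the split, explode, and filter steps are traced on a few lines in plain Python. This is an analogy using list operations, not Spark code, and the three sample lines are chosen for the illustration (the second is empty and the third contains a double space).

```python
lines = ["the sonnets", "", "by  william shakespeare"]

# split: each line becomes a list of pieces separated by a single space.
split_lines = [line.split(" ") for line in lines]
print(split_lines)
# [['the', 'sonnets'], [''], ['by', '', 'william', 'shakespeare']]

# explode: one row per list element instead of one row per list.
exploded = [word for words in split_lines for word in words]
print(len(exploded))  # 7

# where word != '': drop the empty strings left by blank lines and repeated spaces.
words = [word for word in exploded if word != ""]
print(words)          # ['the', 'sonnets', 'by', 'william', 'shakespeare']
```

Without the last step, the empty string would be counted as if it were a word.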


**_Count the words_**

We now have a DataFrame that is only words.  Next, let's apply the `wordCount()` function to produce a list of word counts. We can view the first 20 words by using the `show()` action; however, we'd like to see the words in descending order of count, so we'll need to apply the [`orderBy` DataFrame method](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.orderBy) to first sort the DataFrame that is returned from `wordCount()`.

You'll notice that many of the words are common English words. These are called stopwords. In a later lab, we will see how to eliminate them from the results.

In [91]:
# TODO: Replace <FILL IN> with appropriate code
from pyspark.sql.functions import desc
topWordsAndCountsDF = wordCount(shakeWordsDF).orderBy(desc("count"))
topWordsAndCountsDF.show()

+----+-----+
|word|count|
+----+-----+
| the|27361|
| and|26028|
|   i|20681|
|  to|19150|
|  of|17463|
|   a|14593|
| you|13615|
|  my|12481|
|  in|10956|
|that|10890|
|  is| 9134|
| not| 8497|
|with| 7771|
|  me| 7769|
|  it| 7678|
| for| 7558|
| his| 6857|
|  be| 6857|
|your| 6655|
|this| 6602|
+----+-----+
only showing top 20 rows



In [92]:
# TEST Count the words (4e)
Test.assertEquals(topWordsAndCountsDF.take(15),
                  [(u'the', 27361), (u'and', 26028), (u'i', 20681), (u'to', 19150), (u'of', 17463),
                   (u'a', 14593), (u'you', 13615), (u'my', 12481), (u'in', 10956), (u'that', 10890),
                   (u'is', 9134), (u'not', 8497), (u'with', 7771), (u'me', 7769), (u'it', 7678)],
                  'incorrect value for top15WordsAndCountsDF')

1 test passed.
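
As a recap, the whole pipeline (clean each line, split into words, drop empty strings, count, sort by descending count) can be written as a short plain-Python sketch. It runs on a single machine and is meant only to summarize the logic of the lab, not to replace the DataFrame version; `word_count` and the sample text are invented for the example.

```python
import re
from collections import Counter


def word_count(lines):
    """Count words after the same clean-up steps as in the lab (single-machine sketch)."""
    counts = Counter()
    for line in lines:
        # Same pattern as removePunctuation; re.ASCII approximates the Spark behaviour.
        cleaned = re.sub(r"[^\w\s\d]*", "", line, flags=re.ASCII).strip(" ").lower()
        counts.update(word for word in cleaned.split(" ") if word != "")
    return counts


text = ["The cat and the rat.", "", "  The elephant, the cat!  "]

# Sort by descending count; ties are ordered alphabetically so the result is stable.
top = sorted(word_count(text).items(), key=lambda item: (-item[1], item[0]))
print(top)
# [('the', 4), ('cat', 2), ('and', 1), ('elephant', 1), ('rat', 1)]
```

As in the Shakespeare result, the most frequent entry is a stopword.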
