# Hamelet and Spark!

In a previous lesson, we touched briefly on transformations and actions, and how these two methods affect the execution of code. In this lesson, we'll dive deeper into how those mechanisms work, and explore a wider range of the functions built into the Spark core.

The file hamlet.txt contains the entire text of Shakespeare's play Hamlet. Shakespeare is well-known for his unique writing style and arguably one of the most influential writers in history. Hamlet is one of his most popular plays.

Let's perform some text analysis on it. The file is in pure text format, though, and not ready for analysis. Before we can proceed, we'll have to clean up and reformat the data.

In [1]:
# Find path to PySpark.
import findspark
findspark.init()

# Import PySpark and initialize SparkContext object.
import pyspark
sc = pyspark.SparkContext.getOrCreate()

In [2]:
raw_hamlet = sc.textFile('data/hamlet.txt')
raw_hamlet.take(5)

['hamlet@0\t\tHAMLET',
 'hamlet@8',
 'hamlet@9',
 'hamlet@10\t\tDRAMATIS PERSONAE',
 'hamlet@29']

The text file uses the tab character (\t) as a delimiter. We'll need to split the file on the tab delimiter and convert the results into an RDD that's more manageable.

In [3]:
split_hamlet = raw_hamlet.map(lambda line: line.split('\t'))

In [4]:
split_hamlet.take(15)

[['hamlet@0', '', 'HAMLET'],
 ['hamlet@8'],
 ['hamlet@9'],
 ['hamlet@10', '', 'DRAMATIS PERSONAE'],
 ['hamlet@29'],
 ['hamlet@30'],
 ['hamlet@31', 'CLAUDIUS', 'king of Denmark. (KING CLAUDIUS:)'],
 ['hamlet@74'],
 ['hamlet@75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['hamlet@131'],
 ['hamlet@132', 'POLONIUS', 'lord chamberlain. (LORD POLONIUS:)'],
 ['hamlet@176'],
 ['hamlet@177', 'HORATIO', 'friend to Hamlet.'],
 ['hamlet@203'],
 ['hamlet@204', 'LAERTES', 'son to Polonius.']]

Lambda functions are great for writing quick functions we can pass into PySpark methods with simple logic. They fall short when we need to write more customized logic, though. Thankfully, PySpark lets us define a function in Python first, then pass it in. Any function that returns a sequence of data in PySpark (versus a guaranteed Boolean value, like filter() requires) must use a yield statement to specify the values that should be pulled later.

If you're unfamiliar with the yield statement in Python, read this excellent Stack Overflow answer on the topic. To summarize, yield is a Python technique that allows the interpreter to generate data on the fly and pull it when necessary, instead of storing it to memory immediately. Because of its unique architecture, Spark takes advantage of this technique to reduce overhead and improve the speed of computations.

Spark runs the named function on every element in the RDD and restricts it in scope. Each instance of the function only has access to the object(s) you pass into the function, and the Python libraries available in your environment. If you try to refer to variables outside the scope of the function or import libraries, those actions may cause the computation to crash. That's because Spark compiles the function's code to Java to run on the RDD objects (which are also in Java).

Finally, not all functions require us to use yield; only the ones that generate a custom sequence of data do. For map() or filter(), we use return to return a value for every single element in the RDD we're running the functions on.

In the following code cell, we'll use the flatMap() method with the named function hamlet_speaks to check whether a line in the play contains the text HAMLET in all caps (indicating that Hamlet spoke). flatMap() is different than map() because it doesn't require an output for every element in the RDD. The flatMap() method is useful whenever we want to generate a sequence of values from an RDD.

In this case, we want an RDD object that contains tuples of the unique line IDs and the text "hamlet speaketh!," but only for the elements in the RDD that have "HAMLET" in one of the values. We can't use the map() method for this because it requires a return value for every element in the RDD.

We want each element in the resulting RDD to have the following format:

1. The first value should be the unique line ID (e.g.'hamlet@0') , which is the first value in each of the elements in the split_hamlet RDD.

2. The second value should be the string "hamlet speaketh!"

In [5]:
def hamlet_speaks(line):
    id = line[0]
    speaketh = False
    
    if "HAMLET" in line:
        speaketh = True
    
    if speaketh:
        yield id,"hamlet speaketh!"

hamlet_spoken = split_hamlet.flatMap(lambda x: hamlet_speaks(x))
hamlet_spoken.take(10)

[('hamlet@0', 'hamlet speaketh!'),
 ('hamlet@75', 'hamlet speaketh!'),
 ('hamlet@1004', 'hamlet speaketh!'),
 ('hamlet@9144', 'hamlet speaketh!'),
 ('hamlet@12313', 'hamlet speaketh!'),
 ('hamlet@12434', 'hamlet speaketh!'),
 ('hamlet@12760', 'hamlet speaketh!'),
 ('hamlet@12858', 'hamlet speaketh!'),
 ('hamlet@14821', 'hamlet speaketh!'),
 ('hamlet@15261', 'hamlet speaketh!')]

hamlet_spoken now contains the line numbers for the lines where Hamlet spoke. While this is handy, we don't have the full line anymore. Instead, let's use a filter() with a named function to extract the original lines where Hamlet spoke. The functions we pass into filter() must return values, which will be either True or False.

### Instructions

1. Write a named function filter_hamlet_speaks to pass into filter(). Apply it to split_hamlet to return an RDD with the elements containing the word HAMLET.
2. Assign the resulting RDD to hamlet_spoken_lines.

In [6]:
def filter_hamlet_speaks(line):
    if "HAMLET" in line:
        return True
    return False

hamlet_spoken_lines = split_hamlet.filter(lambda line: filter_hamlet_speaks(line))
hamlet_spoken_lines.take(5)

[['hamlet@0', '', 'HAMLET'],
 ['hamlet@75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['hamlet@1004', '', 'HAMLET'],
 ['hamlet@9144', '', 'HAMLET'],
 ['hamlet@12313',
  'HAMLET',
  '[Aside]  A little more than kin, and less than kind.']]

As we've discussed before, Spark has two kinds of methods, transformations and actions. While we've explored some of the transformations, we haven't used any actions other than take().

Whenever we use an action method, Spark forces the evaluation of lazy code. If we only chain together transformation methods and print the resulting RDD object, we'll see the type of RDD (e.g. a PythonRDD or PipelinedRDD object), but not the elements within it. That's because the computation hasn't actually happened yet.

Even though Spark simplifies chaining lots of transformations together, it's good practice to use actions to observe the intermediate RDD objects between those transformations. This will let you know whether your transformations are working the way you expect them to.

#### Count()

The count() method returns the number of elements in an RDD. count() is useful when we want to make sure the result of a transformation contains the right number of elements. For example, if we know there should be an element in the resulting RDD for every element in the initial RDD, we can compare the counts of both to ensure they match.

To get the number of elements in the RDD hamlet_spoken_lines, run .count() on it:

hamlet_spoken_lines.count()

#### Collect()

We've used take() to preview the first few elements of an RDD, similar to the way we've use head() in pandas. But what about returning all of the elements in a collection? We need to do this to write an RDD to a CSV, for example. It's also useful for running some basic Python code over a collection without going through PySpark.

Running .collect() on an RDD returns a list representation of it. To get a list of all the elements in hamlet_spoken_lines, for example, we would write:

hamlet_spoken_lines.collect()

##### Instructions

1. Compute the number of elements in hamlet_spoken_lines, and assign the result to the variable named spoken_count.
2. Grab the 101st element in hamlet_spoken_lines (which has the list index 100), and assign that list to spoken_101.

In [7]:
spoken_count = 0
spoken_101 = list()
spoken_count = hamlet_spoken_lines.count()
spoken_101 = hamlet_spoken_lines.collect()[100]

The first value in each element (or line from the play) is a line number that identifies the line of the play the text is from. It appears in the following format:

'hamlet@0'
'hamlet@8',
'hamlet@9',
...
We don't need the hamlet@ at the beginning of these IDs for our data analysis. Let's extract just the integer part of the ID from each line, which is much more useful.

##### Instructions

1. Transform the RDD split_hamlet into a new RDD hamlet_with_ids that contains the clean version of the line ID for each element.

- For example, we want to transform hamlet@0 to 0, and leave the rest of the values in that element untouched.
    - Recall that the map() function will run on each element in the RDD, where each element is a list that we can access using regular Python mechanics.

In [8]:
split_hamlet = raw_hamlet.map(lambda line: line.split('\t'))
split_hamlet.take(5)

[['hamlet@0', '', 'HAMLET'],
 ['hamlet@8'],
 ['hamlet@9'],
 ['hamlet@10', '', 'DRAMATIS PERSONAE'],
 ['hamlet@29']]

In [9]:
def map_line(x):
    x[0] = x[0].split('hamlet@')[1]
    return x

hamlet_with_ids = split_hamlet.map(map_line)
hamlet_with_ids.take(10)

[['0', '', 'HAMLET'],
 ['8'],
 ['9'],
 ['10', '', 'DRAMATIS PERSONAE'],
 ['29'],
 ['30'],
 ['31', 'CLAUDIUS', 'king of Denmark. (KING CLAUDIUS:)'],
 ['74'],
 ['75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['131']]

Next, we want to get rid of elements that don't contain any actual words (and just have an ID as the first value). These typically represent blank lines between paragraphs or sections in the play. We also want to remove any blank values ('') within elements, which don't contain any useful information for our analysis.

Instructions

Clean up the RDD and store the result as a new RDD hamlet_text_only.lalasldkfjsadflkjqwerillkwjaçsdflkjwoierqkj asdfk

In [10]:
def removeBlank(line):
        return list(filter(None,line))
    
hamlet_text_only = hamlet_with_ids.filter(lambda x: True if len(x) > 1 else False).map(removeBlank)
hamlet_text_only.take(5)

[['0', 'HAMLET'],
 ['10', 'DRAMATIS PERSONAE'],
 ['31', 'CLAUDIUS', 'king of Denmark. (KING CLAUDIUS:)'],
 ['75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['132', 'POLONIUS', 'lord chamberlain. (LORD POLONIUS:)']]

In [None]:
['|', 2, 3]

In [20]:
hamlet_text_only.map(lambda x: [y for y in x if y != '|']).take(10)

[['0', 'HAMLET'],
 ['10', 'DRAMATIS PERSONAE'],
 ['31', 'CLAUDIUS', 'king of Denmark. (KING CLAUDIUS:)'],
 ['75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['132', 'POLONIUS', 'lord chamberlain. (LORD POLONIUS:)'],
 ['177', 'HORATIO', 'friend to Hamlet.'],
 ['204', 'LAERTES', 'son to Polonius.'],
 ['230', 'LUCIANUS', 'nephew to the king.'],
 ['261', 'VOLTIMAND'],
 ['273']]

In [18]:
def fix_pipe(line):
    results = list()
    for l in line:
        if l == "|":
            pass
        elif "|" in l:
            fmtd = l.replace("|", "")
            results.append(fmtd)
        else:
            results.append(l)
    return results

hamlet_text_only.map(lambda line: fix_pipe(line)).count()

4376

In [None]:
sc.stop()