## Integrating Spark and Jupyter Notebook

PySpark isn't on sys.path by default, but that doesn't mean it can't be used as a regular library. You can address this by either symlinking pyspark into your site-packages, or adding pyspark to sys.path at runtime. findspark does the latter.

```
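# Open your shell profile and add the SPARK_HOME line below to it
nano ~/.bash_profile
export SPARK_HOME="/usr/local/Cellar/apache-spark/2.2.0/libexec/"

# Install findspark so notebooks can locate pyspark at runtime
pip install findspark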
```
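To make the "adding pyspark to sys.path at runtime" option concrete, here is a minimal sketch of roughly what that step involves. It is a simplified illustration, not findspark's actual code: the helper name `add_pyspark_to_path` is hypothetical, and it assumes the usual Spark layout where the Python sources live under `python/` and py4j ships as a zip under `python/lib/`.

```python
import glob
import os
import sys


def add_pyspark_to_path(spark_home):
    """Prepend Spark's Python directories to sys.path (simplified sketch)."""
    python_dir = os.path.join(spark_home, "python")
    # Assumption: py4j is bundled as a zip in python/lib; its file name varies by Spark version.
    py4j_zips = glob.glob(os.path.join(python_dir, "lib", "py4j-*.zip"))
    added = []
    for path in [python_dir] + py4j_zips:
        if path not in sys.path:
            sys.path.insert(0, path)
            added.append(path)
    return added
```

With SPARK_HOME exported as above, calling `add_pyspark_to_path(os.environ["SPARK_HOME"])` before `import pyspark` would have a similar effect to `findspark.init()`, though findspark handles more cases.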

## To Spark or not to Spark, that is the question

In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
sc = pyspark.SparkContext()

In [3]:
raw_hamlet = sc.textFile('hamlet.txt')

In [4]:
raw_hamlet.take(5)

['hamlet@0\t\tHAMLET',
 'hamlet@8',
 'hamlet@9',
 'hamlet@10\t\tDRAMATIS PERSONAE',
 'hamlet@29']

In [5]:
split_hamlet = raw_hamlet.map(lambda line: line.split('\t'))

In [6]:
split_hamlet.take(5)

[['hamlet@0', '', 'HAMLET'],
 ['hamlet@8'],
 ['hamlet@9'],
 ['hamlet@10', '', 'DRAMATIS PERSONAE'],
 ['hamlet@29']]

### Lambda functions are great for writing quick functions we can pass into PySpark methods with simple logic. 

A function passed to flatMap() must return a sequence of data (any iterable) for each element, rather than a single value such as the Boolean that filter() requires. A yield statement is a convenient way to produce that sequence, because it specifies the values that should be pulled later.

### ```yield``` is a Python technique that allows the interpreter to generate data on the fly and pull it when necessary, instead of storing it to memory immediately

Finally, not all functions require us to use yield; only the ones that generate a custom sequence of data do. For map() or filter(), we use return to return a value for every single element in the RDD we're running the functions on.
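The on-the-fly behavior of yield can be seen without Spark at all. The sketch below is plain Python; the function name `speaking_lines` and the three-element `sample` list are made up for illustration, using rows in the same shape as the split lines above.

```python
def speaking_lines(lines, speaker):
    """Yield (line_id, message) only for lines where `speaker` appears."""
    for line in lines:
        if speaker in line:
            # Execution pauses here until the caller asks for the next value.
            yield line[0], speaker.lower() + " speaketh!"


sample = [
    ['hamlet@0', '', 'HAMLET'],
    ['hamlet@8'],
    ['hamlet@75', 'HAMLET', 'son to the late, and nephew to the present king.'],
]

gen = speaking_lines(sample, "HAMLET")  # creates a generator; no line is examined yet
first = next(gen)                       # runs the function body up to the first yield
rest = list(gen)                        # pulls whatever values remain
```

Here `first` holds the tuple for `hamlet@0` and `rest` holds the single remaining tuple for `hamlet@75`; the line with no speaker produces nothing.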

In [7]:
def hamlet_speaks(line):
    id = line[0]
    speaketh = False
    
    if "HAMLET" in line:
        speaketh = True
    
    if speaketh:
        yield id,"hamlet speaketh!"

hamlet_spoken = split_hamlet.flatMap(lambda x: hamlet_speaks(x))
hamlet_spoken.take(10)

[('hamlet@0', 'hamlet speaketh!'),
 ('hamlet@75', 'hamlet speaketh!'),
 ('hamlet@1004', 'hamlet speaketh!'),
 ('hamlet@9144', 'hamlet speaketh!'),
 ('hamlet@12313', 'hamlet speaketh!'),
 ('hamlet@12434', 'hamlet speaketh!'),
 ('hamlet@12760', 'hamlet speaketh!'),
 ('hamlet@12858', 'hamlet speaketh!'),
 ('hamlet@14821', 'hamlet speaketh!'),
 ('hamlet@15261', 'hamlet speaketh!')]

### flatMap() is different from map() because it doesn't require an output for every element in the RDD.

The flatMap() method is useful whenever we want to generate a sequence of values from an RDD.

In this case, we want an RDD object that contains tuples of the unique line IDs and the text "hamlet speaketh!", but only for the elements in the RDD that have "HAMLET" in one of the values. We can't use the map() method for this because it produces exactly one output for every element in the RDD.
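The difference between the two methods can be mimicked in plain Python. This is only an analogy of the semantics, not how PySpark implements them: `flat_map` is a hypothetical helper built on `itertools.chain`, and `sample` is a small hand-written list in the shape of the split lines.

```python
from itertools import chain


def flat_map(func, elements):
    """Apply func to each element and concatenate the iterables it returns."""
    return list(chain.from_iterable(func(e) for e in elements))


def hamlet_speaks(line):
    # Yields zero or one tuple per input line.
    if "HAMLET" in line:
        yield line[0], "hamlet speaketh!"


sample = [
    ['hamlet@0', '', 'HAMLET'],
    ['hamlet@8'],
    ['hamlet@75', 'HAMLET', 'son to the late, and nephew to the present king.'],
]

# map-like behavior: one result per element, even when that result is empty
mapped = [list(hamlet_speaks(line)) for line in sample]

# flatMap-like behavior: the per-element results are flattened, so empty ones vanish
flattened = flat_map(hamlet_speaks, sample)
```

`mapped` keeps three entries (the middle one is an empty list), while `flattened` contains only the two tuples for the lines where HAMLET appears.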

Let's use filter() with a named function to extract the original lines where Hamlet spoke. The function we pass into filter() must return a value for every element, and that value must be either True or False.

In [8]:
def filter_hamlet_speaks(line):
    return "HAMLET" in line

hamlet_spoken_lines = split_hamlet.filter(lambda line: filter_hamlet_speaks(line))
hamlet_spoken_lines.take(5)

[['hamlet@0', '', 'HAMLET'],
 ['hamlet@75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['hamlet@1004', '', 'HAMLET'],
 ['hamlet@9144', '', 'HAMLET'],
 ['hamlet@12313',
  'HAMLET',
  '[Aside]  A little more than kin, and less than kind.']]

### Spark has two kinds of methods, transformations and actions. 

While we've explored some of the transformations, we haven't used any actions other than take().

Whenever we use an action method, Spark forces the evaluation of lazy code. If we only chain together transformation methods and print the resulting RDD object, we'll see the type of RDD (e.g. a PythonRDD or PipelinedRDD object), but not the elements within it. That's because the computation hasn't actually happened yet.

Even though Spark simplifies chaining lots of transformations together, it's good practice to use actions to observe the intermediate RDD objects between those transformations. This will let you know whether your transformations are working the way you expect them to.
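Python's built-in map() and filter() are also lazy, which makes them a handy analogy for the transformation/action split. The sketch below is plain Python rather than PySpark; the `calls` list exists only to record when the function actually runs, and `list()` plays the role of an action.

```python
calls = []


def split_line(line):
    calls.append(line)  # record that the function actually ran
    return line.split('\t')


raw = ['hamlet@0\t\tHAMLET', 'hamlet@8', 'hamlet@9']

# Chaining lazy steps builds a pipeline object but computes nothing.
pipeline = filter(lambda parts: "HAMLET" in parts, map(split_line, raw))
calls_before = len(calls)

# Forcing evaluation, much as an action does in Spark.
result = list(pipeline)
calls_after = len(calls)
```

Printing `pipeline` before the `list()` call would show only a filter object, not its elements, which mirrors what happens when you print an RDD that has only had transformations applied.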

### count()

The count() method returns the number of elements in an RDD. count() is useful when we want to make sure the result of a transformation contains the right number of elements.

In [9]:
hamlet_spoken_lines.count()

381

### collect()

We've used take() to preview the first few elements of an RDD, similar to the way we've used head() in pandas. But what about returning all of the elements in a collection? We need to do this to write an RDD to a CSV, for example. It's also useful for running some basic Python code over a collection without going through PySpark. Keep in mind that this brings every element back to the driver program, so it only makes sense when the result fits in memory.

Running .collect() on an RDD returns a list representation of it.

In [10]:
hamlet_spoken_lines.collect()

[['hamlet@0', '', 'HAMLET'],
 ['hamlet@75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['hamlet@1004', '', 'HAMLET'],
 ['hamlet@9144', '', 'HAMLET'],
 ['hamlet@12313',
  'HAMLET',
  '[Aside]  A little more than kin, and less than kind.'],
 ['hamlet@12434', 'HAMLET', "Not so, my lord; I am too much i' the sun."],
 ['hamlet@12760', 'HAMLET', 'Ay, madam, it is common.'],
 ['hamlet@12858', 'HAMLET', "Seems, madam! nay it is; I know not 'seems.'"],
 ['hamlet@14821', 'HAMLET', 'I shall in all my best obey you, madam.'],
 ['hamlet@15261', 'HAMLET', 'O, that this too too solid flesh would melt'],
 ['hamlet@16738', 'HAMLET', 'I am glad to see you well:'],
 ['hamlet@16863',
  'HAMLET',
  "Sir, my good friend; I'll change that name with you:"],
 ['hamlet@17006', 'HAMLET', 'I am very glad to see you. Good even, sir.'],
 ['hamlet@17149', 'HAMLET', 'I would not hear your enemy say so,'],
 ['hamlet@17459', 'HAMLET', 'I pray thee, do not mock me, fellow-student;'],
 ['hamlet@17605

In [11]:
spoken_101 = hamlet_spoken_lines.collect()[100]
spoken_101

['hamlet@58478', 'HAMLET', 'A goodly one; in which there are many confines,']
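Once the elements are in a plain Python list, writing them out as CSV needs nothing from PySpark. A minimal sketch, assuming `rows` stands in for the list returned by `hamlet_spoken_lines.collect()` (three rows are copied from the output above) and using an in-memory buffer instead of a real file so the example stays self-contained:

```python
import csv
import io

# Stand-in for hamlet_spoken_lines.collect(); rows may have different lengths.
rows = [
    ['hamlet@0', '', 'HAMLET'],
    ['hamlet@75', 'HAMLET', 'son to the late, and nephew to the present king.'],
    ['hamlet@12313', 'HAMLET', '[Aside]  A little more than kin, and less than kind.'],
]

buffer = io.StringIO()
writer = csv.writer(buffer)  # quotes fields that contain commas
writer.writerows(rows)
csv_text = buffer.getvalue()
```

To write to disk instead, the same writer can wrap a file opened with `open(path, 'w', newline='')`, where `path` is whatever output location you choose.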

Transform the RDD split_hamlet into a new RDD hamlet_with_ids that contains the clean version of the line ID for each element.

- For example, we want to transform hamlet@0 to 0, and leave the rest of the values in that element untouched.
- Recall that the map() function will run on each element in the RDD, where each element is a list that we can access using regular Python mechanics.

In [21]:
def format_id(x):
    # Keep only the part after '@' (e.g. 'hamlet@0' -> '0')
    line_id = x[0].split('@')[1]
    # Clean ID first, followed by the remaining values untouched
    return [line_id] + x[1:]

hamlet_with_ids = split_hamlet.map(lambda line: format_id(line))
hamlet_with_ids.take(10)

[['0', '', 'HAMLET'],
 ['8'],
 ['9'],
 ['10', '', 'DRAMATIS PERSONAE'],
 ['29'],
 ['30'],
 ['31', 'CLAUDIUS', 'king of Denmark. (KING CLAUDIUS:)'],
 ['74'],
 ['75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['131']]