In [2]:
import os
import sys
from pyspark import SparkContext

# Path for spark source folder
os.environ['SPARK_HOME']="C:\ProgramData\Anaconda3\lib\site-packages\pyspark"
os.environ['JAVA_HOME'] = "C:\java\jdk1.8.0_211"
os.environ['PYSPARK_SUBMIT_ARGS']="--master local[2] pyspark-shell"

# Append pyspark  to Python Path
sys.path.append("C:\ProgramData\Anaconda3\Lib\site-packages\pyspark\python\lib\py4j-0.9-src.zip")

try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    print ("Successfully imported Spark Modules")

except ImportError as e:
    print ("Can not import Spark Modules", e)
    sys.exit(1)


sc = SparkContext('local')
words = sc.parallelize(["scala","java","hadoop","spark","akka"])
print (words.count())
sc.stop()

Successfully imported Spark Modules
5


# Introduction to Spark

- If you'd like to learn how to install PySpark and integrate it with IPython Notebook, this wonderful blog [https://ramhiser.com/2015/02/01/configuring-ipython-notebook-support-for-pyspark/] post will walk you through the steps.
- The core data structure in Spark is a resilient distributed data set (RDD). 
- As the name suggests, an RDD is Spark's representation of a data set that's distributed across the RAM, or memory, of a cluster of many machines. 
- An RDD object is essentially a collection of elements we can use to hold lists of tuples, dictionaries, lists, etc. 
- Similar to a pandas DataFrame, we can load a data set into an RDD, and then run any of the methods accesible to that object.

#### PySpark

- While the Spark toolkit is in Scala, a language that compiles down to bytecode for the JVM, the open source community has developed a wonderful toolkit called PySpark that allows us to interface with RDDs in Python.
- While the Spark toolkit is in Scala, a language that compiles down to bytecode for the JVM, the open source community has developed a wonderful toolkit called PySpark that allows us to interface with RDDs in Python. Thanks to a library called Py4J, Python can interface with Java objects (in our case RDDs). Py4J is also one of the tools that makes PySpark work.
- Load the data set into an RDD. We're using the TSV version of FiveThirtyEight's data set. TSV files use a tab character ("\t") as the delimiter

In [19]:
sc.stop()
import os
import sys
from pyspark import SparkContext

# Path for spark source folder
os.environ['SPARK_HOME']="C:\ProgramData\Anaconda3\lib\site-packages\pyspark"
os.environ['JAVA_HOME'] = "C:\java\jdk1.8.0_211"
os.environ['PYSPARK_SUBMIT_ARGS']="--master local[2] pyspark-shell"

# Append pyspark  to Python Path
sys.path.append("C:\ProgramData\Anaconda3\Lib\site-packages\pyspark\python\lib\py4j-0.9-src.zip")

try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    print ("Successfully imported Spark Modules")

except ImportError as e:
    print ("Can not import Spark Modules", e)
    sys.exit(1)


sc = SparkContext('local')
raw_data = sc.textFile("C:/Users/jyotibo/Python_Test/Python_code/Spark/daily_show.tsv")

Successfully imported Spark Modules


In [22]:
raw_data.take(5)

['YEAR\tGoogleKnowlege_Occupation\tShow\tGroup\tRaw_Guest_List',
 '1999\tactor\t1/11/99\tActing\tMichael J. Fox',
 '1999\tComedian\t1/12/99\tComedy\tSandra Bernhard',
 '1999\ttelevision actress\t1/13/99\tActing\tTracey Ullman',
 '1999\tfilm actress\t1/14/99\tActing\tGillian Anderson']

- In Spark, the SparkContext object manages the connection to the clusters, and coordinates the running of processes on those clusters. More specifically, it connects to the cluster managers. 
- The cluster managers control the executors that run the computations.

<img src="cluster-overview.png" alt="Drawing" align="left" style="width: 500px;"/>

- https://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html#take
- https://www.qubole.com/resources/pyspark-cheatsheet/

- You may be wondering why, if an RDD resembles a Python list, we don't just use bracket notation to access elements in the RDD.

- The answer is that Spark distributes RDD objects across many partitions, and the RDD object is specifically designed to handle distributed data. We can't rely on the standard implementation of a list for these reasons.

- Spark offers many advantages over regular Python, though. For example, thanks to RDD abstraction, you can run Spark locally on your own computer. Spark will simulate distributing your calculations over many machines by automatically slicing your computer's memory into partitions.

- Spark's RDD implementation also lets us evaluate code "lazily," meaning we can postpone running a calculation until absolutely necessary. On the previous screen, Spark waited to load the TSV file into an RDD until raw_data.take(5) executed. When our code called raw_data = sc.textFile("dail_show.tsv"), Spark created a pointer to the file, but didn't actually read it into raw_data until raw_data.take(5) needed that variable to run its logic.

- The advantage of "lazy" evaluation is that we can build up a queue of tasks and let Spark optimize the overall workflow in the background. In regular Python, the interpreter can't do much workflow optimization. We'll see more examples of lazy evaluation later on.
- https://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

- The **key idea** to understand when working with Spark is data pipelining. 
- Every operation or calculation in Spark is essentially a series of steps that we can chain together and run in succession to form a pipeline.
- Each step in the pipeline returns either 
    - a Python value (such as an integer), 
    - a Python data structure (such as a dictionary), 
    - an RDD object. 
- We'll start with the map() function.

### Map() :
- The map(f) function applies the function f to every element in the RDD. 
- Because RDDs are iterable objects (like most Python objects), Spark runs function f on each iteration and returns a new RDD.
- We'll walk through an example of a map function so you can get a better sense of how it works. 
- If you look carefully, you'll see that raw_data is in a format that's hard to work with. 
- While the elements are currently all strings, we'd like to convert each of them into a list to make the data more manageable

In [24]:
#1. Call the RDD function `map()` to specify we want to apply the logic in the parentheses to every line in our data set.
#2. Write a lambda function that splits each line using the tab delimiter (\t), and assign the resulting RDD to `daily_show`.
#3. Call the RDD function `take()` on `daily_show` to display the first five elements (or rows) of the resulting RDD.

daily_show = raw_data.map(lambda line: line.split('\t'))
daily_show.take(5)

[['YEAR', 'GoogleKnowlege_Occupation', 'Show', 'Group', 'Raw_Guest_List'],
 ['1999', 'actor', '1/11/99', 'Acting', 'Michael J. Fox'],
 ['1999', 'Comedian', '1/12/99', 'Comedy', 'Sandra Bernhard'],
 ['1999', 'television actress', '1/13/99', 'Acting', 'Tracey Ullman'],
 ['1999', 'film actress', '1/14/99', 'Acting', 'Gillian Anderson']]

- One of the wonderful features of PySpark is the ability to separate our logic - which we prefer to write in Python - from the actual data transformation.
- In the previous code cell, we wrote this lambda function in Python code.
- Even though the function was in Python, we also took advantage of Scala when Spark actually ran the code over our RDD. This is the power of PySpark. 
- Without learning any Scala, we get to harness the data processing performance gains from Spark's Scala architecture.

#### Transformations and Actions :

- **There are two types of methods in Spark:**
    1. Transformations - map(), reduceByKey()
    2. Actions - take(), reduce(), saveAsTextFile(), collect()

- Transformations are lazy operations that always return a reference to an RDD object. 
- Spark doesn't actually run the transformations, though, until an action needs to use the RDD resulting from a transformation. 
- Any function that returns an RDD is a transformation, and any function that returns a value is an action. 
- These concepts will become more clear as we work through this lesson and practice writing PySpark code.

#### Immutability:
- You may be wondering why we couldn't just split each string in place, instead of creating a new object daily_show. 
- In Python, we could have modified the collection element-by-element in place, without returning and assigning the results to a new object.
- RDD objects are immutable, meaning that we can't change their values once we've created them. 
- In Python, list and dictionary objects are mutable (we can change their values), while tuple objects are immutable. 
- The only way to modify a tuple object in Python is to create a new tuple object with the necessary updates. 
- Spark uses the immutability of RDDs to enhance calculation speeds. 

**reduceByKey(f)**
- combines tuples with the same key using the function we specify, f

In [27]:
tally = daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x,y: x+y)

In [28]:
tally.take(tally.count())

[('YEAR', 1),
 ('1999', 166),
 ('2000', 169),
 ('2001', 157),
 ('2002', 159),
 ('2003', 166),
 ('2004', 164),
 ('2005', 162),
 ('2006', 161),
 ('2007', 141),
 ('2008', 164),
 ('2009', 163),
 ('2010', 165),
 ('2011', 163),
 ('2012', 164),
 ('2013', 166),
 ('2014', 163),
 ('2015', 100)]

- Unlike pandas, Spark knows nothing about column headers, and didn't set them aside. We need a way to remove the element ('YEAR', 1) from our collection.
- The only way to remove that tuple is to create a new RDD object that doesn't have it.
- Spark comes with a filter(f) function that creates a new RDD by filtering an existing one for specific criteria.
- If we specify a function f that returns a binary value, True or False, the resulting RDD will consist of elements where the function evaluated to True.

In [30]:
def filter_year(line):
    if line[0]=='YEAR':
        return False
    else:
        return True

filtered_daily_show = daily_show.filter(lambda line: filter_year(line))

- Spark improves on Hadoop's turnaround time significantly:
- https://www.quora.com/What-are-the-advantages-of-DAG-directed-acyclic-graph-execution-of-big-data-algorithms-over-MapReduce-I-know-that-Apache-Spark-Storm-and-Tez-use-the-DAG-execution-model-over-MapReduce-Why-Are-there-any-disadvantages/answer/Tathagata-Das?share=1&srid=umKP

In [31]:
filtered_daily_show.filter(lambda line: line[1] != '') \
                   .map(lambda line: (line[1].lower(), 1)) \
                   .reduceByKey(lambda x,y: x+y) \
                   .take(5)

[('actor', 596),
 ('comedian', 103),
 ('television actress', 13),
 ('film actress', 21),
 ('singer-lyricist', 2)]

In [36]:
raw_hamlet = sc.textFile("C:/Users/jyotibo/Python_Test/Python_code/Spark/hamlet.txt")
raw_hamlet.take(5)

['hamlet@0\t\tHAMLET',
 'hamlet@8',
 'hamlet@9',
 'hamlet@10\t\tDRAMATIS PERSONAE',
 'hamlet@29']

In [39]:
split_hamlet = raw_hamlet.map(lambda line : line.split("\t"))
split_hamlet.take(5)

[['hamlet@0', '', 'HAMLET'],
 ['hamlet@8'],
 ['hamlet@9'],
 ['hamlet@10', '', 'DRAMATIS PERSONAE'],
 ['hamlet@29']]

- Lambda functions are great for writing quick functions we can pass into PySpark methods with simple logic. 
- They fall short when we need to write more customized logic, though. 
- Thankfully, PySpark lets us define a function in Python first, then pass it in. 
- Any function that returns a sequence of data in PySpark (versus a guaranteed Boolean value, like filter() requires) must use a *yield* statement to specify the values that should be pulled later.

### Yield

- https://stackoverflow.com/questions/231767/what-does-the-yield-keyword-do/231855#231855
- To understand what yield does, you must understand what generators are. And before you can understand generators, you must understand iterables.

**Iterables**
- When you create a list, you can read its items one by one. Reading its items one by one is called iteration:

In [32]:
mylist = [1, 2, 3]
for i in mylist:
    print(i)

1
2
3


- mylist is an iterable. When you use a list comprehension, you create a list, and so an iterable:

In [33]:
mylist = [x*x for x in range(3)]
for i in mylist:
   print(i)

0
1
4


- Everything you can use "for... in..." on is an iterable; lists, strings, files...
- These iterables are handy because you can read them as much as you wish, but you store all the values in memory and this is not always what you want when you have a lot of values.

**Generators**
- Generators are iterators, a kind of iterable you can only iterate over once. Generators do not store all the values in memory, they generate the values on the fly:

In [34]:
mygenerator = (x*x for x in range(3))
for i in mygenerator:
   print(i)

0
1
4


- It is just the same except you used () instead of []. 
- BUT, you cannot perform for i in mygenerator a second time since generators can only be used once: they calculate 0, then forget about it and calculate 1, and end calculating 4, one by one.

**Yield**
- Yield is a keyword that is used like return, except the function will return a generator.

In [35]:
def createGenerator(): 
        mylist = range(3)
        for i in mylist:
            yield i*i
mygenerator = createGenerator() # create a generator
print(mygenerator) # mygenerator is an object!

for i in mygenerator:
    print(i)

<generator object createGenerator at 0x000001FB5024E6D8>
0
1
4


- Here it's a useless example, but it's handy when you know your function will return a huge set of values that you will only need to read once.
- To master yield, you must understand that when you call the function, the code you have written in the function body does not run. 
- The function only returns the generator object, this is a bit tricky :-)
- Then, your code will continue from where it left off each time for uses the generator.

**Now the hard part:**
- The first time the for calls the generator object created from your function, it will run the code in your function from the beginning until it hits yield, then it'll return the first value of the loop. 
- Then, each other call will run the loop you have written in the function one more time, and return the next value, until there is no value to return.
- The generator is considered empty once the function runs, but does not hit yield anymore. It can be because the loop had come to an end, or because you do not satisfy an "if/else" anymore

**To summarize**: 
- yield is a Python technique that allows the interpreter to generate data on the fly and pull it when necessary, instead of storing it to memory immediately. Because of its unique architecture, Spark takes advantage of this technique to reduce overhead and improve the speed of computations.
- Spark runs the named function on every element in the RDD and restricts it in scope. 
- Each instance of the function only has access to the object(s) you pass into the function, and the Python libraries available in your environment. 
- If you try to refer to variables outside the scope of the function or import libraries, those actions may cause the computation to crash. 
- That's because Spark compiles the function's code to Java to run on the RDD objects (which are also in Java).
- **Finally**, not all functions require us to use yield; only the ones that generate a custom sequence of data do. 
- For map() or filter(), we use return to return a value for every single element in the RDD we're running the functions on

**flatMap()**
- flatMap() is different than map() because it doesn't require an output for every element in the RDD. 
- The flatMap() method is useful whenever we want to generate a sequence of values from an RDD. 

- In the following code cell, we'll use the flatMap() method with the named function hamlet_speaks to check whether a line in the play contains the text HAMLET in all caps (indicating that Hamlet spoke). 

In [41]:
def hamlet_speaks(line):
    id = line[0]
    speaketh = False
    
    if "HAMLET" in line:
        yield id,"hamlet speaketh!"

hamlet_spoken = split_hamlet.flatMap(lambda x: hamlet_speaks(x))
hamlet_spoken.take(10)

[('hamlet@0', 'hamlet speaketh!'),
 ('hamlet@75', 'hamlet speaketh!'),
 ('hamlet@1004', 'hamlet speaketh!'),
 ('hamlet@9144', 'hamlet speaketh!'),
 ('hamlet@12313', 'hamlet speaketh!'),
 ('hamlet@12434', 'hamlet speaketh!'),
 ('hamlet@12760', 'hamlet speaketh!'),
 ('hamlet@12858', 'hamlet speaketh!'),
 ('hamlet@14821', 'hamlet speaketh!'),
 ('hamlet@15261', 'hamlet speaketh!')]

##### Write a named function filter_hamlet_speaks to pass into filter(). Apply it to split_hamlet to return an RDD with the elements containing the word HAMLET.
##### Assign the resulting RDD to hamlet_spoken_lines.

In [42]:
def filter_hamlet_speaks(line):
    if "HAMLET" in line:
        return line
    else:
        return False

hamlet_spoken_lines = split_hamlet.filter(lambda line: filter_hamlet_speaks(line))
hamlet_spoken_lines.take(5)

[['hamlet@0', '', 'HAMLET'],
 ['hamlet@75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['hamlet@1004', '', 'HAMLET'],
 ['hamlet@9144', '', 'HAMLET'],
 ['hamlet@12313',
  'HAMLET',
  '[Aside]  A little more than kin, and less than kind.']]

- Spark has two kinds of methods, transformations and actions. 
- While we've explored some of the transformations, we haven't used any actions other than take()
- Whenever we use an action method, Spark forces the evaluation of lazy code. 
- If we only chain together transformation methods and print the resulting RDD object, we'll see the type of RDD (e.g. a PythonRDD or PipelinedRDD object), but not the elements within it. That's because the computation hasn't actually happened yet.
- Even though Spark simplifies chaining lots of transformations together, it's good practice to use actions to observe the intermediate RDD objects between those transformations. 
- This will let you know whether your transformations are working the way you expect them to.

**Count()**
- The count() method returns the number of elements in an RDD. 
- count() is useful when we want to make sure the result of a transformation contains the right number of elements. 
- For example, if we know there should be an element in the resulting RDD for every element in the initial RDD, we can compare the counts of both to ensure they match.

In [43]:
hamlet_spoken_lines.count()

381

**Collect()**
- We've used take() to preview the first few elements of an RDD, similar to the way we've use head() in pandas. 
- But what about returning all of the elements in a collection? We need to do this to write an RDD to a CSV, for example. 
- It's also useful for running some basic Python code over a collection without going through PySpark.

Running .collect() on an RDD returns a list representation of it. To get a list of all the elements in hamlet_spoken_lines, for example, we would write:

In [45]:
#hamlet_spoken_lines.collect()

- Compute the number of elements in hamlet_spoken_lines, and assign the result to the variable named spoken_count.
- Grab the 101st element in hamlet_spoken_lines (which has the list index 100), and assign that list to spoken_101.

In [47]:
spoken_count = 0
spoken_101 = list()

spoken_count = hamlet_spoken_lines.count()
spoken_collect = hamlet_spoken_lines.collect()
spoken_101 = spoken_collect[101]
spoken_101

['hamlet@58626', 'HAMLET', "Why, then, 'tis none to you; for there is nothing"]