# Introduction to Data Science (CS4661). Cal State Univ. LA, CS Dept.
## Dr. Mo. Porhomayoun
----------------------------------------------------------------------------------------------



# Lab6: Spark Tutorial

#### Feel free to refer to the suggested resources, references, and documentation for more details:
- http://spark.apache.org/docs/latest/rdd-programming-guide.html
- http://spark.apache.org/examples.html
- https://www.edx.org/course/big-data-analysis-apache-spark-uc-berkeleyx-cs110x

----------------------------------------------------------------------------------------------



### You first need to install PySpark to be able to use Spark in Python.
For macOS users, just type one of the following in a terminal:

```
brew install apache-spark
pip install pyspark
```

In [None]:
# importing pyspark and defining a spark context:

import pyspark

sc = pyspark.SparkContext()

### Create a Parallelized RDD:

In [None]:
wordsList = ['dog', 'cat', 'tiger','elephant', 'pig', 'pig', 'cat']

n_partitions = 3 # number of partitions in parallel processing

wordsRDD = sc.parallelize(wordsList, n_partitions) # Lazy!

print(type(wordsRDD))
print(wordsRDD)
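
To build intuition for what `n_partitions` does, here is a plain-Python sketch that cuts the word list into three contiguous chunks. The helper `split_into_chunks` and its index arithmetic are assumptions made for illustration only; Spark's own partitioning may place items differently, and in a live session `wordsRDD.glom().collect()` shows the actual layout.

```python
def split_into_chunks(items, n_chunks):
    # Hypothetical helper (not a Spark API): contiguous, nearly equal slices.
    n = len(items)
    return [items[i * n // n_chunks:(i + 1) * n // n_chunks]
            for i in range(n_chunks)]

words = ['dog', 'cat', 'tiger', 'elephant', 'pig', 'pig', 'cat']
chunks = split_into_chunks(words, 3)

# Each chunk could be processed by a different worker.
print(chunks)
```

Every word lands in exactly one chunk, so processing the chunks independently covers the whole list.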


In [None]:
# accessing the elements of the RDD:

# first element:
print(wordsRDD.first())

# first 3 items:
print(wordsRDD.take(3))

# all items:
print(wordsRDD.collect())

In [None]:
# defining an arbitrary function for map:
def makePlural(word):
    return word + 's'

print(makePlural('cat'))

In [None]:
wordsList = ['dog', 'cat', 'tiger','elephant', 'pig', 'pig', 'cat', 'cat']

n_partitions = 3 # number of partitions in parallel processing

wordsRDD = sc.parallelize(wordsList, n_partitions)

# MAP: each machine applies the function to a chunk of the data in parallel:
pluralRDD = wordsRDD.map(makePlural)    

print(pluralRDD)

Spark is lazy! It has done nothing so far. It just remembers the transformations for each chunk of data. The transformations are computed only when an action requires the results.

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. 
For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program  (Ref: Spark Programming Guide)
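
The split between lazy transformations and eager actions can also be seen in plain Python, without Spark. The following is only an analogy: Python's built-in `map` returns a lazy iterator (playing the role of a transformation), and `functools.reduce` forces the computation (playing the role of an action). The names `make_plural_logged` and `calls` are hypothetical helpers for this illustration.

```python
from functools import reduce

calls = []  # records every word the function is actually applied to

def make_plural_logged(word):
    # Hypothetical helper: same idea as makePlural, but logs each call.
    calls.append(word)
    return word + 's'

words = ['dog', 'cat', 'pig']

# "Transformation": building the lazy iterator runs nothing yet.
lazy_plurals = map(make_plural_logged, words)
calls_before_action = len(calls)

# "Action": reducing consumes the iterator and triggers the work.
joined = reduce(lambda a, b: a + ' ' + b, lazy_plurals)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action)
print(joined)
```

The function is not called at all until the reduction asks for results, which mirrors how Spark defers work until an action runs.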

In [None]:
# this is an action: it requests the final results
pluralRDD.collect()

#### Reviewing lambda:

In [None]:
# Reviewing lambda:

# Example1:
# defining a function with lambda that takes one input and returns its square
mySqr = lambda x: x**2
y = mySqr(2)
print(y)

# Example2:
# defining a function with lambda that takes two inputs and returns the sum of their squares
mySqrSum = lambda x1,x2: x1**2 + x2**2 
y = mySqrSum(2,4)
print(y)


## Map-Reduce for Word Count

### 1- MAP:

In [None]:
wordsList = ['dog', 'cat', 'tiger','elephant', 'pig', 'pig', 'cat', 'cat']

wordsRDD = sc.parallelize(wordsList, 4)

# map with lambda:
keyValueList = wordsRDD.map(lambda w: (w, 1))

print(keyValueList.collect())

### 2- REDUCE:

In [None]:
# keyValueList comes from Map Stage:

wordCounts = (keyValueList
                .reduceByKey(lambda x,y: x+y)
                .collect())

print(wordCounts)
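
To make the reduce stage concrete, here is a single-machine sketch of what `reduceByKey` computes: values that share a key are folded together with the given function. The helper `reduce_by_key_local` is hypothetical and is not how Spark implements it; Spark first combines values inside each partition and then across partitions, which is why the function should be associative and commutative.

```python
pairs = [('dog', 1), ('cat', 1), ('tiger', 1), ('elephant', 1),
         ('pig', 1), ('pig', 1), ('cat', 1), ('cat', 1)]

def reduce_by_key_local(pairs, func):
    # Hypothetical stand-in for reduceByKey on one machine:
    # fold each new value into the running result for its key.
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

local_counts = reduce_by_key_local(pairs, lambda x, y: x + y)

# Sorted only to get a stable print order.
print(sorted(local_counts.items()))
```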

### Summary of map-reduce for word count:

In [None]:
wordsList = ['dog', 'cat', 'tiger','elephant', 'pig', 'pig', 'cat', 'cat']

wordsRDD = sc.parallelize(wordsList, 4)

wordCounts = (wordsRDD
                .map(lambda w: (w, 1))
                .reduceByKey(lambda x,y: x+y)
                .collect())

print(wordCounts)

## More on Map-Reduce

In [None]:
# textFile treats a text file as a list of lines:

hamletRdd = sc.textFile("/Users/mpourho/Documents/CSU/Courses/CS4661/Datasets/hamlet.txt",4)

print(hamletRdd) # Lazy!

In [None]:
# List of the first 50 lines:

print(hamletRdd.take(50)) # Action!

In [None]:
# Split each line into words and show the first 50 words of Hamlet:

hamletwords = hamletRdd.flatMap(lambda line: line.split(" ")) # Lazy!

# Here we use flatMap rather than map to avoid list of lists!
                    
print(hamletwords.take(50)) # Action
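
The difference between `map` and `flatMap` can be sketched in plain Python. This is an analogy with two made-up lines of text, not Spark code: a list comprehension stands in for `map`, and `itertools.chain.from_iterable` stands in for the flattening step of `flatMap`.

```python
from itertools import chain

lines = ["to be or", "not to be"]  # illustrative input, not the real file

# map-like: one output item per input line, so each item is a list of words.
mapped = [line.split(" ") for line in lines]

# flatMap-like: the per-line lists are concatenated into one flat sequence.
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)
print(flat_mapped)
```

With the map-like version, a later word count would see whole lists as its items; the flattened version yields individual words, which is what the counting step needs.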

In [None]:
# After converting Hamlet into words using parallel processing,
# we count the occurrences of each word in another map-reduce step:

hamletwordcounts = hamletRdd.flatMap(lambda line: line.split(" ")) \
                            .map(lambda word: (word, 1)) \
                            .reduceByKey(lambda a, b: a + b) \
                            .collect()
            
hamletwordcounts

#### Let's make it a little more complex:

In [None]:
# list of stop words:

# read one stop word per line; the with-block closes the file afterwards
with open(
    "/Users/mpourho/Documents/CSU/Courses/CS4661/Datasets/stopwords.txt") as f:
    stopwords = [w.strip() for w in f]

print(stopwords[:10]) # let's see the first 10 stop words

In [None]:
# list of keywords (not in stopwords) sorted by frequency:

hamletwordcounts = (hamletRdd.flatMap(lambda line: line.split(" ")) # words
                    .map(lambda word: word.strip().lower()) # lower case
                    .filter(lambda word: word not in stopwords) # no stopwords
                    .map(lambda word: (word, 1))
                    .reduceByKey(lambda a, b: a + b)  
                    .takeOrdered(20, lambda x: -x[1]) # top 20 words by count, descending
                    )
                             
hamletwordcounts
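
The `takeOrdered(20, lambda x: -x[1])` call returns the 20 items with the smallest key, and negating the count turns that into "largest counts first". A plain-Python sketch of the same idea uses `heapq.nsmallest`; the `counts` list below is made-up sample data for illustration, not output from the Hamlet file.

```python
import heapq

# Hypothetical (word, count) pairs, standing in for the reduceByKey output.
counts = [('lord', 5), ('good', 3), ('king', 7), ('come', 2)]

# Smallest keys first; negating the count puts the largest counts on top.
top2 = heapq.nsmallest(2, counts, key=lambda x: -x[1])
print(top2)
```

Asking only for the top items avoids fully sorting every pair when just the most frequent words are needed.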

In [None]:
# list of keywords (not in stopwords, and without empty strings or speaker labels) sorted by frequency:

hamletwordcounts = (hamletRdd.flatMap(lambda line: line.split(" ")) # words
                    .map(lambda word: word.strip().lower()) # lower case
                    .filter(lambda word: word not in stopwords) # no stopwords
                    .filter(lambda word: word not in ['','ham.','hor.','king.','pol.']) # no empty strings or speaker labels
                    .map(lambda word: (word, 1))
                    .reduceByKey(lambda a, b: a + b)  
                    .takeOrdered(20, lambda x: -x[1]) # top 20 words by count, descending
                    )
                             
hamletwordcounts

In [None]:
sc.stop()

In [None]:
# Reference for some of the examples: Dr. Rahul Dave, Harvard University.