# Index
    - Setup to run PySpark on CloudxLab's Jupyter notebook
    - Test Program
    - Run Program using spark-submit
    - Looking in Program 1 (ratings-counter.py)
        - textFile() 
        - map() # Transformation
        - countByValue() # Action
    - Key/Value RDD
        - reduceByKey()
        - groupByKey()
        - keys()
        - values()
        - join
        - leftOuterJoin
        - rightOuterJoin
        - cogroup
        - subtractByKey
        - mapValues()
        - reduceByKey() # Transformation (not an action)
        - collect() # Action
        - Looking in Program 2 (friends-by-age.py)
    - Filtering RDD
        - filter()
        - Looking in Program 3 (min-temperatures.py)
    - Flatten an RDD
        - flatMap()
        - flatMapValues()
        - Looking in Program 3 (word-count.py)
    - Regular Expression (Not Spark related but useful to know)
        - Looking in Program 4 (word-count-better.py)
    - Sorting RDD
        - sortByKey()
        - Looking in Program 4 (word-count-better-sorted.py)
        

## Setup to run PySpark on CloudxLab's Jupyter notebook

In [4]:
# Check the below link to run pyspark on cloudxlab's jupyter notebook and different versions of PySpark 
# https://cloudxlab.com/blog/running-pyspark-jupyter-notebook/

In [2]:
# Here I'm following the above link to run PySpark version 2.4.x.
# Point SPARK_HOME at the 2.4.3 install, then put its bundled pyspark and py4j
# archives at the very front of sys.path so they are searched before anything else.

import os
import sys

os.environ["SPARK_HOME"] = "/usr/spark2.4.3"
os.environ["PYLIB"] = "{}/python/lib".format(os.environ["SPARK_HOME"])
# For the next two settings, use /usr/bin/python2.7 instead if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/anaconda/bin/python"
# Prepend both archives in one step; resulting order is pyspark.zip first, then py4j
sys.path[:0] = [
    "{}/pyspark.zip".format(os.environ["PYLIB"]),
    "{}/py4j-0.10.7-src.zip".format(os.environ["PYLIB"]),
]

In [3]:
# Initialize the entry points of Spark: SparkContext and SparkConf.
# Only one SparkContext may be active at a time, so calling SparkContext(conf=conf)
# again (e.g. when re-running this cell) raises ValueError. getOrCreate() returns the
# already-running context instead, which makes this cell safe to re-run; in that case
# `conf` is not re-applied to the existing context.

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("appName")
sc = SparkContext.getOrCreate(conf=conf)

## Test Program

In [5]:
# Testing the setup: show the first 10 lines of the sample input, then the Spark version.
# The path has no scheme, so Spark resolves it against its default filesystem
# (presumably HDFS on CloudxLab -- confirm).

rdd = sc.textFile("/data/mr/wordcount/input/")
first_ten_lines = rdd.take(10)  # action: returns a Python list of lines to the driver
print(first_ten_lines)
print(sc.version)

['The Project Gutenberg EBook of The Adventures of Sherlock Holmes', 'by Sir Arthur Conan Doyle', '(#15 in our series by Sir Arthur Conan Doyle)', '', 'Copyright laws are changing all over the world. Be sure to check the', 'copyright laws for your country before downloading or redistributing', 'this or any other Project Gutenberg eBook.', '', 'This header should be the first thing seen when viewing this Project', 'Gutenberg file.  Please do not remove it.  Do not change or edit the']
2.4.3


In [6]:
# I have a directory named UdemySparkCourse having all the files for the course 
# "Taming Big Data with Apache Spark and Python - Hands On!" from Udemy

## Run Program using spark-submit

In [8]:
# 1st program is ratings-counter.py
# We run the code using the below steps
# 1. cd UdemySparkCourse # Directory having the .py file
# 2. spark-submit ratings-counter.py

## Looking in Program 1 (ratings-counter.py)

In [None]:
# Contents of ratings-counter.py
# Note: only one SparkContext may be active at a time, and this notebook already has one.
# SparkContext.getOrCreate() reuses an existing context (and creates one from `conf` when
# none exists, e.g. under spark-submit), so this code runs in both places.

from pyspark import SparkConf, SparkContext
# Import SparkContext to create SparkContext and Import SparkConf to configure SparkContext (This will be in every spark script)
import collections # Plain Python standard library, not Spark

conf = SparkConf().setMaster("local").setAppName("RatingsHistogram")
# setMaster("local") implies running on the local machine; there are other configs to run on a cluster or to use all cores of your system
# setAppName("RatingsHistogram") is good practice: it identifies this application's logs in the web UI
sc = SparkContext.getOrCreate(conf=conf)
# Initialize sc. A plain SparkContext(conf=conf) would raise ValueError if a context already
# exists; when an existing one is reused, `conf` is not applied to it.

lines = sc.textFile("file:///home/manojtiwari11v6174/UdemySparkCourse/ml-100k/u.data") # Create an RDD of lines
ratings = lines.map(lambda x: x.split()[2]) # Split each line on whitespace and keep the 3rd field
result = ratings.countByValue() # Action: returns a dict mapping each distinct value to the number of times it occurs

sortedResults = collections.OrderedDict(sorted(result.items())) # Order by key for readable output
for key, value in sortedResults.items():
    print("%s %i" % (key, value))

## Key/Value RDD

### Looking in Program 2 (friends-by-age.py)

In [None]:
# Contents of friends-by-age.py
# Note: only one SparkContext may be active at a time, and this notebook already has one.
# SparkContext.getOrCreate() reuses an existing context (and creates one from `conf` when
# none exists, e.g. under spark-submit), so this code runs in both places.

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("FriendsByAge")
sc = SparkContext.getOrCreate(conf=conf)

def parseLine(line):
    """Parse one CSV line into an (age, numFriends) key/value pair.

    The 3rd and 4th comma-separated fields are read as integers.
    """
    fields = line.split(',')
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)

# Read from the same UdemySparkCourse directory that holds the other course data files
lines = sc.textFile("file:///home/manojtiwari11v6174/UdemySparkCourse/fakefriends.csv")
rdd = lines.map(parseLine)
# (age, n) -> (age, (n, 1)), then add both parts per age -> (age, (totalFriends, numRecords))
totalsByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
# Average friends per age; `/` is true division on Python 3 (it would truncate on Python 2)
averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])
results = averagesByAge.collect() # Action: bring the (age, average) pairs back to the driver
for result in results:
    print(result)

# Then run the program using the spark-submit command like above

## Filtering RDD

### Looking in Program 3 (min-temperatures.py)

In [None]:
# Contents of min-temperatures.py
# Note: only one SparkContext may be active at a time, and this notebook already has one.
# SparkContext.getOrCreate() reuses an existing context (and creates one from `conf` when
# none exists, e.g. under spark-submit), so this code runs in both places.

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("MinTemperatures")
sc = SparkContext.getOrCreate(conf=conf)

def parseLine(line):
    """Parse one CSV line into (stationID, entryType, temperature in degrees F).

    fields[3] is treated as tenths of a degree Celsius and converted with
    value * 0.1 * 9/5 + 32.
    """
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)

lines = sc.textFile("file:///home/manojtiwari11v6174/UdemySparkCourse/1800.csv")
parsedLines = lines.map(parseLine)
# Keep only records whose entry type contains "TMIN"
tminEntries = parsedLines.filter(lambda x: "TMIN" in x[1])
# Drop the entry type: (stationID, temperature)
stationTemps = tminEntries.map(lambda x: (x[0], x[2]))
# Lowest temperature per station; the builtin min(a, b) is the reducer
minTemps = stationTemps.reduceByKey(min)
results = minTemps.collect()

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))

# Then run the code using spark-submit as above

## Flatten an RDD

### Looking in Program 3 (word-count.py)

In [None]:
# Contents of word-count.py
# Note: only one SparkContext may be active at a time, and this notebook already has one.
# SparkContext.getOrCreate() reuses an existing context (and creates one from `conf` when
# none exists, e.g. under spark-submit), so this code runs in both places.

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("WordCount")
sc = SparkContext.getOrCreate(conf=conf)

# Named `book` rather than `input` so the built-in input() is not shadowed
book = sc.textFile("file:///home/manojtiwari11v6174/UdemySparkCourse/Book.txt")
words = book.flatMap(lambda x: x.split()) # One element per whitespace-separated word
wordCounts = words.countByValue() # Action: dict of word -> count on the driver

for word, count in wordCounts.items():
    # Drop non-ASCII characters (presumably to avoid console encoding errors)
    cleanWord = word.encode('ascii', 'ignore')
    if cleanWord: # Skip words that had no ASCII characters at all
        print(cleanWord.decode() + " " + str(count))

# Then run the code using spark-submit as above

## Regular Expression (Not Spark related but useful to know)

### Looking in Program 4 (word-count-better.py)

In [None]:
# Contents of word-count-better.py
# Note: only one SparkContext may be active at a time, and this notebook already has one.
# SparkContext.getOrCreate() reuses an existing context (and creates one from `conf` when
# none exists, e.g. under spark-submit), so this code runs in both places.

import re
from pyspark import SparkConf, SparkContext

# Compiled once at module level instead of on every normalizeWords() call
WORD_SPLITTER = re.compile(r'\W+', re.UNICODE)

def normalizeWords(text):
    """Lower-case *text* and split it on runs of non-word characters.

    re.split yields empty strings when the text starts or ends with punctuation;
    those are dropped here so they are never counted as a "word".
    """
    return [word for word in WORD_SPLITTER.split(text.lower()) if word]

conf = SparkConf().setMaster("local").setAppName("WordCount")
sc = SparkContext.getOrCreate(conf=conf)

# Named `book` rather than `input` so the built-in input() is not shadowed
book = sc.textFile("file:///home/manojtiwari11v6174/UdemySparkCourse/Book.txt")
words = book.flatMap(normalizeWords)
wordCounts = words.countByValue() # Action: dict of word -> count on the driver

for word, count in wordCounts.items():
    # Drop non-ASCII characters (presumably to avoid console encoding errors)
    cleanWord = word.encode('ascii', 'ignore')
    if cleanWord: # Skip words that had no ASCII characters at all
        print(cleanWord.decode() + " " + str(count))

# Then run the code using spark-submit as above

## Sorting RDD

### Looking in Program 4 (word-count-better-sorted.py)

In [None]:
# Contents of word-count-better-sorted.py
# Note: only one SparkContext may be active at a time, and this notebook already has one.
# SparkContext.getOrCreate() reuses an existing context (and creates one from `conf` when
# none exists, e.g. under spark-submit), so this code runs in both places.

import re
from pyspark import SparkConf, SparkContext

# Compiled once at module level instead of on every normalizeWords() call
WORD_SPLITTER = re.compile(r'\W+', re.UNICODE)

def normalizeWords(text):
    """Lower-case *text* and split it on runs of non-word characters.

    re.split yields empty strings when the text starts or ends with punctuation;
    those are dropped here so they are never counted as a "word".
    """
    return [word for word in WORD_SPLITTER.split(text.lower()) if word]

conf = SparkConf().setMaster("local").setAppName("WordCount")
sc = SparkContext.getOrCreate(conf=conf)

# Named `book` rather than `input` so the built-in input() is not shadowed
book = sc.textFile("file:///home/manojtiwari11v6174/UdemySparkCourse/Book.txt")
words = book.flatMap(normalizeWords)

# Count per word on the cluster: (word, 1) pairs summed per key
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
# Swap to (count, word) so sortByKey orders by count, ascending
wordCountsSorted = wordCounts.map(lambda x: (x[1], x[0])).sortByKey()
results = wordCountsSorted.collect()

for count, word in results:
    # Drop non-ASCII characters (presumably to avoid console encoding errors)
    cleanWord = word.encode('ascii', 'ignore')
    if cleanWord: # Skip words that had no ASCII characters at all
        print(cleanWord.decode() + ":\t\t" + str(count))

# Then run the code using spark-submit as above