# NAME: Grey Harris
# ID: 97774899
# COURSE: Data301
# LAB: 1

## **Imports and setup**

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q pyspark

In [None]:
import pyspark, os
import math
from math import sqrt
from math import log
from random import seed
from random import randint
from pyspark import SparkConf, SparkContext
os.environ["PYSPARK_PYTHON"]="python3"
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64/"

#connects our python driver to a local Spark JVM running on the Google Colab server virtual machine
try:
  conf = SparkConf().setMaster("local[*]").set("spark.executor.memory", "1g")
  sc = SparkContext(conf = conf)
except ValueError:
  #it's ok if the server is already started
  pass


#also include this short helper function for use later in this lab
def dbg(x):
  """ A helper function to print debugging information on RDDs """
  if isinstance(x, pyspark.RDD):
    print([(t[0], list(t[1]) if 
            isinstance(t[1], pyspark.resultiterable.ResultIterable) else t[1])
           if isinstance(t, tuple) else t
           for t in x.take(100)])
  else:
    print(x)


## Messing about


In [None]:
A = range(1000)
pA = sc.parallelize(A) 

#prints sum
print("#prints sum: dbg(sum(A))")
dbg(sum(A))

#prints sum but parrallised version 
print("#prints sum but parrallised version : dbg(pA.reduce(lambda a,b: a+b))")
dbg(pA.reduce(lambda a,b: a+b))

499500
499500


In [None]:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

print(data)
print(distData)

value = distData.reduce(lambda a, b: a + b)
print(value)


[1, 2, 3, 4, 5]
ParallelCollectionRDD[14] at parallelize at PythonRDD.scala:195
15


In [None]:
nums = sc.parallelize([1,2,3,4,5,6])
results = nums.map(lambda x: x + 1) 
dbg(results)

alpha = sc.parallelize(['the', 'poop', 'derp'])
results_alpha = alpha.map(lambda x: True if x == 'the' else False) 
dbg(results_alpha)


[2, 3, 4, 5, 6, 7]
[True, False, False]


# New Section

In [None]:
pB = sc.parallelize(range(1000))
pBsquares = pB.map(lambda b: sqrt(b))
pBresult = pBsquares.reduce(lambda a, b: a + b)

print(pBresult)


21065.833110879044


In [None]:
# Let us quickly review some basic transformations availble within Spark. Lets create a smaller list of numbers to play with. 

nums = sc.parallelize([1,2,3,4,5]) 

# retain elements passing a predicate 
evens = nums.filter(lambda x: x%2 == 0) 
print("evens")
dbg(evens)

# map each element to zero or more others 
x = nums.map(lambda x: range(x)) 
print("x map")
dbg(x)

# map each element to zero or more others 
x = nums.flatMap(lambda x: range(x)) 
print("x")
dbg(x)

# retrieve RDD contents as a local collection 
print("dbg(x.collect())")
dbg(x.collect())

# return first 2 elements
print("dbg(evens.take(2))")
dbg(evens.take(2))

# count number of elements 
print("dbg(nums.count())")
dbg(nums.count())


evens
[2, 4]


In [None]:
#Due to the ease and performance of distributed hashing, 
#the original form MapReduce concept defaults to using (key,value) pairs as the data representation. 
#In python we use tuples to represent these

pair = ('a','b') 
print(pair[0], pair[1])

#Let’s consider a quick example
pets = sc.parallelize([('cat',1), ('dog',3), ('cat',2),('dog',1),('hamster',1)]) 
print("pets start")
dbg(pets.reduceByKey(lambda x,y: x+y))
dbg(pets.groupByKey())
dbg(pets.sortByKey())
print("pets end")


[('cat', 1), ('dog', 3), ('cat', 2), ('dog', 1), ('hamster', 1)]
[('cat', 1), ('cat', 2), ('dog', 3), ('dog', 1), ('hamster', 1)]


In [None]:
#Let’s try for a more complex example, word count and working with files. 
#First use a shell command to download the text of Peter Pan from the Guttenberg project
!wget -q -O peterpan.txt https://www.gutenberg.org/files/16/16-0.txt
# load the file into a distributed dataset of lines
file = sc.textFile("peterpan.txt")
# split each line into (word, 1) tuples
words = file.flatMap(lambda line: [(word.lower(), 1) for word in line.split(" ")])
# reduce by key (the word) the counts and sort descending
counts = words.reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], False) 
dbg(counts.collect())




In [None]:
#def word_counts(filename):
#    file = sc.textFile(filename)
#    words = file.flatMap(lambda line: [(word.lower(), 1) for word in line.split(" ")])
#    counts = words.reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], False) 
#    #return counts.collect()
#    return counts

#panCounts = word_counts("peterpan.txt")
#sherlockCounts = word_counts("sherlockholmes.txt")
#aliceCounts = word_counts("aliceinwonderland.txt")
#huckCounts = word_counts("huckleberryfinn.txt")

In [None]:
#dbg(word_counts("peterpan.txt").collect())
#dbg(word_counts("sherlockholmes.txt").collect())
#dbg(word_counts("aliceinwonderland.txt").collect())
#dbg(word_counts("huckleberryfinn.txt").collect())
#dbg(computeTFij(panCounts))
#dbg(computeTFij(sherlockCounts))
#dbg(computeTFij(aliceCounts))
#dbg(computeTFij(huckCounts))

## **Problem 1: map [10 points]**

**Sequential code**

In [None]:
numbers = range(1000)

def sequential_roots(number_range):
  """Takes a list of numbers and returns the sum of their individual roots"""
  result = 0
  for number in number_range:
    result += sqrt(number)
  return result

print(sequential_roots(numbers))

21065.833110879048


**Parallel version**

In [None]:
numbers = range(1000)

def parallel_roots(number_range):
  """Takes a list of numbers and returns the sum of individual roots (parallel computation)"""
  pB = sc.parallelize(number_range)
  pBsquares = pB.map(lambda b: sqrt(b))
  pBresult = pBsquares.reduce(lambda a, b: a + b)
  return pBresult

print(parallel_roots(numbers))

21065.833110879044


## **Problem 2 [15 points]:**

**Generate array of 5 int's between 1-6**

In [None]:
def rand_gen(length=5):
  """generates an array of length=5 of random numbers between 1-6 and returns them as a parrallel list"""
  seed()
  array = []
  for _ in range(length):
    array.append(randint(1,6))
  return sc.parallelize(array)

**Function that consumes parallel array of numbers and returns an array with that many copies of each value**

In [None]:
def repeat_nums(numList):
  """returns a composite array with the value of sub array repeated"""
  return numList.flatMap(lambda x: [x] * x)

dbg(repeat_nums(sc.parallelize([1, 3, 4, 2, 1])))
#> [1, 3, 3, 3, 4, 4, 4, 4, 2, 2, 1]

dbg(repeat_nums(sc.parallelize([5, 2, 6, 3, 1])))
#> [5, 5, 5, 5, 5, 2, 2, 6, 6, 6, 6, 6, 6, 3, 3, 3, 1]

dbg(repeat_nums(rand_gen()))
#> generates using random numbers

[1, 3, 3, 3, 4, 4, 4, 4, 2, 2, 1]
[5, 5, 5, 5, 5, 2, 2, 6, 6, 6, 6, 6, 6, 3, 3, 3, 1]
[1, 3, 3, 3, 3, 3, 3, 5, 5, 5, 5, 5, 4, 4, 4, 4]


## **Problem 3 [50 points total]:**

In [None]:
# load the files
!wget -q -O peterpan.txt https://www.gutenberg.org/files/16/16-0.txt
!wget -q -O sherlockholmes.txt http://www.gutenberg.org/files/1661/1661-0.txt
!wget -q -O aliceinwonderland.txt http://www.gutenberg.org/files/11/11-0.txt
!wget -q -O huckleberryfinn.txt http://www.gutenberg.org/files/76/76-0.txt

**3a. [10 points] Compute the TFij for each term i with j as Peter Pan**

In [None]:
# function to create a distributed data set from lines in a file
def word_counts(filename):
    file = sc.textFile(filename)
    words = file.flatMap(lambda line: [(word.lower(), 1) for word in line.split(" ")])
    counts = words.reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], False)
    #counts = words.reduceByKey(lambda x,y: 1) 
    return counts

In [None]:
# function to compute a TFij
def computeTFij(scObject):
    max_count = scObject.take(1)[0][1]
    TFji = scObject.map(lambda x: (x[0], x[1]/max_count))
    return TFji

In [None]:
#load peter pan into RDD
pan = word_counts("peterpan.txt")
pan_TFij = computeTFij(pan)

# show still distributed data set
print(type(pan))

# print the values of RDD
dbg(pan.take(5))

# show TFji for pan
dbg(pan_TFij.take(5))

<class 'pyspark.rdd.PipelinedRDD'>
[('the', 2511), ('', 2259), ('and', 1425), ('to', 1241), ('he', 1029)]
[('the', 1.0), ('', 0.899641577060932), ('and', 0.5675029868578255), ('to', 0.4942254082039028), ('he', 0.40979689366786143)]


**3b. [5 points] Load the following 3 documents into RDD**

In [None]:
# load other docs into RDD's (I chose RDD counting unique words as it helps with later questions)

def unique_words(filename):
    """Takes text file and creates RDD with a node for each unique word in file"""
    file = sc.textFile(filename)
    words = file.flatMap(lambda line: [(word.lower(), 1) for word in line.split(" ")])
    counts = words.reduceByKey(lambda x,y: 1) 
    return counts

huck_unique = unique_words("huckleberryfinn.txt")
sherlock_unique = unique_words("sherlockholmes.txt")
alice_unique = unique_words("aliceinwonderland.txt")
pan_unique = unique_words("peterpan.txt")

# show each is now RDD, also show their TFji's
print(type(alice_unique))
dbg(alice_unique.take(5))

<class 'pyspark.rdd.PipelinedRDD'>
[('project', 1), ('gutenberg', 1), ('ebook', 1), ('of', 1), ('adventures', 1)]


**3c. [15 points] Compute IDFi for each term i over all N=4 documents in our RDDs**

In [None]:
# load them all into one
def glue_RDD_list(RDD_list):
  """returns a new RDD that contains all entered rdds with sum of unique keys found in rdd's"""
  return sc.union(RDD_list).reduceByKey(lambda x,y:x+y)

def compute_IDFi(RDD_list_glued, n=4):
  """N = number of books, Ni = number of times word is present over all docs"""
  IDFi = (RDD_list_glued.map(lambda word: (word[0], log((n/word[1]), 2))).sortBy(lambda x: -x[1]))
  return IDFi

# LIST OF 4 BOOKS UNIQUE WORD RDD'S
RDD_list = [huck_unique, sherlock_unique, alice_unique, pan_unique]

# GLUE THE RDD'S COUNTING EACH BOOK WITH UNIQUE WORD OCCURANCE
RDD_list_glued = glue_RDD_list(RDD_list)

# PRINT GLUED RDD LIST
dbg(RDD_list_glued)

# COMPUTE THE IDFi's
IDFi = compute_IDFi(RDD_list_glued)

# PRINT TO SHOW IT WORKS
dbg(type(IDFi))
dbg(IDFi)

[('', 4), ('project', 4), ('gutenberg', 4), ('ebook', 4), ('of', 4), ('adventures', 4), ('huckleberry', 1), ('finn,', 1), ('mark', 4), ('twain', 2), ('clemens)', 1), ('this', 4), ('is', 4), ('use', 4), ('anyone', 4), ('anywhere', 4), ('at', 4), ('no', 4), ('restrictions', 4), ('whatsoever.', 4), ('may', 4), ('it,', 4), ('give', 4), ('away', 4), ('re-use', 4), ('online', 4), ('author:', 4), ('date:', 4), ('20,', 2), ('last', 4), ('february', 3), ('23,', 1), ('language:', 4), ('set', 4), ('utf-8', 4), ('***', 4), ('start', 4), ('produced', 4), ('widger', 2), ('(tom', 1), ("sawyer's", 1), ('chapter', 3), ('civilizing', 1), ('sawyer', 1), ('ii.', 3), ('boys', 3), ('escape', 4), ('iii.', 3), ('good', 4), ('going-over.--grace', 1), ('tom', 2), ("sawyers's", 1), ('lies”.', 1), ('iv.', 3), ('huck', 1), ('v.', 4), ("huck's", 1), ('fond', 4), ('vi.', 3), ('he', 4), ('decided', 4), ('leave.--political', 1), ('economy.--thrashing', 1), ('vii.', 3), ('him.--locked', 1), ('in', 4), ('viii.', 3), ('s

In [None]:
a = sc.parallelize(["a1", "a2", "a3"])
b = sc.parallelize(["b1", "b2", "b3"])

union_file = sc.union([a, b]).reduceByKey(lambda x, y : x + y)
dbg(union_file)

[('b', '123'), ('a', '123')]


**3d. [20 points] Compute TF.IDF score for each term in Peter Pan and collect/display the top 100 terms by TF.IDF score**

In [None]:
def computeTF_IDF(RDD, IDFi_RDD):
  """Takes RDD TFji and compares with IDFi to return TFIDF"""
  TF_IDF = RDD.join(IDFi_RDD)
  return TF_IDF.map(lambda word: (word[0], word[1][0] * word[1][1]))

# PRINT THE INPUTS FOR SANITY CHECK
dbg(pan_TFij.take(5))
dbg(IDFi.take(5))

# COMPUTE AND PRINT FINAL ANSWER
dbg(computeTF_IDF(pan_TFij, IDFi).sortBy(lambda x: -x[1]).collect())

[('the', 1.0), ('', 0.899641577060932), ('and', 0.5675029868578255), ('to', 0.4942254082039028), ('he', 0.40979689366786143)]
[('huckleberry', 2.0), ('finn,', 2.0), ('clemens)', 2.0), ('23,', 2.0), ('(tom', 2.0)]


In [None]:
a = sc.parallelize([("a", 1), ("b", 3)])
b = sc.parallelize([("a", 2), ("b", 4)])
joined = a.join(b)
dbg(joined)

[('b', (3, 4)), ('a', (1, 2))]


**Overall [15 points] for correctly using Spark function**

I think I deserve all 15 if that helps