# Problem Set 3: Spark
### Instructions: Upload the file to databrcks community edition. Fill the code and text in the designated places and submit a filled .ipynb in moodle by 10/6/2021
### There are 10 questions overall. Each question is worth 10 points. The questions vary significantly in length and difficulty. 
### You should use apache spark commands, in addition to python code where needed
### The file contains small functions and other lines of code that will help you solve the exercise. You may copy and/or modify them as you wish
### Good luck

###  Part 1): Finding Prime Numbers with Spark

In [0]:
# Class used for timing your commands
import time as t
class Timer(object):
    def __init__(self, name=None):
        self.name = name

    def __enter__(self):
        self.tstart = time.time()

    def __exit__(self, type, value, traceback):
        if self.name:
            print('[%s]' % self.name,)
        print('Elapsed: %s' % (time.time() - self.tstart))


**Question 1:** Write a **python** function that given a natural number n returns True/False if the number is prime or not by checking its factors (divisors) up to a sqrt(n). <br>
Test the function on the first 100 natural numbers. <br>
What is the O(f(n)) complexity of this algorithm for finding all primes up to n as a function of n? (assume that arithmatic operations take O(1) regardless of the size of the numbers)

In [0]:
# SOLUTION
 
def check_prime(n):
   if n > 3:
      for i in range(2,round(n**0.5)+1):
          if (n % i) == 0:
            return False
      return True
   else:
      return True
x=[0]*100
for i in range(0,100):
  x[i]=check_prime(i)
print(x)
  



**SOLUTION <br>**
The run-time is O(sqrt(n)) for any given n.

**Question 2:** Create a range object of various sizes starting at 10 up to 10 millions (by a factor of 10, i.e. 10 ,100, 1000 .. etc up to 10 millions) and test your function in (a) in two ways: <br> 
- First, use native python for the range <br>
- Second, use a Spark RDD for the range <br>
Compare and print the running times (in seconds, rounded to 2 decimal points) of each methodology (Spark vs. python) for each `n`. <br>
For what values of `n` is the RDD is better (we call this order the `breaking point order`) ? Why do you think spark/python is faster before/after? <br>
*Note:* The answer may vary from run to run and depending on the number of cores available when running using Spark

In [0]:
# SOLUTION
# PYTHON IMPLEMENTATION:
#t=Timer("t")

for j in range(1,8):
  a = t.time()
  for i in range(1,10**j):
    check_prime(i)
  print(10**j)
  print(round((t.time() - a),2))
  print("Python")
# SPARK SCALA PARALLEL RANGE:
for l in range(1,8):
  a =t.time()
  interval = sc.parallelize(list(range(1,10**l)))
  clock = interval.map(lambda x: check_prime(x))
  print(round((t.time() - a),2))
  print(10**l)
  print("spark") 

**SOLUTION** <br>
After examining the results we can see that for small numbers(<100,000) the non-parallel computation is faster. For values larger then 100k we can see a large improvement by using spark. For the largest number that we checked there was a difference of the running time X90. We can assume that for small numbers the function of map-reduction takes longer then the Python method. However for large numbers, the advantage of parallel computing all the numbers at the same time is significantly better, even with taking to account the map-reduce time.

**Question 3:** When splitting the spark range array into different nodes, each nodes will get a consecutive sub-range in standard spark implementations. <br> 
Therefore, it is possible that different nodes will get ranges of numbers of different difficulty (for example, one node may get small, easier numbers, and another may get large, harder numbers). <br>
This may cause imbalance between the workload of the nodes and slow down the overall computation. 
How would you change the Spark implementation such that nodes faster (without changing the function testing primality)? <br>
Implementing the change in Spark RDD, repeat the computation for the same values of `n` as in the previous question and compare the computation times. <br>
Note that because we are using the free-tier of databricks, the available cluster only has 1 node, so the actual gain in running time might not be aparent.

**SOLUTION** <br>
To make the run quicker we would distribute the nodes in a way that the computation time for each nod will be approximately the same. We will implement a random distribution for sending each nod with random numbers.

In [0]:
# SOLUTION
import time as t
import random as r
for l in range(1,8):
  a =t.time()
  interval = sc.parallelize(list(range(1,10**l)))
  clock = interval.map(lambda x: check_prime(r.random(x)))
  print(round((t.time() - a),2))
  print(10**l)
  print("spark")



**Question 4:** Write a **python** function that implements Eratosthenes Sieve: given a natural integer `n`, finds all the primes up to that number by iterating over all the numbers larger than `1` in increasing order, and for each number (say i), crossing out all multiples of `i`. <br>
By doing so for all the numbers between 2 and `sqrt(n)`, we will end up with only prime numbers. <br>
Test your function using the same values of `n` from the previous questions with native  python and report the running time in seconds as previously done. What is the `O()` complexity of finding all primes up to `n` as a function of `n`?  how does it compare to the previous method?

In [0]:
def Eratosthenes(n):
  p = [True for i in range(n + 1)]
  x = 2
  p[0]= False
  p[1]= False 
  while (x**2<= n): 
    if (p[x] == True):
      for i in range(x * 2, n + 1, x):
        p[i] = False
    x+= 1
  return(p) 


# RUN AND RECORD TIME
for i in range(1,8):
  a = t.time()
  Eratosthenes(10**i)
  print(round((t.time() - a),2))
  print(10**i)


*SOLUTION* <BR>
The time complexity for the algorithm is O(n*log(log(n))). As we can see in the results  This method is much faster then the Python run-time and faster then even the spark.

**Question 5:** Can you implement the algorithm in Question 4 in a parallel implementation using Spark? If not, explain why, if yes, please do so and run for the same values of `n` as in the previous question

*SOLUTION* <br>
This algorithm can not be implemented using spark. The reason is thatspark uses paralel computation i.e the data is seperated between different nods, in way we do not controll or know. Since the algorithm is built in way that it starts by deleting all the multiplication of the first primal number that it finds. This would cause many number not to be deleted from the chart. For instance if the number are 50 to 100 all the number in this list will not be deleted besides 100. And same goes for all large numbers.

###  Part 2): Words Count with Spark

In [0]:
# Reading the book "war and peace"
dbutils.fs.ls("dbfs:/FileStore/shared_uploads/akiva.finkelstein@mail.huji.ac.il/war.txt") # Change to your path
import re # Regular expressions
# Load the "war and peace" novel into RDD
b = sc.textFile('/FileStore/shared_uploads/akiva.finkelstein@mail.huji.ac.il/war.txt')

# A useful function for remiving any non-words and splitting lines into separate words
def splitter(line):
    line = re.sub(r'^\W+|\W+$', '', line)
    return (re.split(r'\W+', line))

b.take(10) # show first 10 lines

**Question 1:** Upload the "war and peace" novel text file and change the path to match your account in the code above. <br> 
Count and print the total number of words and the number of lines in the file (any string separated by spaces is considered a word, even if it is a number, or another non-english-word string)

In [0]:
# SOLUTION
def splitter(line):
  line= re.sub(r'^\W+|\W+$', '',line)
  return(re.split(r'\W+',line))

b_split=b.flatMap(lambda x:splitter(x))
b_split.take(100)
print(b_split.count(), "Number of Words")


#count lines
b_lines=b.map(lambda x:splitter(x))
print(b_lines.count(), "Number of lines")



**Question 2:** Compute the number of times each word appears in the file. Ignore case (that is, for example `The` and `the` count as the same word). <br>
Print the 10 most frequent words, and the 10 longest words (together with their number of appearances for both)

In [0]:
# SOLUTION
from operator import add 
b_lower = b_split.map(lambda x: x.lower())
histogram = b_lower.map(lambda x:(x,1)).reduceByKey(add)
most_word = histogram.sortBy(lambda x : x[1], ascending = False).take(10)
longest_word = histogram.sortBy(lambda x : len(x[0]), ascending = False).take(10)
print(most_word)
print(longest_word)

**Question 3:** Compute the counts of consecutive **pairs** of words in the file. Ignore case. Ignore empty words. <br>
The order of words in the pair matters (that is, for example, the pair `she is` should be counted as a different pair form the paier `is she`). <br>
Print the 10 most frequent **pairs** of words together with their count

In [0]:
# SOLUTION
wordPairCount1 = b.map(lambda x: x.lower()).map(lambda line: line.split()).flatMap(lambda x: [((x[i], x[i + 1]), 1) for i in range(0, len(x) - 1)]).reduceByKey(lambda a,b:a + b)
wordPairCount1.sortBy(lambda x:x[1], ascending = False).take(10)


**Question 4:** Repeat the previous question, but this time count word pairs **unordered**. That is, occurances of `she is` and of `is she` should be counted as instances of the same pair. <br>
When printing the top pairs, the two words should be ordered lexicographically (e.g. `is she` for the above example pair)

In [0]:
# SOLUTION
wordPairCount2 = b.map(lambda x: x.lower()).map(lambda line: line.split()).flatMap(lambda x:[((sorted((x[i], x[i + 1]))[0],sorted((x[i], x[i + 1]))[1]),1) for i in range(0, len(x) - 1)]).reduceByKey(lambda a,b:a + b)
wordPairCount2.sortBy(lambda x:x[1], ascending = False).take(10)


**Question 5:** Get for each word the number of times it appears with the first letter being in upper/lower case, separately, such that each word will have two counts associated with it. <br>
For example, for the word `The` count seperately the occurances of `The` (and also, for example `THE`) and the occurances of `the` (and also `tTe`, `tHE` ..). <br>
Next, filter and keep only words appearing with the first letter being both uppercase and lowercase at least once in the file. Sort these words by their uppercase count / lowercase count ratio. <br>
Finally, print the 10 words with the **highest** ratio (together with the number of appearances in uppercase and lowercase), and similarly the 10 words with the **lowest** ratio. <br>
Are the results expected/surprising?

In [0]:
# SOLUTION
lower_words = b.flatMap(lambda line: splitter(line)).filter(lambda word: word != "").map(lambda word: word.lower() if word[0] == word[0].lower() else None)
lower_counts = lower_words.map(lambda word: (word, 1)).reduceByKey(add)
upper_words = b.flatMap(lambda line: splitter(line)).filter(lambda word: word != "").map(lambda word: word.lower() if word[0] == word[0].upper() else None)
upper_counts = upper_words.map(lambda word: (word, 1)).reduceByKey(add)


join_counts =  upper_counts.join(lower_counts).map(lambda x: (x,(x[1][0]/x[1][1])))
join_counts_h = join_counts.sortBy(lambda x: x[1],ascending= False)
join_counts_l = join_counts.sortBy(lambda x: x[1])

i = 1
print("The Words With the Highest Ratio:")
for word, count in join_counts_h.collect()[0:10]:
    print("{} ) {} : {} ".format(i, word, count))
    i += 1
    

i = 1
print("The Words With the Lowest Ratio:")
for word, count in join_counts_l.collect()[0:10]:
    print("{} ) {} : {} ".format(i, word, count))
    i += 1



**SOLUTION** <br>
The results are not surprising. In the large ratio list we have words that are usally written in upper case (e.g names). For instance it make sense that 'i' would be in upper case almost all the time, because of English syntax.  In a similar way, notice that the words in the list with low ratio, are rarely in the beginning of a sentence therefore they would almost always not be in upper case. for instance, the word "eyes" would not get an upper case, since it does not make sense to start a sentce with eyes. 
In addition the words in both casses are very common, which causses the raitio to be very high/low.