# Spark vs. Non-Spark comparison

## Basic imports

In [4]:
import findspark
import pyspark
import random
from math import sqrt
import timeit

## Init Spark

In [5]:
# Let findspark locate the local Spark installation for this kernel.
# NOTE(review): `pyspark` is already imported in the imports cell above, before
# this call runs — confirm whether init() is still needed for import resolution
# or should be moved ahead of `import pyspark`.
findspark.init()

## Comparison 1: approximate Pi

### Spark function to approximate Pi

In [None]:
def calc_pi_spark(num_samples, sc):
    """Approximate Pi by Monte Carlo sampling distributed through Spark.

    Draws ``num_samples`` random points in the unit square and counts how
    many land strictly inside the quarter unit circle; the hit ratio
    multiplied by 4 is the estimate.

    Args:
        num_samples: Number of random points to draw; must be positive.
        sc: Spark context (any object exposing a compatible ``parallelize``)
            used to distribute the sampling.

    Returns:
        The estimate of Pi.

    Raises:
        ValueError: If ``num_samples`` is not positive.
    """
    if num_samples <= 0:
        raise ValueError("num_samples must be positive")

    def inside(_):
        # The RDD element is only a counter; every call draws a fresh point.
        x, y = random.random(), random.random()
        return x*x + y*y < 1

    count = sc.parallelize(range(0, num_samples)).filter(inside).count()
    # The original computed this value but never returned it.
    return 4 * count / num_samples

### Regular function to approximate Pi

In [None]:
def calc_pi_other(n):
    """Approximate Pi by Monte Carlo sampling in a plain Python loop.

    Draws ``n`` random points in the unit square and counts how many land
    strictly inside the quarter unit circle; the hit ratio multiplied by 4
    is the estimate.

    Args:
        n: Number of random points to draw; must be positive.

    Returns:
        The estimate of Pi.

    Raises:
        ValueError: If ``n`` is not positive.
    """
    if n <= 0:
        raise ValueError("n must be positive")

    inside = 0
    for _ in range(n):
        x = random.random()
        y = random.random()
        # Compare the squared distance directly: same hit test as the Spark
        # version, without a per-sample sqrt that would skew the timing.
        if x*x + y*y < 1:
            inside += 1
    # The original computed this value but never returned it.
    return 4 * inside / n

### Call, execute and log the results and the time

In [None]:
# Benchmark both Pi implementations over a range of sample sizes and record
# each result together with its wall-clock duration.
sc = pyspark.SparkContext(appName="Pi")

# Sample sizes: 10 million up to 490 million, in steps of 10 million.
samples = [10000000*(x) for x in range(1,50)]
results_spark = []
results_other = []
time_spark = []
time_other = []

try:
    for sample in samples:
        start_time = timeit.default_timer()
        results_spark.append(calc_pi_spark(sample, sc))
        time_spark.append(timeit.default_timer() - start_time)
        print("spark calculated for " + str(sample) + " samples")

        start_time = timeit.default_timer()
        results_other.append(calc_pi_other(sample))
        time_other.append(timeit.default_timer() - start_time)
        print("other calculated for " + str(sample) + " samples")
finally:
    # Always release the context, even if a run fails or is interrupted;
    # otherwise a later SparkContext(...) in this kernel cannot be created.
    sc.stop()

### Show the results

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Plot elapsed time against sample count for both implementations on log-log
# axes; labels and a legend make the two curves distinguishable.
fig, ax = plt.subplots()
ax.plot(samples, time_other, label="plain Python")
ax.plot(samples, time_spark, label="Spark")
ax.set_xscale('log')
ax.set_yscale('log')
ax.set_xlabel("number of samples")
ax.set_ylabel("elapsed time (s)")
ax.set_title("Pi approximation: Spark vs. plain Python")
ax.legend()
plt.show()

## Comparison 2: word count on James Joyce's Ulysses

### Spark function

In [7]:
import sys
from pyspark import SparkContext

# Count word occurrences in the text file with a local 4-worker Spark context.
sc = SparkContext(appName="WordCountExample", master='local[4]')
try:
    lines = sc.textFile(r"C:\Users\dehaeth\Documents\GitHub\datascience\big data tools\spark\text2.txt")
    # NOTE: calling saveAsHadoopFile on `lines` fails with "RDD element of type
    # java.lang.String cannot be used"; it is left disabled.
    #lines.saveAsHadoopFile("hdfs://test/parent/child", 'org.apache.hadoop.mapred.TextOutputFormat')
    # split() with no argument tokenises on any whitespace run and yields no
    # empty tokens, matching the plain-Python word counts in this notebook.
    counts = lines.flatMap(lambda line: line.split()) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda x, y: x + y)
    output = counts.collect()
    #for (word, count) in output:
    #    print("%s: %i" % (word, count))
    print('done')
finally:
    # Release the context even if reading or counting fails, so the kernel can
    # create a new SparkContext afterwards.
    sc.stop()


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsHadoopFile.
: org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
	at org.apache.spark.api.python.SerDeUtil$.pythonToPairRDD(SerDeUtil.scala:238)
	at org.apache.spark.api.python.PythonRDD$.saveAsHadoopFile(PythonRDD.scala:797)
	at org.apache.spark.api.python.PythonRDD.saveAsHadoopFile(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Unknown Source)


### Regular function

In [None]:
from collections import Counter

# Read the whole text (utf-8-sig drops a leading BOM if present) and tally
# every whitespace-separated token.
with open(r"C:\Users\dehaeth\Documents\GitHub\datascience\big data tools\spark\text2.txt", "r", encoding="utf-8-sig") as file:
    tokens = file.read().split()
    wordcount = Counter(tokens)
    # To inspect the tallies:
    #for item in wordcount.items(): print("{}\t{}".format(*item))
print('done')

### Third option: classic counter

In [None]:
# Hand-rolled tally: map each whitespace-separated token to its occurrence
# count using a plain dict.
with open(r"C:\Users\dehaeth\Documents\GitHub\datascience\big data tools\spark\text2.txt", "r", encoding="utf-8-sig") as wordstring:
    words = {}
    for word in wordstring.read().split():
        # A token not seen yet starts from 0 before being incremented.
        words[word] = words.get(word, 0) + 1
print('done')