## SparkSession

To work with Spark, we first have to set up a `SparkSession`.

Once it is created in the cells below, we interact with Apache Spark through the resulting `spark` object.

In [None]:
from pyspark.sql import SparkSession

In [None]:
# sc = SparkContext("local[*]","PySpark Word Count Example")
spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()
print(spark)

Let's break down this code snippet a bit further.
To work with Spark, we have to set up a Spark Application, which we name `PythonWordCount`.

To do this:
- We started building a `SparkSession` through the `.builder` attribute.
- We used `.appName` to tell Spark to name our Application `PythonWordCount`.
- We used `.getOrCreate()` to tell Spark to return the existing `SparkSession` if one is already active, or to create a new one from the builder's settings otherwise.
- Finally, the reference to this Spark application is stored in an object we named `spark`.

*__Note__ that the `SparkSession` is our entry point to Spark: without one, none of the operations below can run.
More information about `SparkSession` can be found in the [PySpark API documentation](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession).*

In [None]:
from pathlib import Path
dataset_path = Path().resolve().parent / 'data-sets/20NewshroupDataSet/20_newsgroup/alt.atheism'
# dataset_path=Path().resolve().parent / 'README.md'
print(str(dataset_path.resolve()))

In [None]:
def lower_clean_str(x):
  punc='!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'
  lowercased_str = x.lower()
  for ch in punc:
    lowercased_str = lowercased_str.replace(ch, ' ')
  return lowercased_str
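
To see what `lower_clean_str` does to a single line, here is a small standalone sketch. The sample sentence and the `lower_clean_str_translate` variant are illustrative assumptions, not part of the original notebook. The variant relies on the fact that the `punc` string above is the same set of characters as Python's `string.punctuation`.

```python
import string

def lower_clean_str(x):
    # Same logic as the cell above: lowercase, then replace punctuation with spaces.
    punc = '!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'
    lowercased_str = x.lower()
    for ch in punc:
        lowercased_str = lowercased_str.replace(ch, ' ')
    return lowercased_str

# Hypothetical alternative: one translation table instead of a loop of replace() calls.
_PUNC_TO_SPACE = str.maketrans(string.punctuation, ' ' * len(string.punctuation))

def lower_clean_str_translate(x):
    return x.lower().translate(_PUNC_TO_SPACE)

sample = "Hello, World! It's 2-for-1."  # made-up input
print(repr(lower_clean_str(sample)))            # 'hello  world  it s 2 for 1 '
print(repr(lower_clean_str_translate(sample)))  # same result
```

Each punctuation mark becomes a space rather than disappearing, so the cleaned line can contain runs of consecutive spaces.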

In [None]:
# words = sc.textFile(str(dataset_path.resolve())+"/*/*").map(lower_clean_str)
lines = spark.read.text(str(dataset_path.resolve())+"/*").rdd.map(lambda r: r[0])

In [None]:
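from operator import add
# Lowercase every line and replace punctuation with spaces
clean_lines = lines.map(lower_clean_str)
# Split each line on single spaces and flatten the results into one RDD of words
words = clean_lines.flatMap(lambda x: x.split(' '))
# Pair each word with 1, then sum the ones per word
counts_clean = words.map(lambda x: (x, 1)).reduceByKey(add)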
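
The `flatMap` / `map` / `reduceByKey` chain above can be mimicked in plain Python to see what each step contributes. This is only a sketch on a made-up two-line sample (`sample_lines` is a hypothetical stand-in for the `clean_lines` RDD). It runs on one machine and does not reflect how Spark distributes the work.

```python
from itertools import chain
from operator import add

# Hypothetical stand-in for `clean_lines`: lowercased, punctuation already replaced by spaces.
sample_lines = ["the cat  sat", "the dog sat "]

# flatMap: split every line and flatten the pieces into one sequence of tokens.
tokens = list(chain.from_iterable(line.split(' ') for line in sample_lines))

# map: pair every token with an initial count of 1.
pairs = [(token, 1) for token in tokens]

# reduceByKey(add): combine the values of identical keys with `add`.
counts = {}
for key, value in pairs:
    counts[key] = add(counts[key], value) if key in counts else value

print(counts)

# Splitting on a single space keeps empty strings wherever spaces repeat;
# str.split() with no argument would drop them.
tokens_no_empty = [t for line in sample_lines for t in line.split()]
print(tokens_no_empty)
```

Because the cleaning step turns punctuation into spaces, the empty string can show up as a "word" with a high count. Whether to keep or drop it is a choice the original notebook does not make explicit.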


In [None]:
# For comparison: the same count on the raw lines, without lowercasing or punctuation removal
counts = lines.flatMap(lambda x: x.split(' ')) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add)

In [None]:
# threshold(t) returns a predicate that is True for values strictly greater than t
threshold = lambda t: lambda v: v > t

In [None]:
print(threshold(10)(20))  # True: 20 > 10
print(threshold(20)(10))  # False: 10 > 20 does not hold
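
The curried lambda can be harder to read than it needs to be. The sketch below spells the same idea out with nested functions and applies it to a few made-up `(word, count)` pairs. `make_threshold`, `is_above` and `sample_counts` are hypothetical names and data introduced only for illustration.

```python
# The curried lambda from the cell above.
threshold = lambda t: lambda v: v > t

# Equivalent, more explicit spelling (hypothetical name).
def make_threshold(t):
    def is_above(v):
        # `t` is captured from the enclosing call (a closure).
        return v > t
    return is_above

above_50 = threshold(50)

# Made-up (word, count) pairs standing in for the contents of an RDD.
sample_counts = [("spark", 120), ("rdd", 50), ("the", 900), ("xyz", 3)]
kept = [pair for pair in sample_counts if above_50(pair[1])]
print(kept)  # [('spark', 120), ('the', 900)]
```

Since the comparison is strict, a word that occurs exactly 50 times is not kept.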

In [None]:
count_filtered = counts_clean.filter(lambda couple: threshold(50)(couple[1]))

In [None]:
# collect() brings the remaining (word, count) pairs back to the driver
output = count_filtered.collect()
for (word, count) in output:
    print("%s: %i" % (word, count))

In [None]:
# Same result as the previous cell, but filtering on the driver after collecting everything
output = counts_clean.collect()
for (word, count) in output:
    if threshold(50)(count):
        print("%s: %i" % (word, count))


In [None]:
# Same count as counts_clean, written with an explicit lambda instead of operator.add
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

In [None]:
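# foreach runs on the executors, so the output may appear in the Spark worker output rather than in the notebook
wordCounts.foreach(print)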

In [None]:
spark.stop()