# Introduction to Apache Spark
The first thing we need to do is install pyspark.

In [1]:
# Install pyspark
! pip3 install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=0e4f83fbc61b6808d64508f5761f43e6764b83c00f2c39325c4f1c6dcda54ce9
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


## Defining master within pyspark
You can interact with Spark either by using SparkSession or SparkContext.

To use SparkSession, you can use the following code:

`from pyspark.sql import SparkSession`

`spark = SparkSession.builder.appName("SimpleApp").master("local").getOrCreate()`

Here `master("local")` runs Spark on your own machine with a single worker thread (`"local[*]"` would use all available cores). The cell below uses a SparkContext directly instead, configured through a SparkConf:

In [2]:
from pyspark import SparkContext, SparkConf

sconf = SparkConf().setAppName("App").setMaster("local")
sc = SparkContext(conf=sconf)

## RDD Example 1: Create a list that holds the numbers 1-5, and then print the list's elements that are smaller than 3.
If you want to use SparkSession instead, get its underlying SparkContext through the `sparkContext` attribute:

`distData = spark.sparkContext.parallelize(data)`

In [3]:
# create a python list from 1-5
data = [1, 2, 3, 4, 5]

# create an RDD (a parallelized, distributed collection) from that data, which can be operated on in parallel
# this is what I am going to refer to as "transforming it into an RDD"
distData = sc.parallelize(data)

# use the filter transformation to select the values less than 3, then the collect action
# brings the RDD's contents back to the driver and stores them in the result variable
# (filter returns a new dataset formed by selecting those elements of the source on which the function returns true)
result = distData.filter(lambda s: s < 3).collect()  # filter() creates a new RDD; collect() returns a Python list
print(result)  # it should print [1, 2]


[1, 2]


### Equivalent Code in Simple Python

In [4]:
result = [x for x in data if x < 3]
print(result)

[1, 2]


## RDD Example 2: Word Count

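<strong>.map() Transformation: </strong>
- Applies a given function <strong>to every element</strong> of the RDD and returns a new RDD representing the results

<strong>.flatMap() Transformation: </strong>
- Same as map, but instead of returning exactly one element per input element it <strong>returns a sequence per element</strong> (which can be empty) and then flattens the results

<strong>.reduce() Action: </strong>
- Aggregates all elements of the RDD using a given function and returns the final result to the driver program

<strong>.reduceByKey() Transformation: </strong>
- Works on an RDD of (key, value) pairs: merges the values of each key using a given function and returns a new RDD of (key, result) pairs (this is what the word count below uses)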
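To make these semantics concrete without starting Spark, here is a plain-Python sketch of what `map()`, `flatMap()`, `reduce()` and `reduceByKey()` produce on a tiny list of lines. It only imitates the results on one machine (Spark distributes the same work across partitions), and `reduce_by_key` is a hypothetical helper written for this illustration, not part of any library.

```python
from functools import reduce
from itertools import chain

lines = ["to be or", "not to be"]

# map(): exactly one output element per input element (here, one list per line)
mapped = list(map(lambda line: line.split(" "), lines))

# flatMap(): one sequence per input element, and the sequences are flattened into one list
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

# reduce(): combine all elements into a single value with a binary function
total_chars = reduce(lambda a, b: a + b, [len(w) for w in flat_mapped])


def reduce_by_key(pairs, func):
    """Hypothetical helper imitating reduceByKey(): merge the values of each key with func."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


counts = reduce_by_key([(w, 1) for w in flat_mapped], lambda a, b: a + b)

print(mapped)       # [['to', 'be', 'or'], ['not', 'to', 'be']]
print(flat_mapped)  # ['to', 'be', 'or', 'not', 'to', 'be']
print(total_chars)  # 13
print(counts)       # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```

The `(word, 1)` pairs followed by a per-key sum are exactly the shape of the word count in the next cell.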

You can download pg4300.txt from [here](https://raw.githubusercontent.com/hustlijian/hadoop-tutorial/master/pg4300.txt) and upload it to Google Colab as we learned last time.

In [5]:
file = sc.textFile("pg4300.txt")
words = file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
"""
saveAsTextFile saves the output into a text file in the same way hadoop does, meaning:
it will output a folder locally by the name we gave as input (in our case it's "output"),
and the result is inside the part-00000 file.
If you run this cell twice, you will get an error, since it cannot override the folder, so you need to delete
the files inside the folder, then the folder and run the cell again.
"""
words.saveAsTextFile("output") # output folder on local fs
print("Done!")

Done!
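`RDD.saveAsTextFile()` takes no overwrite argument, so the cell above fails on a second run while the "output" folder still exists. A minimal sketch that deletes the folder from Python instead of by hand, using only the standard library (`shutil.rmtree`); the helper name `clear_output_dir` is hypothetical.

```python
import os
import shutil


def clear_output_dir(path="output"):
    """Hypothetical helper: delete a previous saveAsTextFile() output folder, if any."""
    # ignore_errors=True turns this into a no-op when the folder does not exist yet
    shutil.rmtree(path, ignore_errors=True)
    # True means the path is now free for saveAsTextFile() to create
    return not os.path.exists(path)
```

Call `clear_output_dir("output")` right before `words.saveAsTextFile("output")`. It removes the whole folder, so only point it at output you are happy to lose.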


### Equivalent Code in Simple Python

In [6]:
with open("pg4300.txt") as f:
    lines = f.readlines()

words = []
for line in lines:
    line = line.strip()
    words.extend(line.split(" "))

word_counts = {}
for word in words:
    if word in word_counts:
        word_counts[word] += 1
    else:
        word_counts[word] = 1

with open("output.txt", "w") as f:
    for word, count in word_counts.items():
        f.write(f"('{word}', {count})\n")

print("Done!")

Done!
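A more compact plain-Python version can use `collections.Counter`. It also exposes a detail worth knowing: `split(" ")` turns double spaces and empty lines into empty-string "words", and both the Spark cell and the loop above count them. Calling `split()` with no argument splits on any run of whitespace instead. The `word_count` helper and its `split_on_space` flag are hypothetical, for illustration only.

```python
from collections import Counter


def word_count(lines, split_on_space=True):
    """Hypothetical helper: count the words of each line, like the word-count cells above."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        # split(" ") keeps empty strings for double spaces and blank lines; split() drops them
        tokens = line.split(" ") if split_on_space else line.split()
        counts.update(tokens)
    return counts


sample = ["the cat  sat", "", "the end"]
print(word_count(sample))                        # includes '' with a count of 2
print(word_count(sample, split_on_space=False))  # no '' entry
```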


## Hands-On! :)
You can always visit this site to find pyspark RDD methods: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html
### Part 1
1. Create an RDD based on the "data" list that is provided below.
2. Count how many elements are in your RDD using the method .count()
3. Print the elements that the RDD holds using the method .collect()
4. Print the elements that the RDD holds using .foreach()
5. Do the same as in step 4, but now also add an exclamation mark (!) at the end of each word

In [7]:
data = ["tutorial", "learning", "spark", "pyspark", "python", "pyspark and spark", "data", "science", "university", "of", "cyprus"]

In [8]:
# TODO 1: Create an RDD based on the "data" list that is provided above.
RDD = 

In [9]:
# TODO 2: Count how many elements are in your RDD using the method .count()

11

In [10]:
# TODO 3: Print the elements that the RDD holds using the method .collect()

['tutorial',
 'learning',
 'spark',
 'pyspark',
 'python',
 'pyspark and spark',
 'data',
 'science',
 'university',
 'of',
 'cyprus']

In [12]:
# TODO 4: Print the elements that the RDD holds using .foreach(), writing them into a file named "step_4.txt" with the step_4_print_in_file() function
"""
Printing elements inside foreach() is usually not a good idea: the function runs on the executors, so anything it prints
goes to the executor logs (aka worker logs) instead of the driver's output. So, to view our data through foreach() here,
we save the elements to a file instead (in local mode the executors run on the same machine, so the file is written to the local file system).
"""
def step_4_print_in_file(x):
    with open("step_4.txt", "a") as f:
        f.write(x + "\n")

RDD.
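
Because `step_4_print_in_file()` opens step_4.txt in append mode (`"a"`), running the cell a second time adds every element to the file again. A minimal plain-Python sketch of starting from an empty file before writing; the `reset_file` helper is hypothetical, and the loop only imitates, on one machine, calling the helper once per element.

```python
import os


def reset_file(path):
    """Hypothetical helper: remove an earlier output file so a re-run starts empty."""
    if os.path.exists(path):
        os.remove(path)


def step_4_print_in_file(x):
    # Same helper as in the cell above: append one element per line
    with open("step_4.txt", "a") as f:
        f.write(x + "\n")


reset_file("step_4.txt")
# A plain loop that calls the helper once per element, as foreach() would
for item in ["tutorial", "learning", "spark"]:
    step_4_print_in_file(item)
```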


In [13]:
# TODO 5: Do the same as in step 4, but now also add an exclamation mark (!) at the end of each word
"""
The same applies here as well.
"""


### Part 2
Now create a new RDD from a list of numbers and implement the following:

1. Create a list that holds the numbers from 0 to 29 and transform it into an RDD.
2. Multiply each element by 3 and then print all elements (tip: use map())
3. Do the 2nd step and then filter out (remove) the numbers that are less than 30.
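As in the other sections, a plain-Python equivalent can help you verify your Spark results. This sketch assumes the list holds 0 to 29 (`range(30)`) and that 30 itself is kept, since that is what the expected outputs in this part show. It uses the name `numbers` so that it does not overwrite `data2`.

```python
numbers = list(range(30))               # 0 .. 29, as the expected output implies
tripled = [x * 3 for x in numbers]      # the equivalent of the map() step
kept = [x for x in tripled if x >= 30]  # the filter step, keeping 30 as the output below does

print(tripled[:5], tripled[-1])  # [0, 3, 6, 9, 12] 87
print(kept[:3], len(kept))       # [30, 33, 36] 20
```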

In [14]:
# TODO 1: Create a list that holds the numbers from 0 to 29 and transform it into an RDD.
data2 = 
rdd2 = 

In [15]:
# TODO 2: Multiply each element by 3 and then print all elements (tip: use map())
rdd2.

[0,
 3,
 6,
 9,
 12,
 15,
 18,
 21,
 24,
 27,
 30,
 33,
 36,
 39,
 42,
 45,
 48,
 51,
 54,
 57,
 60,
 63,
 66,
 69,
 72,
 75,
 78,
 81,
 84,
 87]

In [16]:
# TODO 3: Do the 2nd step and then filter out (remove) the numbers that are less than 30.
rdd2.

[30,
 33,
 36,
 39,
 42,
 45,
 48,
 51,
 54,
 57,
 60,
 63,
 66,
 69,
 72,
 75,
 78,
 81,
 84,
 87]

### Part 3
1. Create an RDD from the shuffled data2 list.
2. Print the contents of the RDD
3. Print only the first element of the RDD
4. Print the RDD's max element
5. Print the RDD's min element
6. Print only the first 10 elements of the RDD.
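A plain-Python equivalent for checking Part 3. Because `parallelize()` keeps the order of the list, the RDD's first element and first 10 elements should match `numbers[0]` and `numbers[:10]` here; to compare with your own results, replace `numbers` with `data2`. The list is shuffled, so the order changes on every run.

```python
import random

numbers = list(range(30))
random.shuffle(numbers)  # a different order on every run, just like data2

print(numbers)       # all elements
print(numbers[0])    # only the first element
print(max(numbers))  # 29
print(min(numbers))  # 0
print(numbers[:10])  # only the first 10 elements
```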

In [17]:
# Firstly, let's randomize the elements that the list you created in the previous part contains.
import random
random.shuffle(data2)

In [18]:
# TODO 1: Create an RDD from the shuffled data2 list.
rdd3 = 

In [19]:
# TODO 2: Print the contents of the RDD (since data2 was shuffled, don't expect to get the numbers in the same order as in this notebook)
rdd3.

[12,
 21,
 10,
 27,
 26,
 22,
 29,
 1,
 19,
 2,
 14,
 17,
 3,
 15,
 28,
 18,
 20,
 9,
 4,
 11,
 23,
 8,
 6,
 13,
 16,
 5,
 24,
 25,
 7,
 0]

In [20]:
# TODO 3: Print only the first element of the RDD
rdd3.

12

In [21]:
# TODO 4: Print the RDD's max element
rdd3.

29

In [22]:
# TODO 5: Print the RDD's min element
rdd3.

0

In [23]:
# TODO 6: Print only the first 10 elements of the RDD. (tip: use the method .take())
rdd3.

[12, 21, 10, 27, 26, 22, 29, 1, 19, 2]

### Part 4
Now let's move on to something more complex:

1. Create a python list that has the following words: ["queue", "blade", "scrap", "class", "priority", "price", "random", "public"]
2. Transform it into an RDD.
3. Filter out (remove) the words that have 5 or fewer characters
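A plain-Python equivalent for checking Part 4; the names `word_list` and `long_words` are illustrative and chosen so they do not overwrite the `words` RDD from the word-count example.

```python
word_list = ["queue", "blade", "scrap", "class", "priority", "price", "random", "public"]

# Keep only the words longer than 5 characters, i.e. remove those with 5 or fewer
long_words = [w for w in word_list if len(w) > 5]
print(long_words)  # ['priority', 'random', 'public']
```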

In [24]:
# TODO 1: Create a python list that has the following words: ["queue", "blade", "scrap", "class", "priority", "price", "random", "public"]

In [25]:
# TODO 2: Transform it into an RDD.
rdd4 = 

In [26]:
# TODO 3: Filter out (remove) the words that have 5 or fewer characters
result4 = 

In [27]:
# Let's print the result
print(result4)

['priority', 'random', 'public']


### Part 5
Let's move on to the hardest part of our lab today. The program should calculate the total length (number of characters) of all the lines in a file.

1. Create an RDD from the contents of the file "[unixdict.txt](https://www.cs.ucy.ac.cy/~jgeorg02/dsc511/hadoop/unixdict.txt)".
2. Using a transformation or action method, change each element from a line to its length (line -> len(line))
3. Using a transformation or action method, combine all elements into a single value (their sum).

In [28]:
# TODO 1: Create an RDD from the contents of the file "unixdict.txt"
file = 

In [29]:
# TODO 2: Using a transformation or action method, change each element from a line to its length (line -> len(line))
lineLengths = file.

In [30]:
# TODO 3: Using a transformation or action method, combine all elements into a single value (their sum).
totalLength = 

In [31]:
# Let's print the result
print("The result is: ", totalLength)

The result is:  181299


### Equivalent Code in Simple Python
This code might not help you implement it using Spark, but it can help you understand what our code's algorithm does and verify your result. :)

In [32]:
with open("./unixdict.txt", "r") as file:
    lines = file.readlines()
    # rstrip method is used to remove the newline character "\n" from the end of each line,
    # so that the line length accurately reflects the number of characters in the line
    line_lengths = [len(line.rstrip("\n")) for line in lines]
    total_length = sum(line_lengths)

print("The result is:", total_length)

The result is: 181299


### Part 6: Palindrome Words
Now let's revisit the Map-Reduce examples we saw last time.
Our 2nd Map-Reduce example was to find all of the words that are palindromes and how many times each of them appears in a file. Try to recreate this example using pySpark. :)

There are several solutions for it but you can follow these steps:

1. Create an RDD from the contents of the file [palindrome_words.txt](https://www.cs.ucy.ac.cy/~jgeorg02/dsc511/hadoop/palindrome_words.txt)
2. Apply a transformation or action on your RDD in order to return `(word.lower(), 1)` if the word is a palindrome, otherwise `(word.lower(), 0)`
3. Apply a transformation or action on your RDD in order to reduce by key and sum the number of times each palindrome word appeared.
4. Print only 20 words with their counts.
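Before translating the steps to Spark, a plain-Python sketch of the same map-and-reduce logic can be used to check your numbers. It assumes the file has one word per line and, like the MRJob mapper further down in this notebook, checks for a palindrome before lower-casing (so "Kayak" counts as 0). The helper name `palindrome_counts` is hypothetical.

```python
def palindrome_counts(words):
    """Hypothetical helper: map each word to (word.lower(), 1 or 0) and sum the values per key."""
    counts = {}
    for word in words:
        word = word.rstrip("\n")
        # Same check as the MRJob mapper: the comparison happens before lower-casing
        value = 1 if word == word[::-1] else 0
        key = word.lower()
        counts[key] = counts.get(key, 0) + value
    return counts


print(palindrome_counts(["radar", "apple", "radar", "noon"]))
# {'radar': 2, 'apple': 0, 'noon': 1}

# To check against the real file (assuming one word per line):
# with open("palindrome_words.txt") as f:
#     print(palindrome_counts(f))
```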

In [54]:
# Define the mapper function
def mapper(word):


file = 

rdd6 = file.
rdd6.

[('radar', 2),
 ('level', 2),
 ('civic', 1),
 ('banana', 0),
 ('grape', 0),
 ('kayak', 6),
 ('rotor', 3),
 ('peach', 0),
 ('noon', 2),
 ('blueberry', 0),
 ('deed', 2),
 ('racecar', 2),
 ('pineapple', 0),
 ('tenet', 5),
 ('strawberry', 0),
 ('apple', 0),
 ('stats', 1),
 ('lemon', 0),
 ('orange', 0),
 ('watermelon', 0)]

### Equivalent Code using MRJob from our Last Lab
This code might help you remember what we want to map and reduce, so that you can translate it into pySpark code. :)

In [35]:
! pip install MRJob

Collecting MRJob
  Downloading mrjob-0.7.4-py2.py3-none-any.whl (439 kB)
Installing collected packages: MRJob
Successfully installed MRJob-0.7.4


In [51]:
%%file palindrome.py

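from mrjob.job import MRJob

class MRPalindrome(MRJob):

    def mapper(self, _, word):
        # Each input line is one word: emit 1 if it is a palindrome, otherwise 0
        if word == word[::-1]:
            yield (word.lower(), 1)
        else:
            yield (word.lower(), 0)

    def reducer(self, word, counts):
        # Sum the 1s and 0s to get how many times each palindrome appeared
        yield (word, sum(counts))


if __name__ == '__main__':
    MRPalindrome.run()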

Overwriting palindrome.py


In [53]:
import palindrome
# If you edit palindrome.py after importing it, reload the module so the changes are picked up:
# import importlib
# importlib.reload(palindrome)

def run_mr_job(mr_job):
    # Create a runner for the MapReduce job
    with mr_job.make_runner() as runner:
        # Run the MapReduce job
        runner.run()

        # Iterate over the output of the MapReduce job
        for key, value in mr_job.parse_output(runner.cat_output()):
            # Print each key-value pair (word, count)
            print(key, value)

mr_job = palindrome.MRPalindrome(args=['palindrome_words.txt'])
run_mr_job(mr_job)



radar 2
rotor 3
stats 1
strawberry 0
tenet 5
lemon 0
level 2
noon 2
orange 0
peach 0
pineapple 0
racecar 2
apple 0
banana 0
blueberry 0
civic 1
deed 2
grape 0
kayak 6
watermelon 0
