## 7th Laboratory - 1st exercise

Write and execute a **Spark job in pySpark** to compute the **frequency of the words in Shakespeare’s
works (the wordcount problem)**. Use the data files from the Cloudera virtual machine (copy them
to your local file system) and write the results to a directory named “counts” on the local machine.

### Import dependencies

In [1]:
import pyspark
import os
import re

In [2]:
sc

### Load data

In [3]:
os.listdir('../data/shakespeare')

['tragedies', 'comedies', 'glossary', 'histories', 'poems']

### PySpark job

In [4]:
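# Split each line on runs of non-word characters, drop empty tokens,
# then sum one count per occurrence of each word.
counts = sc.textFile('../data/shakespeare/*') \
           .flatMap(lambda line: re.split(r'\W+', line)) \
           .filter(lambda word: word != "") \
           .map(lambda word: (word, 1)) \
           .reduceByKey(lambda a, b: a + b)

# Collapse to a single partition so that one output file (part-00000) is written.
counts.coalesce(1) \
      .sortBy(lambda a: a[0]) \
      .saveAsTextFile("./counts")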
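
The filter on empty strings is needed because `re.split` returns an empty string whenever a line starts or ends with a non-word character. A minimal pure-Python sketch of this behaviour, without Spark (the sample line is made up for illustration and is not taken from the data set):

```python
import re

# Hypothetical sample line, not taken from the data set.
line = "To be, or not to be."

# Split on runs of non-word characters, as in the Spark job.
tokens = re.split(r"\W+", line)

# The trailing "." leaves an empty string at the end of the list,
# so empty tokens are dropped before counting.
words = [t for t in tokens if t != ""]

print(tokens)
print(words)
```

Without the filter, the empty string would show up as a key of its own in the word counts.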

In [5]:
sorted(os.listdir("./counts"))

['._SUCCESS.crc', '.part-00000.crc', '_SUCCESS', 'part-00000']

In [6]:
# Read the single output file and show the beginning of it.
with open("./counts/part-00000", "r") as f:
    file_content = f.read()

print(file_content[:390])

('1', 49)
('10', 1)
('11th', 1)
('12th', 1)
('1s', 1)
('2', 48)
('2d', 1)
('2s', 3)
('3', 29)
('4', 1)
('4d', 1)
('5', 1)
('5s', 1)
('6', 1)
('6d', 2)
('6s', 1)
('7', 1)
('8', 1)
('8d', 2)
('9', 1)
('A', 2027)
('AARON', 72)
('ABATE', 1)
('ABATEMENT', 1)
('ABERGAVENNY', 9)
('ABHOR', 1)
('ABHORSON', 18)
('ABIDE', 1)
('ABLE', 1)
('ABOUT', 18)
('ABRAHAM', 7)
('ABRIDGEMENT', 1)
('ABROAD', 1)
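
The job above does not normalize case, so the keys are case-sensitive and upper-case words sort before lower-case ones. The following is a rough local sketch of the same counting logic using `collections.Counter`, with an optional lowercasing step. The helper `count_words`, its `normalize_case` flag, and the sample lines are hypothetical and only serve as illustration; they are not part of the exercise.

```python
import re
from collections import Counter

# Hypothetical sample lines, not taken from the data set.
lines = ["A horse! a horse!", "My kingdom for a horse!"]


def count_words(lines, normalize_case=False):
    """Mirror flatMap -> filter -> reduceByKey on a plain list of lines."""
    counter = Counter()
    for line in lines:
        for word in re.split(r"\W+", line):
            if word == "":
                continue  # same role as the empty-string filter in the job
            counter[word.lower() if normalize_case else word] += 1
    return counter


case_sensitive = count_words(lines)
case_insensitive = count_words(lines, normalize_case=True)

print(sorted(case_sensitive.items()))
print(sorted(case_insensitive.items()))
```

In the Spark job, the equivalent change would be to lowercase each word before it is paired with `1`, if case-insensitive counts were wanted.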



## 7th Laboratory - 2nd exercise

Modify the code above to **compute the frequency only for the words having at most 5 characters**.
Show the results in the shell.

In [7]:
# Keep only non-empty words of at most 5 characters before counting.
counts = sc.textFile('../data/shakespeare/*') \
           .flatMap(lambda line: re.split(r'\W+', line)) \
           .filter(lambda word: 0 < len(word) <= 5) \
           .map(lambda word: (word, 1)) \
           .reduceByKey(lambda a, b: a + b)

counts.coalesce(1) \
      .sortBy(lambda a: a[0]) \
      .saveAsTextFile("./counts_at_most_5_chars")
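
The length condition can be checked on its own before running the whole job. A small sketch, assuming a hypothetical helper `at_most_5_chars` and made-up sample words:

```python
def at_most_5_chars(word):
    """True for non-empty words with at most 5 characters."""
    return 0 < len(word) <= 5


# Hypothetical sample tokens, including an empty one as produced by re.split.
sample = ["", "a", "heart", "sorrow", "king"]
kept = [w for w in sample if at_most_5_chars(w)]

print(kept)
```

The boundary case is a word of exactly 5 characters, which is kept, while the empty token and longer words are dropped.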

In [8]:
counts.sortBy(lambda a: a[0]).take(20)

[('1', 49),
 ('10', 1),
 ('11th', 1),
 ('12th', 1),
 ('1s', 1),
 ('2', 48),
 ('2d', 1),
 ('2s', 3),
 ('3', 29),
 ('4', 1),
 ('4d', 1),
 ('5', 1),
 ('5s', 1),
 ('6', 1),
 ('6d', 2),
 ('6s', 1),
 ('7', 1),
 ('8', 1),
 ('8d', 2),
 ('9', 1)]

In [9]:
# toDebugString() returns bytes in this PySpark version, so decode before printing.
print(counts.toDebugString().decode())

(5) PythonRDD[17] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[16] at mapPartitions at PythonRDD.scala:133 []
 |  ShuffledRDD[15] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(5) PairwiseRDD[14] at reduceByKey at <ipython-input-7-1a6a1c0712ff>:1 []
    |  PythonRDD[13] at reduceByKey at <ipython-input-7-1a6a1c0712ff>:1 []
    |  ../data/shakespeare/* MapPartitionsRDD[12] at textFile at NativeMethodAccessorImpl.java:0 []
    |  ../data/shakespeare/* HadoopRDD[11] at textFile at NativeMethodAccessorImpl.java:0 []


## 7th Laboratory - 3rd exercise

Write and execute a Spark job in pySpark to compute the **average length of the words starting
with the same letter** (the average word length problem) in Shakespeare’s works. Write the results
in a directory named **“average”**.

In [10]:
# Key each word by its first character, then average the word lengths per key.
averages = sc.textFile('../data/shakespeare/*') \
             .flatMap(lambda line: re.split(r'\W+', line)) \
             .filter(lambda word: word != "") \
             .map(lambda word: (word[0], len(word))) \
             .groupByKey() \
             .mapValues(lambda lengths: sum(lengths) / len(lengths))

averages.coalesce(1) \
        .sortBy(lambda a: a[0]) \
        .saveAsTextFile("./average")
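
`groupByKey` gathers every word length for a letter before averaging. An alternative is to carry a running `(total_length, count)` pair per key and divide at the end. The sketch below simulates that idea in plain Python; the function names `seq_func` and `comb_func` and the sample words are hypothetical and not part of the original solution.

```python
# Hypothetical sample words, not taken from the data set.
words = ["Romeo", "rose", "Juliet", "jest", "joy"]


def seq_func(acc, length):
    """Fold one word length into a (total_length, count) accumulator."""
    return (acc[0] + length, acc[1] + 1)


def comb_func(a, b):
    """Merge two (total_length, count) accumulators."""
    return (a[0] + b[0], a[1] + b[1])


# Local simulation of a per-key aggregation, keyed by the first character.
accumulators = {}
for word in words:
    key = word[0]
    accumulators[key] = seq_func(accumulators.get(key, (0, 0)), len(word))

averages = {key: total / count for key, (total, count) in accumulators.items()}
print(averages)

# Merging partial results, as would happen across partitions.
merged = comb_func((5, 1), (4, 1))
print(merged)
```

In PySpark, these two functions could presumably be passed to `aggregateByKey((0, 0), seq_func, comb_func)`, followed by `mapValues` to divide the total by the count; that variant has not been run against the data set here.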

In [11]:
# Read the single output file and show the beginning of it.
with open("./average/part-00000", "r") as f:
    file_content = f.read()

print(file_content[:390])

('1', 1.150943396226415)
('2', 1.0769230769230769)
('3', 1.0)
('4', 1.5)
('5', 1.5)
('6', 1.75)
('7', 1.0)
('8', 1.6666666666666667)
('9', 1.0)
('A', 3.901754225255347)
('B', 5.143532818532819)
('C', 6.634214463840399)
('D', 5.221781152916811)
('E', 5.53018939875429)
('F', 5.265583343912657)
('G', 5.810282153366799)
('H', 4.428398058252427)
('I', 1.4687346778674861)
('J', 4.9784550709406


In [12]:
!echo $SPARK_HOME

/usr/local/Cellar/apache-spark/3.0.1/libexec
