# LABORATORY 2

## 1.0 Problem Specification: Analysis of the file.

```python
rdd = sc.textFile("/data/students/bigdata_internet/lab2/word_frequency.tsv")
```

#### 1.0.1 Can you draw 5 samples from the input RDD? Which command do you use?
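Yes. Five samples can be drawn with the *take(num)* action, which returns the first five elements of the RDD, or with the *takeSample(withReplacement, num)* action, which returns a random sample. Below we use *takeSample(False, 5)*, i.e. five lines sampled without replacement.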

```python
sample = rdd.takeSample(False, 5)
print(sample)
```

```
['nobodies\t4', 'Englsih\t1', 'eyes/teeth/gums\t1', 'steviaEat\t1', 'Irishy\t1']
```
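As a plain-Python analogy (no Spark involved; the lines below are made up in the file's format), *take* behaves like slicing off the first elements, while *takeSample(False, num)* behaves like `random.sample`, i.e. distinct elements chosen at random:

```python
import random

# made-up "word<TAB>frequency" lines standing in for the RDD
lines = ["the\t1630750", "I\t1448619", "and\t1237250", "a\t1164419",
         "to\t997979", "nobodies\t4", "Irishy\t1"]

# like take(5): the first five elements, always the same ones
first_five = lines[:5]

# like takeSample(False, 5): five distinct elements chosen at random
rng = random.Random(42)  # fixed seed only to make the sketch repeatable
sampled_five = rng.sample(lines, 5)

print(first_five)
print(sampled_five)
```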


#### 1.0.2 Now pick the first 5 words in order of frequency.

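```python
# split each line on the tab character into a (word, frequency) pair
splitRDD = rdd.map(lambda l: l.split("\t")).map(lambda f: (f[0], int(f[1])))
# the 5 pairs with the highest frequency, in descending order
splitRDD.top(5, key=lambda pair: pair[1])
```

```
[('the', 1630750),
 ('I', 1448619),
 ('and', 1237250),
 ('a', 1164419),
 ('to', 997979)]
```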
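The same top-5 selection in plain Python, on a few hypothetical lines in the file's format: `heapq.nlargest` with a key returns the largest elements in descending order, which is what *top(num, key)* returns (PySpark's *top* itself applies `heapq.nlargest` to each partition and then to the merged partial results).

```python
import heapq

# made-up lines in the "word<TAB>frequency" format
lines = ["nobodies\t4", "the\t1630750", "and\t1237250", "I\t1448619",
         "to\t997979", "a\t1164419", "Irishy\t1"]


def parse(line):
    """Split a "word<TAB>frequency" line into a (word, int) pair."""
    word, freq = line.split("\t")
    return word, int(freq)


pairs = [parse(line) for line in lines]
# like top(5, key=...): the 5 largest pairs by frequency, largest first
top5 = heapq.nlargest(5, pairs, key=lambda pair: pair[1])
print(top5)
```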

#### 1.0.3 How many words does the file contain?

```python
# each line holds one distinct word, so counting the lines counts the words
numbWords = rdd.count()
print("The number of words in the file is: ", numbWords)
```

```
The number of words in the file is:  339819
```


#### 1.0.4 Is `word_frequency.tsv` a folder or a file?
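*word_frequency.tsv* is a folder with the same layout as the output of *saveAsTextFile()*: it contains an empty *_SUCCESS* marker file, written when the job that produced it completed successfully, and one part file per partition (*part-00000*, *part-00001*). *sc.textFile()* accepts the folder path and reads all its part files as a single RDD.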
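Hadoop's input formats, which *sc.textFile()* relies on, skip "hidden" files whose names start with `_` or `.`, so `_SUCCESS` never ends up in the RDD. A plain-Python sketch of that reading logic on a toy folder (file contents made up; sorting the names is only there to make the sketch deterministic):

```python
import os
import tempfile

# toy folder with the same layout as word_frequency.tsv; contents are made up
files = {
    "_SUCCESS": "",
    "part-00000": "the\t1630750\nI\t1448619\n",
    "part-00001": "and\t1237250\n",
}

lines = []
with tempfile.TemporaryDirectory() as folder:
    for name, content in files.items():
        with open(os.path.join(folder, name), "w") as f:
            f.write(content)
    # read every file except the hidden ones ("_" or "." prefix)
    for name in sorted(os.listdir(folder)):
        if name.startswith(("_", ".")):
            continue
        with open(os.path.join(folder, name)) as f:
            lines.extend(f.read().splitlines())

print(lines)
```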

## 1.1 Filter words starting with a specified prefix

```python
# set the prefix
prefix = "ho"
# filter the words that start with the selected prefix
PrefixWordRDD = splitRDD.filter(lambda word: word[0].startswith(prefix))
```

#### 1.1.1 How many lines are left?

```python
numbWords = PrefixWordRDD.count()
print("Number of lines left: ", numbWords)
```

```
Number of lines left:  1519
```


#### 1.1.2 How frequent is the most frequent word of the selected sample (i.e., the maximum value of `freq` in the lines obtained by applying the filter)?

```python
# keep only the frequency of each (word, freq) pair
freqPrefixRDD = PrefixWordRDD.map(lambda freq: freq[1])
freqPrefixRDD.max()
```

```
36264
```

#### 1.1.3 Report the code of 3 different ways to solve the task number 1.1.2 (we only want the frequency, i.e., a number and not a tuple/list)

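```python
# way 1: top(1) returns a list containing the single largest frequency
maxFreqPrefix = freqPrefixRDD.top(1)
print(maxFreqPrefix[0])
```

```
36264
```

```python
# way 2: max() with a key on the (word, freq) pairs returns the whole pair
maxFreqPrefix = PrefixWordRDD.max(lambda tup: tup[1])
print(maxFreqPrefix[1])
```

```
36264
```

```python
# way 3: max() on the RDD of frequencies returns the number directly
maxFreqPrefix = freqPrefixRDD.max()
print(maxFreqPrefix)
```

```
36264
```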


## 1.2 Filter most frequent words

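```python
# keep the words whose frequency is at least 70% of the maximum found in 1.1.3
# (maxFreqPrefix must hold the number from the third way, not a list or tuple)
maxfreqPercentage = 0.7
mostfrequentRDD = PrefixWordRDD.filter(
    lambda freq: freq[1] >= maxfreqPercentage * maxFreqPrefix)
```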
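The chain from 1.1 to 1.2 can be checked by hand on a handful of made-up pairs, using plain Python lists instead of RDDs. Note that the maximum is taken only over the words with the prefix, so a more frequent word such as "the" does not affect the threshold:

```python
# made-up (word, frequency) pairs
pairs = [("home", 1000), ("how", 900), ("hot", 400), ("house", 750),
         ("the", 5000)]
prefix = "ho"
maxfreqPercentage = 0.7

# 1.1: keep the words starting with the prefix
prefixed = [p for p in pairs if p[0].startswith(prefix)]
# 1.1.2: highest frequency among them (5000 for "the" is excluded)
maxFreqPrefix = max(freq for _, freq in prefixed)
# 1.2: keep the words with frequency >= 70% of that maximum (about 700 here)
threshold = maxfreqPercentage * maxFreqPrefix
mostFrequent = [p for p in prefixed if p[1] >= threshold]
print(mostFrequent)
```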

## 1.3 Count the remaining words and save the output

#### 1.3.1 Count the number of selected lines and print this number on the standard output.

```python
print("Most frequent words: ", mostfrequentRDD.count())
```

```
Most frequent words:  2
```


#### 1.3.2 Save the selected words (without frequency) in an HDFS output folder. Every line should contain a single word and end with a semicolon (`;`).

```python
# keep only the word and append the semicolon
saveRDD = mostfrequentRDD.map(lambda word: f"{word[0]};")
saveRDD.saveAsTextFile("/user/s309709/LAB02/es1")
```

## 2. Run the application in different ways

#### 2.1 Run your script locally and in the cluster (--master option). How much time do the two modes require to run? Is there a difference? Can you give a plausible explanation?
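Running locally took about 4.5 seconds, while running on the cluster took almost 17 seconds. A plausible explanation is the overhead of distributed execution: on the cluster, the cluster manager has to allocate resources and start the executors, and tasks, data and results travel over the network between the driver and the executors. With an input this small, this fixed overhead outweighs the benefit of parallelism, whereas in local mode everything runs in a single process on one machine.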


#### Code of the Python script

```python
import sys
import time

from pyspark import SparkConf, SparkContext

start = time.time()

conf = SparkConf().setAppName("My app")
sc = SparkContext(conf=conf)

# command-line arguments: the prefix to filter on and the output folder
prefix = sys.argv[1]
outputPath = sys.argv[2]

rdd = sc.textFile("/data/students/bigdata_internet/lab2/word_frequency.tsv")
# split each line on the tab character into a (word, frequency) pair
splitRDD = rdd.map(lambda l: l.split("\t")).map(lambda f: (f[0], int(f[1])))

# keep the words starting with the prefix and format them as "word - freq,"
PrefixWordRDD = splitRDD.filter(lambda pair: pair[0].startswith(prefix))
toPrintRDD = PrefixWordRDD.map(lambda pair: f"{pair[0]} - {pair[1]},")

# single action: it triggers the evaluation of the whole lineage
toPrintRDD.saveAsTextFile(outputPath)

stop = time.time()
print(f"Program took {stop - start} seconds to run")

sc.stop()
```


#### 2.2 In this application, would caching an RDD increase the performance? If yes, which RDD would you cache?
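No. Caching pays off only when the same RDD is used by more than one action. This script performs a single action, *saveAsTextFile()*, so the lineage (*textFile* → *map* → *filter* → *map*) is computed only once and caching any RDD would only add memory overhead.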
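The difference can be illustrated with a toy model of lazy evaluation (plain Python, not Spark): without caching, every action re-evaluates the lineage from the source; with caching, the lineage is evaluated once and reused. The functions and the `source_reads` counter are hypothetical stand-ins for *sc.textFile()* and the *map*/*filter* steps.

```python
# Toy model of lazy evaluation (plain Python, not Spark)
source_reads = 0


def read_source():
    """Stand-in for sc.textFile(...): counts how often the input is read."""
    global source_reads
    source_reads += 1
    return [("home", 1000), ("how", 900), ("the", 5000)]


def prefix_words(prefix):
    """Stand-in for the map + filter lineage: re-evaluated on every call."""
    return [p for p in read_source() if p[0].startswith(prefix)]


# no caching: each of the two "actions" re-evaluates the lineage from the source
n_prefixed = len(prefix_words("ho"))
highest = max(freq for _, freq in prefix_words("ho"))
reads_without_cache = source_reads

# caching: evaluate the lineage once and reuse the result in both actions
source_reads = 0
cached = prefix_words("ho")
n_prefixed = len(cached)
highest = max(freq for _, freq in cached)
reads_with_cache = source_reads

print(reads_without_cache, reads_with_cache)
```

In PySpark, the cached variant corresponds to calling *PrefixWordRDD.cache()* before the first of several actions, as in the notebook of Section 1; with the single *saveAsTextFile()* of the script, the source is read once either way.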

## 3. Bonus Task

```python
rdd = sc.textFile("/data/students/bigdata_internet/lab2/finefoods_text.txt")
```

```python
# split every line on whitespace: each element is one word
splitRDD = rdd.flatMap(lambda l: l.split())
```

#### 3.1 How many words (with repetitions) does it contain? Consider a word all the characters between spaces (elements found with `split()` method)

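```python
n_words = splitRDD.count()
print("Number of words with repetitions: ", n_words)
```

```
Number of words with repetitions:  339819
```

This output does not answer 3.1: it is exactly the line count of `word_frequency.tsv` found in 1.0.3, because when this cell was executed `splitRDD` still referred to the (word, frequency) RDD of 1.0.2 rather than to the words of `finefoods_text.txt` (the notebook cells were run out of order). The cells must be re-run in order to obtain the number of words in `finefoods_text.txt`.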
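A plain-Python check of what "words between spaces" means with `split()` and no argument, on two made-up review lines: runs of spaces or tabs act as a single separator and punctuation stays attached to the word, whereas `split(" ")` would produce empty strings for repeated spaces.

```python
# two made-up review lines, as textFile would return them (one per line)
lines = ["Great  taste!", "Would buy\tagain."]

# like flatMap(lambda l: l.split()): any run of whitespace separates words
words = [w for line in lines for w in line.split()]
print(words, len(words))

# split(" ") splits on every single space, so a double space gives ""
print("Great  taste!".split(" "))
```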


#### 3.2 Report the code to obtain the word frequency file starting from the original file.

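```python
# map each word to a (word, 1) pair to count its occurrences
wordsRDD = splitRDD.map(lambda word: (word, 1))
# use the words as keys and sum the values to obtain the number of occurrences
distinctWordsRDD = wordsRDD.reduceByKey(lambda v1, v2: v1 + v2)
# map each pair to the "word<TAB>frequency" format of word_frequency.tsv
formatRDD = distinctWordsRDD.map(lambda v: f"{v[0]}\t{v[1]}")
print(formatRDD.takeSample(False, 5))
```

```
["('LIVE', 93)\t1", "('time~', 7)\t1", "('romaine', 33)\t1", "('rangetop', 1)\t1", "('messure', 1)\t1"]
```

This sample output comes from the same out-of-order execution: the keys are the `(word, frequency)` tuples of `word_frequency.tsv`, each counted once, instead of the words of `finefoods_text.txt`. Re-running the cells in order produces lines of the form `word<TAB>count`.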
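A plain-Python model of the same three steps on made-up lines, to show what each RDD transformation contributes. With Spark, saving `formatRDD` with *saveAsTextFile()* would then produce a folder with the same layout as `word_frequency.tsv`.

```python
# made-up input lines
lines = ["good coffee", "good tea", "coffee coffee"]

# flatMap + map: one (word, 1) pair per occurrence of a word
pairs = [(word, 1) for line in lines for word in line.split()]

# reduceByKey(lambda v1, v2: v1 + v2): sum the values of equal keys
counts = {}
for word, one in pairs:
    counts[word] = counts.get(word, 0) + one

# map to the "word<TAB>frequency" format of word_frequency.tsv
formatted = [f"{word}\t{n}" for word, n in counts.items()]
print(formatted)
```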
