## Laboratory 2 - Spark RDD

### 1.0 Problem Specification: Analysis of the file.

#### Questions:
##### 1.0.1 Can you draw 5 samples from the input RDD? Which command do you use?
##### 1.0.2 Now pick the first 5 words in order of frequency.
##### 1.0.3 How many words does the file contain?
##### 1.0.4 Is "word_frequency.tsv" a folder or a file?

In [10]:
inputPath  = "/data/students/bigdata_internet/lab2/word_frequency.tsv"

In [11]:
# Read the content of the input file
readingsRDD = sc.textFile(inputPath)

#### Answer of 1.0.1

In [12]:
# Draw 5 samples
samples = readingsRDD.takeSample(False, 5)

In [13]:
# Display the samples
for sample in samples:
    print(sample)

steak)	9
clearsave	1
(56g)	1
Chamomiles	12
fleas"	2


#### Answer of 1.0.2

In [6]:
# Parse the data and create (word, frequency) tuples
word_frequency_rdd = readingsRDD.map(lambda line: tuple(line.split('\t'))).map(lambda x: (x[0], int(x[1])))

In [7]:
# Sort the RDD by frequency in descending order
sorted_rdd = word_frequency_rdd.sortBy(lambda x: x[1], ascending=False)

In [8]:
# Take the first 5 words in order of frequency
top_words = sorted_rdd.take(5)

In [9]:
# Display the result
print(top_words)

[('the', 1630750), ('I', 1448619), ('and', 1237250), ('a', 1164419), ('to', 997979)]


#### Answer of 1.0.3

In [17]:
# Count the total number of words
total_words = word_frequency_rdd.count()

In [18]:
# Display the result
print("Total number of words:", total_words)

Total number of words: 339819


#### Answer of 1.0.4

The name "word_frequency.tsv" suggests a file rather than a directory, since the ".tsv" extension denotes tab-separated values (TSV). The name alone is not conclusive, however: in HDFS a directory can carry any name, including one with an extension. To check whether "word_frequency.tsv" is a file or a folder, use the Hadoop FileSystem shell. For a file, the listing shows a single entry with details such as its size and modification time; for a directory, it lists the directory's contents. Based on the information displayed by the command below, we can confirm that it is a file:
hadoop fs -ls /data/students/bigdata_internet/lab2/word_frequency.tsv
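A scripted version of this check is sketched below. It relies on `hadoop fs -test -d`, which exits with status 0 when the path is a directory. The helper names `hdfs_test_command` and `is_hdfs_directory` are illustrative and not part of the lab material, and the sketch assumes the `hadoop` executable is on the `PATH`.

```python
import subprocess


def hdfs_test_command(path, flag="-d"):
    """Build the `hadoop fs -test` command line (-d: directory, -f: file)."""
    return ["hadoop", "fs", "-test", flag, path]


def is_hdfs_directory(path):
    """Return True if `path` is an HDFS directory (exit status 0)."""
    result = subprocess.run(hdfs_test_command(path))
    return result.returncode == 0


# Example call (requires a working Hadoop client, so it is left commented out):
# is_hdfs_directory("/data/students/bigdata_internet/lab2/word_frequency.tsv")
```

Unlike reading the `-ls` listing by eye, the exit status gives an unambiguous yes/no answer that can be used inside a notebook cell or a script.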


#### 1.1 Filter words starting with a specified prefix

#### Questions:
##### 1.1.1 How many lines are left?
##### 1.1.2 How frequent is the most frequent word of the selected sample (i.e., the maximum value of `freq` in the lines obtained by applying the filter)?
##### 1.1.3 Report the code of 3 different ways to solve the task number 1.1.2 (we only want the frequency, i.e., a number and not a tuple/list)

#### Answer of 1.1.1

In [5]:
# Filter lines containing words starting with the prefix 'ho'
filtered_rdd = readingsRDD.filter(lambda line: line.split('\t')[0].startswith('ho'))

In [22]:
# Count the number of remaining lines
remaining_lines = filtered_rdd.count()

In [23]:
print("Number of lines after filtering:", remaining_lines)

Number of lines after filtering: 1519


#### Answer of 1.1.2

In [6]:
# Extract (word, frequency) tuples
filtered_word_frequency_rdd = filtered_rdd.map(lambda line: tuple(line.split('\t'))).map(lambda x: (x[0], int(x[1])))

In [7]:
# Find the maximum frequency
max_frequency = filtered_word_frequency_rdd.map(lambda x: x[1]).max()

In [29]:
# Display the result
print("Frequency of the most frequent word:", max_frequency)

Frequency of the most frequent word: 36264


#### Answer of 1.1.3

##### Approach 1: Using reduce action

In [None]:
# Filter lines containing words starting with the prefix 'ho'
filtered_rdd = readingsRDD.filter(lambda line: line.split('\t')[0].startswith('ho'))

# Extract (word, frequency) tuples
filtered_word_frequency_rdd = filtered_rdd.map(lambda line: tuple(line.split('\t'))).map(lambda x: (x[0], int(x[1])))

# Find the maximum frequency using reduce
max_frequency = filtered_word_frequency_rdd.map(lambda x: x[1]).reduce(lambda x, y: max(x, y))

# Display the result
print("Frequency of the most frequent word:", max_frequency)

##### Approach 2: Using the "sortBy" transformation and the "first" action

In [None]:
# Filter lines containing words starting with the prefix 'ho'
filtered_rdd = readingsRDD.filter(lambda line: line.split('\t')[0].startswith('ho'))

# Extract (word, frequency) tuples
filtered_word_frequency_rdd = filtered_rdd.map(lambda line: tuple(line.split('\t'))).map(lambda x: (x[0], int(x[1])))

# Find the maximum frequency using sortBy and first
max_frequency = filtered_word_frequency_rdd.sortBy(lambda x: x[1], ascending=False).first()[1]

# Display the result
print("Frequency of the most frequent word:", max_frequency)

##### Approach 3: Using "max" action with key function

In [None]:
# Filter lines containing words starting with the prefix 'ho'
filtered_rdd = readingsRDD.filter(lambda line: line.split('\t')[0].startswith('ho'))

# Extract (word, frequency) tuples
filtered_word_frequency_rdd = filtered_rdd.map(lambda line: tuple(line.split('\t'))).map(lambda x: (x[0], int(x[1])))

# Find the maximum frequency using max with key function
max_frequency = filtered_word_frequency_rdd.max(key=lambda x: x[1])[1]

# Display the result
print("Frequency of the most frequent word:", max_frequency)

#### 1.2 Filter most frequent words

In [33]:
# Find the maximum frequency
max_frequency = word_frequency_rdd.map(lambda x: x[1]).max()

# Select lines with frequency greater than 70% of max_frequency
selected_lines = word_frequency_rdd.filter(lambda x: x[1] > 0.7 * max_frequency)


# Select lines with the most frequent words
most_frequent_lines = selected_lines.filter(lambda x: x[1] == max_frequency)

# Display the result
most_frequent_lines.collect()

[('the', 1630750)]
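As a sanity check on the 70% threshold, the same predicate can be applied in plain Python to the five pairs printed in Answer 1.0.2. Every other word is less frequent than "to", which already falls below the threshold, so these five pairs are enough to determine the whole selection. The variable names here are illustrative.

```python
# Top-5 (word, frequency) pairs copied from the output of Answer 1.0.2
top_words = [("the", 1630750), ("I", 1448619), ("and", 1237250),
             ("a", 1164419), ("to", 997979)]

max_frequency = max(freq for _, freq in top_words)
threshold = 0.7 * max_frequency  # roughly 1141525

# Same predicate as the RDD filter: strictly greater than the threshold
selected = [word for word, freq in top_words if freq > threshold]
print(selected)
```

Four words pass the filter ("the", "I", "and", "a"), while "to" with 997979 occurrences does not.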

#### 1.3 Count the remaining words and save the output

#### Questions:
##### 1.3.1 Count the number of selected lines and print this number on the standard output.
##### 1.3.2 Save the selected words (without frequency) in an HDFS output folder. Every line should contain a single word and end with a semicolon (`;`).

#### Answer of 1.3.1

In [34]:
# Count the number of selected lines
num_selected_lines = selected_lines.count()

# Print the number of selected lines
print("Number of selected lines:", num_selected_lines)

Number of selected lines: 4


#### Answer of 1.3.2

In [40]:
# Extract words from selected lines
selected_words = selected_lines.map(lambda x: x[0] + ';')
selected_words.collect()

# Save the selected words to an HDFS output folder
#output_path = "https://jupyter.polito.it/user/s302866/lab/tree/lab2output.txt"
#selected_words.saveAsTextFile(output_path)

['I;', 'the;', 'and;', 'a;']
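The save step is commented out in the cell above, and the path it mentions is a Jupyter URL rather than an HDFS location. A minimal sketch of the save follows, assuming a writable HDFS folder. The folder name `lab2_selected_words` and the helper names are placeholders, not part of the lab material. Note that `saveAsTextFile` creates a folder of part files and fails if that folder already exists.

```python
def format_word(pair):
    """Turn a (word, frequency) tuple into the output line 'word;'."""
    return pair[0] + ";"


def save_selected_words(selected_lines, output_path):
    """Write one 'word;' line per selected tuple under output_path."""
    selected_lines.map(format_word).saveAsTextFile(output_path)


# Example call; "lab2_selected_words" is a placeholder folder name:
# save_selected_words(selected_lines, "lab2_selected_words")
```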

### 2.0 Run the application in different ways

#### Questions:
##### 2.1 Run your script locally and in the cluster (--master option). How much time do the two modes require to run? Is there a difference? Can you give a plausible explanation?
##### 2.2 In this application, would caching an RDD increase the performance? If yes, which RDD would you cache?

#### Answer of 2.1

Running the script locally with `!spark-submit --master local --deploy-mode client 'SampleLab02.py'` took 4.714531421661377 seconds:

WARNING: User-defined SPARK_HOME (/opt/cloudera/parcels/CDH-6.2.1-1.cdh6.2.1.p0.1425774/lib/spark) overrides detected (/opt/cloudera/parcels/CDH/lib/spark).
WARNING: Running spark-class from user-defined location.
22/12/20 00:04:20 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/12/20 00:04:20 WARN util.Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
Program took 4.714531421661377  seconds to run                                   
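The contents of `SampleLab02.py` are not shown here. One way such a timing could be taken is sketched below, with a stand-in workload in place of the Spark job; the helper name `timed` is an assumption. For the cluster measurement, the same script would be submitted with a different `--master` value (for example `--master yarn`, assuming the cluster is managed by YARN).

```python
import time


def timed(job):
    """Run job() and return (result, elapsed wall-clock seconds)."""
    start = time.time()
    result = job()
    return result, time.time() - start


# Stand-in workload; in the lab script this would be the Spark job itself.
result, elapsed = timed(lambda: sum(range(1_000_000)))
print("Program took", elapsed, "seconds to run")
```

Measuring inside the script excludes the time `spark-submit` spends launching the driver, so the two modes are best compared using the same measurement in both runs.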


#### Answer of 2.2

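Caching an RDD improves performance when the RDD is reused by several actions, because Spark otherwise recomputes it from the input file for each action; this matters most in iterative or multi-stage applications. In this application, `word_frequency_rdd` is the natural candidate, since it feeds the sort, the count, the maximum, and the 70% filter. `selected_lines`, which is both counted and saved, could be cached as well so that the filter over the whole file is not repeated.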
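A minimal sketch of this reuse pattern, assuming an RDD of `(word, frequency)` tuples such as `word_frequency_rdd`; the function name and the `ratio` parameter are illustrative.

```python
def max_and_selected_count(word_frequency_rdd, ratio=0.7):
    """Run two actions on the same RDD, keeping it cached in between."""
    cached = word_frequency_rdd.cache()  # mark the RDD for in-memory reuse
    # Action 1: the first action also materialises the cache
    max_frequency = cached.map(lambda x: x[1]).max()
    # Action 2: reads the cached partitions instead of re-parsing the file
    num_selected = cached.filter(
        lambda x: x[1] > ratio * max_frequency).count()
    cached.unpersist()  # release the cached partitions when done
    return max_frequency, num_selected


# Example call, reusing the name from the cells above:
# max_and_selected_count(word_frequency_rdd)
```

Whether the gain is measurable depends on the input size; with a small file read once per action, the difference may be within the noise of the job start-up time.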

### 3.0 Bonus Task

#### Questions:
##### 3.1 How many words (with repetitions) does it contain? Consider a word all the characters between spaces (elements found with `split()` method)
##### 3.2 Report the code to obtain the word frequency file starting from the original file.

#### Answer of 3.1

In [8]:
# Load the RDD from HDFS
input_path = "/data/students/bigdata_internet/lab2/finefoods_text.txt"
readingsRDD = sc.textFile(input_path)

# Split lines into words and flatten the result
words_rdd = readingsRDD.flatMap(lambda line: line.split())

# Count the occurrences of each word
word_counts = words_rdd.countByValue()

# Count the total number of words (with repetitions)
total_words = sum(word_counts.values())

# Display the result
print("Total number of words (with repetitions):", total_words)

Total number of words (with repetitions): 45444841
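The definition of "word" used here (every token returned by `split()`) can be illustrated in plain Python. The sample lines below are made up for the illustration. On the RDD itself, an equivalent total could be obtained with `readingsRDD.flatMap(lambda line: line.split()).count()`, which avoids building a dictionary of all distinct words on the driver.

```python
def count_words(lines):
    """Count whitespace-separated tokens, repetitions included."""
    return sum(len(line.split()) for line in lines)


# Made-up sample: repeated spaces, an empty line, and a tab separator
sample_lines = ["Good  quality dog food", "", "Not as\tadvertised"]
print(count_words(sample_lines))  # 4 + 0 + 3 = 7
```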


#### Answer of 3.2

In [11]:
output_path = "resultLab03/"

In [12]:
# Preprocess text: split lines into words (case is preserved, since the provided
# word_frequency.tsv contains words such as "I" and "Chamomiles")
words_rdd = readingsRDD.flatMap(lambda line: line.split())

# Count the occurrences of each word
word_counts = words_rdd.countByValue()

# Save the word frequencies to an output file
#output_path = "/data/students/bigdata_internet/lab2/word_frequency_original.tsv"
word_counts_rdd = sc.parallelize(word_counts.items()).map(lambda x: f"{x[0]}\t{x[1]}")
word_counts_rdd.saveAsTextFile(output_path)

24/01/20 16:18:02 WARN scheduler.TaskSetManager: Stage 7 contains a task of very large size (889 KB). The maximum recommended task size is 100 KB.
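The "task of very large size" warning above is likely caused by shipping the driver-side dictionary back to the executors through `sc.parallelize`. A sketch of an alternative that keeps the counting distributed with `reduceByKey` follows; the helper names are illustrative. The sketch keeps the original case of each word, because the provided file contains entries such as "I" and "Chamomiles".

```python
from operator import add


def to_tsv(pair):
    """Format a (word, count) tuple as a tab-separated line."""
    return f"{pair[0]}\t{pair[1]}"


def word_frequency_lines(lines_rdd):
    """Build 'word<TAB>count' lines without collecting counts on the driver."""
    return (lines_rdd
            .flatMap(lambda line: line.split())   # one element per word
            .map(lambda word: (word, 1))          # (word, 1) pairs
            .reduceByKey(add)                     # sum the ones per word
            .map(to_tsv))                         # format as TSV lines


# Example call, reusing the names from the cells above:
# word_frequency_lines(readingsRDD).saveAsTextFile(output_path)
```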
