## Setup

In [None]:
from pyspark import SparkConf, SparkContext

# Run Spark locally with the application name "Example"; "local[*]" asks for
# as many worker threads as there are logical cores on this machine.
conf = SparkConf().setAppName("Example").setMaster("local[*]")

# getOrCreate() returns the already-running context when there is one, so this
# cell can be re-executed in a live kernel without raising
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)


## Task 1

In [None]:
# Load the book as an RDD of strings, one element per line of the file.
text_rdd = sc.textFile("./notebooks/example/quijote.txt")

# Preview only the first few lines: collect() would ship the entire file to
# the driver and flood the cell output.
print(text_rdd.take(5))

# Word count: split each line on whitespace, lower-case every token, emit
# (word, 1) pairs and sum the ones per distinct word.
word_counts = (
    text_rdd.flatMap(lambda line: line.split())
    .map(lambda word: (word.lower(), 1))
    .reduceByKey(lambda a, b: a + b)
    .cache()  # reused by later actions; avoids recomputing the aggregation
)

# Bring the complete word -> count mapping to the driver as a dict.
result = word_counts.collectAsMap()
print(result)


## Task 2

In [None]:
def _more_frequent(left, right):
    """Pick the (word, count) pair with the strictly larger count.

    When both counts are equal the right-hand pair is returned.
    """
    if left[1] > right[1]:
        return left
    return right


# Fold every (word, count) pair down to the single most frequent one.
most_frequent_word = word_counts.reduce(_more_frequent)

print(
    f"The most frequent word is: {most_frequent_word[0]} with a count of {most_frequent_word[1]}"
)


## Task 3

In [None]:
# Pair every line with its position in the RDD: (line_text, index).
lines_with_index = text_rdd.zipWithIndex()

# Inverted index: word -> sorted list of the distinct line indices that
# contain the word.
inverted_index = (
    lines_with_index.flatMap(
        # The inner set emits each word at most once per line, so a word that is
        # repeated within a line contributes that line's index only once.
        lambda indexed_line: [
            (word, indexed_line[1])
            for word in {token.lower() for token in indexed_line[0].split()}
        ]
    )
    .groupByKey()
    .mapValues(sorted)  # list output, in deterministic ascending order
    .cache()  # reused by the actions below; avoids redoing the shuffle
)

# Full index as a driver-side dict.
inverted_index_result = inverted_index.collectAsMap()

word_to_search = most_frequent_word[0]
# List of matching (word, line_indices) pairs; empty when the word is absent.
line_count = inverted_index.filter(lambda x: x[0] == word_to_search).collect()

if line_count:
    print(
        f"The word '{word_to_search}' appears in {len(line_count[0][1])} unique lines."
    )
else:
    print(f"The word '{word_to_search}' does not appear in any lines.")


## Task 4

In [None]:
word_to_search = most_frequent_word[0]

# Compute the count inside the Spark job so only a single integer per match is
# returned to the driver. set() drops repeated indices, so a line containing
# the word several times is counted once.
line_count = (
    inverted_index.filter(lambda x: x[0] == word_to_search)
    .map(lambda x: len(set(x[1])))
    .collect()
)

# An empty result means the word is not in the index; guard the [0] access
# instead of letting it raise IndexError.
if line_count:
    print(f"The word '{word_to_search}' appears in {line_count[0]} unique lines.")
else:
    print(f"The word '{word_to_search}' does not appear in any lines.")


## Cleanup

In [None]:
# Shut down the SparkContext created in the setup cell.
sc.stop()
