Part 1: Spark Basics


Task 1.1: Basic Word Count

In [None]:
import os
import shutil
import string

from pyspark import SparkConf, SparkContext

In [None]:
# Reuse the already-running SparkContext when this cell is executed again:
# constructing SparkContext(...) a second time in the same kernel raises,
# because only one active context is allowed per process.
spark_conf = SparkConf().setMaster("local").setAppName("WordCount")
sc = SparkContext.getOrCreate(conf=spark_conf)

In [None]:
# Read each book as an RDD whose elements are the lines of that text file.
file1, file2 = map(sc.textFile, ["book1.txt", "book2.txt"])

In [None]:
# Concatenate the lines of both books into one RDD (duplicates are kept).
combined_files = sc.union([file1, file2])

In [None]:
# Tokenise: lower-case each line and split on whitespace. Keep only the
# alphanumeric characters of every token, then discard tokens left empty
# (tokens that were pure punctuation).
words = (
    combined_files
    .flatMap(lambda line: line.lower().split())
    .map(lambda token: "".join(ch for ch in token if ch.isalnum()))
    .filter(bool)
)

In [None]:
# Pair every token with a count of 1, then sum the counts per distinct token.
word_counts = (
    words
    .map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left + right)
)

In [None]:
# saveAsTextFile writes a *directory* and fails if the path already exists,
# so clear whatever a previous run left behind. The ".txt" name means a plain
# file (or symlink) may be sitting here too; shutil.rmtree would raise on
# those, so remove them with os.remove instead.
output_dir = "output1.txt"
if os.path.islink(output_dir) or os.path.isfile(output_dir):
    os.remove(output_dir)
    print(f"Deleted existing file: {output_dir}")
elif os.path.isdir(output_dir):
    shutil.rmtree(output_dir)
    print(f"Deleted existing folder: {output_dir}")

In [None]:
# Write to the same path the previous cell just cleared, rather than repeating
# the literal; run this directly after that cell.
word_counts.saveAsTextFile(output_dir)

Task 1.2: Extended Word Count (punctuation and stop words removed, sorted by frequency)

In [None]:
# Common English function words excluded from the extended word counts.
# All entries are lower-case, matching the .lower() applied to every line
# before tokens are compared against this set.
stop_words = set(
    "the is in and to a of for on with that "
    "this an it as are was at by be has have "
    "but not or from had they you i we his her their".split()
)

In [None]:
# Ship the stop-word set to the executors once as a read-only broadcast
# variable; tasks read it through `broadcast_stopwords.value`.
broadcast_stopwords = sc.broadcast(value=stop_words)

In [None]:
# Extended count over both books: lower-case, strip ASCII punctuation, drop
# stop words, then sort by frequency.
# Build the translation table once on the driver; the lambda below captures it
# instead of rebuilding it for every line.
# NOTE(review): string.punctuation is ASCII-only, so typographic quotes and
# dashes (e.g. “ ” ’ —) survive and can create distinct tokens. Confirm
# whether the books contain them.
punctuation_table = str.maketrans("", "", string.punctuation)

word_counts = (
    combined_files
    .flatMap(lambda line: line.lower().translate(punctuation_table).split())
    # str.split() never yields empty strings, so only the stop-word test is needed.
    .filter(lambda word: word not in broadcast_stopwords.value)
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
    # Count descending, then word ascending, so words with equal counts always
    # appear in the same order and the output is reproducible.
    .sortBy(lambda pair: (-pair[1], pair[0]))
)

In [None]:
# saveAsTextFile writes a *directory* and fails if the path already exists,
# so clear whatever a previous run left behind. A plain file or symlink with
# this name would make shutil.rmtree raise, so remove those with os.remove.
output_dir = "output1extended.txt"
if os.path.islink(output_dir) or os.path.isfile(output_dir):
    os.remove(output_dir)
    print(f"Deleted existing file: {output_dir}")
elif os.path.isdir(output_dir):
    shutil.rmtree(output_dir)
    print(f"Deleted existing folder: {output_dir}")

In [None]:
# Write to the same path the previous cell just cleared, rather than repeating
# the literal; run this directly after that cell.
word_counts.saveAsTextFile(output_dir)

Task 1.4: Top 25 Words from One Book

In [None]:
# Top-N most frequent words in book1.txt only, cleaned the same way as the
# extended count: lower-case, ASCII punctuation stripped, stop words removed.
TOP_N = 25

text_file = sc.textFile("book1.txt")
broadcast_stopwords = sc.broadcast(stop_words)
# Built once on the driver instead of once per line inside the lambda.
punctuation_table = str.maketrans("", "", string.punctuation)

word_counts = (
    text_file
    .flatMap(lambda line: line.lower().translate(punctuation_table).split())
    # str.split() never yields empty strings, so only the stop-word test is needed.
    .filter(lambda word: word not in broadcast_stopwords.value)
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
    # Count descending, then word ascending, so that ties at the TOP_N cut-off
    # are resolved the same way on every run.
    .sortBy(lambda pair: (-pair[1], pair[0]))
)

top_25 = word_counts.take(TOP_N)
print(f"\nTop {TOP_N} most common words:\n")
for word, count in top_25:
    print(f"{word}: {count}")