Part 1.1

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, lower, col, count
from operator import add

# Initialize Spark session
spark = SparkSession.builder.appName("GutenbergWordCount").getOrCreate()

# Read the text file
file_path = "PRIDE AND PREJUDICE.rtf"
text_file = spark.read.text(file_path)

# Perform word count
word_counts = (text_file
    .select(explode(split(lower(col("value")), "\\W+")).alias("word"))
    .filter(col("word") != "")
    .groupBy("word")
    .agg(count("*").alias("count"))
    .orderBy("count", ascending=False)
)

# Show the top 20 words
word_counts.show(20)

# Stop the Spark session
# spark.stop()

+----+-----+
|word|count|
+----+-----+
| the| 4811|
|  to| 4383|
|  of| 3955|
| and| 3740|
| her| 2258|
|   a| 2079|
|  in| 2038|
|  94| 1899|
| was| 1870|
|   i| 1794|
| she| 1710|
|that| 1561|
|  it| 1542|
| not| 1508|
| you| 1323|
|  he| 1318|
|  be| 1279|
| his| 1279|
|  as| 1223|
| had| 1179|
+----+-----+
only showing top 20 rows



Part 1.2

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, lower, regexp_replace, col, count, size, trim
from pyspark.ml.feature import StopWordsRemover

# Initialize Spark session
spark = SparkSession.builder.appName("GutenbergWordCountExtended").getOrCreate()

# Read the text file
file_path = "PRIDE AND PREJUDICE.rtf"
text_file = spark.read.text(file_path)

# Preprocess the text: make it lowercase, remove punctuation, and split into words (array of strings)
words_df = (text_file
    .select(split(lower(regexp_replace(col("value"), "[^a-zA-Z\\s]", "")), "\\s+").alias("words"))
    .filter(size(col("words")) > 0)  # Filter out empty arrays
)

# Remove stop words using Spark's StopWordsRemover
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered_words_df = remover.transform(words_df)

# Explode the filtered words back into individual rows
exploded_words_df = filtered_words_df.select(explode(col("filtered_words")).alias("word"))

# Filter out any empty strings or spaces
cleaned_words_df = exploded_words_df.filter(trim(col("word")) != "")

# Perform word count on cleaned words
word_counts = (cleaned_words_df
    .groupBy("word")
    .agg(count("*").alias("count"))
    .orderBy("count", ascending=False)
)

# Show the top 15 words
word_counts.show(15)


# Stop the Spark session
# spark.stop()

+---------+-----+
|     word|count|
+---------+-----+
|       mr|  806|
|elizabeth|  604|
|     said|  405|
|    darcy|  380|
|      mrs|  357|
|     much|  338|
|     must|  325|
|     miss|  315|
|   bennet|  307|
|      one|  285|
|     jane|  271|
|  bingley|  261|
|     know|  244|
|   though|  233|
|    never|  230|
+---------+-----+
only showing top 15 rows



Part 2.1

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, lower, regexp_replace, col, count, trim, concat_ws, lead
from pyspark.sql.window import Window
from pyspark.ml.feature import StopWordsRemover

# Initialize Spark session
spark = SparkSession.builder.appName("WordCoOccurrence").getOrCreate()

# Read the text file
file_path = "PRIDE AND PREJUDICE.rtf"
text_file = spark.read.text(file_path)

# Preprocess the text: make it lowercase, remove punctuation, and split into words (array of strings)
words_df = (text_file
    .select(split(lower(regexp_replace(col("value"), "[^a-zA-Z\\s]", "")), "\\s+").alias("words"))
    .filter(col("words").isNotNull())  # Filter out any null arrays
)

# Remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered_words_df = remover.transform(words_df)

# Explode words into individual rows
exploded_words_df = filtered_words_df.select(explode(col("filtered_words")).alias("word"))

# Filter out any empty strings
cleaned_words_df = exploded_words_df.filter(trim(col("word")) != "")

# Generate bigrams using window function
window_spec = Window.orderBy(lit(1))  # Using lit(1) to create a global window
bigrams_df = (cleaned_words_df
    .withColumn("next_word", lead("word", 1).over(window_spec))
    .filter(col("next_word").isNotNull())  # Remove rows where next_word is null
    .select(concat_ws(" ", col("word"), col("next_word")).alias("bigram"))
)

# Count bigrams
bigram_counts = (bigrams_df
    .groupBy("bigram")
    .agg(count("*").alias("count"))
    .orderBy("count", ascending=False)
)

# Show the top 10 bigrams
bigram_counts.show(10)

# Stop the Spark session
# spark.stop()


+-----------------+-----+
|           bigram|count|
+-----------------+-----+
|         mr darcy|  242|
|       mrs bennet|  147|
|       mr collins|  140|
|   lady catherine|  106|
|       mr bingley|   94|
|project gutenberg|   87|
|        mr bennet|   82|
|     miss bingley|   72|
|      miss bennet|   62|
|       mr wickham|   57|
+-----------------+-----+
only showing top 10 rows



removing stop words mr, mrs etc

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, lower, regexp_replace, col, count, trim, concat_ws, lead
from pyspark.sql.window import Window
from pyspark.ml.feature import StopWordsRemover

# Initialize Spark session
spark = SparkSession.builder.appName("WordCoOccurrence").getOrCreate()

# Custom stop words list (adding titles like 'mr', 'mrs', 'lady', 'miss' etc.)
custom_stopwords = ["mr", "mrs", "lady", "miss", "sir", "lord", "countess", "duke", "king", "queen", "prince", "princess"]

# Read the text file
file_path = "PRIDE AND PREJUDICE.rtf"
text_file = spark.read.text(file_path)

# Preprocess the text: make it lowercase, remove punctuation, and split into words (array of strings)
words_df = (text_file
    .select(split(lower(regexp_replace(col("value"), "[^a-zA-Z\\s]", "")), "\\s+").alias("words"))
    .filter(col("words").isNotNull())  # Filter out any null arrays
)

# Remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
remover.setStopWords(remover.getStopWords() + custom_stopwords)  # Add custom stop words
filtered_words_df = remover.transform(words_df)

# Explode words into individual rows
exploded_words_df = filtered_words_df.select(explode(col("filtered_words")).alias("word"))

# Filter out any empty strings
cleaned_words_df = exploded_words_df.filter(trim(col("word")) != "")

# Generate bigrams using window function
window_spec = Window.orderBy(lit(1))  # Using lit(1) to create a global window
bigrams_df = (cleaned_words_df
    .withColumn("next_word", lead("word", 1).over(window_spec))
    .filter(col("next_word").isNotNull())  # Remove rows where next_word is null
    .select(concat_ws(" ", col("word"), col("next_word")).alias("bigram"))
)

# Count bigrams
bigram_counts = (bigrams_df
    .groupBy("bigram")
    .agg(count("*").alias("count"))
    .orderBy("count", ascending=False)
)

# Show the top 10 bigrams
bigram_counts.show(10)

# Stop the Spark session
# spark.stop()


+-----------------+-----+
|           bigram|count|
+-----------------+-----+
|project gutenberg|   87|
|   said elizabeth|   46|
|     george allen|   37|
|        de bourgh|   36|
| copyright george|   35|
|        young man|   33|
|         dare say|   30|
|     young ladies|   28|
|  heading chapter|   27|
|  colonel forster|   27|
+-----------------+-----+
only showing top 10 rows

