# CSL7110 Assignment 1 (Questions 11 and 12)

**Submitted By: Arjun Baidya (M25CSA006)**

In [1]:
!pip install pyspark



In [2]:
!unzip -q D184MB.zip -d dataset

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.linalg import Vectors
import re
import numpy as np
from glob import glob
import os

print("All libraries imported successfully!")

spark = SparkSession.builder \
    .appName("CSL7110_Assignment") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

print(f"Spark Session created successfully!")
print(f"Spark Version: {spark.version}")



All libraries imported successfully!
Spark Session created successfully!
Spark Version: 4.0.1


In [4]:
DATA_PATH = '/content/dataset/D184MB/'

def load_books(data_path):
    #Loading all .txt files into a DataFrame

    books_data = []
    txt_files = glob(os.path.join(data_path, '*.txt'))

    print(f"Found {len(txt_files)} text files")

    for file_path in txt_files[:100]:  # Limit to first 100 for faster testing
        file_name = os.path.basename(file_path)
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                text = f.read()
                books_data.append((file_name, text))
        except Exception as e:
            print(f"Error reading {file_name}: {e}")

    schema = StructType([
        StructField("file_name", StringType(), False),
        StructField("text", StringType(), False)
    ])

    return spark.createDataFrame(books_data, schema)

# Loading data
books_df = load_books(DATA_PATH)
books_df.cache()

print(f"\nLoaded {books_df.count()} books successfully!")
books_df.select("file_name", substring("text", 1, 50).alias("preview")).show(5, truncate=50)


Found 425 text files

Loaded 100 books successfully!
+---------+--------------------------------------------------+
|file_name|                                           preview|
+---------+--------------------------------------------------+
|  423.txt|The Project Gutenberg EBook of Round the Red Lamp,|
|  152.txt|The Project Gutenberg EBook of Wild Justice, by Ru|
|   26.txt|The Project Gutenberg EBook of Paradise Lost, by J|
|   81.txt|The Project Gutenberg EBook of The Return of Tarza|
|  456.txt|Project Gutenberg's The Door in the Wall And Other|
+---------+--------------------------------------------------+
only showing top 5 rows


In [5]:
# Q10

def extract_title(text):
    if not text: return None
    match = re.search(r'Title:\s*(.+?)(?:\r?\n)', text, re.IGNORECASE)
    if match: return match.group(1).strip()
    match = re.search(r'The Project Gutenberg EBook of\s*(.+?)(?:,|\r?\n)', text, re.IGNORECASE)
    if match: return match.group(1).strip()
    return None

def extract_release_date(text):
    if not text: return None
    match = re.search(r'Release Date:\s*(.+?)(?:\[|$|\r?\n)', text, re.IGNORECASE)
    if match: return match.group(1).strip()
    return None

def extract_language(text):
    if not text: return None
    match = re.search(r'Language:\s*(.+?)(?:\r?\n)', text, re.IGNORECASE)
    if match: return match.group(1).strip()
    return None

def extract_encoding(text):
    if not text: return None
    match = re.search(r'Character set encoding:\s*(.+?)(?:\r?\n)', text, re.IGNORECASE)
    if match: return match.group(1).strip()
    return None

def parse_year(date_str):
    if not date_str: return None
    match = re.search(r'\b(19|20)\d{2}\b', str(date_str))
    if match: return int(match.group(0))
    return None

# Registering UDFs
extract_title_udf = udf(extract_title, StringType())
extract_release_date_udf = udf(extract_release_date, StringType())
extract_language_udf = udf(extract_language, StringType())
extract_encoding_udf = udf(extract_encoding, StringType())
parse_year_udf = udf(parse_year, IntegerType())

# Extracting Metadata
metadata_df = books_df.select(
    col("file_name"),
    extract_title_udf(col("text")).alias("title"),
    extract_release_date_udf(col("text")).alias("release_date"),
    extract_language_udf(col("text")).alias("language"),
    extract_encoding_udf(col("text")).alias("encoding")
).withColumn(
    "release_year",
    parse_year_udf(col("release_date"))
)

print("QUESTION 10 - METADATA EXTRACTION")
print("=" * 80)
print("\n--- Sample Metadata ---")
metadata_df.show(10, truncate=50)

print("\nAnalysis 1: Books Released Each Year")
books_per_year = metadata_df.filter(col("release_year").isNotNull()) \
    .groupBy("release_year") \
    .count() \
    .orderBy("release_year")

books_per_year.show(20)

print("\nAnalysis 2: Most Common Language")
language_counts = metadata_df.filter(col("language").isNotNull()) \
    .groupBy("language") \
    .count() \
    .orderBy(desc("count"))

language_counts.show(10)

most_common = language_counts.first()
if most_common:
    print(f"\nMost common language: {most_common['language']} ({most_common['count']} books)")

print("\nAnalysis 3: Average Title Length")
title_length_df = metadata_df.filter(col("title").isNotNull()) \
    .withColumn("title_length", length(col("title")))

avg_length = title_length_df.agg(avg("title_length")).first()[0]
print(f"Average title length: {avg_length:.2f} characters")

title_length_df.describe("title_length").show()

QUESTION 10 - METADATA EXTRACTION

--- Sample Metadata ---
+---------+---------------------------------------+----------------+--------+--------+------------+
|file_name|                                  title|    release_date|language|encoding|release_year|
+---------+---------------------------------------+----------------+--------+--------+------------+
|  423.txt|                     Round the Red Lamp|February 3, 2008| English|   ASCII|        2008|
|  152.txt|                           Wild Justice|    August, 1994| English|   ASCII|        1994|
|   26.txt|                          Paradise Lost|  February, 1992| English|   ASCII|        1992|
|   81.txt|                   The Return of Tarzan|   June 23, 2008| English|   ASCII|        2008|
|  456.txt| The Door in the Wall And Other Stories|   July 22, 2005| English|   ASCII|        2005|
|  148.txt| The Autobiography of Benjamin Franklin|    May 22, 2008| English|   ASCII|        2008|
|  323.txt|                       Verses 

In [6]:
# QUESTION 11: TF-IDF AND BOOK SIMILARITY

def remove_gutenberg_boilerplate(text):
    if not text: return ""

    start_markers = [
        "*** START OF THIS PROJECT GUTENBERG",
        "*** START OF THE PROJECT GUTENBERG"
    ]

    start_pos = 0
    for marker in start_markers:
        pos = text.find(marker)
        if pos != -1:
            newline_pos = text.find('\n', pos)
            if newline_pos != -1:
                start_pos = newline_pos + 1
                break

    end_markers = [
        "*** END OF THIS PROJECT GUTENBERG",
        "*** END OF THE PROJECT GUTENBERG"
    ]

    end_pos = len(text)
    for marker in end_markers:
        pos = text.find(marker)
        if pos != -1:
            end_pos = pos
            break

    return text[start_pos:end_pos]

remove_gutenberg_udf = udf(remove_gutenberg_boilerplate, StringType())

print("QUESTION 11 - TF-IDF AND BOOK SIMILARITY")
print("-" * 100)

# Cleaning text
preprocessed_df = books_df.select(
    col("file_name"),
    remove_gutenberg_udf(col("text")).alias("clean_text")
)

# Lowercaseing and removing punctuation
preprocessed_df = preprocessed_df.withColumn(
    "clean_text",
    regexp_replace(lower(col("clean_text")), r'[^\w\s]', ' ')
).withColumn(
    "clean_text",
    regexp_replace(col("clean_text"), r'\s+', ' ')
).withColumn(
    "clean_text",
    trim(col("clean_text"))
)

# Tokenizing
tokenizer = Tokenizer(inputCol="clean_text", outputCol="words")
tokenized_df = tokenizer.transform(preprocessed_df)

# Removing stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered_df = remover.transform(tokenized_df)

print("Text preprocessing complete")
filtered_df.select("file_name", size("filtered_words").alias("word_count")).show(10)

# Calculating TF (Term Frequency)
cv = CountVectorizer(inputCol="filtered_words", outputCol="raw_features",
                     vocabSize=5000, minDF=2.0)
cv_model = cv.fit(filtered_df)
tf_df = cv_model.transform(filtered_df)

vocab = cv_model.vocabulary
print(f"Vocabulary size: {len(vocab)}")

# Calculating IDF (Inverse Document Frequency)
idf = IDF(inputCol="raw_features", outputCol="tfidf_features")
idf_model = idf.fit(tf_df)
tfidf_df = idf_model.transform(tf_df)

print("TF-IDF calculation complete")
tfidf_df.select("file_name", "tfidf_features").show(5, truncate=100)

def cosine_similarity(v1, v2):
    # Calculating cosine similarity between two vectors

    from pyspark.ml.linalg import SparseVector

    if isinstance(v1, SparseVector):
        v1 = v1.toArray()
    else:
        v1 = np.array(v1)

    if isinstance(v2, SparseVector):
        v2 = v2.toArray()
    else:
        v2 = np.array(v2)

    dot_product = np.dot(v1, v2)
    norm1 = np.linalg.norm(v1)
    norm2 = np.linalg.norm(v2)

    if norm1 == 0 or norm2 == 0:
        return 0.0

    return float(dot_product / (norm1 * norm2))

# Target book for similarity comparison
TARGET_BOOK = "10.txt"

# Getting target book vector
target_row = tfidf_df.filter(col("file_name") == TARGET_BOOK).first()

if target_row:
    target_vec = target_row["tfidf_features"]
    target_broadcast = spark.sparkContext.broadcast(target_vec)

    # Calculating similarities
    similarity_udf = udf(lambda v: float(cosine_similarity(target_broadcast.value, v)), FloatType())

    similarity_df = tfidf_df.withColumn(
        "similarity",
        similarity_udf(col("tfidf_features"))
    )

    # Top similar books
    print(f"\nTop 6 Most Similar Books to '{TARGET_BOOK}'")
    similarity_df.select("file_name", "similarity") \
        .orderBy(desc("similarity")) \
        .show(6)

    # Excluding target book itself
    print(f"\nTop 5 Similar Books (excluding '{TARGET_BOOK}'):")
    top_similar = similarity_df.filter(col("file_name") != TARGET_BOOK) \
        .select("file_name", "similarity") \
        .orderBy(desc("similarity")) \
        .limit(5)

    top_similar.show()
else:
    print(f"Book '{TARGET_BOOK}' not found!")

QUESTION 11 - TF-IDF AND BOOK SIMILARITY
----------------------------------------------------------------------------------------------------
Text preprocessing complete
+---------+----------+
|file_name|word_count|
+---------+----------+
|  423.txt|     32302|
|  152.txt|     29013|
|   26.txt|     45963|
|   81.txt|     42554|
|  456.txt|     20768|
|  148.txt|     31530|
|  323.txt|     30450|
|  465.txt|     26909|
|   30.txt|    468762|
|  286.txt|     72833|
+---------+----------+
only showing top 10 rows
Vocabulary size: 5000
TF-IDF calculation complete
+---------+----------------------------------------------------------------------------------------------------+
|file_name|                                                                                      tfidf_features|
+---------+----------------------------------------------------------------------------------------------------+
|  423.txt|(5000,[0,1,2,4,5,6,7,8,9,11,13,14,15,16,17,18,19,21,22,23,24,25,26,27,28,29,30,31,3

In [7]:
# QUESTION 12: AUTHOR INFLUENCE NETWORK

from pyspark.sql.functions import countDistinct, coalesce, lit, desc, avg, count

def extract_author(text):
    if not text: return None
    match = re.search(r'Author:\s*(.+?)(?:\r?\n)', text, re.IGNORECASE)
    if match: return match.group(1).strip()
    match = re.search(r'\bby\s+([A-Z][a-zA-Z\s\.,]+?)(?:\r?\n|\s{2,})', text)
    if match: return match.group(1).strip()
    return None

extract_author_udf = udf(extract_author, StringType())

print("QUESTION 12 - AUTHOR INFLUENCE NETWORK")
print("-" * 100)

author_df = books_df.select(
    col("file_name"),
    extract_author_udf(col("text")).alias("author"),
    extract_release_date_udf(col("text")).alias("release_date")
).withColumn(
    "release_year",
    parse_year_udf(col("release_date"))
).filter(
    (col("author").isNotNull()) & (col("release_year").isNotNull())
)

print("\nAuthors and Release Years")
author_df.show(15, truncate=50)
print(f"Total books with author and year: {author_df.count()}")

INFLUENCE_WINDOW = 5  # years

print(f"\nBuilding Influence Network (Window: {INFLUENCE_WINDOW} years)")

# Self-join to find influence relationships
influence_edges = author_df.alias("a1").join(
    author_df.alias("a2"),
    (col("a1.author") != col("a2.author")) &
    (col("a1.release_year") <= col("a2.release_year")) &
    (col("a2.release_year") - col("a1.release_year") <= INFLUENCE_WINDOW)
).select(
    col("a1.author").alias("author1"),
    col("a2.author").alias("author2"),
    col("a1.release_year").alias("year1"),
    col("a2.release_year").alias("year2")
).distinct()

print(f"Total influence edges: {influence_edges.count()}")
print("\nSample Influence Relationships")
influence_edges.show(15, truncate=50)

# Out-degree: authors influenced by author1
out_degree = influence_edges.groupBy("author1") \
    .agg(countDistinct("author2").alias("out_degree")) \
    .withColumnRenamed("author1", "author")

# In-degree: authors who influenced author2
in_degree = influence_edges.groupBy("author2") \
    .agg(countDistinct("author1").alias("in_degree")) \
    .withColumnRenamed("author2", "author")

# Combining degrees
all_authors = author_df.select("author").distinct()

degree_df = all_authors.alias("a") \
    .join(in_degree.alias("i"), col("a.author") == col("i.author"), "left") \
    .join(out_degree.alias("o"), col("a.author") == col("o.author"), "left") \
    .select(
        col("a.author"),
        coalesce(col("i.in_degree"), lit(0)).alias("in_degree"),
        coalesce(col("o.out_degree"), lit(0)).alias("out_degree")
    )

print("\nTop 5 Authors by In-Degree (Most Influenced)")
degree_df.orderBy(desc("in_degree")).show(5, truncate=False)

print("\nTop 5 Authors by Out-Degree (Most Influential)")
degree_df.orderBy(desc("out_degree")).show(5, truncate=False)

# Network statistics
print("\nNetwork Statistics")
total_authors = degree_df.count()
total_edges = influence_edges.count()
avg_in = degree_df.agg(avg("in_degree")).first()[0]
avg_out = degree_df.agg(avg("out_degree")).first()[0]

print(f"Total authors: {total_authors}")
print(f"Total influence relationships: {total_edges}")
print(f"Average in-degree: {avg_in:.2f}")
print(f"Average out-degree: {avg_out:.2f}")

degree_df.describe("in_degree", "out_degree").show()

# Create output directory
!mkdir -p /content/output

# Save results
print("Saving Results..")

metadata_df.coalesce(1).write.mode("overwrite") \
    .option("header", "true").csv("/content/output/metadata")

if target_row:
    top_similar.coalesce(1).write.mode("overwrite") \
        .option("header", "true").csv("/content/output/similarity")

degree_df.coalesce(1).write.mode("overwrite") \
    .option("header", "true").csv("/content/output/network")


QUESTION 12 - AUTHOR INFLUENCE NETWORK
----------------------------------------------------------------------------------------------------

Authors and Release Years
+---------+------------------------------+----------------+------------+
|file_name|                        author|    release_date|release_year|
+---------+------------------------------+----------------+------------+
|  423.txt|            Arthur Conan Doyle|February 3, 2008|        2008|
|  152.txt|               Ruth M. Sprague|    August, 1994|        1994|
|   26.txt|                   John Milton|  February, 1992|        1992|
|   81.txt|          Edgar Rice Burroughs|   June 23, 2008|        2008|
|  456.txt|                   H. G. Wells|   July 22, 2005|        2005|
|  148.txt|             Benjamin Franklin|    May 22, 2008|        2008|
|  323.txt|               Rudyard Kipling|   June 29, 2008|        2008|
|  465.txt|          Stewart Edward White|     March, 1996|        1996|
|   30.txt|                   