In [14]:
!pip install numpy==1.26.4 --force-reinstall


Collecting numpy==1.26.4
  Using cached numpy-1.26.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)
Using cached numpy-1.26.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.0 MB)
Installing collected packages: numpy
  Attempting uninstall: numpy
    Found existing installation: numpy 1.26.4
    Uninstalling numpy-1.26.4:
      Successfully uninstalled numpy-1.26.4
Successfully installed numpy-1.26.4


In [15]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Gutenberg_TFIDF") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

spark


In [16]:
from pyspark.sql.functions import input_file_name

books_df = spark.read.text("/home/prapti/CSL7110_Assignment/Hadoop/Dataset/*.txt") \
    .withColumn("file_name", input_file_name()) \
    .withColumnRenamed("value", "text")

books_df.count()


                                                                                

4119082

In [17]:
from pyspark.sql.functions import lower, regexp_replace

books_clean = books_df.withColumn(
    "clean_text",
    lower(regexp_replace("text", "[^a-zA-Z\\s]", ""))
)


In [18]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover

tokenizer = Tokenizer(inputCol="clean_text", outputCol="words")
words_data = tokenizer.transform(books_clean)

remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered_data = remover.transform(words_data)


In [19]:
from pyspark.sql.functions import collect_list, flatten

book_words = filtered_data.groupBy("file_name") \
    .agg(flatten(collect_list("filtered_words")).alias("all_words"))

book_words.count()


                                                                                

425

In [20]:
from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(
    inputCol="all_words",
    outputCol="raw_features",
    vocabSize=10000,       # limit vocabulary to avoid memory issues
    minDF=5                # ignore very rare words
)

cv_model = cv.fit(book_words)
featurized_data = cv_model.transform(book_words)


                                                                                

In [21]:
from pyspark.ml.feature import IDF

idf = IDF(inputCol="raw_features", outputCol="features")
idf_model = idf.fit(featurized_data)

tfidf_data = idf_model.transform(featurized_data)

tfidf_data.select("file_name", "features").show(2)




+--------------------+--------------------+
|           file_name|            features|
+--------------------+--------------------+
|file:///home/prap...|(10000,[0,1,2,3,4...|
|file:///home/prap...|(10000,[0,1,2,3,4...|
+--------------------+--------------------+
only showing top 2 rows



                                                                                

In [22]:
from pyspark.ml.feature import Normalizer

normalizer = Normalizer(inputCol="features", outputCol="norm_features")
normalized_data = normalizer.transform(tfidf_data)


In [23]:


book_vectors = normalized_data.select("file_name", "norm_features").rdd \
    .map(lambda row: (row["file_name"], row["norm_features"]))




In [24]:
import itertools

book_list = book_vectors.collect()

similarities = []

for (name1, vec1), (name2, vec2) in itertools.combinations(book_list, 2):
    similarity = float(vec1.dot(vec2))
    similarities.append((name1, name2, similarity))

similarities_sorted = sorted(similarities, key=lambda x: -x[2])

similarities_sorted[:5]


                                                                                

[('file:///home/prapti/CSL7110_Assignment/Hadoop/Dataset/156.txt',
  'file:///home/prapti/CSL7110_Assignment/Hadoop/Dataset/117.txt',
  0.9999999999999982),
 ('file:///home/prapti/CSL7110_Assignment/Hadoop/Dataset/30.txt',
  'file:///home/prapti/CSL7110_Assignment/Hadoop/Dataset/10.txt',
  0.9999922452487483),
 ('file:///home/prapti/CSL7110_Assignment/Hadoop/Dataset/29.txt',
  'file:///home/prapti/CSL7110_Assignment/Hadoop/Dataset/37.txt',
  0.999966342485173),
 ('file:///home/prapti/CSL7110_Assignment/Hadoop/Dataset/302.txt',
  'file:///home/prapti/CSL7110_Assignment/Hadoop/Dataset/212.txt',
  0.9997704490258186),
 ('file:///home/prapti/CSL7110_Assignment/Hadoop/Dataset/73.txt',
  'file:///home/prapti/CSL7110_Assignment/Hadoop/Dataset/463.txt',
  0.9996619617080579)]

In [25]:
for s in similarities_sorted[:5]:
    print(s[0].split("/")[-1],
          s[1].split("/")[-1],
          s[2])


156.txt 117.txt 0.9999999999999982
30.txt 10.txt 0.9999922452487483
29.txt 37.txt 0.999966342485173
302.txt 212.txt 0.9997704490258186
73.txt 463.txt 0.9996619617080579
