In [1]:
#Importing libraries
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, lower, regexp_replace, udf, split, countDistinct, log
from pyspark.sql.types import StringType
from docx import Document

In [2]:
#Define functions and UDFs

# Function to read .docx files
def read_doc_file(file_path):
    """Return the text of a .docx file as a single string.

    The file at ``file_path`` is opened with ``Document`` and the ``text`` of
    each entry in ``doc.paragraphs`` is joined with a newline, in order.
    """
    return '\n'.join(paragraph.text for paragraph in Document(file_path).paragraphs)

# Wrap read_doc_file as a Spark UDF: takes a file-path column, returns the document text as a string column
content_udf = udf(f=read_doc_file, returnType=StringType())

In [3]:
#Initialize spark session

# Build the session for this notebook (getOrCreate returns an existing session if one is already active)
spark = (
    SparkSession.builder
    .appName("TF-IDF Local DOCX Files")
    .getOrCreate()
)

25/01/20 15:44:52 WARN Utils: Your hostname, L-BKASH-MAC-94.local resolves to a loopback address: 127.0.0.1; using 10.21.176.197 instead (on interface en0)
25/01/20 15:44:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/20 15:44:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
#Load and Process document data

folder_path = 'docs'

# Collect the .docx files in a stable (sorted) order so re-runs see the same input order.
# Word leaves "~$<name>.docx" lock files next to any document that is currently open;
# they are not valid .docx archives and would make the reader UDF fail, so skip them.
doc_files = sorted(
    os.path.join(folder_path, file)
    for file in os.listdir(folder_path)
    if file.endswith(".docx") and not file.startswith("~$")
)

# DataFrame with contents of each document.
# The schema is given explicitly because Spark cannot infer one from an empty
# list, which is what doc_files is when the folder holds no .docx files.
docs_df = spark.createDataFrame([(file,) for file in doc_files], "filename string")
docs_df = docs_df.withColumn("text", content_udf(col("filename")))

# Clean and tokenize text: keep only letters and whitespace, lowercase, then
# emit one row per whitespace-separated token.
docs_df = docs_df.withColumn("text", lower(regexp_replace(col("text"), "[^a-zA-Z\\s]", "")))
docs_df = docs_df.withColumn("words", explode(split(col("text"), "\\s+")))

# split() yields an empty-string token when the text starts or ends with
# whitespace (e.g. empty paragraphs joined by newlines); drop those so ""
# is not counted as a word.
docs_df = docs_df.filter(col("words") != "")


In [5]:
#Calculate TF, DF, IDF and TF.IDF

# Term Frequency: number of occurrences of each word within each document.
# groupBy().count() names its result column "count"; DataFrame.alias() only
# labels the DataFrame itself, so the column is renamed explicitly to "tf".
tf = docs_df.groupBy("filename", "words").count().withColumnRenamed("count", "tf")

# Document Frequency: number of distinct documents in which each word appears
df = tf.groupBy("words").agg(countDistinct("filename").alias("df"))

# Inverse Document Frequency: natural log of (total documents / document frequency)
total_docs = docs_df.select("filename").distinct().count()
idf = df.withColumn("idf", log(total_docs / col("df")))

# Calculate TF-IDF: join per-document term counts with per-word IDF
tf_idf = tf.join(idf, "words").withColumn("tf_idf", col("tf") * col("idf"))

# Final DataFrame: one row per (word, document) with its TF-IDF weight
final_index = tf_idf.select(col("words"), col("filename"), col("tf_idf"))
final_index.show(100)


                                                                                

+-----------+--------------------+------------------+
|      words|            filename|            tf_idf|
+-----------+--------------------+------------------+
|        few|docs/Weather File...|1.0986122886681096|
|       some|docs/Report on Bo...|1.0986122886681096|
|     online|docs/Report on Bo...|1.0986122886681096|
|    experts|docs/Report on Bo...|0.4054651081081644|
|    experts|docs/Weather File...|0.4054651081081644|
|        not|docs/Report on Bo...|1.0986122886681096|
| bookstores|docs/Report on Bo...|1.0986122886681096|
|       will|docs/Weather File...|1.0986122886681096|
|temperature|docs/Weather File...|1.0986122886681096|
|   document|docs/Big Data Cou...| 2.197224577336219|
|    records|docs/Weather File...|1.0986122886681096|
|        for|docs/Big Data Cou...|0.4054651081081644|
|        for|docs/Weather File...|0.4054651081081644|
|      words|docs/Big Data Cou...|1.0986122886681096|
| possession|docs/Report on Bo...|1.0986122886681096|
|   physical|docs/Report on 

In [6]:
#Stop session

# Stop the SparkSession created earlier in this notebook; run this last,
# after all DataFrame results above have been displayed.
spark.stop()