# In [None]:
from docx import Document
from io import BytesIO
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

def _replace_in_paragraph(paragraph, target_text, replacement_text, location):
    """
    Replace the target text in every run of a single paragraph.

    Args:
        paragraph: A python-docx paragraph; its runs are edited in place.
        target_text (str): The text to be replaced.
        replacement_text (str): The text to replace the target text with.
        location (str): Label used in the progress message
            ("paragraph" or "cell paragraph").

    Returns:
        int: The number of occurrences replaced in this paragraph.
    """
    # Cheap pre-check: skip walking the runs when the paragraph has no match.
    if target_text not in paragraph.text:
        return 0

    replaced = 0
    for run in paragraph.runs:
        occurrences = run.text.count(target_text)
        if not occurrences:
            continue
        if not replaced:
            # Announce once per paragraph, before the first edit, so the
            # message shows the paragraph text as it was originally.
            print(f"Replacing '{target_text}' with '{replacement_text}' in {location}: {paragraph.text}")
        run.text = run.text.replace(target_text, replacement_text)
        replaced += occurrences
    return replaced


def find_and_replace_on_binary(binary_content, target_text, replacement_text):
    """
    Find and replace the specified text in the binary content of a DOCX file.

    Body paragraphs and the paragraphs of every cell of every table in
    ``document.tables`` are searched. Replacement is done run by run so run
    formatting is kept; as a consequence, an occurrence whose characters are
    split across two or more runs is not replaced.

    Args:
        binary_content (bytes): The binary content of the DOCX file.
        target_text (str): The text to be replaced. If empty, nothing is
            searched and the content is returned unchanged.
        replacement_text (str): The text to replace the target text with.

    Returns:
        tuple[bytes, int]: The modified binary content and the number of
        occurrences that were replaced.
    """
    # An empty search string matches at every position; treat it as
    # "nothing to replace" instead of corrupting the document text.
    if not target_text:
        return bytes(binary_content), 0

    document = Document(BytesIO(binary_content))
    count_changes = 0

    for paragraph in document.paragraphs:
        count_changes += _replace_in_paragraph(
            paragraph, target_text, replacement_text, "paragraph"
        )

    # NOTE(review): python-docx may yield the same cell more than once per
    # row for merged cells. Re-visiting looks harmless unless
    # replacement_text itself contains target_text -- confirm if that
    # combination matters.
    for table in document.tables:
        for row in table.rows:
            for cell in row.cells:
                for paragraph in cell.paragraphs:
                    count_changes += _replace_in_paragraph(
                        paragraph, target_text, replacement_text, "cell paragraph"
                    )

    modified_docx_bytes = BytesIO()
    document.save(modified_docx_bytes)
    return modified_docx_bytes.getvalue(), count_changes

# Create (or reuse) the Spark session for this job
spark = (
    SparkSession.builder
    .appName("FindAndReplaceInDOCX")
    .getOrCreate()
)

# Directory that holds the DOCX files to process
input_directory = "/content/sample_data"

# Load every *.docx file in the directory as raw bytes, one row per file
docx_files_df = spark.read.load(
    input_directory,
    format="binaryFile",
    pathGlobFilter="*.docx",
)

# Function to apply find-and-replace on each row of the DataFrame
def apply_find_and_replace(row, target_text="sumit", replacement_text="xzat"):
    """
    Run the DOCX find-and-replace on one row of the binary-file DataFrame.

    Args:
        row: A row exposing ``path`` and ``content`` attributes.
        target_text (str): The text to be replaced. Defaults to "sumit".
        replacement_text (str): The text to replace the target text with.
            Defaults to "xzat".

    Returns:
        tuple: ``(file_path, modified_content, count_changes)`` where the
        last two values are whatever ``find_and_replace_on_binary`` returns
        for the row's content.
    """
    file_path = row.path
    modified_content, count_changes = find_and_replace_on_binary(
        row.content, target_text, replacement_text
    )
    return file_path, modified_content, count_changes

try:
    # Apply find-and-replace on each row of the DataFrame
    modified_docx_df = docx_files_df.rdd.map(apply_find_and_replace).toDF(["File", "ModifiedBinaryContent", "ChangesCount"])

    # Show the results (for demonstration purposes, use show wisely)
    modified_docx_df.show(truncate=True)
finally:
    # Stop the Spark session even when the transformation or show() fails,
    # so a failed run does not leave the session running.
    spark.stop()
