In [None]:
import stanza
import re
from docx.text.paragraph import Paragraph
from docx.document import Document
from docx.table import _Cell, Table
from docx.oxml.text.paragraph import CT_P
from docx.oxml.table import CT_Tbl
# Fetch the Hebrew Stanza models and build the pipeline object bound to `nlp`.
stanza.download('he')
nlp = stanza.Pipeline('he')
import sys
# Show which Python interpreter this kernel is running on.
print(sys.executable)

# Replace python-docx's Paragraph.text with a read-only property that also
# includes hyperlink text. The lambda resolves get_paragraph_text at call time,
# which is why the function can be defined after this assignment.
# This patches the class itself: every Paragraph in this process is affected,
# and because the property has no setter, assigning to `paragraph.text` will
# raise AttributeError from here on.
Paragraph.text = property(lambda self: get_paragraph_text(self))

def get_paragraph_text(paragraph) -> str:
    """
    Extract text from paragraph, including hyperlink text.

    Walks the direct children of the paragraph's XML element in document
    order. The text of each plain run (``w:r``) is taken from the matching
    entry of ``paragraph.runs``; for a ``w:hyperlink`` child, the text of every
    ``w:r`` nested directly inside it is appended. All other children are
    ignored.

    Args:
        paragraph: Object exposing ``_p`` (an iterable XML element) and
            ``runs`` (a sequence whose items have ``.text``), such as a
            python-docx ``Paragraph``.

    Returns:
        The concatenated text; an empty string if nothing contributed.
    """
    def get_xml_tag(element):
        # Comment / processing-instruction nodes carry a non-string tag, and a
        # tag without a "{namespace}" prefix has nothing to strip. Neither case
        # may abort the extraction.
        tag = element.tag
        if not isinstance(tag, str):
            return None
        match = re.match(r"\{.*\}(.*)", tag)
        local_name = match.group(1) if match else tag
        return "%s:%s" % (element.prefix, local_name)

    # Read `runs` once instead of once per child (python-docx builds a new
    # list on each access, as far as I can tell — TODO confirm).
    runs = paragraph.runs
    pieces = []
    run_count = 0
    for child in paragraph._p:
        tag = get_xml_tag(child)
        if tag == "w:r":
            pieces.append(runs[run_count].text)
            run_count += 1
        elif tag == "w:hyperlink":
            for sub_child in child:
                if get_xml_tag(sub_child) == "w:r":
                    pieces.append(sub_child.text or '')
    return ''.join(pieces)

def is_block_bold(block) -> bool:
    """
    Report whether at least one run of the block is flagged bold.

    Note that this is an "any run" test, not an "every run" test: a single
    run with a truthy ``bold`` attribute is enough. A block without runs
    yields False.
    """
    runs = block.runs
    if not runs:
        return False
    return any(run.bold for run in runs)

def iterate_block_items(parent):
    """
    Iterate over paragraphs and tables in a document or cell.

    Yields a ``Paragraph`` for every paragraph element that is a direct child
    of ``parent``. Tables are not yielded themselves; each table is flattened
    by recursing into every cell of every row, so the paragraphs inside the
    cells are yielded in their place.

    Args:
        parent: A python-docx document object or a table ``_Cell``. For any
            other type a message is printed and nothing is yielded.

    Yields:
        ``Paragraph`` objects in document order.
    """
    # Resolve the document *class* under a private name. Another cell of this
    # notebook runs `from docx import Document`, which rebinds the global
    # `Document` to python-docx's factory function; `isinstance(x, <function>)`
    # raises TypeError, so the global name cannot be trusted here.
    from docx.document import Document as _DocumentClass

    if isinstance(parent, _DocumentClass):
        parent_element = parent.element.body
    elif isinstance(parent, _Cell):
        parent_element = parent._tc
    else:
        # Log unsupported type and return
        print(f"Unsupported parent type: {type(parent)}")
        return

    for child in parent_element.iterchildren():
        try:
            if isinstance(child, CT_P):
                yield Paragraph(child, parent)
            elif isinstance(child, CT_Tbl):
                table = Table(child, parent)
                # NOTE(review): merged cells may be reported once per grid
                # column by python-docx, which would repeat their text —
                # verify against real documents.
                for row in table.rows:
                    for cell in row.cells:
                        yield from iterate_block_items(cell)
        except Exception as e:
            # Best effort: one malformed element must not stop the whole walk.
            print(f"Error processing child element: {e}")

def extract_part_after_number_or_hebrew_letter(sentence: str) -> str:
    """
    Strip a leading enumeration marker such as ``"12."`` or ``"א."``.

    The marker is a run of ASCII digits and/or Hebrew letters at the very
    start of the string, followed by a period. When it is present, everything
    after the period is returned with surrounding whitespace removed,
    including text on later lines of a multi-line paragraph. When it is
    absent, the input is returned unchanged.

    Note that a leading Hebrew word of any length followed by a period also
    counts as a marker, as does the integer part of a leading decimal number.

    Args:
        sentence: Paragraph text to clean.

    Returns:
        The text after the marker, or ``sentence`` itself if no marker matched.
    """
    pattern = r'^(?:[0-9\u05D0-\u05EA]+)\.\s*(.*)'
    # DOTALL lets `(.*)` run across line breaks; without it, everything after
    # the first newline of a numbered paragraph was silently dropped.
    match = re.search(pattern, sentence, flags=re.DOTALL)
    return match.group(1).strip() if match else sentence

def count_patterns_in_block(block) -> int:
    """
    Count numbering tokens in ``block.text``.

    A token is either digits followed by a period (``"3."``) or a period
    followed by digits (``".3"``), optionally preceded by whitespace.
    Matches do not overlap.
    """
    numbering = re.compile(r'\s*(?:\.\d+|\d+\.)')
    return sum(1 for _ in numbering.finditer(block.text))

def count_consecutive_blocks_starting_with_number(blocks) -> int:
    """
    Sum the numbering-token counts of the given blocks, with two cut-offs.

    Blocks are visited in order and the result of ``count_patterns_in_block``
    is accumulated for each one.

    * If a block's text contains 'הנאשם' ("the defendant"), the function
      returns 1 immediately, regardless of what was accumulated so far.
    * If a block's text contains 'חקיקה שאוזכרה' ("legislation cited"), that
      block is still counted and iteration stops afterwards.
    """
    total = 0
    for current in blocks:
        content = current.text
        if 'הנאשם' in content:
            return 1
        total += count_patterns_in_block(current)
        if 'חקיקה שאוזכרה' in content:
            break
    return total

def extract_name_after_word(text: str, word: str) -> str:
    """
    Return the Hebrew text that directly follows the first occurrence of ``word``.

    After ``word`` an optional comma and any whitespace are skipped. The
    result is the following run of Hebrew-block characters, whitespace,
    apostrophes, parentheses and hyphens. The run ends at the first character
    outside that set (for example a digit, a Latin letter or a period), so
    the result may carry trailing whitespace.

    Args:
        text: Text to search.
        word: Literal marker to look for. It is escaped before being placed in
            the regular expression, so characters such as ``.`` or ``(`` are
            matched verbatim instead of being read as regex syntax.

    Returns:
        The captured run, or ``''`` when ``word`` is absent or nothing
        capturable follows it.
    """
    # re.escape: an unescaped marker like "א.ב" matched unrelated text, and
    # one like "(להלן" raised re.error.
    pattern = re.compile(fr'{re.escape(word)}(?:,)?\s*([\u0590-\u05FF\s\'\(\)-]+)')
    match = pattern.search(text)
    return match.group(1) if match else ''

def extract_violations(text: str) -> list:
    """
    Find statutory-section references in ``text``.

    A reference starts with a section keyword (סעיף / סעיפים / ס' / סע')
    followed by a number, optionally a parenthesised sub-section, and runs up
    to and including a following "ב/ל" + "חוק/פקודת" phrase together with the
    word(s) naming the law. The regex has no capturing groups, so each result
    is the full matched span.

    Returns:
        The matches in order of appearance, each stripped of surrounding
        whitespace; an empty list when nothing matches.
    """
    violation_regex = r"(?:סעיף|סעיפים|ס'|סע')\s*\d+\s*(?:\([\s\S]*?\))?.*?(?=\s*(?:ב|ל)(?:חוק|פקודת))\s*(?:ב|ל)(?:חוק|פקודת)\s*ה?(?:עונשין|כניסה לישראל|סמים\s+המסוכנים|\w+)?"
    return [found.strip() for found in re.findall(violation_regex, text)]

Downloading https://raw.githubusercontent.com/stanfordnlp/stanza-resources/main/resources_1.8.0.json: 386kB [00:00, 7.94MB/s]                    
2025-01-15 13:08:54 INFO: Downloaded file to /home/liorkob/stanza_resources/resources.json
2025-01-15 13:08:54,662 - INFO - Downloaded file to /home/liorkob/stanza_resources/resources.json
2025-01-15 13:08:54 INFO: Downloading default packages for language: he (Hebrew) ...
2025-01-15 13:08:54,669 - INFO - Downloading default packages for language: he (Hebrew) ...
2025-01-15 13:08:55 INFO: File exists: /home/liorkob/stanza_resources/he/default.zip
2025-01-15 13:08:55,226 - INFO - File exists: /home/liorkob/stanza_resources/he/default.zip
2025-01-15 13:08:57 INFO: Finished downloading models and saved to /home/liorkob/stanza_resources
2025-01-15 13:08:57,481 - INFO - Finished downloading models and saved to /home/liorkob/stanza_resources
2025-01-15 13:08:57 INFO: Checking for updates to resources.json in case models have been updated.  Note: th

/home/liorkob/.conda/envs/pdocx/bin/python


In [None]:
import os
import re
import pandas as pd
import stanza
from docx import Document
import logging

"""
This script processes `.docx` verdict files, extracting text from them, identifying and classifying specific sections of the document. It saves the results into CSV files with the extracted text, sections, and metadata for further analysis.

### Key Functionalities:

1. **Text Extraction and Preprocessing**:
   - The script iterates through paragraphs in `.docx` files, using the helper functions defined in the notebook's first cell (e.g. `iterate_block_items`, `is_block_bold`) to identify specific sections based on formatting (e.g., bold text).
   - The extracted sections are stored in a dictionary along with the corresponding full sentences from the document.

2. **Part Identification**:
   - It processes bolded blocks of text as distinct "parts" or sections (e.g., titles or key sections) and appends them to a list.
   - For each sentence, the script associates it with both the most recent part (stored as `part_single`) and a concatenation of all previous parts (stored as `part_concatenated`).

3. **NLP Processing**:
   - The Hebrew Stanza NLP pipeline is used to split the text into sentences, which are then stored in the output alongside the associated document sections.
   - The script also applies filters to skip short paragraphs and unwanted patterns (e.g., references to certain case types).

4. **Error Handling and Logging**:
   - The script uses Python’s `logging` module to provide informative logs, including handling errors if a document can't be opened or processed.
   - It catches and logs any exceptions during the processing of files.

5. **CSV Output**:
   - For each `.docx` file, the extracted data (including text, section titles, and concatenated sections) is saved to a CSV file.

6. **Recursive Directory Processing**:
   - `run()` walks a root directory recursively (the path is currently hard-coded inside `run()`) and writes one CSV per `.docx` file, named after the file, directly into that root directory.

### Main Functions:

- **doc_to_csv(doc_path: str, result_path: str)**:
   - Processes a single `.docx` file, extracting text and metadata.
   - Saves the results to a CSV file if a result path is provided.

- **run()**:
   - Iterates through all `.docx` files in the root directory.
   - For each file, it calls `doc_to_csv` and saves the resulting DataFrame as a CSV.

### Usage:
The script is executed via the `run()` function, which processes all files in the specified directory. It logs the status and outputs CSV files containing preprocessed data for each document.
"""

# NOTE(review): this repeats the model download and pipeline construction
# from the first cell and rebinds `nlp` to a second pipeline instance.
stanza.download('he')
nlp = stanza.Pipeline('he')

# NOTE(review): `docx.Document` is python-docx's factory function for opening
# files. Importing it here (as at the top of this cell) rebinds the global
# `Document`, which the first cell had bound to the `docx.document.Document`
# class. Any `isinstance(x, Document)` evaluated after this cell has run
# therefore receives a function as its second argument.
from docx import Document

def validate_docx(file_path):
    """
    Check whether ``file_path`` can be opened with ``Document``.

    Prints the outcome and returns True on success. Any exception raised
    while opening is reported on stdout and turned into a False result
    instead of propagating.
    """
    try:
        Document(file_path)
    except Exception as exc:
        print(f"Error validating document: {exc}")
        return False
    else:
        print("The file is valid.")
        return True

def doc_to_csv(doc_path: str = None, result_path: str = None):
    """
    Split one `.docx` verdict into sentences tagged with their section titles.

    Every paragraph produced by ``iterate_block_items`` is handled as follows:

    * Paragraphs of at most one character, or containing one of the markers
      'ע"פ', 'ת"פ', 'עפ"ג', are skipped.
    * A paragraph that ``is_block_bold`` accepts, has fewer than 10
      space-separated words, does not start with a digit and does not start
      like a Hebrew enumeration item becomes the current section title
      ("part").
    * Any other paragraph with more than three space-separated words has its
      leading enumeration marker removed, is split into sentences by ``nlp``
      and each kept sentence is recorded with the current title and with the
      comma-joined list of all titles seen so far.

    Sentence filtering inside a paragraph: a sentence starting with ``"``
    opens a quotation, a sentence ending with ``"`` or ``".`` closes it, and
    those sentences plus everything in between are dropped. A sentence equal
    to the current title is dropped as well.

    Args:
        doc_path: Path of the document; ``.docx`` is appended if missing.
        result_path: Optional directory. When given, the table is also written
            to ``preprocessing.csv`` inside it (the directory is created if
            needed).

    Returns:
        ``pandas.DataFrame`` with columns ``verdict`` (file name without
        extension), ``text``, ``part_single`` and ``part_concatenated``.

    Raises:
        TypeError: ``doc_path`` is None.
        FileNotFoundError: The document does not exist.
        Exception: Whatever ``Document`` raises while opening the file
            (logged, then re-raised).
    """
    if doc_path is None:
        raise TypeError("doc_path is required")

    logging.info(f"Processing file: {doc_path}")

    # 'verdict' is a scalar; pandas broadcasts it over the rows built below.
    verdict_name = os.path.splitext(os.path.basename(doc_path))[0]
    data = {'verdict': verdict_name, 'text': [], 'part_single': [], 'part_concatenated': []}

    # Ensure the file path ends with .docx
    if not doc_path.lower().endswith('.docx'):
        doc_path += '.docx'

    if not os.path.exists(doc_path):
        raise FileNotFoundError(f"File not found: {doc_path}")

    try:
        doc = Document(doc_path)
    except Exception as e:
        logging.error(f"Error opening document {doc_path}: {str(e)}")
        raise

    part_single = 'nothing'  # Title of the section currently being read
    part_list = []  # Every title seen so far, in order
    # Must be bound before the first title appears: body text that precedes
    # any bold title used to fail with UnboundLocalError.
    part_concatenated = ''

    for block in iterate_block_items(doc):
        # `.text` is a computed property; read it once per block.
        block_text = block.text

        # Skip blocks that are too short or contain specific unwanted patterns
        if len(block_text) <= 1 or 'ע"פ' in block_text or 'ת"פ' in block_text or 'עפ"ג' in block_text:
            continue

        word_count = len(block_text.split(' '))

        # A short bold block that is not an enumerated item starts a new part.
        if (is_block_bold(block) and word_count < 10
                and not re.match(r'^\d', block_text)
                and not re.match(r'[\u0590-\u05FF][^.)*]*[.)]', block_text)):
            part_single = block_text
            part_list.append(block_text)
            part_concatenated = ', '.join(part_list)
            continue

        # Nothing from a block of three words or fewer is ever recorded, so
        # skip the (expensive) NLP call for it entirely.
        if word_count <= 3:
            continue

        extracted_part_text = extract_part_after_number_or_hebrew_letter(block_text)
        if not extracted_part_text:
            continue
        sentences = nlp(extracted_part_text)

        # True while inside a quotation; tracked per block.
        # NOTE(review): a sentence that both starts and ends with a quote
        # leaves this set, so the rest of the block is dropped — confirm that
        # this is intended.
        flag = False
        for sentence in sentences.sentences:
            text = sentence.text
            if text.startswith('"'):
                flag = True
                continue
            if text.endswith('".') or text.endswith('"'):
                flag = False
                continue
            if flag:
                continue
            if text == part_single:
                continue
            # Append the text and its corresponding parts
            data['text'].append(text)
            data['part_single'].append(part_single)
            data['part_concatenated'].append(part_concatenated)

    # Convert the data dictionary into a DataFrame
    sentence_doc_df = pd.DataFrame(data)

    # Save the result if a result path is provided
    if result_path:
        os.makedirs(result_path, exist_ok=True)
        result_path = os.path.join(result_path, 'preprocessing.csv')
        sentence_doc_df.to_csv(result_path, index=False)
        logging.info(f"Saved preprocessed data to {result_path}")

    return sentence_doc_df

def run(root_directory: str = "/home/liorkob/thesis/nlp_course/lcp/docx_try"):
    """
    Convert every `.docx` file under ``root_directory`` to a CSV.

    The directory tree is walked recursively. Each document is first checked
    with ``validate_docx``; documents that fail the check are logged and
    skipped. Valid documents are processed with ``doc_to_csv`` and written to
    ``<root_directory>/<file name without extension>.csv``. All CSVs land
    directly in the root directory, so two documents with the same file name
    in different sub-directories overwrite each other's output.

    An error while processing one file is logged and does not stop the run.

    Args:
        root_directory: Directory to scan. Defaults to the path this notebook
            was originally written against.
    """
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    # Output goes to the directory being walked, which therefore already exists.
    output_dir = root_directory

    for root, _, files in os.walk(root_directory):
        logging.info(f"Processing directory: {root}")
        for file in files:
            if not file.lower().endswith('.docx'):
                continue

            input_path = os.path.join(root, file)

            try:
                # Honour the validation result instead of parsing a file that
                # is already known to be unreadable.
                if not validate_docx(input_path):
                    logging.error(f"Skipping invalid document: {input_path}")
                    continue
                df = doc_to_csv(doc_path=input_path)
                output_path = os.path.join(output_dir, f"{os.path.splitext(file)[0]}.csv")
                df.to_csv(output_path, index=False)
                logging.info(f"Processed and saved: {output_path}")
            except Exception as e:
                logging.error(f"Error processing {file}: {str(e)}")

# Entry point: start the batch conversion when this code is executed as the
# main module (the cell output below shows it ran in the notebook kernel).
if __name__ == "__main__":
    run()


Downloading https://raw.githubusercontent.com/stanfordnlp/stanza-resources/main/resources_1.8.0.json: 386kB [00:00, 3.90MB/s]                    
2025-01-15 13:10:22 INFO: Downloaded file to /home/liorkob/stanza_resources/resources.json
2025-01-15 13:10:22,029 - INFO - Downloaded file to /home/liorkob/stanza_resources/resources.json
2025-01-15 13:10:22 INFO: Downloading default packages for language: he (Hebrew) ...
2025-01-15 13:10:22,041 - INFO - Downloading default packages for language: he (Hebrew) ...
2025-01-15 13:10:22 INFO: File exists: /home/liorkob/stanza_resources/he/default.zip
2025-01-15 13:10:22,606 - INFO - File exists: /home/liorkob/stanza_resources/he/default.zip
2025-01-15 13:10:24 INFO: Finished downloading models and saved to /home/liorkob/stanza_resources
2025-01-15 13:10:24,954 - INFO - Finished downloading models and saved to /home/liorkob/stanza_resources
2025-01-15 13:10:24 INFO: Checking for updates to resources.json in case models have been updated.  Note: th

The file is valid.
