# Data cleaning

## Import

In [66]:
import os
from concurrent.futures import ProcessPoolExecutor
import unicodedata

# Third party
import pandas as pd

## PDF
import fitz # pip install PyMuPDF
## DOCX
from docx import Document # pip install python-docx
## RTF
from striprtf.striprtf import rtf_to_text
## DOC
import textract # Needs dependencies, see https://textract.readthedocs.io/en/stable/installation.html

## Settings

In [67]:
# os.path.split returns (parent directory, current directory name); the
# project data lives next to the current working directory, under the parent.
working_directory = os.path.split(os.getcwd())
project_root = working_directory[0]

data_folder = os.path.join(project_root, "data")
input_folder = os.path.join(data_folder, "unclean")
output_folder = os.path.join(data_folder, "clean")

# Report which of the expected folders are present.
for label, folder in (
    ("Data folder exists:\t\t", data_folder),
    ("File input folder exists:\t", input_folder),
    ("File output folder exists:\t", output_folder),
):
    print(label, os.path.exists(folder))

Data folder exists:		 True
File input folder exists:	 True
File output folder exists:	 True


## Functions

In [68]:
def file_paths_from_directory(dir_path:str, abs_path:bool=False, file_exts:list=None, hidden:bool=False, sys_files:bool=False) -> list:
    """
    Get a list of all files in a directory including subdirectories.

    Side effect: every file found is passed to get_clean_file_name, which
    renames it on disk so that spaces in the file name become underscores.
    The returned list contains the paths as they are after that renaming.

    Args:
    - dir_path (str): The path to the directory to search.
    - abs_path (bool): Return absolute file paths. Default is False.
    - file_exts (list or None): Limit the result to files ending in one of these extensions (e.g. [".pdf", ".txt"]), compared case-insensitively. Default is None (no filtering).
    - hidden (bool): Include hidden files (file name starts with "."). Default is False.
    - sys_files (bool): Include system files (desktop.ini, thumbs.db, .DS_Store). Default is False.

    Returns:
    - list: A list of file paths. If abs_path is True, the file paths will be absolute. Otherwise, they are built on dir_path exactly as it was passed in.

    Raises:
    - FileNotFoundError: If dir_path is not an existing directory.
    """
    # Check if the directory exists
    if not os.path.isdir(dir_path):
        raise FileNotFoundError(f"The directory '{dir_path}' does not exist.")

    # Get all files in the directory
    file_list = []
    for root, _, files in os.walk(dir_path):
        for file in files:
            file_list.append(os.path.join(root, file))

    # Replace spaces in the file names. The files are renamed on disk, so keep
    # the returned (new) paths; the old ones may no longer exist.
    file_list = [get_clean_file_name(file_path) for file_path in file_list]

    # Filter the file list by file extension (single pass, so a file can never
    # be listed twice even if the requested extensions overlap)
    if file_exts is not None:
        suffixes = tuple(file_ext.lower() for file_ext in file_exts)
        file_list = [f for f in file_list if f.lower().endswith(suffixes)]

    # Filter the file list by hidden files
    if not hidden:
        file_list = [f for f in file_list if not os.path.basename(f).startswith(".")]

    # Get the absolute file paths
    if abs_path:
        file_list = [os.path.abspath(f) for f in file_list]

    # Filter the file list by system files
    if not sys_files:
        windows_sys_files = ["desktop.ini", "thumbs.db"]
        mac_sys_files = [".ds_store"]
        sys_prefixes = tuple(mac_sys_files + windows_sys_files)
        file_list = [f for f in file_list if not os.path.basename(f).lower().startswith(sys_prefixes)]

    # Return the file list
    return file_list

def str_from_pdf_paths(pdf_paths: list) -> pd.DataFrame:
    """
    Get all text from a list of pdf files.
    This function uses concurrent.futures.ProcessPoolExecutor to parallelize the extraction.
    Each file is handled by str_from_pdf_path.

    Args:
    - pdf_paths (list): A list of paths to the pdf files.

    Returns a DataFrame with columns:
    - pdf_path (str): The path to the pdf file.
    - status_ok (bool): True if the extraction was successful, False if not.
    - result (str): The extracted text if successful, the error message if not.
    """
    pdf_paths = list(pdf_paths)

    # Nothing to do: skip starting worker processes for an empty input.
    if not pdf_paths:
        return pd.DataFrame(columns=["pdf_path", "status_ok", "result"])

    with ProcessPoolExecutor() as executor:
        results = list(executor.map(str_from_pdf_path, pdf_paths))

    # The per-file dictionaries use the key "path"; select it under that name
    # and then rename, otherwise the "pdf_path" column would be filled with NaN.
    frame = pd.DataFrame(results, columns=["path", "status_ok", "result"])
    return frame.rename(columns={"path": "pdf_path"})

def str_from_pdf_path(pdf_path: str) -> dict:
    """
    Get all text from a pdf file.
    Uses PyMuPDF (fitz) as the pdf reader.

    Args:
    - pdf_path (str): The path to the pdf file.

    Returns dictionary with keys:
    - path: The path that was passed in.
    - status_ok: True if the extraction was successful, False if not.
    - result: The extracted text (pages joined by a single space) if successful, the error message if not.
    """
    try:
        # Open by path and use the document as a context manager so it is
        # closed again once the text has been collected.
        with fitz.open(pdf_path) as doc:
            text = " ".join(page.get_text() for page in doc)

    except Exception as e:
        # Best effort: any failure is reported in the result instead of raised.
        return {
            "path": pdf_path,
            "status_ok": False,
            "result": str(e)
        }

    return {
        "path": pdf_path,
        "status_ok": True,
        "result": text
    }
    
def get_clean_file_name(path: str) -> str:
    """
    Replace spaces in the file name with underscores and rename the file on disk.

    Only the last path component (the file name) is changed; directories are
    left untouched. If the file name contains no spaces, nothing is renamed.

    Args:
    - path (str): The path to the file.

    Returns:
    - str: The new path to the file.

    Raises:
    - OSError: If a rename is needed and os.rename fails (e.g. the file does not exist).
    """
    directory, file_name = os.path.split(path)
    new_file_name = file_name.replace(" ", "_")
    new_path = os.path.join(directory, new_file_name)

    # Skip the filesystem call when there is nothing to rename, so the function
    # can safely be applied again to a path that is already clean.
    if new_file_name != file_name:
        os.rename(path, new_path)

    return new_path

def normalize_text(text:str) -> str:
    """
    Apply Unicode NFKD normalization (compatibility decomposition) to a text.

    Compatibility characters are replaced by their plain equivalents (e.g. the
    ligature "ﬁ" becomes "fi") and accented characters are split into a base
    character followed by combining marks. Nothing else is changed: whitespace
    is not collapsed and non-ASCII characters are not dropped.

    Args:
    - text (str): The text to normalize.

    Returns:
    - str: The normalized text.
    """
    normalization_form = "NFKD"
    normalized = unicodedata.normalize(normalization_form, text)
    return normalized

def str_from_docx_path(docx_path: str) -> dict:
    """
    Extract the paragraph text of a docx file.

    Args:
    - docx_path (str): The path to the docx file.

    Returns
    - dict: A dictionary with keys:
        - path (str): The path to the docx file.
        - status_ok (bool): True if the extraction was successful, False if not.
        - result (str): The paragraph texts joined by newlines if successful, the error message if not.

    """
    # Start from the failure shape and fill in the outcome below.
    outcome = {
        "path": docx_path,
        "status_ok": False,
        "result": ""
    }

    try:
        with open(docx_path, "rb") as handle:
            paragraphs = Document(handle).paragraphs
            body = "\n".join(paragraph.text for paragraph in paragraphs)
    except Exception as error:
        # Any failure is reported through the result instead of being raised.
        outcome["result"] = str(error)
    else:
        outcome["status_ok"] = True
        outcome["result"] = body

    return outcome

def str_from_doc_path(doc_path: str) -> dict:
    """
    Get all text from a doc file.
    Uses textract for the extraction.

    Args:
    - doc_path (str): The path to the doc file.

    Returns:
    - dict: A dictionary with keys:
        - path (str): The path to the doc file.
        - status_ok (bool): True if the extraction was successful, False if not.
        - result (str): The extracted text if successful, the error message if not.
    """
    try:
        # textract.process returns bytes; decode them to text.
        raw = textract.process(doc_path)
        text = raw.decode("utf-8")

    except Exception as e:
        # Best effort: any failure is reported in the result instead of raised.
        return {
            "path": doc_path,
            "status_ok": False,
            "result": str(e)
        }

    return {
        "path": doc_path,
        "status_ok": True,
        "result": text
    }

def str_from_rtf_path(rtf_path: str) -> dict:
    """
    Extract the plain text of an rtf file.

    The file is read as text and converted with striprtf's rtf_to_text.

    Args:
    - rtf_path (str): The path to the rtf file.

    Returns
    - dict: A dictionary with keys:
        - path (str): The path to the rtf file.
        - status_ok (bool): True if the extraction was successful, False if not.
        - result (str): The extracted text if successful, the error message if not.

    """
    try:
        with open(rtf_path) as handle:
            plain_text = rtf_to_text(handle.read())
    except Exception as error:
        # Any failure is reported through the result instead of being raised.
        succeeded, payload = False, str(error)
    else:
        succeeded, payload = True, plain_text

    return {
        "path": rtf_path,
        "status_ok": succeeded,
        "result": payload
    }

def str_from_file_path(file_path: str) -> dict:
    """
    Extract the text of a file, choosing the reader by file extension.

    Supported extensions (case-insensitive): .pdf, .docx, .doc and .rtf.

    Args:
    - file_path (str): The path to the file.

    Returns:
    - dict: A dictionary with keys:
        - path (str): The path to the file.
        - status_ok (bool): True if the extraction was successful, False if not.
        - result (str): The extracted text if successful, the error message if not.
          For any other extension status_ok is False and result names the extension.
    """
    _, suffix = os.path.splitext(file_path)
    suffix = suffix.lower()

    # Hand over to the reader that matches the extension.
    if suffix == ".pdf":
        return str_from_pdf_path(file_path)
    if suffix == ".docx":
        return str_from_docx_path(file_path)
    if suffix == ".doc":
        return str_from_doc_path(file_path)
    if suffix == ".rtf":
        return str_from_rtf_path(file_path)

    # No reader available for this extension.
    return {
        "path": file_path,
        "status_ok": False,
        "result": f"Unsupported file type: {suffix}"
    }

def txt_export_from_list_with_dics(input_list:list, output_folder: str) -> None:
    """
    Export extracted texts to .txt files in the output folder.

    One file is written per entry whose status_ok is truthy. The output file
    is named after the source file, with its extension replaced by ".txt"
    (".txt" is appended if the source file has no extension). Entries whose
    status_ok is falsy are skipped.

    Args:
    - input_list (list): A list of dictionaries with keys:
        - path (str): The path of the source file.
        - status_ok (bool): True if the extraction was successful, False if not.
        - result (str): The extracted text (or the error message if not successful).
    - output_folder (str): The path to the output folder.

    Returns:
    - None
    """
    df = pd.DataFrame(input_list, columns=["path", "status_ok", "result"])

    for _, row in df.iterrows():
        if not row["status_ok"]:
            continue

        # Swap only the trailing extension. A plain str.replace on the name
        # would also hit the same text elsewhere in the name, and with an
        # empty extension it would insert ".txt" between every character.
        stem = os.path.splitext(os.path.basename(row["path"]))[0]
        output_path = os.path.join(output_folder, stem + ".txt")

        # NOTE(review): the file is written with the platform default encoding;
        # confirm this matches whatever reads these files back.
        with open(output_path, "w") as file:
            file.write(row["result"])

## Main

In [69]:
# Get file paths for supported file types
file_paths = file_paths_from_directory(input_folder, file_exts=[".pdf", ".docx", ".doc", ".rtf", ".txt"])

# Make sure every path points at the space-free file name that exists on disk.
# A listed path can be missing under its listed name when the file has already
# been renamed; in that case only derive the cleaned name, because renaming
# again would raise FileNotFoundError.
clean_file_paths = []
for file_path in file_paths:
    if os.path.exists(file_path):
        clean_file_paths.append(get_clean_file_name(file_path))
    else:
        clean_name = os.path.basename(file_path).replace(" ", "_")
        clean_file_paths.append(os.path.join(os.path.dirname(file_path), clean_name))
file_paths = clean_file_paths

# Process files in list (.txt files come back as "Unsupported file type" and
# are therefore skipped by the export; they are handled separately below)
all_results = [str_from_file_path(file_path) for file_path in file_paths]
txt_export_from_list_with_dics(all_results, output_folder)

# Move txt files into the output folder. os.rename moves the file, so it no
# longer exists in the input folder afterwards. The extension is compared
# case-insensitively, like the filter used for the listing above.
txt_file_paths = [path for path in file_paths if path.lower().endswith(".txt")]
for file_path in txt_file_paths:
    os.rename(file_path, os.path.join(output_folder, os.path.basename(file_path)))

## JSON export

- The data is exported from the database in JSON format
- The text is split into chunks of 2000 characters
- The elements of each record are: file_path and text_section

In [70]:
# Collect every cleaned text file from the output folder
all_text_files = file_paths_from_directory(output_folder, file_exts=[".txt"])

results = []

for file in all_text_files:
    with open(file) as f:
        text = f.read()

    # One record per slice of at most 2000 characters; an empty file yields no records
    file_name = os.path.basename(file)
    results.extend(
        {
            "file_path": file_name,
            "text_section": text[start:start + 2000]
        }
        for start in range(0, len(text), 2000)
    )

# Write all records to a single JSON file in the data folder
text_df = pd.DataFrame(results)
text_df.to_json(os.path.join(data_folder, "text_data.json"), orient="records", indent=4)