In [1]:

import warnings
import cv2
import time
import os
import sys
import pytesseract
import numpy as np
import torch
from datasets import DatasetDict
from PIL import Image

# zeige keine Warnungen an
warnings.filterwarnings("ignore")

module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)
    
from src.ocr_pipeline import OCRPreprocessor, OCRPostProcessor
from src.utils import rotate_image, pil_to_cv, from_cv_to_pil

torch.set_num_threads(1)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def delete_cache_files(directory: str) -> None:
    """
    Löscht alle Cache-Dateien mit der Endung '.arrow' im angegebenen Verzeichnis und seinen Unterverzeichnissen.
    
    Args:
        directory (str): Das Verzeichnis, in dem nach Cache-Dateien gesucht werden soll.
        
    Gibt aus:
        Die Gesamtgröße des freigegebenen Speicherplatzes in Gigabyte (GB).
    """
    total_freed_space = 0  # Variable zur Speicherung der gesamten freigegebenen Speichergröße in Bytes

    # Durchlaufen des Verzeichnisses und aller Unterordner
    for root, dirs, files in os.walk(directory):
        for file in files:
            # Überprüfen, ob der Dateiname "cache" enthält und mit ".arrow" endet
            if "cache" in file and file.endswith(".arrow"):
                file_path = os.path.join(root, file)  # Vollständigen Pfad der Datei erstellen
                file_size = os.path.getsize(file_path)  # Größe der Datei in Bytes ermitteln
                total_freed_space += file_size  # Speichergröße zur Gesamtsumme hinzufügen
                os.remove(file_path)  # Datei löschen
                print(f"Gelöscht: {file_path}")  # Bestätigung der gelöschten Datei ausgeben

    # Umrechnen der freigegebenen Speichergröße von Bytes in Gigabyte
    freed_space_gb = total_freed_space / (1024**3)

    # Ausgabe der zusammengefassten Informationen
    print(f"Insgesamt {freed_space_gb:.2f} GB Speicherplatz freigegeben.")

In [4]:
delete_cache_files("../data/interim_rgb")

Insgesamt 0.00 GB Speicherplatz freigegeben.


In [5]:
# Datensatz initialisieren
dataset = DatasetDict.load_from_disk("../data/interim_rgb")

In [6]:
from typing import Union
class OCRPipeline:
    def __init__(self, image: Union[np.ndarray, Image.Image]):
        """OCR-Pipeline zu Vorbereitung des Dokumentes, 
        Extraktion des Textes und Aufbereitung des extrahierten Textes.

        Args:
            Args:
            image (Union[np.ndarray, Image.Image]): Das Eingangsbild als NumPy-Array oder PIL.Image.Image.
        """
        self.raw_image = image
        self.preprocessed_image = None
        self.ocr_output = ""

    def preprocess(self) -> None:
        """Initialisiert und wendet den OCRPreprocessor an, speichert das verarbeitete Bild."""
        preprocessor = OCRPreprocessor(self.raw_image)
        preprocessor.cropping(buffer_size=10)
        preprocessor.to_gray()
        preprocessor.correct_skew()
        preprocessor.sharpen(kernel_type="laplace_standard")
        preprocessor.opening(kernel=(1,1), iterations=2)
        preprocessor.power_law_transform(gamma=2)
        self.preprocessed_image = preprocessor.get_image()

    def extract_text(self) -> None:
        """Wendet PyTesseract auf das vorverarbeitete Bild an und speichert den Text."""
        self.ocr_output = pytesseract.image_to_string(self.preprocessed_image)

    def postprocess(self) -> None:
        """Initialisiert und wendet den OCRPostProcessor auf den extrahierten Text an."""
        if self.ocr_output.strip():  # Prüft, ob `ocr_output` nicht leer ist
            postprocessor = OCRPostProcessor(self.ocr_output)
            # Anwenden verschiedener Methoden
            postprocessor.identify_language()
            postprocessor.remove_special_characters()
            postprocessor.lowercase()
            postprocessor.remove_stopwords()
            postprocessor.remove_extra_spaces()
            
            # Aufbereiteten OCR-Output extrahieren
            self.ocr_output = postprocessor.get_text()
        else:
            self.ocr_output = "no text found in document image with ocr!"

    def get_output(self):
        """Gibt den aufbereiteten OCR-Output zurück."""
        return self.ocr_output

In [7]:
def apply_ocr_to_dataset(dataset: DatasetDict) -> DatasetDict:
    """
    Diese Methode wendet die OCR (Optical Character Recognition) auf alle Bilder in jedem Split (train, validation, test) eines Huggingface-Datensatzes an und fügt ein neues Feature hinzu, das den erkannten Text enthält.
    """
    def add_ocr_text(example: dict) -> dict:
        image = example['image']
            
        ocr_pipeline = OCRPipeline(image)
            
        ocr_pipeline.preprocess()

        ocr_pipeline.extract_text()
            
        ocr_pipeline.postprocess()

        example['text'] = ocr_pipeline.get_output()
            
        return example
    
     # Anwenden der Funktion auf jeden Split im Datensatz
    #for split in dataset.keys():
    dataset = dataset.map(add_ocr_text, keep_in_memory=False, batch_size=1)
            
    return dataset

In [8]:
test_dataset = apply_ocr_to_dataset(dataset["test"])

Map:  14%|█▍        | 72/523 [03:25<19:18,  2.57s/ examples]

In [None]:
#%%time
#processed_dataset = apply_ocr_to_dataset(dataset)

In [None]:
processed_dataset["test"][0]

In [None]:
processed_dataset["test"][0]

### Prüfen ob kein string im Feature "Text" leer ist in allen drei Datensätzen

In [None]:
# Funktion, die prüft, ob der Text leer ist
def is_empty_string(example):
    return example["text"] == ""

# Zählen der leeren Strings in jedem Split
empty_counts = {}
for split in dataset.keys():
    empty_count = sum(1 for example in dataset[split] if is_empty_string(example))
    empty_counts[split] = empty_count

# Ausgabe der Ergebnisse
for split, count in empty_counts.items():
    print(f"Anzahl der leeren Strings im '{split}'-Split: {count}")

In [None]:
processed_dataset.save_to_disk("../data/processed")