Copyright (c) 2025 Christian Oechler

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 

## Installation of the necessary Python packages

In [None]:
%pip install torch torchvision torchaudio
%pip install opencv-python
%pip install transformers==4.50.1
%pip install tqdm

## Importing the necessary Python packages

In [None]:
import cv2
import json
import re
import torch
import random

from pathlib import Path
from PIL import Image
from transformers import AutoProcessor, AutoModelForCausalLM 
from tqdm.notebook import tqdm
from typing import Tuple, List

## Definition of the OCR machine

In [None]:
class OCRMachine():
    """Reads text from image arrays with a Florence-2 model.

    The model and its processor are loaded once in the constructor and placed
    on the GPU (``cuda:0``, float16) when CUDA is available, otherwise on the
    CPU (float32).
    """

    # Florence-2 task token: used as the prompt, as the post-processing task
    # and as the key under which the recognised text is returned.
    TASK_PROMPT = "<OCR>"

    def __init__(self, model_name: str = "microsoft/Florence-2-base", cache_dir: str = "models/"):
        """Load the model and the matching processor.

        Args:
            model_name: Model identifier handed to ``from_pretrained``.
            cache_dir: Directory handed to ``from_pretrained`` as ``cache_dir``.
        """
        cuda_available = torch.cuda.is_available()
        self.device = "cuda:0" if cuda_available else "cpu"
        self.torch_dtype = torch.float16 if cuda_available else torch.float32

        if self.device == "cpu":
            print("ACHTUNG: Keine Beschleunigung durch eine Grafikkarte verfügbar!")

        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=self.torch_dtype,
            trust_remote_code=True,
            cache_dir=cache_dir,
        ).to(self.device)

        self.processor = AutoProcessor.from_pretrained(
            model_name,
            trust_remote_code=True,
            cache_dir=cache_dir,
        )

    def do_ocr(self, frame) -> str:
        """Run OCR on one frame and return the recognised text.

        Args:
            frame: Image array in BGR channel order (OpenCV default).

        Returns:
            The entry stored under ``"<OCR>"`` in the post-processed output.

        Raises:
            ValueError: If ``frame`` is ``None`` or has no pixels, e.g. a crop
                that lies completely outside of the image.
        """
        # Fail early with a readable message instead of an opaque OpenCV error
        if frame is None or getattr(frame, "size", 1) == 0:
            raise ValueError("do_ocr needs a non-empty frame, got None or an empty array.")

        # Convert the image from BGR (OpenCV default) to RGB (PIL default)
        image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

        inputs = self.processor(
            text=self.TASK_PROMPT,
            images=image,
            return_tensors="pt",
        ).to(self.device, self.torch_dtype)

        # Pure inference: make sure no autograd state is recorded while generating
        with torch.no_grad():
            generated_ids = self.model.generate(
                input_ids=inputs["input_ids"],
                pixel_values=inputs["pixel_values"],
                max_new_tokens=4096,
                num_beams=3,
                do_sample=False,
            )

        # Special tokens are kept because post_process_generation receives the raw text
        generated_text = self.processor.batch_decode(generated_ids, skip_special_tokens=False)[0]

        output = self.processor.post_process_generation(
            generated_text,
            task=self.TASK_PROMPT,
            image_size=(image.width, image.height),
        )

        return output[self.TASK_PROMPT]

## Definition of the helper functions

In [None]:
def _read_first_frame(video_file: str):
    """Return the first frame of *video_file*; raise ValueError if none can be read."""
    cap = cv2.VideoCapture(video_file)
    try:
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
        ret, frame = cap.read()
    finally:
        # Always hand the capture back, also when reading fails
        cap.release()

    if not ret or frame is None:
        raise ValueError(f'No frame could be read from {video_file}')

    return frame


def _record_failure(errors: list, message: str, label: str, crop) -> None:
    """Append the message and, if a usable crop exists, a ``(label, crop)`` tuple."""
    errors.append(message)

    # A missing or empty crop cannot be turned into an image by the caller
    if crop is not None and getattr(crop, "size", 0) > 0:
        errors.append((label, crop))


def test_video(ocr_machine, path_to_videofile) -> List[str|Tuple]:
    """Check whether id, date/time and camera label can be extracted from a video.

    Three independent checks are made:

    1. The file name must contain a numeric id in parentheses, e.g. ``(123)``.
    2. The OCR text of the date region of the first frame must start with
       ``DD-MM-YYYY`` followed by a whitespace or ``-`` and ``HH:MM:SS``.
    3. OCR on the camera region of the first frame must run without error.

    Args:
        ocr_machine: Object with a ``do_ocr(frame) -> str`` method.
        path_to_videofile: Path of the video file (``str`` or path-like).

    Returns:
        An empty list if every check passed. Otherwise a list containing one
        message (``str``) per failed check and, where a non-empty crop was
        available, a ``(name, crop)`` tuple with ``name`` being ``"date"`` or
        ``"cam"`` and ``crop`` the BGR array that was passed to the OCR.
    """
    video_file = str(path_to_videofile)

    # Saves the errors
    errors = []

    # Test the extraction of the unique video id from the filename
    if re.search(r'\((\d+)\)', video_file) is None:
        errors.append(f'Konnte aus dem Path {video_file} nicht die eindeutige ID auslesen.')

    # Decode the first frame once; both OCR checks work on the same frame
    try:
        frame = _read_first_frame(video_file)

        # The ratio between width and height tells the video orientation
        height, width = frame.shape[:2]
        is_landscape = width / height > 1
    except Exception:
        frame = None
        is_landscape = False

    # Test the extraction of the date and the time
    text = ""
    cropped_frame_date = None
    try:
        if frame is None:
            raise ValueError()

        # Region of the frame in which the date is expected
        cropped_frame_date = frame[:150, 1300:] if is_landscape else frame[0:250, 400:]

        text = ocr_machine.do_ocr(cropped_frame_date)

        pattern = r"(\d{2}-\d{2}-\d{4})[\s-](\d{2}:\d{2}:\d{2})"

        # Guard if nothing was matched
        if re.match(pattern, text) is None:
            raise ValueError()

    except Exception:
        _record_failure(
            errors,
            f'Das Datum oder die Zeit konnte nicht ausgelesen werden. Der OCR-Text lautet: {text}',
            "date",
            cropped_frame_date,
        )

    # Test the extraction of the camera
    # NOTE(review): the recognised text itself is not validated, only that the OCR runs
    text = ""
    cropped_frame_cam = None
    try:
        if frame is None:
            raise ValueError()

        # Region of the frame in which the camera label is expected
        cropped_frame_cam = frame[950:, 0:300] if is_landscape else frame[1700:, :370]

        text = ocr_machine.do_ocr(cropped_frame_cam)

    except Exception:
        _record_failure(
            errors,
            f'Die Kamera konnte nicht ausgelesen werden. Der OCR-Text lautet: {text}',
            "cam",
            cropped_frame_cam,
        )

    return errors

### Set up the system

In [None]:
# Root directory that is scanned for videos: each of its entries is iterated
# as a directory and one entry of it is randomly selected for testing
MAIN_DIRECTORY_VIDEO_FILES: str = "videos"

# Directory that receives error.txt and the image crops of failed tests
TEST_RESULT_DIRECTORY: str = "test_results"

In [None]:
# Create the OCR machine once; its constructor loads the model and the processor,
# and the same instance is passed to test_video for every tested file
machine = OCRMachine()

### Determine the video files for testing

In [None]:
main_directory_path = Path(MAIN_DIRECTORY_VIDEO_FILES)

# One randomly chosen file per sub-directory of the main video directory
test_video_filelist: List[Path] = []

# Iteration through the main video directory
with tqdm(list(main_directory_path.iterdir()), desc="Fortschritt", unit="Datei") as pbar:
    for video_directory in pbar:
        # Updates the progress bar
        pbar.set_postfix_str(f"Verzeichnis :{video_directory.name}")

        # Guard: a stray file next to the video directories cannot be iterated
        if not video_directory.is_dir():
            continue

        # Only regular files are candidates; nested directories are ignored
        files_in_directory: List[Path] = [
            entry for entry in video_directory.iterdir() if entry.is_file()
        ]

        # Guard to prevent further processing if directory is empty
        if not files_in_directory:
            continue

        # Saves a randomly selected file path to the test list of video files
        test_video_filelist.append(random.choice(files_in_directory))

## Test the video files

In [None]:
test_result_directory_path = Path(TEST_RESULT_DIRECTORY)
test_result_directory_path.mkdir(parents=True, exist_ok=True)

with tqdm(test_video_filelist, desc="Fortschritt", unit="Datei") as pbar:
    for video_file in pbar:
        pbar.set_postfix_str(f"{video_file.name}")

        # Empty list: all checks passed. Otherwise messages (str) and (name, image) tuples
        result = test_video(machine, video_file)

        if not result:
            print(f'\033[92m[OK]\033[0m {video_file}')
            continue

        print(f'\033[91m[ERROR]\033[0m {video_file}')

        # Append the messages to the common error log; the encoding is fixed
        # because file names and OCR text may contain non-ASCII characters
        with open(test_result_directory_path / 'error.txt', 'a', encoding='utf-8') as file:
            file.write("--------------------\n")
            file.write(f'{video_file} \n')

            for error in result:
                if isinstance(error, str):
                    file.write(f'{error}\n')

            file.write("--------------------\n")

        # Save the image crops that belong to the failed checks
        for error in result:
            if not isinstance(error, tuple):
                continue

            name, image = error

            # Nothing to save if the frame could not be read or the crop is empty
            if image is None or image.size == 0:
                continue

            # The crops come from OpenCV (BGR); PIL expects RGB, otherwise
            # red and blue are swapped in the saved file
            im = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
            filepath = test_result_directory_path / f'{video_file.name}_{name}.jpg'
            im.save(filepath)