In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import torch
import nltk
import json
import sys
import re
import os

from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
from tqdm import tqdm
from dotenv import load_dotenv
from huggingface_hub import login
from nltk.tokenize import sent_tokenize


In [None]:
# Read DATA_PATH / HF_TOKEN (and any other settings) from the environment,
# after giving python-dotenv a chance to populate it.
load_dotenv()
path = os.environ['DATA_PATH']
os.environ["ACCELERATE_USE_TORCH_DEVICE"] = "true"
login(token=os.environ['HF_TOKEN'])
# sent_tokenize needs the Punkt sentence models. Recent NLTK releases load
# them from the 'punkt_tab' resource, older ones from 'punkt'; fetching both
# keeps the notebook runnable regardless of the installed NLTK version.
for resource in ('punkt', 'punkt_tab'):
    nltk.download(resource)
cleaned_fn = "cleaned.json"
ocred_fn = "original_ocr.json"

In [None]:
# Every model call below targets "cuda"; fail early with an explicit message
# instead of an obscure error from get_device_name() when no GPU is present.
if not torch.cuda.is_available():
    raise RuntimeError("CUDA device not available: this notebook requires a GPU.")

device = torch.device("cuda")
print(torch.cuda.get_device_name(0))
# The value printed here is CUDA availability, so label it as such.
print("CUDA available:", torch.cuda.is_available())
print("Supports bfloat16:", torch.cuda.is_bf16_supported())

In [None]:
minervaId = "sapienzanlp/Minerva-1B-base-v1.0"
# minervaId = "sapienzanlp/Minerva-3B-base-v1.0"

tokenizer = AutoTokenizer.from_pretrained(minervaId)
# tokenizer.add_special_tokens({'pad_token': '[PAD]'})
# Reuse EOS as the padding token so batched tokenization with padding=True works.
tokenizer.pad_token = tokenizer.eos_token
# A causal LM continues from the last position of each row: batches fed to
# generate() must be padded on the left, otherwise shorter prompts are
# continued from pad tokens and the output quality degrades.
tokenizer.padding_side = "left"

# Half-precision weights, moved to the GPU selected earlier.
model = AutoModelForCausalLM.from_pretrained(minervaId, torch_dtype=torch.float16).to(device)

In [None]:
# Load the OCR and the cleaned JSON files into `datasetRaw`, keyed by the
# file name without its extension (e.g. 'original_ocr', 'cleaned').
datasetRaw = {}

for i in [ocred_fn, cleaned_fn]:
    file_path = os.path.join(path, i)
    if not os.path.isfile(file_path):
        # Stop here with the file name in the message rather than printing
        # and then crashing on open() a line later.
        raise FileNotFoundError(f"ERROR 404 ! File {i} not Found...")

    # Explicit UTF-8: the platform default encoding would corrupt accented
    # characters on some systems. The `with` block closes the file itself.
    with open(file_path, 'r', encoding='utf-8') as f:
        datasetRaw[i.split('.')[0]] = json.load(f)

In [None]:
sample = datasetRaw['original_ocr']['1']

# The prompt below treats the text as Italian, so split sentences with the
# Italian Punkt model instead of the default English one.
sentences = sent_tokenize(sample, language='italian')

print(len(sentences))


def adaptor(x):
    """Wrap one OCR sentence in the Italian correction prompt.

    Args:
        x: sentence to be corrected.

    Returns:
        The full prompt string, ending with the "Testo corretto:" cue line.
    """
    return f"""
Sei un esperto di lingua italiana.
Questo testo contiene errori dovuti al fatto che è stato estratto da un immagine.
Correggi il testo mantenendo il più possibile le parole originali.
Non inserire elenchi o numerazioni.
Non aggiungere commenti o testo extra dopo la correzione.
Testo originale: {x}
Testo corretto:
"""


# One prompt per sentence, tokenized as a single padded batch on the GPU.
prompts = [adaptor(sentence) for sentence in sentences]
input_tensor = tokenizer(prompts, return_tensors='pt', padding=True, truncation=True).to(device)


In [None]:
# Debug aid: show the batch of token ids and compare the largest id with the
# tokenizer's vocabulary size.
token_ids = input_tensor["input_ids"]
print(token_ids)
print("Max token ID:", token_ids.max().item())
print("Tokenizer vocab size:", tokenizer.vocab_size)

In [None]:
# Decoding settings: beam search (3 beams, no sampling), one sequence per
# prompt, up to 512 new tokens, with a repetition penalty; EOS is the pad id.
generation_kwargs = dict(
    attention_mask=input_tensor["attention_mask"],
    max_new_tokens=512,
    repetition_penalty=1.2,
    do_sample=False,
    num_beams=3,
    num_return_sequences=1,
    pad_token_id=tokenizer.eos_token_id,
)
output_tensor = model.generate(input_tensor["input_ids"], **generation_kwargs)

In [None]:
# Turn every generated sequence back into text, dropping special tokens,
# then print the results numbered from 1 with a dashed separator.
text_extracted = list(
    map(lambda res: tokenizer.decode(res, skip_special_tokens=True), output_tensor)
)
for i, out in enumerate(text_extracted, start=1):
    print(f"Output {i}:\n{out}\n{'-'*40}")

## Test

In [None]:
def number_words(sentence):
    """Return how many whitespace-separated words `sentence` contains."""
    return len(sentence.split())

In [None]:
# === 1. Model setup ===
model_name = "sapienzanlp/Minerva-1B-base-v1.0"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16, device_map='cuda')

# Text-generation pipeline built on the model above; EOS is used as pad id.
generator = pipeline(
    task="text-generation",
    model=model,
    tokenizer=tokenizer,
    pad_token_id=tokenizer.eos_token_id,
)

# === 2. Token matching check ===
# def token_match_ratio(input_text, output_text):
#     in_words = input_text.lower().split()
#     out_words = output_text.lower().split()
#     matches = sum(1 for w in out_words if w in in_words)
#     return matches / max(len(in_words), len(out_words))
import difflib

def token_match_ratio(input_text, output_text):
    """Measure word-level overlap between an input text and a candidate output.

    Both texts are lower-cased and split on whitespace before comparison.

    Args:
        input_text: the original (OCR) text.
        output_text: the candidate corrected text.

    Returns:
        A ``(base_ratio, is_modified)`` tuple. ``base_ratio`` is the number of
        output words that also occur in the input, divided by the word count
        of the longer text (0.0 when both texts contain no words).
        ``is_modified`` is True when the two word sequences differ in length
        or in at least one position.
    """
    in_words = input_text.lower().split()
    out_words = output_text.lower().split()

    longest = max(len(in_words), len(out_words))
    if longest == 0:
        # Nothing to compare: avoid dividing by zero and report "unchanged".
        return 0.0, False

    # Set lookup keeps the overlap count linear in the number of words.
    known_words = set(in_words)
    matches = sum(1 for w in out_words if w in known_words)
    base_ratio = matches / longest

    # List inequality covers both a length mismatch and any differing position.
    is_modified = in_words != out_words

    return base_ratio, is_modified


# def regenerate_until_fidelity(text, generator, attempts=5, fidelity_threshold=0.7):
#     prompt = (
#         f"Correggi solo gli errori OCR senza cambiare la struttura. "
#         f"Testo OCR: {text}\nTesto corretto:"
#     )
# 
#     # Genera più output in parallelo
#     outputs = generator(
#         prompt,
#         max_new_tokens=len(text.split()) + 10,
#         do_sample=True,
#         top_k=50,
#         temperature=0.7,
#         num_return_sequences=attempts
#     )
# 
#     best_result = None
#     best_ratio = 0.0
# 
#     for o in outputs:
#         generated = o["generated_text"].split("Testo corretto:")[-1].strip().split("\n")[0]
#         ratio = token_match_ratio(text, generated)
# 
#         if ratio > best_ratio:
#             best_result = generated
#             best_ratio = ratio
# 
#         if ratio >= fidelity_threshold:
#             break  # early stop se già buono
# 
#     return {
#         "input": text,
#         "output": best_result,
#         "fidelity": round(best_ratio, 2),
#         "ok": best_ratio >= fidelity_threshold
#     }

def regenerate_until_fidelity(text, generator, attempts=5, fidelity_threshold=0.7):
    """Correct an OCR text and keep the candidate most faithful to the input.

    A deterministic generation (``do_sample=False``) is tried first. If it does
    not yield a modified candidate reaching ``fidelity_threshold`` and
    ``attempts`` is greater than 1, the remaining ``attempts - 1`` candidates
    are drawn by sampling in one additional call.

    Args:
        text: OCR text to correct.
        generator: callable invoked as ``generator(prompt, **generation_kwargs)``
            and returning an iterable of dicts with a "generated_text" entry
            (e.g. a text-generation pipeline).
        attempts: maximum number of candidates to evaluate, the deterministic
            one included.
        fidelity_threshold: minimum ratio from ``token_match_ratio`` for a
            candidate to be accepted.

    Returns:
        dict with "input" (the original text), "output" (best modified
        candidate, or the input itself when none was found), "fidelity"
        (best ratio rounded to 2 decimals) and "ok" (True when a modified
        candidate reached the threshold).
    """
    prompt = (
        f"Correggi solo gli errori OCR senza cambiare la struttura. "
        f"Testo OCR: {text}\nTesto corretto:"
    )
    max_new_tokens = len(text.split()) + 10

    best_result = None
    best_ratio = 0.0

    def consider(outputs):
        """Fold candidates into the running best; True once one is acceptable."""
        nonlocal best_result, best_ratio
        for o in outputs:
            # Keep only the first line generated after the cue.
            generated = o["generated_text"].split("Testo corretto:")[-1].strip().split("\n")[0]
            ratio, modified = token_match_ratio(text, generated)

            # A candidate identical to the input is not a correction.
            if not modified:
                continue
            if ratio > best_ratio:
                best_result = generated
                best_ratio = ratio
            if ratio >= fidelity_threshold:
                return True  # early stop on a good output
        return False

    accepted = consider(generator(prompt, max_new_tokens=max_new_tokens, do_sample=False))

    # Honour `attempts`: the deterministic call always returns the same text,
    # so further attempts have to be sampled to be any different.
    extra_attempts = attempts - 1
    if not accepted and extra_attempts > 0:
        consider(
            generator(
                prompt,
                max_new_tokens=max_new_tokens,
                do_sample=True,
                top_k=50,
                temperature=0.7,
                num_return_sequences=extra_attempts,
            )
        )

    return {
        "input": text,
        "output": best_result if best_result else text,
        "fidelity": round(best_ratio, 2),
        # Require an actual candidate so a non-positive threshold cannot
        # mark the untouched input as accepted.
        "ok": best_result is not None and best_ratio >= fidelity_threshold
    }


# === 3. Funzione principale con controllo fedeltà
def correct_with_minerva_and_check(texts, fidelity_threshold=0.6):
    """Correct every text and attach a fidelity verdict to each result.

    Uses the module-level ``generator`` and delegates each text to
    ``regenerate_until_fidelity``.

    Args:
        texts: iterable of OCR texts to correct.
        fidelity_threshold: minimum fidelity ratio for a correction to be
            flagged as accepted; forwarded to ``regenerate_until_fidelity``.

    Returns:
        list with one result dict per input text, in input order.
    """
    results = []
    for text in texts:
        # Forward the caller's threshold; otherwise the helper's own default
        # would silently be used in its place.
        result = regenerate_until_fidelity(
            text, generator, attempts=5, fidelity_threshold=fidelity_threshold
        )
        results.append(result)
    return results

# === 4. Example
ocr_text = "Qvando il sole sorge, le ombrre svannos noel silenzio del matino."
results = correct_with_minerva_and_check([ocr_text])

# === 5. Print each result with its accept / regenerate verdict
for r in results:
    for label, key in (("Input:  ", "input"), ("Output: ", "output"), ("Fidelity ratio:", "fidelity")):
        print(label, r[key])
    if r["ok"]:
        print("🟢 Accettato")
    else:
        print("🔴 Da rigenerare")

## Test

In [None]:
def number_words(sentence):
    """Return how many whitespace-separated words `sentence` contains."""
    return len(sentence.split())

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import torch

# === 1. Load the Minerva model ===
model_name = "sapienzanlp/Minerva-1B-base-v1.0"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,
    device_map='cuda',
)

# Text-generation pipeline built on the model above; EOS is used as pad id.
generator = pipeline(
    task="text-generation",
    model=model,
    tokenizer=tokenizer,
    pad_token_id=tokenizer.eos_token_id,
)

# === 3. Funzione corretta per OCR fixing ===
def correct_with_minerva(texts):
    """Run the OCR-correction prompt on each text with the module-level `generator`.

    Args:
        texts: iterable of OCR texts.

    Returns:
        list of ``{"input": text, "output": corrected}`` dicts, where
        ``corrected`` is the first line generated after "Testo corretto:".
    """

    def _fix(text):
        prompt = (
            f"Correggi gli errori OCR senza cambiare la struttura del testo. "
            f"Non riscrivere. Correggi solo lettere errate. "
            f"Testo OCR: {text} "
            f"Testo corretto:"
        )
        # Generation budget: word count of the prompt plus word count of the text.
        budget = number_words(prompt) + number_words(text)
        raw = generator(prompt, max_new_tokens=budget, do_sample=False)[0]["generated_text"]

        # Keep only what follows the last "Testo corretto:" cue ...
        tail = raw.split("Testo corretto:")[-1].strip()
        # ... and cut at the first newline in case the model keeps going.
        first_line = tail.split("\n")[0]

        return {"input": text, "output": first_line}

    return [_fix(text) for text in texts]

# === 4. Test ===
ocr_text = "Qvando il sole sorge, le ombrre svannos noel silenzio del matino."
corrected_data = correct_with_minerva([ocr_text])

# === 5. Print the result ===
for clean in corrected_data:
    print(
        f"Input:  {clean['input']}",
        f"Output: {clean['output']}",
        sep="\n",
    )

Non so perché, ma su un altro notebook, con le stesse impostazioni e lo stesso prompt, stampava:

===========================================================================

Device set to use cuda  
Input:  Qvando il sole sorge, le ombrre svannos noel silenzio del matino.  
Output: Il sole sorge, le ombre svaniscono.

===========================================================================

In [None]:
# === 1. Model setup ===
model_name = "sapienzanlp/Minerva-1B-base-v1.0"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16, device_map='cuda')

# Text-generation pipeline built on the model above; EOS is used as pad id.
generator = pipeline(
    task="text-generation",
    model=model,
    tokenizer=tokenizer,
    pad_token_id=tokenizer.eos_token_id,
)

# === 2. Token matching check ===
# def token_match_ratio(input_text, output_text):
#     in_words = input_text.lower().split()
#     out_words = output_text.lower().split()
#     matches = sum(1 for w in out_words if w in in_words)
#     return matches / max(len(in_words), len(out_words))
import difflib

def token_match_ratio(input_text, output_text):
    """Measure word-level overlap between an input text and a candidate output.

    Both texts are lower-cased and split on whitespace before comparison.

    Args:
        input_text: the original (OCR) text.
        output_text: the candidate corrected text.

    Returns:
        A ``(base_ratio, is_modified)`` tuple. ``base_ratio`` is the number of
        output words that also occur in the input, divided by the word count
        of the longer text (0.0 when both texts contain no words).
        ``is_modified`` is True when the two word sequences differ in length
        or in at least one position.
    """
    in_words = input_text.lower().split()
    out_words = output_text.lower().split()

    longest = max(len(in_words), len(out_words))
    if longest == 0:
        # Nothing to compare: avoid dividing by zero and report "unchanged".
        return 0.0, False

    # Set lookup keeps the overlap count linear in the number of words.
    known_words = set(in_words)
    matches = sum(1 for w in out_words if w in known_words)
    base_ratio = matches / longest

    # List inequality covers both a length mismatch and any differing position.
    is_modified = in_words != out_words

    return base_ratio, is_modified


# def regenerate_until_fidelity(text, generator, attempts=5, fidelity_threshold=0.7):
#     prompt = (
#         f"Correggi solo gli errori OCR senza cambiare la struttura. "
#         f"Testo OCR: {text}\nTesto corretto:"
#     )
# 
#     # Genera più output in parallelo
#     outputs = generator(
#         prompt,
#         max_new_tokens=len(text.split()) + 10,
#         do_sample=True,
#         top_k=50,
#         temperature=0.7,
#         num_return_sequences=attempts
#     )
# 
#     best_result = None
#     best_ratio = 0.0
# 
#     for o in outputs:
#         generated = o["generated_text"].split("Testo corretto:")[-1].strip().split("\n")[0]
#         ratio = token_match_ratio(text, generated)
# 
#         if ratio > best_ratio:
#             best_result = generated
#             best_ratio = ratio
# 
#         if ratio >= fidelity_threshold:
#             break  # early stop se già buono
# 
#     return {
#         "input": text,
#         "output": best_result,
#         "fidelity": round(best_ratio, 2),
#         "ok": best_ratio >= fidelity_threshold
#     }

def regenerate_until_fidelity(text, generator, attempts=5, fidelity_threshold=0.7):
    """Correct an OCR text and keep the candidate most faithful to the input.

    A deterministic generation (``do_sample=False``) is tried first. If it does
    not yield a modified candidate reaching ``fidelity_threshold`` and
    ``attempts`` is greater than 1, the remaining ``attempts - 1`` candidates
    are drawn by sampling in one additional call.

    Args:
        text: OCR text to correct.
        generator: callable invoked as ``generator(prompt, **generation_kwargs)``
            and returning an iterable of dicts with a "generated_text" entry
            (e.g. a text-generation pipeline).
        attempts: maximum number of candidates to evaluate, the deterministic
            one included.
        fidelity_threshold: minimum ratio from ``token_match_ratio`` for a
            candidate to be accepted.

    Returns:
        dict with "input" (the original text), "output" (best modified
        candidate, or the input itself when none was found), "fidelity"
        (best ratio rounded to 2 decimals) and "ok" (True when a modified
        candidate reached the threshold).
    """
    prompt = (
        f"Correggi solo gli errori OCR senza cambiare la struttura. "
        f"Testo OCR: {text}\nTesto corretto:"
    )
    max_new_tokens = len(text.split()) + 10

    best_result = None
    best_ratio = 0.0

    def consider(outputs):
        """Fold candidates into the running best; True once one is acceptable."""
        nonlocal best_result, best_ratio
        for o in outputs:
            # Keep only the first line generated after the cue.
            generated = o["generated_text"].split("Testo corretto:")[-1].strip().split("\n")[0]
            ratio, modified = token_match_ratio(text, generated)

            # A candidate identical to the input is not a correction.
            if not modified:
                continue
            if ratio > best_ratio:
                best_result = generated
                best_ratio = ratio
            if ratio >= fidelity_threshold:
                return True  # early stop on a good output
        return False

    accepted = consider(generator(prompt, max_new_tokens=max_new_tokens, do_sample=False))

    # Honour `attempts`: the deterministic call always returns the same text,
    # so further attempts have to be sampled to be any different.
    extra_attempts = attempts - 1
    if not accepted and extra_attempts > 0:
        consider(
            generator(
                prompt,
                max_new_tokens=max_new_tokens,
                do_sample=True,
                top_k=50,
                temperature=0.7,
                num_return_sequences=extra_attempts,
            )
        )

    return {
        "input": text,
        "output": best_result if best_result else text,
        "fidelity": round(best_ratio, 2),
        # Require an actual candidate so a non-positive threshold cannot
        # mark the untouched input as accepted.
        "ok": best_result is not None and best_ratio >= fidelity_threshold
    }


# === 3. Funzione principale con controllo fedeltà
def correct_with_minerva_and_check(texts, fidelity_threshold=0.6):
    """Correct every text and attach a fidelity verdict to each result.

    Uses the module-level ``generator`` and delegates each text to
    ``regenerate_until_fidelity``.

    Args:
        texts: iterable of OCR texts to correct.
        fidelity_threshold: minimum fidelity ratio for a correction to be
            flagged as accepted; forwarded to ``regenerate_until_fidelity``.

    Returns:
        list with one result dict per input text, in input order.
    """
    results = []
    for text in texts:
        # Forward the caller's threshold; otherwise the helper's own default
        # would silently be used in its place.
        result = regenerate_until_fidelity(
            text, generator, attempts=5, fidelity_threshold=fidelity_threshold
        )
        results.append(result)
    return results

# === 4. Example
ocr_text = "Qvando il sole sorge, le ombrre svannos noel silenzio del matino."
results = correct_with_minerva_and_check([ocr_text])

# === 5. Print each result with its accept / regenerate verdict
for r in results:
    for label, key in (("Input:  ", "input"), ("Output: ", "output"), ("Fidelity ratio:", "fidelity")):
        print(label, r[key])
    if r["ok"]:
        print("🟢 Accettato")
    else:
        print("🔴 Da rigenerare")