In [None]:
import os
import azure.ai.vision as sdk
import json
from PIL import Image, ImageDraw
from tqdm import tqdm
import azure.ai.vision as sdk
from sklearn.cluster import KMeans
import numpy as np
from sklearn.metrics import silhouette_score
# Fixed random seed, presumably meant for reproducible clustering — confirm it
# is actually passed wherever randomness is used.
RSEED = 42


# Shared vision-service configuration used by the analyzers created in this
# file. Endpoint and key are read from the environment, so a missing
# VISION_ENDPOINT or VISION_KEY variable raises KeyError right here.
service_options = sdk.VisionServiceOptions(
    os.environ["VISION_ENDPOINT"], os.environ["VISION_KEY"])


def create_prediction_from_file(filename):
    """
    Run the custom image-analysis model ("historicalink13") on one image file.

    :param filename: str
        Path of the image file to analyze.

    :return: str or dict
        On success, the raw ``json_result`` of the analysis (the caller in
        this file decodes it with ``json.loads``). On failure, the error
        details are printed and an empty dict is returned.

    Example:
    >>> raw = create_prediction_from_file("path/to/image.jpg")
    >>> predictions = json.loads(raw) if raw else {}
    """
    source = sdk.VisionSource(filename=filename)

    options = sdk.ImageAnalysisOptions()
    options.model_name = "historicalink13"
    options.language = "es"
    options.gender_neutral_caption = True

    analyzer = sdk.ImageAnalyzer(service_options, source, options)
    outcome = analyzer.analyze()

    # Guard clause: report the failure and hand back an empty (falsy) result.
    if outcome.reason != sdk.ImageAnalysisResultReason.ANALYZED:
        failure = sdk.ImageAnalysisErrorDetails.from_result(outcome)
        print(" Analysis failed.")
        print("   Error reason: {}".format(failure.reason))
        print("   Error code: {}".format(failure.error_code))
        print("   Error message: {}".format(failure.message))
        return {}

    details = sdk.ImageAnalysisResultDetails.from_result(outcome)
    return details.json_result

In [None]:

def kmeans_best(X, min_cols=2, max_cols=4):
    """
    Fit K-Means for every cluster count in ``[min_cols, max_cols]`` and return
    the labels of the model with the highest silhouette coefficient.

    The models are seeded with the module-level ``RSEED`` so that repeated
    runs on the same data give the same labels.

    :param X: array-like or pd.DataFrame
        The input data for the K-Means algorithm, one row per sample.

    :param min_cols: int, optional, default: 2
        The minimum number of clusters to try.

    :param max_cols: int, optional, default: 4
        The maximum number of clusters to try.

    :return: array
        One label per sample from the best-scoring model. If no cluster
        count could be fitted and scored (empty range, too few samples, or a
        degenerate clustering), every sample is given label 0.

    Example:
    >>> from sklearn.datasets import make_blobs
    >>> X, _ = make_blobs(n_samples=300, centers=3, random_state=42)
    >>> best_labels = kmeans_best(X, min_cols=2, max_cols=5)
    >>> print(best_labels)
    """
    best_score = -np.inf
    best_labels = None

    for n_clusters in range(min_cols, max_cols + 1):
        model = KMeans(n_clusters=n_clusters, init='k-means++',
                       max_iter=100, n_init=1, random_state=RSEED)
        try:
            labels = model.fit_predict(X)
            score = silhouette_score(X, labels)
        except ValueError:
            # Raised when this cluster count cannot be fitted or scored for
            # the data (e.g. more clusters than samples, or a single distinct
            # label); skip it and keep trying the other counts.
            continue
        if score > best_score:
            best_score = score
            best_labels = labels

    if best_labels is None:
        # Nothing could be scored: treat all samples as one cluster.
        return np.zeros(len(X), dtype=int)
    return best_labels


# Module-level log file, opened in append mode. ocr() writes one line to it for
# each image whose analysis fails; it is closed at the end of the batch run.
flog = open("logfile.log", "a")


def ocr(filename):
    """
    Read the text of an image with the vision service and group the detected
    lines into columns.

    The lines are clustered with ``kmeans_best`` (2 to 5 clusters) on the
    first coordinate of their bounding polygon. Within each cluster the line
    texts are concatenated: a line ending in "-" is joined to the next one
    without the hyphen, any other line is followed by a space.

    A failed analysis is printed and appended to the module-level ``flog``.

    :param filename: str
        The path of the image file to be analyzed.

    :return: list
        One dictionary per column with the keys 'id', 'text', 'bounding_box'
        ([min_x, min_y, max_x, max_y]) and 'center' ([x, y]). Polygon values
        at even positions are treated as x and those at odd positions as y.
        All numbers are built-in Python numbers. The list is empty when the
        analysis fails or when 5 or fewer lines are detected.

    Example:
    >>> columns = ocr("path/to/image.jpg")
    >>> for column in columns:
    ...     print(column["id"], column["text"])
    """

    vision_source = sdk.VisionSource(
        filename=filename)
    analysis_options = sdk.ImageAnalysisOptions()
    analysis_options.features = (
        sdk.ImageAnalysisFeature.TEXT
    )
    analysis_options.language = "es"
    analysis_options.gender_neutral_caption = True
    image_analyzer = sdk.ImageAnalyzer(
        service_options, vision_source, analysis_options)
    result = image_analyzer.analyze()

    texts = []
    polygons = []
    if result.reason == sdk.ImageAnalysisResultReason.ANALYZED:
        if result.text is not None:
            for line in result.text.lines:
                texts.append(line.content)
                polygons.append(line.bounding_polygon)
    else:
        error_details = sdk.ImageAnalysisErrorDetails.from_result(result)
        print(" Analysis failed.")
        print("   Error reason: {}".format(error_details.reason))
        print("   Error code: {}".format(error_details.error_code))
        print("   Error message: {}".format(error_details.message))
        error_line = f"ERROR EN IMAGEN {filename} {error_details.reason} {error_details.message} {error_details.error_code}"
        print(error_line)
        flog.write(error_line + "\n")

    # NOTE(review): pages with 5 or fewer detected lines yield no text at all;
    # kept as in the original — confirm this is intended.
    if len(polygons) <= 5:
        return []

    data = np.array(polygons)
    data_text = np.array(texts)
    first_col = data[:, 0].reshape((len(data), 1))
    indices_nn = kmeans_best(first_col, 2, 5)

    textos = []
    for i, label in enumerate(np.unique(indices_nn)):
        idx = indices_nn == label

        pieces = []
        for t in data_text[idx]:
            t = t.strip()
            if not t:
                # An empty line has no last character to inspect; skip it.
                continue
            if t.endswith("-"):
                # Hyphenated line break: glue to the next line without "-".
                pieces.append(t[:-1])
            else:
                pieces.append(t + " ")
        texto = "".join(pieces)

        coords_columna = data[idx]
        minimos = np.min(coords_columna, axis=0)
        maximos = np.max(coords_columna, axis=0)
        # .item() turns NumPy scalars into built-in numbers so the result can
        # be JSON-serialized whatever the dtype of the polygons is.
        minx = np.min(minimos[::2]).item()
        miny = np.min(minimos[1::2]).item()
        maxx = np.max(maximos[::2]).item()
        maxy = np.max(maximos[1::2]).item()
        textos.append(
            {"id": i, "text": texto, "bounding_box": [minx, miny, maxx, maxy], "center": [(minx + maxx)/2, (miny + maxy)/2]})
    return textos


def dist(p0, p1):
    """
    Return the Euclidean (L2) distance between two points.

    :param p0: array-like
        Coordinates of the first point.

    :param p1: array-like
        Coordinates of the second point.

    :return: float
        The norm of the difference ``p0 - p1``.

    Example:
    >>> dist([0, 0], [3, 4])
    5.0
    """
    start = np.array(p0)
    end = np.array(p1)
    offset = start - end
    return np.linalg.norm(offset)


def process_image_from_predictions(img_path, predictionsJSON, dest_folder, th=0.45, metadata=None):
    """
    Crop the regions detected by the custom model out of a page image, run
    OCR on the text regions and link every text column to its nearest image.

    Only detections whose first tag has a confidence of at least ``th`` are
    used. A detection whose first tag name contains "image" is saved as a
    crop in ``<dest_folder>/images``. Every other detection is pasted onto a
    white canvas of the page size, which is saved in ``<dest_folder>/text``
    and passed to ``ocr``. Image boxes are outlined in red and text-column
    boxes in blue on that canvas.

    Each text column returned by ``ocr`` is attached, by id, to the
    ``context`` list of the image whose center is closest to the column's
    center.

    :param img_path: str
        Path of the input image.

    :param predictionsJSON: dict
        Decoded prediction result of the model. The keys read are
        ``customModelResult.objectsResult.values`` and
        ``metadata.width`` / ``metadata.height``.

    :param dest_folder: str
        Destination folder for the crops and the JSON result. The ``images``
        and ``text`` sub-folders are created if they do not exist.

    :param th: float, optional, default: 0.45
        Confidence threshold to include a detection.

    :param metadata: dict, optional, default: None
        Additional metadata stored in the result (an empty dict if omitted).

    :return: dict
        ``{"metadata": ..., "contexts": [...], "images": [...]}``. The same
        dictionary is written to ``<dest_folder>/<image name>.json``.

    Example:
    >>> img_path = "path/to/image.jpg"
    >>> predictionsJSON = json.loads(create_prediction_from_file(img_path))
    >>> result_dict = process_image_from_predictions(
    ...     img_path, predictionsJSON, "destination/path", th=0.5,
    ...     metadata={"author": "John Doe"})
    """
    data = predictionsJSON["customModelResult"]["objectsResult"]["values"]
    original_size = (int(predictionsJSON["metadata"]["width"]),
                     int(predictionsJSON["metadata"]["height"]))
    basename_noextension = os.path.basename(img_path).split(".")[0]
    text_path = f"{dest_folder}/text/{basename_noextension}.jpg"

    # The crops below are written into these sub-folders.
    os.makedirs(f"{dest_folder}/images", exist_ok=True)
    os.makedirs(f"{dest_folder}/text", exist_ok=True)

    only_text = Image.new(mode="RGB", size=original_size, color="white")
    imdraw = ImageDraw.Draw(only_text)
    images = []
    hay_texto = False

    # The context manager releases the source image file once all crops
    # have been taken.
    with Image.open(img_path) as page_image:
        for i, segment in enumerate(data):
            tag = segment["tags"][0]
            if tag["confidence"] < th:
                continue

            box = segment["boundingBox"]
            left, top = box["x"], box["y"]
            right, bottom = left + box["w"], top + box["h"]
            crop = page_image.crop((left, top, right, bottom))

            if "image" in tag["name"].lower():
                crop_path = f"{dest_folder}/images/{basename_noextension}_{i}.jpg"
                crop.save(crop_path)
                images.append({
                    "center": [(left + right) / 2, (top + bottom) / 2],
                    "bounding_box": [left, top, right, bottom],
                    "context": [],
                    "filename": crop_path,
                })
            else:
                hay_texto = True
                only_text.paste(crop, (int(left), int(top)))

    list_textos = []
    if hay_texto:
        for image in images:
            imdraw.rectangle(
                image["bounding_box"], fill=None, outline="red")
        only_text.save(text_path)
        list_textos = ocr(text_path)

        for te in list_textos:
            left, top, right, bottom = te["bounding_box"]
            imdraw.rectangle(
                [left, top, right, bottom], fill=None, outline="blue")
            if images:
                distancias = [dist(te["center"], image["center"])
                              for image in images]
                nearest = int(np.argmin(distancias))
                images[nearest]["context"].append(te["id"])

        # Save again so the canvas on disk also shows the blue column boxes.
        only_text.save(text_path)

    metadata = metadata or {}
    dictio = {"metadata": metadata, "contexts": list_textos, "images": images}
    # Serialize before opening the file so a serialization error does not
    # leave an empty JSON file behind.
    payload = json.dumps(dictio, ensure_ascii=False)
    with open(f"{dest_folder}/{basename_noextension}.json",
              "w", encoding="utf-8") as json_file:
        json_file.write(payload)
    return dictio

In [None]:
import pandas as pd


# Publication inventory: keep the id, year and city columns, discard rows with
# missing values, and index the table by the publication id with " - " removed.
df = pd.read_excel("Inventario caricaturas prensa SXIX.xlsx")[
    ["IDPublicación", "Año", "Ciudad"]
].dropna()
df = df.assign(ID=df["IDPublicación"].str.replace(" - ", ""))
df = df.set_index("ID").drop(columns=["IDPublicación"])

In [None]:
"""
Processes newspaper data stored in an input folder, makes predictions, and saves results in an output folder.

:param input_folder: str
    Path of the folder containing the newspaper data.

:param output_folder: str
    Path of the output folder to save the results.
"""

OUTPUT = "OUTPUT"

try:

    os.mkdir(f"{OUTPUT}")

except Exception as e:
    pass

INPUT_FOLDER = "data"

newspapers = os.listdir(INPUT_FOLDER)

In [5]:
import numpy as np
# Results folder of one issue, written as a Windows-style path. It is only
# referenced by the commented-out asign_lost_contexts call below, apparently
# for running that function by hand on a single issue.
OUTPUT_FOLDER_ISSUE = r"OUTPUT\PD168_El oso\1_results"


def _page_number_from_name(name):
    """Return the integer after the last "_" of a page file name ('x_12.json' -> 12)."""
    return int(name.split("_")[-1].split(".")[0])


def _prefixed_context_ids(page_info, contexts=None):
    """Return '<page>_<id>' identifiers for a page's contexts (or a given subset)."""
    page_label = page_info["metadata"]["page"]
    if contexts is None:
        contexts = page_info["contexts"]
    return [f"{page_label}_{ctx['id']}" for ctx in contexts]


def asign_lost_contexts(output_folder_issue):
    """
    Attach the text contexts of pages without images to the images of the
    neighbouring pages of the same issue, and rewrite the page JSON files.

    The page files (``*.json``) in ``output_folder_issue`` are ordered by the
    number after the last underscore of their name. Then:

    * every context id already stored in an image becomes ``<page>_<id>``,
      where ``<page>`` is the page's ``metadata["page"]`` value;
    * pages before the first page with an image give all their contexts to
      the first image of that page; pages after the last page with an image
      give theirs to the last image of that page;
    * pages between two consecutive pages with images are split in halves:
      the first half goes to the last image of the earlier page, the second
      half to the first image of the later page. With an odd number of
      pages, the contexts of the middle page are themselves split in two.

    Pages are addressed by their position in the sorted list, so the result
    does not depend on whether page numbering starts at 0 or 1 or has gaps.

    If no page of the issue has an image, nothing is written.

    :param output_folder_issue: str
        Folder with the page JSON files of one issue.

    :return: None
    """
    pages = sorted(
        (name for name in os.listdir(output_folder_issue)
         if name.endswith(".json")),
        key=_page_number_from_name,
    )

    full_issue = {}
    # Position (in `pages`) of the page of every image, one entry per image.
    image_positions = []
    for position, page in enumerate(pages):
        with open(f"{output_folder_issue}/{page}", encoding="utf-8") as page_file:
            page_info = json.load(page_file)
        full_issue[page] = page_info
        page_label = page_info["metadata"]["page"]
        for image in page_info["images"]:
            image_positions.append(position)
            image["context"] = [f"{page_label}_{ctx}" for ctx in image["context"]]

    if not image_positions:
        return

    # Pages before the first / after the last page that has an image.
    first_pos, last_pos = image_positions[0], image_positions[-1]
    first_image = full_issue[pages[first_pos]]["images"][0]
    last_image = full_issue[pages[last_pos]]["images"][-1]
    for page in pages[:first_pos]:
        first_image["context"].extend(_prefixed_context_ids(full_issue[page]))
    for page in pages[last_pos + 1:]:
        last_image["context"].extend(_prefixed_context_ids(full_issue[page]))

    # Pages lying strictly between two consecutive images.
    for left_pos, right_pos in zip(image_positions, image_positions[1:]):
        gap = pages[left_pos + 1:right_pos]
        if not gap:
            continue
        left_image = full_issue[pages[left_pos]]["images"][-1]
        right_image = full_issue[pages[right_pos]]["images"][0]

        half = len(gap) // 2
        has_middle = len(gap) % 2 == 1
        closer_to_left = gap[:half]
        closer_to_right = gap[half + 1:] if has_middle else gap[half:]

        for page in closer_to_left:
            left_image["context"].extend(_prefixed_context_ids(full_issue[page]))
        for page in closer_to_right:
            right_image["context"].extend(_prefixed_context_ids(full_issue[page]))

        if has_middle:
            middle_info = full_issue[gap[half]]
            contexts = middle_info["contexts"]
            split = len(contexts) // 2
            # NOTE(review): the second half of the middle page goes to the
            # earlier image and the first half to the later one; kept as in
            # the original — confirm this is the intended direction.
            left_image["context"].extend(
                _prefixed_context_ids(middle_info, contexts[split:]))
            right_image["context"].extend(
                _prefixed_context_ids(middle_info, contexts[:split]))

    for name, page_info in full_issue.items():
        with open(f"{output_folder_issue}/{name}", "w", encoding="utf-8") as page_file:
            # ensure_ascii=False matches how the page files are first written.
            json.dump(page_info, page_file, ensure_ascii=False)


# asign_lost_contexts(OUTPUT_FOLDER_ISSUE)

In [None]:
# Batch driver: process every newspaper that has no folder under OUTPUT yet.
# For each of its issues, every page is sent to the custom model, cropped and
# OCR'd, and finally the contexts of image-less pages are redistributed.


def _to_builtin(value):
    """Convert a NumPy scalar to the equivalent built-in Python value."""
    # Inventory look-ups may return NumPy scalars, which json cannot
    # serialize when they are integers; other values pass through unchanged.
    return value.item() if isinstance(value, np.generic) else value


revisados = os.listdir(OUTPUT)
no_revisados = set(newspapers) - set(revisados)
np.savetxt("faltantes.txt", list(no_revisados), encoding="utf-8", fmt="%s")
for NESPAPER_NAME in tqdm(no_revisados):
    newspaper_issues = set(os.listdir(f"{INPUT_FOLDER}/{NESPAPER_NAME}"))
    newspaper_output_folder = f"{OUTPUT}/{NESPAPER_NAME}"
    # exist_ok already covers the "folder exists" case; any other error is a
    # real problem and is allowed to surface here.
    os.makedirs(newspaper_output_folder, exist_ok=True)
    # Result folders are named "<issue>_results"; drop that 8-character suffix.
    issues_revised = {i[:-8] for i in os.listdir(newspaper_output_folder)}
    issues_not_revised = newspaper_issues - issues_revised
    for issue in issues_not_revised:
        # OUTPUT_FOLDER is the results folder of this issue.
        OUTPUT_FOLDER = f"{OUTPUT}/{NESPAPER_NAME}/{issue}_results"
        print(OUTPUT_FOLDER)
        # makedirs also creates OUTPUT_FOLDER itself as a parent.
        os.makedirs(f"{OUTPUT_FOLDER}/images", exist_ok=True)
        os.makedirs(f"{OUTPUT_FOLDER}/text", exist_ok=True)

        NPFolder = f"{INPUT_FOLDER}/{NESPAPER_NAME}/{issue}"
        files = [f"{NPFolder}/{i}" for i in os.listdir(NPFolder)]
        for filename in tqdm(files):
            # filename is "<INPUT_FOLDER>/<newspaper>/<issue>/<page file>".
            nombre_periodico, archivo, pagina = filename.split("/")[1:]
            if "." in pagina:
                pagina = pagina.split(".")[0]
            id_periodico = nombre_periodico.split("_")[0]
            metadata = {"id": id_periodico,
                        "newspaper": nombre_periodico,
                        "year": _to_builtin(df["Año"].get(id_periodico, "No registra")),
                        "city": _to_builtin(df["Ciudad"].get(id_periodico, "No registra")),
                        "file": archivo,
                        "page": pagina}
            results = create_prediction_from_file(filename)

            if results:
                results = json.loads(results)
                try:
                    res = process_image_from_predictions(
                        filename, results, OUTPUT_FOLDER, metadata=metadata)
                except Exception as e:
                    # Best effort: keep going with the other pages, but say
                    # what went wrong instead of discarding the error.
                    print("Problema con imagen", filename, repr(e))
        # Redistribute the contexts of pages without images within the issue.
        asign_lost_contexts(output_folder_issue=OUTPUT_FOLDER)

flog.close()