In [129]:
# Mount Google Drive at /content/drive; the video, model and working-folder
# paths used further down all live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


lib

In [130]:
%%capture
!pip install roboflow inference supervision

In [131]:
%%capture
!pip install tensorflow

In [132]:
%%capture
!pip install pytesseract
!sudo apt-get install tesseract-ocr

In [133]:
# imports

import cv2 as cv
import os
import math
import numpy as np
from PIL import Image, ImageDraw
from tensorflow.keras.models import load_model
from roboflow import Roboflow
import pandas as pd
import matplotlib.pyplot as plt
from copy import deepcopy
from pytesseract import image_to_string
import re

In [134]:
def len_video_from_dir(dir):
    """
    Count the frames of a video file.

    Parameters:
      dir: Path of the source video file
    Raises:
      Exception: File not found
      Exception: Could not open video (corrupted or of invalid format)
    Returns: Frame count reported by OpenCV for the video (int)
    """
    if not os.path.isfile(dir):
        raise Exception("File not found")

    video_capture = cv.VideoCapture(dir)
    try:
        if not video_capture.isOpened():
            raise Exception("Could not open video")
        return int(video_capture.get(cv.CAP_PROP_FRAME_COUNT))
    finally:
        # Always free the capture handle, including when opening failed.
        video_capture.release()


import os
import cv2 as cv

def save_frame(video_dir, frame_num, output_dir, output_filename):
    """
    Extract one frame from a video and save it as an image file.

    Parameters:
      video_dir: Path of the source video file
      frame_num: Index of the frame to extract (passed to CAP_PROP_POS_FRAMES)
      output_dir: Directory that receives the image (created when missing)
      output_filename: File name of the image inside output_dir
    Raises:
      Exception: File not found
      Exception: Could not open video
      Exception: The capture could not be positioned on frame_num
      Exception: The frame could not be read or is empty
      Exception: The image could not be written
    """
    if not os.path.isfile(video_dir):
        raise Exception("File not found")

    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    video_capture = cv.VideoCapture(video_dir)
    try:
        if not video_capture.isOpened():
            raise Exception("Could not open video")

        video_capture.set(cv.CAP_PROP_POS_FRAMES, frame_num)

        # Seeking can silently land elsewhere; verify the actual position.
        current_frame = int(video_capture.get(cv.CAP_PROP_POS_FRAMES))
        if current_frame != frame_num:
            raise Exception(f"Frame {frame_num} not found, at {current_frame} instead")

        success, frame = video_capture.read()
    finally:
        # Release the capture on every path, including the error ones.
        video_capture.release()

    if not success or frame is None:
        raise Exception(f"Could not retrieve the frame {frame_num}")

    if frame.shape[0] == 0 or frame.shape[1] == 0:
        raise Exception("Frame data is empty or corrupted")

    # Save the frame exactly as decoded (OpenCV's BGR channel order).
    output_path = os.path.join(output_dir, output_filename)
    if not cv.imwrite(output_path, frame):
        # imwrite reports failure through its return value, not an exception.
        raise Exception(f"Could not write frame {frame_num} to {output_path}")

def crop_imgs_from_predicition(pred, img):
    """
    Cut every predicted bounding box out of an image.

    Parameters:
      pred: Roboflow prediction json; each entry of pred["predictions"] provides
        the box centre ("x", "y") and its "width" / "height"
      img: Image array to be cropped (rows first, columns second)
    Returns: List of cropped images from predictions or None if no images were cropped
    """
    crops = []
    for p in pred["predictions"]:
        h = int(math.floor(p["height"]))
        w = int(math.floor(p["width"]))
        center_y, center_x = int(p["y"]), int(p["x"])

        # Clamp at 0: a negative slice index would count from the opposite edge
        # and return an empty or unrelated region for boxes that overlap the
        # top/left border. Indices past the bottom/right border are already
        # truncated by slicing itself.
        top = max(center_y - (h // 2), 0)
        bottom = max(center_y + (h // 2), 0)
        left = max(center_x - (w // 2), 0)
        right = max(center_x + (w // 2), 0)

        crops.append(img[top:bottom, left:right])
    return crops if crops else None

def load_and_predict(input_data, model_head, model_eyes):
  """Run the head model, mask the input with its output and run the eye model.

  Args:
    input_data: Image array (or batch of images) given to ``model_head``.
    model_head: Already loaded model exposing ``predict``; its output is
      multiplied element-wise with ``input_data``.
    model_eyes: Already loaded model exposing ``predict``; it receives the
      masked input.

  Returns:
    The output of ``model_eyes.predict`` for the masked input.

  Raises:
    Exception: Any error raised while predicting or masking; it is printed
      and then re-raised unchanged.
  """
  try:
    # Head segmentation
    head_mask = model_head.predict(input_data)

    # Keep only the head region of the input
    masked_input = head_mask * input_data

    # Eye segmentation on the masked input
    return model_eyes.predict(masked_input)
  except Exception as e:
    print(f"Erro carregando os modelos: {e}")
    raise


def only_eyes(image_original,image_predita):
  """
    Black out everything in the original image that the eye mask doesn't select.

    Parameters:
      image_original: Image array the pixels are taken from
      image_predita: Output of the eye segmentation model; values above 0.5
        mark eye pixels and it must be usable as a boolean index on image_original
    Returns: New array shaped like image_original, zero outside the eye region
    """
  eye_mask = image_predita > 0.5
  eyes_only = np.zeros_like(image_original)
  # Copy just the selected pixels; everything else stays zero.
  eyes_only[eye_mask] = image_original[eye_mask]
  return eyes_only

def get_temp_from_pixel(pixel, temp_tuple):
    """
    Map a pixel intensity linearly onto a temperature scale.

    Parameters:
      pixel: Pixel intensity; values outside [32/255, 159/255] are clamped to that range
      temp_tuple: Pair of temperatures delimiting the scale (order doesn't matter)
    Returns: Temperature between min(temp_tuple) and max(temp_tuple)
    """
    lowest_pixel = 32 / 255.0
    highest_pixel = 159 / 255.0
    pixel_span = highest_pixel - lowest_pixel

    # Clamp the intensity into the range covered by the scale.
    if pixel < lowest_pixel:
        pixel = lowest_pixel
    elif pixel > highest_pixel:
        pixel = highest_pixel

    coldest, hottest = min(temp_tuple), max(temp_tuple)
    fraction = (pixel - lowest_pixel) / pixel_span
    return coldest + fraction * (hottest - coldest)


def get_temp_olhos(image_original,image_predita, temp_tupla):
  """
    Estimate the eye temperature from an image and its eye segmentation.

    Parameters:
      image_original: Image array the eye pixels are taken from
      image_predita: Output of the eye segmentation model for that image
      temp_tupla: (min, max) temperature pair describing the scale; either
        entry may be None when the scale is unknown
    Returns: Temperature obtained from the mean of the eye pixels above 0.1,
      or None when the scale is unknown or no eye pixel was selected
    """
  if temp_tupla[0] is None or temp_tupla[1] is None:
    return None
  eyes_only = only_eyes(image_original, image_predita)
  # Ignore the blacked-out background (and near-black pixels) in the mean.
  eye_pixels = eyes_only[eyes_only > 0.1]
  if eye_pixels.size == 0:
    # The mean of an empty selection is NaN; report "no reading" instead.
    return None
  return get_temp_from_pixel(np.mean(eye_pixels), temp_tupla)

def draw_temp(image_original, temp):
    """
    Show an image with a temperature label written on it.

    Parameters:
      image_original: Image as np.array (cast to uint8 before drawing)
      temp: Temperature to write, formatted with one decimal place
    Returns:
      Nothing; the labelled copy is opened with PIL's Image.show, with the
      text placed at position (20, 20) in red
    """
    canvas = Image.fromarray(image_original.astype(np.uint8))
    label = f"{temp:.1f}°C"
    ImageDraw.Draw(canvas).text((20, 20), label, fill="red")
    canvas.show()

def get_resized_img(img):
    """Return img resized to 128x128 with cv.INTER_LINEAR interpolation."""
    return cv.resize(img, (128, 128), interpolation=cv.INTER_LINEAR)

Pegar vídeo -> Selecionar os frames -> Realizar object detection nos frames para extrair o X -> Recortar o X -> Realizar a segmentação da cabeça e dos olhos -> Medir a temperatura de cada olho

# Object Detection

In [135]:
def save_predictions_to_df(predictions_list):
    """
    Flatten per-image prediction sets into a single DataFrame.

    Parameters:
      predictions_list: Iterable of dicts, each holding a 'predictions' list
    Returns: DataFrame with one row per individual prediction, in input order
    """
    flattened = [
        single_prediction
        for prediction_set in predictions_list
        for single_prediction in prediction_set['predictions']
    ]
    return pd.DataFrame(flattened)

def find_cattle(image_folder_path, api_key=None):
  """
  Run the Roboflow cattle detector on every image of a folder.

  Parameters:
    image_folder_path: Folder whose .jpg/.jpeg/.png files are sent to the model
    api_key: Roboflow API key; when omitted it is read from the
      ROBOFLOW_API_KEY environment variable
  Raises:
    RuntimeError: If there are images to process but no API key is available
  Returns: Two-element list [DataFrame with every prediction, list of the raw
    prediction jsons (one per image, in sorted file-name order)]
  """
  # Sorted so the results come back in a reproducible order (os.listdir has none).
  image_files = sorted(
    os.path.join(image_folder_path, filename)
    for filename in os.listdir(image_folder_path)
    if filename.lower().endswith(('.jpg', '.jpeg', '.png'))
  )
  if not image_files:
    # Nothing to detect: skip the remote service entirely.
    return [save_predictions_to_df([]), []]

  # The credential is supplied by the caller or the environment so that it
  # is never stored in the notebook source.
  api_key = api_key or os.environ.get("ROBOFLOW_API_KEY")
  if not api_key:
    raise RuntimeError(
      "Roboflow API key missing: pass api_key or set the ROBOFLOW_API_KEY environment variable"
    )

  rf = Roboflow(api_key=api_key)
  model = rf.workspace().project("bois-teste-samuel").version(1).model

  predictions = [
    model.predict(image_file, confidence=45, overlap=30).json()
    for image_file in image_files
  ]
  return [save_predictions_to_df(predictions), predictions]

def draw_box(img, x, y, width, height, label, confidence, color=(0, 255, 0), text_color=(255, 255, 255), thickness=2, text_box_height_increase=10):
    """
    Draw a labelled bounding box on img (modified in place).

    Parameters:
      img: Image array that receives the drawing
      x, y: Centre of the box
      width, height: Size of the box
      label: Text shown before the confidence
      confidence: Confidence as a fraction; displayed as a percentage
      color: Colour of the box outline and of the caption background
      text_color: Colour of the caption text
      thickness: Line thickness of the box outline
      text_box_height_increase: Extra height added to the caption background
    """
    half_w, half_h = width / 2, height / 2
    left, top = int(x - half_w), int(y - half_h)
    right, bottom = int(x + half_w), int(y + half_h)

    # Box outline
    cv.rectangle(img, (left, top), (right, bottom), color, thickness)

    caption = f'{label}: {confidence * 100:.2f}%'
    font, font_scale, font_thickness = cv.FONT_HERSHEY_SIMPLEX, 0.5, 2

    # Measure the caption so its background can be sized to fit.
    (caption_w, caption_h), baseline = cv.getTextSize(caption, font, font_scale, font_thickness)
    banner_height = caption_h + baseline + text_box_height_increase

    # Filled banner directly above the box, then the caption on top of it.
    cv.rectangle(img, (left, top - banner_height - 5), (left + caption_w, top), color, -1)
    cv.putText(img, caption, (left, top - baseline - 5), font, font_scale, text_color, font_thickness)

def plot_all_cattle_predictions(cattle_predictions):
  """
  Display every image that has predictions with its boxes drawn on it.

  Parameters:
    cattle_predictions: Sequence of dicts with a 'predictions' list; the image
      is located through the 'image_path' of the first prediction
  Returns: Nothing; images are shown with matplotlib and problems are printed
  """
  for index, entry in enumerate(cattle_predictions):
    boxes = entry['predictions'] if 'predictions' in entry else None
    if not boxes:
      print(f"Nenhuma predição encontrada para a imagem {index}")
      continue

    # All predictions of one entry belong to the same image.
    image_path = boxes[0].get('image_path')
    if not image_path:
      print("Caminho da imagem está vazio.")
      continue

    img = cv.imread(image_path)
    if img is None:
      print(f"Erro ao carregar a imagem no caminho: {image_path}")
      continue

    for box in boxes:
      draw_box(
        img,
        box.get('x'),
        box.get('y'),
        box.get('width'),
        box.get('height'),
        "boi",
        box.get('confidence'),
      )

    # matplotlib expects RGB while OpenCV loads BGR.
    plt.imshow(cv.cvtColor(img, cv.COLOR_BGR2RGB))
    plt.axis('off')
    plt.show()

In [136]:
def get_global_temp(frame):
  """
  Read the two temperature labels printed on a frame through OCR.

  The frame is normalized, resized to 640x512 when needed, and two fixed
  groups of four digit cells (bottom and top) are cut out, thresholded and
  passed to Tesseract.

  Parameters:
    frame: Image as np.array (or anything np.array can convert) with 3 dimensions
  Raises:
    Exception: If the image doesn't have 3 dimensions
    Exception: If the image couldn't be converted to np.array
  Returns: Tuple (bottom_value, top_value) of floats, or (None, None) when
    either OCR result isn't a valid number
  """

  def type_convert(img, silent=False):
    """
    Parameters:
      img: np.array of image
      silent: If False, print a notice about the conversion
    Raises:
      Exception: If image couldn't be converted to np.array
    Returns: Copy of the image; non-float arrays are divided by 255, float
      arrays are returned with their values unchanged
    """
    img = deepcopy(img)
    if not isinstance(img, np.ndarray):
      original_type = type(img)
      try: img = np.array(img)
      except Exception: raise Exception(f'Could not convert {original_type} to np.array')
    if not silent: print("--- Warning: Image is now a normalized np.array ---")
    # Checking the dtype covers every float width without naming np.float128,
    # which is not defined on all platforms, and also works for empty arrays.
    return img if np.issubdtype(img.dtype, np.floating) else img / 255.0

  def check_dims_number(img):
    """
    Parameters:
      img: np.array of image
    Raises:
      Exception: If the array doesn't have exactly 3 dimensions
    """
    if len(img.shape) != 3: raise Exception(f'Array must have three channels, got {img.shape}')

  def get_digits(img, silent=False):
    """
    Parameters:
      img: np.array of image
      silent: If it should be verbose on image conversion
    Raises:
      Exception: If image doesn't have 3 dimensions
      Exception: If image couldn't be converted to np.array
    Returns: Tuple (bottom_digits, top_digits); each is a tuple with the four
      digit cells cut from the resized image
    """
    # type_convert already works on a copy, so the caller's array is untouched.
    img = type_convert(img, silent)
    check_dims_number(img)
    if img.shape[:2] != (512, 640):
      img = cv.resize(img, (640, 512), interpolation=cv.INTER_LINEAR)

    # Cell corners as (y1, x1, y2, x2) in the 640x512 image.
    BOTTOM_DIGITS_LOCATION = [
      (403, 511, 425, 527),
      (403, 527, 425, 543),
      (403, 543, 425, 559),
      (403, 559, 425, 575)
    ]

    TOP_DIGITS_LOCATION = [
      (67, 511, 89, 527),
      (67, 527, 89, 543),
      (67, 543, 89, 559),
      (67, 559, 89, 575)
    ]

    return (
      tuple(img[y1:y2, x1:x2] for (y1, x1, y2, x2) in BOTTOM_DIGITS_LOCATION),
      tuple(img[y1:y2, x1:x2] for (y1, x1, y2, x2) in TOP_DIGITS_LOCATION)
    )

  def get_bw_imgs(img, silent=False):
    """
    Parameters:
      img: Digit cell to be thresholded
      silent: If it should be verbose on image conversion
    Raises:
      Exception: If image doesn't have 3 dimensions
      Exception: If image couldn't be converted to np.array
    Returns: Tuple (img_b, img_w) of float32 masks shaped like img: img_b is 1
      where the first channel is at most 15/255, img_w is 1 where it is at
      least 180/255; both are 0 elsewhere
    """
    img = type_convert(img, silent)
    check_dims_number(img)

    dark_limit, bright_limit = 15 / 255.0, 180 / 255.0
    first_channel = img[:, :, 0]

    img_b = np.zeros(img.shape, np.float32)
    img_w = np.zeros(img.shape, np.float32)

    # A 2-D boolean index sets every channel of the selected pixels at once.
    img_b[first_channel <= dark_limit] = 1.
    img_w[first_channel >= bright_limit] = 1.

    return (img_b, img_w)

  def to_str(img):
    """
    Parameters:
      img: Image to be converted to string (Optimized for numbers)
    Returns: String converted from image (No whitespaces)
    """
    as_uint8 = (img * 255).astype(np.uint8)
    eroded = cv.erode(as_uint8, np.ones((2, 2), np.uint8), iterations=1)
    blurred = cv.GaussianBlur(eroded, (3, 3), .2)
    return image_to_string(blurred, config='--psm 6').replace(' ', '')

  def keep_numeric(text):
    """Drop every character that is not a digit, '-' or '.'."""
    return ''.join(ch for ch in text if ch.isdigit() or ch in ('-', '.'))

  readings = []
  for digit_cells in get_digits(frame, silent=True):
    # Both masks of every cell are laid side by side into one strip per label.
    pieces = [mask for cell in digit_cells for mask in get_bw_imgs(cell, silent=True)]
    readings.append(keep_numeric(to_str(np.hstack(pieces))))

  try:
    return (float(readings[0]), float(readings[1]))
  except ValueError:
    # OCR produced something that isn't a number (e.g. an empty string).
    return (None, None)

In [137]:
def save_multiple_frames(video_path, save_dir, division_factor):
  """
  Empty save_dir and fill it with every division_factor-th frame of a video.

  Frames are written as "frame_<index>.jpg".

  Parameters:
    video_path: Path of the source video file
    save_dir: Directory that receives the frames (created when missing); the
      files directly inside it are deleted first, sub-directories are kept
    division_factor: Step between two saved frame indices (positive integer)
  Raises:
    ValueError: If division_factor is not positive
    Exception: Whatever len_video_from_dir / save_frame raise for the video
  """
  if division_factor <= 0:
    raise ValueError("division_factor must be a positive integer")

  # Validate the video before touching save_dir so that a bad path doesn't
  # cost the previously extracted frames.
  frame_count = len_video_from_dir(video_path)

  os.makedirs(save_dir, exist_ok=True)
  for filename in os.listdir(save_dir):
    file_path = os.path.join(save_dir, filename)
    try:
      if os.path.isfile(file_path) or os.path.islink(file_path):
        os.unlink(file_path)
    except Exception as e:
      print('Failed to delete %s. Reason: %s' % (file_path, e))

  for i in range(0, frame_count, division_factor):
    # Write into the directory this function was given and just cleaned.
    save_frame(video_path, i, save_dir, f"frame_{i}.jpg")

def crop_all_images(X_folder_path, predictions_json):
  """
  Empty X_folder_path and fill it with the crop of every predicted box.

  Crops are written as "<frame name>_X_<box index>.jpg".

  Parameters:
    X_folder_path: Directory that receives the crops (created when missing);
      the files directly inside it are deleted first
    predictions_json: List of prediction jsons, one per image; the image is
      located through the "image_path" of its first prediction
  """
  os.makedirs(X_folder_path, exist_ok=True)

  # Clear the folder
  for filename in os.listdir(X_folder_path):
    file_path = os.path.join(X_folder_path, filename)
    try:
      if os.path.isfile(file_path) or os.path.islink(file_path):
        os.unlink(file_path)
    except Exception as e:
      print('Failed to delete %s. Reason: %s' % (file_path, e))

  for prediction in predictions_json:
    if len(prediction["predictions"]) == 0:
      continue

    frame_path = prediction["predictions"][0]["image_path"]
    frame_img = cv.imread(frame_path)
    if frame_img is None:
      # imread signals an unreadable file by returning None.
      print(f"Could not read image {frame_path}; skipping its predictions")
      continue

    frame_name_without_extension = os.path.splitext(os.path.basename(frame_path))[0]
    cropped_images = crop_imgs_from_predicition(prediction, frame_img)
    for i, cropped_image in enumerate(cropped_images):
      if cropped_image.size == 0:
        # A box lying outside the image yields nothing to save.
        continue
      file_name = f"{frame_name_without_extension}_X_{i}.jpg"
      output_path = os.path.join(X_folder_path, file_name)
      # NOTE(review): the crop comes from cv.imread and is channel-swapped
      # before cv.imwrite, so the saved file has its first and third channels
      # exchanged relative to the frame. Kept as is; confirm it is intended.
      cv.imwrite(output_path, cv.cvtColor(cropped_image, cv.COLOR_RGB2BGR))

def segment_eyes_and_get_temperatures(frames_folder_path, X_folder_path, y_folder_path, model_head_path, model_eyes_path):
  """
  Segment the eyes in every crop and estimate one temperature per crop.

  Parameters:
    frames_folder_path: Folder with the full frames ("<frame name>.jpg") the
      temperature scale is read from
    X_folder_path: Folder with the crops ("<frame name>_X_<index>.jpg")
    y_folder_path: Not used by this function; kept for interface compatibility
    model_head_path: Path of the head segmentation model
      (e.g. '/content/drive/MyDrive/CattleImageRepository/G2/segmentacao_cabeca.keras')
    model_eyes_path: Path of the eye segmentation model
  Returns: List with one entry per readable .jpg crop, in sorted file-name
    order: the temperature, or None when it could not be determined. Empty
    list when the folder has no readable crop.
  """
  crop_names = []
  images = []
  for filename in sorted(os.listdir(X_folder_path)):
    if not filename.endswith(".jpg"):
      continue
    img = cv.imread(os.path.join(X_folder_path, filename))
    if img is None:
      print(f"Could not read image {filename}; skipping it")
      continue
    # Model input and temperature mapping both work on the 0..1 scale.
    images.append(get_resized_img(img) / 255.0)
    crop_names.append(filename)

  if not images:
    # np.stack rejects an empty list; there is nothing to predict anyway.
    return []

  model_head = load_model(model_head_path, compile=False)
  model_eyes = load_model(model_eyes_path, compile=False)
  prediction_list = load_and_predict(np.stack(images, axis=0), model_head, model_eyes)

  # Several crops can come from the same frame; run the OCR once per frame.
  scale_by_frame = {}
  eye_temperatures = []
  for crop_name, crop_img, eye_mask in zip(crop_names, images, prediction_list):
    frame_name = crop_name[:-4].split("_X_")[0]
    if frame_name not in scale_by_frame:
      frame = cv.imread(os.path.join(frames_folder_path, f"{frame_name}.jpg"))
      if frame is None:
        print(f"Could not read frame {frame_name}.jpg; its temperature scale is unknown")
        scale_by_frame[frame_name] = (None, None)
      else:
        scale_by_frame[frame_name] = get_global_temp(frame)
    # The normalized crop is reused: get_temp_from_pixel clamps to
    # [32/255, 159/255], so raw 0..255 values would always hit the maximum.
    eye_temperatures.append(get_temp_olhos(crop_img, eye_mask, scale_by_frame[frame_name]))

  return eye_temperatures

In [139]:
def final_function(path):
  """
  Run the whole pipeline for one video.

  Steps: extract frames -> detect cattle in the frames -> crop the detections
  -> segment head and eyes -> estimate the eye temperatures.

  Parameters:
    path: Path of the video file to process
  Returns: List of eye temperatures as returned by
    segment_eyes_and_get_temperatures
  """
  frames_folder_path = "/content/drive/MyDrive/modelo_final/frames_from_video_temp_folder"
  X_folder_path = "/content/drive/MyDrive/modelo_final/X_temp_folder"
  y_folder_path = "/content/drive/MyDrive/modelo_final/y_temp_folder"
  model_head_path = '/content/drive/MyDrive/CattleImageRepository/G2/segmentacao_cabeca.keras'
  model_eyes_path = '/content/drive/MyDrive/CattleImageRepository/G2/modelo_olho.keras'
  frame_interval = 200

  # This call can be skipped when the frames of this video were already extracted.
  save_multiple_frames(path, frames_folder_path, frame_interval)

  # Only the raw prediction jsons are needed downstream; the DataFrame is ignored.
  _, predictions_json = find_cattle(frames_folder_path)
  crop_all_images(X_folder_path, predictions_json)
  return segment_eyes_and_get_temperatures(frames_folder_path, X_folder_path, y_folder_path, model_head_path, model_eyes_path)

In [141]:
%%capture
!pip install gradio

In [143]:
import gradio as gr

# Build the Gradio UI: the text typed into the first box is passed to
# final_function as the video path, and the value it returns (the list of
# temperatures) is displayed in the second box.
interface = gr.Interface(
    fn=final_function,
    inputs=gr.Textbox(label="Enter the video path"),
    outputs=gr.Textbox(label="Lista de temperaturas")
)

# Start serving the interface. In the recorded Colab run this published it
# under a temporary public gradio.live URL.
interface.launch()


Setting queue=True in a Colab notebook requires sharing enabled. Setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
Running on public URL: https://ec3bd53081cebea59a.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


