In [1]:
!pip install pytorch-lightning
!pip install torchmetrics
!pip install rasterio
!pip install tim

Collecting pytorch-lightning
  Downloading pytorch_lightning-2.6.0-py3-none-any.whl.metadata (21 kB)
Collecting torchmetrics>0.7.0 (from pytorch-lightning)
  Downloading torchmetrics-1.8.2-py3-none-any.whl.metadata (22 kB)
Collecting lightning-utilities>=0.10.0 (from pytorch-lightning)
  Downloading lightning_utilities-0.15.2-py3-none-any.whl.metadata (5.7 kB)
Downloading pytorch_lightning-2.6.0-py3-none-any.whl (849 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m849.5/849.5 kB[0m [31m30.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading lightning_utilities-0.15.2-py3-none-any.whl (29 kB)
Downloading torchmetrics-1.8.2-py3-none-any.whl (983 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m983.2/983.2 kB[0m [31m32.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: lightning-utilities, torchmetrics, pytorch-lightning
Successfully installed lightning-utilities-0.15.2 pytorch-lightning-2.6.0 torchmetrics-1.8.2
Collecting tim
  Downlo

In [2]:
import os

import cv2
import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pytorch_lightning as L
import rasterio
import tifffile
import timm
import torch
import torch.nn as nn
import torchmetrics
from google.colab import drive
from sklearn.metrics import r2_score, mean_absolute_error
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

In [3]:
# Mount Google Drive; the image, scaler and checkpoint paths used below all live under /content/drive/MyDrive.
drive.mount("/content/drive")

Mounted at /content/drive


In [14]:
# --- Configuration and sample input -------------------------------------------------
# Single source of truth for the compute device (the original cell assigned it twice).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Fitted target scaler; loaded by LightningRegressionTask.__init__ through this global name.
scaler_path = "/content/drive/MyDrive/teg_resnet18_arroz/02_dataset/DATASETCONCAT64/scaler_mm.pkl"

# Checkpoint restored into the regression task further down.
ckpt_path = "/content/drive/MyDrive/teg_resnet18_arroz/04_resultados/resnet18_mm/lightning_logs/version_0/checkpoints/epoch=14---val_rmse=0.08---val_loss=0.00.ckpt"

BATCH_SIZE = 64

# Sample tile used for the single-image prediction at the end of the notebook.
img = tifffile.imread('/content/drive/MyDrive/teg_resnet18_arroz/02_dataset/DATASETCONCAT64/images/1920_4307708_3_5.tif')
print(img.shape)

# Float tensor with a leading batch axis, moved to `device`.
# The layout stays channels-last, i.e. (1, H, W, C); it is NOT permuted to (1, C, H, W) here.
img_tensor = torch.from_numpy(img).float().unsqueeze(0).to(device)


(64, 64, 10)


In [5]:
class LightningRegressionTask(L.LightningModule):
    """LightningModule wrapping a single-output regression backbone.

    Trains with Huber loss, logs MSE / RMSE / MAE / R2 for the train, val and
    test stages, and keeps per-epoch and per-step histories in plain dicts
    (``train_logs``, ``val_logs``, ``test_logs`` and ``*_step_loss_logs``).

    Args:
        model: backbone ``nn.Module``; called as ``model(images)``. It is
            excluded from the saved hyperparameters, so it must be passed
            again when restoring from a checkpoint.
        model_name: label stored as ``self.name``.
        learning_rate: learning rate handed to Adam.

    Notes:
        * ``__init__`` reads the module-level global ``scaler_path`` and loads
          it with joblib; that global must be defined before instantiation.
        * NOTE(review): the same metric objects are shared by all stages, and
          the per-batch metric values are logged with ``on_epoch=True``, so the
          epoch numbers are averages of batch values. For RMSE and R2 this is
          not the same as the metric computed over the whole epoch; confirm
          this is acceptable for reporting.
    """

    # Metric attribute names, in the order they are logged.
    _METRIC_NAMES = ("mse", "rmse", "mae", "r2")

    def __init__(self, model, model_name, learning_rate=0.001):
        super().__init__()
        self.save_hyperparameters(ignore=["model"])
        self.model = model
        # Huber loss is the active criterion (nn.MSELoss was the earlier alternative).
        self.criterion = nn.HuberLoss()
        self.learning_rate = learning_rate
        self.name = model_name

        # Regression metrics
        self.mse = torchmetrics.MeanSquaredError()
        self.rmse = torchmetrics.MeanSquaredError(squared=False)
        self.mae = torchmetrics.MeanAbsoluteError()
        self.r2 = torchmetrics.R2Score()

        # SECURITY: joblib.load unpickles the file and can execute arbitrary code;
        # only point `scaler_path` at files you trust.
        self.scaler = joblib.load(scaler_path)

        # Per-epoch histories, one independent dict of lists per stage.
        epoch_keys = ("epoch", "loss") + self._METRIC_NAMES
        self.train_logs = {key: [] for key in epoch_keys}
        self.val_logs = {key: [] for key in epoch_keys}
        self.test_logs = {key: [] for key in epoch_keys}

        # Per-step loss histories.
        self.train_step_loss_logs = {"step_loss": []}
        self.val_step_loss_logs = {"step_loss": []}
        self.test_step_loss_logs = {"step_loss": []}

    def forward(self, x):
        """Run the wrapped backbone on ``x``."""
        return self.model(x)

    def _shared_step(self, batch, stage):
        """Compute loss and metrics for one batch and log them under ``{stage}_*``.

        ``stage`` is one of "train", "val" or "test"; it selects both the log
        key prefix and the ``{stage}_step_loss_logs`` dict that receives the loss.
        """
        images, labels = batch
        # Add a trailing axis so labels line up with the model output.
        # assumes the model returns shape (N, 1) -- TODO confirm
        labels = labels.float().unsqueeze(1)
        preds = self.model(images)
        loss = self.criterion(preds, labels)

        self.log(f"{stage}_step_loss", loss, on_step=True, on_epoch=False, prog_bar=True)
        self.log(f"{stage}_loss", loss, on_step=False, on_epoch=True, prog_bar=True)
        for metric_name in self._METRIC_NAMES:
            value = getattr(self, metric_name)(preds, labels)
            self.log(f"{stage}_{metric_name}", value, on_step=False, on_epoch=True, prog_bar=True)

        getattr(self, f"{stage}_step_loss_logs")["step_loss"].append(loss.item())
        return loss

    def _record_epoch(self, stage):
        """Append the epoch-level values from ``trainer.callback_metrics`` to ``{stage}_logs``."""
        history = getattr(self, f"{stage}_logs")
        metrics = self.trainer.callback_metrics
        history["epoch"].append(self.trainer.current_epoch)
        for key in ("loss",) + self._METRIC_NAMES:
            history[key].append(metrics[f"{stage}_{key}"].item())

    def training_step(self, batch, batch_idx):
        """Training batch: returns the loss used for back-propagation."""
        return self._shared_step(batch, "train")

    def on_train_epoch_end(self):
        """Store this epoch's training metrics in ``train_logs``."""
        self._record_epoch("train")

    def validation_step(self, batch, batch_idx):
        """Validation batch: logs ``val_*`` values and returns the loss."""
        return self._shared_step(batch, "val")

    def on_validation_epoch_end(self):
        """Store this epoch's validation metrics in ``val_logs``."""
        self._record_epoch("val")

    # Disabled CSV export. It references `normalization` and `seed`, which are
    # not defined anywhere in the visible notebook, so it is kept commented out.
    # def on_fit_end(self):
    #     # Export logs
    #     train_logs_df = pd.DataFrame(self.train_logs)
    #     train_step_loss_logs_df = pd.DataFrame(self.train_step_loss_logs)
    #     train_logs_df.to_csv(f"logs-{self.name}_{normalization}_{seed}/training_logs_{self.name}.csv", index=False)
    #     train_step_loss_logs_df.to_csv(f"logs-{self.name}_{normalization}_{seed}/training_step_loss_logs_{self.name}.csv", index=False)

    #     val_logs_df = pd.DataFrame(self.val_logs)
    #     val_step_loss_logs_df = pd.DataFrame(self.val_step_loss_logs)
    #     val_logs_df.to_csv(f"logs-{self.name}_{normalization}_{seed}/validation_logs_{self.name}.csv", index=False)
    #     val_step_loss_logs_df.to_csv(f"logs-{self.name}_{normalization}_{seed}/validation_step_loss_logs_{self.name}.csv", index=False)

    def test_step(self, batch, batch_idx):
        """Test batch: logs ``test_*`` values and returns the loss."""
        return self._shared_step(batch, "test")

    def on_test_epoch_end(self):
        """Store the test metrics in ``test_logs``."""
        self._record_epoch("test")

    # Disabled CSV export (same undefined `normalization` / `seed` dependency as above).
    # def on_test_end(self):
    #     test_logs_df = pd.DataFrame(self.test_logs)
    #     test_step_loss_logs_df = pd.DataFrame(self.test_step_loss_logs)
    #     test_logs_df.to_csv(f"logs-{self.name}_{normalization}_{seed}/test_logs_{self.name}.csv", index=False)
    #     test_step_loss_logs_df.to_csv(f"logs-{self.name}_{normalization}_{seed}/test_step_loss_logs_{self.name}.csv", index=False)

    def configure_optimizers(self):
        """Adam over the backbone parameters with the configured learning rate."""
        return torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)

    def predict_step(self, batch, batch_idx):
        """Predict for one batch, undoing the target scaling when a scaler is loaded.

        Args:
            batch: either the input tensor itself or a list/tuple whose first
                element is the input tensor (e.g. ``(images, labels)``).
            batch_idx: unused; present for the Lightning hook signature.

        Returns:
            A numpy array produced by ``scaler.inverse_transform`` when
            ``self.scaler`` is set, otherwise the raw model output tensor.
        """
        # The previous `len(batch) == 1` test confused "tensor with batch size 1"
        # with "bare tensor": a tensor holding several samples was indexed down to
        # its first image, and a 1-element tuple was passed to the model as a tuple.
        x = batch[0] if isinstance(batch, (list, tuple)) else batch
        y_pred_norm = self(x)
        if self.scaler is not None:
            return self.scaler.inverse_transform(y_pred_norm.detach().cpu().numpy())
        return y_pred_norm

In [6]:
# Build an untrained ResNet-18 with 10 input channels and a single output unit;
# presumably its weights are then filled in from the checkpoint below.
resnet18_model = timm.create_model("resnet18", pretrained=False, in_chans=10, num_classes=1)
# The backbone must be passed explicitly: the task saves its hyperparameters with ignore=["model"].
model = LightningRegressionTask.load_from_checkpoint(ckpt_path, model=resnet18_model, model_name="resnet18")
model.to(device)
# Switch to inference mode; as the cell's last expression this also displays the module repr.
model.eval()

LightningRegressionTask(
  (model): ResNet(
    (conv1): Conv2d(10, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (act1): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (drop_block): Identity()
        (act1): ReLU(inplace=True)
        (aa): Identity()
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (act2): ReLU(inplace=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1)

In [7]:
# Total area of one 64x64 tile; produccion_a_rendimiento scales it by the ROI fraction
# and divides the predicted production by the result.
# NOTE(review): presumably 64*64 pixels x 100 m^2 per pixel, divided by 10000 m^2/ha (= hectares) -- confirm units.
AREATOTAL = 64 * 64 * 100 / 10000

In [8]:
def resize_to_64_64_10(img):
    """Resize a channels-last image to 64x64 and force exactly 10 channels.

    Args:
        img: numpy array of shape (H, W) or (H, W, C).

    Returns:
        Array of shape (64, 64, 10). Extra channels beyond the first 10 are
        dropped; missing channels are filled with zeros of the resized dtype.

    Raises:
        ValueError: if ``img`` is neither 2-D nor 3-D (from the shape unpacking).
    """
    if img.ndim == 2:
        img = img[..., np.newaxis]
    _, _, n_channels = img.shape

    img_resized = cv2.resize(img, (64, 64), interpolation=cv2.INTER_LINEAR)
    # cv2.resize returns a 2-D array for single-channel input, which used to make
    # the zero-padding concatenate below fail; restore the channel axis.
    if img_resized.ndim == 2:
        img_resized = img_resized[..., np.newaxis]

    if n_channels > 10:
        img_resized = img_resized[:, :, :10]
    elif n_channels < 10:
        pad = np.zeros((64, 64, 10 - n_channels), dtype=img_resized.dtype)
        img_resized = np.concatenate([img_resized, pad], axis=-1)
    return img_resized

In [9]:
def compute_roi_percent(img_64_64_10, thresh=0.1):
    """Estimate the region of interest (ROI) of an image as a percentage (0-100).

    The image is averaged over its last axis, the result is min-max scaled,
    and every pixel whose scaled value is >= ``thresh`` counts as ROI.
    The original author notes the percentage scale matches the one used in training.

    Args:
        img_64_64_10: numpy array with channels on the last axis.
        thresh: cut-off applied to the scaled channel mean.

    Returns:
        (roi_percent, roi_mask): the ROI share in percent and the uint8 mask
        (1 inside the ROI, 0 outside).
    """
    channel_mean = img_64_64_10.mean(axis=-1)
    lowest = channel_mean.min()
    # The small epsilon keeps the division defined for a constant image (zero range).
    value_range = np.ptp(channel_mean) + 1e-8
    scaled = (channel_mean - lowest) / value_range

    inside_roi = scaled >= thresh
    roi_percent = (inside_roi.sum() / inside_roi.size) * 100.0
    return roi_percent, inside_roi.astype(np.uint8)

In [10]:
def produccion_a_rendimiento(model, img_tensor, thresh=0.1):
    """Predict production for one image and convert it to yield over the ROI area.

    Args:
        model: trained LightningRegressionTask; its ``predict_step`` is called
            with a (1, C, H, W) tensor and is expected to return the
            de-normalised production (numpy array) or a tensor.
        img_tensor: the image to score, as a numpy array or torch tensor in
            channels-last layout: (H, W), (H, W, C) or (1, H, W, C). It is
            adapted to 64x64x10 with ``resize_to_64_64_10``.
        thresh: ROI threshold forwarded to ``compute_roi_percent``.

    Returns:
        (rendimiento_predicho, prod_predicha_val, roi_percent, roi_mask) where
        rendimiento_predicho = production / (AREATOTAL * roi_percent / 100).
        rendimiento_predicho is NaN when the ROI is empty.

    Raises:
        ValueError: if the image is not 2-D/3-D after dropping a singleton batch axis.
    """
    # Use the argument that was passed in. The previous version ignored
    # `img_tensor` and silently read the notebook-global `img` instead.
    if torch.is_tensor(img_tensor):
        img_np = img_tensor.detach().cpu().numpy()
    else:
        img_np = np.asarray(img_tensor)
    if img_np.ndim == 4 and img_np.shape[0] == 1:
        img_np = img_np[0]  # drop the singleton batch axis
    if img_np.ndim not in (2, 3):
        raise ValueError(
            f"expected an image of shape (H, W), (H, W, C) or (1, H, W, C), got {img_np.shape}"
        )

    img_norm = resize_to_64_64_10(img_np).astype(np.float32)
    roi_percent, roi_mask = compute_roi_percent(img_norm, thresh=thresh)

    # Run on whatever device the model lives on rather than a notebook global.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    with torch.no_grad():
        # (H, W, C) -> (1, C, H, W) for PyTorch
        inp = torch.from_numpy(img_norm).unsqueeze(0).permute(0, 3, 1, 2).to(target_device)
        prod_predicha = model.predict_step(inp, 0)

    # predict_step returns a numpy array when a scaler is set and a tensor otherwise;
    # normalise both to a plain Python float.
    if torch.is_tensor(prod_predicha):
        prod_predicha = prod_predicha.detach().cpu().numpy()
    prod_predicha_val = float(np.asarray(prod_predicha).reshape(-1)[0])

    # An empty ROI has zero area, so the yield is undefined: report NaN instead of dividing by zero.
    roi_area = AREATOTAL * (roi_percent / 100)
    if roi_area > 0:
        rendimiento_predicho = prod_predicha_val / roi_area
    else:
        rendimiento_predicho = float("nan")

    return rendimiento_predicho, prod_predicha_val, roi_percent, roi_mask

In [15]:
# Score the sample image loaded in the configuration cell and unpack the four results.
rendimiento_predicho, prod_predicha_val, roi_percent, roi_mask = produccion_a_rendimiento(
    model, img_tensor, thresh=0.1
)

# Report predicted yield, predicted production, ROI percentage and the binary ROI mask.
print("Rendimiento predicho", rendimiento_predicho)
print("Producción predicha:", prod_predicha_val)
print("Porcentaje ROI", roi_percent)
print("Máscara ROI", roi_mask)

Rendimiento predicho 7179.476108504843
Producción predicha: 59302.47265625
Porcentaje ROI 20.166015625
Máscara ROI [[0 0 0 ... 1 1 1]
 [0 0 0 ... 1 1 1]
 [0 0 0 ... 1 1 1]
 ...
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]]
