In [1]:
%pip install -q -U torch torchvision nobuco


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import tensorflow as tf

import nobuco
from nobuco import ChannelOrder, ChannelOrderingStrategy
from nobuco.layers.weight import WeightLayer

@nobuco.converter(torch.split, channel_ordering_strategy=ChannelOrderingStrategy.MINIMUM_TRANSPOSITIONS)
def split(input, split_size_or_sections, dim=0):
    """nobuco converter mapping ``torch.split`` onto ``tf.split``.

    ``torch.split`` interprets an integer as the *size* of each chunk (the
    last chunk may be smaller), whereas ``tf.split`` interprets an integer as
    the *number* of chunks. The chunk sizes are therefore always spelled out
    explicitly before calling ``tf.split``.

    Returns:
        A function with the ``torch.split`` signature that operates on
        TensorFlow tensors and returns a list of tensors.
    """
    def split_fn(input, split_size_or_sections, dim=0):
        if isinstance(split_size_or_sections, int):
            dim_size = input.shape[dim]
            if dim_size is None:
                raise ValueError("split: the size of the split dimension must be statically known")
            # Reproduce torch semantics: full chunks plus an optional smaller tail.
            full_chunks, remainder = divmod(int(dim_size), split_size_or_sections)
            sizes = [split_size_or_sections] * full_chunks
            if remainder:
                sizes.append(remainder)
        else:
            sections = split_size_or_sections
            # Accept array-likes exposing .tolist() as well as plain lists/tuples.
            if hasattr(sections, "tolist"):
                sections = sections.tolist()
            sizes = [int(section) for section in sections]
        return tf.split(input, num_or_size_splits=sizes, axis=dim)
    return split_fn

# Channel-ordering strategies tried for this converter:
#   FORCE_PYTORCH_ORDER    -> converts, but nobuco reported it as imprecise (orange)
#   FORCE_TENSORFLOW_ORDER -> does not work
#   MINIMUM_TRANSPOSITIONS -> only partially works
@nobuco.converter(F.affine_grid, channel_ordering_strategy=ChannelOrderingStrategy.FORCE_PYTORCH_ORDER)
def affine_grid(theta, size, align_corners=None):
    """nobuco converter reproducing 2-D ``F.affine_grid`` with TensorFlow ops.

    Args:
        theta: batch of affine matrices, reshaped to (N, 2, 3).
        size: target size as (N, C, H, W); only H and W are used.
        align_corners: same meaning as in PyTorch. ``None`` is treated as
            ``False``, which is the PyTorch default.

    Returns:
        A function with the same signature producing a sampling grid of shape
        (N, H, W, 2) holding normalised (x, y) coordinates.
    """
    def affine_grid_fn(theta, size, align_corners=None):
        # Extract the spatial size of the grid.
        _, _, height, width = size
        height, width = int(height), int(width)

        def base_coords(num_steps):
            # Mirrors PyTorch's base grid: a single step maps to 0, and with
            # align_corners=False the coordinates are pixel centres rather
            # than pixel corners, i.e. scaled by (n - 1) / n.
            if num_steps <= 1:
                return tf.zeros([num_steps], dtype=tf.float32)
            coords = tf.linspace(-1.0, 1.0, num_steps)
            if not align_corners:
                coords = coords * ((num_steps - 1) / num_steps)
            return coords

        # Build the grid of normalised homogeneous coordinates (x, y, 1).
        x_t, y_t = tf.meshgrid(base_coords(width), base_coords(height))
        ones = tf.ones_like(x_t)
        grid = tf.stack([x_t, y_t, ones], axis=-1)

        # Flatten the grid so the transform can be applied batch-wise.
        grid = tf.reshape(grid, [-1, height * width, 3])

        # Apply the affine transform: grid @ theta^T.
        theta = tf.reshape(theta, [-1, 2, 3])
        grid = tf.matmul(grid, tf.transpose(theta, [0, 2, 1]))

        # Restore the spatial layout of the grid.
        grid = tf.reshape(grid, [-1, height, width, 2])

        return grid
    return affine_grid_fn


@nobuco.converter(F.grid_sample, channel_ordering_strategy=ChannelOrderingStrategy.FORCE_TENSORFLOW_ORDER)
def converter_grid_sample(input, grid, mode="bilinear", padding_mode="zeros", align_corners=None):
    """nobuco converter reproducing ``F.grid_sample`` with TensorFlow ops.

    Only bilinear interpolation with zero padding and ``align_corners=False``
    (the PyTorch defaults) is implemented.

    Args:
        input: feature map; the generated function receives it as (N, H, W, C).
        grid: sampling grid of normalised (x, y) coordinates.
        mode, padding_mode, align_corners: accepted for signature
            compatibility with ``F.grid_sample``.

    Returns:
        A function with the same signature returning a (N, H_out, W_out, C)
        tensor, where H_out and W_out are taken from the grid.

    Raises:
        NotImplementedError: if a non-default sampling option is requested.
    """
    if mode != "bilinear" or padding_mode != "zeros" or align_corners:
        raise NotImplementedError(
            "grid_sample converter supports only mode='bilinear', "
            "padding_mode='zeros' and align_corners=False"
        )

    def grid_sample(input, grid, mode="bilinear", padding_mode="zeros", align_corners=None):
        def process_coord(coords, size):
            # Map normalised [-1, 1] coordinates to pixel positions
            # (align_corners=False convention), clamp them to the one-pixel
            # zero border and shift by 1 to index into the padded image.
            pixs = (coords + 1) * (0.5 * size) - 0.5
            return tf.clip_by_value(pixs, -1, size) + 1

        def gather(flat_input, y, x):
            # Row-major index into the flattened padded image, computed with
            # integers so that large images do not lose precision.
            linear = tf.cast(y, tf.int32) * padded_width + tf.cast(x, tf.int32)
            # Drop the trailing singleton axis: (N, H_out, W_out, 1) -> (N, H_out, W_out).
            linear = tf.squeeze(linear, axis=-1)
            return tf.gather(params=flat_input, indices=linear, batch_dims=1)

        # The grid arrives permuted by the forced TensorFlow ordering; this
        # transpose restores the (N, H_out, W_out, 2) layout.
        grid = tf.transpose(grid, perm=(0, 3, 1, 2))

        in_shape = tf.shape(input)
        batch, height, width, channels = in_shape[0], in_shape[1], in_shape[2], in_shape[3]
        padded_width = width + 2
        padded_height = height + 2

        grid_x, grid_y = tf.split(grid, num_or_size_splits=2, axis=-1)

        x = process_coord(grid_x, tf.cast(width, grid.dtype))
        y = process_coord(grid_y, tf.cast(height, grid.dtype))

        # One pixel of zero padding implements padding_mode="zeros".
        padded = tf.keras.layers.ZeroPadding2D(padding=(1, 1))(input)
        flat_input = tf.reshape(padded, tf.stack([batch, padded_height * padded_width, channels]))

        x0 = tf.math.floor(x)
        y0 = tf.math.floor(y)
        x1 = tf.math.ceil(x)
        y1 = tf.math.ceil(y)

        # Bilinear interpolation weights of the four neighbouring pixels.
        dx = x - x0
        dy = y - y0
        oneminus_dx = 1 - dx
        oneminus_dy = 1 - dy
        w_y0_x0 = oneminus_dy * oneminus_dx
        w_y1_x0 = dy * oneminus_dx
        w_y1_x1 = dy * dx
        w_y0_x1 = oneminus_dy * dx

        v_y0_x0 = gather(flat_input, y0, x0)
        v_y1_x0 = gather(flat_input, y1, x0)
        v_y1_x1 = gather(flat_input, y1, x1)
        v_y0_x1 = gather(flat_input, y0, x1)

        return w_y0_x0 * v_y0_x0 + w_y1_x0 * v_y1_x0 + w_y1_x1 * v_y1_x1 + w_y0_x1 * v_y0_x1

    return grid_sample

class CNNConStn(nn.Module):
    """CNN classifier preceded by a spatial transformer network (STN).

    The STN predicts two affine transforms per image and resamples the input
    into two crops of side ``img_size // 2``. Both crops are concatenated
    along the batch dimension and classified by the convolutional trunk, so
    the output batch is twice the input batch.

    Args:
        img_size: side of the square input images. Must be at least 64 so the
            half-resolution crops survive the five pooling stages of the trunk.
        nclasses: number of output classes.
        fixed_scale: if True the two crops use fixed scales (0.5 and 0.75) and
            only the translation is learned; otherwise scaling, rotation and
            translation are all predicted.
    """

    def __init__(self, img_size, nclasses, fixed_scale=True):
        super().__init__()

        if img_size < 64:
            raise ValueError(f"img_size must be at least 64, got {img_size}")

        self.img_size = img_size
        self.fixed_scale = fixed_scale

        # Classification trunk (applied to the STN crops).
        self.block1 = self._conv_block(3, 32)
        self.block2 = self._conv_block(32, 64)
        self.block3 = self._conv_block(64, 64)
        self.block4 = self._conv_block(64, 64)
        self.block5 = self._conv_block(64, 128, dropout=0.3)
        self.block6 = self._conv_block(128, 128, pool=False)

        # Localisation network of the STN (applied to the full image).
        self.block1_stn = self._conv_block(3, 32)
        self.block2_stn = self._conv_block(32, 64)
        self.block3_stn = self._conv_block(64, 64)
        self.block4_stn = self._conv_block(64, 64)
        self.block5_stn = self._conv_block(64, 128, dropout=0.3)
        self.block6_stn = self._conv_block(128, 128, pool=False)

        self.block7 = nn.Sequential(
            nn.Linear(128, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(inplace=True),
        )

        self.out = nn.Linear(256, nclasses)

        # The localisation network pools five times, so its output has side
        # img_size // 32 (7 for 224x224 inputs) and 128 channels.
        loc_side = img_size // 32
        loc_features = 128 * loc_side * loc_side

        if fixed_scale:  # scaling is kept fixed, only translation is learned
            # Regressor for the translation column of the two 2x3 affine matrices.
            self.fc_loc = nn.Sequential(
                nn.Linear(loc_features, 32),
                nn.ReLU(True),
                nn.Linear(32, 4)  # predict just translation params
            )
            # Start from zero weights and a fixed bias prior for the translations.
            self.fc_loc[2].weight.data.zero_()
            self.fc_loc[2].bias.data.copy_(torch.tensor([0.3, 0.3, 0.2, 0.2], dtype=torch.float))
        else:  # scaling, rotation and translation are learned
            # Shared feature regressor feeding the three parameter heads.
            self.fc_loc = nn.Sequential(
                nn.Linear(loc_features, 32),
                nn.ReLU(True)
            )
            self.trans = nn.Linear(32, 4)  # predict translation params
            self.scaling = nn.Linear(32, 2)  # predict the scaling parameter
            self.rotation = nn.Linear(32, 4)  # predict the rotation parameters

            # Initialize the weights/bias with some priors.
            self.trans.weight.data.zero_()
            self.trans.bias.data.copy_(torch.tensor([0.3, 0.3, 0.2, 0.2], dtype=torch.float))

            self.scaling.weight.data.zero_()
            self.scaling.bias.data.copy_(torch.tensor([0.5, 0.75], dtype=torch.float))

            self.rotation.weight.data.zero_()
            self.rotation.bias.data.normal_(0, 0.1)

    @staticmethod
    def _conv_block(in_channels, out_channels, pool=True, dropout=None):
        """Build two 3x3 conv + BatchNorm + ReLU stages, then optional 2x2 max-pool and dropout."""
        layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size=(3, 3), stride=1, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=(3, 3), stride=1, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        ]
        if pool:
            layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        if dropout is not None:
            layers.append(nn.Dropout(p=dropout))
        return nn.Sequential(*layers)

    # Spatial transformer network forward function
    def stn(self, x):
        """Resample ``x`` into two affine crops.

        Args:
            x: input batch of shape (N, C, img_size, img_size).

        Returns:
            Tuple ``(crops, scaling)``: ``crops`` has shape
            (2 * N, C, img_size // 2, img_size // 2); ``scaling`` is the
            predicted (N, 2) scaling tensor, or ``0`` when ``fixed_scale`` is set.
        """
        scaling = 0  # dummy value returned when only translation is learned
        xs = x
        for block in (self.block1_stn, self.block2_stn, self.block3_stn,
                      self.block4_stn, self.block5_stn, self.block6_stn):
            xs = block(xs)
        # Flatten per sample; keeping the batch dimension explicit makes a size
        # mismatch fail in the linear layer instead of silently folding the batch.
        xs = xs.view(xs.shape[0], -1)

        if self.fixed_scale:
            trans = self.fc_loc(xs)
            bs = trans.shape[0]
            trans_1, trans_2 = torch.split(trans, split_size_or_sections=trans.shape[1] // 2, dim=1)
            # prepare theta for each resolution
            # NOTE(review): theta_2 reuses trans_1 and trans_2 is never used; this
            # looks unintended, but is kept as-is so that weights trained with this
            # behaviour keep producing the same outputs - confirm with the authors.
            theta_1 = torch.cat([(torch.eye(2, 2, device=x.device) * 0.5).view(1, 2, 2).repeat(bs, 1, 1),
                                 trans_1.view(bs, 2, 1)], dim=2)
            theta_2 = torch.cat([(torch.eye(2, 2, device=x.device) * 0.75).view(1, 2, 2).repeat(bs, 1, 1),
                                 trans_1.view(bs, 2, 1)], dim=2)
        else:
            xs = self.fc_loc(xs)
            # predict the scaling params
            scaling = F.sigmoid(self.scaling(xs))
            scaling_1, scaling_2 = torch.split(scaling, split_size_or_sections=scaling.shape[1] // 2, dim=1)
            # predict the translation params
            trans = self.trans(xs)
            bs = trans.shape[0]
            trans_1, trans_2 = torch.split(trans, split_size_or_sections=trans.shape[1] // 2, dim=1)
            # predict the rotation params
            rot = self.rotation(xs)
            rot_1, rot_2 = torch.split(rot, split_size_or_sections=rot.shape[1] // 2, dim=1)
            # Place the rotation params on the off-diagonal of each 2x2 matrix.
            off_diagonal = torch.ones(2, 2, device=x.device).fill_diagonal_(0).view(1, 2, 2).repeat(bs, 1, 1)
            rot_1 = off_diagonal * rot_1.view(bs, 2, 1)
            off_diagonal = torch.ones(2, 2, device=x.device).fill_diagonal_(0).view(1, 2, 2).repeat(bs, 1, 1)
            rot_2 = off_diagonal * rot_2.view(bs, 2, 1)
            # Put the scaling params on the diagonal.
            rot_1 = rot_1 + torch.eye(2, 2, device=x.device).view(1, 2, 2) * scaling_1.view(bs, 1, 1)
            rot_2 = rot_2 + torch.eye(2, 2, device=x.device).view(1, 2, 2) * scaling_2.view(bs, 1, 1)
            # prepare the final theta
            # NOTE(review): as in the fixed-scale branch, theta_2 reuses trans_1.
            theta_1 = torch.cat([rot_1, trans_1.view(bs, 2, 1)], dim=2)
            theta_2 = torch.cat([rot_2, trans_1.view(bs, 2, 1)], dim=2)

        # Output size of each crop: half the input resolution.
        bs, c, _, _ = x.size()
        h, w = self.img_size // 2, self.img_size // 2
        stn_out_size = (bs, c, h, w)

        # apply transformations
        grid_1 = F.affine_grid(theta_1, stn_out_size)
        grid_2 = F.affine_grid(theta_2, stn_out_size)

        x_1 = F.grid_sample(x, grid_1)
        x_2 = F.grid_sample(x, grid_2)

        # Stack both crops along the batch dimension.
        x = torch.cat([x_1, x_2], dim=0)

        return x, scaling

    def forward(self, x, domains=None):
        """Classify the two STN crops of every input image.

        Args:
            x: input batch of shape (N, C, img_size, img_size).
            domains: unused by this method.

        Returns:
            Tuple ``(logits, scaling)``: ``logits`` has shape (2 * N, nclasses);
            ``scaling`` is the value returned by :meth:`stn`.
        """
        x, scaling = self.stn(x)  # transform the input
        for block in (self.block1, self.block2, self.block3,
                      self.block4, self.block5, self.block6):
            x = block(x)
        # Global average pooling over the remaining spatial extent.
        x = F.avg_pool2d(x, x.shape[-2])
        x = x.view(x.shape[0], -1)  # reshape the tensor
        x = F.dropout(self.block7(x), training=self.training)
        x = self.out(x)
        return x, scaling

In [16]:
# Random NCHW probe image used to trace the network during conversion.
dummy_image = torch.rand(1, 3, 224, 224)
# Inference-mode model: 224x224 inputs, 4 classes, fixed-scale STN.
pytorch_module = CNNConStn(img_size=224, nclasses=4, fixed_scale=True).eval()

In [17]:
# Trace the PyTorch module on the probe image and build the Keras model;
# both the model inputs and outputs are requested in TensorFlow channel order.
keras_model = nobuco.pytorch_to_keras(
    pytorch_module,
    args=[dummy_image],
    kwargs=None,
    inputs_channel_order=ChannelOrder.TENSORFLOW,
    outputs_channel_order=ChannelOrder.TENSORFLOW,
)



Legend:
    [32mGreen[0m — conversion successful
    [33mYellow[0m — conversion imprecise
    [31mRed[0m — conversion failed
    [31m[7mRed[0m — no converter found
    [0m[1mBold[0m — conversion applied directly
    * — subgraph reused
    [7mTensor[0m — this output is not dependent on any of subgraph's input tensors
    [4mTensor[0m — this input is a parameter / constant
    [90mTensor[0m — this tensor is useless

[32mCNNConStn[__main__][0m(float32_0<1,3,224,224>[0m) -> (float32_241<2,4>[0m, 0)
[32m │ [0m [32mSequential[torch.nn.modules.container][0m(float32_0<1,3,224,224>[0m) -> float32_17<1,32,112,112>[0m
[32m │ [0m [32m │ [0m [32m[1mConv2d[torch.nn.modules.conv][0m(float32_0<1,3,224,224>[0m) -> float32_3<1,32,224,224>[0m
[32m │ [0m [32m │ [0m [32m[1m └·[0m [0mconv2d[torch.nn.functional][0m(float32_0<1,3,224,224>[0m, float32_1<32,3,3,3>[0m, float32_2<32>[0m, (1, 1), (1, 1), (1, 1), 1) -> float32_3<1,32,224,224>[0m
[32m │ [0m [32m 

In [5]:
# Print the layer-by-layer structure of the converted Keras model.
keras_model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(1, 224, 224, 3)]           0         []                            
                                                                                                  
 zero_padding2d (ZeroPaddin  (1, 226, 226, 3)             0         ['input_1[0][0]']             
 g2D)                                                                                             
                                                                                                  
 conv2d (Conv2D)             (1, 224, 224, 32)            896       ['zero_padding2d[0][0]']      
                                                                                                  
 batch_normalization (Batch  (1, 224, 224, 32)            128       ['conv2d[0][0]']          

In [20]:
# Display the repr of the converted model object.
keras_model

<keras.src.engine.functional.Functional at 0x2f228a250>

In [25]:
# Alternative export formats that were tried (kept for reference):
# json_config = keras_model.to_json()
# keras_model.save("roymodel.h5")
# keras_model.save("roymodel.keras")
# Export the converted model in the TensorFlow SavedModel format.
tf.keras.models.save_model(keras_model, "roymodel_tf", save_format="tf")

INFO:tensorflow:Assets written to: roymodel_tf/assets


INFO:tensorflow:Assets written to: roymodel_tf/assets


In [26]:
# Loaders matching the alternative export formats; each one passes nobuco's
# WeightLayer through custom_objects:
# new_model = tf.keras.models.model_from_json(json_config, custom_objects={"WeightLayer": WeightLayer})
# new_model = tf.keras.models.load_model("roymodel.h5", custom_objects={"WeightLayer": WeightLayer})
# new_model = tf.keras.models.load_model("roymodel.keras", custom_objects={"WeightLayer": WeightLayer})
# Reload the model from the "roymodel_tf" SavedModel directory.
new_model = tf.keras.models.load_model("roymodel_tf")





# STN tests

## full PyTorch

In [None]:
import math
import numpy as np
import torch
import torch.nn.functional as F
from matplotlib import pyplot as plt

# Define the affine transform: a rotation by pi/3 radians (60 degrees)
theta = torch.tensor([[math.cos(math.pi/3), -math.sin(math.pi/3), 0],
                      [math.sin(math.pi/3), math.cos(math.pi/3), 0]])

# Size of the target grid in (N, C, H, W) layout
size = torch.Size([1, 3, 112, 112])  # 112x112 image

# Build the affine sampling grid
grid = F.affine_grid(theta.unsqueeze(0), size)

# Example usage: resample a random input image with the grid
input_image = torch.randn(1, 3, 112, 112)  # 112x112 input image
output_image = F.grid_sample(input_image, grid)

# Move the tensor to the CPU and convert it to a NumPy array
image_np = output_image.cpu().detach().numpy()

# Reorder the image from (N, C, H, W) to (N, H, W, C)
image_np = np.transpose(image_np, (0, 2, 3, 1))

# Clamp the pixel values to the [0, 1] range
image_np = np.clip(image_np, 0, 1)

# Show the image
plt.imshow(image_np[0])  # first batch element (index 0)
plt.show()

## Full PyTorch without F.grid_sample()

In [None]:
import math
import numpy as np
import torch
import torch.nn.functional as F
from matplotlib import pyplot as plt

import torch

def grid_sample(input, grid):
    """Pure-PyTorch bilinear grid sampling, a drop-in for ``F.grid_sample``.

    Implements ``mode="bilinear"``, ``padding_mode="zeros"`` and
    ``align_corners=False``.

    Args:
        input: tensor of shape (N, C, H, W).
        grid: tensor of shape (N, H_out, W_out, 2) holding normalised (x, y)
            sampling coordinates in [-1, 1].

    Returns:
        Tensor of shape (N, C, H_out, W_out).
    """
    def process_coord(coords, size):
        # Map normalised coordinates to pixel positions, clamp them to the
        # one-pixel zero border and shift by 1 to index the padded image.
        pixs = (coords + 1) * (0.5 * size) - 0.5
        return torch.clamp(pixs, -1, size) + 1

    def gather(flat_input, y, x):
        # Row-major index into the flattened padded image, shared by all channels.
        index = (y.long() * w_padded + x.long()).reshape(b, 1, -1).expand(-1, c, -1)
        return torch.gather(flat_input, 2, index).reshape(b, c, out_h, out_w)

    b, c, h, w = input.size()
    out_h, out_w = grid.shape[1], grid.shape[2]
    w_padded = w + 2  # padded width

    x = process_coord(grid[..., 0], w)
    y = process_coord(grid[..., 1], h)

    # Zero-pad one pixel on every side, then flatten the spatial dimensions.
    flat_input = F.pad(input, (1, 1, 1, 1)).reshape(b, c, -1)

    x0 = torch.floor(x)
    y0 = torch.floor(y)
    x1 = torch.ceil(x)
    y1 = torch.ceil(y)

    # Bilinear weights; unsqueeze adds the channel axis for broadcasting.
    dx = (x - x0).unsqueeze(1)
    dy = (y - y0).unsqueeze(1)
    oneminus_dx = 1 - dx
    oneminus_dy = 1 - dy
    w_y0_x0 = oneminus_dy * oneminus_dx
    w_y1_x0 = dy * oneminus_dx
    w_y1_x1 = dy * dx
    w_y0_x1 = oneminus_dy * dx

    v_y0_x0 = gather(flat_input, y0, x0)
    v_y1_x0 = gather(flat_input, y1, x0)
    v_y1_x1 = gather(flat_input, y1, x1)
    v_y0_x1 = gather(flat_input, y0, x1)

    return w_y0_x0 * v_y0_x0 + w_y1_x0 * v_y1_x0 + w_y1_x1 * v_y1_x1 + w_y0_x1 * v_y0_x1


# Define the affine transform: a rotation by pi/3 radians (60 degrees)
theta = torch.tensor([[math.cos(math.pi/3), -math.sin(math.pi/3), 0],
                      [math.sin(math.pi/3), math.cos(math.pi/3), 0]])

# Size of the target grid in (N, C, H, W) layout
size = torch.Size([1, 3, 112, 112])  # 112x112 image

# Build the affine sampling grid
grid = F.affine_grid(theta.unsqueeze(0), size)

# Example usage: resample a random input image with the grid, using the
# hand-written grid_sample in place of F.grid_sample(input_image, grid)
input_image = torch.randn(1, 3, 112, 112)  # 112x112 input image
output_image = grid_sample(input_image, grid)

# Move the tensor to the CPU and convert it to a NumPy array
image_np = output_image.cpu().detach().numpy()

print(image_np.shape)

# Reorder the image from (N, C, H, W) to (N, H, W, C); the permutation keeps
# height before width so the picture is not transposed
image_np = np.transpose(image_np, (0, 2, 3, 1))

# Clamp the pixel values to the [0, 1] range
image_np = np.clip(image_np, 0, 1)

# Show the image
plt.imshow(image_np[0])  # first batch element (index 0)
plt.show()

## TF affine_grid and PyTorch grid_sample

In [None]:
import numpy as np
import torch
import torch.nn.functional as F
import tensorflow as tf
from matplotlib import pyplot as plt

def affine_grid(theta, size, align_corners=False):
    """TensorFlow counterpart of 2-D ``torch.nn.functional.affine_grid``.

    Args:
        theta: affine matrices, reshaped to (N, 2, 3).
        size: target size as (N, C, H, W); only H and W are used.
        align_corners: same meaning as in PyTorch; defaults to ``False`` like
            ``F.affine_grid``.

    Returns:
        Sampling grid of shape (N, H, W, 2) with normalised (x, y) coordinates.
    """
    # Extract the spatial size of the grid.
    _, _, height, width = size
    height, width = int(height), int(width)

    def base_coords(num_steps):
        # Mirrors PyTorch's base grid: a single step maps to 0, and with
        # align_corners=False the coordinates are scaled by (n - 1) / n so
        # they refer to pixel centres.
        if num_steps <= 1:
            return tf.zeros([num_steps], dtype=tf.float32)
        coords = tf.linspace(-1.0, 1.0, num_steps)
        if not align_corners:
            coords = coords * ((num_steps - 1) / num_steps)
        return coords

    # Build the grid of normalised homogeneous coordinates (x, y, 1).
    x_t, y_t = tf.meshgrid(base_coords(width), base_coords(height))
    ones = tf.ones_like(x_t)
    grid = tf.stack([x_t, y_t, ones], axis=-1)

    # Flatten the grid so the transform can be applied batch-wise.
    grid = tf.reshape(grid, [-1, height * width, 3])

    # Apply the affine transform: grid @ theta^T.
    theta = tf.reshape(theta, [-1, 2, 3])
    grid = tf.matmul(grid, tf.transpose(theta, [0, 2, 1]))

    # Restore the spatial layout of the grid.
    grid = tf.reshape(grid, [-1, height, width, 2])

    return grid

# Example usage

# Define the affine transform: a rotation by pi/3 radians (60 degrees)
theta = np.array([[np.cos(np.pi/3), -np.sin(np.pi/3), 0],
                  [np.sin(np.pi/3), np.cos(np.pi/3), 0]], dtype=np.float32)

# Size of the target grid in (N, C, H, W) layout
size = (1, 3, 112, 112)  # 112x112 image

# Build the affine sampling grid with TensorFlow
grid = affine_grid(theta, size)

# Shape of the generated grid
print(grid.shape)

numpy_grid_tensor = grid.numpy()  # TensorFlow tensor -> NumPy array
grid_torch = torch.from_numpy(numpy_grid_tensor)  # NumPy array -> PyTorch tensor

# Example usage: resample a random input image with the grid
input_image = torch.randn(1, 3, 112, 112)  # 112x112 input image

# Show only the shape; printing the tensor itself would dump every value
print(input_image.shape)

output_image = F.grid_sample(input_image, grid_torch)

print(output_image.shape)  # shape of the transformed output

# Move the tensor to the CPU and convert it to a NumPy array
image_np = output_image.cpu().detach().numpy()

# Reorder the image from (N, C, H, W) to (N, H, W, C)
image_np = np.transpose(image_np, (0, 2, 3, 1))

# Clamp the pixel values to the [0, 1] range
image_np = np.clip(image_np, 0, 1)

# Show the image
plt.imshow(image_np[0])  # first batch element (index 0)
plt.show()

## full TF

In [None]:
import numpy as np
import tensorflow as tf
from matplotlib import pyplot as plt

def affine_grid(theta, size, align_corners=False):
    """TensorFlow counterpart of 2-D ``torch.nn.functional.affine_grid``.

    Args:
        theta: affine matrices, reshaped to (N, 2, 3).
        size: target size as (N, C, H, W); only H and W are used.
        align_corners: same meaning as in PyTorch; defaults to ``False`` like
            ``F.affine_grid``.

    Returns:
        Sampling grid of shape (N, H, W, 2) with normalised (x, y) coordinates.
    """
    # Extract the spatial size of the grid.
    _, _, height, width = size
    height, width = int(height), int(width)

    def base_coords(num_steps):
        # Mirrors PyTorch's base grid: a single step maps to 0, and with
        # align_corners=False the coordinates are scaled by (n - 1) / n so
        # they refer to pixel centres.
        if num_steps <= 1:
            return tf.zeros([num_steps], dtype=tf.float32)
        coords = tf.linspace(-1.0, 1.0, num_steps)
        if not align_corners:
            coords = coords * ((num_steps - 1) / num_steps)
        return coords

    # Build the grid of normalised homogeneous coordinates (x, y, 1).
    x_t, y_t = tf.meshgrid(base_coords(width), base_coords(height))
    ones = tf.ones_like(x_t)
    grid = tf.stack([x_t, y_t, ones], axis=-1)

    # Flatten the grid so the transform can be applied batch-wise.
    grid = tf.reshape(grid, [-1, height * width, 3])

    # Apply the affine transform: grid @ theta^T.
    theta = tf.reshape(theta, [-1, 2, 3])
    grid = tf.matmul(grid, tf.transpose(theta, [0, 2, 1]))

    # Restore the spatial layout of the grid.
    grid = tf.reshape(grid, [-1, height, width, 2])

    return grid

def grid_sample(input, grid):
    """TensorFlow bilinear grid sampling with zero padding.

    Follows the ``align_corners=False`` convention of ``F.grid_sample``.

    Args:
        input: tensor of shape (N, H, W, C).
        grid: tensor of shape (N, H_out, W_out, 2) holding normalised (x, y)
            sampling coordinates in [-1, 1].

    Returns:
        Tensor of shape (N, H_out, W_out, C).
    """
    def process_coord(coords, size):
        # Map normalised coordinates to pixel positions, clamp them to the
        # one-pixel zero border and shift by 1 to index the padded image.
        pixs = (coords + 1) * (0.5 * size) - 0.5
        return tf.clip_by_value(pixs, -1, size) + 1

    def gather(flat_input, y, x):
        # Row-major index into the flattened padded image, computed with
        # integers so that large images do not lose precision.
        linear = tf.cast(y, tf.int32) * padded_width + tf.cast(x, tf.int32)
        # Drop the trailing singleton axis: (N, H_out, W_out, 1) -> (N, H_out, W_out).
        linear = tf.squeeze(linear, axis=-1)
        return tf.gather(params=flat_input, indices=linear, batch_dims=1)

    in_shape = tf.shape(input)
    batch, height, width, channels = in_shape[0], in_shape[1], in_shape[2], in_shape[3]
    padded_width = width + 2
    padded_height = height + 2

    grid_x, grid_y = tf.split(grid, num_or_size_splits=2, axis=-1)
    x = process_coord(grid_x, tf.cast(width, grid.dtype))
    y = process_coord(grid_y, tf.cast(height, grid.dtype))

    # One pixel of zero padding implements the "zeros" padding mode.
    padded = tf.keras.layers.ZeroPadding2D(padding=(1, 1))(input)
    flat_input = tf.reshape(padded, tf.stack([batch, padded_height * padded_width, channels]))

    x0 = tf.math.floor(x)
    y0 = tf.math.floor(y)
    x1 = tf.math.ceil(x)
    y1 = tf.math.ceil(y)

    # Bilinear interpolation weights of the four neighbouring pixels.
    dx = x - x0
    dy = y - y0
    oneminus_dx = 1 - dx
    oneminus_dy = 1 - dy
    w_y0_x0 = oneminus_dy * oneminus_dx
    w_y1_x0 = dy * oneminus_dx
    w_y1_x1 = dy * dx
    w_y0_x1 = oneminus_dy * dx

    v_y0_x0 = gather(flat_input, y0, x0)
    v_y1_x0 = gather(flat_input, y1, x0)
    v_y1_x1 = gather(flat_input, y1, x1)
    v_y0_x1 = gather(flat_input, y0, x1)

    return w_y0_x0 * v_y0_x0 + w_y1_x0 * v_y1_x0 + w_y1_x1 * v_y1_x1 + w_y0_x1 * v_y0_x1


# Define the affine transform: a rotation by pi/4 radians (45 degrees)
theta = np.array([[np.cos(np.pi/4), -np.sin(np.pi/4), 0],
                  [np.sin(np.pi/4), np.cos(np.pi/4), 0]], dtype=np.float32)

# Size of the target grid in (N, C, H, W) layout (112x112 image)
size = (1, 3, 112, 112)

# Build the affine sampling grid
grid = affine_grid(theta, size)

# Random 112x112 input image in (N, H, W, C) layout
input_image = tf.constant(np.random.randn(1, 112, 112, 3).astype(np.float32))

# Apply the affine transform to the image
output_image = grid_sample(input_image, grid)

# Show both images side by side
fig, axes = plt.subplots(1, 2, figsize=(10, 5))

# Input image
axes[0].imshow(np.clip(input_image[0].numpy(), 0, 1))
axes[0].set_title('Input Image')

# Transformed image
axes[1].imshow(np.clip(output_image[0].numpy(), 0, 1))
axes[1].set_title('Transformed Image')

plt.show()