In [1]:
# visit http://127.0.0.1:8050/ in your web browser.
# Imports
import torch
from torch.autograd.functional import jacobian
import torch.nn.functional as F
import cv2 

import matplotlib.pyplot as plt

import sys
# Add the upper directory to the path
sys.path.append("../../models/")
from CustomCNNVessel import CustomResNet
sys.path.append("../../data/")
from VessMapDatasetLoader import vess_map_dataloader

import dash
from dash import html, dcc, Dash, html, dcc, Input, Output, callback, State
from dash.dependencies import Input, Output, MATCH
import plotly.express as px
import json

torch.cuda.empty_cache()
device = torch.device("cuda")

In [2]:
# Dataloaders
image_dir = '/home/fonta42/Desktop/interpretacao-redes-neurais/data/VessMap/images'
mask_dir = '/home/fonta42/Desktop/interpretacao-redes-neurais/data/VessMap/labels'
skeleton_dir = '/home/fonta42/Desktop/interpretacao-redes-neurais/data/VessMap/skeletons'

batch_size = 10
train_size = 0.8

train_loader, test_loader = vess_map_dataloader(image_dir, 
                                  mask_dir, 
                                  skeleton_dir, 
                                  batch_size,
                                  train_size = train_size)

In [3]:
""" # Concating images
all_images = []
all_masks = []
all_skeletons = []

# Iterate through the entire train_loader
for batch in train_loader:
    images, masks, skeletons = batch
    images, masks, skeletons = images.to(device), masks.to(device), skeletons.to(device)

    all_images.extend(images)
    all_masks.extend(masks)
    all_skeletons.extend(skeletons)
    
for batch in test_loader:
    images, masks, skeletons = batch
    images, masks, skeletons = images.to(device), masks.to(device), skeletons.to(device)

    all_images.extend(images)
    all_masks.extend(masks)
    all_skeletons.extend(skeletons) """

' # Concating images\nall_images = []\nall_masks = []\nall_skeletons = []\n\n# Iterate through the entire train_loader\nfor batch in train_loader:\n    images, masks, skeletons = batch\n    images, masks, skeletons = images.to(device), masks.to(device), skeletons.to(device)\n\n    all_images.extend(images)\n    all_masks.extend(masks)\n    all_skeletons.extend(skeletons)\n    \nfor batch in test_loader:\n    images, masks, skeletons = batch\n    images, masks, skeletons = images.to(device), masks.to(device), skeletons.to(device)\n\n    all_images.extend(images)\n    all_masks.extend(masks)\n    all_skeletons.extend(skeletons) '

In [4]:
import os
from PIL import Image
import numpy as np

def load_images_from_directory(directory_name):
    # Get the list of image file names in sorted order
    image_files = sorted(os.listdir(directory_name))

    # Load and store the images in a list
    images = []
    for file_name in image_files:
        if file_name.endswith('.png'):
            img_path = os.path.join(directory_name, file_name)
            img = Image.open(img_path)
            img_array = np.array(img)
            images.append(img_array)

    return images

# Load images from both directories
original_images = load_images_from_directory('../original_images')
fulfillment_images = load_images_from_directory('../fulfillment_images_2')

# Convert the lists to arrays if needed
original_images = torch.tensor(np.array(original_images) / 255.0, dtype=torch.float).to('cuda')
fulfillment_images = torch.tensor(np.array(fulfillment_images) / 255.0, dtype=torch.float).to('cuda')

In [5]:
# Models
model = CustomResNet(num_classes=2).to(device)
# Load the weights
model.load_state_dict(torch.load(f"../../models/vess_map_regularized_none_200.pth"))
model = model.eval()

In [6]:
""" def get_all_gradients(model, image, sampling_rate = 10, device = "cuda", vectorize = True):
  model.to(device)
  image = image.to(device).requires_grad()
  
  sampled_image = image[:,:,::sampling_rate,::sampling_rate]
  jacobian_gradient = torch.autograd.functional.jacobian(model, 
                                                         sampled_image,
                                                         vectorize = vectorize)
  jacobian_gradient = jacobian_gradient.squeeze()
  
  return jacobian_gradient """

' def get_all_gradients(model, image, sampling_rate = 10, device = "cuda", vectorize = True):\n  model.to(device)\n  image = image.to(device).requires_grad()\n  \n  sampled_image = image[:,:,::sampling_rate,::sampling_rate]\n  jacobian_gradient = torch.autograd.functional.jacobian(model, \n                                                         sampled_image,\n                                                         vectorize = vectorize)\n  jacobian_gradient = jacobian_gradient.squeeze()\n  \n  return jacobian_gradient '

In [7]:
def plot_gradients_with_bounding_box(gradient, model_name, threshold=0.01):
    gradient = gradient.squeeze()
    mask = np.abs(gradient) > threshold
    non_zero_coords = np.nonzero(mask)
    
    if len(non_zero_coords[0]) > 0:
        y_min, y_max = non_zero_coords[0].min(), non_zero_coords[0].max()
        x_min, x_max = non_zero_coords[1].min(), non_zero_coords[1].max()
        num_pixels_above_threshold = np.sum(mask)
        bounding_box_area = (y_max - y_min + 1) * (x_max - x_min + 1)
        fulfillment = num_pixels_above_threshold / bounding_box_area

        # Create the figure using Plotly Express
        fig = px.imshow(
            gradient[y_min:y_max+1, x_min:x_max+1].transpose(1, 0),
            title=f'Gradient Analysis for {model_name}',
            labels={'x': 'x-axis', 'y': 'y-axis'},
            color_continuous_scale=px.colors.diverging.RdYlGn,
            range_color=[-np.abs(gradient).max(), np.abs(gradient).max()],
        )

        fig.update_layout(
            annotations=[{
                'text': f"Pixels: {num_pixels_above_threshold}<br>"
                        f"X(min, max): ({x_min}, {x_max})<br>"
                        f"Y(min, max): ({y_min}, {y_max})<br>"
                        f"Area: {bounding_box_area}<br>"
                        f"Fulfillment: {fulfillment:.2f}<br>"
                        f"Threshold: {threshold:.6f}<br>",
                        
                'showarrow': False,
                'xref': 'paper',
                'yref': 'paper',
                'x': 0, 'y': 1,
                'xanchor': 'left', 'yanchor': 'top',
                'font': {'size': 12, 'color': 'black'},
                'bgcolor': 'white',
                'opacity': 0.7
            }],
            xaxis={'visible': False},
            yaxis={'visible': False},
        )
        return fig
    else:
        return px.imshow(torch.zeros(224, 224), color_continuous_scale=px.colors.sequential.gray)

In [8]:
def get_all_gradients_chunks(model, image, rows_proc=28, device="cuda"):
  print(image.shape)
  c, nr, nc = image.shape
  model.eval()
  model.to(device)

  size_jac = (2, nr, nc, nr, nc)
  # Number of image chunks to process
  nchunks = nr//rows_proc
  nchunks += nr%rows_proc!=0   # Add 1 if shape is not divisible by rows_proc
  image_cuda = image.unsqueeze(0).to(device).requires_grad_()

  jacobian_gradient = torch.zeros(*size_jac, device='cpu')
  first_row = 0
  for idx in range(nchunks):
      print(idx)
      last_row = first_row + rows_proc
      model_w = wrapper(model, first_row, last_row)
      out = jacobian(model_w, image_cuda, vectorize=False).to('cpu')  
      # Cannot use .squeeze here since one valid size of out might be 1
      jacobian_gradient[:,first_row:last_row] = out[0,:,:,:,0,0]
      first_row = last_row

  return jacobian_gradient

In [19]:
app = Dash(__name__)

app.layout = html.Div(
    style={
        'display': 'flex',
        'flex-direction': 'column',
        'align-items': 'center',
        'height': '100vh',
        'width': '100vw'
    },
    children=[
        html.H1("Dynamic Gradient Plots"),
        html.Div(
            style={
                'display': 'flex',
                'flex-direction': 'row',
                'justify-content': 'space-around',
                'width': '90%',
                'margin-bottom': '20px',
                'align-items': 'center'
            },
            children=[
                dcc.Dropdown(
                    id='image-dropdown',
                    options=[{'label': f'Image {i}', 'value': i} for i in range(len(original_images))],
                    value=0,
                    style={'width': '80%', 'flex': '1'}
                ),
                dcc.Input(
                    id='threshold-input',
                    type='number',
                    placeholder='Enter the proportion of max gradient value',
                    debounce=True,
                    min=0,
                    step=0.001,
                    value=0.005,
                    style={'width': '80%', 'flex': '1'}
                ),
                html.Button(
                    'Update',
                    id='update-button',
                    n_clicks=0,
                    style={'width': '80%', 'flex': '1'}
                )
            ]
        ),
        html.Div(
            style={
                'display': 'flex',
                'flex-direction': 'row',
                'justify-content': 'space-between',
                'width': '95vw',
                'height': '80vh',
            },
            children=[
                html.Div(
                    [
                        html.H2("Original Image"),
                        dcc.Graph(
                            id='image-display',
                            config={'displayModeBar': True},
                            style={'height': '100%'}
                        ),
                        html.Div(id='hover-data', style={'display': 'none'})
                    ],
                    style={'flex': '1', 'margin-right': '10px'}
                ),
                html.Div(
                    [
                        html.H2("Fulfillment Image"),
                        dcc.Graph(
                            id='fulfillment-image-display',
                            config={'displayModeBar': True},
                            style={'height': '100%'}
                        ),
                    ],
                    style={'flex': '1', 'margin-right': '10px'}
                ),
                html.Div(
                    [
                        html.H2("Model Mask"),
                        dcc.Graph(
                            id='model-mask-display',
                            config={'displayModeBar': True},
                            style={'height': '100%'}
                        ),
                    ],
                    style={'flex': '1', 'margin-right': '10px'}
                ),
                html.Div(
                    [
                        html.H2(id='hover-coordinates', children='Coordinates: (x, y)'),
                        dcc.Graph(
                            id='result-image-display',
                            config={'displayModeBar': True},
                            style={'height': '100%'}
                        )
                    ],
                    style={'flex': '1', 'margin-right': '10px'}
                ),
                html.Div(
                    [
                        html.H2(id='threshold-title', children="Scale Delimited Gradient (Threshold: 0)"),
                        dcc.Graph(
                            id='gradient-display',
                            config={'displayModeBar': True},
                            style={'height': '100%'}
                        )
                    ],
                    style={'flex': '1'}
                )
            ]
        )
    ]
)

# Callback to update the original image based on the dropdown selection
@app.callback(
    Output('image-display', 'figure'),
    Output('fulfillment-image-display', 'figure'),
    Output('model-mask-display', 'figure'),
    Input('image-dropdown', 'value')
)
def update_images(selected_index):
    original_image_data = original_images[selected_index].squeeze()
    fulfillment_image_data = fulfillment_images[selected_index]
    original_fig = px.imshow(original_image_data.to('cpu'), color_continuous_scale=px.colors.sequential.gray)
    fulfillment_fig = px.imshow(fulfillment_image_data[:, :, :3].to('cpu'), color_continuous_scale=px.colors.diverging.RdYlGn)
    
    model_mask = model(original_images[selected_index].unsqueeze(0).unsqueeze(0))
    softmax_probs = F.softmax(model_mask, dim=1)
    class_one_probs = softmax_probs[0, 1, :, :].detach().cpu().numpy()

    model_fig = px.imshow(class_one_probs, color_continuous_scale=px.colors.diverging.RdYlGn)
    
    return original_fig, fulfillment_fig,model_fig

# Define callback to update the hover data
@app.callback(
    Output('hover-data', 'children'),
    Input('image-display', 'hoverData'),
    Input('fulfillment-image-display', 'hoverData')
)
def store_hover_data(hover_data_original, hover_data_fulfillment):
    hover_data = hover_data_original or hover_data_fulfillment
    if hover_data:
        return json.dumps({'x': hover_data['points'][0]['x'], 'y': hover_data['points'][0]['y']})
    return "{}"

# Define callback to update the hover coordinates display
@app.callback(
    Output('hover-coordinates', 'children'),
    Input('hover-data', 'children')
)
def update_hover_coordinates(hover_data_json):
    hover_data = json.loads(hover_data_json)
    if hover_data:
        x, y = hover_data['x'], hover_data['y']
        return f'Full Gradient, at Coordinates: ({x}, {y})'
    return 'Full Gradient, Coordinates: (x, y)'

# Define callback to update the result image
@app.callback(
    Output('result-image-display', 'figure'),
    Input('update-button', 'n_clicks'),
    Input('hover-data', 'children'),
    Input('image-dropdown', 'value'),
)
def update_image(n_clicks, hover_data_json, selected_index):
    hover_data = json.loads(hover_data_json)
    if hover_data:
        x, y = hover_data['x'], hover_data['y']
        image_data = original_images[selected_index].unsqueeze(0)
        # Ensure image requires gradient
        image = image_data.unsqueeze(0).requires_grad_()

        # Forward pass
        out = model(image)

        # Apply softmax to the output to get class probabilities
        probabilities = F.softmax(out, dim=1)

        score = probabilities[0, 1, x, y]  # Probability of class 1 at (x, y)

        # Compute gradients
        score.backward()
        gradient = image.grad.squeeze().transpose(1, 0)
        max_val = torch.max(torch.abs(gradient)).cpu().item()
        #max_val = np.max([np.max(gradient) for gradient in gradient])
        fig = px.imshow(gradient.to('cpu'), color_continuous_scale=px.colors.diverging.RdYlGn, range_color=[-max_val, max_val])
        return fig
    return px.imshow(torch.zeros(224, 224), color_continuous_scale=px.colors.diverging.RdYlGn)

# Define callback to update the gradient display and the threshold title
@app.callback(
    Output('gradient-display', 'figure'),
    Output('threshold-title', 'children'),
    Input('hover-data', 'children'),
    Input('threshold-input', 'value'),
    Input('image-dropdown', 'value')
)
def update_gradient_display(hover_data_json, threshold, selected_index):
    hover_data = json.loads(hover_data_json)
    if hover_data:
        x, y = hover_data['x'], hover_data['y']
        image_data = original_images[selected_index].unsqueeze(0)
        # Ensure image requires gradient
        image = image_data.unsqueeze(0).requires_grad_()

        # Forward pass
        out = model(image)
        
        # Apply softmax to the output to get class probabilities
        probabilities = F.softmax(out, dim=1)

        score = probabilities[0, 1, x, y]  # Probability of class 1 at (x, y)

        # Compute gradients
        score.backward()
        gradient = image.grad.squeeze()
        max_val = torch.max(torch.abs(gradient)).cpu().item()
        # Update the threshold if provided
        if threshold is not None:
            threshold = threshold * max_val
        else:
            threshold = 0.001  * max_val

        # Return the gradient with the diverging color map
        fig = plot_gradients_with_bounding_box(gradient.to('cpu').numpy(),"Model", threshold=threshold)
        fig.update_layout(
            title=f"Scale Delimited Gradient (Threshold: {threshold:.3f})"
        )
        return fig, f"Scale Delimited Gradient (Threshold: {threshold:.3f})"
    return px.imshow(torch.zeros(224, 224), color_continuous_scale=px.colors.diverging.RdYlGn), "Scale Delimited Gradient (Threshold: 0)"

if __name__ == '__main__':
    app.run_server(debug=True)

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-7-fecd5b3c7ab2> in plot_gradients_with_bounding_box(
    gradient=array([[0., 0., 0., ..., 0., 0., 0.],
       [0....   [0., 0., 0., ..., 0., 0., 0.]], dtype=float32),
    model_name='Model',
    threshold=tensor(5.2727e-06)
)
      1 def plot_gradients_with_bounding_box(gradient, model_name, threshold=0.01):
      2     gradient = gradient.squeeze()
----> 3     mask = np.abs(gradient) > threshold
        mask = undefined
        global np.abs = <ufunc 'absolute'>
        gradient = array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]], dtype=float32)
        threshold = tensor(5.2727e-06)
      4     non_zero_coords = np.nonzero(mask)
   