In [None]:
import numpy as np
import torch
import torch.nn as nn
import torchvision
from tqdm import tqdm
from skimage.transform import resize
from scipy.ndimage import label, sum as ndi_sum
from scipy.stats import pearsonr
import matplotlib.image
from matplotlib.patches import Rectangle
from matplotlib import pyplot as plt
from PIL import Image
import h5py

# Root directory of the data. Paths in this notebook are built by plain string
# concatenation (e.g. folder_path + "Fig4/TDANNfinal.torch"), so keep the trailing slash.
folder_path = "/path/to/your/data/"

# TDANN response to synthetic shape & texture (ST) images

In [None]:
# load TDANN final model from checkpoint
def load_model_from_checkpoint(checkpoint_path: str, device: str):
    """
    Build a ResNet18 trunk (FC head removed) and load the TDANN weights stored in a checkpoint.
    Parameters:
      checkpoint_path (str): path of the checkpoint file read with torch.load.
      device (str): device string used as map_location while reading the checkpoint.
    Returns: the torchvision ResNet18 with `fc` replaced by nn.Identity, the trunk weights loaded
      and every parameter frozen. The model is neither moved to `device` nor switched to eval
      mode here; the caller does both.
    Raises: whatever torch.load / load_state_dict raise (missing file, missing or unexpected keys).
    """
    # Called without arguments so the network is randomly initialised on every
    # torchvision release; the `pretrained=False` keyword is deprecated since
    # torchvision 0.13 and only produces a warning there.
    model = torchvision.models.resnet18()
    # drop the FC layer
    model.fc = nn.Identity()
    # load weights
    # NOTE(review): torch.load unpickles the file, so only open trusted checkpoints.
    ckpt = torch.load(checkpoint_path, map_location=torch.device(device))
    state_dict = ckpt["classy_state_dict"]["base_model"]["model"]["trunk"]
    # keep trunk entries only: strip the "base_model." prefix and skip the FC head
    new_state_dict = {
        k.split("base_model.")[-1]: v
        for k, v in state_dict.items()
        if k.startswith("base_model") and "fc." not in k
    }
    model.load_state_dict(new_state_dict)
    # freeze all weights
    for param in model.parameters():
        param.requires_grad = False
    return model

# obtain TDANN unit activation values to shape & texture images
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"available device: {DEVICE}")
checkpoint_path = folder_path + "Fig4/TDANNfinal.torch"
model = load_model_from_checkpoint(checkpoint_path, DEVICE)
model.to(DEVICE)
model.eval()
# image preprocessing
preprocess = torchvision.transforms.Compose([torchvision.transforms.Resize((224, 224)), torchvision.transforms.ToTensor()])
# Define the layers of interest
layers_of_interest = {
    "layer3.1": model.layer3[1], # (256, 14, 14)
    "layer4.0": model.layer4[0], # (512, 7, 7)
}
# per-layer storage of every unit's activation to the 366 shape + 448 texture images
num_imgs = 366 + 448
units = [np.zeros((256, 14, 14, num_imgs)), np.zeros((512, 7, 7, num_imgs))]

# The hooks are registered once for the whole run (instead of once per image);
# each forward pass simply overwrites the entries of `activations`.
activations = {} # layer name -> output tensor of the latest forward pass
def hook_fn(name):
    def hook(module, input, output):
        activations[name] = output.detach()  # Store detached tensor
    return hook
hooks = [layer.register_forward_hook(hook_fn(name)) for name, layer in layers_of_interest.items()]
try:
    for i in tqdm(range(num_imgs), desc="imgs...", disable=False):
        activations.clear() # never carry a previous image's activations over
        # Load and preprocess an image (file names are 1-indexed)
        if i < 366: # shape images
            image_path = folder_path + "V4DT/ShapeStimuli/Shape_" + str(int(i + 1)) + ".jpg"
        else: # texture images
            image_path = folder_path + "V4DT/TextureStimuli/Texture_" + str(int(i - 366 + 1)) + ".jpg"
        with Image.open(image_path) as raw_image: # close the file handle right after decoding
            input_tensor = preprocess(raw_image.convert("RGB")).unsqueeze(0) # torch.Size([1, 3, 224, 224])
        input_tensor = input_tensor.to(DEVICE, dtype=torch.float) # move to cuda
        with torch.no_grad():
            model(input_tensor) # Forward pass
        # store by layer name, in the same order as `units`, rather than by hook firing order
        for idx, name in enumerate(layers_of_interest):
            units[idx][:, :, :, i] = activations[name].cpu().numpy().squeeze()
finally:
    for hook in hooks:
        hook.remove() # Remove hooks so they do not linger on the model

# save activation values
np.savez(folder_path + "Fig4/ShapeTextureActivation_TDANN.npz", layer31=units[0], layer40=units[1])

# TDANN responses to theoretical receptive field masked ST images

In [None]:
# creating receptive field shifted images based on CNN structural feature map location
def get_receptive_field_box(feature_h, feature_w, rf_size=211, jump=16, start=0.5):
    """
    Return the (unclipped) square receptive field of one feature-map unit in image coordinates.
    The box is centred on start + index * jump along each axis and spans rf_size pixels;
    it is returned in floating point and may extend beyond the image.
    Parameters:
      feature_h (int): row index of the unit in the feature map.
      feature_w (int): column index of the unit in the feature map.
      rf_size (int): side length of the receptive field in pixels.
      jump (int): distance in pixels between the centres of neighbouring units.
      start (float): centre coordinate of unit (0, 0).
    Returns: (left, top, right, bottom) as floats.
    """
    radius = rf_size / 2.0
    cx = start + feature_w * jump  # columns map to x
    cy = start + feature_h * jump  # rows map to y
    return cx - radius, cy - radius, cx + radius, cy + radius

def visualize_rf_on_full_canvas(input_image, feature_h, feature_w, image_size=224, rf_size=211, jump=16, start=0.5):
    """
    Mask an image so that only the part inside one unit's receptive field stays visible.
    The input is first resized to image_size x image_size (bilinear). The receptive field box
    of unit (feature_h, feature_w) is then clipped to the image bounds, and the pixels inside
    the clipped box are copied, at the same coordinates, onto an otherwise white canvas.
    Parameters:
      input_image (PIL.Image): source image, any size.
      feature_h, feature_w (int): row / column index of the unit in the feature map.
      image_size (int): side length of the resized image and of the canvas.
      rf_size (int): receptive field side length in pixels.
      jump (int): effective stride between neighbouring units.
      start (float): centre coordinate of unit (0, 0).
    Returns: canvas (PIL.Image): RGB image of size image_size x image_size, white outside the receptive field.
    """
    scaled = input_image.resize((image_size, image_size), resample=Image.BILINEAR)
    # Ideal box in floating point; it may stick out of [0, image_size].
    rf_left, rf_top, rf_right, rf_bottom = get_receptive_field_box(feature_h, feature_w, rf_size, jump, start)
    # Round outwards to whole pixels, then clip to the image.
    x0 = int(max(np.floor(rf_left), 0))
    y0 = int(max(np.floor(rf_top), 0))
    x1 = int(min(np.ceil(rf_right), image_size))
    y1 = int(min(np.ceil(rf_bottom), image_size))
    visible = scaled.crop((x0, y0, x1, y1))
    # White background; the visible patch goes back to where it was cut from.
    canvas = Image.new("RGB", (image_size, image_size), "white")
    canvas.paste(visible, (x0, y0))
    return canvas

def visualize_rf_entire_image_rescaled(input_image, feature_h, feature_w,
                                       image_size=224, rf_size=211, jump=16, start=0.5):
    """
    Squeeze the whole input image into the in-bounds part of one unit's receptive field.
    The receptive field box of unit (feature_h, feature_w) is rounded outwards to whole pixels
    and clipped to the canvas. The entire input image is then resized (bilinear) to exactly that
    clipped rectangle, which need not be square, and pasted there on a white canvas.
    Parameters:
      input_image (PIL.Image.Image): source image, any size.
      feature_h (int): row index of the unit in the feature map.
      feature_w (int): column index of the unit in the feature map.
      image_size (int): side length of the canvas.
      rf_size (int): receptive field side length in pixels.
      jump (int): effective stride between neighbouring units.
      start (float): centre coordinate of unit (0, 0).
    Returns:
      canvas (PIL.Image.Image): RGB image of size image_size x image_size, white outside the pasted region.
    """
    rf_left, rf_top, rf_right, rf_bottom = get_receptive_field_box(feature_h, feature_w, rf_size, jump, start)
    # Outward rounding to integer pixels, followed by clipping to [0, image_size].
    clip_left = max(int(np.floor(rf_left)), 0)
    clip_top = max(int(np.floor(rf_top)), 0)
    clip_right = min(int(np.ceil(rf_right)), image_size)
    clip_bottom = min(int(np.ceil(rf_bottom)), image_size)
    # Target rectangle as (width, height).
    region = (clip_right - clip_left, clip_bottom - clip_top)
    squeezed = input_image.resize(region, resample=Image.BILINEAR)
    canvas = Image.new("RGB", (image_size, image_size), "white")
    canvas.paste(squeezed, (clip_left, clip_top))
    return canvas

In [None]:
# Example figure: one texture stimulus masked by the theoretical layer4.0 receptive field
# of nine units (corners, edge midpoints and centre of the 7x7 map).
fig, axes = plt.subplots(3, 3, figsize=(6, 6))
image_path = folder_path + "V4DT/TextureStimuli/Texture_156.jpg"
feature_h = [0, 3, 6]
feature_w = [0, 3, 6]
for i, unit_row in enumerate(feature_h):
    for j, unit_col in enumerate(feature_w):
        ax = axes[i, j]
        img = visualize_rf_on_full_canvas(
            Image.open(image_path).convert("RGB"), unit_row, unit_col,
            image_size=224, rf_size=307, jump=32, start=0.5,
        ) # <class 'PIL.Image.Image'>
        ax.imshow(img)
        ax.axis('off')  # no ticks
        # black frame around the panel, given in axes-fraction coordinates
        rect = Rectangle((0, 0), 1, 1, transform=ax.transAxes,
                         linewidth=2, edgecolor='black', facecolor='none')
        ax.add_patch(rect)
plt.tight_layout()
# write the figure to disk
plt.savefig(folder_path + "Fig4/TDANN40_image_rf.png", bbox_inches='tight', dpi=300)

In [None]:
def compute_resnet18_rf_params(layer_tag):
    """
    Compute and print the step-by-step receptive field parameters for a ResNet18 model.

    Supported layer tags:
      - "layer31": corresponds to the output of the second block in layer3 (i.e. layer3.1)
      - "layer40": corresponds to the output of the first block in layer4 (i.e. layer4.0)

    Every conv / pooling stage with kernel size k and stride s updates the running values as
      rf   <- rf + (k - 1) * jump
      jump <- jump * s
    and one line is printed per stage.

    Returns a dictionary containing:
      - 'rf_size': final receptive field size in pixels.
      - 'jump': effective stride (jump) between adjacent units.
      - 'start': center offset for the (0,0) feature (assumed to be 0.5).
    Raises:
      ValueError: if layer_tag is neither "layer31" nor "layer40". The tag is checked
        before anything is printed.
    """
    # Validate first so a bad tag does not emit a full trace before failing.
    if layer_tag == "layer31":
        final_name = "layer3.1"
    elif layer_tag == "layer40":
        final_name = "layer4.0"
    else:
        raise ValueError("Unsupported layer tag. Use 'layer31' or 'layer40'.")

    # (label used in the trace, kernel size, stride), in forward order.
    stages = [
        ("conv1 (7x7, stride 2)", 7, 2),      # pad 3
        ("maxpool (3x3, stride 2)", 3, 2),    # pad 1
    ]
    # Layer1: two blocks = four 3x3 stride-1 convolutions.
    stages += [("Layer1 conv {}".format(n), 3, 1) for n in range(1, 5)]
    stages += [
        ("Layer2 Block1 Conv1 (3x3, stride 2)", 3, 2),
        ("Layer2 Block1 Conv2 (3x3, stride 1)", 3, 1),
        ("Layer2 Block2 Conv1 (3x3, stride 1)", 3, 1),
        ("Layer2 Block2 Conv2 (3x3, stride 1)", 3, 1),
        ("Layer3.0 Conv1 (3x3, stride 2)", 3, 2),
        ("Layer3.0 Conv2 (3x3, stride 1)", 3, 1),
        ("Layer3.1 Conv1 (3x3, stride 1)", 3, 1),
        ("Layer3.1 Conv2 (3x3, stride 1)", 3, 1),   # -> rf 211, jump 16
    ]
    if layer_tag == "layer40":
        stages += [
            ("Layer4.0 Conv1 (3x3, stride 2)", 3, 2),
            ("Layer4.0 Conv2 (3x3, stride 1)", 3, 1),   # -> rf 307, jump 32
        ]

    # Initial values at the input (each pixel sees itself).
    r, j = 1, 1  # receptive field and jump.
    start = 0.5  # Assume the center of the first pixel is at 0.5
    print("Initial input: receptive field = 1, jump = 1, start = 0.5")
    for label, k, s in stages:
        r = r + (k - 1) * j  # the kernel widens the field by (k - 1) input steps
        j = j * s            # the stride spreads neighbouring units further apart
        print("After {}: receptive field = {}, jump = {}".format(label, r, j))

    params = {"rf_size": r, "jump": j, "start": start}
    print("\nFinal parameters for {} ({}):".format(final_name, layer_tag), params)
    return params

# Theoretical receptive field parameters of ResNet18's layer3.1 and layer4.0,
# computed (and traced to stdout) by compute_resnet18_rf_params.
print("=== Computation for layer3.1 (layer31) ===")
params_layer31 = compute_resnet18_rf_params(layer_tag="layer31") # {'rf_size': 211, 'jump': 16, 'start': 0.5}
print("\n=== Computation for layer4.0 (layer40) ===")
params_layer40 = compute_resnet18_rf_params(layer_tag="layer40") # {'rf_size': 307, 'jump': 32, 'start': 0.5}

In [None]:
# load TDANN final model from checkpoint
def load_model_from_checkpoint(checkpoint_path: str, device: str):
    """
    Build a ResNet18 trunk (FC head removed) and load the TDANN weights stored in a checkpoint.
    Parameters:
      checkpoint_path (str): path of the checkpoint file read with torch.load.
      device (str): device string used as map_location while reading the checkpoint.
    Returns: the torchvision ResNet18 with `fc` replaced by nn.Identity, the trunk weights loaded
      and every parameter frozen. The model is neither moved to `device` nor switched to eval
      mode here; the caller does both.
    Raises: whatever torch.load / load_state_dict raise (missing file, missing or unexpected keys).
    """
    # Called without arguments so the network is randomly initialised on every
    # torchvision release; the `pretrained=False` keyword is deprecated since
    # torchvision 0.13 and only produces a warning there.
    model = torchvision.models.resnet18()
    # drop the FC layer
    model.fc = nn.Identity()
    # load weights
    # NOTE(review): torch.load unpickles the file, so only open trusted checkpoints.
    ckpt = torch.load(checkpoint_path, map_location=torch.device(device))
    state_dict = ckpt["classy_state_dict"]["base_model"]["model"]["trunk"]
    # keep trunk entries only: strip the "base_model." prefix and skip the FC head
    new_state_dict = {
        k.split("base_model.")[-1]: v
        for k, v in state_dict.items()
        if k.startswith("base_model") and "fc." not in k
    }
    model.load_state_dict(new_state_dict)
    # freeze all weights
    for param in model.parameters():
        param.requires_grad = False
    return model

# obtain TDANN unit activation values to receptive field masked shape & texture images
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"available device: {DEVICE}")
checkpoint_path = folder_path + "Fig4/TDANNfinal.torch"
model = load_model_from_checkpoint(checkpoint_path, DEVICE)
model.to(DEVICE)
model.eval()
# image preprocessing
preprocess = torchvision.transforms.Compose([torchvision.transforms.Resize((224, 224)), torchvision.transforms.ToTensor()])
# Define the layers of interest
layers_of_interest = {
    "layer3.1": model.layer3[1], # (256, 14, 14)
    "layer4.0": model.layer4[0], # (512, 7, 7)
}
# per-layer storage of every unit's activation to the 366 shape + 448 texture images
num_imgs = 366 + 448
units = [np.zeros((256, 14, 14, num_imgs)), np.zeros((512, 7, 7, num_imgs))]


def _st_image_path(i):
    """Path of the i-th ST stimulus (0-based): 366 shapes first, then 448 textures; file names are 1-indexed."""
    if i < 366: # shape images
        return folder_path + "V4DT/ShapeStimuli/Shape_" + str(int(i + 1)) + ".jpg"
    return folder_path + "V4DT/TextureStimuli/Texture_" + str(int(i - 366 + 1)) + ".jpg" # texture images


def _record_rf_masked_responses(layer_name, layer_units, grid_size, rf_size, jump):
    """
    Fill layer_units[:, h, w, i] with the response of every channel at unit (h, w) to image i,
    where image i has been rescaled into the theoretical receptive field of that unit.
    Parameters:
      layer_name (str): key into layers_of_interest; the module whose output is recorded.
      layer_units (np.ndarray): (channels, grid_size, grid_size, num_imgs) array written in place.
      grid_size (int): spatial side length of the layer's feature map.
      rf_size (int), jump (int): theoretical receptive field size and stride of that layer.
    Uses the notebook-level model, preprocess, DEVICE and num_imgs defined above.
    """
    captured = {}
    def hook(module, input, output):
        captured[layer_name] = output.detach()  # Store detached tensor
    # One hook for the whole sweep instead of a register/remove pair per forward pass.
    handle = layers_of_interest[layer_name].register_forward_hook(hook)
    try:
        for feature_h in range(grid_size):
            for feature_w in tqdm(range(grid_size), desc=f"{layer_name} feature_w...", disable=False):
                for i in tqdm(range(num_imgs), desc="imgs...", disable=True):
                    with Image.open(_st_image_path(i)) as raw_image: # close the file handle after decoding
                        img = raw_image.convert("RGB")
                    img = visualize_rf_entire_image_rescaled(img, feature_h, feature_w, image_size=224, rf_size=rf_size, jump=jump, start=0.5)
                    input_tensor = preprocess(img).unsqueeze(0).to(DEVICE, dtype=torch.float) # move to cuda
                    with torch.no_grad():
                        model(input_tensor) # Forward pass
                    activation = captured[layer_name].cpu().numpy().squeeze()
                    # only the unit whose receptive field was stimulated is kept
                    layer_units[:, feature_h, feature_w, i] = activation[:, feature_h, feature_w]
    finally:
        handle.remove() # do not leave the hook on the model


# TDANN layer4.0
_record_rf_masked_responses("layer4.0", units[1], grid_size=7, rf_size=307, jump=32)
# TDANN layer3.1
print("layer3.1:")
_record_rf_masked_responses("layer3.1", units[0], grid_size=14, rf_size=211, jump=16)

# save activation values
np.savez(folder_path + "Fig4/ShapeTextureActivation_TDANN_rfs.npz", layer31=units[0], layer40=units[1])

# TDANN responses to estimated receptive field masked ST images

In [None]:
def visualize_scaled_image_on_feature_location(
    input_image: Image.Image,
    feature_h: int,
    feature_w: int,
    box_size: int,
    canvas_size: int = 224,
    feat_map_size: int = 14
) -> Image.Image:
    """
    Shrink the whole input image to a box_size x box_size patch (bilinear) and paste it on a
    white canvas_size x canvas_size canvas, at a position that follows the unit's location in a
    feat_map_size x feat_map_size feature map.
    The patch slides linearly with the unit index, so that the four corner units map to the four
    canvas corners:
      - (0, 0) -> top-left
      - (0, feat_map_size-1) -> top-right
      - (feat_map_size-1, 0) -> bottom-left
      - (feat_map_size-1, feat_map_size-1) -> bottom-right
    Parameters:
        input_image: PIL.Image.Image, any size.
        feature_h: int, row index in [0, feat_map_size-1].
        feature_w: int, col index in [0, feat_map_size-1].
        box_size: int, side length of the pasted square patch.
        canvas_size: int, side length of the white canvas (default 224).
        feat_map_size: int, spatial size of the feature map (default 14).
    Returns:
        A new RGB PIL.Image.Image (canvas_size x canvas_size) holding the resized input at the slot of (feature_h, feature_w).
    """
    canvas = Image.new("RGB", (canvas_size, canvas_size), "white")
    scaled = input_image.resize((box_size, box_size), resample=Image.BILINEAR)
    # Room left for the patch to travel, and the index of the last unit along an axis.
    slack = canvas_size - box_size
    last_index = feat_map_size - 1
    # Fraction of the way across the map, scaled to the available travel (x from columns, y from rows).
    paste_at = (
        round((feature_w / last_index) * slack),
        round((feature_h / last_index) * slack),
    )
    canvas.paste(scaled, paste_at)
    return canvas

# load TDANN final model from checkpoint
def load_model_from_checkpoint(checkpoint_path: str, device: str):
    """
    Build a ResNet18 trunk (FC head removed) and load the TDANN weights stored in a checkpoint.
    Parameters:
      checkpoint_path (str): path of the checkpoint file read with torch.load.
      device (str): device string used as map_location while reading the checkpoint.
    Returns: the torchvision ResNet18 with `fc` replaced by nn.Identity, the trunk weights loaded
      and every parameter frozen. The model is neither moved to `device` nor switched to eval
      mode here; the caller does both.
    Raises: whatever torch.load / load_state_dict raise (missing file, missing or unexpected keys).
    """
    # Called without arguments so the network is randomly initialised on every
    # torchvision release; the `pretrained=False` keyword is deprecated since
    # torchvision 0.13 and only produces a warning there.
    model = torchvision.models.resnet18()
    # drop the FC layer
    model.fc = nn.Identity()
    # load weights
    # NOTE(review): torch.load unpickles the file, so only open trusted checkpoints.
    ckpt = torch.load(checkpoint_path, map_location=torch.device(device))
    state_dict = ckpt["classy_state_dict"]["base_model"]["model"]["trunk"]
    # keep trunk entries only: strip the "base_model." prefix and skip the FC head
    new_state_dict = {
        k.split("base_model.")[-1]: v
        for k, v in state_dict.items()
        if k.startswith("base_model") and "fc." not in k
    }
    model.load_state_dict(new_state_dict)
    # freeze all weights
    for param in model.parameters():
        param.requires_grad = False
    return model

# obtain TDANN unit activation values to estimated receptive field masked shape & texture images
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"available device: {DEVICE}")
checkpoint_path = folder_path + "Fig4/TDANNfinal.torch"
model = load_model_from_checkpoint(checkpoint_path, DEVICE)
model.to(DEVICE)
model.eval()
# image preprocessing (the canvases built below are already 224 x 224, so no Resize)
preprocess = torchvision.transforms.Compose([torchvision.transforms.ToTensor()])
# to store targeted layers' all units' activation values to the 366 shape + 448 texture images
num_imgs = 366 + 448
units_activation = np.zeros((256, 14, 14, num_imgs))
# load estimated receptive field size
scale = 1
with h5py.File(folder_path + "Fig4/layer31/Sigma.mat", "r") as sigma_file: # read-only, closed after reading
    Sigma = np.array(sigma_file["Sigma"]) # 2D gaussian fitted receptive field parameters of shape (2, 256)
Sigma = scale * np.mean(Sigma, axis=0) # mean receptive field size: (2, 256) -> (256,)
assert len(Sigma) == 256
# The stimulus shown for a channel depends only on its integer box size, so channels that
# share a box size see exactly the same input. Group them and run the network once per
# (h, w, box size, image) instead of once per (h, w, channel, image).
channels_by_box_size = {}
for c in range(256):
    channels_by_box_size.setdefault(int(Sigma[c]), []).append(c)
# TDANN layer3.1
print("layer3.1:")
with torch.no_grad(): # inference only
    for h in range(14):
        for w in tqdm(range(14), desc="layer3.1 feature_w...", disable=True):
            for box_size, channels in tqdm(channels_by_box_size.items(), desc="given h, w, for all box sizes...", disable=False):
                for i in tqdm(range(num_imgs), desc="imgs...", disable=True):
                    # Load and preprocess an image (file names are 1-indexed)
                    if i < 366: # shape images
                        image_path = folder_path + "V4DT/ShapeStimuli/Shape_" + str(int(i + 1)) + ".jpg"
                    else: # texture images
                        image_path = folder_path + "V4DT/TextureStimuli/Texture_" + str(int(i - 366 + 1)) + ".jpg"
                    with Image.open(image_path) as raw_image: # close the file handle after decoding
                        img = raw_image.convert("RGB")
                    img = visualize_scaled_image_on_feature_location(img, feature_h=h, feature_w=w, box_size=box_size, canvas_size=224, feat_map_size=14)
                    x = preprocess(img).unsqueeze(0).to(DEVICE, dtype=torch.float) # move to cuda
                    x = model.conv1(x)
                    x = model.bn1(x)
                    x = model.relu(x)
                    x = model.maxpool(x)
                    x = model.layer1(x)
                    x = model.layer2(x)
                    x = model.layer3(x) # up until layer3.1
                    # x = model.layer4[0](x) # up until layer4.0
                    # Bring the values to host memory explicitly before writing into the NumPy
                    # array; assigning a device tensor directly relies on implicit conversion.
                    responses = x[0, :, h, w].cpu().numpy()
                    units_activation[channels, h, w, i] = responses[channels] # store activation values

# save activation values
np.save(folder_path + "Fig4/ShapeTextureActivation_TDANN31_1erf.npy", units_activation)

In [None]:
# masked ST image according to estimated receptive field examples visualization
fig, axes = plt.subplots(3, 3, figsize=(6, 6))
# Resolve the example stimulus under folder_path like every other cell, instead of an
# absolute path that only exists on one machine.
image_path = folder_path + "V4DT/TextureStimuli/Texture_156.jpg"
# corners, edge midpoints and centre of the 14 x 14 layer3.1 map
feature_hs = [0, 7, 13]
feature_ws = [0, 7, 13]
for i in range(3):
    for j in range(3):
        img = Image.open(image_path).convert("RGB")
        img = visualize_scaled_image_on_feature_location(img, feature_h=feature_hs[i], feature_w=feature_ws[j], box_size=60, canvas_size=224, feat_map_size=14)
        # img.show()  # To view the image.
        axes[i, j].imshow(img)
        axes[i, j].axis('off')  # Hide axis ticks
        # black frame around the panel
        rect = Rectangle(
                (0, 0), 1, 1,  # coordinates in axis fraction
                transform=axes[i, j].transAxes,
                linewidth=2,
                edgecolor='black',
                facecolor='none'
            )
        axes[i, j].add_patch(rect)
plt.tight_layout()
# save the figure
plt.savefig(folder_path + "Fig4/TDANN31_image_erf.png", bbox_inches='tight', dpi=300)

# TDANN ST image preference map

In [None]:
# Visualization of TDANN final network & positions, into a 2D 60 by 60 gridded map
# Alternative inputs:
# combined_features = np.load(folder_path + "Fig4/TDANN31_Rsp.npy") # (50000, 25088) for layer4.1; (50000, 50176) for layer3.1
# combined_features = np.load(folder_path + "Fig4/ShapeTextureActivation_TDANN_rf.npz")["layer40"].reshape(-1, 814).T
combined_features = np.load(folder_path + "Fig4/TDANN40_2erf.npy").reshape(-1, 814).T # (images, units)
with np.load(folder_path + "Fig4/TDANNfinal_positions/layer4.0.npz") as positions_file:
    positions = positions_file["coordinates"]
cortical_size = max(positions[:, 0]) - min(positions[:, 0]) # define the length of the 2D plane
grid_num = 60 # each grid contains the most-preferred 9 images by the mean within-grid-units' response
stmap = np.ones((grid_num, grid_num)) # 1 for shape preferring, 2 for texture preferring, 0 for empty grids
grids_count = int(grid_num ** 2)
num_imgs_each_side = 3 # number of images on each side of the grid, total number of images in a grid should be squared
# define the size of a single image
img_size = 30
line_width = 5
# derived layout constants
cell_size = img_size * num_imgs_each_side + line_width # one grid of thumbnails plus one separator line
top_k = int(num_imgs_each_side ** 2) # number of thumbnails per grid
canvas_side = grid_num * cell_size + line_width
# create a blank canvas of black color (R=0, G=0, B=0); named st_canvas so the builtin `map` is not shadowed
st_canvas = np.zeros((canvas_side, canvas_side, 3))
image_labels = np.arange(combined_features.shape[0]) + 1 # 1-indexed image names

# The same few stimuli win in many grids, so each thumbnail is decoded and resized only once.
thumbnail_cache = {}
def load_thumbnail(label):
    """Return the img_size x img_size thumbnail of the stimulus with 1-indexed label (1..366 shapes, then textures)."""
    if label not in thumbnail_cache:
        if label <= 366:
            path = folder_path + "V4DT/ShapeStimuli/Shape_" + str(label) + ".jpg"
        else:
            path = folder_path + "V4DT/TextureStimuli/Texture_" + str(label - 366) + ".jpg"
        with Image.open(path) as raw_image:
            pixels = np.array(raw_image.convert("RGB"))
        thumbnail_cache[label] = resize(pixels, (img_size, img_size, 3), anti_aliasing=True) # resize the image
    return thumbnail_cache[label]

# NOTE(review): grid edges are measured from 0 and use the x extent for both axes, and the upper
# edge is exclusive. Units are therefore skipped if coordinates do not start at 0 or lie exactly
# on the maximum; confirm this matches the layout of `positions`.
for i in tqdm(range(grid_num), desc="map initialization..."):
    for j in range(grid_num):
        # bounds of the current grid on the cortical sheet
        xmin_cortex = cortical_size / grid_num * i
        xmax_cortex = cortical_size / grid_num * (i + 1)
        ymin_cortex = cortical_size / grid_num * j
        ymax_cortex = cortical_size / grid_num * (j + 1)
        # find all units in this current grid
        units_within_grid_indices = np.where((positions[:, 0] >= xmin_cortex) & (positions[:, 0] < xmax_cortex) & (positions[:, 1] >= ymin_cortex) & (positions[:, 1] < ymax_cortex))[0]
        if len(units_within_grid_indices) > 0:
            # mean response of all units (neurons) within this grid to every image
            mean_responses = np.mean(combined_features[:, units_within_grid_indices], axis=1)
            # Ascending stable sort, then reversed: strongest response first, and among equal
            # responses the larger label first (same order as sorting (response, label) pairs).
            order = np.argsort(mean_responses, kind="stable")
            top_labels = image_labels[order[::-1][:top_k]]

            # locate the top left corner of the current grid in the canvas
            x = i * cell_size + line_width
            y = j * cell_size + line_width

            # fill the current grid with the selected images, row by row
            texture_img_count = 0
            for row in range(num_imgs_each_side):
                for col in range(num_imgs_each_side):
                    label = int(top_labels[row * num_imgs_each_side + col])
                    if label > 366:
                        texture_img_count += 1
                    st_canvas[x + row * img_size : x + (row + 1) * img_size,
                              y + col * img_size : y + (col + 1) * img_size, :] = load_thumbnail(label)
            if texture_img_count > top_k // 2: stmap[i, j] = 2 # majority of the top images are textures
        else: # white out the grid (including its separator lines) if no units are in this grid
            stmap[i, j] = 0
            x = i * cell_size
            y = j * cell_size
            st_canvas[x : x + cell_size + line_width,
                      y : y + cell_size + line_width, :] = 1.0

print("texture preferring ratio:", np.sum(stmap == 2) / np.sum(stmap != 0))
matplotlib.image.imsave(folder_path + 'Fig4/TDANN40_ST_2erf.bmp', st_canvas)
del st_canvas # release the large canvas

# TDANN responses to 50k natural images

In [None]:
# load TDANN final model from checkpoint
def load_model_from_checkpoint(checkpoint_path: str, device: str):
    """Build a ResNet-18 trunk and load the TDANN weights stored in a checkpoint.

    Args:
        checkpoint_path: path of the checkpoint file read with ``torch.load``.
        device: device string (e.g. ``"cpu"`` or ``"cuda"``) used as
            ``map_location`` while reading the checkpoint.

    Returns:
        A torchvision ResNet-18 whose ``fc`` layer is replaced by
        ``nn.Identity`` and whose parameters all have ``requires_grad=False``.
        The model is neither moved to ``device`` nor switched to eval mode
        here; callers do that themselves.

    Raises:
        KeyError: if the checkpoint lacks the nested
            ``classy_state_dict/base_model/model/trunk`` entries.
        RuntimeError: from ``load_state_dict`` when the remapped keys do not
            match the ResNet-18 parameters.
    """
    # Called without arguments so the weights are randomly initialised on every
    # torchvision version (the ``pretrained=`` keyword is deprecated in newer ones).
    model = torchvision.models.resnet18()
    # drop the FC layer
    model.fc = nn.Identity()
    # load weights
    ckpt = torch.load(checkpoint_path, map_location=torch.device(device))
    trunk_state = ckpt["classy_state_dict"]["base_model"]["model"]["trunk"]
    # keep only the backbone entries, strip the "base_model." prefix and skip
    # anything belonging to the (removed) fully connected head
    new_state_dict = {
        key.split("base_model.")[-1]: value
        for key, value in trunk_state.items()
        if key.startswith("base_model") and "fc." not in key
    }
    model.load_state_dict(new_state_dict)
    # freeze all weights
    model.requires_grad_(False)
    return model

# obtain TDANN one specific layer all units' activations to 50k images
def get_layer_activations(layer: str = "layer4.0", num_imgs: int = 50000, grid_num: int = 60, save_path=None):
    """Record one layer's activations to the image set and bin them onto the cortical grid.

    Every image ``50K_Imgset/<k>.bmp`` (k = 1..num_imgs) is passed through the
    TDANN model; the full activation tensor of ``layer`` is stored, units are
    assigned to a ``grid_num x grid_num`` grid using their saved positions, and
    the mean response of the units in each grid to every image is saved.

    Args:
        layer: name of the residual block to record, one of ``"layer1.0"`` ...
            ``"layer4.1"``.
        num_imgs: number of images to process (files are 1-indexed).
        grid_num: number of grids along each side of the map.
        save_path: output ``.npy`` path. Defaults to
            ``folder_path + "Fig4/TDANN<XY>map.npy"`` where ``<XY>`` are the
            digits of ``layer`` (``"Fig4/TDANN40map.npy"`` for the default layer).

    Returns:
        None. An array of shape ``(grid_num, grid_num, num_imgs + 1)`` is written
        to ``save_path``; the last slice is 1 where the grid contains units.

    Raises:
        ValueError: if ``layer`` is not a known block name.
        RuntimeError: if the forward hook did not capture an activation.

    Note:
        The activation buffer holds ``num_imgs * C * H * W`` float32 values
        (about 5 GB for layer4, far more for the early layers).
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"available device: {device}")
    checkpoint_path = folder_path + "Fig4/TDANNfinal.torch"
    model = load_model_from_checkpoint(checkpoint_path, device)
    model.to(device)
    model.eval()
    # image preprocessing
    preprocess = torchvision.transforms.Compose([torchvision.transforms.Resize((224, 224)), torchvision.transforms.ToTensor()])
    # layers of interest: name -> (module, activation shape for a 224x224 input)
    layer_specs = {
        "layer1.0": (model.layer1[0], (64, 56, 56)),
        "layer1.1": (model.layer1[1], (64, 56, 56)),
        "layer2.0": (model.layer2[0], (128, 28, 28)),
        "layer2.1": (model.layer2[1], (128, 28, 28)),
        "layer3.0": (model.layer3[0], (256, 14, 14)),
        "layer3.1": (model.layer3[1], (256, 14, 14)),
        "layer4.0": (model.layer4[0], (512, 7, 7)),
        "layer4.1": (model.layer4[1], (512, 7, 7)),
    }
    if layer not in layer_specs:
        raise ValueError(f"unknown layer {layer!r}; expected one of {sorted(layer_specs)}")
    layer_of_interest, unit_shape = layer_specs[layer]
    # The model runs in float32, so a float32 buffer is lossless and half the
    # size of a float64 one; the averaging below is still done in float64.
    storage = np.zeros((num_imgs,) + unit_shape, dtype=np.float32)

    # The hook writes into a local dict instead of a module-level global, so no
    # state leaks out of this function and a stale tensor can never be reused.
    captured = {}

    def hook_fn(module, input, output):
        captured["activation"] = output.detach()  # Detach to avoid computation graph issues

    # Register the hook once for the whole run instead of once per image
    hook = layer_of_interest.register_forward_hook(hook_fn)
    try:
        for i in tqdm(range(num_imgs), desc="50k imgs...", disable=False):
            # Load and preprocess an image (file names are 1-indexed)
            image_path = folder_path + "50K_Imgset/" + str(int(i + 1)) + ".bmp"
            with Image.open(image_path) as image:
                # force 3 channels so grayscale / palette files do not break the first conv
                input_tensor = preprocess(image.convert("RGB")).unsqueeze(0)  # torch.Size([1, 3, 224, 224])
            input_tensor = input_tensor.to(device, dtype=torch.float)
            with torch.no_grad():
                model(input_tensor)  # Forward pass
            if "activation" not in captured:
                raise RuntimeError(f"forward hook on {layer} did not fire for {image_path}")
            # drop the batch dimension and copy to host memory
            storage[i] = captured.pop("activation").cpu().numpy()[0]
    finally:
        hook.remove()

    # obtained layer units activation
    storage = storage.reshape(storage.shape[0], -1)  # of shape (n_images, n_units)
    print("obtained activation matrix has shape:", storage.shape)
    # load layer units' positions on the TDANN simulated cortical map
    with np.load(folder_path + "Fig4/TDANNfinal_positions/" + layer + ".npz") as position_file:
        positions = position_file["coordinates"]  # (num_units, 2)
    # define the length of the 2D plane from the x extent
    cortical_size = positions[:, 0].max() - positions[:, 0].min()
    # NOTE(review): the grid edges below start at 0 and use a half-open upper
    # bound, so this assumes positions start at 0 and a unit sitting exactly at
    # the maximum coordinate falls outside the last grid — confirm this is intended.
    grid_size = cortical_size / grid_num
    # To store each grid's aggregated response to all images, with an additional roi
    layer_map = np.zeros((grid_num, grid_num, int(num_imgs + 1)))  # image activations + 1 last ROI dimension
    for i in tqdm(range(grid_num), desc="map initialization...", disable=False):
        xmin_cortex = grid_size * i
        xmax_cortex = grid_size * (i + 1)
        in_row = (positions[:, 0] >= xmin_cortex) & (positions[:, 0] < xmax_cortex)
        for j in range(grid_num):
            ymin_cortex = grid_size * j
            ymax_cortex = grid_size * (j + 1)
            # find all units in this current grid
            units_within_grid_indices = np.where(in_row & (positions[:, 1] >= ymin_cortex) & (positions[:, 1] < ymax_cortex))[0]
            if len(units_within_grid_indices) > 0:
                # mean response of all units (neurons) within this grid to every image
                layer_map[i, j, :num_imgs] = storage[:, units_within_grid_indices].mean(axis=1, dtype=np.float64)
                layer_map[i, j, -1] = 1
    # save
    if save_path is None:
        # "layer4.0" -> "40", which reproduces the original "Fig4/TDANN40map.npy"
        layer_tag = layer.replace("layer", "").replace(".", "")
        save_path = folder_path + "Fig4/TDANN" + layer_tag + "map.npy"
        # NOTE(review): the preference-map cell of this notebook reads
        # "Fig4/layer40/TDANN40map.npy"; pass save_path (or move the file) if
        # that cell should consume this output.
    np.save(save_path, layer_map)

In [None]:
# obtain TDANN every layer every channel central units' activation values to 50k images
def get_activations(num_imgs: int = 50000):
    """Record the central unit of every channel in eight residual blocks for each image.

    Every image ``50K_Imgset/<k>.bmp`` (k = 1..num_imgs) is passed through the
    TDANN model once; for each block in layer1..layer4 the activation at the
    central spatial position of every channel is stored.

    Args:
        num_imgs: number of images to process (files are 1-indexed).

    Returns:
        None. The results are written to ``Fig4/activation_TDANN.npz`` under the
        keys ``layer10`` ... ``layer41``, each of shape ``(channels, num_imgs)``.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"available device: {device}")
    checkpoint_path = folder_path + "Fig4/TDANNfinal.torch"
    model = load_model_from_checkpoint(checkpoint_path, device)
    model.to(device)
    model.eval()
    # image preprocessing
    preprocess = torchvision.transforms.Compose([torchvision.transforms.Resize((224, 224)), torchvision.transforms.ToTensor()])
    # layers of interest: name -> (module, channels, index of the central unit)
    layer_specs = {
        "layer1.0": (model.layer1[0], 64, 28),   # (64, 56, 56)
        "layer1.1": (model.layer1[1], 64, 28),   # (64, 56, 56)
        "layer2.0": (model.layer2[0], 128, 14),  # (128, 28, 28)
        "layer2.1": (model.layer2[1], 128, 14),  # (128, 28, 28)
        "layer3.0": (model.layer3[0], 256, 7),   # (256, 14, 14)
        "layer3.1": (model.layer3[1], 256, 7),   # (256, 14, 14)
        "layer4.0": (model.layer4[0], 512, 3),   # (512, 7, 7)
        "layer4.1": (model.layer4[1], 512, 3),   # (512, 7, 7)
    }
    # to store all layers' all channels' central units' activation values
    units = {name: np.zeros((channels, num_imgs)) for name, (_, channels, _) in layer_specs.items()}

    activations = {}  # filled by the hooks on every forward pass

    def hook_fn(name):
        def hook(module, input, output):
            activations[name] = output.detach()  # Store detached tensor
        return hook

    # Register the hooks once for the whole run instead of once per image
    hooks = [module.register_forward_hook(hook_fn(name)) for name, (module, _, _) in layer_specs.items()]
    try:
        for i in tqdm(range(num_imgs), desc="50k imgs...", disable=False):
            activations.clear()  # never carry a tensor over from the previous image
            # Load and preprocess an image (file names are 1-indexed)
            image_path = folder_path + "50K_Imgset/" + str(int(i + 1)) + ".bmp"
            with Image.open(image_path) as image:
                # force 3 channels so grayscale / palette files do not break the first conv
                input_tensor = preprocess(image.convert("RGB")).unsqueeze(0)  # torch.Size([1, 3, 224, 224])
            input_tensor = input_tensor.to(device, dtype=torch.float)
            with torch.no_grad():
                model(input_tensor)  # Forward pass
            # Look results up by layer name rather than relying on the order in
            # which the hooks happened to fire.
            for name, (_, _, center) in layer_specs.items():
                activation = activations[name].cpu().numpy()[0]  # drop the batch dimension
                units[name][:, i] = activation[:, center, center]  # central unit of every channel
    finally:
        for hook in hooks:
            hook.remove()

    # save activation values ("layer1.0" -> key "layer10", ...)
    np.savez(folder_path + "Fig4/activation_TDANN.npz",
             **{name.replace(".", ""): values for name, values in units.items()})

# TDANN natural image preference map

In [None]:
# Visualization of TDANN final network & positions, into a 2D 60 by 60 gridded map, from top left to bottom right
# features[i, j, :-1] holds grid (i, j)'s mean response to every image;
# features[i, j, -1] is 1 when the grid contains units.
features = np.load(folder_path + "Fig4/layer40/TDANN40map.npy")
Imgset50K_path = folder_path + "50K_Imgset/"
grid_num = 60 # each grid contains the most-preferred 9 images by the mean within-grid-units' response
grids_count = int(grid_num ** 2)
num_imgs_each_side = 3 # number of images on each side of the grid, total number of images in a grid should be squared
# define the size of a single image
img_size = 30
line_width = 5
# derived layout quantities (previously repeated inline with a literal 3)
num_imgs = features.shape[-1] - 1 # number of image responses stored per grid
top_k = int(num_imgs_each_side ** 2) # images shown per grid
cell_size = img_size * num_imgs_each_side # pixels covered by one grid's images
stride = cell_size + line_width # distance between neighbouring grids' origins
# create a blank map of black color (R=0, G=0, B=0)
# (named preference_map rather than `map` so the builtin is not shadowed)
preference_map = np.zeros((grid_num * stride + line_width,
                           grid_num * stride + line_width,
                           3))
for i in tqdm(range(grid_num), desc="map initialization..."): # vertical, from top to bottom
    for j in range(grid_num): # horizontal, from left to right
        if features[i, j, -1] == 1: # if there are units in this grid
            # A stable ascending argsort breaks ties by image index, which is the
            # same order the former sort over (response, label) tuples produced,
            # without building and sorting a list of Python tuples for every grid.
            order = np.argsort(features[i, j, :num_imgs], kind="stable")
            # top images with the largest mean response, best first; +1 because image names are 1-indexed
            image_label = order[-top_k:][::-1] + 1
            # locate the top left corner of the current grid in the map
            x = i * stride + line_width
            y = j * stride + line_width
            # fill the map's current grid with the selected images
            for row in range(num_imgs_each_side):
                for col in range(num_imgs_each_side):
                    # load the image
                    path = Imgset50K_path + str(int(image_label[row * num_imgs_each_side + col])) + ".bmp" # the image name is 1-indexed
                    with Image.open(path) as image:
                        # force 3 channels, then obtain the non-blurred central part of the image
                        img = np.array(image.convert("RGB"))[20:80, 20:80, :]
                    img = resize(img, (img_size, img_size, 3), anti_aliasing=True) # resize the image
                    # put the image onto the map
                    preference_map[x + row * img_size : x + (row + 1) * img_size,
                                   y + col * img_size : y + (col + 1) * img_size,
                                   :] = img
        else: # white out the grid (including its surrounding border lines) if no units are in this grid
            x = i * stride
            y = j * stride
            preference_map[x : x + cell_size + line_width*2,
                           y : y + cell_size + line_width*2,
                           :] = 1.0

matplotlib.image.imsave(folder_path + 'Fig4/TDANN40_IT.bmp', preference_map)
del preference_map, features