In [None]:
from PIL import Image, ImageDraw
import random
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision import models
import torch.nn.functional as F
from tqdm import tqdm   
import torch.optim.lr_scheduler as lr_scheduler

In [None]:
def calculate_circle(image_size,min_radius=10):


    # Generate random circle parameters
    while True:
        center_x = random.randint(0, image_size[0] - 1)
        center_y = random.randint(0, image_size[1] - 1)
        c2border = min(center_x, center_y, image_size[0] - center_x, image_size[1] - center_y) 
        if c2border > min_radius:
            break
    
    radius = random.randint(min_radius, min(center_x, center_y, image_size[0] - center_x, image_size[1] - center_y))
    
    return radius , center_x, center_y 

In [None]:
def generate_random_circle(image_size,min_radius=10):
    # Create a blank image
    img = Image.new('L', image_size, color='black')
    draw = ImageDraw.Draw(img)

    radius , center_x, center_y  = calculate_circle(image_size,min_radius)
    # Draw the circle on the image
    draw.ellipse((center_x - radius, center_y - radius, center_x + radius, center_y + radius), outline='white', fill='black')

    return img, radius , center_x, center_y


# Generate a random circle
img, radius , center_x, center_y = generate_random_circle(image_size=[224,224])

display(img)


In [None]:
class RandomSparsifyTransform:
    def __init__(self, percent_change=0.1):
        self.percent_change = percent_change

    def __call__(self, img):
        # Convert PIL image to NumPy array
        img_array = np.array(img)

        # Get the coordinates of black pixels
        white_pixels = np.argwhere(img_array == 255)

        # Randomly select a percentage of black pixels to change to white
        if self.percent_change < 1.0:
            num_pixels_to_keep = int(self.percent_change * len(white_pixels))
        else:
            num_pixels_to_keep = int(self.percent_change)
            if num_pixels_to_keep > len(white_pixels):
                num_pixels_to_keep = len(white_pixels)
        selected_pixels = random.sample(range(len(white_pixels)), num_pixels_to_keep)   

        sparse_img = np.zeros(img_array.shape, dtype = np.uint8)
        # Change the selected black pixels to white
        self.coords = []
        for idx in selected_pixels:
            x, y = white_pixels[idx]
            self.coords.append([x,y])
            sparse_img[x, y] = 255  # White color

        # Convert NumPy array back to PIL image
        # transformed_img = Image.fromarray(img_array)

        return sparse_img, np.array(self.coords,dtype=np.float32)



# Generate a random circle
img, radius , center_x, center_y = generate_random_circle(image_size=[224,224])

display(img)
display(Image.fromarray(RandomSparsifyTransform(percent_change=10)(img)[0],mode='L'))

In [None]:
# Set image size
image_size = (224, 224)
all_r = []
for i in range(1000000):
    r,_,_ = calculate_circle(image_size,min_radius=10)
    all_r.append(r)
    

In [None]:
all_r = np.array(all_r)
np.sum(all_r>21)/np.size(all_r)

In [None]:
class MLPCircleClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(MLPCircleClassifier, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc15 = nn.Linear(hidden_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)
        

    def forward(self, x):
        # Assuming x is a tensor of shape (batch_size, 3, 2) for three 2D coordinates
        batch_size = x.size(0)

        # Flatten the input tensor
        x = x.view(batch_size, -1)

        # Pass through the first fully connected layer
        x = F.relu(self.fc1(x))

        x = F.relu(self.fc15(x))
        # Pass through the second fully connected layer
        x = self.fc2(x)

        return x

# Example usage
input_size = 6  # 3 coordinates with 2 values each
hidden_size = 64
output_size = 1  # Binary classification

model = MLPCircleClassifier(input_size, hidden_size, output_size)

# Generate dummy input data
dummy_input = torch.rand((32, 3, 2))  # Batch size of 32

# Forward pass
output = model(dummy_input)
print(output.shape)


In [None]:
# Define the CNN model
class CNNCircleClassifier(nn.Module):
    def __init__(self):
        super(CNNCircleClassifier, self).__init__()
        self.resnet34 = models.get_model('resnet34', weights="DEFAULT")
        self.resnet34.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.resnet34.fc = nn.Linear(self.resnet34.fc.in_features, 1)

    def forward(self, x):
        return self.resnet34(x)

In [None]:
# Define the CNN model
class StupidCircleClassifier(nn.Module):
    def __init__(self,b):
        super().__init__()
        self.b = b

    def forward(self, x):
        outputs = -torch.ones([x.shape[0],1], dtype=torch.float32, device=x.device)
        self.max_d = torch.zeros([x.shape[0],1], dtype=torch.float32, device=x.device)
        for i in range(x.shape[0]):
           
            self.max_d[i] = self.max_distance(x[i])
            if self.max_d[i] > 2*self.b:
                outputs[i] = 1.0
        return outputs

    def max_distance(self, coordinates):
        # Calculate pair-wise distances
        distances = torch.cdist(coordinates, coordinates)

        # Set diagonal elements to zero since they represent distances between the same points
        distances.fill_diagonal_(0)

        # Find the maximum distance
        max_dist = distances.max()

        return max_dist.item()

# Example usage
N = 5  # replace with the actual size of your tensor
coordinates = torch.rand(5,N, 2)*42  # replace with your actual tensor

model = StupidCircleClassifier(b = 21)
model.eval()
out = model(coordinates)
print(f"Maximum distance between coordinates: {model.max_d}")
print(f"Output: {out}")


In [None]:
# Define a custom dataset
class CircleDataset(Dataset):
    def __init__(self, num_samples, image_size, b, transform=None):
        self.num_samples = num_samples
        self.image_size = image_size
        self.b = b
        self.transform = transform

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        # Generate a random circle image
        circle_image, radius , center_x, center_y = generate_random_circle(self.image_size)
        
        # Apply the transform if provided
        if self.transform:
            img, coords = self.transform(circle_image)

        # Convert PIL image to PyTorch tensor
        transform = transforms.Compose([transforms.ToTensor()])
        img = transform(img)
        coords = torch.tensor(coords)

        # Calculate the label
        label = 1 if radius > self.b else 0
        return img, coords, label


In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

mlp_test = False
# Set parameters
num_samples = 1000
image_size = (224, 224)
b = 21  # Choose a predefined radius
number_pixel_to_keep = 10
input_size = 2 * number_pixel_to_keep
hidden_size = 64
output_size = 1

# Training loop
num_epochs = 50

# Example usage with the transform
transform = transforms.Compose([
    RandomSparsifyTransform(percent_change=number_pixel_to_keep),
])

# Create a dataset and DataLoader
circle_dataset = CircleDataset(num_samples, image_size, b, transform=transform)
dataloader = DataLoader(circle_dataset, batch_size=32, shuffle=True)

# Initialize the model, loss function, and optimizer

model_mlp = MLPCircleClassifier(input_size, hidden_size, output_size).to(device)
model_cnn = CNNCircleClassifier().to(device)
models_all = [model_mlp,model_cnn]

criterion = nn.BCEWithLogitsLoss()
optimizer_mlp = optim.Adam(model_mlp.parameters(), lr=0.001)
optimizer_cnn = optim.Adam(model_cnn.parameters(), lr=0.001)
optimizers_all = [optimizer_mlp,optimizer_cnn]


# Add cosine annealing learning rate scheduler
# scheduler = lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs, eta_min=1e-6)
scheduler_mlp = lr_scheduler.ReduceLROnPlateau(optimizer_mlp, mode='min', patience=10, factor=0.5, verbose=True)
scheduler_cnn = lr_scheduler.ReduceLROnPlateau(optimizer_cnn, mode='min', patience=10, factor=0.5, verbose=True)
schedulers_all = [scheduler_mlp,scheduler_cnn]


In [None]:
for epoch in range(num_epochs):
    
    total_loss = []
    correct_predictions = []
    current_lr = []
    total_samples = 0
    for _ in range(len(models_all)):
        correct_predictions.append(0)
        total_loss.append(0.0)
        current_lr.append(0.0)

    for imgs, coords, labels in tqdm(dataloader):
        imgs = imgs.to(device)
        coords = coords.to(device)
        labels = labels.to(device)
        # Zero the parameter gradients
        for optimizer in optimizers_all:
            optimizer.zero_grad()

        # Forward pass
        for i, model in enumerate(models_all):
            if 'CNN' in model._get_name():
                outputs = model(imgs)
            else:
                outputs = model(coords)
        

            # Calculate the loss
            loss = criterion(outputs, labels.float().view(-1, 1))

            # Backward pass and optimization
  
            loss.backward()
            predictions = (torch.sigmoid(outputs) > 0.5)
            correct_predictions[i] += (predictions == labels.view(-1, 1)).sum().item()
            total_loss[i] += loss.item()
        
        for optimizer in optimizers_all:
            optimizer.step()



        
        
        total_samples += labels.size(0)

        # Accumulate the total loss
        

    # Get the current learning rate from the optimizer
    for i, optimizer in enumerate(optimizers_all):
        current_lr[i] = optimizer.param_groups[0]['lr']

    # Calculate average loss and accuracy for the epoch
    avg_loss = [t / len(dataloader) for t in total_loss]
    accuracy = [c / total_samples for c in correct_predictions]
    
    for i, scheduler in enumerate(schedulers_all):
        scheduler.step(avg_loss[i])
    
    for i in range(len(models_all)):
        print(f'Epoch {epoch + 1}/{num_epochs}, Model: {models_all[i]._get_name()}, Loss: {avg_loss[i]:.4f}, Accuracy: {accuracy[i]:.4f}, Learning Rate: {current_lr[i]:.6f}')
    
    # print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}, Learning Rate: {current_lr:.6f}')


# Save the trained model
for model in models_all:
    torch.save(model.state_dict(), f'{model._get_name()}.pth')


In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Create a test dataset and DataLoader
model_cnn = CNNCircleClassifier().to(device)
model_mlp = MLPCircleClassifier(input_size, hidden_size, output_size).to(device)
model_cnn.load_state_dict(torch.load(model_cnn._get_name()+'.pth'))
model_mlp.load_state_dict(torch.load(model_mlp._get_name()+'.pth'))
model_stupid = StupidCircleClassifier(b = 21).to(device)
models_all = [model_cnn, model_mlp, model_stupid]

test_dataset = CircleDataset(num_samples=10000, image_size=image_size, b=b, transform=RandomSparsifyTransform(percent_change=2))
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Evaluation loop
for model in models_all:
    model.eval()  # Set the model to evaluation mode

total_correct = []
total_samples = 0

for _ in models_all:
    total_correct.append(0)

with torch.no_grad():
    for imgs, coords, labels in test_dataloader:
        imgs = imgs.to(device)
        coords = coords.to(device)
        labels = labels.to(device)  
        # Forward pass
        # Forward pass
        for i, model in enumerate(models_all):
            if 'CNN' in model._get_name():
                outputs = model(imgs)
            else:
                outputs = model(coords)
        

            # Compute accuracy
            predictions = (torch.sigmoid(outputs) > 0.5)
            total_correct[i] += (predictions == labels.view(-1, 1)).sum().item()
        total_samples += labels.size(0)

# Calculate accuracy on the test set
for i in range(len(models_all)):
    test_accuracy = total_correct[i] / total_samples
    print(f'Test Accuracy {i} for {models_all[i]._get_name()}: {test_accuracy:.4f}')


In [None]:
class SaveFeatures():
    def __init__(self, module, device=None):
        # we are going to hook some model's layer (module here)
        self.hook = module.register_forward_hook(self.hook_fn)
        self.device = device

    def hook_fn(self, module, input, output):
        # when the module.forward() is executed, here we intercept its
        # input and output. We are interested in the module's output.
        self.features = output.clone()
        if self.device is not None:
            self.features = self.features.to(self.device)
        self.features.requires_grad_(True)

    def close(self):
        # we must call this method to free memory resources
        self.hook.remove()
        del self.features

In [None]:
feature_maps = [        SaveFeatures(model.resnet34.relu,device=device),
                        SaveFeatures(model.resnet34.layer1[-1],device=device),
                        SaveFeatures(model.resnet34.layer2[-1],device=device),
                        SaveFeatures(model.resnet34.layer3[-1],device=device),
                        SaveFeatures(model.resnet34.layer4[-1],device=device)]


In [None]:
img, radius , center_x, center_y = generate_random_circle(image_size=[224,224])
print('Radius is {}'.format(radius))


fm_transform = RandomSparsifyTransform(percent_change=3)

model.eval()  # Set the model to evaluation mode
with torch.no_grad():
    tensor_1 = transforms.ToTensor()(fm_transform(img))
    out_1 = model(tensor_1.unsqueeze(0).to(device))

    # Compute accuracy
    prediction = (torch.sigmoid(out_1) > 0.5)
    print('Prediction: R > 21? {}'.format(prediction.item()))
    
    
    
plt.figure(figsize=(12, 12))

tensor_1[0,...] = tensor_1[0,...]/(torch.max(tensor_1[0,...])+1e-8)
plt.imshow((tensor_1[0,...]), cmap='viridis')
plt.colorbar(fraction=0.046, pad=0.04)
plt.axis('off')
plt.show()


tensor = feature_maps[0].features.clone().detach().cpu().numpy()
plt.figure(figsize=(12,12))
for i in range(4):
    plt.subplot(2, 2, i + 1)
    tensor[0,i,...] = tensor[0,i,...]/(np.max(tensor[0,i,...], axis=None)+1e-8)
    plt.imshow(tensor[0,i,...], cmap='viridis')
    plt.colorbar(fraction=0.046, pad=0.04)
    plt.axis('off')
plt.show()
tensor = feature_maps[1].features.clone().detach().cpu().numpy()
plt.figure(figsize=(12,12))
for i in range(9):
    plt.subplot(3, 3, i+ 1)
    tensor[0,i,...] = tensor[0,i,...]/(np.max(tensor[0,i,...], axis=None)+1e-8)
    plt.imshow(tensor[0,i,...], cmap='viridis')
    plt.colorbar(fraction=0.046, pad=0.04)
    plt.axis('off')
plt.show()
tensor = feature_maps[2].features.clone().detach().cpu().numpy()
plt.figure(figsize=(12,12))
for i in range(16):
    plt.subplot(4,4 , i+ 1)
    tensor[0,i,...] = tensor[0,i,...]/(np.max(tensor[0,i,...], axis=None)+1e-8)
    plt.imshow(tensor[0,i,...], cmap='viridis')
    plt.colorbar(fraction=0.046, pad=0.04)
    plt.axis('off')
plt.show()
tensor = feature_maps[3].features.clone().detach().cpu().numpy()
plt.figure(figsize=(12,12))
for i in range(25):
    plt.subplot(5,5 , i+ 1)
    tensor[0,i,...] = tensor[0,i,...]/(np.max(tensor[0,i,...], axis=None)+1e-8)
    plt.imshow(tensor[0,i,...], cmap='viridis')
    plt.colorbar(fraction=0.046, pad=0.04)
    plt.axis('off')
plt.show()
tensor = feature_maps[4].features.clone().detach().cpu().numpy()
plt.figure(figsize=(12,12))
for i in range(36):
    plt.subplot(6,6 , i+ 1)
    tensor[0,i,...] = tensor[0,i,...]/(np.max(tensor[0,i,...], axis=None)+1e-8)
    # plt.pcolor(tensor[0,i,...], cmap='viridis', vmin=0, vmax=1)
    plt.imshow(tensor[0,i,...], cmap='viridis')
    plt.colorbar(fraction=0.046, pad=0.04)
    plt.axis('off')
    plt.axis('equal')
plt.show()

In [None]:
for fm in feature_maps:
    fm.close()
    
    # plt.hist(fm.features.view(-1).clone().detach().cpu().numpy(), bins=100)
    # plt.show()

In [None]:
from scipy.spatial.distance import euclidean


sparsify = RandomSparsifyTransform(percent_change=2)


r_and_c = []
for i in range(10000):
    img, radius , center_x, center_y = generate_random_circle(image_size=[224,224])
    img_sparse = sparsify(img)

    # Get the coordinates of the non-zero pixels
    nonzero_pixels = np.argwhere(img_sparse != 0)
    
    # Calculate the pairwise distances between the non-zero pixels
    assert len(sparsify.coords) == 2, "The transform should only keep 2 pixels"
    
    # Calculate the Euclidean distance between the two coordinates
    distance = euclidean(sparsify.coords[0], sparsify.coords[1])
    r_and_c.append([radius, distance])
    
    
r_and_c = np.array(r_and_c)
# sort r_and_c by second column
sorted_r_and_c = r_and_c[r_and_c[:,1].argsort()]
ind_r_big = sorted_r_and_c[:,0]>21
ind_r_small = sorted_r_and_c[:,0]<=21
ind_d_big = sorted_r_and_c[:,1]>42
ind_d_small = sorted_r_and_c[:,1]<=42
same_big = ind_d_big & ind_r_big
same_small = ind_d_small & ind_r_small
same_big.sum() + same_small.sum()


In [None]:
plt.plot(sorted_r_and_c[:,1], sorted_r_and_c[:,0], '.')
plt.plot()
plt.show()

In [None]:
plt.hist(sorted_r_and_c[ind_r_big,1], bins=100)
plt.hist(sorted_r_and_c[ind_r_small,1], bins=100)
plt.show()