In [2]:
# Mount Google Drive (Colab only) so files under /content/drive can be read by later cells.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
import torch

In [4]:
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

In [5]:
import tensorflow as tf
import cv2
import numpy as np

# Module-level accumulators; read_data() appends one entry per record to each list
images = []
labels = []

# Feature dictionary to specify the TFRecord structure
# ('label' is parsed but only 'label_normal' and 'image' are read by read_data())
feature_dictionary = {
    'label': tf.io.FixedLenFeature([], tf.int64),
    'label_normal': tf.io.FixedLenFeature([], tf.int64),
    'image': tf.io.FixedLenFeature([], tf.string)  # Raw image bytes; the key in these files is 'image' (decoded with tf.io.decode_raw)
}

# Function to parse TFRecord examples
def _parse_function(example):
    """Parse one serialized record into a dict of tensors.

    Args:
        example: a single serialized example, as yielded by a TFRecordDataset.

    Returns:
        The result of ``tf.io.parse_single_example`` using the module-level
        ``feature_dictionary`` as the schema (keys 'label', 'label_normal',
        'image').
    """
    return tf.io.parse_single_example(example, feature_dictionary)

# Function to read and process data from TFRecord files
def read_data(filename):
    """Load one TFRecord file and append its samples to the module-level lists.

    Every record is parsed with ``_parse_function``; its 'image' bytes are
    decoded as uint8, reshaped to 299x299, resized to 100x100 with OpenCV and
    replicated into 3 identical channels. The resulting array is appended to
    the global ``images`` list and the record's 'label_normal' value to the
    global ``labels`` list. The number of records read is printed at the end.

    Args:
        filename: path (or list of paths) accepted by ``tf.data.TFRecordDataset``.

    Returns:
        None. Results are delivered through the ``images`` / ``labels`` globals.
    """
    dataset = tf.data.TFRecordDataset(filename, num_parallel_reads=tf.data.experimental.AUTOTUNE)
    # Records are visited in a random order (unseeded, so it differs between runs).
    dataset = dataset.shuffle(buffer_size=31000)
    dataset = dataset.map(_parse_function, num_parallel_calls=tf.data.experimental.AUTOTUNE)

    # Count records while processing them: a single pass over the file, without
    # first materialising every raw record in a Python list (and a cache) just
    # to obtain the size.
    num_records = 0
    for image_features in dataset:
        # Decode the raw image bytes into a 299x299 uint8 array
        image = tf.io.decode_raw(image_features['image'], tf.uint8)
        image = tf.reshape(image, [299, 299])

        # Downscale to 100x100 and stack the single channel three times
        image = cv2.resize(image.numpy(), (100, 100))
        image = cv2.merge([image, image, image])

        images.append(image)
        labels.append(image_features['label_normal'].numpy())
        num_records += 1

    print(f"Size of Dataset from {filename}: ", num_records)

# TFRecord shards to load. Only shard 0 is active; the other shards are kept
# as comments and can be enabled by uncommenting their lines.
filenames = [
    '/content/drive/My Drive/MY_DATA/training10_0/training10_0.tfrecords',
    # '/content/drive/My Drive/MY_DATA/training10_1/training10_1.tfrecords',
    # '/content/drive/My Drive/MY_DATA/training10_2/training10_2.tfrecords',
    # '/content/drive/My Drive/MY_DATA/training10_3/training10_3.tfrecords',
    # '/content/drive/My Drive/MY_DATA/training10_4/training10_4.tfrecords',
]

# Load every active shard; read_data() appends into the images/labels lists
for file in filenames:
    read_data(file)

# Report how many samples were collected in total
print("Total number of images:", len(images))
print("Total number of labels:", len(labels))




Size of Dataset from /content/drive/My Drive/MY_DATA/training10_0/training10_0.tfrecords:  11177
Total number of images: 11177
Total number of labels: 11177


In [7]:

# Convert images and labels to PyTorch tensors
# train_test_split is imported here because this cell runs before the later cell
# that imports it; without this line a fresh kernel fails with NameError.
from sklearn.model_selection import train_test_split

# Every image must have exactly one label before we split
assert len(images) == len(labels), "images and labels lists have different lengths"

images_np = np.array(images, dtype=np.float32)  # Stack the per-image arrays into one float32 array
images_np /= 255.0  # Scale uint8 pixel values into [0, 1]
images_tensor = torch.tensor(images_np).permute(0, 3, 1, 2)  # Change shape to (N, C, H, W)

labels_np = np.array(labels)
# Float column vector (N, 1) so it matches the model's single-logit output
labels_tensor = torch.tensor(labels_np, dtype=torch.float32).view(-1, 1)

# Split the dataset into training and testing sets (80/20, fixed random_state)
X_train, X_test, y_train, y_test = train_test_split(images_tensor.numpy(), labels_tensor.numpy(), test_size=0.2, random_state=42)

# Create DataLoaders for the train and test sets
train_dataset = TensorDataset(torch.tensor(X_train), torch.tensor(y_train))
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)

test_dataset = TensorDataset(torch.tensor(X_test), torch.tensor(y_test))
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)



In [20]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
import numpy as np
import cv2
import tensorflow as tf
from tqdm import tqdm

# Define the Multiscale DEQ Model with Implicit Differentiation
class MultiscaleDEQModel(nn.Module):
    """Multi-scale CNN binary classifier evaluated through a fixed-point loop.

    Three parallel convolutions (3x3, 5x5, 7x7) are applied to the input,
    each followed by ReLU and 2x2 max pooling; the results are concatenated
    and passed through two fully connected layers that emit one logit per
    sample. ``forward`` applies ``F`` repeatedly until two successive iterates
    differ by less than ``tolerance`` or ``max_iterations`` is reached.

    Note:
        ``F`` does not currently read its ``z`` argument, so the map is
        constant in ``z``: the iteration settles after one update and the
        model is numerically equivalent to a plain feed-forward network.

    Args:
        max_iterations: upper bound on the fixed-point iterations in
            ``forward`` and on the adjoint iterations in ``backward``.
        tolerance: max-abs difference between successive iterates below
            which the iteration stops.
    """

    def __init__(self, max_iterations=5, tolerance=1e-5):
        super().__init__()

        # Three convolutional branches with different receptive fields;
        # padding keeps the spatial size unchanged.
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)  # 3x3 convolution
        self.conv2 = nn.Conv2d(3, 16, kernel_size=5, padding=2)  # 5x5 convolution
        self.conv3 = nn.Conv2d(3, 16, kernel_size=7, padding=3)  # 7x7 convolution
        self.pool = nn.MaxPool2d(2)  # 2x2 pooling
        self.relu = nn.ReLU()

        # Fully connected head. 48 = 3 branches * 16 channels; 50x50 is the
        # pooled size of a 100x100 input, so inputs must be 100x100.
        self.fc1 = nn.Linear(48 * 50 * 50, 32)
        self.fc2 = nn.Linear(32, 1)  # Output layer for binary classification

        self.max_iterations = max_iterations
        self.tolerance = tolerance

    def F(self, z, x):
        """Apply the network map once.

        Args:
            z: current iterate. Accepted for the fixed-point interface but
                not used by the computation.
            x: image batch with 3 channels and 100x100 spatial size, i.e.
                shape (B, 3, 100, 100).

        Returns:
            Tensor of shape (B, 1) holding one raw logit per sample.
        """
        # Features at the three scales, each (B, 16, 50, 50)
        scale1 = self.pool(self.relu(self.conv1(x)))
        scale2 = self.pool(self.relu(self.conv2(x)))
        scale3 = self.pool(self.relu(self.conv3(x)))

        # Concatenate along channels -> (B, 48, 50, 50)
        z_new = torch.cat((scale1, scale2, scale3), dim=1)

        # flatten (rather than view) also works when the tensor is not
        # contiguous in memory -> (B, 48 * 50 * 50)
        z_new = z_new.flatten(1)
        z_new = self.relu(self.fc1(z_new))  # Fully connected layer
        z_new = self.fc2(z_new)  # Output layer
        return z_new

    def forward(self, x):
        """Iterate ``z <- F(z, x)`` from zeros and return the latest iterate.

        The most recent output of ``F`` is always the one returned, so the
        result stays attached to the autograd graph even when convergence is
        detected on the very first iteration.

        Args:
            x: image batch, see ``F``.

        Returns:
            Tensor of shape (B, 1) with one logit per sample. If
            ``max_iterations`` is 0 the initial zero tensor is returned.
        """
        z = torch.zeros(x.size(0), 1, device=x.device)

        for _ in range(self.max_iterations):
            z_prev, z = z, self.F(z, x)
            if torch.max(torch.abs(z - z_prev)) < self.tolerance:
                break

        return z

    def backward(self, x, z, grad_output):
        """Manually compute dLoss/dx at a fixed point via implicit differentiation.

        This is a standalone helper: it is NOT invoked by ``loss.backward()``
        (autograd never calls a Module method named ``backward``).

        For a fixed point ``z = F(z, x)`` the adjoint ``g`` solves
        ``g = grad_output + (dF/dz)^T g``; the input gradient is then
        ``(dF/dx)^T g``. The adjoint equation is solved by fixed-point
        iteration, bounded by ``max_iterations`` / ``tolerance``. When ``F``
        does not depend on ``z`` the Jacobian term is zero and
        ``g == grad_output``.

        Args:
            x: image batch the fixed point was computed for.
            z: fixed point returned by ``forward`` (detached internally).
            grad_output: gradient of the loss w.r.t. the output, shape (B, 1).

        Returns:
            Gradient of the loss w.r.t. ``x``, same shape as ``x``.
        """
        with torch.enable_grad():
            # Fresh leaves so gradients w.r.t. both arguments can be taken,
            # regardless of whether the caller's tensors require grad.
            x = x.detach().requires_grad_()
            z = z.detach().requires_grad_()
            f_z = self.F(z, x)

        g = grad_output
        for _ in range(self.max_iterations):
            jz_g = torch.autograd.grad(
                f_z, z, grad_outputs=g, retain_graph=True, allow_unused=True
            )[0]
            if jz_g is None:
                # z is not part of F's graph: dF/dz == 0, so g = grad_output.
                break
            g_next = grad_output + jz_g
            converged = torch.max(torch.abs(g_next - g)) < self.tolerance
            g = g_next
            if converged:
                break

        grad_x = torch.autograd.grad(f_z, x, grad_outputs=g)[0]
        return grad_x

# Instantiate the model, define loss, and optimizer
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = MultiscaleDEQModel().to(device)
criterion = nn.BCEWithLogitsLoss()  # Binary cross-entropy on raw logits (sigmoid is applied inside the loss)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop with a per-epoch progress bar
num_epochs = 3
for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    epoch_loss = 0.0

    # Use tqdm to add a progress bar for each epoch
    with tqdm(train_dataloader, desc=f'Epoch {epoch+1}/{num_epochs}', unit='batch') as tepoch:
        for batch_images, batch_labels in tepoch:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            optimizer.zero_grad()  # Clear gradients
            outputs = model(batch_images)  # Forward pass (runs the fixed-point loop in model.forward)
            loss = criterion(outputs, batch_labels)  # Compute loss

            # NOTE: this is ordinary autograd backpropagation through the forward graph.
            # The model's own backward() method is a plain helper and is not called here.
            loss.backward()
            optimizer.step()  # Update weights

            epoch_loss += loss.item()
            # Update progress bar with the current loss value
            tepoch.set_postfix(loss=loss.item())

    # Reported value is the mean of the per-batch mean losses
    print(f'Epoch [{epoch+1}/{num_epochs}], Average Loss: {epoch_loss/len(train_dataloader):.4f}')

# Evaluation step
model.eval()  # Set model to evaluation mode
total_correct = 0
total_samples = 0

with torch.no_grad():  # No need to compute gradients during evaluation
    for batch_images, batch_labels in test_dataloader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        outputs = model(batch_images)  # Forward pass
        predicted = torch.sigmoid(outputs).round()  # Apply sigmoid and round to get predicted labels
        total_correct += (predicted == batch_labels).sum().item()  # Count correct predictions
        total_samples += batch_labels.size(0)

# Calculate and print the accuracy
accuracy = total_correct / total_samples
print(f'Accuracy on test set: {accuracy * 100:.2f}%')


Epoch 1/3: 100%|██████████| 280/280 [03:16<00:00,  1.42batch/s, loss=0.276]


Epoch [1/3], Average Loss: 0.3743


Epoch 2/3: 100%|██████████| 280/280 [03:12<00:00,  1.46batch/s, loss=0.257]


Epoch [2/3], Average Loss: 0.2847


Epoch 3/3: 100%|██████████| 280/280 [03:21<00:00,  1.39batch/s, loss=0.0636]


Epoch [3/3], Average Loss: 0.2623
Accuracy on test set: 89.49%


In [21]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
import numpy as np
import cv2
import tensorflow as tf
from tqdm import tqdm

# Define the Multiscale DEQ Model with Implicit Differentiation
class MultiscaleDEQModel(nn.Module):
    """Multi-scale CNN binary classifier evaluated through a fixed-point loop.

    Three parallel convolutions (3x3, 5x5, 7x7) are applied to the input,
    each followed by ReLU and 2x2 max pooling; the results are concatenated
    and passed through two fully connected layers that emit one logit per
    sample. ``forward`` applies ``F`` repeatedly until two successive iterates
    differ by less than ``tolerance`` or ``max_iterations`` is reached.

    Note:
        ``F`` does not currently read its ``z`` argument, so the map is
        constant in ``z``: the iteration settles after one update and the
        model is numerically equivalent to a plain feed-forward network.

    Args:
        max_iterations: upper bound on the fixed-point iterations in
            ``forward`` and on the adjoint iterations in ``backward``.
        tolerance: max-abs difference between successive iterates below
            which the iteration stops.
    """

    def __init__(self, max_iterations=5, tolerance=1e-5):
        super().__init__()

        # Three convolutional branches with different receptive fields;
        # padding keeps the spatial size unchanged.
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)  # 3x3 convolution
        self.conv2 = nn.Conv2d(3, 16, kernel_size=5, padding=2)  # 5x5 convolution
        self.conv3 = nn.Conv2d(3, 16, kernel_size=7, padding=3)  # 7x7 convolution
        self.pool = nn.MaxPool2d(2)  # 2x2 pooling
        self.relu = nn.ReLU()

        # Fully connected head. 48 = 3 branches * 16 channels; 50x50 is the
        # pooled size of a 100x100 input, so inputs must be 100x100.
        self.fc1 = nn.Linear(48 * 50 * 50, 32)
        self.fc2 = nn.Linear(32, 1)  # Output layer for binary classification

        self.max_iterations = max_iterations
        self.tolerance = tolerance

    def F(self, z, x):
        """Apply the network map once.

        Args:
            z: current iterate. Accepted for the fixed-point interface but
                not used by the computation.
            x: image batch with 3 channels and 100x100 spatial size, i.e.
                shape (B, 3, 100, 100).

        Returns:
            Tensor of shape (B, 1) holding one raw logit per sample.
        """
        # Features at the three scales, each (B, 16, 50, 50)
        scale1 = self.pool(self.relu(self.conv1(x)))
        scale2 = self.pool(self.relu(self.conv2(x)))
        scale3 = self.pool(self.relu(self.conv3(x)))

        # Concatenate along channels -> (B, 48, 50, 50)
        z_new = torch.cat((scale1, scale2, scale3), dim=1)

        # flatten (rather than view) also works when the tensor is not
        # contiguous in memory -> (B, 48 * 50 * 50)
        z_new = z_new.flatten(1)
        z_new = self.relu(self.fc1(z_new))  # Fully connected layer
        z_new = self.fc2(z_new)  # Output layer
        return z_new

    def forward(self, x):
        """Iterate ``z <- F(z, x)`` from zeros and return the latest iterate.

        The most recent output of ``F`` is always the one returned, so the
        result stays attached to the autograd graph even when convergence is
        detected on the very first iteration.

        Args:
            x: image batch, see ``F``.

        Returns:
            Tensor of shape (B, 1) with one logit per sample. If
            ``max_iterations`` is 0 the initial zero tensor is returned.
        """
        z = torch.zeros(x.size(0), 1, device=x.device)

        for _ in range(self.max_iterations):
            z_prev, z = z, self.F(z, x)
            if torch.max(torch.abs(z - z_prev)) < self.tolerance:
                break

        return z

    def backward(self, x, z, grad_output):
        """Manually compute dLoss/dx at a fixed point via implicit differentiation.

        This is a standalone helper: it is NOT invoked by ``loss.backward()``
        (autograd never calls a Module method named ``backward``).

        For a fixed point ``z = F(z, x)`` the adjoint ``g`` solves
        ``g = grad_output + (dF/dz)^T g``; the input gradient is then
        ``(dF/dx)^T g``. The adjoint equation is solved by fixed-point
        iteration, bounded by ``max_iterations`` / ``tolerance``. When ``F``
        does not depend on ``z`` the Jacobian term is zero and
        ``g == grad_output``.

        Args:
            x: image batch the fixed point was computed for.
            z: fixed point returned by ``forward`` (detached internally).
            grad_output: gradient of the loss w.r.t. the output, shape (B, 1).

        Returns:
            Gradient of the loss w.r.t. ``x``, same shape as ``x``.
        """
        with torch.enable_grad():
            # Fresh leaves so gradients w.r.t. both arguments can be taken,
            # regardless of whether the caller's tensors require grad.
            x = x.detach().requires_grad_()
            z = z.detach().requires_grad_()
            f_z = self.F(z, x)

        g = grad_output
        for _ in range(self.max_iterations):
            jz_g = torch.autograd.grad(
                f_z, z, grad_outputs=g, retain_graph=True, allow_unused=True
            )[0]
            if jz_g is None:
                # z is not part of F's graph: dF/dz == 0, so g = grad_output.
                break
            g_next = grad_output + jz_g
            converged = torch.max(torch.abs(g_next - g)) < self.tolerance
            g = g_next
            if converged:
                break

        grad_x = torch.autograd.grad(f_z, x, grad_outputs=g)[0]
        return grad_x

# Instantiate the model, define loss, and optimizer
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = MultiscaleDEQModel().to(device)
criterion = nn.BCEWithLogitsLoss()  # Binary cross-entropy on raw logits (sigmoid is applied inside the loss)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop with a per-epoch progress bar
num_epochs = 2
for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    epoch_loss = 0.0

    # Use tqdm to add a progress bar for each epoch
    with tqdm(train_dataloader, desc=f'Epoch {epoch+1}/{num_epochs}', unit='batch') as tepoch:
        for batch_images, batch_labels in tepoch:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            optimizer.zero_grad()  # Clear gradients
            outputs = model(batch_images)  # Forward pass (runs the fixed-point loop in model.forward)
            loss = criterion(outputs, batch_labels)  # Compute loss

            # NOTE: this is ordinary autograd backpropagation through the forward graph.
            # The model's own backward() method is a plain helper and is not called here.
            loss.backward()
            optimizer.step()  # Update weights

            epoch_loss += loss.item()
            # Update progress bar with the current loss value
            tepoch.set_postfix(loss=loss.item())

    # Reported value is the mean of the per-batch mean losses
    print(f'Epoch [{epoch+1}/{num_epochs}], Average Loss: {epoch_loss/len(train_dataloader):.4f}')

# Evaluation step
model.eval()  # Set model to evaluation mode
total_correct = 0
total_samples = 0

with torch.no_grad():  # No need to compute gradients during evaluation
    for batch_images, batch_labels in test_dataloader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        outputs = model(batch_images)  # Forward pass
        predicted = torch.sigmoid(outputs).round()  # Apply sigmoid and round to get predicted labels
        total_correct += (predicted == batch_labels).sum().item()  # Count correct predictions
        total_samples += batch_labels.size(0)

# Calculate and print the accuracy
accuracy = total_correct / total_samples
print(f'Accuracy on test set: {accuracy * 100:.2f}%')


Epoch 1/2: 100%|██████████| 280/280 [03:20<00:00,  1.39batch/s, loss=0.366]


Epoch [1/2], Average Loss: 0.3529


Epoch 2/2: 100%|██████████| 280/280 [03:15<00:00,  1.43batch/s, loss=0.572]


Epoch [2/2], Average Loss: 0.2799
Accuracy on test set: 87.25%


In [22]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
import numpy as np
import cv2
import tensorflow as tf
from tqdm import tqdm

# Define the Multiscale DEQ Model with Implicit Differentiation
class MultiscaleDEQModel(nn.Module):
    """Multi-scale CNN binary classifier evaluated through a fixed-point loop.

    Three parallel convolutions (3x3, 5x5, 7x7) are applied to the input,
    each followed by ReLU and 2x2 max pooling; the results are concatenated
    and passed through two fully connected layers that emit one logit per
    sample. ``forward`` applies ``F`` repeatedly until two successive iterates
    differ by less than ``tolerance`` or ``max_iterations`` is reached.

    Note:
        ``F`` does not currently read its ``z`` argument, so the map is
        constant in ``z``: the iteration settles after one update and the
        model is numerically equivalent to a plain feed-forward network.

    Args:
        max_iterations: upper bound on the fixed-point iterations in
            ``forward`` and on the adjoint iterations in ``backward``.
        tolerance: max-abs difference between successive iterates below
            which the iteration stops.
    """

    def __init__(self, max_iterations=5, tolerance=1e-5):
        super().__init__()

        # Three convolutional branches with different receptive fields;
        # padding keeps the spatial size unchanged.
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)  # 3x3 convolution
        self.conv2 = nn.Conv2d(3, 16, kernel_size=5, padding=2)  # 5x5 convolution
        self.conv3 = nn.Conv2d(3, 16, kernel_size=7, padding=3)  # 7x7 convolution
        self.pool = nn.MaxPool2d(2)  # 2x2 pooling
        self.relu = nn.ReLU()

        # Fully connected head. 48 = 3 branches * 16 channels; 50x50 is the
        # pooled size of a 100x100 input, so inputs must be 100x100.
        self.fc1 = nn.Linear(48 * 50 * 50, 32)
        self.fc2 = nn.Linear(32, 1)  # Output layer for binary classification

        self.max_iterations = max_iterations
        self.tolerance = tolerance

    def F(self, z, x):
        """Apply the network map once.

        Args:
            z: current iterate. Accepted for the fixed-point interface but
                not used by the computation.
            x: image batch with 3 channels and 100x100 spatial size, i.e.
                shape (B, 3, 100, 100).

        Returns:
            Tensor of shape (B, 1) holding one raw logit per sample.
        """
        # Features at the three scales, each (B, 16, 50, 50)
        scale1 = self.pool(self.relu(self.conv1(x)))
        scale2 = self.pool(self.relu(self.conv2(x)))
        scale3 = self.pool(self.relu(self.conv3(x)))

        # Concatenate along channels -> (B, 48, 50, 50)
        z_new = torch.cat((scale1, scale2, scale3), dim=1)

        # flatten (rather than view) also works when the tensor is not
        # contiguous in memory -> (B, 48 * 50 * 50)
        z_new = z_new.flatten(1)
        z_new = self.relu(self.fc1(z_new))  # Fully connected layer
        z_new = self.fc2(z_new)  # Output layer
        return z_new

    def forward(self, x):
        """Iterate ``z <- F(z, x)`` from zeros and return the latest iterate.

        The most recent output of ``F`` is always the one returned, so the
        result stays attached to the autograd graph even when convergence is
        detected on the very first iteration.

        Args:
            x: image batch, see ``F``.

        Returns:
            Tensor of shape (B, 1) with one logit per sample. If
            ``max_iterations`` is 0 the initial zero tensor is returned.
        """
        z = torch.zeros(x.size(0), 1, device=x.device)

        for _ in range(self.max_iterations):
            z_prev, z = z, self.F(z, x)
            if torch.max(torch.abs(z - z_prev)) < self.tolerance:
                break

        return z

    def backward(self, x, z, grad_output):
        """Manually compute dLoss/dx at a fixed point via implicit differentiation.

        This is a standalone helper: it is NOT invoked by ``loss.backward()``
        (autograd never calls a Module method named ``backward``).

        For a fixed point ``z = F(z, x)`` the adjoint ``g`` solves
        ``g = grad_output + (dF/dz)^T g``; the input gradient is then
        ``(dF/dx)^T g``. The adjoint equation is solved by fixed-point
        iteration, bounded by ``max_iterations`` / ``tolerance``. When ``F``
        does not depend on ``z`` the Jacobian term is zero and
        ``g == grad_output``.

        Args:
            x: image batch the fixed point was computed for.
            z: fixed point returned by ``forward`` (detached internally).
            grad_output: gradient of the loss w.r.t. the output, shape (B, 1).

        Returns:
            Gradient of the loss w.r.t. ``x``, same shape as ``x``.
        """
        with torch.enable_grad():
            # Fresh leaves so gradients w.r.t. both arguments can be taken,
            # regardless of whether the caller's tensors require grad.
            x = x.detach().requires_grad_()
            z = z.detach().requires_grad_()
            f_z = self.F(z, x)

        g = grad_output
        for _ in range(self.max_iterations):
            jz_g = torch.autograd.grad(
                f_z, z, grad_outputs=g, retain_graph=True, allow_unused=True
            )[0]
            if jz_g is None:
                # z is not part of F's graph: dF/dz == 0, so g = grad_output.
                break
            g_next = grad_output + jz_g
            converged = torch.max(torch.abs(g_next - g)) < self.tolerance
            g = g_next
            if converged:
                break

        grad_x = torch.autograd.grad(f_z, x, grad_outputs=g)[0]
        return grad_x

# Instantiate the model, define loss, and optimizer
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = MultiscaleDEQModel().to(device)
criterion = nn.BCEWithLogitsLoss()  # Binary cross-entropy on raw logits (sigmoid is applied inside the loss)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop with a per-epoch progress bar
num_epochs = 1
for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    epoch_loss = 0.0

    # Use tqdm to add a progress bar for each epoch
    with tqdm(train_dataloader, desc=f'Epoch {epoch+1}/{num_epochs}', unit='batch') as tepoch:
        for batch_images, batch_labels in tepoch:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            optimizer.zero_grad()  # Clear gradients
            outputs = model(batch_images)  # Forward pass (runs the fixed-point loop in model.forward)
            loss = criterion(outputs, batch_labels)  # Compute loss

            # NOTE: this is ordinary autograd backpropagation through the forward graph.
            # The model's own backward() method is a plain helper and is not called here.
            loss.backward()
            optimizer.step()  # Update weights

            epoch_loss += loss.item()
            # Update progress bar with the current loss value
            tepoch.set_postfix(loss=loss.item())

    # Reported value is the mean of the per-batch mean losses
    print(f'Epoch [{epoch+1}/{num_epochs}], Average Loss: {epoch_loss/len(train_dataloader):.4f}')

# Evaluation step
model.eval()  # Set model to evaluation mode
total_correct = 0
total_samples = 0

with torch.no_grad():  # No need to compute gradients during evaluation
    for batch_images, batch_labels in test_dataloader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        outputs = model(batch_images)  # Forward pass
        predicted = torch.sigmoid(outputs).round()  # Apply sigmoid and round to get predicted labels
        total_correct += (predicted == batch_labels).sum().item()  # Count correct predictions
        total_samples += batch_labels.size(0)

# Calculate and print the accuracy
accuracy = total_correct / total_samples
print(f'Accuracy on test set: {accuracy * 100:.2f}%')


Epoch 1/1: 100%|██████████| 280/280 [03:14<00:00,  1.44batch/s, loss=0.142]


Epoch [1/1], Average Loss: 0.3309
Accuracy on test set: 87.08%
