In [1]:
import pandas as pd
import numpy as np
from torch.utils.data import Dataset

In [None]:
class WaterDataset(Dataset):
    def __init__(self, csv_path):
        super().__init__()
        # Load data to pandas DataFrame
        df = pd.read_csv(csv_path)
        # Convert data to a NumPy array and assign to self.data
        self.data = df.to_numpy()
        
    # Implement __len__ to return the number of data samples
    def __len__(self):
        return self.data.shape[0]
    
    def __getitem__(self, idx):
        features = self.data[idx, :-1]
        # Assign last data column to label
        label = self.data[idx, -1]
        return features, label

In [None]:
from torch.utils.data import DataLoader
# Create an instance of the WaterDataset
dataset_train = WaterDataset("water_train.csv")

# Create a DataLoader based on dataset_train
dataloader_train = DataLoader(
    dataset_train,
    batch_size=2,
    shuffle=True,
)

# Get a batch of features and labels
features, labels = next(iter(dataloader_train))
print(features, labels)

In [None]:
dataloader_test = DataLoader(dataset_train, train=False, batch_size=2, shuffle=True)

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        # Define the three linear layers
        self.fc1 = nn.Linear(9,16)
        self.fc2 = nn.Linear(16,8)
        self.fc3 = nn.Linear(8,1)
        
    def forward(self, x):
        # Pass x through linear layers adding activations
        x = nn.functional.relu(self.fc1(x))
        x = nn.functional.relu(self.fc2(x))
        x = nn.functional.sigmoid(self.fc3(x))
        return x

Optimizer, Training and Evaluation

In [None]:
def train_model(optimizer, net, num_epochs):
    criterion = nn.BCELoss()
    for epoch in range(num_epochs):
        running_loss = 0.
        for features, labels in dataloader_train:
            optimizer.zero_grad()
            outputs = net(features)
            loss = criterion(outputs, labels.view(-1, 1))
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
    train_loss = running_loss / len(dataloader_train)
    print(f"Training loss after {num_epochs} epochs: {train_loss}")

In [None]:
import torch.optim as optim
net = Net()
# Define the SGD optimizer
optimizer = optim.SGD(net.parameters(), lr=0.001)
train_model(
    optimizer=optimizer,
    net=net,
    num_epochs=10,
)

In [None]:
# import torch.optim as optim
net = Net()
# Define the Adam optimizer
optimizer = optim.Adam(net.parameters(), lr=0.01)
train_model(
    optimizer=optimizer,
    net=net,
    num_epochs=10,
)

In [None]:
import torch
from torchmetrics import Accuracy

# Set up binary accuracy metric
acc = Accuracy(task="binary")

net.eval()
with torch.no_grad():
    for features, labels in dataloader_test:
        # Get predicted probabilities for test data batch
        outputs = net(features)
        preds = (outputs >= 0.5).float()
        acc(preds, labels.view(-1, 1))

# Compute total test accuracy
test_accuracy = acc.compute()
print(f"Test accuracy: {test_accuracy}")

Vanishing and Exploding gradients

In [None]:
import torch.nn.init as init
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(9, 16)
        self.fc2 = nn.Linear(16, 8)
        self.fc3 = nn.Linear(8, 1)
        
        # Apply He initialization
        init.kaiming_uniform_(self.fc1.weight)
        init.kaiming_uniform_(self.fc2.weight)
        init.kaiming_uniform_(self.fc3.weight, nonlinearity="sigmoid")

    def forward(self, x):
        # Update ReLU activation to ELU
        x = nn.functional.elu(self.fc1(x))
        x = nn.functional.elu(self.fc2(x))
        x = nn.functional.sigmoid(self.fc3(x))
        return x

In [None]:
'''By learning how to optimally re-scale the next layer's inputs, batch normalization mitigates the unstable gradients problems!'''

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(9, 16)
        self.fc2 = nn.Linear(16, 8)
        self.fc3 = nn.Linear(8, 1)
        # Add two batch normalization layers
        self.bn1 = nn.BatchNorm1d(16)
        self.bn2 = nn.BatchNorm1d(8)
        
        init.kaiming_uniform_(self.fc1.weight)
        init.kaiming_uniform_(self.fc2.weight)
        init.kaiming_uniform_(self.fc3.weight, nonlinearity="sigmoid") 
    
    def forward(self, x):
        x = self.fc1(x)
        x = self.bn1(x)
        x = nn.functional.elu(x)

        # Pass x through the second set of layers
        x = self.fc2(x)
        x = self.bn2(x)
        x = nn.functional.elu(x)

        x = nn.functional.sigmoid(self.fc3(x))
        return x

In [1]:
# import torch
# import torch.nn as nn

# Define the linear layer class
class LinearLayer(nn.Module):
    def __init__(self, input_size, output_size):
        super(LinearLayer, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, x):
        # x = nn.functional.relu(self.fc(x))      #can add activation layer
        x = self.fc(x)
        return x

input_size = 784
output_size = 10
linear_layer = LinearLayer(input_size, output_size)
batch_size = 64
input_data = torch.randn(batch_size, input_size)
output = linear_layer(input_data)

print("Output shape:", output.shape)



Output shape: torch.Size([64, 10])


In [None]:
import torch
import torch.nn as nn

class Net(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.feature_extractor = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Flatten(),
        )
        self.classifier = nn.Linear(64*16*16, out_features=num_classes)

    def forward(self, x):
        x = self.feature_extractor(x)
        x = self.classifier(x)
        x = nn.Softmax(x)
        return x

In [None]:
torch.nn.ReLU(inplace=False)

In [None]:
import numpy as np
from PIL import Image
class Keypoint:
    def __init__(self, x, y, sigma):
        self.x = x
        self.y = y
        self.sigma = sigma
        self.orientation = None  
sigmas = [1, 2]        
def my_sift_like(image):

    sigmas = [1, 2]
    scaled_images = [gaussian_filter(image, sigma) for sigma in sigmas]

    dog_images = [scaled_images[i] - scaled_images[i+1] for i in range(len(scaled_images)-1)]
    keypoints = find_extrema(dog_images, threshold=0.05)  

    for kp in keypoints:
        kp.orientation = average_gradient_direction(image, kp.x, kp.y)

    descriptors = compute_simple_descriptor(image, keypoints)

    return keypoints, descriptors


def gaussian_filter(image, sigma):
  size = int(2 * (np.ceil(3 * sigma)) + 1)
  x, y = np.mgrid[-size // 2 + 1:size // 2 + 1, -size // 2 + 1:size // 2 + 1]
  g = np.exp(-(x**2 + y**2) / (2 * sigma**2))  
  g /= g.sum()
  return np.convolve(image, g, mode='same')


def find_extrema(dog_images, threshold=0.05):
    
    keypoints = []
    for i, dog_img in enumerate(dog_images):
        rows, cols = dog_img.shape
        for x in range(1, rows - 1):
            for y in range(1, cols - 1):
                center_point = dog_img[x, y]
                if abs(center_point) > threshold:  
                    is_extremum = True
                    for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1), (-1, -1), (-1, 1), (1, -1), (1, 1)]:
                        neighbor = dog_img[x + dx, y + dy]
                        if abs(center_point) < abs(neighbor):
                            is_extremum = False
                            break
                    if is_extremum:
                        keypoint = Keypoint(x, y, sigma=sigmas[i])  
                        keypoints.append(keypoint)
    return keypoints


def average_gradient_direction(image, x, y):
    
    window_size = 3
    window = image[x - window_size // 2: x + window_size // 2 + 1,
                   y - window_size // 2: y + window_size // 2 + 1]
    sobelx = np.sobel(window, axis=1, mode='constant')
    sobely = np.sobel(window, axis=0, mode='constant')
    angles = np.arctan2(sobely, sobelx) 
    return np.average(angles)  


def compute_simple_descriptor(image, keypoints):
    
    descriptors = []
    for kp in keypoints:
        window_size = 8  
        half_window = window_size // 2
        subregion_size = window_size // 4 
        half_subregion = subregion_size // 2

        patch_center = (kp.x, kp.y)
        rotation_matrix = np.array([[np.cos(kp.orientation), -np.sin(kp.orientation)],
                                    [np.sin(kp.orientation), np.cos(kp.orientation)]])
        rotated_offsets = np.dot(rotation_matrix, np.array([[-dx, -dy] for dx in range(-half_window, half_window + 1) for dy in range(-half_window, half_window + 1)]))
        patch_coordinates = patch_center + rotated_offsets.T
        patch = image[patch_coordinates[:, 0].astype(int), patch_coordinates[:, 1].astype(int)] 

        descriptor = []
        for x in range(0, window_size, subregion_size):
            for y in range(0, window_size, subregion_size):
                subregion = patch[x:x+subregion_size, y:y+subregion_size]
                sobelx = np.sobel(subregion, axis=1, mode='constant')
                sobely = np.sobel(subregion, axis=0, mode='constant')
                avg_mag = np.mean(np.sqrt(sobelx**2 + sobely**2))  
                avg_dir = np.arctan2(np.mean(sobely), np.mean(sobelx)) 
                descriptor.extend([avg_mag, avg_dir]) 

        descriptors.append(descriptor)
    return descriptors

img = Image.open('11102.jpg')
image = np.asarray(img)
keypoints, descriptors = my_sift_like(image)

print("Number of keypoints:", len(keypoints))
print("Example descriptor:", descriptors[0])  