In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torch.nn.functional as F

# Step 1: Generate Synthetic Data
image_size = 8  # Size of each image (smaller for demonstration)
num_images = 1000  # Number of images for training

# Generate random images
X1 = np.random.rand(num_images, image_size, image_size).astype(np.float32)
X2 = np.random.rand(num_images, image_size, image_size).astype(np.float32)

# Calculate element-wise product
y = X1 * X2  # The target output is the element-wise product of X1 and X2

# Convert data to PyTorch tensors
X1_tensor = torch.tensor(X1).unsqueeze(1)  # Add channel dimension
X2_tensor = torch.tensor(X2).unsqueeze(1)
y_tensor = torch.tensor(y).unsqueeze(1)

# Smaller channel size
hs = 16

# Step 2: Define CNN Architecture
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, hs, kernel_size=3)  # Input: (1, 8, 8), Output: (hs, 6, 6)
        self.pool1 = nn.MaxPool2d(2, 2)  # Output: (hs, 3, 3)
        self.conv2 = nn.Conv2d(hs, hs, kernel_size=3)  # Output: (hs, 1, 1)
        self.pool2 = nn.MaxPool2d(1, 1)  # Adjusted pool2 to avoid size reduction issue
        self.fc1 = nn.Linear(hs * 1 * 1 * 2, hs * 2)  # Adjusted for concatenated tensor size
        self.fc2 = nn.Linear(hs * 2, image_size * image_size)  # Adjusted for target size

    def forward(self, x1, x2):
        # Process x1 through conv layers
        x1 = self.conv1(x1)
#        print(f'After conv1 for x1: {x1.size()}')
        x1 = F.relu(x1)
#        print(f'After ReLU for x1: {x1.size()}')
        x1 = self.pool1(x1)
#        print(f'After pool1 for x1: {x1.size()}')

        x1 = self.conv2(x1)
#        print(f'After conv2 for x1: {x1.size()}')
        x1 = F.relu(x1)
#        print(f'After ReLU for x1: {x1.size()}')
        x1 = self.pool2(x1)
#        print(f'After pool2 for x1: {x1.size()}')

        x1 = torch.flatten(x1, 1)
#        print(f'Flattened x1: {x1.size()}')

        # Process x2 through conv layers (same layers as x1)
        x2 = self.conv1(x2)
 #       print(f'After conv1 for x2: {x2.size()}')
        x2 = F.relu(x2)
 #       print(f'After ReLU for x2: {x2.size()}')
        x2 = self.pool1(x2)
 #       print(f'After pool1 for x2: {x2.size()}')

        x2 = self.conv2(x2)
#        print(f'After conv2 for x2: {x2.size()}')
        x2 = F.relu(x2)
#        print(f'After ReLU for x2: {x2.size()}')
        x2 = self.pool2(x2)
#        print(f'After pool2 for x2: {x2.size()}')

        x2 = torch.flatten(x2, 1)
#        print(f'Flattened x2: {x2.size()}')

        # Concatenate x1 and x2
        x = torch.cat((x1, x2), dim=1)
#        print(f'Concatenated tensor: {x.size()}')

        # Fully connected layers
        x = self.fc1(x)
#        print(f'After fc1: {x.size()}')
        x = F.relu(x)
        x = self.fc2(x)
#        print(f'After fc2: {x.size()}')

        # Reshape x to match the shape of labels (if necessary)
        x = torch.reshape(x, (-1, 1, 8, 8))  # Adjust according to your target shape
#        print(f'Reshaped x: {x.size()}')

        return x
    
# Step 3: Create DataLoader
batch_size = 32  # Adjusted for practical batch size
dataset = TensorDataset(X1_tensor, X2_tensor, y_tensor)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Step 4: Define Training Loop
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleCNN().to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 10

# Inside your training loop
for epoch in range(num_epochs):
    running_loss = 0.0
    for i, (inputs1, inputs2, labels) in enumerate(dataloader):
        inputs1, inputs2, labels = inputs1.to(device), inputs2.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs1, inputs2)

        # Print shapes for debugging
#        print(f'Outputs shape: {outputs.shape}, Labels shape: {labels.shape}')

        # Ensure outputs and labels have the same shape
        assert outputs.shape == labels.shape, "Shapes of outputs and labels must match"

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if (i+1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(dataloader)}], Loss: {running_loss/10:.4f}')
            running_loss = 0.0

print('Finished Training')


Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32, 32])
Epoch [1/10], Step [10/32], Loss: 0.1142
Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32, 32])
Epoch [1/10], Step [20/32], Loss: 0.0995
Concatenated tensor: torch.Size([32, 32])
Concatenated tensor: torch.Size([32,