In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import numpy as np
import os
import csv
import pandas as pd
from scipy.ndimage import distance_transform_edt
import torch.nn.functional as F

In [2]:
# Process shapes data
raw_shapes = []
directory = 'Dataset/shape'
for file in sorted(os.listdir(directory)):
    filepath = os.path.join(directory, file)
    with open(filepath, newline='') as f:
        reader = csv.reader(f)
        data = list(reader)
        raw_shapes.append(data)

clean_shapes = []
for shape in raw_shapes:
    clean_shape = pd.DataFrame(shape).iloc[1:-1, 1:-2]
    clean_shapes.append(clean_shape.transpose())

# Create signed distance fields for shapes
sdf_shapes = []
for s in clean_shapes:
    # Create a mask for the inside and outside of the shape
    shape = s.astype(float)
    inside_mask = shape == -1
    outside_mask = shape == 0

    # Compute the distance transform for both the inside and outside
    distance_inside = distance_transform_edt(inside_mask)
    distance_outside = distance_transform_edt(outside_mask)

    # Create the signed distance field
    signed_distance_field = distance_outside - distance_inside

    # Normalize the signed distance field
    max_distance = np.max(np.abs(signed_distance_field))
    if max_distance > 0:
        signed_distance_field = signed_distance_field / max_distance

    # Set the values inside the original shape to 0
    signed_distance_field[inside_mask] = 0
    sdf_shapes.append(signed_distance_field)


# Process vertical data
raw_verticals = []
directory = 'Dataset/vertical'
for file in sorted(os.listdir(directory)):
    filepath = os.path.join(directory, file)
    with open(filepath, newline='') as f:
        reader = csv.reader(f)
        data = list(reader)
        raw_verticals.append(data)

clean_verticals = []
for vertical in raw_verticals:
    clean_vertical = pd.DataFrame(vertical).iloc[1:-1, 1:-2]
    clean_verticals.append(clean_vertical.transpose())


# Process horizontal data
raw_horizontals = []
directory = 'Dataset/horizontal'
for file in sorted(os.listdir(directory)):
    filepath = os.path.join(directory, file)
    with open(filepath, newline='') as f:
        reader = csv.reader(f)
        data = list(reader)
        raw_horizontals.append(data)

clean_horizontals = []
for horizontal in raw_horizontals:
    clean_horizontal = pd.DataFrame(horizontal).iloc[1:-1, 1:-2]
    clean_horizontals.append(clean_horizontal.transpose())


# Calculate total velocity
total_velocities = []
for i in range(len(clean_shapes)):
    total_velocity = np.sqrt(np.square(clean_verticals[i].astype(float)) + np.square(clean_horizontals[i].astype(float)))
    total_velocities.append(total_velocity)


In [3]:
# Create training dataset
temp = []
for i in range(len(sdf_shapes)):
    input_example = sdf_shapes[i].astype(float)
    output_example = total_velocities[i].values.astype(float)
    temp.append((input_example, output_example))

data_tensor = torch.FloatTensor(temp)

# Split training dataset into training, validation, and test
training_size = int(0.6 * len(data_tensor))
validation_size = int(0.2 * len(data_tensor))
test_size = len(data_tensor) - training_size - validation_size

training_dataset, validation_dataset, test_dataset = torch.utils.data.random_split(data_tensor, [training_size, validation_size, test_size])

  data_tensor = torch.FloatTensor(temp)


In [103]:
class CFDNet(nn.Module):
    def __init__(self):
        super(CFDNet, self).__init__()

        # Input: [batch_size, 1, 32, 32]
        
        # Convolutional layers
        # Output size: [batch_size, 128, 16, 16]
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=128, kernel_size=(4, 4), stride=2, padding=1)
        
        # Output size: [batch_size, 512, 8, 8]
        self.conv2 = nn.Conv2d(in_channels=128, out_channels=512, kernel_size=(4, 4), stride=2, padding=1)
        
        # Fully connected layer
        # Flattened input size: 512 * 8 * 8 = 32768
        # Output size: 512 * 4 * 4 = 8192
        self.fc1 = nn.Linear(in_features=512*8*8, out_features=512*4*4)
        
        # Deconvolutional layers for x-component
        # Output size: [batch_size, 256, 8, 8]
        self.deconv1_x = nn.ConvTranspose2d(in_channels=512, out_channels=256, kernel_size=(4, 4), stride=2, padding=1)
        
        # Output size: [batch_size, 128, 16, 16]
        self.deconv2_x = nn.ConvTranspose2d(in_channels=256, out_channels=128, kernel_size=(4, 4), stride=2, padding=1)
        
        # Output size: [batch_size, 64, 32, 32]
        self.deconv3_x = nn.ConvTranspose2d(in_channels=128, out_channels=64, kernel_size=(4, 4), stride=2, padding=1)
        
        # Output size: [batch_size, 1, 32, 32]
        self.deconv4_x = nn.ConvTranspose2d(in_channels=64, out_channels=1, kernel_size=(3, 3), stride=1, padding=1)

        # Deconvolutional layers for y-component
        # Output size: [batch_size, 256, 8, 8]
        self.deconv1_y = nn.ConvTranspose2d(in_channels=512, out_channels=256, kernel_size=(4, 4), stride=2, padding=1)
        
        # Output size: [batch_size, 128, 16, 16]
        self.deconv2_y = nn.ConvTranspose2d(in_channels=256, out_channels=128, kernel_size=(4, 4), stride=2, padding=1)
        
        # Output size: [batch_size, 64, 32, 32]
        self.deconv3_y = nn.ConvTranspose2d(in_channels=128, out_channels=64, kernel_size=(4, 4), stride=2, padding=1)
        
        # Output size: [batch_size, 1, 32, 32]
        self.deconv4_y = nn.ConvTranspose2d(in_channels=64, out_channels=1, kernel_size=(3, 3), stride=1, padding=1)

    def forward(self, x):
        # Input: [batch_size, 1, 32, 32]

        # Convolutional layers
        x_conv = F.relu(self.conv1(x))  # Output: [batch_size, 128, 16, 16]
        x_conv = F.relu(self.conv2(x_conv))  # Output: [batch_size, 512, 8, 8]

        # Flatten and Fully connected layer
        x_fc = x_conv.view(x_conv.size(0), 512*8*8)  # Flatten to 32768 elements
        x_fc = F.relu(self.fc1(x_fc))  # Output: 8192 elements

        # Reshape for deconvolution
        x_reshaped = x_fc.view(x.size(0), 512, 4, 4)  # Reshape to [batch_size, 512, 4, 4]

        # Deconvolution path for x-component
        x_out = F.relu(self.deconv1_x(x_reshaped))  # Output: [batch_size, 256, 8, 8]
        x_out = F.relu(self.deconv2_x(x_out))  # Output: [batch_size, 128, 16, 16]
        x_out = F.relu(self.deconv3_x(x_out))  # Output: [batch_size, 64, 32, 32]
        x_out = torch.sigmoid(self.deconv4_x(x_out))  # Output: [batch_size, 1, 32, 32]

        # Deconvolution path for y-component
        y_out = F.relu(self.deconv1_y(x_reshaped))  # Output: [batch_size, 256, 8, 8]
        y_out = F.relu(self.deconv2_y(y_out))  # Output: [batch_size, 128, 16, 16]
        y_out = F.relu(self.deconv3_y(y_out))  # Output: [batch_size, 64, 32, 32]
        y_out = torch.sigmoid(self.deconv4_y(y_out))  # Output: [batch_size, 1, 32, 32]

        # Element-wise multiplication with the original input resized to match the final output dimensions
        x_resized = F.interpolate(x, size=(32, 32))  # Ensure x is resized to [batch_size, 1, 32, 32]
        x_component = x_out * x_resized  # Element-wise multiplication

        y_resized = F.interpolate(x, size=(32, 32))  # Ensure x is resized to [batch_size, 1, 32, 32]
        y_component = y_out * y_resized  # Element-wise multiplication

        return x_component, y_component


# Example usage:
# model = CFDNet()
# input_tensor = torch.randn(1, 1, 32, 32)
# x_output, y_output = model(input_tensor)


In [104]:
# Initialize the model, loss function, and optimizer
model = CFDNet()
criterion = nn.MSELoss()
optimizer = optim.RMSprop(model.parameters(), lr=0.001)

In [106]:
for epoch in range(50):  # loop over the dataset multiple times

    running_loss = 0.0
    for data in training_dataset:
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data

        tensor_inputs = torch.FloatTensor(inputs).reshape(1, 1, 32, 32)
        tensor_labels = torch.FloatTensor(labels)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        x_output, y_output = model(tensor_inputs)
        total_output = torch.sqrt(torch.square(x_output) + torch.square(y_output))
        loss = criterion(total_output, tensor_labels)
        loss.backward()
        optimizer.step()

        # accumulate the running loss
        running_loss += loss.item()

    # print statistics after processing the entire dataset
    print('[%d] loss: %.3f' % (epoch + 1, running_loss / len(training_dataset)))
    running_loss = 0.0  # Reset running loss after each epoch

print('Finished Training')


[1] loss: nan


KeyboardInterrupt: 