In [83]:
import numpy as np
import json
import os
import re
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, GlobalAveragePooling2D, Dense, Dropout, Concatenate, Flatten, Reshape, Conv2DTranspose, BatchNormalization, Activation
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.regularizers import l2
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from spektral.layers import GCNConv
import tensorflow as tf
from tensorflow.keras.initializers import GlorotUniform

# Hyperparameters
Iteration = 2000
lr = 0.0001
train_size = 0.96
l2value = 0.005
reduceRate = 0.9
modelPath = "CNN-GNN.keras"
act = "swish"
batch = 50
initializer = GlorotUniform()
drop = 0.3

# Load node data (25 nodes, 3 features: x, y, z)
with open("nodexy.txt", "r") as file:
    NodeOriginal = json.load(file)
NodeOriginal = np.array(NodeOriginal)  # Convert to NumPy array
# Load element data (features for each sample)
with open("thickness.txt", "r") as file:
    element_features = json.load(file)
element_features = np.array(element_features)  # Convert to NumPy array
# Load target values
energy = np.loadtxt("energy_absorbed.txt")
energy = np.array(energy).reshape(-1, 1)  # Ensure correct shape
minValue=np.argmin(energy)
mask=np.arange((len(energy))) !=minValue
NodeOriginal=NodeOriginal[mask,:,:]
element_features=element_features[mask,:]
energy=energy[mask]
# Extract x, y coordinates
NodeOriginal_x = NodeOriginal[:, :, 0]  # Extract all x-coordinates
NodeOriginal_y = NodeOriginal[:, :, 1]  # Extract all y-coordinates
# Identify and remove edge nodes (x or y = 0 or 30)
_, EdgeNodes = np.where((NodeOriginal_x == 0) | (NodeOriginal_x == 30) | (NodeOriginal_y == 0) | (NodeOriginal_y == 30))
EdgeNodes = np.unique(EdgeNodes)  # Get unique node indices to remove
# Create a mask and filter out edge nodes
mask = np.ones(len(NodeOriginal_x), dtype=bool)
mask[EdgeNodes] = False
filtered_nodes = NodeOriginal[:, :, :]
grid_data_list = []
for sample in filtered_nodes:
    node_positions = sample[:, :2]  # Extract (x, y)
    sorted_indices = np.lexsort((node_positions[:, 0], -node_positions[:, 1]))  # Sort by y (desc), then x (asc)
    sorted_nodes = sample[sorted_indices]  # Reorder nodes spatially
    grid_data = sorted_nodes.reshape(5, 5, 3)  # Convert into 5x5 grid
    grid_data_list.append(grid_data)
node_grid_data = np.array(grid_data_list)

# Normalize data
scaler_x = MinMaxScaler()
scaler_y = MinMaxScaler()
scaler_z = MinMaxScaler()
element_scaler = MinMaxScaler()
energy_scaler = MinMaxScaler()
node_grid_data[:, :, :, 0] = scaler_x.fit_transform(node_grid_data[:, :, :, 0].reshape(-1, 1)).reshape(node_grid_data.shape[:3])
node_grid_data[:, :, :, 1] = scaler_y.fit_transform(node_grid_data[:, :, :, 1].reshape(-1, 1)).reshape(node_grid_data.shape[:3])
node_grid_data[:, :, :, 2] = scaler_z.fit_transform(node_grid_data[:, :, :, 2].reshape(-1, 1)).reshape(node_grid_data.shape[:3])
node_grid_data=node_grid_data[:, : , : , 0:2]
element_features = element_scaler.fit_transform(element_features)
energy = energy_scaler.fit_transform(energy)

# Split data
X_train_nodes, X_test_nodes, X_train_elements, X_test_elements, y_train, y_test = train_test_split(
    node_grid_data, element_features, energy, test_size=1 - train_size, random_state=42
)


def parse_inp_file(filename):
    connectivity_list = []
    inside_element_section = False  # Flag to track when inside *Element section

    with open(filename, 'r') as file:
         for line in file:
            line = line.strip()  # Remove leading/trailing spaces
                
            # Detect start of element section
            if line.startswith("*Element"):
                inside_element_section = True
                continue  # Move to next line
                # Detect end of element section (if another keyword appears)
            if inside_element_section and line.startswith("*"):
                break  # Stop reading elements
            if inside_element_section and line:
                parts = line.split(",")  # Split by commas
                nodes = list(map(int, parts[1:]))  # Convert remaining parts to integers
                node1, node2 = nodes[0], nodes[1]
                connectivity_list.append((node1 - 1, node2 - 1))
    return connectivity_list


inp_file_path = "ExplcitBeam.inp"
connectivity_list = parse_inp_file(inp_file_path)

def create_adjacency_matrices(connectivity_list, num_nodes, element_thickness):
    A_connectivity = np.zeros((num_nodes, num_nodes))
    A_thickness = np.zeros((num_nodes, num_nodes))

    for (i, j) in connectivity_list:
        A_connectivity[i, j] = A_connectivity[j, i] = 1
        A_thickness[i, j] = A_thickness[j, i] = element_thickness[i, 0]  # Assign thickness value

    A_thickness = A_thickness / np.max(A_thickness)  # Normalize
    return A_connectivity, A_thickness


A_connectivity, A_thickness = create_adjacency_matrices(connectivity_list, 25, element_features)
A_connectivity = tf.convert_to_tensor(A_connectivity, dtype=tf.float32)
A_thickness = tf.convert_to_tensor(A_thickness, dtype=tf.float32)

print(f"X_train_nodes shape: {X_train_nodes.shape}")  # Should be (batch_size, 5, 5, 2)
print(f"X_train_elements shape: {X_train_elements.shape}")  # Should be (batch_size, num_features)
print(f"A_connectivity shape: {A_connectivity.shape}")  # Should be (batch_size, 25, 25)
print(f"A_thickness shape: {A_thickness.shape}")  # Should be (batch_size, 25, 25)
print(f"y_train shape: {y_train.shape}")  # Should NOT be None

node_input = Input(shape=(5, 5, 2), name="Node_Input")
node_branch = Conv2D(16, kernel_size= (3, 3), padding="same", activation=act, kernel_initializer=initializer)(node_input)
node_branch = BatchNormalization()(node_branch)
node_branch = GlobalAveragePooling2D()(node_branch)



element_input = Input(shape=(X_train_elements.shape[1],), name="Element_Input")
element_branch = Dense(50, activation=act, kernel_regularizer=l2(l2value), kernel_initializer=initializer)(element_input)
element_branch = Dropout(drop)(element_branch)
element_branch = Dense(16, activation=act, kernel_initializer=initializer)(element_branch)


gnn_node_input = Input(shape=(25, 2),batch_size=None, name="GNN_Node_Input")
adj_input = Input(shape=(25, 25),batch_size=None, name="Adjacency_Connectivity")  # Now supports batching
adj_thickness_input = Input(shape=(25, 25),batch_size=1, name="Adjacency_Thickness")

gnn_branch = GCNConv(25, activation=act)([gnn_node_input,adj_input])
gnn_branch = GCNConv(100, activation=act)([adj_input, adj_thickness_input])
gnn_branch = Flatten()(gnn_branch)

# Merge CNN, ANN, and GNN
merged = Concatenate()([node_branch, element_branch, gnn_branch])

# Latent Space
latentSpace = Dense(50, activation=act, kernel_initializer=initializer)(merged)
latentSpace = Dropout(drop)(latentSpace)
latentSpace = Dense(10, activation=act, name="Latent")(latentSpace)

encode = Dense(1, activation="linear", name="Encode")(latentSpace)

# Decoding Output
decoded = Dense(30, activation=act, kernel_initializer=initializer)(latentSpace)
decoded = Dense(25 * 2, activation=act, kernel_initializer=initializer)(decoded)
decoded = Reshape((5, 5, 2))(decoded)
decode = Conv2DTranspose(2, (3, 3), activation=act, padding="same", kernel_initializer=initializer, name="Decode")(decoded)

# Create Model
model = Model(inputs=[node_input, element_input, gnn_node_input, adj_input, adj_thickness_input], outputs=[encode, decode])
model.compile(optimizer=Adam(learning_rate=lr), loss={"Encode": "mse", "Decode": "mse"})

# Train Model
history = model.fit(
    [X_train_nodes, X_train_elements, X_train_nodes.reshape(-1, 25, 2), A_connectivity, A_thickness],
    [y_train, X_train_nodes],
    epochs=Iteration,
    batch_size=batch,
    validation_split=0.05
)

X_train_nodes shape: (8639, 5, 5, 2)
X_train_elements shape: (8639, 36)
A_connectivity shape: (25, 25)
A_thickness shape: (25, 25)
y_train shape: (8639, 1)


ValueError: Exception encountered when calling GCNConv.call().

[1mCould not automatically infer the output shape / dtype of 'gcn_conv_56' (of type GCNConv). Either the `GCNConv.call()` method is incorrect, or you need to implement the `GCNConv.compute_output_spec() / compute_output_shape()` method. Error encountered:

Tried to convert 'y' to a tensor and failed. Error: None values not supported.[0m

Arguments received by GCNConv.call():
  • args=(['<KerasTensor shape=(None, 25, 2), dtype=float32, sparse=False, name=GNN_Node_Input>', '<KerasTensor shape=(None, 25, 25), dtype=float32, sparse=False, name=Adjacency_Connectivity>'],)
  • kwargs={'mask': ['None', 'None']}

In [87]:
import numpy as np
import tensorflow as tf
from spektral.layers import GCNConv
from tensorflow.keras.layers import Input, Flatten, Dense
from tensorflow.keras.models import Model

# Define number of nodes and features
batch_size = 4
num_nodes = 5
num_features = 3

# Node features (batch_size, num_nodes, num_features)
X = np.random.rand(batch_size, num_nodes, num_features).astype("float32")

# Adjacency matrix (batch_size, num_nodes, num_nodes)
A = np.random.randint(0, 2, (batch_size, num_nodes, num_nodes)).astype("float32")

# Mask (batch_size, num_nodes) - Example: masking the last node in each graph
mask = np.ones((batch_size, num_nodes), dtype="float32")
mask[:, -1] = 0  # Mark last node in every graph as ignored

# Define Input layers
node_input = Input(shape=(num_nodes, num_features), name="Node_Features")
adj_input = Input(shape=(num_nodes, num_nodes), name="Adjacency_Matrix")
mask_input = Input(shape=(num_nodes,), dtype=tf.float32, name="Mask")  # Mask input
mask_float = tf.reshape(mask_input, (-1, num_nodes, 1))  # (batch_size, num_nodes, 1)

# GCN with mask
gcn = GCNConv(16, activation="relu")([node_input, adj_input], mask=mask_float)
gcn = GCNConv(8, activation="relu")([gcn, adj_input], mask=mask_input)
gcn = Flatten()(gcn)

# Output
output = Dense(1, activation="linear")(gcn)

# Create Model
model = Model(inputs=[node_input, adj_input, mask_input], outputs=output)
model.compile(optimizer="adam", loss="mse")
model.summary()

# Train Model
history = model.fit([X, A, mask], np.random.rand(batch_size, 1), epochs=5, verbose=1)


ValueError: A KerasTensor cannot be used as input to a TensorFlow function. A KerasTensor is a symbolic placeholder for a shape and dtype, used when constructing Keras Functional models or Keras Functions. You can only use it as input to a Keras layer or a Keras operation (from the namespaces `keras.layers` and `keras.operations`). You are likely doing something like:

```
x = Input(...)
...
tf_fn(x)  # Invalid.
```

What you should do instead is wrap `tf_fn` in a layer:

```
class MyLayer(Layer):
    def call(self, x):
        return tf_fn(x)

x = MyLayer()(x)
```
