In [2]:
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers
import pennylane as qml
from pennylane import numpy as np
import json
import os
from PIL import Image
import random
import numpy as np
import matplotlib.pyplot as plt

KeyboardInterrupt: 

In [None]:
# Function to load data
def load_data(path):
    """Load the labelled images found under ``path`` into the global ``data`` list.

    Every sub-directory of ``path`` is scanned; each file inside it is looked up
    in ``train_data`` under the key ``"<folder>/<file name>"`` to obtain its
    integer class label.

    The function communicates through notebook globals instead of
    parameters/return values:

    * reads ``train_data`` (key -> label), ``imgpx`` (target size passed to
      ``Image.resize``) and ``onehot`` (row ``label`` is that label's encoding);
    * appends ``(image_tensor, onehot[label])`` tuples to ``data``;
    * increments ``samples[label]`` for every image that is kept.

    Args:
        path: Directory whose sub-directories contain the image files.

    Returns:
        None. Results are accumulated in ``data`` and ``samples``.

    Raises:
        KeyError: If a file has no entry in ``train_data``.
    """
    for folder in os.listdir(path):
        folder_path = os.path.join(path, folder)
        if not os.path.isdir(folder_path):
            continue

        for img_name in os.listdir(folder_path):
            label = train_data[folder + '/' + img_name]

            # Per-class cap to limit class imbalance.
            # NOTE(review): with `>` a class can reach 2001 images before it is
            # skipped - confirm whether `>= 2000` was intended.
            if samples[label] > 2000:
                continue

            samples[label] += 1

            # The context manager closes the underlying file deterministically;
            # resize() returns an independent, fully loaded copy.
            with Image.open(os.path.join(folder_path, img_name)) as raw_img:
                img = tf.convert_to_tensor(raw_img.resize(imgpx))
            data.append((img, onehot[label]))

            # Progress line, rewritten in place every 100 images
            if len(data) % 100 == 0:
                print(f"Total images loaded : {len(data)}, Label count : {samples}", end='\r')

In [None]:
# Load training data
# Single source for the dataset location so the annotation file and the image
# folder cannot drift apart.
train_dir = 'D:/QML/al5083/al5083/train'
training_data_file = train_dir + '/train.json'

with open(training_data_file, 'r') as tdf:
    # Maps "<folder>/<file name>" -> integer class label (see load_data)
    train_data = json.load(tdf)

# Define labels and one-hot encoding
label_names = ["good weld", "burn through", "contamination", "lack of fusion", "misalignment", "lack of penetration"]
labels = dict(enumerate(label_names))  # class index -> human readable name
# Row i of `onehot` is the one-hot vector for class i
onehot = tf.keras.utils.to_categorical(list(labels), num_classes=len(labels))

# Class distribution of the full annotation file. Kept under its own name:
# `samples` is the per-class counter that load_data() mutates while loading,
# so it must start from zero and must not hold these totals.
label_totals = [0] * len(labels)
for label in train_data.values():
    label_totals[label] += 1

# Initialize data storage (globals consumed by load_data)
data = []
imgpx = (256, 256)
samples = [0] * len(labels)
load_data(train_dir)

# Shuffle and prepare data for training
random.shuffle(data)
X = [image for image, _ in data]
y = [target for _, target in data]

In [None]:
# Display some random images
# random.randint includes BOTH endpoints, so the upper bound has to be
# len(data) - 1; using len(data) can produce an out-of-range index.
index = random.randint(0, len(data) - 1)
# Bare expression in the middle of a cell: evaluated, but not displayed
X[index], y[index]

figure = plt.figure(figsize=(18, 11))
cols, rows = 4, 2
# Each iteration fills two neighbouring panels with the same random image:
# first with matplotlib's default colormap, then in grayscale.
for i in range(1, cols * rows + 1, 2):
    img_idx = random.randint(0, len(X) - 1)
    label_idx = np.argmax(y[img_idx]).item()  # plain int so it can index `labels`

    for offset, cmap in enumerate((None, 'gray')):
        figure.add_subplot(rows, cols, i + offset)
        plt.title(labels[label_idx])
        plt.axis("off")
        plt.imshow(X[img_idx], cmap=cmap)

plt.tight_layout()
plt.show()

In [None]:
# Split data into training and test sets
# The first 80% of the (already shuffled) samples are used for training,
# the remainder for testing.
data_size = len(X)
split = int(data_size * 0.8)

# Images and labels are cast to float32 tensors; no pixel rescaling is applied.
X_train = tf.convert_to_tensor(X[:split], dtype=tf.float32)
y_train = tf.convert_to_tensor(y[:split], dtype=tf.float32)
X_test = tf.convert_to_tensor(X[split:], dtype=tf.float32)
y_test = tf.convert_to_tensor(y[split:], dtype=tf.float32)

# Reshape data for CNN input: add an explicit single-channel axis
X_train = tf.reshape(X_train, shape=(X_train.shape[0], 256, 256, 1))
X_test = tf.reshape(X_test, shape=(X_test.shape[0], 256, 256, 1))

In [None]:
# Define the number of qubits for the quantum layer
# Quantum circuit setup
n_qubits = 4
dev = qml.device('default.qubit', wires=n_qubits)

@qml.qnode(dev)
def amplitude_embedding_circuit(data):
    """Encode the first ``n_qubits`` entries of ``data`` and measure wire 0.

    ``data`` is scaled to unit L2 norm. Entry ``a_i`` of the scaled vector is
    written onto wire ``i`` with ``RY(2 * arccos(a_i))``, which turns ``|0>``
    into ``a_i|0> + sqrt(1 - a_i**2)|1>``. Despite the function name this is a
    per-qubit rotation encoding, not ``qml.AmplitudeEmbedding``.

    Only wire 0 is measured and the circuit contains no entangling gates, so
    the result equals ``2 * a_0**2 - 1``.

    Args:
        data: 1-D numeric array with at least ``n_qubits`` entries. An all-zero
            vector is used as is (it cannot be normalised).

    Returns:
        Expectation value of Pauli-Z on wire 0.
    """
    norm = np.linalg.norm(data)
    # Dividing an all-zero vector by its norm would give 0/0 = NaN angles
    data_normalized = data / norm if norm > 0 else data
    for i in range(n_qubits):
        # Rounding can push |a_i| marginally above 1, where arccos returns NaN
        amplitude = np.clip(data_normalized[i], -1.0, 1.0)
        qml.RY(2 * np.arccos(amplitude), wires=i)
    # A QNode has to return the measurement itself; wrapping it in np.array()
    # hands PennyLane an array instead of a measurement process.
    return qml.expval(qml.PauliZ(0))

class QuantumLayer(tf.keras.layers.Layer):
    """Keras layer that maps every sample to ``output_size`` circuit outputs.

    ``amplitude_embedding_circuit`` yields one scalar per call. To obtain
    ``output_size`` features per sample, each sample's flattened feature vector
    is split into ``output_size`` equally sized, contiguous segments and the
    circuit is evaluated once per segment. The result has shape
    ``(batch, output_size)``, so the batch dimension is preserved.

    Requirements on the input:
        * the number of features per sample must be divisible by
          ``output_size`` (otherwise the reshape in ``call`` raises);
        * each segment must hold at least ``n_qubits`` values, because the
          circuit indexes the first ``n_qubits`` entries of its argument.

    NOTE(review): the circuit runs inside ``tf.numpy_function``, which
    TensorFlow cannot differentiate. The layer has no trainable weights and
    no gradient flows back to the layers that feed it - confirm this is
    acceptable, or switch to a differentiable PennyLane/TF integration.

    Args:
        output_size: Number of features produced per sample.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, output_size, **kwargs):
        super().__init__(**kwargs)
        self.output_size = output_size

    @staticmethod
    def _evaluate_segments(segments):
        """Evaluate the circuit for every segment of every sample (NumPy side).

        Args:
            segments: Array of shape (batch, n_outputs, segment_length).

        Returns:
            float32 array of shape (batch, n_outputs).
        """
        batch_size, n_outputs = segments.shape[0], segments.shape[1]
        values = [
            amplitude_embedding_circuit(segment)
            for sample in segments
            for segment in sample
        ]
        return np.asarray(values, dtype=np.float32).reshape(batch_size, n_outputs)

    def call(self, inputs):
        batch_size = tf.shape(inputs)[0]
        # (batch, output_size, features_per_output). Previously one scalar per
        # sample was reshaped to (-1, output_size), which merged `output_size`
        # different samples into a single row.
        segments = tf.reshape(inputs, (batch_size, self.output_size, -1))

        quantum_data = tf.numpy_function(
            func=self._evaluate_segments,
            inp=[segments],
            Tout=tf.float32
        )

        # numpy_function returns a tensor without static shape information;
        # this reshape restores the known feature dimension.
        quantum_data = tf.reshape(quantum_data, (-1, self.output_size))
        return quantum_data

In [None]:
class HybridModel(tf.keras.Model):
    """Convolutional classifier with a ``QuantumLayer`` in front of the softmax head.

    Layer order: three Conv2D/MaxPooling2D stages (the first two followed by
    Dropout), Flatten, two ReLU Dense layers, the quantum layer and a softmax
    Dense layer with one unit per entry of the global ``labels``.

    Args:
        l2_reg: L2 factor applied to the kernels of all Conv2D layers and of
            the two hidden Dense layers.
        quantum_output_size: ``output_size`` passed to ``QuantumLayer``.
    """

    def __init__(self, l2_reg=0.01, quantum_output_size=16):
        super().__init__()

        def l2_penalty():
            # Every layer receives its own regularizer instance
            return regularizers.l2(l2_reg)

        # Stage 1: 256 filters, 'same' padding
        self.conv1 = layers.Conv2D(
            256, (3, 3), activation='relu', padding='same', kernel_regularizer=l2_penalty()
        )
        self.pool1 = layers.MaxPooling2D((2, 2))
        self.dropout1 = layers.Dropout(0.2)

        # Stage 2: 128 filters, 'same' padding
        self.conv4 = layers.Conv2D(
            128, (3, 3), activation='relu', padding='same', kernel_regularizer=l2_penalty()
        )
        self.pool4 = layers.MaxPooling2D((2, 2))
        self.dropout3 = layers.Dropout(0.2)

        # Stage 3: 64 filters, default padding, no dropout
        self.conv5 = layers.Conv2D(
            64, (3, 3), activation='relu', kernel_regularizer=l2_penalty()
        )
        self.pool5 = layers.MaxPooling2D((2, 2))

        self.flatten = layers.Flatten()

        # Classification head
        self.dense1 = layers.Dense(256, activation='relu', kernel_regularizer=l2_penalty())
        self.dense2 = layers.Dense(128, activation='relu', kernel_regularizer=l2_penalty())
        self.quantum_layer = QuantumLayer(output_size=quantum_output_size)
        self.dense3 = layers.Dense(len(labels), activation='softmax')

    def call(self, inputs):
        """Run ``inputs`` through all layers in order and return the softmax output."""
        pipeline = (
            self.conv1, self.pool1, self.dropout1,
            self.conv4, self.pool4, self.dropout3,
            self.conv5, self.pool5,
            self.flatten,
            self.dense1, self.dense2,
            self.quantum_layer,
            self.dense3,
        )

        x = inputs
        for stage in pipeline:
            x = stage(x)
        return x

In [None]:
model = HybridModel()

# Adam + categorical cross-entropy (targets are one-hot vectors)
adam = tf.keras.optimizers.Adam(learning_rate=1e-3)
cross_entropy = tf.keras.losses.CategoricalCrossentropy()
model.compile(optimizer=adam, loss=cross_entropy, metrics=['accuracy'])

# Display train and test dataset sizes
print(f"Train dataset Size : {len(X_train)}\nTest Dataset size : {len(X_test)}")
# NOTE(review): the model has not been called on data at this point, so the
# summary may be empty or raise depending on the Keras version - confirm.
model.summary()

# Setup model checkpointing: keep only the weights of the epoch with the best
# validation accuracy.
# NOTE(review): the leading '/' anchors the file at the filesystem/drive root;
# the same literal is used when the weights are loaded again - confirm the
# location is intended and writable.
callback1 = tf.keras.callbacks.ModelCheckpoint(
    filepath='/model.weights.h5',
    monitor='val_accuracy',
    save_best_only=True,
    save_weights_only=True,
    verbose=1,
)



In [None]:
# Train the model
# NOTE(review): validation_data is (X_test, y_test), so the checkpoint callback
# picks the "best" epoch on the same samples that are predicted below.
model.fit(
    X_train,
    y_train,
    validation_data=(X_test, y_test),
    epochs=30,
    callbacks=[callback1],
)

# Load the best model weights (file written by the checkpoint callback)
model.load_weights('/model.weights.h5')

# Make predictions on the test set
predictions = model.predict(X_test)

In [None]:
# Display some predictions
# Five random test samples: show the image, then predicted vs. true class name.
for _ in range(5):
    img = random.randint(0, len(X_test) - 1)  # index of the sample, not the image itself
    picture = X_test[img].numpy().squeeze()   # drop the channel axis for imshow
    plt.imshow(picture, cmap='gray')
    plt.show()

    predicted_class = np.argmax(predictions[img])
    print(f"Prediction : {labels[predicted_class]}")
    true_class = np.argmax(y_test[img])
    print(f"Original : {labels[true_class]}")

# Evaluate the model on the training set
# evaluate() returns the values indexed below as [0] loss and [1] accuracy
train_metric = model.evaluate(X_train, y_train, verbose=0)
train_loss, train_accuracy = train_metric[0], train_metric[1]
print(f"Training Loss : {train_loss} , Training Accuracy : {train_accuracy}")

# Evaluate the model on the test set
test_metric = model.evaluate(X_test, y_test, verbose=0)
test_loss, test_accuracy = test_metric[0], test_metric[1]
print(f"Testing Loss : {test_loss} , Testing Accuracy : {test_accuracy}")