In [1]:
# imports
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
import cv2
import tensorflow as tf
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.optimizers import Adam
from tensorflow import data as tf_data
import time
from sklearn.metrics import confusion_matrix, precision_score, recall_score, log_loss
import numpy as np
from typing import List, Dict, Any, Union

from utils import print_progress_bar
print("Modules imported")

Modules imported


In [2]:
# variables
all_labels = ['nature', 'country', 'city']
path = "../datasets/all_data/entropy_results_short.json"
parallel_jobs = 5

# hyperparameters
test_part = 0.05
epochs = 2
batch_size = 64
learning_rate = 0.01

# check gpu
devices = tf.config.list_physical_devices()
print("Available devices:")
for device in devices:
    print(device.name)
physical_devices = tf.config.list_physical_devices('GPU')
if physical_devices:
    tf.config.experimental.set_memory_growth(physical_devices[0], True)
    print(f"The model will run on GPU: {physical_devices[0].name}")
else:
    print("No GPU found, the model will run on CPU.")

Available devices:
/physical_device:CPU:0
No GPU found, the model will run on CPU.


In [3]:
# data validation functions
def validate_element(element: Dict[str, Any]) -> Union[bool, str]:
    """Validate the format of an element in the dataset."""
    try:
        # Validate 'image' key
        image = element['image']
        
        # Validate 'dwt' key
        dwt = image[0]
        if not isinstance(dwt, tf.Tensor) or dwt.shape[1] != 10:
            return "Invalid shape or type for 'dwt'. Expected a tensor with shape: (batch_size, 10)"
        
        # Validate 'lvl0' key
        lvl0 = image[1]
        if not isinstance(lvl0, tf.Tensor) or lvl0.shape[1] != 17:
            return "Invalid shape or type for 'lvl0'. Expected a tensor with shape: (batch_size, 17)"
        
        # Validate 'lvl1', 'lvl2', and 'lvl3' keys
        for i, shape in zip([2, 3, 4], [(2, 2, 1), (4, 4, 1), (8, 8, 1)]):
            for lvl in image[i]:
                if not isinstance(lvl, tf.Tensor) or lvl.shape[2:] != shape:
                    return f"Invalid shape or type for 'lvl{i-1}'. Expected a tensor with shape: (batch_size, ..., {shape})"
        
        # Validate 'label' key
        label = element['label']
        if not isinstance(label, tf.Tensor) or label.dtype != tf.string:
            return "Invalid format for 'label'. Expected a tensor of strings."
        
        return True
    except (KeyError, ValueError, TypeError) as e:
        return f"An error occurred: {e}"



# Function to validate all elements in the dataset
def validate_dataset(dataset: List[Dict[str, Any]]) -> bool:
    return all(validate_element(element) for element in dataset)

In [11]:
# heads
class SelfAttention(tf.keras.layers.Layer):
    def __init__(self, embed_size, heads):
        super(SelfAttention, self).__init__()
        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads

        assert (
            self.head_dim * heads == embed_size
        ), "Embedding size needs to be divisible by heads"

        self.wq = tf.keras.layers.Dense(self.head_dim)
        self.wk = tf.keras.layers.Dense(self.head_dim)
        self.wv = tf.keras.layers.Dense(self.head_dim)

    def call(self, inputs):
        Q = self.wq(inputs)
        K = self.wk(inputs)
        V = self.wv(inputs)

        matmul_qk = tf.matmul(Q, K, transpose_b=True)

        depth = tf.cast(tf.shape(K)[-1], tf.float32)
        logits = matmul_qk / tf.math.sqrt(depth)

        attention_weights = tf.nn.softmax(logits, axis=-1)

        output = tf.matmul(attention_weights, V)
        return output

In [4]:
# model
class EntropyClassifier(tf.keras.Model):
    def __init__(self, possible_labels):
        super(EntropyClassifier, self).__init__()

        self.possible_labels = possible_labels

        self.dwt_input_layer = tf.keras.layers.Dense(10, activation='relu')
        self.lvl0_input_layer = tf.keras.layers.Dense(17, activation='relu')
        self.lvl1_input_layers = [tf.keras.layers.Conv2D(1, (2, 2), activation='relu') for _ in range(17)]
        self.lvl2_input_layers = [tf.keras.layers.Conv2D(1, (2, 2), activation='relu') for _ in range(17)]
        self.lvl3_input_layers = [tf.keras.layers.Conv2D(1, (2, 2), activation='relu') for _ in range(17)]

        self.self_attention_heads = [SelfAttention(10, 1), SelfAttention(17, 1)] + [SelfAttention(4, 1) for _ in range(51)]

        self.combination_layer = tf.keras.layers.Concatenate()

        self.fc_layer = tf.keras.layers.Dense(128, activation='relu')
        self.output_layer = tf.keras.layers.Dense(len(possible_labels), activation='softmax')

    def call(self, inputs):
        """
        Method to forward propagate through the model
        Args:
        inputs (Tensor): Input tensor
    
        Returns:
        Tensor: Output tensor
        """
        dwt_output, lvl0_output, lvl1_outputs, lvl2_outputs, lvl3_outputs = inputs
    
        # Passing the components through the respective layers
        dwt_output = self.dwt_input_layer(dwt_output)
        lvl0_output = self.lvl0_input_layer(lvl0_output)
        lvl1_outputs = [layer(input) for layer, input in zip(self.lvl1_input_layers, lvl1_outputs)]
        lvl2_outputs = [layer(input) for layer, input in zip(self.lvl2_input_layers, lvl2_outputs)]
        lvl3_outputs = [layer(input) for layer, input in zip(self.lvl3_input_layers, lvl3_outputs)]
        
        # Expanding the dimensions of the outputs
        lvl1_outputs = [tf.expand_dims(input, axis=0) for input in lvl1_outputs]
        lvl2_outputs = [tf.expand_dims(input, axis=0) for input in lvl2_outputs]
        lvl3_outputs = [tf.expand_dims(input, axis=0) for input in lvl3_outputs]
    
        attention_inputs = [dwt_output, lvl0_output] + [tf.keras.layers.Flatten()(output) for output in (lvl1_outputs + lvl2_outputs + lvl3_outputs)]
        
        attention_outputs = [layer(input) for layer, input in zip(self.self_attention_heads, attention_inputs)]
        
        # Calculate the common dimension size for concatenation
        common_dimension_size = min([tf.shape(output)[-1] for output in attention_outputs])
        
        # Reshape tensors to have the same size in the last dimension
        attention_outputs = [tf.slice(output, [0,0], [-1, common_dimension_size]) for output in attention_outputs]
        
        combined_output = self.combination_layer(attention_outputs)
        
        fc_output = self.fc_layer(combined_output)
        output = self.output_layer(fc_output)
        
        return output

    def train_model(self, dataset, epochs=100, batch_size=64, lr=0.01):
        loss = None
        formatted_dataset = format_dataset(dataset)

        train_dataset = formatted_dataset.batch(batch_size)

        criterion = CategoricalCrossentropy(from_logits=True)
        optimizer = Adam(learning_rate=lr)

        for epoch in range(epochs):
            print(f'Starting epoch {epoch+1}/{epochs}')
            for batch_idx, (data, target) in enumerate(train_dataset):
                target = tf.convert_to_tensor([tf.one_hot(t, len(self.possible_labels)) for t in target], dtype=tf.float32)

                with tf.GradientTape() as tape:
                    output = self(data, training=True)
                    print(f"Output shape: {output.shape}, Target shape: {target.shape}")  # Added this line to debug
                    loss = criterion(target, output)
                gradients = tape.gradient(loss, self.trainable_variables)
                optimizer.apply_gradients(zip(gradients, self.trainable_variables))

                if batch_idx % 10 == 0:
                    print(f'Batch {batch_idx}, Loss: {loss.numpy()}')

            if loss is not None:
                print(f'Epoch {epoch+1} completed, Loss: {loss.numpy()}')

    def predict(self, image):
        image = (image[0], image[1], image[2], image[3], image[4])
        output_ = self(image, training=False)
        probabilities = tf.nn.softmax(output_)
        max_index = tf.argmax(probabilities)
        label_prob_dict = {label: prob.numpy() for label, prob in zip(self.possible_labels, probabilities[0])}

        return str(self.possible_labels[max_index.numpy()]), label_prob_dict


In [5]:
def process_entry(entry):
    """Process the entropy results to extract the levels."""
    label = entry['label']
    machine_input = {0: [], 1: [], 2: [], 3: [], 'dwt': []}
    for ent in entry['entropy_results']:
    
        if ent['method'] == 'dwt':
            machine_input['dwt'] = tf.convert_to_tensor(ent['result'], dtype=tf.float32)
        else:
            for lvl, content in enumerate(ent['result']):
                machine_input[lvl].append(tf.convert_to_tensor(content, dtype=tf.float32))
    
    machine_input[0] = tf.concat(machine_input[0], axis=-1)
    machine_input[1] = tf.concat(machine_input[1], axis=-1)
    machine_input[2] = tf.concat(machine_input[2], axis=-1)
    machine_input[3] = tf.concat(machine_input[3], axis=-1)
    machine_input['dwt'] = tf.reshape(machine_input['dwt'], [1, 1, 10])
    
    return {'input': machine_input, 'label': label}  


def format_dataset(dataset):
    """Formats and shuffles the dataset for training"""
    label_num = {'nature': 0, 'country': 1, 'city': 2}
    formatted_dataset = []
    for item in dataset:
        machine_input = item['input']
        label = label_num[item['label']]
        formatted_dataset.append((
            (
                machine_input[0], 
                machine_input[1], 
                machine_input[2], 
                machine_input[3], 
                machine_input['dwt']
            ), 
            label
        ))
    return tf.data.Dataset.from_generator(
        lambda: iter(formatted_dataset), 
        output_signature=(
            (
                tf.TensorSpec(shape=(1, 1, 17), dtype=tf.float32),
                tf.TensorSpec(shape=(2, 2, 17), dtype=tf.float32),
                tf.TensorSpec(shape=(4, 4, 17), dtype=tf.float32),
                tf.TensorSpec(shape=(8, 8, 17), dtype=tf.float32),
                tf.TensorSpec(shape=(1, 1, 10), dtype=tf.float32)
            ),
            tf.TensorSpec(shape=(), dtype=tf.int32),
        )
    ).shuffle(buffer_size=len(dataset))


def process_json(path, test_part, parallel_jobs=4):
    """Process JSON data to extract dataset and features."""
    with open(path, 'r') as f:
        metadata = json.load(f)
    dataset = []

    t = time.time()
    n = len(metadata)

    with ThreadPoolExecutor(max_workers=parallel_jobs) as executor:
        futures = [executor.submit(process_entry, entry) for entry in metadata]
        for i, future in enumerate(as_completed(futures)):
            result = future.result()
            if result is not None:
                dataset.append(result)
            print_progress_bar('Processed entry', i+1, n, t)

    if isinstance(test_part, float):
        i = int(test_part * len(dataset))
    elif isinstance(test_part, str):
        i = int(test_part)
    else:
        raise ValueError("Incompatible format for 'test_part'.")

    test_set = dataset[-i:]
    dataset = dataset[:-i]

    num_classes = len(all_labels)
    dataset_length = len(dataset)

    return dataset, test_set, num_classes, dataset_length


In [7]:
# model evaluation functions
def evaluate_model(model, test_set):
    stats = {'test_samples': 0, 'right_predictions': 0}
    y_true = []
    y_pred = []
    y_prob = []
    
    for test in test_set:
        stats['test_samples'] += 1
        
        image = test['image']
        if device == "GPU":
            image = tf.convert_to_tensor(image)
        
        predicted_label, label_probs = model.predict(image)
        
        y_true.append(test["label"])
        y_pred.append(predicted_label)
        y_prob.append(label_probs[test["label"][0]])
        
        if predicted_label == test["label"]:
            stats['right_predictions'] += 1
            print(f'Predicted label: {predicted_label}.  Real label: {test["label"]}. Prediction correct!')
        else:
            print(f'Predicted label: {predicted_label}.  Real label: {test["label"]}. False prediction.')
    
    # Calculate additional metrics
    conf_matrix = confusion_matrix(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average='weighted')
    recall = recall_score(y_true, y_pred, average='weighted')
    logloss = log_loss(y_true, np.array(y_prob))
    
    stats['success_rate'] = 100 * stats['right_predictions'] / stats['test_samples']
    stats['confusion_matrix'] = conf_matrix
    stats['precision'] = precision
    stats['recall'] = recall
    stats['log_loss'] = logloss
    
    print(f"{stats['right_predictions']} samples out of {stats['test_samples']} were predicted correctly.\n"
          f"The model's success rate is: {stats['success_rate']}%")
    print(f"Confusion Matrix: \n{conf_matrix}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print(f"Log Loss: {logloss}")


In [8]:
# load data
dataset, test_set, num_classes, dataset_length = process_json(path, test_part)
print('Dataset processed.')
print(f"Total number of entries in the dataset: {dataset_length}")
print(f"Total number of entries in the test set: {len(test_set)}")    
print(f"Number of classes: {num_classes}")
print(f"Dataset length: {dataset_length}") 

Processed entry: ██████████████████████████████████████████████████ | Completed: 256/256 100.0% | Time elapsed: 00:01/00:01 | Time left: ~00:00Dataset processed.
Total number of entries in the dataset: 244
Total number of entries in the test set: 12
Number of classes: 3
Dataset length: 244


In [9]:
# data validity check
# %whos
validate_dataset(dataset)
# print(type(dataset[0]))
# print(type(dataset[0]['image']))
for key in dataset[0]['image']:
    print(type(key))

KeyError: 'image'

In [12]:
# model creation
file_name = f"EntropyClassifier_e={epochs}_ds={dataset_length}.pth"
model = EntropyClassifier(all_labels)
print('Model created')

Model created


In [13]:
# train model
model.train_model(dataset, epochs=epochs, batch_size=batch_size, lr=learning_rate)
print('Model trained.')

Starting epoch 1/2


ValueError: Exception encountered when calling layer 'entropy_classifier_1' (type EntropyClassifier).

Input 0 of layer "conv2d_51" is incompatible with the layer: expected min_ndim=4, found ndim=3. Full shape received: (4, 4, 17)

Call arguments received by layer 'entropy_classifier_1' (type EntropyClassifier):
  • inputs=('tf.Tensor(shape=(64, 1, 1, 17), dtype=float32)', 'tf.Tensor(shape=(64, 2, 2, 17), dtype=float32)', 'tf.Tensor(shape=(64, 4, 4, 17), dtype=float32)', 'tf.Tensor(shape=(64, 8, 8, 17), dtype=float32)', 'tf.Tensor(shape=(64, 1, 1, 10), dtype=float32)')

In [None]:
#save model
torch.save(model.state_dict(), file_name)
print("Model saved")

In [None]:
# load model
model.load_state_dict(torch.load(file_name, map_location=device))
model.eval()
print("model loaded")

In [None]:
# model evaluation
evaluate_model(model, test_set)