#### Laod Packages

In [1]:
import sys
import os
import importlib
import matplotlib
# Get the absolute path of the parent directory
project_path = os.path.abspath("..")  
sys.path.append(project_path)
matplotlib.rcParams.update(matplotlib.rcParamsDefault)

import MEAL
import MEAL.builder_block as BD
import os
import tensorflow as tf
from tensorflow import keras
import MEAL.builder_block as BD
import MEAL.without_augmentation as NA
import MEAL.traditional_augmentation as TA
import MEAL.fusion_layer as FL
import MEAL.encoder_concatenation as CC
import MEAL.processing as pr
from MEAL.builder_block import (
    FlipAugmentation, RotateAugmentation, CropAugmentation, IntensityAugmentation
)
import MEAL.processing as pr

import unittest
import numpy as np
import tensorflow as tf

#### Without Augmentation

In [2]:
class TestComponents(unittest.TestCase):

    def setUp(self):
        self.input_shape = (128, 128, 64, 1)
        self.batch_size = 1

    def test_model_building(self):
        model = NA.refined_diffusion_model(self.input_shape)
        self.assertIsInstance(model, tf.keras.Model)
        self.assertEqual(model.output_shape, (None, 128, 128, 64, 1))

    def test_residual_block_output_shape(self):
        x = tf.random.normal((1, 64, 64, 32, 64))
        out = pr.refined_residual_block(x, 64)
        self.assertEqual(out.shape, x.shape)

    def test_dataset_loading(self):
        dummy_path = "tests/dummy_data"
        os.makedirs(os.path.join(dummy_path, "input"), exist_ok=True)
        os.makedirs(os.path.join(dummy_path, "target"), exist_ok=True)
        
        # Create dummy .npy files
        for i in range(2):
            np.save(os.path.join(dummy_path, "input", f"{i}.npy"), np.random.rand(*self.input_shape))
            np.save(os.path.join(dummy_path, "target", f"{i}.npy"), np.random.rand(*self.input_shape[:3]))

        dataset = pr.create_dataset(
            os.path.join(dummy_path, "input"),
            os.path.join(dummy_path, "target"),
            self.input_shape,
            self.input_shape[:3],
            self.batch_size,
            shuffle=False
        )
        self.assertIsInstance(dataset, tf.data.Dataset)
        for x, y in dataset.take(1):
            self.assertEqual(x.shape, (1, 128, 128, 64, 1))
            self.assertEqual(y.shape, (1, 128, 128, 64))

    def tearDown(self):
        import shutil
        shutil.rmtree("tests/dummy_data", ignore_errors=True)

if __name__ == "__main__":
    
    unittest.TextTestRunner(verbosity=2).run(unittest.TestLoader().loadTestsFromTestCase(TestComponents))

    # unittest.main()


test_dataset_loading (__main__.TestComponents.test_dataset_loading) ... ok
test_model_building (__main__.TestComponents.test_model_building) ... ok
test_residual_block_output_shape (__main__.TestComponents.test_residual_block_output_shape) ... ok

----------------------------------------------------------------------
Ran 3 tests in 0.847s

OK


In [3]:
## integration test

In [4]:
class TestPipelineIntegration(unittest.TestCase):

    def setUp(self):
        self.input_shape = (128, 128, 64, 1)
        self.target_shape = self.input_shape[:3]
        self.batch_size = 1

        # Generate dummy training data (1 sample only)
        x = np.random.rand(*self.input_shape).astype(np.float32)
        y = np.random.rand(*self.target_shape).astype(np.float32)

        self.train_ds = tf.data.Dataset.from_tensor_slices((x[np.newaxis, ...], y[np.newaxis, ...]))
        self.train_ds = self.train_ds.batch(self.batch_size)

    def test_model_training_pipeline(self):
        model = NA.refined_diffusion_model(self.input_shape)
        model.compile(
            optimizer=tf.keras.optimizers.Adam(1e-4),
            loss=pr.combined_loss,
            metrics=['mae']
        )

        # Only run for 1 epoch on dummy data
        history = model.fit(
            self.train_ds,
            epochs=2,
            verbose=0
        )

        # Check training history exists
        self.assertTrue("loss" in history.history)
        self.assertGreater(len(history.history['loss']), 0)

if __name__ == "__main__":
    unittest.TextTestRunner().run(unittest.TestLoader().loadTestsFromTestCase(TestPipelineIntegration))


.
----------------------------------------------------------------------
Ran 1 test in 99.701s

OK


#### Traditional Augmentation (TA)

In [8]:
class TestTraditionalAugmentation(unittest.TestCase):
    
    def setUp(self):
        self.input_shape = (128, 128, 64, 1)
        self.batch_size = 1
        self.sample_input = tf.random.normal((1, *self.input_shape))

    def test_flip_layer(self):
        layer = TA.FlipAugmentation()
        output = layer(self.sample_input)
        self.assertEqual(output.shape, self.sample_input.shape)

    def test_rotate_layer(self):
        layer = TA.RotateAugmentation(axis=(1, 2))
        output = layer(self.sample_input)
        self.assertEqual(output.shape, self.sample_input.shape)

    def test_crop_layer(self):
        layer = TA.CropAugmentation()
        output = layer(self.sample_input)
    
        # Extract shape
        b, h, w, d, c = output.shape
        self.assertEqual(b, 1)
        self.assertEqual(h, 128)
        self.assertEqual(w, 128)
        self.assertTrue(d <= 64)  # depth can be cropped
        self.assertEqual(c, 1)

    def test_intensity_layer(self):
        layer = TA.IntensityAugmentation()
        output = layer(self.sample_input)
        self.assertEqual(output.shape, self.sample_input.shape)

    def test_model_creation(self):
        model = TA.refined_diffusion_model(self.input_shape)
        self.assertIsInstance(model, tf.keras.Model)
        self.assertEqual(model.output_shape, (None, 128, 128, 64, 1))
        
if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run(unittest.TestLoader().loadTestsFromTestCase(TestTraditionalAugmentation))


test_crop_layer (__main__.TestTraditionalAugmentation.test_crop_layer) ... ok
test_flip_layer (__main__.TestTraditionalAugmentation.test_flip_layer) ... ok
test_intensity_layer (__main__.TestTraditionalAugmentation.test_intensity_layer) ... ok
test_model_creation (__main__.TestTraditionalAugmentation.test_model_creation) ... ok
test_rotate_layer (__main__.TestTraditionalAugmentation.test_rotate_layer) ... ok

----------------------------------------------------------------------
Ran 5 tests in 0.265s

OK


In [9]:
#### integration test for TA

In [10]:
class TestTAPipelineIntegration(unittest.TestCase):

    def setUp(self):
        self.input_shape = (128, 128, 64, 1)
        self.target_shape = self.input_shape[:3]
        self.batch_size = 1
        
        # Simulated data
        x = np.random.rand(*self.input_shape).astype(np.float32)
        y = np.random.rand(*self.target_shape).astype(np.float32)

        # Expand to batch and wrap as tf.data.Dataset
        self.train_ds = tf.data.Dataset.from_tensor_slices((x[np.newaxis, ...], y[np.newaxis, ...]))
        self.train_ds = self.train_ds.batch(self.batch_size)
        self.train_ds = self.train_ds.map(lambda x, y: (TA.IntensityAugmentation()(x), y))

    def test_model_training(self):
        model = TA.refined_diffusion_model(self.input_shape)
        model.compile(
            optimizer=tf.keras.optimizers.Adam(1e-4),
            loss=pr.combined_loss,
            metrics=['mae']
        )

        history = model.fit(self.train_ds, epochs=1, verbose=0)
        self.assertTrue("loss" in history.history)
        self.assertGreater(len(history.history["loss"]), 0)
        
if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run(unittest.TestLoader().loadTestsFromTestCase(TestTAPipelineIntegration))


test_model_training (__main__.TestTAPipelineIntegration.test_model_training) ... ok

----------------------------------------------------------------------
Ran 1 test in 43.942s

OK


#### Encoder Concatenation (CC)

In [13]:
class TestEncoderConcatenation(unittest.TestCase):

    def setUp(self):
        self.input_shape = (128, 128, 64, 1)
        self.sample = tf.random.normal((1, *self.input_shape))

    def test_flip_aug(self):
        layer = CC.FlipAugmentation()
        out = layer(self.sample)
        self.assertEqual(out.shape, self.sample.shape)

    def test_rotate_aug(self):
        layer = CC.RotateAugmentation()
        out = layer(self.sample)
        self.assertEqual(out.shape, self.sample.shape)

    def test_crop_aug(self):
        crop_size = (96, 96, 48, 1)  # reduced depth on purpose
        layer = CC.CropAugmentation(
            crop_size=crop_size,
            original_size=self.input_shape[:3] + (1,)
        )
        out = layer(self.sample)

        # Check that height and width are restored, and depth is <= original
        self.assertEqual(out.shape[1], 128)  # height
        self.assertEqual(out.shape[2], 128)  # width
        self.assertLessEqual(out.shape[3], 64)  # depth should be ≤ original
        self.assertEqual(out.shape[4], 1)  # channel


    def test_intensity_aug(self):
        layer = CC.IntensityAugmentation()
        out = layer(self.sample)
        self.assertEqual(out.shape, self.sample.shape)

    def test_model_build(self):
        model = CC.build_multistream_model(self.input_shape)
        self.assertIsInstance(model, tf.keras.Model)
        self.assertEqual(len(model.inputs), 4)
        self.assertEqual(model.output_shape, (None, 128, 128, 64, 1))
        
if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run(unittest.TestLoader().loadTestsFromTestCase(TestEncoderConcatenation))


test_crop_aug (__main__.TestEncoderConcatenation.test_crop_aug) ... ok
test_flip_aug (__main__.TestEncoderConcatenation.test_flip_aug) ... ok
test_intensity_aug (__main__.TestEncoderConcatenation.test_intensity_aug) ... ok
test_model_build (__main__.TestEncoderConcatenation.test_model_build) ... ok
test_rotate_aug (__main__.TestEncoderConcatenation.test_rotate_aug) ... ok

----------------------------------------------------------------------
Ran 5 tests in 0.371s

OK


In [12]:
#### CC's integration test

In [14]:
class TestCCPipelineIntegration(unittest.TestCase):

    def setUp(self):
        self.input_shape = (128, 128, 64, 1)
        self.target_shape = self.input_shape[:3]

        # Simulate input streams
        x = tf.random.normal((1, *self.input_shape))
        y = tf.random.normal((1, *self.target_shape))

        self.train_ds = tf.data.Dataset.from_tensor_slices(
            ((x, x, x, x), y)
        ).batch(1)

    def test_model_training_step(self):
        model = CC.build_multistream_model(self.input_shape)
        model.compile(
            optimizer=tf.keras.optimizers.Adam(1e-4),
            loss=pr.combined_loss,
            metrics=['mae']
        )

        history = model.fit(self.train_ds, epochs=1, verbose=0)
        self.assertIn("loss", history.history)
        self.assertGreater(len(history.history["loss"]), 0)
if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run(unittest.TestLoader().loadTestsFromTestCase(TestCCPipelineIntegration))


ok

----------------------------------------------------------------------
Ran 1 test in 163.190s

OK


#### Builder Block (BD)

In [None]:
#### unittest

In [16]:
class TestBuilderBlockComponents(unittest.TestCase):

    def setUp(self):
        self.input_shape = (128, 128, 64, 1)
        self.sample_input = tf.random.normal((1, *self.input_shape))

    def test_flip_augmentation(self):
        layer = BD.FlipAugmentation()
        output = layer(self.sample_input)
        self.assertEqual(output.shape, self.sample_input.shape)

    def test_rotate_augmentation(self):
        layer = BD.RotateAugmentation()
        output = layer(self.sample_input)
        self.assertEqual(output.shape, self.sample_input.shape)

    def test_crop_augmentation(self):
        crop_size = (64, 64, 64, 1)
        original_size = self.input_shape
        layer = BD.CropAugmentation(crop_size, original_size)
        output = layer(self.sample_input)
        self.assertEqual(output.shape[1:], (128, 128, 64, 1))  # h, w are resized

    def test_intensity_augmentation(self):
        layer = BD.IntensityAugmentation()
        output = layer(self.sample_input)
        self.assertEqual(output.shape, self.sample_input.shape)

    def test_model_building(self):
        model = BD.build_multistream_model(self.input_shape)
        self.assertIsInstance(model, tf.keras.Model)
        self.assertEqual(model.output_shape, (None, 128, 128, 64, 1))
        
if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run(unittest.TestLoader().loadTestsFromTestCase(TestBuilderBlockComponents))


test_crop_augmentation (__main__.TestBuilderBlockComponents.test_crop_augmentation) ... ok
test_flip_augmentation (__main__.TestBuilderBlockComponents.test_flip_augmentation) ... ok
test_intensity_augmentation (__main__.TestBuilderBlockComponents.test_intensity_augmentation) ... ok
test_model_building (__main__.TestBuilderBlockComponents.test_model_building) ... 




ok
test_rotate_augmentation (__main__.TestBuilderBlockComponents.test_rotate_augmentation) ... ok

----------------------------------------------------------------------
Ran 5 tests in 9.400s

OK


In [17]:
#### integration test

In [18]:
class TestBuilderBlockIntegration(unittest.TestCase):

    def setUp(self):
        self.input_shape = (128, 128, 64, 1)
        self.target_shape = self.input_shape[:3]
        self.sample_input = tf.random.normal((1, *self.input_shape))
        self.sample_target = tf.random.normal((1, *self.target_shape))

        self.train_ds = tf.data.Dataset.from_tensor_slices((self.sample_input, self.sample_target))
        self.train_ds = self.train_ds.batch(1)

    def test_training_step(self):
        model = BD.build_multistream_model(self.input_shape)
        model.compile(
            optimizer=tf.keras.optimizers.Adam(1e-4),
            loss=BD.pr.combined_loss,
            metrics=['mae']
        )

        history = model.fit(self.train_ds, epochs=1, verbose=0)
        self.assertIn("loss", history.history)
        self.assertGreater(len(history.history["loss"]), 0)
if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run(unittest.TestLoader().loadTestsFromTestCase(TestBuilderBlockIntegration))


ok

----------------------------------------------------------------------
Ran 1 test in 288.078s

OK
