In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [25]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import ModelCheckpoint
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import load_model

In [19]:
train_dir = "/workspace/cnn/nn_datasets/train/"
val_dir = "/workspace/cnn/nn_datasets/val/"
test_dir = "/workspace/cnn/nn_datasets/test/"

image_size = (720, 960)
batch_size = 2

datagen = ImageDataGenerator(
    rescale=1.0/255
)

train_generator = datagen.flow_from_directory(
    train_dir,
    target_size = image_size,
    batch_size = batch_size,
    class_mode = 'input',
    color_mode = 'rgb',
    shuffle = True
)

validation_generator = datagen.flow_from_directory(
    val_dir,
    target_size = image_size,
    batch_size = batch_size,
    class_mode = 'input',
    color_mode = 'rgb',
    shuffle = True
)

test_generator = datagen.flow_from_directory(
    test_dir,
    target_size = image_size,
    batch_size = batch_size,
    class_mode = 'input',
    color_mode = 'rgb',
    shuffle = False
)

Found 480 images belonging to 1 classes.
Found 59 images belonging to 1 classes.
Found 67 images belonging to 1 classes.


In [4]:
class PredictionCallback(tf.keras.callbacks.Callback):
    def __init__(self, validation_generator, num_examples=2):
        super().__init__()
        self.validation_generator = validation_generator
        self.num_examples = num_examples

    def on_epoch_end(self, epoch, logs=None):
        x_val, _ = next(self.validation_generator)

        predictions = self.model.predict(x_val, verbose=0)

        for i in range(self.num_examples):
            plt.figure(figsize=(12, 4))

            plt.subplot(1, 3, 1)
            plt.imshow(x_val[i])
            plt.title("Input Image")
            plt.axis('off')

            plt.subplot(1, 3, 2)
            plt.imshow(x_val[i])
            plt.title("Ground Truth")
            plt.axis('off')

            plt.subplot(1, 3, 3)
            plt.imshow(predictions[i])
            plt.title("Reconstructed Image")
            plt.axis('off')

            plt.show()

In [5]:
class PeriodicCheckpoint(tf.keras.callbacks.Callback):
    def __init__(self, filepath, save_freq):
        super().__init__()
        self.filepath = filepath
        self.save_freq = save_freq

    def on_epoch_end(self, epoch, logs=None):
        if (epoch + 1) % self.save_freq == 0:
            self.model.save(self.filepath.format(epoch=epoch + 1))
            print(f"Checkpoint saved at epoch {epoch + 1}")

In [None]:
input_shape = (image_size[0], image_size[1], 3)
input_img = layers.Input(shape=input_shape)

x = layers.Conv2D(512, (3, 3), activation = 'relu', padding = 'same')(input_img)
x = layers.MaxPooling2D((2, 2), padding = 'same')(x)
x = layers.Conv2D(256, (3, 3), activation = 'relu', padding = 'same')(x)
x = layers.MaxPooling2D((2, 2), padding = 'same')(x)
x = layers.Conv2D(128, (3, 3), activation = 'relu', padding = 'same')(x)
x = layers.MaxPooling2D((2, 2), padding = 'same')(x)
x = layers.Conv2D(64, (3, 3), activation = 'relu', padding = 'same')(x)
encoded = layers.Conv2D(32, (3, 3), activation = 'relu', padding = 'same')(x)

x = layers.UpSampling2D((2, 2))(encoded)
x = layers.Conv2D(64, (3, 3), activation = 'relu', padding = 'same')(x)
x = layers.UpSampling2D((2, 2))(x)
x = layers.Conv2D(128, (3, 3), activation = 'relu', padding = 'same')(x)
x = layers.UpSampling2D((2, 2))(x)
x = layers.Conv2D(256, (3, 3), activation = 'relu', padding = 'same')(x)
decoded = layers.Conv2D(3, (3, 3), activation = 'sigmoid', padding = 'same')(x)

# autoencoder = models.Model(input_img, decoded)
autoencoder = load_model('drive/MyDrive/Colab Notebooks/Colab Notebooks/checkpoints2/autoencoder_epoch_09.h5')
autoencoder.compile(optimizer = 'AdamW', loss = 'mean_squared_error')
checkpoint_callback = PeriodicCheckpoint(
    filepath='drive/MyDrive/Colab Notebooks/Colab Notebooks/checkpoints2/autoencoder_epoch_{epoch:02d}.h5',
    save_freq=1
)
autoencoder.summary()

In [None]:
prediction_callback = PredictionCallback(test_generator, num_examples=2)


In [None]:
history = autoencoder.fit(
    train_generator,
    epochs = 15,
    callbacks = [checkpoint_callback, prediction_callback],
    validation_data = validation_generator,
    verbose = 1,
)

plt.plot(history.history['loss'], label = 'train')
plt.plot(history.history['val_loss'], label = 'val')
plt.title('Loss durante l\'addestramento')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend()
plt.show()

In [None]:
import matplotlib.pyplot as plt

# Dati estratti dal file
epochs = list(range(1, 31))  # Epoch da 1 a 30
loss = [
    0.0449, 0.0074, 0.0046, 0.0036, 0.0032, 0.0049, 0.0228, 0.0072, 0.0112, 0.0083, 
    0.0096, 0.0057, 0.0074, 0.0038, 0.0031, 0.0030, 0.0025, 0.0024, 0.0022, 0.0025,
    0.0022, 0.0019, 0.0020, 0.0018, 0.0018, 0.0053, 0.0063, 0.0081, 0.0039, 0.0031
]
val_loss = [
    0.0075, 0.0056, 0.0037, 0.0032, 0.0027, 0.0366, 0.0069, 0.0088, 0.0047, 0.0054,
    0.0061, 0.0054, 0.0037, 0.0030, 0.0034, 0.0026, 0.0022, 0.0021, 0.0021, 0.0020,
    0.0020, 0.0017, 0.0018, 0.0019, 0.0074, 0.0067, 0.0115, 0.0041, 0.0031, 0.0028
]

# Creazione del plot
plt.figure(figsize=(10, 6))
plt.plot(epochs, loss, label='Training Loss', marker='o', color='blue')
plt.plot(epochs, val_loss, label='Validation Loss', marker='s', color='orange')

# Configurazioni del grafico
plt.title('Andamento della Loss durante l\'Addestramento', fontsize=14)
plt.xlabel('Epoch', fontsize=12)
plt.ylabel('Loss', fontsize=12)
plt.xticks(epochs)
plt.legend(fontsize=12)

# Visualizzazione
plt.tight_layout()
plt.show()

In [None]:
# Compressione immagini di test per ogni epoch

models_dir = '/workspace/cnn/checkpoints'
output_dir = '/workspace/results/cnn/'
os.makedirs(output_dir, exist_ok=True)

def process_images(model, output_dir):
    os.makedirs(output_dir, exist_ok=True)
    
    c = 0
    while c < 67:
        batch = next(test_generator)
        decoded_images = model.predict(batch[0])
        n = len(decoded_images)
        
        for i in range(n):
            plt.imshow(decoded_images[i])
            plt.axis("off")
            img_name = f'decoded_{c}.png'
            output_path = os.path.join(output_dir, img_name)
            plt.savefig(output_path, transparent=True, bbox_inches='tight', pad_inches=0)
            plt.close()
            
            decoded_image = Image.open(output_path)
            if decoded_image.size != (960, 720):
                print("Resizing image...")
                decoded_image = decoded_image.resize((960, 720))
                decoded_image.save(output_path)
            
            c += 1

    
for epoch in range(1, 31):
    model_path = os.path.join(models_dir, f'autoencoder_epoch_{epoch:02d}.h5')
    model_output_dir = os.path.join(output_dir, f'epoch_{epoch:02d}')
    if os.path.exists(model_path):
        model = load_model(model_path)
        print(f"Model for epoch {epoch} loaded.")
        
        process_images(model, model_output_dir)
    else:
        print(f'Model for epoch {epoch} not found.')

In [23]:
import subprocess, re

def calculate_metrics(decompressed_path, original_path):  
    # Esegui il comando per calcolare SSIM usando ffmpeg
    ssim_result = subprocess.run(
        ["../../bin/ffmpeg", "-i", decompressed_path, "-i", original_path, "-lavfi", "ssim=stats_file=ssim_logfile.txt", "-f", "null", "-"],
        capture_output = True, text = True
    )
    
    # Esegui il comando per calcolare PSNR usando ffmpeg
    psnr_result = subprocess.run(
        ["../../bin/ffmpeg", "-i", decompressed_path, "-i", original_path, "-lavfi", "psnr=stats_file=psnr_logfile.txt", "-f", "null", "-"],
        capture_output = True, text = True
    )
    
    # Estrai il valore SSIM dal log
    ssim_match = re.search(r"All:([\d.]+)", ssim_result.stderr)
    ssim = float(ssim_match.group(1)) if ssim_match else None

    # Estrai il valore PSNR medio dal log
    psnr_match = re.search(r"average:([\d.]+)", psnr_result.stderr)
    #print(psnr_result.stderr)
    psnr = float(psnr_match.group(1)) if psnr_match else None
    
    return ssim, psnr

In [None]:
# Testing e calcolo metriche PSNR e SSIM

test_images_dir = "/workspace/cnn/nn_datasets/test/images"
results_dir = "/workspace/results/cnn"

ssim_scores, psnr_scores = [], []
for epoch in range(1, 31):
    epoch_dir = os.path.join(results_dir, f'epoch_{epoch:02d}')
    if os.path.exists(epoch_dir):
        ssim, psnr = calculate_metrics(os.path.join(epoch_dir, 'decoded_0.png'), os.path.join(test_images_dir, 'original_0.png'))
    
        ssim_scores.append(ssim)
        psnr_scores.append(psnr)
    else:
        print(f"Results for epoch {epoch} not found.")
        
print("SSIM scores:", ssim_scores)
print("PSNR scores:", psnr_scores)

In [None]:
import matplotlib.pyplot as plt

# Data
epochs = list(range(1, 31))
ssim_scores = [0.807223, 0.808263, 0.837779, 0.846401, 0.857456, 0.768931, 0.808735, 0.814435, 0.825814, 0.820059, 
               0.820981, 0.819906, 0.834162, 0.837852, 0.835027, 0.842921, 0.84883, 0.853307, 0.855639, 0.857455, 
               0.860958, 0.865396, 0.864806, 0.867825, 0.834639, 0.811821, 0.831888, 0.816515, 0.838149, 0.842964]
psnr_scores = [23.080706, 23.45738, 25.606309, 25.76544, 26.653386, 18.444387, 23.632753, 23.343533, 25.344902, 
               25.04543, 25.153869, 25.272871, 25.865402, 26.219113, 25.773821, 26.357748, 26.629119, 26.837197, 
               26.887446, 26.915344, 26.942715, 27.363051, 27.193375, 26.941089, 23.492196, 24.268306, 25.700318, 
               25.134393, 26.099779, 26.237189]

# Plot
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))

# SSIM Plot
ax1.plot(epochs, ssim_scores, marker='o', color='tab:blue')
ax1.set_title('SSIM Scores over Epochs')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('SSIM Score')
ax1.grid(True)

# PSNR Plot
ax2.plot(epochs, psnr_scores, marker='o', color='tab:orange')
ax2.set_title('PSNR Scores over Epochs')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('PSNR Score')
ax2.grid(True)

# Show the plots
plt.tight_layout()
plt.show()

In [None]:
import os
import shutil

def get_folder_size(folder):
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(folder):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            total_size += os.path.getsize(fp)
    return total_size

results_dir = "/workspace/results/cnn"
folders = [f for f in os.listdir(results_dir) if os.path.isdir(os.path.join(results_dir, f))]

# Ordina le cartelle in ordine alfabetico
folders.sort()

folder_sizes = {}
for folder in folders:
    folder_path = os.path.join(results_dir, folder)
    size_in_mb = get_folder_size(folder_path) / (1024 * 1024)
    folder_sizes[folder] = size_in_mb

for folder, size in folder_sizes.items():
    print(f"{size:.2f} MB")

In [None]:
test_images_dir = "/workspace/cnn/nn_datasets/test/images"

def get_folder_size(folder):
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(folder):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            total_size += os.path.getsize(fp)
    return total_size

test_images_size = get_folder_size(test_images_dir) / (1024 * 1024)  # Convert to MB
print(f"Size of test images folder: {test_images_size:.2f} MB")

# Compressione con codifica entropica

In [None]:
import time
import zstandard as zstd
import pickle
from tensorflow.keras.models import Model, load_model
import os

import warnings
warnings.filterwarnings("ignore", category=UserWarning, module="tensorflow")


models_dir = 'drive/MyDrive/Colab Notebooks/Colab Notebooks/nn_datasets/checkpoints2'
output_dir = 'drive/MyDrive/Colab Notebooks/Colab Notebooks/nn_datasets/results'
os.makedirs(output_dir, exist_ok=True)

def extract_encoder(model):
    encoder_layers = ['input_layer', 'conv2d', 'max_pooling2d', 'conv2d_1', 'max_pooling2d_1', 'conv2d_2', 'max_pooling2d_2', 'conv2d_3', 'conv2d_4']
    encoder = Model(inputs=model.input, outputs=model.get_layer(encoder_layers[-1]).output)
    return encoder

def process_images(encoder, output_dir, epoch):
    os.makedirs(output_dir, exist_ok=True)

    c = 0
    total_encoded = []

    start_save_time = time.time()
    while c < 67:
        batch = next(test_generator)
        latent_representations = encoder.predict(batch[0],  verbose=0)
        total_encoded.append(latent_representations)
        c += 1

    with open(os.path.join(output_dir, f'total_encoded_{epoch}.pkl'), 'wb') as f:
        pickle.dump(total_encoded, f)

    end_save_time = time.time()
    save_time = end_save_time - start_save_time
    uncompressed_size = os.path.getsize(os.path.join(output_dir, f'total_encoded_{epoch}.pkl'))
    print(f"Uncompressed size for epoch {epoch}: {round(uncompressed_size / 1024, 2)} KB")
    print(f"Time taken to save the uncompressed data: {save_time:.2f} seconds")

    start_compress_time = time.time()
    with open(os.path.join(output_dir, f"total_encoded_{epoch}.pkl"), "rb") as f_in, \
         open(os.path.join(output_dir, f"total_encoded_{epoch}.pkl.zst"), "wb") as f_out:
        cctx = zstd.ZstdCompressor(level = 22)
        f_out.write(cctx.compress(f_in.read()))

    end_compress_time = time.time()
    compress_time = end_compress_time - start_compress_time
    compressed_size = os.path.getsize(os.path.join(output_dir, f"total_encoded_{epoch}.pkl.zst"))
    print(f"Compressed size for epoch {epoch}: {round(compressed_size / 1024, 2)} KB")
    print(f"Time taken to compress the data with Zstandard: {compress_time:.2f} seconds")

    total_time = save_time + compress_time
    print(f"Total time for epoch {epoch} (save + compression): {total_time:.2f} seconds")

for epoch in range(1, 31):
    model_path = os.path.join(models_dir, f'autoencoder_epoch_{epoch:02d}.h5')
    encoder_path = os.path.join(models_dir, f'encoder_epoch_{epoch:02d}.h5')
    model_output_dir = os.path.join(output_dir, f'epoch_{epoch:02d}')

    if os.path.exists(model_path):
        model = load_model(model_path)
        encoder = extract_encoder(model)
        encoder.save(encoder_path)
        print(f"Encoder for epoch {epoch} saved.")
        process_images(encoder, model_output_dir, epoch)
    else:
        print(f'Model for epoch {epoch} not found.')
