In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
from utils import *
from img_utils import *

# Loading Data 

In [None]:
path = "../nybolig-scrape/output"
houses_df = data_to_DF(path, max_houses=1000)
print("Number of datapoints of type 'Ejerlejlighed': ", len(houses_df))
display(houses_df)

# Setting up splits

In [None]:
from sklearn.model_selection import train_test_split

# Remove outliers
# houses_df = remove_outliers(houses_df, "price", z_score_threshold=2)

# Split the data into train, validation and test sets with a 60-20-20 ratio
train_df, test_df = train_test_split(houses_df, test_size=0.2, random_state=0)
train_df, valid_df = train_test_split(train_df, test_size=0.20, random_state=0)

In [None]:
target_width = 500
target_height = 500

resize: bool = True
gray_scale: bool = False
threshhold: bool = True

train_images_RGB = preprocess_images(train_df, "image_floorplan", target_width, target_height, resize = resize, gray_scale = gray_scale, threshhold = threshhold)
validation_images_RGB = preprocess_images(valid_df, "image_floorplan", target_width, target_height, resize = resize, gray_scale = gray_scale, threshhold = threshhold)
test_images_RGB = preprocess_images(test_df, "image_floorplan", target_width, target_height, resize = resize, gray_scale = gray_scale, threshhold = threshhold)

train_prices = train_df['price']
validation_prices = valid_df['price']
test_prices = test_df['price']

#Plot the first 5 images and their prices 
fig, ax = plt.subplots(1, 5, figsize=(20, 20))
for i in range(5):
    ax[i].imshow(train_images_RGB[i])
    ax[i].set_title(f"Price: {train_prices.iloc[i]}")
    ax[i].axis('off')
plt.show()


# Model Fitting

In [None]:
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
import tensorflow as tf
from multiprocessing import Process, Queue

def train_model(fine_tune_layers: int = 0):
    """
    Train a model with the VGG16 architecture and save the model to disk.
    
    Args:
        fine_tune_layers (int): Number of layers to fine-tune, counting from the top of the model.
    """
    set_gpu()
    
    # Load pre-trained VGG16 model (excluding top layers)
    input_shape = train_images_RGB[0].shape 
    base_model = VGG16(weights='imagenet', include_top=False, input_shape=input_shape)

    # Freeze the pre-trained layers
    for layer in base_model.layers[:-fine_tune_layers] if fine_tune_layers > 0 else base_model.layers:
        layer.trainable = False

    # Add new top layers for regression
    model = Sequential([
        base_model,
        Flatten(),
        Dense(1072, activation='relu'),
        Dropout(0.2),
        Dense(512, activation='relu'),
        Dropout(0.2),
        Dense(1, activation="linear")
    ])

    # Compile the model
    model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_absolute_error')
    model.summary()

    # Train the model with early stopping
    early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
    history = model.fit(train_images_RGB, train_df["price"], validation_data=(validation_images_RGB, valid_df["price"]), epochs=100, callbacks=[early_stopping])

    model.save("./VGG16_regression_model")

    # Plot the training history
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Training History')
    plt.xlabel('Epoch')
    plt.ylabel('Mean Absolute Error')
    plt.legend()
    plt.show()

p = Process(target=train_model, args=(0,))
p.start()
p.join()

# Model Evaluation

In [None]:
def evaluate_model(queue: Queue):
    set_gpu()
    
    # Evaluate the model
    loaded_model = tf.keras.models.load_model("./VGG16_regression_model")
    loaded_model.evaluate(test_images_RGB, test_df["price"])
    predictions = loaded_model.predict(test_images_RGB)
    
    queue.put(predictions)

In [None]:
queue = Queue()
p = Process(target=evaluate_model, args=(queue,))
p.start()
p.join()

if queue.empty():
    raise ValueError("Nothing was returned from the process")
predictions = queue.get()
real_prices = test_df['price'].values
predicted_prices = predictions.flatten()

# Print the R2 score, MAE and MSE
print(f"R2 score: {r2_score(real_prices, predicted_prices):.2f}")
print(f"Mean Absolute Error: {mae(real_prices, predicted_prices):.2f}")
print(f"Mean Squared Error: {mse(real_prices, predicted_prices):.2f}")

# Plot the predictions
plt.figure(figsize=(10, 10))
for i, (image, label, prediction) in enumerate(zip(test_images_RGB[0:9], test_df["price"][0:9], predictions[0:9])):
    plt.subplot(3, 3, i + 1)
    plt.imshow(image)
    plt.title(f"Real: {label}\nPredicted: {prediction[0]:.0f}")
    plt.axis("off")
plt.show()

# Plot the predictions vs real prices
plot_regression_results('VGG16', real_prices, predicted_prices)

# Visualizing Predictions

## Saliency

In [None]:
import numpy as np

def get_saliency_maps(images: np.ndarray):
    set_gpu()
    model = tf.keras.models.load_model("./VGG16_regression_model")
    saliency_maps = []
    for image in images:
        with tf.GradientTape() as tape:
            image = tf.convert_to_tensor(image, dtype=tf.float32)
            image = tf.expand_dims(image, axis=0)
            tape.watch(image)
            predictions = model(image)

        # Compute gradients of the output with respect to the input image
        gradient = tape.gradient(predictions, image)
        
        # Take absolute value of gradients to get saliency map
        saliency_map = tf.abs(gradient)
        
        # Reshape saliency map
        saliency_map = tf.reshape(saliency_map, image.shape[1:])  # Remove batch dimension
        
        # Normalize between 0 and 1
        saliency_map /= tf.reduce_max(saliency_map)

        # Set color channels to 0
        saliency_map = saliency_map[:, :, 0]

        saliency_maps.append(saliency_map)

    return saliency_maps

def plot_saliency_maps(images):
    fig, axes = plt.subplots(n_images, 2, figsize=(10, 5 * n_images))
    saliency_maps = get_saliency_maps(images)

    if len(images) > 1:
        for i, image in enumerate(images):
            # Plot the original image
            axes[i, 0].imshow(image)
            axes[i, 0].set_title("Original Image")
            axes[i, 0].axis("off")
            
            # Plot the saliency map
            axes[i, 1].imshow(saliency_maps[i], cmap="plasma")
            axes[i, 1].set_title("Saliency Map")
            axes[i, 1].axis("off")
    else:
        # Plot the original image
        axes[0].imshow(images[0])
        axes[0].set_title("Original Image")
        axes[0].axis("off")
        
        # Plot the saliency map
        axes[1].imshow(saliency_maps[0], cmap="plasma")
        axes[1].set_title("Saliency Map")
        axes[1].axis("off")

    plt.show()

n_images = 4

p = Process(target=plot_saliency_maps, args=(test_images_RGB[:n_images],))
p.start()
p.join()

## Class Activation Maps (doesn't work, WIP)

In [None]:
# from tensorflow.keras import backend as K

# def get_class_activation_maps(images: np.ndarray):
#     set_gpu()
#     model = tf.keras.models.load_model("./VGG16_regression_model")
#     classifier_layer = model.get_layer("dense_2")
#     base_model = model.get_layer("vgg16")
#     last_conv_layer = base_model.get_layer("block5_conv3")

#     class_activation_maps = []
#     for image in images:
#         # Convert the image to a tensor of type float32
#         image = tf.convert_to_tensor(image, dtype=tf.float32)
#         image = tf.expand_dims(image, axis=0)

#         # Get the model's prediction
#         predictions = model(image)

#         # Get the class with the highest probability
#         predicted_class = tf.argmax(predictions[0]).numpy()

#         # Get the output of the classifier layer for the predicted class
#         class_out = model.output[:, predicted_class]

#         # Get the output of the last convolutional layer
#         last_conv_output = last_conv_layer.output

#         with tf.GradientTape() as tape:
#             # Compute the gradient of the class output value with respect to the feature map
#             grads = tape.gradient(class_out, last_conv_output)

#         # Vector of shape (512,), where each entry is the mean intensity of the gradient over a specific feature map channel
#         pooled_grads = K.mean(grads, axis=(0, 1, 2))

#         # Multiply each channel in the feature map array by "how important this channel is" with regard to the class
#         last_conv_output_value = last_conv_output[0].numpy()
#         pooled_grads_value = pooled_grads.numpy()
#         for i in range(512):
#             last_conv_output_value[:, :, i] *= pooled_grads_value[i]

#         # The channel-wise mean of the resulting feature map is our class activation map
#         class_activation_map = np.mean(last_conv_output_value, axis=-1)

#         # Normalize between 0 and 1
#         class_activation_map -= np.min(class_activation_map)
#         class_activation_map /= np.max(class_activation_map)

#         class_activation_maps.append(class_activation_map)

#     return class_activation_maps

# def plot_class_activation_maps(images):
#     fig, axes = plt.subplots(n_images, 2, figsize=(10, 5 * n_images))
#     class_activation_maps = get_class_activation_maps(images)

#     if len(images) > 1:
#         for i, image in enumerate(images):
#             # Plot the original image
#             axes[i, 0].imshow(image)
#             axes[i, 0].set_title("Original Image")
#             axes[i, 0].axis("off")
            
#             # Plot the class activation map
#             axes[i, 1].imshow(class_activation_maps[i], cmap="plasma")
#             axes[i, 1].set_title("Class Activation Map")
#             axes[i, 1].axis("off")
#     else:
#         # Plot the original image
#         axes[0].imshow(images[0])
#         axes[0].set_title("Original Image")
#         axes[0].axis("off")
        
#         # Plot the class activation map
#         axes[1].imshow(class_activation_maps[0], cmap="plasma")
#         axes[1].set_title("Class Activation Map")
#         axes[1].axis("off")

#     plt.show()

# p = Process(target=plot_class_activation_maps, args=(test_images_RGB[:n_images],))
# p.start()
# p.join()