In [151]:
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [152]:
%cd gdrive

[Errno 2] No such file or directory: 'gdrive'
/content/gdrive


In [153]:
import sys
colab_file_dir= "MyDrive/Colab Notebooks/ML_FDS/" # where clip_helpers.py is for me
sys.path.append(colab_file_dir)

In [154]:
!jupyter nbconvert --to python "/content/gdrive/My Drive/Colab Notebooks/ML_FDS/CLIP_training_v2.ipynb"

[NbConvertApp] Converting notebook /content/gdrive/My Drive/Colab Notebooks/ML_FDS/CLIP_training_v2.ipynb to python
[NbConvertApp] Writing 24168 bytes to /content/gdrive/My Drive/Colab Notebooks/ML_FDS/CLIP_training_v2.py


In [155]:
import sys
sys.path.append("/content/gdrive/My Drive/Colab Notebooks/ML_FDS/")

from CLIP_training_v2 import MaskedMean, ClipLossLayer, L2Normalize, SmallBERT

In [156]:
from sklearn.decomposition import PCA


In [157]:
import os
import pandas as pd
import re
import numpy as np
import random
import zipfile
import requests
import io
import math
from pathlib import Path
import datetime
import csv

from dataclasses import dataclass

import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from PIL import Image

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import TextVectorization
from tensorflow.keras.utils import register_keras_serializable
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.metrics import Mean
from tensorflow.keras.layers import Dropout, Rescaling
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.models import load_model
from tensorflow.keras import regularizers

from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import LabelEncoder

# The GPU is not used here
#AUTOTUNE = tf.data.AUTOTUNE


In [191]:
def get_default_params(
        dataset_dir="./flickr_long_subset",
        model_dir="./models_forclip",
        figures_dir="figures") -> dict:
    """Assemble the default configuration dictionary for the CLIP pipeline.

    Args:
        dataset_dir: Root folder of the dataset. The ``train_data``,
            ``val_data`` and ``test_data`` folders and ``vocab.txt`` are
            resolved relative to it.
        model_dir: Folder stored under the ``"model_dir"`` key.
        figures_dir: Folder stored under the ``"figures_dir"`` key.

    Returns:
        A freshly built dictionary holding the derived paths together with
        the model and training hyper-parameters. The caller may modify it.
    """
    # One root folder per split; all three splits share the same layout
    # (an "images" folder, a "captions" folder and a "captions.csv" file).
    train_root, val_root, test_root = (
        os.path.join(dataset_dir, folder)
        for folder in ("train_data", "val_data", "test_data")
    )

    input_hw = (224, 224)
    labels = ['ball', 'bike', 'dog', 'water']

    return {
        # Directories for data and model weights.
        "model_dir": model_dir,
        "dataset_dir": dataset_dir,
        "figures_dir": figures_dir,
        "train_dir": train_root,
        "train_image_dir": os.path.join(train_root, "images"),
        "train_captions_dir": os.path.join(train_root, "captions"),
        "train_captions_csv_path": os.path.join(train_root, "captions.csv"),

        "save_model_with_architecture": True,

        "val_dir": val_root,
        "val_image_dir": os.path.join(val_root, "images"),
        "val_captions_dir": os.path.join(val_root, "captions"),
        "val_captions_csv_path": os.path.join(val_root, "captions.csv"),

        "test_dir": test_root,
        "test_image_dir": os.path.join(test_root, "images"),
        "test_captions_dir": os.path.join(test_root, "captions"),
        "test_captions_csv_path": os.path.join(test_root, "captions.csv"),

        # Tokens from the train dataset.
        "vocab_path": os.path.join(dataset_dir, "vocab.txt"),

        # Model training parameters.
        # Careful: keep the class names in alphabetical order for the
        # generator.
        "class_names": labels,
        # Class encoding: each name maps to its position in `labels`.
        "class_dict": {label: index for index, label in enumerate(labels)},

        # Image settings.
        "image_size": input_hw,
        "image_shape": (*input_hw, 3),

        # Text settings.
        "sequence_length": 32,
        "vocab_size": 10000,
        "num_heads": 4,
        "ff_dim": 128,
        "num_layers": 2,
        "nb_image_filters": 32,
        "pad_sequence": True,

        # Shared by the image and text branches of the CLIP model.
        "embed_dim": 128,
        "learning_rate": 2e-4,
        "data_augmentation": True,

        # Training loop settings.
        "patience": 5,
        "batch_size": 64,
        "nb_epochs": 15,
    }


In [182]:
# Set your directories here
# NOTE(review): every path below is relative, so it is resolved against the
# current working directory. They appear to assume the notebook was moved to
# the Drive mount point by the earlier `%cd gdrive` cell — confirm before running.

# Folder passed to EmbeddingAnalyzer as `data_analysis_dir` in run_full_analysis.
data_analysis_dir = "My Drive/Colab Notebooks/ML_FDS/data_analysis"
# Folder from which run_full_analysis loads "<model_name>.keras".
model_dir = "My Drive/Colab Notebooks/ML_FDS/models_forclip"
# Dataset root; get_default_params derives the split folders and vocab.txt from it.
dataset_dir = "My Drive/Colab Notebooks/ML_FDS/flickr_long_subset"
# Folder stored as param_dict["figures_dir"] and handed to EmbeddingVisualizer.
figure_dir = 'My Drive/Colab Notebooks/ML_FDS/figures/'
# Global configuration read by run_full_analysis.
param_dict = get_default_params(
    dataset_dir=dataset_dir,
    model_dir=model_dir,
    figures_dir=figure_dir
)

In [183]:
class EmbeddingExtractor:
    """Split a trained two-input model into encoders and embed a dataset.

    The model must expose its image input as ``model.inputs[0]``, its text
    input as ``model.inputs[1]``, and contain layers named
    ``"image_latent_vector"`` and ``"text_latent_vector"``; the outputs of
    those layers are used as the image and text embeddings.

    Args:
        model: Trained Keras model to split into encoders.
        model_name: Name of the model (stored for reference only).
        data_directory: Folder containing ``captions.csv``. Its
            ``image_path`` column is resolved relative to this folder and its
            ``caption`` column provides the texts.
        vocab_size: ``max_tokens`` passed to the ``TextVectorization`` layer.
        vocab_path: Vocabulary file passed to the ``TextVectorization`` layer.
        output_sequence_length: Token sequence length produced by the
            tokenizer.
        dataset_name: Label of the dataset (stored for reference only).
        image_size: ``target_size`` used when loading images. Defaults to
            ``(224, 224)``, the value previously hard-coded.
        batch_size: Number of samples sent to ``predict`` at once. Defaults
            to 32, the value previously hard-coded.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """

    def __init__(self,
                 model,
                 model_name,
                 data_directory,
                 vocab_size,
                 vocab_path,
                 output_sequence_length,
                 dataset_name="test",
                 image_size=(224, 224),
                 batch_size=32):
        if batch_size < 1:
            raise ValueError("[EmbeddingExtractor] batch_size must be at least 1.")

        self.model = model
        self.model_name = model_name
        self.directory = data_directory
        self.dataset_name = dataset_name
        self.image_size = tuple(image_size)
        self.batch_size = batch_size

        # Filled by compute_embeddings(); None means "not computed yet".
        self.image_embeddings = None
        self.text_embeddings = None

        # Image encoder: image input -> image latent vector.
        self.image_encoder = tf.keras.Model(
            inputs=model.inputs[0],
            outputs=model.get_layer("image_latent_vector").output,
            name="image_encoder"
        )

        self.tokenizer = TextVectorization(
            max_tokens=vocab_size,
            standardize='lower_and_strip_punctuation',
            split='whitespace',
            vocabulary=vocab_path,
            output_sequence_length=output_sequence_length,
            output_mode="int"  # save 0 for pad tokens
        )

        # Text encoder: token ids -> text latent vector.
        self.text_encoder = tf.keras.Model(
            inputs=model.inputs[1],
            outputs=model.get_layer("text_latent_vector").output,
            name="text_encoder"
        )

    def _load_image_batch(self, paths):
        """Load the images at `paths`, resize them and scale pixels by 1/255."""
        return np.array([
            img_to_array(load_img(path, target_size=self.image_size)) / 255.
            for path in paths
        ])

    def _encode_in_batches(self, items, prepare, encoder):
        """Run `encoder.predict` over `items` in chunks and stack the results.

        `prepare` turns a list slice of `items` into the encoder's input.
        """
        chunks = []
        for start in range(0, len(items), self.batch_size):
            batch = items[start:start + self.batch_size]
            chunks.append(encoder.predict(prepare(batch), verbose=0))
        return np.vstack(chunks)

    def compute_embeddings(self):
        """Compute image and text embeddings for every row of ``captions.csv``.

        Results are stored on the instance; use get_embeddings() to read them.

        Raises:
            ValueError: If ``captions.csv`` contains no rows.
        """
        captions_path = os.path.join(self.directory, "captions.csv")
        df = pd.read_csv(captions_path)
        if df.empty:
            # Fail with a clear message rather than np.vstack's generic
            # "need at least one array" error.
            raise ValueError(
                f"[EmbeddingExtractor] No rows found in {captions_path}; nothing to embed."
            )

        # Image embeddings.
        image_paths = [str(Path(self.directory) / p) for p in df["image_path"]]
        self.image_embeddings = self._encode_in_batches(
            image_paths, self._load_image_batch, self.image_encoder)

        # Text embeddings; missing captions are embedded as empty strings.
        captions = df["caption"].fillna("").tolist()
        self.text_embeddings = self._encode_in_batches(
            captions, self.tokenizer, self.text_encoder)

    def get_embeddings(self):
        """Return ``(image_embeddings, text_embeddings)``.

        Raises:
            Exception: If compute_embeddings() has not been run yet.
        """
        if self.image_embeddings is None or self.text_embeddings is None:
            raise Exception("[EmbeddingExtractor] Embeddings are empty. You should execute compute_embeddings before getting the results.")

        return self.image_embeddings, self.text_embeddings

In [184]:
class EmbeddingAnalyzer:
    """Compute and export statistics on paired image/text embeddings.

    Both embedding matrices are L2-normalised row by row at construction.
    Rows whose norm is zero are kept as zero vectors instead of becoming NaN.

    Args:
        image_embeddings: 2-D array with one image embedding per row.
        text_embeddings: 2-D array with one text embedding per row.
        model_name: Used to build the default CSV file names.
        dataset_name: Used to build the default CSV file names.
        data_analysis_dir: Folder where the CSV files are written; created
            on demand by the save methods.
    """

    def __init__(self,
                 image_embeddings, text_embeddings,
                 model_name="MODEL",
                 dataset_name="test",
                 data_analysis_dir="./data_analysis"):
        self.image_embeddings = self._l2_normalize(image_embeddings)
        self.text_embeddings = self._l2_normalize(text_embeddings)

        self.model_name = model_name
        self.dataset_name = dataset_name
        self.data_analysis_dir = data_analysis_dir

        # Result frames; None until the matching compute_* method has run.
        self.df_embeddings_2d = None
        self.df_stats = None
        self.df_data_cos_sim = None

    @staticmethod
    def _l2_normalize(embeddings):
        """Return `embeddings` with every row scaled to unit L2 norm."""
        embeddings = np.asarray(embeddings)
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        # Dividing by a zero norm would yield NaN rows that poison the
        # statistics, the cosine similarities and the PCA; dividing by 1
        # instead leaves an all-zero row unchanged.
        norms[norms == 0] = 1.0
        return embeddings / norms

    def _save_dataframe(self, dataframe, filename):
        """Write `dataframe` as CSV to `filename` inside `data_analysis_dir`."""
        os.makedirs(self.data_analysis_dir, exist_ok=True)
        dataframe.to_csv(os.path.join(self.data_analysis_dir, filename))

    def compute_all(self):
        """Run the statistics, cosine-similarity and PCA computations."""
        self.compute_embeddings_stats()
        self.compute_embeddings_cos_sim()
        self.compute_pca_2d()

    def compute_embeddings_stats(self):
        """Compute per-coordinate mean/min/max/std of both embedding sets.

        The resulting frame has one row per embedding coordinate.
        """
        stats = {}
        for kind, embeddings in (("image", self.image_embeddings),
                                 ("text", self.text_embeddings)):
            stats[f"mean_{kind}_vector"] = embeddings.mean(axis=0)
            stats[f"min_{kind}_vector"] = embeddings.min(axis=0)
            stats[f"max_{kind}_vector"] = embeddings.max(axis=0)
            stats[f"std_{kind}_vector"] = embeddings.std(axis=0)

        self.df_stats = pd.DataFrame(stats)

    def compute_embeddings_cos_sim(self):
        """Compute the cosine similarity of each image with its own caption.

        Row ``i`` of the image matrix is paired with row ``i`` of the text
        matrix; since both are unit-normalised the dot product is the cosine.

        Raises:
            ValueError: If the two matrices do not have the same number of
                rows (pairing by index would be meaningless).
        """
        if len(self.image_embeddings) != len(self.text_embeddings):
            raise ValueError(
                "[EmbeddingAnalyzer] Image and text embeddings must have the same "
                f"number of rows (got {len(self.image_embeddings)} and "
                f"{len(self.text_embeddings)})."
            )

        cos_sim = [
            np.dot(image_vector, text_vector)
            for image_vector, text_vector in zip(self.image_embeddings, self.text_embeddings)
        ]

        self.df_data_cos_sim = pd.DataFrame({"cos_sim": cos_sim})

    def compute_pca_2d(self):
        """Project image and text embeddings onto a shared 2-D PCA space.

        The PCA is fitted on the stacked image and text embeddings; the
        resulting frame has columns ``X``, ``Y`` and ``type``
        (``"image"`` or ``"text"``), image rows first.
        """
        all_embeddings = np.vstack([self.image_embeddings, self.text_embeddings])

        pca = PCA(n_components=2)
        all_embeddings_2d = pca.fit_transform(all_embeddings)

        nb_images = len(self.image_embeddings)
        image_embeddings_2d = all_embeddings_2d[:nb_images]
        text_embeddings_2d = all_embeddings_2d[nb_images:]

        df_image = pd.DataFrame({
            "X": image_embeddings_2d[:, 0],
            "Y": image_embeddings_2d[:, 1],
            "type": "image"
        })
        df_text = pd.DataFrame({
            "X": text_embeddings_2d[:, 0],
            "Y": text_embeddings_2d[:, 1],
            "type": "text"
        })

        self.df_embeddings_2d = pd.concat([df_image, df_text], ignore_index=True)

    def get_pca_2d(self):
        """Return the PCA frame; raises Exception if it was not computed."""
        if self.df_embeddings_2d is None:
            raise Exception("[EmbeddingAnalyzer] PCA 2d embedding dataframe is empty. You should execute compute_pca_2d before getting the results.")

        return self.df_embeddings_2d

    def get_embeddings_stats(self):
        """Return the statistics frame; raises Exception if not computed."""
        if self.df_stats is None:
            raise Exception("EmbeddingAnalyzer: Statistics dataframe is empty. You should execute compute_embeddings_stats before getting the results.")

        return self.df_stats

    def get_embeddings_cos_sim(self):
        """Return the cosine-similarity frame; raises Exception if not computed."""
        if self.df_data_cos_sim is None:
            raise Exception("EmbeddingAnalyzer: Cosinus similarity dataframe is empty. You should execute compute_embeddings_cos_sim before getting the results.")

        return self.df_data_cos_sim

    def save_pca_2d(self, filename=None):
        """Save the PCA frame as CSV.

        Args:
            filename: File name inside `data_analysis_dir`; defaults to
                ``"<model>_dataset_<dataset>_pca_2d.csv"``.

        Raises:
            Exception: If compute_pca_2d() has not been run yet.
        """
        if self.df_embeddings_2d is None:
            raise Exception("[EmbeddingAnalyzer] PCA 2d embedding dataframe is empty. You should execute compute_pca_2d before saving the results.")

        if filename is None:
            filename = f"{self.model_name}_dataset_{self.dataset_name}_pca_2d.csv"

        self._save_dataframe(self.df_embeddings_2d, filename)

    def save_embeddings_stats(self, filename=None):
        """Save the statistics frame as CSV.

        Args:
            filename: File name inside `data_analysis_dir`; defaults to
                ``"<model>_dataset_<dataset>_embeddings_stats.csv"``.

        Raises:
            Exception: If compute_embeddings_stats() has not been run yet.
        """
        if self.df_stats is None:
            raise Exception("[EmbeddingAnalyzer] Statistics dataframe is empty. You should execute compute_embeddings_stats before saving the results.")

        if filename is None:
            filename = f"{self.model_name}_dataset_{self.dataset_name}_embeddings_stats.csv"

        self._save_dataframe(self.df_stats, filename)

    def save_embeddings_cos_sim(self, filename=None):
        """Save the cosine-similarity frame as CSV.

        Args:
            filename: File name inside `data_analysis_dir`; defaults to
                ``"<model>_dataset_<dataset>_cos_sim.csv"``.

        Raises:
            Exception: If compute_embeddings_cos_sim() has not been run yet.
        """
        if self.df_data_cos_sim is None:
            raise Exception("[EmbeddingAnalyzer] Cosinus similarity dataframe is empty. You should execute compute_embeddings_cos_sim before saving the results.")

        if filename is None:
            filename = f"{self.model_name}_dataset_{self.dataset_name}_cos_sim.csv"

        self._save_dataframe(self.df_data_cos_sim, filename)

In [185]:
class EmbeddingVisualizer:
    """Draw, save and optionally display plots of embedding statistics.

    Every ``generate_*`` method takes ``save`` (write the figure as PNG into
    ``figures_dir``, which is created on demand) and ``show`` (display the
    figure; when False the figure is closed instead).

    Args:
        model_name: Used to build the figure file names.
        dataset_name: Used to build the figure file names.
        data_analysis_dir: Stored on the instance; not used for plotting.
        figures_dir: Folder where figures are written.
    """

    def __init__(self,
                 model_name="MODEL",
                 dataset_name="test",
                 data_analysis_dir="./data_analysis",
                 figures_dir="./figures"):
        self.model_name = model_name
        self.dataset_name = dataset_name
        self.data_analysis_dir = data_analysis_dir
        self.figures_dir = figures_dir

    def _prepare_figure_path(self, plot_suffix):
        """Create `figures_dir` if needed and return the PNG path for a plot."""
        os.makedirs(self.figures_dir, exist_ok=True)
        return os.path.join(
            self.figures_dir,
            f"{self.model_name}_dataset_{self.dataset_name}_{plot_suffix}.png")

    def _finalize_figure(self, plot_suffix, save, show):
        """Save the current figure if requested, then show it or close it."""
        if save:
            plt.savefig(self._prepare_figure_path(plot_suffix))

        if show:
            plt.show()
        else:
            plt.close()

    def _plot_histogram(self, values, color, xlabel, title, plot_suffix, save, show):
        """Draw a 30-bin histogram of `values` on a new figure and finalize it."""
        # Explicit new figure so the bars never land on a previous plot that
        # the backend left open.
        plt.figure()
        plt.hist(values, bins=30, color=color, alpha=0.7)

        plt.xlabel(xlabel)
        plt.ylabel("Frequency")
        plt.title(title)

        self._finalize_figure(plot_suffix, save, show)

    def _plot_min_mean_max(self, df_stats, kind, save, show):
        """Scatter the mean/min/max columns of one modality ("image" or "text")."""
        x = np.arange(0, len(df_stats))

        plt.figure(figsize=(12, 5))
        plt.scatter(x, df_stats[f"mean_{kind}_vector"], label=f'Mean {kind} vector', color='blue')
        plt.scatter(x, df_stats[f"min_{kind}_vector"], label=f'Min {kind} vector', color='green')
        plt.scatter(x, df_stats[f"max_{kind}_vector"], label=f'Max {kind} vector', color='red')

        plt.xlabel("Coordinate of latent vector")
        plt.ylabel("Value")
        plt.title(f"{kind.capitalize()} latent vector embeddings")

        plt.legend()

        self._finalize_figure(f"plot_scatter_{kind}_vector", save, show)

    def generate_embeddings_plots_with_error_bars(self, df_stats, save=True, show=True):
        """Plot the mean text and image vectors with std error bars.

        Args:
            df_stats: Frame with the ``mean_*_vector`` and ``std_*_vector``
                columns, one row per embedding coordinate.
        """
        x = np.arange(0, len(df_stats))

        plt.figure(figsize=(12, 5))
        plt.errorbar(x, df_stats["mean_text_vector"], df_stats["std_text_vector"], label='Mean text vector', fmt='o')
        plt.errorbar(x, df_stats["mean_image_vector"], df_stats["std_image_vector"], label='Mean image vector', fmt='o')

        plt.xlabel("Coordinate of latent vector")
        plt.ylabel("Value")
        plt.title("Mean text and image latent vectors with standard deviation")

        plt.legend()

        self._finalize_figure("plot_mean_embeddings_vector", save, show)

    def generate_embeddings_frequency_histograms(self, df_stats, save=True, show=True):
        """Plot histograms of the mean text and mean image vector values.

        Args:
            df_stats: Frame with ``mean_text_vector`` and
                ``mean_image_vector`` columns.
        """
        self._plot_histogram(
            df_stats["mean_text_vector"], 'blue',
            "Value of coordinate",
            "Histogram of values of coordinates of text latent vector",
            "plot_hist_mean_text_vector", save, show)

        self._plot_histogram(
            df_stats["mean_image_vector"], 'red',
            "Value of coordinate",
            "Histogram of values of coordinates of image latent vector",
            "plot_hist_mean_image_vector", save, show)

    def generate_embeddings_plots(self, df_stats, save=True, show=True):
        """Scatter min/mean/max per coordinate, first for images then texts.

        Args:
            df_stats: Frame with the ``mean_*``, ``min_*`` and ``max_*``
                vector columns, one row per embedding coordinate.
        """
        self._plot_min_mean_max(df_stats, "image", save, show)
        self._plot_min_mean_max(df_stats, "text", save, show)

    def generate_embeddings_cos_sim_plots(self, df_data_cos_sim, save=True, show=True):
        """Plot per-sample cosine similarities and their distribution.

        Args:
            df_data_cos_sim: Frame with a ``cos_sim`` column, one row per
                sample.
        """
        x = np.arange(0, len(df_data_cos_sim))

        plt.figure(figsize=(12, 5))
        plt.scatter(x, df_data_cos_sim["cos_sim"])

        plt.xlabel("Sample")
        plt.ylabel("Cos similarity value")
        plt.title("Cosinus similarity")

        self._finalize_figure("plot_cos_sim", save, show)

        self._plot_histogram(
            df_data_cos_sim["cos_sim"], 'red',
            "Cosinus similarity value",
            "Distribution of cosinus similarity value",
            "plot_hist_cos_sim", save, show)

    def generate_pca_2d_plot(self, df_embeddings_2d, save=True, show=True):
        """Scatter the 2-D PCA projection, images in red and texts in blue.

        Args:
            df_embeddings_2d: Frame with ``X``, ``Y`` and ``type`` columns,
                where ``type`` is ``"image"`` or ``"text"``.
        """
        plt.figure(figsize=(8, 6))
        for kind, color, marker in zip(['image', 'text'], ['red', 'blue'], ['o', '^']):
            subset = df_embeddings_2d[df_embeddings_2d['type'] == kind]
            plt.scatter(subset['X'], subset['Y'], c=color, label=kind, marker=marker, s=100)

        plt.xlabel('X')
        plt.ylabel('Y')
        plt.title('PCA of Text and Image Vectors')
        plt.legend()

        self._finalize_figure("plot_pca2d", save, show)

## Load model and analyze

In [190]:
# Models to analyze: short label -> file stem of the saved model.
# run_full_analysis appends ".keras" to the stem and looks it up in
# param_dict["model_dir"].
analysis_models_names = {"INITIAL": "clip_pipeline_model_big_dataset_betterCNN_data_augmentation_bs64_val_loss_patience5_epoch30_latent128_textDrop0.2_3ConvLayer_lessAugmentation_dropoutTransformer_0.2&_lerningrate2-4_temp0.14_50_epochs"}

In [187]:
def run_full_analysis(model_name, quiet=False):
    """Load a saved model, embed the test set, export statistics and figures.

    Relies on the notebook-level globals ``param_dict`` (paths and tokenizer
    settings) and ``data_analysis_dir`` (CSV output folder).

    Args:
        model_name: File stem of the model; ``"<model_name>.keras"`` is
            loaded from ``param_dict["model_dir"]``. The same name is used
            as prefix of every exported CSV and figure.
        quiet: When True, progress messages are suppressed and figures are
            saved without being displayed.
    """
    def log(message):
        if not quiet:
            print(message)

    # 0. Load
    log("Loading...")

    model_path = os.path.join(param_dict["model_dir"], f"{model_name}.keras")
    model = load_model(model_path)

    # 1. Extract
    log("Extracting...")

    extractor = EmbeddingExtractor(
        model=model,
        model_name=model_name,
        data_directory=param_dict['test_dir'],
        vocab_path=param_dict['vocab_path'],
        vocab_size=param_dict['vocab_size'],
        output_sequence_length=param_dict['sequence_length']
    )

    extractor.compute_embeddings()
    image_embeddings, text_embeddings = extractor.get_embeddings()

    # 2. Analyze & Export
    log("Analyzing...")

    analyzer = EmbeddingAnalyzer(
        image_embeddings, text_embeddings,
        model_name=model_name,
        dataset_name="test",
        data_analysis_dir=data_analysis_dir)

    analyzer.compute_all()
    analyzer.save_embeddings_stats()
    analyzer.save_embeddings_cos_sim()
    analyzer.save_pca_2d()

    # 3. Visualize
    log("Visualizing...")

    visualizer = EmbeddingVisualizer(
        # Use the real model name: with a fixed placeholder every model
        # would write to the same figure files and overwrite the previous
        # model's plots.
        model_name=model_name,
        dataset_name="test",
        data_analysis_dir=data_analysis_dir,
        figures_dir=param_dict["figures_dir"]
    )

    show = not quiet
    # NOTE: generate_embeddings_plots is called first on purpose; it creates
    # the figures directory that the following plot calls save into.
    visualizer.generate_embeddings_plots(analyzer.get_embeddings_stats(), show=show)
    visualizer.generate_embeddings_cos_sim_plots(analyzer.get_embeddings_cos_sim(), show=show)
    visualizer.generate_embeddings_frequency_histograms(analyzer.get_embeddings_stats(), show=show)
    visualizer.generate_pca_2d_plot(analyzer.get_pca_2d(), show=show)

    log("Done!")

In [188]:
def run_full_analysis_on_models(models_names, quiet=True):
    """Run `run_full_analysis` on each model name, one after the other.

    Args:
        models_names: Iterable of model file stems to analyze, in order.
        quiet: Forwarded unchanged to `run_full_analysis` for every model.
    """
    for current_name in models_names:
        run_full_analysis(current_name, quiet=quiet)

In [189]:
# Analyze every registered model. `quiet` keeps its default of True, so
# progress messages are suppressed and figures are saved without being shown.
run_full_analysis_on_models(list(analysis_models_names.values()))

