# **Demo**<br/>
**Master's Degree in Data Science (A.Y. 2023/2024)**<br/>
**University of Milano - Bicocca**<br/>

Vittorio Haardt, Luca Porcelli

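You must have a shortcut to the project folder (`Digital`) in your Google Drive "My Drive" for this demo to work, because every path used below is resolved under `/content/drive/MyDrive/Digital/`.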

In [1]:
# Mount Google Drive at /content/drive; the model and data paths used later
# in this notebook all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Installing packages and loading libraries

In [2]:
pip install pydub

Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Installing collected packages: pydub
Successfully installed pydub-0.25.1


In [3]:
import os
import librosa
import numpy as np
import joblib
from PIL import Image, ImageFilter, ImageChops
from tensorflow.keras.models import load_model
from scipy.io import wavfile as wav
import IPython.display as ipd
from scipy.io import wavfile
import librosa.display
import matplotlib.pyplot as plt
from torch.autograd import Variable
from torchvision.utils import save_image
import torch
import torch.nn.functional as F
import numpy as np
import os
import time
import datetime
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import os
from PIL import Image
from tqdm import tqdm
import tensorflow as tf
from torch.utils import data
from torchvision import transforms as T
from torchvision.datasets import ImageFolder
from PIL import Image
import torch
import os
import random
from pydub import AudioSegment
import warnings
warnings.filterwarnings('ignore')

# Functions

In [4]:
class Demo(object):
    """Emotion-recognition demo for a single audio clip or a single image.

    The audio path extracts seven summary features from the clip and feeds
    them to a classifier restored with ``joblib``; the image path applies a
    Sobel-style edge filter and feeds the result to a Keras model.

    Args:
        input_path: Optional default input; stored but not read by the methods
            below (each method receives its own ``input_path``).
        audio_model_path: Path of the pickled audio classifier.
        image_model_path: Path of the Keras image classifier.
    """

    def __init__(self, input_path=None, audio_model_path='/content/drive/MyDrive/Digital/Models/Mono-Dimensional/Support Vector Machine_best_model.pkl', image_model_path="/content/drive/MyDrive/Digital/Models/Bi-Dimensional/best_model_ResNet50_lr.h5"):
        self.input_path = input_path
        self.audio_model_path = audio_model_path
        self.image_model_path = image_model_path

    @staticmethod
    def combo(input):
        """Return the 7-element feature vector of a one-dimensional signal.

        The features are, in order: RMS level, mean spectral centroid, mean
        spectral bandwidth, mean zero-crossing rate, mean spectral roll-off,
        mean STFT magnitude and total energy.

        NOTE(review): no ``sr`` is passed to the librosa feature calls, so they
        use librosa's default rate although the clip is loaded at 48 kHz. This
        is left untouched because the saved classifier presumably was trained
        on features computed exactly this way.
        """
        rms = np.sqrt(np.mean(input**2))  # Root Mean Square (RMS) level
        spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=input, n_fft=100))  # Spectral centroid
        bandwidth = np.mean(librosa.feature.spectral_bandwidth(y=input, n_fft=100))  # Bandwidth
        zero_crossing_rate = np.mean(librosa.feature.zero_crossing_rate(input)[0])  # Zero-crossing rate
        spectral_rolloff = np.mean(librosa.feature.spectral_rolloff(y=input, n_fft=100))  # Spectral roll-off
        spectrum_magnitude = np.mean(np.abs(librosa.core.magphase(librosa.stft(input))[0]))  # Spectrum magnitude
        energy = np.sum((input*1.0)**2, keepdims=True)[0]  # Energy
        return np.array([rms, spectral_centroid, bandwidth, zero_crossing_rate, spectral_rolloff, spectrum_magnitude, energy])

    @staticmethod
    def is_wav_file(file_path):
        """Return True when librosa can decode ``file_path``.

        Any decoding error is printed and reported as ``False`` instead of
        being raised.
        """
        try:
            librosa.load(file_path, sr=None)
            return True
        except Exception as e:
            print(f"Error while checking the file {file_path}: {e}")
            return False

    @staticmethod
    def load_single_audio(file_path, feature_extractor=None, length=int(129235.83204633204), normalize=False):
        """Load one ``.wav`` file and turn it into a feature vector.

        The signal is loaded at 48 kHz and zero-padded or truncated to exactly
        ``length`` samples before ``feature_extractor`` is applied.

        Args:
            file_path: Path of the audio file (the extension check is
                case-insensitive).
            feature_extractor: Callable mapping the signal to features; when
                ``None`` no features are computed.
            length: Number of samples the signal is forced to.
            normalize: When True, standardise the feature vector with its own
                mean and standard deviation.

        Returns:
            The features (a list of values when ``normalize`` is True), or an
            empty list when the file is not a readable ``.wav`` or no
            extractor was given.
        """
        features = []
        if file_path.lower().endswith('.wav') and Demo.is_wav_file(file_path):
            signal, _ = librosa.load(file_path, sr=48000)
            if len(signal) < length:
                signal = np.pad(signal, (0, length - len(signal)))
            elif len(signal) > length:
                signal = signal[:length]
            if feature_extractor is not None:
                features = feature_extractor(signal)
        # Skip normalisation when nothing was extracted: the mean/std of an
        # empty array is NaN and only produces warnings.
        if normalize and len(features) > 0:
            eps = 0.001
            values = np.array(features)
            # Statistics are taken over the feature vector itself (axis 0 of a
            # single sample), exactly as the original implementation did.
            values = (values - values.mean(axis=0) + eps) / (values.std(axis=0) + eps)
            return [row for row in values]
        return features

    def audio_emotion(self, input_path, audio_model_path=None, normalize=True):
        """Classify the emotion of an audio clip and display an audio player.

        ``.mp3`` inputs are first converted to a ``.wav`` file written to the
        project data folder on Drive.

        Args:
            input_path: Path of a ``.wav`` or ``.mp3`` file.
            audio_model_path: Optional classifier path; defaults to
                ``self.audio_model_path``.
            normalize: Forwarded to :meth:`load_single_audio`.

        Returns:
            The label predicted by the classifier for this clip.

        Raises:
            ValueError: If no features could be extracted from ``input_path``.
        """
        file_path = input_path
        if file_path.lower().endswith('.mp3'):
            audio = AudioSegment.from_mp3(file_path)
            audio.export("/content/drive/MyDrive/Digital/Data/Audio.wav", format="wav")
            file_path = "/content/drive/MyDrive/Digital/Data/Audio.wav"
        X_audio = Demo.load_single_audio(file_path, feature_extractor=self.combo, normalize=normalize)
        if len(X_audio) == 0:
            # Fail early with a clear message instead of handing an empty
            # sample to the classifier.
            raise ValueError(
                f"Could not extract audio features from {input_path!r}: "
                "expected a readable .wav or .mp3 file."
            )
        if audio_model_path is None:
            audio_model_path = self.audio_model_path
        # NOTE: joblib.load unpickles the file; only load models you trust.
        best_svc_loaded = joblib.load(audio_model_path)
        predictions = best_svc_loaded.predict([X_audio])[0]
        print('The following classification is based on the first 2 sec.')
        # Playback uses the original input file, not the converted copy.
        sound_data, sound_rate = librosa.load(input_path)
        ipd.display(ipd.Audio(sound_data, rate=sound_rate))
        return predictions

    @staticmethod
    def preprocess_image_for_model(file_path):
        """Turn an image file into the edge-map batch fed to the image model.

        The image is converted to grayscale, filtered with horizontal and
        vertical 3x3 Sobel kernels, the two responses are added, and the
        result is resized to 224x224.

        Returns:
            A numpy array of shape ``(1, 224, 224, 1)``.
        """
        # convert() returns a new, fully loaded image, so the file can be
        # closed right away.
        with Image.open(file_path) as img:
            img_gray = img.convert("L")
        kernel_x = ImageFilter.Kernel((3, 3), (-1, 0, 1, -2, 0, 2, -1, 0, 1), 1, 0)
        kernel_y = ImageFilter.Kernel((3, 3), (-1, -2, -1, 0, 0, 0, 1, 2, 1), 1, 0)

        edges_x = img_gray.filter(kernel_x)
        edges_y = img_gray.filter(kernel_y)

        final_image = ImageChops.add(edges_x, edges_y)
        final_image = final_image.resize((224, 224))

        image_array = np.array(final_image)
        # Wrapping in a list adds the batch axis; expand_dims adds the channel axis.
        image_array = np.expand_dims([image_array], axis=-1)

        return image_array

    def image_emotion(self, input_path, image_model_path=None):
        """Classify the emotion shown in an image and display the image.

        Args:
            input_path: Path of the image file.
            image_model_path: Optional Keras model path; defaults to
                ``self.image_model_path``.

        Returns:
            One of "Angry", "Happy", "Neutral", "Sad", "Surprise".
        """
        file_path = input_path

        # Image preprocessing
        image_array = Demo.preprocess_image_for_model(file_path)

        # Show the original (unfiltered) image
        with Image.open(file_path) as img:
            plt.imshow(img)
            plt.axis('off')
            plt.show()

        if image_model_path is None:
            image_model_path = self.image_model_path
        model = load_model(image_model_path, compile=False)
        class_labels = ["Angry", "Happy", "Neutral", "Sad", "Surprise"]
        predicted_class_label = class_labels[np.argmax(model.predict(image_array))]
        return predicted_class_label

In [5]:
class Logger(object):
    """Minimal TensorBoard logger backed by a TensorFlow summary writer."""

    def __init__(self, log_dir):
        """Open a summary file writer that logs into ``log_dir``."""
        self.writer = tf.summary.create_file_writer(log_dir)

    def scalar_summary(self, tag, value, step):
        """Record ``value`` under ``tag`` at ``step`` and flush the writer."""
        summary_writer = self.writer
        with summary_writer.as_default():
            tf.summary.scalar(tag, value, step=step)
            summary_writer.flush()

def get_loader(image_dir, attr_path, selected_attrs, crop_size=178, image_size=224,
              batch_size=16, mode='train', num_workers=1):
    """Create a DataLoader over the images found in ``image_dir``.

    Images are centre-cropped to ``crop_size``, resized to ``image_size``,
    converted to tensors and normalised with mean 0.5 / std 0.5 per channel.
    In ``'train'`` mode a random horizontal flip is prepended and the loader
    shuffles. ``attr_path`` and ``selected_attrs`` are accepted but unused.
    """
    is_training = mode == 'train'

    steps = [T.RandomHorizontalFlip()] if is_training else []
    steps += [
        T.CenterCrop(crop_size),
        T.Resize(image_size),
        T.ToTensor(),
        T.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ]
    pipeline = T.Compose(steps)

    return data.DataLoader(dataset=ImageFolder(image_dir, pipeline),
                           batch_size=batch_size,
                           shuffle=is_training,
                           num_workers=num_workers)

def make_square(image_path, output_size=(256, 256)):
  """Load an image and return it as an RGB image of exactly ``output_size``.

  The source is resized straight to ``output_size`` with Lanczos resampling
  and pasted onto a white RGB canvas of the same size. Because the resize
  already matches the canvas, the paste offset is always (0, 0) and the
  aspect ratio of the source is not preserved.

  Args:
    image_path: Path of the image file to load.
    output_size: ``(width, height)`` of the returned image.

  Returns:
    A new RGB ``PIL.Image`` of size ``output_size``.
  """
  # Image.ANTIALIAS was removed in Pillow 10; Image.LANCZOS is the same filter
  # and exists in old and new releases alike.
  with Image.open(image_path) as source:
    resized = source.resize(output_size, Image.LANCZOS)
  canvas = Image.new("RGB", output_size, (255, 255, 255))
  offset = ((output_size[0] - resized.size[0]) // 2, (output_size[1] - resized.size[1]) // 2)
  canvas.paste(resized, offset)
  return canvas

def process_images_in_directory(input_dir, output_dir):
    """Mirror ``input_dir`` into ``output_dir`` with every image squared.

    The input tree is walked recursively. Each ``.jpg``/``.jpeg``/``.png``
    file (extension matched case-insensitively) is passed through
    ``make_square`` and saved under the same relative path and file name in
    ``output_dir``. Other files are ignored.

    Args:
        input_dir: Root folder containing the source images.
        output_dir: Root folder for the processed copies; created if missing.
    """
    image_extensions = (".jpg", ".jpeg", ".png")
    os.makedirs(output_dir, exist_ok=True)

    for root, _dirs, files in os.walk(input_dir):
        output_subdir = os.path.join(output_dir, os.path.relpath(root, input_dir))
        for file in files:
            # Lower-case the name so files such as "IMG_01.JPG" are not skipped.
            if not file.lower().endswith(image_extensions):
                continue
            # Only create sub-folders that actually receive an image.
            os.makedirs(output_subdir, exist_ok=True)
            square_img = make_square(os.path.join(root, file))
            square_img.save(os.path.join(output_subdir, file))

class ResidualBlock(nn.Module):
    """Two 3x3 conv + instance-norm layers wrapped in a skip connection."""

    def __init__(self, dim_in, dim_out):
        super().__init__()
        conv_args = dict(kernel_size=3, stride=1, padding=1, bias=False)
        norm_args = dict(affine=True, track_running_stats=True)
        # The attribute name and layer order define the state-dict keys.
        self.main = nn.Sequential(
            nn.Conv2d(dim_in, dim_out, **conv_args),
            nn.InstanceNorm2d(dim_out, **norm_args),
            nn.ReLU(inplace=True),
            nn.Conv2d(dim_out, dim_out, **conv_args),
            nn.InstanceNorm2d(dim_out, **norm_args),
        )

    def forward(self, x):
        """Return the input plus the output of the convolutional branch."""
        branch = self.main(x)
        return x + branch


class Generator(nn.Module):
    """Generator: stem conv, 2 down-sampling stages, residual bottleneck,
    2 up-sampling stages and a tanh output conv producing 3 channels."""

    def __init__(self, conv_dim=64, c_dim=5, repeat_num=6):
        super().__init__()

        def norm_relu(channels):
            # Instance norm followed by an in-place ReLU, used after every
            # convolution except the last one.
            return [nn.InstanceNorm2d(channels, affine=True, track_running_stats=True),
                    nn.ReLU(inplace=True)]

        # Stem: the input is the image (3 channels) plus one map per domain label.
        modules = [nn.Conv2d(3 + c_dim, conv_dim, kernel_size=7, stride=1, padding=3, bias=False)]
        modules += norm_relu(conv_dim)

        # Down-sampling: each stage halves the resolution and doubles the width.
        width = conv_dim
        for _ in range(2):
            modules.append(nn.Conv2d(width, width * 2, kernel_size=4, stride=2, padding=1, bias=False))
            modules += norm_relu(width * 2)
            width = width * 2

        # Bottleneck of residual blocks at constant width.
        for _ in range(repeat_num):
            modules.append(ResidualBlock(dim_in=width, dim_out=width))

        # Up-sampling: each stage doubles the resolution and halves the width.
        for _ in range(2):
            modules.append(nn.ConvTranspose2d(width, width // 2, kernel_size=4, stride=2, padding=1, bias=False))
            modules += norm_relu(width // 2)
            width = width // 2

        modules += [nn.Conv2d(width, 3, kernel_size=7, stride=1, padding=3, bias=False),
                    nn.Tanh()]
        self.main = nn.Sequential(*modules)

    def forward(self, x, c):
        """Translate image batch ``x`` towards the domain labels ``c``.

        ``c`` is reshaped to ``(batch, c_dim, 1, 1)``, tiled over the spatial
        size of ``x`` and concatenated to ``x`` along the channel axis.
        """
        n_batch, n_labels = c.size(0), c.size(1)
        label_maps = c.view(n_batch, n_labels, 1, 1).repeat(1, 1, x.size(2), x.size(3))
        return self.main(torch.cat([x, label_maps], dim=1))


class Discriminator(nn.Module):
    """Discriminator with a patch-wise real/fake head and a domain-class head."""

    def __init__(self, image_size=128, conv_dim=64, c_dim=5, repeat_num=6):
        super().__init__()

        # First stride-2 conv, then (repeat_num - 1) more that double the width.
        stack = [nn.Conv2d(3, conv_dim, kernel_size=4, stride=2, padding=1),
                 nn.LeakyReLU(0.01)]
        width = conv_dim
        for _ in range(repeat_num - 1):
            stack += [nn.Conv2d(width, width * 2, kernel_size=4, stride=2, padding=1),
                      nn.LeakyReLU(0.01)]
            width = width * 2

        # Kernel of the class head: image size divided by 2**repeat_num, truncated.
        cls_kernel = int(image_size / np.power(2, repeat_num))
        self.main = nn.Sequential(*stack)
        self.conv1 = nn.Conv2d(width, 1, kernel_size=3, stride=1, padding=1, bias=False)
        self.conv2 = nn.Conv2d(width, c_dim, kernel_size=cls_kernel, bias=False)

    def forward(self, x):
        """Return ``(out_src, out_cls)`` for image batch ``x``.

        ``out_src`` is the single-channel map from ``conv1``; ``out_cls`` is
        the ``conv2`` output reshaped to ``(batch, c_dim)``.
        """
        features = self.main(x)
        src_map = self.conv1(features)
        cls_scores = self.conv2(features)
        return src_map, cls_scores.view(cls_scores.size(0), cls_scores.size(1))

class Solver(object):
    """Holds the StarGAN generator/discriminator and inference helpers.

    Constructing a Solver builds both networks with their Adam optimizers,
    moves them to the selected device and, when ``use_tensorboard`` is true,
    opens a TensorBoard logger in ``log_dir``.

    Args:
        rafd_loader: Optional data loader; stored only.
        resume_iters: Checkpoint iteration; stored only (call
            ``restore_model`` to actually load weights).
        g_lr: Learning rate of the generator optimizer.
        d_lr: Learning rate of the discriminator optimizer.
    """

    def __init__(self, rafd_loader = None, resume_iters = None, g_lr = 0.0001, d_lr = 0.0001):
        """Initialize configurations."""

        # Data loader.
        self.rafd_loader = rafd_loader

        # Model configurations.
        self.c_dim = 5
        self.image_size = 224
        self.g_conv_dim = 64
        self.d_conv_dim = 64
        self.g_repeat_num = 6
        self.d_repeat_num = 6
        # Loss weights and training settings are stored but not read by the
        # methods of this class.
        self.lambda_cls = 1
        self.lambda_rec = 10
        self.lambda_gp = 10

        # Training configurations.
        self.batch_size = 16
        self.num_iters = 200000
        self.num_iters_decay = 100000
        self.g_lr = g_lr
        self.d_lr = d_lr
        self.n_critic = 5
        self.beta1 = 0.5
        self.beta2 = 0.999
        self.resume_iters = resume_iters
        self.selected_attrs = ['Angry', 'Happy', 'Neutral', 'Sad', 'Surprise']

        # Miscellaneous.
        # A real boolean: the previous string 'True' was truthy regardless of
        # its content, so the flag could never be switched off.
        self.use_tensorboard = True
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Directories.
        self.log_dir = '/content/drive/MyDrive/Digital/stargan/logs'
        self.model_save_dir = '/content/drive/MyDrive/Digital/stargan/models'
        self.result_dir = '/content/'

        # Build the model and tensorboard.
        self.build_model()
        if self.use_tensorboard:
            self.build_tensorboard()

    def build_model(self):
        """Create the generator, the discriminator and their Adam optimizers."""
        self.G = Generator(self.g_conv_dim, self.c_dim, self.g_repeat_num)
        self.D = Discriminator(self.image_size, self.d_conv_dim, self.c_dim, self.d_repeat_num)

        self.g_optimizer = torch.optim.Adam(self.G.parameters(), self.g_lr, [self.beta1, self.beta2])
        self.d_optimizer = torch.optim.Adam(self.D.parameters(), self.d_lr, [self.beta1, self.beta2])

        self.G.to(self.device)
        self.D.to(self.device)

    def restore_model(self, resume_iters):
        """Load ``{resume_iters}-G.ckpt`` and ``{resume_iters}-D.ckpt``.

        Both files are read from ``self.model_save_dir``; tensors are first
        loaded onto the CPU and then copied into the existing modules.
        """
        G_path = os.path.join(self.model_save_dir, '{}-G.ckpt'.format(resume_iters))
        D_path = os.path.join(self.model_save_dir, '{}-D.ckpt'.format(resume_iters))
        self.G.load_state_dict(torch.load(G_path, map_location=lambda storage, loc: storage))
        self.D.load_state_dict(torch.load(D_path, map_location=lambda storage, loc: storage))

    def build_tensorboard(self):
        """Build a tensorboard logger writing to ``self.log_dir``."""
        self.logger = Logger(self.log_dir)

    def reset_grad(self):
        """Reset the gradient buffers of both optimizers."""
        self.g_optimizer.zero_grad()
        self.d_optimizer.zero_grad()

    def denorm(self, x):
        """Convert the range from [-1, 1] to [0, 1], clamping values outside it."""
        out = (x + 1) / 2
        return out.clamp_(0, 1)

    def label2onehot(self, labels, dim):
        """Convert a 1-D tensor of label indices to one-hot rows of width ``dim``."""
        batch_size = labels.size(0)
        out = torch.zeros(batch_size, dim)
        out[torch.arange(batch_size), labels.long()] = 1
        return out

    def create_labels(self, c_org, c_dim=5, selected_attrs=None):
        """Generate one target-label batch per domain.

        Only the batch size of ``c_org`` is used; ``selected_attrs`` is
        accepted but unused.

        Returns:
            A list of ``c_dim`` tensors on ``self.device``; the i-th tensor has
            shape ``(batch, c_dim)`` with every row one-hot at index ``i``.
        """
        batch_size = c_org.size(0)
        c_trg_list = []
        for i in range(c_dim):
            c_trg = self.label2onehot(torch.ones(batch_size) * i, c_dim)
            c_trg_list.append(c_trg.to(self.device))
        return c_trg_list



def starGan(input_directory = None, output_directory = None, Name = None, resume_iters = 200000):
    """Translate every image in a folder to each emotion domain and show it.

    The images under ``input_directory`` are first squared into
    ``output_directory``; the generator checkpoint for ``resume_iters`` is
    then restored and applied once per target domain. For every input a strip
    is saved as ``{Name}-{index}.jpg`` in the solver's result directory and
    displayed: the input image followed by one translation per domain,
    concatenated along the width.

    Args:
        input_directory: Folder with the source images.
        output_directory: Folder receiving the squared copies fed to the model.
        Name: Prefix of the saved result files.
        resume_iters: Iteration number of the checkpoint to restore.

    Raises:
        ValueError: If ``input_directory``, ``output_directory`` or ``Name``
            is missing.
    """
    if input_directory is None or output_directory is None or Name is None:
        raise ValueError("input_directory, output_directory and Name must all be provided.")

    process_images_in_directory(input_directory, output_directory)

    solver = Solver(resume_iters = resume_iters)
    data_loader = get_loader(output_directory, None, None, 224, 224, 1, 'test', 1)

    solver.restore_model(resume_iters)

    with torch.no_grad():
        for i, (x_real, c_org) in enumerate(data_loader):
            # Prepare input images and target domain labels.
            x_real = x_real.to(solver.device)
            c_trg_list = solver.create_labels(c_org, solver.c_dim, solver.selected_attrs)

            # Translate images: the original first, then one fake per domain.
            x_fake_list = [x_real]
            for c_trg in c_trg_list:
                x_fake_list.append(solver.G(x_real, c_trg))

            # Save the translated images side by side (dim 3 is the width axis).
            x_concat = torch.cat(x_fake_list, dim=3)
            result_path = os.path.join(solver.result_dir, '{}-{}.jpg'.format(Name, i+1))
            save_image(solver.denorm(x_concat.data.cpu()), result_path, nrow=1, padding=0)

            # Show the saved strip; the file is closed once it has been drawn.
            with Image.open(result_path) as img:
                plt.figure(figsize=(15, 10))
                plt.imshow(img)
                plt.axis('off')
                plt.show()

# Demo showcase

In [6]:
# Create the demo helper with its default audio and image model paths.
demo = Demo()

In [None]:
# Classify the emotion of the sample clip; the .mp3 is converted to .wav first.
demo.audio_emotion(input_path = "/content/drive/MyDrive/Digital/Data/Audio_Demo.mp3")

In [None]:
# Show the sample image and return the emotion label predicted for it.
demo.image_emotion(input_path="/content/drive/MyDrive/Digital/Data/Imm_Demo.jpg")#Imm_Demo3.png

In [None]:
# Square the images in the Drive folder into /content/Luca, then save and show
# one strip per image (input plus one translation per emotion) as imm-<n>.jpg.
starGan(input_directory = '/content/drive/MyDrive/Digital/Data/Luca',
        output_directory = '/content/Luca',
        Name = 'imm')