In [None]:
# Adjustable Privacy - train_obfuscator.ipynb
# - Trains an obfuscator.
# - Uses an image dataset (CelebA).
# - Saves the model after each epoch (to Google Drive and locally).
# - Can stop and resume training.
# - Draws loss, PSNR, and accuracy plots and saves them (to Google Drive).
# - Can also load saved models and draw plots (from Google Drive).
# - Notebook parameters are managed in the parser block.

In [None]:
# Imports
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

import torch
from torch import nn
from torch import optim
import torch.nn.parallel
import torch.backends.cudnn as cudnn
from torch.utils.data import random_split
from torchvision import datasets, transforms, models
import torchvision.utils as vutils
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
matplotlib.style.use('ggplot')
import itertools
import random
import shutil
from zipfile import ZipFile
import os
from tqdm import tqdm
from collections import OrderedDict
import time
from math import floor
import argparse

In [None]:
# Parser
# All notebook parameters are declared here and parsed from `command_string`
# below, so the notebook can be reconfigured by editing a single string.
parser = argparse.ArgumentParser(description='Adjustable Privacy - Train an obfuscator. '
                                 'Uses image dataset (CelebA). '
                                 'Saves models after each epoch number (to google drive and locally). '
                                 'It can stop and resume training. '
                                 'Draws loss, psnr, and accuracy plots and saves them (to google drive and locally). '
                                 'Also it can load models and draw plots (from google drive).')

# The boolean-like options are plain text: the rest of the notebook compares them
# against the strings "True"/"False". Their defaults are therefore strings too;
# a bool default would never equal 'False' and an omitted --resume would be
# misread as "resume".
parser.add_argument('--resume', type=str, default = 'False', help = 'Accepts "True" or "False". ')
parser.add_argument('--last_epoch', type=int, default = 0, help = 'In case of resuming training use last saved epoch number and in case of loading a model, set to model number.')
parser.add_argument('--target_index', type=int, required=True, help = 'gender(20), smiling(31), open mouth(21), and high cheekbone(19)')
parser.add_argument('--save_path', type=str, required=True, help = 'Full path on your google drive to save model and plots. And also load from it. Like "drive/MyDrive/adjustable-privacy/Models/CelebA-G-S-Obfuscator/"')
parser.add_argument('--epoch_numbers', type=int, default = 50, help = 'Number of epochs to train model. (when you want load a model, it should set to that model number)')
parser.add_argument('--download_dataset', type=str, default = 'False', help = 'Accepts "True" or "False". Download CelebA dataset or use CelebA shared directory.')
parser.add_argument('--dataset_path', type=str, default = "", help = '(if --download_dataset=False) Full path on your google drive to CelebA shared directory shortcut. Like "drive/MyDrive/adjustable-privacy/Datasets/CelebA/"')

# Edit this string to change the run configuration. It is split on whitespace,
# so individual values (e.g. paths) must not contain spaces.
command_string = "--resume False" \
" --last_epoch 0" \
" --target_index 20" \
" --save_path drive/MyDrive/adjustable-privacy/Models/CelebA-G-S-Obfuscator/" \
" --epoch_numbers 200" \
" --download_dataset False" \
" --dataset_path drive/MyDrive/adjustable-privacy/Datasets/CelebA/"

args = parser.parse_args(command_string.split())

In [None]:
# Hyper parameters:
# Boolean-like options arrive from argparse as text ("True"/"False"); going
# through str() also copes with a real bool value and with any letter case.
isFirstRun = str(args.resume).strip().lower() == 'false'
lastRunEpochNumber = args.last_epoch
manual_seed = 20
# Seed the global RNGs so weight initialisation and DataLoader shuffling are
# repeatable between runs (the seed was previously defined but only consumed
# by shorten_dataset).
random.seed(manual_seed)
np.random.seed(manual_seed)
torch.manual_seed(manual_seed)
image_size = 64
use_whole_dataset = True
usage_percent = 1.0
learning_rate = 0.001 #0.2
batch_size = 64
# Column indices into the CelebA attribute vector
celeba_male_index = 20
celeba_smiling_index = 31
celeba_mouth_open_index = 21
celeba_high_cheekbone_index = 19
# Attribute the encoder's classifier head is trained on
using_index = args.target_index
# Checkpoint and plot file names are appended to this string directly, so make
# sure it always ends with a path separator.
saving_path = os.path.join(args.save_path, '')
# Number of workers for dataloader
workers = 2
# Beta1 hyperparam for Adam optimizers
beta1 = 0.5
# Number of GPUs available.
ngpu = 1
# Size of feature maps in encoder
nef = 64
# Size of feature maps in decoder
ndf = 64
# Number of channels in the training images. For color images this is 3
nc = 3
# Number of training epochs
num_epochs = args.epoch_numbers
# Local working directory the dataset files are staged into
data_dir = 'celeba'
download_dataset = str(args.download_dataset).strip().lower() == 'true'
dataset_folder_path = args.dataset_path

In [None]:
# Check if CUDA is available
# Report up front which kind of hardware the run will use.
train_on_gpu = torch.cuda.is_available()

if train_on_gpu:
    print('CUDA is available!  Training on GPU ...')
else:
    print('CUDA is not available.  Training on CPU ...')

In [None]:
# Mount google Drive
# Makes the user's Google Drive available under /content/drive (Colab only).
# NOTE(review): the default --save_path / --dataset_path values are relative
# ("drive/MyDrive/..."), so they presumably rely on the working directory being
# /content — confirm when running outside a default Colab session.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# getting dataset ready from shared directory
# When the dataset is not downloaded by torchvision, copy the image archive and
# the annotation/partition text files from the shared directory into `data_dir`
# and unpack the images there.
if not download_dataset:
    dataset_zip_path = dataset_folder_path + '/Img/img_align_celeba.zip'
    list_eval_partition_path = dataset_folder_path + '/Eval/list_eval_partition.txt'
    identity_celeba_path = dataset_folder_path + '/Anno/identity_CelebA.txt'
    list_attr_celeba_path = dataset_folder_path + '/Anno/list_attr_celeba.txt'
    list_bbox_celeba_path = dataset_folder_path + '/Anno/list_bbox_celeba.txt'
    list_landmarks_align_celeba_path = dataset_folder_path + '/Anno/list_landmarks_align_celeba.txt'

    # Best effort: an already existing folder (e.g. on a cell re-run) is only reported.
    try:
        os.mkdir(data_dir)
        print("data folder created successfully")
    except OSError as e:
        print("Error: %s" % (e.strerror))

    # Every file keeps its original name inside data_dir.
    for _source_path in (dataset_zip_path,
                         list_eval_partition_path,
                         identity_celeba_path,
                         list_attr_celeba_path,
                         list_bbox_celeba_path,
                         list_landmarks_align_celeba_path):
        shutil.copyfile(_source_path, data_dir + '/' + os.path.basename(_source_path))

    # Remove a stale extraction so the unzip below starts from a clean folder.
    try:
        shutil.rmtree(data_dir + r'/img_align_celeba')
        print("old unzipped directory removed successfully")
    except OSError as e:
        print("Error: %s" % (e.strerror))

    archive = data_dir + r'/img_align_celeba.zip'
    # Do not bind the handle to the name `zip`: at notebook top level that would
    # replace the builtin zip() for every later cell.
    with ZipFile(archive, 'r') as celeba_archive:
        celeba_archive.extractall(data_dir)

In [None]:
# make saving path dir
# Create the output directory (with parents); a failure such as "already
# exists" is reported but does not stop the notebook.
try:
    os.makedirs(saving_path)
except OSError as err:
    print(f"Error: {err.strerror}")
else:
    print("saving_path directory created successfully")

In [None]:
# Define transforms
def _build_image_pipeline():
    """Return a fresh preprocessing pipeline: resize, centre-crop to
    `image_size` x `image_size`, convert to tensor, per-channel normalise."""
    # These constants look like the commonly used ImageNet channel statistics.
    # NOTE(review): the decoder ends in a Sigmoid (outputs in [0, 1]) while
    # inputs normalised this way can fall outside that range — confirm that the
    # normalised image is the intended reconstruction target.
    return transforms.Compose([
        transforms.Resize(image_size),
        transforms.CenterCrop(image_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225]),
    ])

# Training and evaluation use the same steps but separate pipeline objects.
train_transforms = _build_image_pipeline()

test_transforms = _build_image_pipeline()

In [None]:
# Function - use some percent of data
def shorten_dataset(dataset, usage_percent=1.0, seed=None):
    """Return a random subset holding a fraction of `dataset`.

    Args:
        dataset: Any sized, indexable dataset accepted by `random_split`.
        usage_percent: Fraction of the samples to keep, in [0.0, 1.0]; the
            number kept is floor(len(dataset) * usage_percent).
        seed: Seed for the split generator. Defaults to the notebook-level
            `manual_seed`, so existing calls select the same subset as before.

    Returns:
        The subset produced by `random_split` for the kept portion.

    Raises:
        ValueError: If `usage_percent` is outside [0.0, 1.0].
    """
    # An out-of-range fraction yields a negative split length, which is not a
    # meaningful request; fail loudly instead of passing it on.
    if not 0.0 <= usage_percent <= 1.0:
        raise ValueError("usage_percent must be between 0.0 and 1.0, got %r" % (usage_percent,))
    if seed is None:
        seed = manual_seed
    len_used = floor(len(dataset)*usage_percent)
    len_not_used = len(dataset) - len_used
    generator = torch.Generator().manual_seed(seed)
    used_dataset, _ = random_split(dataset, [len_used, len_not_used], generator=generator)
    return used_dataset

In [None]:
# Load Datas
def _celeba_split(split, transform):
    """Build one CelebA partition that yields (image, [attr, identity]) samples."""
    # root='' makes torchvision look for the dataset folder relative to the
    # current working directory.
    return datasets.CelebA(root='',
                           split=split,
                           target_type=["attr", "identity"],
                           transform=transform,
                           download=download_dataset)

train_set = _celeba_split('train', train_transforms)
test_set = _celeba_split('test', test_transforms)
valid_set = _celeba_split('valid', test_transforms)

# shorten Dataset
if not use_whole_dataset:
    train_set = shorten_dataset(train_set, usage_percent)

# DataLoader
# Options shared by all three loaders; only the training loader shuffles.
# drop_last=True keeps every batch at exactly `batch_size` samples.
_loader_options = dict(batch_size=batch_size, num_workers=workers, drop_last=True)
train_loader = torch.utils.data.DataLoader(train_set, shuffle=True, **_loader_options)
test_loader = torch.utils.data.DataLoader(test_set, **_loader_options)
valid_loader = torch.utils.data.DataLoader(valid_set, **_loader_options)

In [None]:
# Decide which device we want to run on
# The first CUDA device is used only when CUDA is present and at least one GPU
# was requested through `ngpu`; otherwise everything runs on the CPU.
_cuda_requested_and_present = torch.cuda.is_available() and ngpu > 0
device = torch.device("cuda:0") if _cuda_requested_and_present else torch.device("cpu")

In [None]:
# custom weights initialization
def weights_init(m):
    """Re-initialise one module in place; meant to be used with `Module.apply`.

    Modules whose class name contains 'Conv' get weights drawn from N(0, 0.02).
    Modules whose class name contains 'BatchNorm' get weights drawn from
    N(1, 0.02) and a zero bias. Every other module is left untouched.
    """
    layer_kind = type(m).__name__
    if 'Conv' in layer_kind:
        nn.init.normal_(m.weight.data, mean=0.0, std=0.02)
        return
    if 'BatchNorm' in layer_kind:
        nn.init.normal_(m.weight.data, mean=1.0, std=0.02)
        nn.init.constant_(m.bias.data, val=0)

In [None]:
# Encoder Model
class Encoder(nn.Module):
    """Convolutional encoder with two heads on a shared feature vector.

    A stack of four stride-2 convolutions followed by a linear layer produces a
    shared feature vector of size nef*16. From it,
      * a classifier head produces log-probabilities over 2 classes, and
      * an "other features" head produces nef*16 - 2 values,
    so that both outputs concatenated have nef*16 entries again.
    The sizes in the comments below assume the notebook defaults
    (nc=3, nef=64, 64x64 inputs).
    """

    def __init__(self, ngpu):
        super().__init__()
        self.ngpu = ngpu

        # --- convolutional trunk -------------------------------------------
        # nc x 64 x 64 -> nef x 32 x 32
        self.conv1 = nn.Conv2d(nc, nef, 4, 2, 1, bias=False)
        self.actv1 = nn.LeakyReLU(0.2, inplace=True)
        # nef x 32 x 32 -> nef x 16 x 16
        self.conv2 = nn.Conv2d(nef, nef, 4, 2, 1, bias=False)
        self.bnor2 = nn.BatchNorm2d(nef)
        self.actv2 = nn.LeakyReLU(0.2, inplace=True)
        # nef x 16 x 16 -> nef x 8 x 8
        self.conv3 = nn.Conv2d(nef, nef, 4, 2, 1, bias=False)
        self.bnor3 = nn.BatchNorm2d(nef)
        self.actv3 = nn.LeakyReLU(0.2, inplace=True)
        # nef x 8 x 8 -> (nef*2) x 4 x 4
        self.conv4 = nn.Conv2d(nef, nef * 2, 4, 2, 1, bias=False)
        self.bnor4 = nn.BatchNorm2d(nef * 2)
        self.actv4 = nn.LeakyReLU(0.2, inplace=True)
        # flattened in forward(): (nef*2) x 4 x 4 -> 2048, then 2048 -> 1024
        self.fllc5 = nn.Linear(nef*2*4*4, nef*1*4*4)
        self.actv5 = nn.LeakyReLU(0.2, inplace=True)

        # --- classifier head: 1024 -> 1024 -> 256 -> 64 -> 2 ---------------
        self.fllc_male_features1 = nn.Linear(nef*1*4*4, nef*1*4*4)
        self.actv_male_features1 = nn.LeakyReLU(0.2, inplace=True)
        self.dropout_male_features1 = nn.Dropout(p=0.5)
        self.fllc_male_features2 = nn.Linear(nef*1*4*4, nef*4)
        self.actv_male_features2 = nn.LeakyReLU(0.2, inplace=True)
        self.dropout_male_features2 = nn.Dropout(p=0.5)
        self.fllc_male_features3 = nn.Linear(nef*4, nef)
        self.actv_male_features3 = nn.LeakyReLU(0.2, inplace=True)
        self.fllc_male_features4 = nn.Linear(nef, 2)
        self.actv_male_features4 = nn.LogSoftmax(dim=1)

        # --- remaining-features head: 1024 -> 1024 -> 1024 -> 1022 ---------
        self.fllc_other_features1 = nn.Linear(nef*1*4*4, nef*1*4*4)
        self.actv_other_features1 = nn.LeakyReLU(0.2, inplace=True)
        self.dropout_other_features1 = nn.Dropout(p=0.5)
        self.fllc_other_features2 = nn.Linear(nef*1*4*4, nef*1*4*4)
        self.actv_other_features2 = nn.LeakyReLU(0.2, inplace=True)
        self.dropout_other_features2 = nn.Dropout(p=0.5)
        self.fllc_other_features3 = nn.Linear(nef*1*4*4, nef*1*4*4 - 2)
        self.actv_other_features3 = nn.LeakyReLU(0.2, inplace=True)

    def forward(self, x):
        """Return `(y1, y3)`: class log-probabilities and the remaining features."""
        # Shared convolutional trunk.
        conv_stages = (self.conv1, self.actv1,
                       self.conv2, self.bnor2, self.actv2,
                       self.conv3, self.bnor3, self.actv3,
                       self.conv4, self.bnor4, self.actv4)
        for stage in conv_stages:
            x = stage(x)

        # Flatten everything except the batch dimension, then project.
        x = x.flatten(start_dim=1)
        x = self.actv5(self.fllc5(x))

        # Classifier head (evaluated first, as the dropout layers draw random numbers in order).
        classifier_stages = (self.fllc_male_features1, self.actv_male_features1, self.dropout_male_features1,
                             self.fllc_male_features2, self.actv_male_features2, self.dropout_male_features2,
                             self.fllc_male_features3, self.actv_male_features3,
                             self.fllc_male_features4, self.actv_male_features4)
        y1 = x
        for stage in classifier_stages:
            y1 = stage(y1)

        # Remaining-features head.
        other_stages = (self.fllc_other_features1, self.actv_other_features1, self.dropout_other_features1,
                        self.fllc_other_features2, self.actv_other_features2, self.dropout_other_features2,
                        self.fllc_other_features3, self.actv_other_features3)
        y3 = x
        for stage in other_stages:
            y3 = stage(y3)

        return y1, y3

In [None]:
# Decoder Model
class Decoder(nn.Module):
    """Decoder that maps a flat feature vector of size nef*16 back to an image.

    A linear layer expands the vector, it is reshaped to (ndf*2) x 4 x 4, and
    four stride-2 transposed convolutions upsample it to nc x 64 x 64. The
    final activation is a Sigmoid, so outputs lie in [0, 1].
    The sizes in the comments assume the notebook defaults (nef = ndf = 64).
    """

    def __init__(self, ngpu):
        super(Decoder, self).__init__()
        self.ngpu = ngpu

        # input size is 1024
        self.fllc6 = nn.Linear(nef*1*4*4, ndf*2*4*4)
        self.actv6 = nn.LeakyReLU(0.2, inplace=True)
        # state size. 2048
        # shaping would be here: 2048 -> ndf*2 x 4 x 4
        # state size. (ndf*2) x 4 x 4
        self.cnvt7 = nn.ConvTranspose2d( ndf*2, ndf, 4, 2, 1, bias=False)
        self.bnor7 = nn.BatchNorm2d(ndf)
        self.actv7 = nn.ReLU(True)
        # state size. (ndf) x 8 x 8
        self.cnvt8 = nn.ConvTranspose2d(ndf, ndf, 4, 2, 1, bias=False)
        self.bnor8 = nn.BatchNorm2d(ndf)
        self.actv8 = nn.ReLU(True)
        # state size. (ndf) x 16 x 16
        self.cnvt9 = nn.ConvTranspose2d( ndf, ndf, 4, 2, 1, bias=False)
        self.bnor9 = nn.BatchNorm2d(ndf)
        self.actv9 = nn.ReLU(True)
        # state size. (ndf) x 32 x 32
        self.cnvt10 = nn.ConvTranspose2d( ndf, nc, 4, 2, 1, bias=False)
        self.actv10 = nn.Sigmoid() # nn.Tanh()
        # state size. (nc) x 64 x 64

    def forward(self, x):
        """Decode a batch of feature vectors (N, nef*16) into images (N, nc, 64, 64)."""
        x = self.fllc6(x)
        x = self.actv6(x)
        # Take the batch dimension from the input itself rather than from the
        # global `batch_size`, so partial batches and single samples also work.
        x = x.view(x.size(0), ndf*2, 4, 4)
        x = self.cnvt7(x)
        x = self.bnor7(x)
        x = self.actv7(x)
        x = self.cnvt8(x)
        x = self.bnor8(x)
        x = self.actv8(x)
        x = self.cnvt9(x)
        x = self.bnor9(x)
        x = self.actv9(x)
        x = self.cnvt10(x)
        x = self.actv10(x)
        return x

In [None]:
# AE Model
class AEModel(nn.Module):
    """Autoencoder that chains `Encoder` and `Decoder`.

    The encoder's two outputs are concatenated along the feature dimension and
    fed to the decoder. The forward pass returns the reconstruction together
    with the encoder's class log-probabilities.
    """

    def __init__(self, ngpu):
        super().__init__()
        self.ngpu = ngpu

        # Both halves are moved to the notebook-level `device` on construction.
        self.encoder = Encoder(ngpu).to(device)
        self.decoder = Decoder(ngpu).to(device)

    def forward(self, x):
        """Return `(reconstruction, class_log_probs)` for the image batch `x`."""
        class_log_probs, other_features = self.encoder(x)
        latent = torch.cat([class_log_probs, other_features], dim=1)
        return self.decoder(latent), class_log_probs

In [None]:
# Create the AE
netAE = AEModel(ngpu).to(device)

# Handle multi-gpu if desired
# Wrap the model for data parallelism only when running on CUDA with more than one GPU requested.
if device.type == 'cuda' and ngpu > 1:
    netAE = nn.DataParallel(netAE, device_ids=[*range(ngpu)])

# Apply the weights_init function to randomly initialize all weights
# (kept as the cell's last expression so the notebook still shows the model summary)
netAE.apply(weights_init)

In [None]:
# total parameters
# Count every element of every parameter tensor of the model.
total_params = sum([weights.numel() for weights in netAE.parameters()])
print(f"{total_params:,} total parameters.")

In [None]:
# Reconstruction loss between decoder output and input image.
criterion = nn.MSELoss()
# Classification loss; the encoder's classifier head already outputs log-probabilities.
criterion_main_features = nn.NLLLoss()
# One Adam optimizer updates every parameter of the autoencoder.
optimizer = optim.Adam(
    netAE.parameters(),
    lr=learning_rate,
    betas=(beta1, 0.999),
)

In [None]:
# Function - Save:
def save_model(name, number, model, res):
    """Write a checkpoint holding `res` and the model's state dict.

    The file is `<saving_path>checkpoint-<name>-<number>.pth`. `saving_path`
    is prepended as-is, so it is expected to end with a path separator.
    Always returns True.
    """
    payload = {'res': res, 'state_dict': model.state_dict()}
    target_file = "".join([saving_path, 'checkpoint-', name, '-', str(number), '.pth'])
    torch.save(payload, target_file)
    return True

In [None]:
# Function - Load:
def load_model(name, number, model, device):
    """Load `<saving_path>checkpoint-<name>-<number>.pth` into `model`.

    Tensors are mapped onto `device` while loading. Returns a dict with the
    (now updated) `model` and the stored `res` record.
    """
    source_file = "".join([saving_path, 'checkpoint-', name, '-', str(number), '.pth'])
    checkpoint = torch.load(source_file, map_location=device)
    stored_res = checkpoint['res']
    model.load_state_dict(checkpoint['state_dict'])
    return dict(model=model, res=stored_res)

In [None]:
# Save Start Checkpoint
# On a fresh run, store the untrained model with empty metric histories as
# checkpoint 0 so that the loading cell below always has a file to read.
if isFirstRun:
    res = {metric: [] for metric in ('train_losses',
                                     'valid_losses',
                                     'y1_train_losses',
                                     'y1_valid_losses',
                                     'test_mse',
                                     'test_psnr',
                                     'test_y1_acc')}
    res['epoch_number'] = 0
    save_model('ae', 0, netAE, res)

In [None]:
# Load Last Checkpoint:
# Restore the model weights and the per-epoch metric histories saved with them.
ae_load = load_model('ae', lastRunEpochNumber, netAE, device)
_saved_history = ae_load['res']

train_losses = _saved_history['train_losses']
valid_losses = _saved_history['valid_losses']
y1_train_losses = _saved_history['y1_train_losses']
y1_valid_losses = _saved_history['y1_valid_losses']
test_mse = _saved_history['test_mse']
test_psnr = _saved_history['test_psnr']
test_y1_acc = _saved_history['test_y1_acc']
# Training continues from the epoch after this one.
last_epoch = _saved_history['epoch_number']

In [None]:
# Function - training function
def fit(netAE, train_loader, optimizer, criterion):
    """Train `netAE` for one pass over `train_loader`.

    Each batch contributes two losses: `criterion` between the reconstruction
    and the input image, and the notebook-level `criterion_main_features`
    between the classifier output and the attribute column `using_index`.
    Both are back-propagated together and followed by one optimizer step.

    Args:
        netAE: Model returning `(reconstruction, class_log_probs)`.
        train_loader: Loader yielding `(images, targets)` where `targets[0]`
            is the attribute tensor.
        optimizer: Optimizer over the model parameters.
        criterion: Reconstruction loss.

    Returns:
        `(train_loss, y1_train_loss)`: the two losses averaged over batches.
    """
    print('Training')
    netAE.train()

    train_loss = 0
    y1_train_loss = 0
    # For each batch in the dataloader
    prog_bar = tqdm(enumerate(train_loader), total=len(train_loader))
    for i, data in prog_bar:
        inputs, labels = data[0], data[1]
        inputs = inputs.to(device)
        main_target = labels[0][:, using_index]
        main_target = main_target.to(device)
        netAE.zero_grad()
        output, y1 = netAE(inputs)
        err = criterion(output, inputs)
        err_y1 = criterion_main_features(y1, main_target)
        # One backward pass over the summed loss accumulates the same gradients
        # as two separate passes, without retain_graph=True keeping the
        # autograd graph (and its activations) alive after the step.
        (err + err_y1).backward()
        optimizer.step()
        train_loss += err.item()
        y1_train_loss += err_y1.item()
    train_loss = train_loss / len(train_loader)
    y1_train_loss = y1_train_loss / len(train_loader)
    return train_loss, y1_train_loss

In [None]:
# Function - validation function
def validate(netAE, valid_loader, optimizer, criterion):
    """Evaluate `netAE` on `valid_loader` without updating it.

    Returns `(valid_loss, y1_valid_loss)`: the reconstruction loss and the
    classification loss, each averaged over the batches. `optimizer` is
    accepted for signature symmetry with `fit` and is not used.
    """
    print('Validating')
    netAE.eval()

    n_batches = len(valid_loader)
    reconstruction_total = 0
    label_total = 0
    progress = tqdm(enumerate(valid_loader), total=n_batches)
    with torch.no_grad():
        for _, batch in progress:
            images = batch[0].to(device)
            # batch[1][0] is the attribute tensor; pick the column being trained on.
            attr_target = batch[1][0][:, using_index].to(device)
            reconstruction, log_probs = netAE(images)
            batch_err = criterion(reconstruction, images)
            batch_err_y1 = criterion_main_features(log_probs, attr_target)
            reconstruction_total += batch_err.item()
            label_total += batch_err_y1.item()
    return reconstruction_total / n_batches, label_total / n_batches

In [None]:
def _reduce(x: torch.Tensor, reduction: str = 'mean') -> torch.Tensor:
    r"""Reduce input in batch dimension if needed.
    Args:
        x: Tensor with shape (N, *).
        reduction: Specifies the reduction type:
            ``'none'`` | ``'mean'`` | ``'sum'``. Default: ``'mean'``
    """
    if reduction == 'none':
        return x
    elif reduction == 'mean':
        return x.mean(dim=0)
    elif reduction == 'sum':
        return x.sum(dim=0)
    else:
        raise ValueError("Unknown reduction. Expected one of {'none', 'mean', 'sum'}")

In [None]:
# Function PSNR:
from typing import Union
def psnr_calc(x: torch.Tensor, y: torch.Tensor, data_range: Union[int, float] = 1.0,
         reduction: str = 'mean', convert_to_greyscale: bool = False) -> torch.Tensor:
    r"""Peak Signal-to-Noise Ratio between two batches of images.

    Both inputs are first divided by ``data_range``; the score per image is
    then ``-10 * log10(MSE + 1e-8)`` with the MSE taken over dimensions 1-3.

    Args:
        x: An input tensor. Shape :math:`(N, C, H, W)`.
        y: A target tensor. Shape :math:`(N, C, H, W)`.
        data_range: Maximum value range of images (usually 1.0 or 255).
        reduction: How to combine the per-image scores:
            ``'none'`` | ``'mean'`` | ``'sum'``. Default:``'mean'``
        convert_to_greyscale: If `True` and the input has 3 channels, compute
            the score on the luminance channel only
            (Y = 0.299 R + 0.587 G + 0.114 B); otherwise use all channels.
    Returns:
        The PSNR score(s), reduced over the batch as requested.
    References:
        https://en.wikipedia.org/wiki/Peak_signal-to-noise_ratio
    """
    # Small constant that keeps log10 finite when the images are identical.
    stability_eps = 1e-8

    scale = float(data_range)
    x, y = x / scale, y / scale

    if (x.size(1) == 3) and convert_to_greyscale:
        # Weights broadcast over (N, C, H, W); .to(x) matches dtype and device.
        luminance_weights = torch.tensor([0.299, 0.587, 0.114]).view(1, -1, 1, 1).to(x)
        x = (x * luminance_weights).sum(dim=1, keepdim=True)
        y = (y * luminance_weights).sum(dim=1, keepdim=True)

    squared_error = (x - y) ** 2
    per_image_mse = squared_error.mean(dim=[1, 2, 3])
    per_image_score: torch.Tensor = - 10 * torch.log10(per_image_mse + stability_eps)

    return _reduce(per_image_score, reduction)

In [None]:
# Calc Accuracy
def calcAccuracyTest(netAE, test_loader):
    """Evaluate `netAE` on `test_loader`.

    Returns `(mse_loss, psnr_loss, y1_accuracy)`, each averaged over batches:
    the reconstruction MSE, the PSNR between reconstruction and input, and the
    fraction of samples whose top predicted class equals the attribute column
    `using_index`.
    """
    print('Testing')
    netAE.to(device)
    print("Calculating Accuracy...")
    netAE.eval()

    n_batches = len(test_loader)
    mse_total = 0
    psnr_total = 0
    accuracy_total = 0

    progress = tqdm(enumerate(test_loader), total=n_batches)
    with torch.no_grad():
        for _, batch in progress:
            images = batch[0].to(device)
            attr_target = batch[1][0][:, using_index].to(device)
            reconstruction, log_probs = netAE(images)
            batch_mse = criterion(reconstruction, images)
            batch_psnr = psnr_calc(reconstruction, images)
            # The classifier outputs log-probabilities; exponentiate, then take the top class.
            _, predicted = torch.exp(log_probs).topk(1, dim=1)
            hits = predicted == attr_target.view(*predicted.shape)
            n_correct = hits.sum().item()
            mse_total += batch_mse.item()
            psnr_total += batch_psnr.item()
            accuracy_total += (n_correct / len(hits))
    return mse_total / n_batches, psnr_total / n_batches, accuracy_total / n_batches

In [None]:
# Training Loop
# Runs from the epoch after the loaded checkpoint up to `num_epochs`. Every
# epoch trains, validates, evaluates on the test set, extends the metric
# histories and (every `save_every_epoch` epochs) writes a checkpoint.
netAE.to(device)
save_every_epoch = 1

start = time.time()
print("Starting Training Loop...")

for epoch in range(last_epoch + 1, num_epochs + 1):
    print(f"Epoch {epoch}/{num_epochs}: ")
    train_loss, y1_train_loss = fit(netAE, train_loader, optimizer, criterion)
    valid_loss, y1_valid_loss = validate(netAE, valid_loader, optimizer, criterion)
    mse_loss, psnr_loss, y1_accuracy = calcAccuracyTest(netAE, test_loader)

    # Extend the running histories with this epoch's numbers.
    train_losses.append(train_loss)
    valid_losses.append(valid_loss)
    y1_train_losses.append(y1_train_loss)
    y1_valid_losses.append(y1_valid_loss)
    test_mse.append(mse_loss)
    test_psnr.append(psnr_loss)
    test_y1_acc.append(y1_accuracy)

    # The histories are stored with the weights so a resumed run can continue them.
    res = dict(train_losses=train_losses,
               valid_losses=valid_losses,
               y1_train_losses=y1_train_losses,
               y1_valid_losses=y1_valid_losses,
               test_mse=test_mse,
               test_psnr=test_psnr,
               test_y1_acc=test_y1_acc,
               epoch_number=epoch)
    if not epoch % save_every_epoch:
        save_model('ae', epoch, netAE, res)

    print(f"Train Loss: {train_loss:.4f}")
    print(f"Valid Loss: {valid_loss:.4f}")
    print(f"Main Label Train Loss: {y1_train_loss:.4f}")
    print(f"Main Label Valid Loss: {y1_valid_loss:.4f}")
    print(f"Main Label Accuracy on Testset: {y1_accuracy:.4f}")

end = time.time()
print(f"Training time: {(end-start)/60:.3f} minutes")

print('TRAINING COMPLETE')

In [None]:
# Install LATEX
# System packages needed so matplotlib can render text through LaTeX (usetex).
# NOTE(review): `apt install` is run without `-y`; confirm it does not stop at a
# confirmation prompt in the target environment.
!sudo apt-get update
!sudo apt install cm-super dvipng texlive-latex-extra texlive-latex-recommended texlive texlive-fonts-recommended
import os
from matplotlib.pyplot import text
# Switch from the 'ggplot' style chosen in the imports cell back to matplotlib's default style.
matplotlib.style.use('default')
from matplotlib import rc

# Serif (Palatino) fonts, with all figure text typeset by LaTeX.
rc('font', **{'family':'serif','serif':['Palatino']})
rc('text', usetex=True)

In [None]:
# e2e Train Loss and PSNR Plot
%matplotlib inline
%config InlineBackend.figure_format = 'svg'
print('Loss and PSNR plot...')
fig = plt.figure(figsize=(5,3))
matplotlib.rcParams.update({'font.size': 10})
fig.tight_layout()
ax=fig.add_subplot(111, label="1")
ax2=fig.add_subplot(111, label="2", frame_on=False)

lns1 = ax.plot(train_losses, color='green', label='Training Loss')
lns2 = ax.plot(test_mse, color='blue', label='Validation Loss')
ax.set_xlabel("Epoch")
ax.set_ylabel("MSE Loss")
ax.set_xlim([0,200])

lns3 = ax2.plot(test_psnr, color='black', label='PSNR')
ax2.set_ylabel("PSNR (dB)")
ax2.yaxis.tick_right()
ax2.yaxis.set_label_position('right') 
ax2.set_xlim([0,200])
ax2.tick_params(axis='x', which='both', bottom=False, top=False, labelbottom=False)

leg = lns1 + lns2 + lns3
labs = [l.get_label() for l in leg]
ax.legend(leg, labs, frameon=True, loc=5)

plt.grid(color = 'gray', linestyle = 'dotted', linewidth = 0.5)
plt.savefig(saving_path + "e2e_psnr_and_loss_plot.svg", bbox_inches = 'tight')
plt.savefig(saving_path + "e2e_psnr_and_loss_plot.png", bbox_inches = 'tight')
plt.savefig(saving_path + "e2e_psnr_and_loss_plot.eps", bbox_inches = 'tight', format='eps')
plt.show()