## Setup

In [3]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import PIL 
from PIL import Image

import numpy as np
from keras.utils import np_utils
from keras.utils import to_categorical
from collections import defaultdict
import random
import copy
from scipy.stats import entropy
from tqdm import tqdm

import tensorflow_datasets as tfds
from tensorflow.keras.applications.mobilenet import MobileNet
from tensorflow.keras.applications.efficientnet import EfficientNetB0
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import*
from sklearn.utils import shuffle

import matplotlib.pyplot as plt
from keras.datasets import cifar10
import pickle 
import glob
from sklearn.metrics import accuracy_score
import pandas as pd
import tensorflow_probability as tfp
import tensorflow.keras.backend as K
import os
import math

from pympler.asizeof import asizeof
from keras.utils.vis_utils import plot_model
# from cd_models import*
# from cd_datasets import*

import pdb
import subprocess as sp

1 Physical GPUs, 1 Logical GPUs


In [4]:
# %pdb on

Automatic pdb calling has been turned ON


In [5]:
tfp.__version__ #0.17.0

'0.17.0'

In [6]:
import warnings
warnings.filterwarnings("ignore")

In [7]:
#tf.config.run_functions_eagerly(True)
tf.config.list_physical_devices('GPU')

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

In [8]:
BATCH_SIZE = 50

In [9]:
# Set the seed for numpy
np.random.seed(0)

# Set the global seed for TensorFlow
tf.random.set_seed(0)

# Define Dynamic Entropy Control

In [232]:

##higher temp will make student less certain/confident
class DEC(keras.Model):
    def __init__(self, student, teacher, minTemp = 1.2, maxTemp = 5.0):
        super(DEC, self).__init__() 
        self.student = student
        self.teacher = teacher
        self.oldStudent = teacher #a reference 
        self.minTemp = minTemp
        self.maxTemp = maxTemp


    def compile(self, optimizer, metrics, student_loss_fn, distillation_loss_fn, numClasses=10,  val_set_entropy = None , val_set_norm = None):
        super(DEC, self).compile(optimizer=optimizer, metrics=metrics)
        self.student_loss_fn = student_loss_fn
        self.distillation_loss_fn = distillation_loss_fn
        self.numClasses = numClasses
        self.maxEntropy = np.log2(self.numClasses)
        self.avgTemp = tf.Variable(0.0, trainable=False)
        self.totalScaledLogitsNorm = tf.Variable(0.0, trainable=False)
        self.avgScaledLogitsNorm = tf.Variable(0.0, trainable=False)
        self.totalTemp = tf.Variable(0.0, trainable=False)
        self.totalSamples = tf.Variable(0, trainable=False)
        self.val_set_entropy = val_set_entropy
        if val_set_norm is None:
            d = self.teacher.layers[-1].weights[0]
            nnn = tf.norm(d, axis=1)
            self.val_set_norm = nnn
        else:
            self.val_set_norm = val_set_norm

        self.debug = tf.Variable(0.0, trainable=False)

    @tf.function
    def batch_entropy(self, z, avg = False): 
        '''returns shannon entropy on a batch of predictions, z. 
        Expects logits. DOES compute softmax before calculating entropy.''' 
        # teacher_predictions = self.teacher.predict(z, batch_size = BATCH_SIZE)
        teacher_predictions_SM = tf.nn.softmax(z, axis = 1) 

        if avg == True:
            return np.mean(tf.keras.backend.categorical_crossentropy(teacher_predictions_SM, teacher_predictions_SM)/tf.math.log(2.0) )
        
        return tf.keras.backend.categorical_crossentropy(teacher_predictions_SM, teacher_predictions_SM)/tf.math.log(2.0) 
    
    @tf.function 
    def adaptive_T(self, z):
        '''compute certainty regularizer'''
        def sigmoid(x):
            return 1 / (1 + tf.exp(-x))

        class_supports_norm = self.val_set_norm
        logits_norm = tf.norm(z, axis=1)
        # assert(False)
        scaled_logits_norm = logits_norm / class_supports_norm

        # Update the total and average of scaled logits norm
        num_samp = tf.shape(z)[0]
        self.totalScaledLogitsNorm.assign_add(tf.reduce_sum(scaled_logits_norm))
        self.avgScaledLogitsNorm.assign(self.totalScaledLogitsNorm / tf.cast(self.totalSamples, tf.float32))

        # Calculate entropy and adjust for temperature range
        entropy_values = self.batch_entropy(z)
        if self.val_set_entropy is not None:
            #############################################################
            ss = (entropy_values - self.val_set_entropy) #
            scaled_entropy_values = sigmoid( ss/(self.maxEntropy**(1/2)) ) # Adjust using sigmoid function

        new_range_min, new_range_max = self.minTemp, self.maxTemp

        temperature = new_range_min + (scaled_entropy_values) * (new_range_max - new_range_min)

        temp_scaling_factor = (scaled_logits_norm)**(-1.0) 
        tau = temperature*temp_scaling_factor

        # Update average temperature
        self.totalTemp.assign_add(tf.reduce_sum(tau))
        self.totalSamples.assign_add(num_samp)
        self.avgTemp.assign(self.totalTemp / tf.cast(self.totalSamples, tf.float32))
        
        self.debug.assign(tf.reduce_mean(temp_scaling_factor))
        return temperature
    
    def train_step(self, data):
        # Unpack data
        x, y = data #y is used for metrics, but it not used for training

        # Forward pass of teacher
        teacher_predictions = self.teacher(x, training = False)
        t_vec = self.adaptive_T(teacher_predictions)
        t_vec = tf.expand_dims(t_vec, axis=-1) 

        # print(teacher_predictions.shape, t_vec.shape)
        # assert(False)

        with tf.GradientTape() as tape:
            # Forward pass of student
            student_predictions = self.student(x, training = True)
            student_predictionsSM = tf.nn.softmax(student_predictions, axis=1)
            
            # Compute losses
            student_loss = self.student_loss_fn(y, student_predictionsSM) #we test with unit temp during inference, student loss function, NOT USED
                                                                          #should NOT use logits

            # The magnitudes of the gradients produced by the soft targets scale
            # as 1/T^2, multiply them by T^2 when using both hard and soft targets.
            distillation_loss = (
                self.distillation_loss_fn(
                    tf.nn.softmax(student_predictions, axis=1), #"student" has temperature T = 1
                    tf.nn.softmax(teacher_predictions / t_vec, axis=1),
                )*tf.math.reduce_mean(tf.squeeze(t_vec))**2
               
            )  
            loss = distillation_loss # note that the student loss is never back propagated. 

        # Compute gradients
        trainable_vars = self.student.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars)) #note that we DO NOT back propage w/respect to y_true. 

        # Update the metrics configured in `compile()`.
        self.compiled_metrics.update_state(y, student_predictions)

        # Return a dict of performance
        results = {m.name: m.result() for m in self.metrics}
        results.update(
            {"student_loss": student_loss, "distillation_loss": distillation_loss}
        )
        return results

    def test_step(self, data):
        # Unpack the data
        x, y = data

        # Compute predictions
        y_prediction = tf.nn.softmax(self.student(x, training=False), axis = 1) #self.student(x, training = False) #

        # Calculate the loss
        student_loss = self.student_loss_fn(y, y_prediction) # should have "from logit = false"

        # Update the metrics.
        self.compiled_metrics.update_state(y, y_prediction)

        # Return a dict of performance
        results = {m.name: m.result() for m in self.metrics}
        results.update({"student_loss during Testing": student_loss})
        return results



In [2]:
# Example usage:

teacher = MobileNet() # some place holder model
student = copy.deepcopy(student)
            
            opt = keras.optimizers.SGD(learning_rate=0.001)
            loss_ = tf.keras.losses.BinaryCrossentropy(from_logits = False)
dirtyX, dirtyY = get_domain_shift_data() # returns some domain shift. 

# freeze weights here
# print("Freezing some of the student") 
# z = len(student.layers[1].layers)
# for j in student.layers[1].layers[z - 50: z]: #smaller number means freeze fewer 
#     j.trainable = False
# student.layers[3].trainable = False
# student.compile(keras.optimizers.RMSprop(learning_rate=2e-5), 
# loss= tf.keras.losses.SparseCategoricalCrossentropy(from_logits = True),
# metrics = ['accuracy'])

distiller = DEC(student=student, teacher = teacher, minTemp= 1.0, maxTemp=3.0)
distiller.compile(
    optimizer =  opt,
    metrics=['accuracy'],
    student_loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits = False), #<--- not used; this is a dummy loss funciton
    distillation_loss_fn = loss_,
    numClasses=10,
    val_set_entropy=H_batch(tpx := teacher.predict(ogX, batch_size = BATCH_SIZE)),
    val_set_norm= np.median(tf.norm(tpx, axis=1))
)


distiller.fit(dirtyX, dirtyY, batch_size= BATCH_SIZE, epochs = 1, verbose = False) #the 'y' labels are not used for adaptation and are there just for measuring performance

T_scale_estimate = np.array(distiller.avgTemp, dtype = np.float32) # get the Tau value after adaptation to use for temperature scaling

IndentationError: unexpected indent (1238337341.py, line 6)