Citations: https://arxiv.org/abs/2006.11239

In [1]:
#@title
#!pip install foolbox;
#!pip install torchattacks;
#!pip install labml_nn;
#!pip install datasets;
#!pip install torchmetrics;
#!pip install tqdm;
#!pip install git+https://github.com/fra31/auto-attack;
#!pip3 install torch==1.11.0+cu115 torchvision torchaudio -f https://download.pytorch.org/whl/torch_stable.html;
import os;
os.environ['CUDA_LAUNCH_BLOCKING'] = "1";
import torch;
torch.backends.cudnn.enabled = False;

In [2]:
device = "cuda"
from utils import *
from UNet import *
import torch
import gc
eps_model = load_eps_model()
eps_model.eval();
certified_denoiser = load_cert_denoiser()
certified_denoiser.eval();
acc = load_accuracy()
resnet, resnet_scaled = load_resnet()
x_train,y_train,x_test,y_test = load_data()
set_random_seed(32)

  warn(f"Failed to load image Python extension: {e}")


In [3]:
from torchmetrics import Accuracy
from tqdm.notebook import tnrange
from torch import nn
import torch
from typing import Tuple, Optional
from labml_nn.diffusion.ddpm.utils import gather
def check_t(self,t):
    if t is None:
        t = self.t
    else: 
        t = torch.tensor([t]).to(device)
    return t
def check_MonteCarlo_iter(self,mc_iter):
    if mc_iter is None:
        mc_iter = 5
    return mc_iter
def check_spread(self,spread):
    if spread is None:
        spread = 5
    return spread
class DDPM():
    def __init__(self,classifier,eps_model,t,num_classes = 10,batch_size = 2,grad = True):
        self.classifier = classifier
        self.num_classes = num_classes
        self.eps_model = eps_model 
        self.n_steps = 5000
        self.beta = torch.linspace(0.0001, 0.02, self.n_steps).to(device) #Noise scheduling, the beta t's 
        self.alpha = 1. - self.beta #Reparametarization trick to get at any timestep
        self.alpha_bar = torch.cumprod(self.alpha, dim=0)
        self.sigma2 = self.beta
        self.t = torch.tensor([t]).to(device)
        self.acc = Accuracy(num_classes = self.num_classes).to(device)
        self.batch_size = batch_size
        self.grad = grad
    def q_sample(self, x0: torch.Tensor, t: torch.Tensor, eps: Optional[torch.Tensor] = None): 
        if eps is None: #Create epsilon
            eps = torch.randn_like(x0)  
        mean, var = self.q_xt_x0(x0, t) #Get the mean and variance at this timestep
        imgs = mean + (var ** 0.5)* eps
        #Scale the images back to the same scale
        return  imgs,eps
    ##Forward process, now we sample from p which is the function that estimates xt-1 from xt
    def p_sample(self, xt: torch.Tensor, t: torch.Tensor):
        eps_theta = self.eps_model(xt, t) #Uses the model to predict the error
        alpha_bar = gather(self.alpha_bar, t)
        alpha = gather(self.alpha, t)
        eps_coef = (1 - alpha) / (1 - alpha_bar) ** .5
        mean = 1 / (alpha ** 0.5) * (xt - eps_coef * eps_theta)
        var = gather(self.sigma2, t)
        eps = torch.randn(xt.shape, device=xt.device)
        return mean + (var ** 0.5) * eps #Returns a new sample
    def denoise(self, xt: torch.Tensor, t: torch.Tensor):
        num_iter = int(t)
        for i in range(num_iter):
            xt = self.p_sample(xt, t)
            t -=1
        return xt
    def purify(self,x,t = None):
        t = check_t(self,t)
        if self.grad:
            x_noise = self.q_sample(x,t)[0]
            x_pure = self.denoise(x_noise,t)
        else:
            with torch.no_grad():
                x_noise = self.q_sample(x,t)[0]
                x_pure = self.denoise(x_noise,t)
        return torch.clamp(x_pure,0,1)
    def purify_batches(self,x,t = None):
        t = check_t(self,t)
        x_pure = torch.clone(x)
        for i in range(0,len(x),self.batch_size):
            x_pure[i:i+self.batch_size] = self.purify(x[i:i+self.batch_size],t)
        return x_pure
    def pytorch_model(self,x,t = None):
        t = check_t(self,t)
        #Make model such that foolbox can interact with it
        purify_batches = self.purify_batches
        class PurifyLayer(nn.Module):
            def __init__(self):
                super().__init__()
            def forward(self,x):
                return purify_batches(x,t)
        purify_layer = PurifyLayer()
        model = nn.Sequential(
        purify_layer,
        resnet_scaled)
        model.eval();
        if self.grad:
            return model(x)
        else:
            with torch.no_grad():
                return model(x)
    def mc_predict(self,x,mc_iter = None, t = None):
        t = check_t(self,t)
        mc_iter = check_MonteCarlo_iter(self,mc_iter)
        proba = torch.zeros((x.shape[0],self.num_classes)).to(device)
        for i in tnrange(mc_iter):
            proba += self.pytorch_model(x,t)
        return proba/mc_iter
    def spread_predict(self,x,mc_iter = None, t = None,spread_factor = None):
        t = check_t(self,t)
        mc_iter = check_MonteCarlo_iter(self,mc_iter)
        spread_factor = check_spread(self,spread_factor)
        proba = torch.zeros((x.shape[0],self.num_classes)).to(device)
        for i in tnrange(mc_iter):
            _t_ = (t - (mc_iter // 2)*spread_factor) + i*spread_factor
            print("Using t = "+ str(float(_t_)), end = "\r")
            proba += self.pytorch_model(x,_t_)
        return proba/mc_iter
    def eval_normal(self,x,y, t = None):
        proba = self.pytorch_model(x,t)
        return float(self.acc(proba,y))*100
    def eval_montecarlo(self,x,y,mc_iter = None, t = None):
        proba = self.mc_predict(x,mc_iter,t)
        return float(self.acc(proba,y))*100
    def eval_spread(self,x,y,mc_iter = None, t = None,spread_factor = None):
        proba = self.spread_predict(x,mc_iter,t,spread_factor)
        return float(self.acc(proba,y))*100

In [4]:
from warnings import catch_warnings,simplefilter
from sklearn.gaussian_process import GaussianProcessRegressor
def surrogate(model, x):
    # catch any warning generated when making a prediction
    with catch_warnings():
        # ignore generated warnings
        simplefilter("ignore")
    return model.predict(x, return_std=True)

def opt_acquisition(x, y, model,high,low):
    # random search, generate random samples
    xsamples = np.random.randint(low,high,(high-low))
    xsamples = xsamples.reshape(len(xsamples), 1)
    # calculate the acquisition function for each sample
    scores = acquisition(x, xsamples, model)
    # locate the index of the largest scores
    ix = np.argmax(scores)
    return xsamples[ix, 0]

from scipy.stats import norm
# probability of improvement acquisition function
def acquisition(X, Xsamples, model):
    # calculate the best surrogate score found so far
    yhat, _ = surrogate(model, X)
    best = max(yhat)
    # calculate mean and stdev via surrogate function
    mu, std = surrogate(model, Xsamples)
    mu = mu[:, 0]
    # calculate the probability of improvement
    probs = norm.cdf((mu - best) / (std+1E-9))
    return probs

def bayes_opt(x,y,high,low):
    x_ = np.array(x)
    y_ = np.array(y)
    #x_ = x
    #y_ = y
    model = GaussianProcessRegressor()
    model.fit(x_.reshape(-1, 1),y_.reshape(-1, 1))
    return torch.tensor([opt_acquisition(x_.reshape(-1, 1),y_.reshape(-1, 1),model,high,low)]).numpy(), model
    

In [5]:
from labml_nn.diffusion.ddpm.utils import gather
from tqdm.notebook import tnrange, tqdm
from scipy.stats import norm
import statsmodels
from statsmodels.stats.proportion import proportion_confint
from sklearn.gaussian_process import GaussianProcessRegressor

def check_class_argument(self,arg,attrib):
    if arg is None:
        return getattr(self, attrib)
    else:
        return arg
class CertifiedRobustnessModel():
    def __init__(self,classifier,denoise_model,t,num_classes = 10,batch_size = 2,num_iter = 10, grad = True):
        self.classifier = classifier
        self.denoise_model = denoise_model
        self.t = torch.tensor([t]).to(device).long()
        self.num_classes = num_classes
        self.batch_size = batch_size
        self.num_iter = num_iter
        self.grad = grad
        self.acc = Accuracy(num_classes = self.num_classes).to(device)
        self.beta = torch.linspace(0.0001, 0.02, 5000).to(device) #Noise scheduling, the beta t's 
        self.alpha = 1. - self.beta #Reparametarization trick to get at any timestep
        self.alpha_bar = torch.cumprod(self.alpha, dim=0)
    def forward_sample(self,x,t = None):
        t = check_t(self,t)
        #torch.manual_seed(torch.seed())
        sigma2 = (1 - self.alpha_bar[t])/self.alpha_bar[t]
        delta = torch.randn(x.shape).to(device) * sigma2
        return self.alpha_bar[t]**0.5 * x + delta
    def backward_sample(self,x,t = None):
        t = check_t(self,t)
        return self.denoise_model(x,t)
    def purify_batch(self,x,t = None):
        t = check_t(self,t)
        if self.grad:
            x_pure = torch.clone(x)
            for i in range(0,len(x),self.batch_size):
                x_noise = self.forward_sample(x[i:i+self.batch_size],t)
                x_pure[i:i+self.batch_size] = self.backward_sample(x_noise,t)
        else:
            with torch.no_grad():
                x_pure = torch.clone(x)
                for i in range(0,len(x),self.batch_size):
                    x_noise = self.forward_sample(x[i:i+self.batch_size],t)
                    x_pure[i:i+self.batch_size] = self.backward_sample(x_noise,t)
        return x_pure
    def pred_batch(self,x,batch_size):
        logits = torch.empty(len(x),self.num_classes)
        if self.grad:
            for i in range(0,len(x),batch_size):
                logits[i:i+batch_size] = self.classifier(x[i:i+batch_size])
        else:
            with torch.no_grad():
                for i in range(0,len(x),batch_size):
                    logits[i:i+batch_size] = self.classifier(x[i:i+batch_size])  
        return logits
    #Num iter is supposed to be 100_000
    def algo_1(self,x,num_iter = None,t = None):
        t = check_t(self,t)
        num_iter = check_class_argument(self,num_iter,'num_iter')
        preds = torch.zeros(size = (len(x),self.num_classes)).to(device)
        for i in tnrange(num_iter):
            x_pure = self.purify_batch(x,t)
            logits = self.pred_batch(x_pure,100)
            classes_predicted = torch.argmax(logits,dim = 1)
            for i,class_ in enumerate(classes_predicted):
                preds[i,class_] += 1
        return (preds.float()/num_iter)
    def spread(self,x,num_iter = None,t = None, mc_iter = None, spread_factor = None):
        t = check_t(self,t)
        mc_iter = check_MonteCarlo_iter(self,mc_iter)
        spread_factor = check_spread(self,spread_factor)
        proba = torch.zeros((x.shape[0],self.num_classes)).to(device)
        for i in tnrange(mc_iter):
            _t_ = (t - (mc_iter // 2)*spread_factor) + i*spread_factor
            print("Using t = "+ str(float(_t_)), end = "\r")
            proba += self.algo_1(x,t = _t_)
        return proba/mc_iter
    def eval_(self,x,y,t = None,num_iter = None):
        t = check_t(self,t)
        num_iter = check_class_argument(self,num_iter,'num_iter')
        proba = self.algo_1(x,t = t,num_iter = num_iter)
        return float(self.acc(proba,y))*100
    def certify(self,x, n0 = 100, n = 1_000_000, alpha = .05, t = None):
        t = check_t(self,t)
        counts0 = self.algo_1(x,num_iter = n0, t = t) * n0
        ca = counts0.topk(1)[1].flatten()
        counts = self.algo_1(x,num_iter = n, t = t) * n
        top_ix = torch.empty(len(x))
        for i in range(len(top_ix)):
            top_ix[i] = counts[i,ca[i]]
        pa = proportion_confint(top_ix.cpu(),n,alpha = 2*(1-alpha),method = "beta")[0]
        sigma = ((1 - self.alpha_bar[t])/self.alpha_bar[t])**.5
        radius = sigma.cpu() * norm.ppf(pa)
        return ca, radius
    def gaussian_process(self,x,y,high,low,initial_points = 5,batch_size = 100, mc_iter = 100):
        ts = list()
        scores = list()
        preds = torch.zeros(size = (len(x),self.num_classes)).to(device)
        #Sample the random initial points
        for j in range(initial_points):
            curr_t = torch.randint(low = low, high = high,size = [1]).numpy()[0]
            ts.append(curr_t)
            x_pure = self.purify_batch(x,t = curr_t)
            logits = self.pred_batch(x_pure,batch_size)
            scores.append(self.acc(logits.to(device),y.to(device)))
            classes_predicted = torch.argmax(logits,dim = 1)
            for z,class_ in enumerate(classes_predicted):
                preds[z,class_] += 1
        #Now use the Gaussian process
        for j in range(initial_points,mc_iter):
            curr_t,model = bayes_opt(ts,scores,high,low)
            curr_t = curr_t[0]
            ts.append(curr_t)
            print("Currently trying t = "+ str(curr_t), end = "\r")
            x_pure = self.purify_batch(x,t = curr_t)
            logits = self.pred_batch(x_pure,batch_size)
            scores.append(self.acc(logits.to(device),y.to(device)) )         
            classes_predicted = torch.argmax(logits,dim = 1)
            for z,class_ in enumerate(classes_predicted):
                preds[z,class_] += 1
        return preds,np.mean(ts)
    def gauss_batch(self,x,y,high,low,initial_points = 10,batch_size = 1, mc_iter = 100):
        ts = torch.empty(len(x)).to(device)
        preds = torch.zeros(size = (len(x),self.num_classes)).to(device)
        for i in tnrange(0,len(x),batch_size):
            preds[i:i+batch_size], ts[i:i+batch_size] = self.gaussian_process(x[i:i+batch_size],y[i:i+batch_size],
                                                          high,low,initial_points = initial_points,
                                                          batch_size = 100, mc_iter = mc_iter)
        return preds, ts
    def random_t(self,x,high,low,mc_iter = 100):
        ts = torch.empty(mc_iter).to(device)
        preds = torch.zeros(size = (len(x),self.num_classes)).to(device)
        for j in tnrange(mc_iter):
            curr_t = torch.randint(low = low, high = high,size = [1]).numpy()[0]
            ts[j] = curr_t
            x_pure = self.purify_batch(x,t = curr_t)
            logits = self.pred_batch(x_pure,100)
            classes_predicted = torch.argmax(logits,dim = 1)
            for z,class_ in enumerate(classes_predicted):
                preds[z,class_] += 1
        t_s = torch.ones(len(x)).to(device)* ts.float().mean()
        return preds, t_s
    def random_t_batch(self,x,high,low,mc_iter = 100,batch_size = 100):
        preds = torch.zeros(size = (len(x),self.num_classes)).to(device)
        ts = torch.zeros(len(x))
        for i in range(0,len(x),batch_size):
            preds[i:i+batch_size],ts[i:i+batch_size] = self.random_t(x[i:i+batch_size],
                                                          high,low,mc_iter = mc_iter)
        return preds,ts
    def certify_gauss(self,x,y,n0 = 100, n = 1_000_000, alpha = .05):
        counts0,_ = self.gauss_batch(x,y,400,10,initial_points = int(n0/10),mc_iter = n0,batch_size = 1)
        ca = counts0.topk(1)[1].flatten()
        counts,ts = self.gauss_batch(x,y,400,10,initial_points = int(n/10),mc_iter = n,batch_size = 1)
        t1 = torch.round(ts.mean()).long().to(device)
        print(t1)
        top_ix = torch.empty(len(x))
        for i in range(len(top_ix)):
            top_ix[i] = counts[i,ca[i]]
        pa = proportion_confint(top_ix.cpu(),n,alpha = 2*(1-alpha),method = "beta")[0]
        sigma = ((1 - self.alpha_bar[t1])/self.alpha_bar[t1])**.5
        radius = sigma.cpu() * norm.ppf(pa)
        return ca, radius
    def certify_rand(self,x,n0 = 100, n = 1_000_000, alpha = .05):
        counts0,_ = self.random_t_batch(x,400,10,mc_iter = n0)
        ca = counts0.topk(1)[1].flatten()
        counts,ts = self.random_t_batch(x,400,10,mc_iter = n)
        t1 = torch.round(ts.mean()).long().to(device)
        print(t1)
        top_ix = torch.empty(len(x))
        for i in range(len(top_ix)):
            top_ix[i] = counts[i,ca[i]]
        pa = proportion_confint(top_ix.cpu(),n,alpha = 2*(1-alpha),method = "beta")[0]
        sigma = ((1 - self.alpha_bar[t1])/self.alpha_bar[t1])**.5
        radius = sigma.cpu() * norm.ppf(pa)
        return ca, radius
    def eval_spread(self,x,y,t = None,num_iter = None, mc_iter = None, spread_factor = None):
        t = check_t(self,t)
        mc_iter = check_MonteCarlo_iter(self,mc_iter)
        spread_factor = check_spread(self,spread_factor)
        proba = self.spread(x,t = t, num_iter = num_iter, mc_iter = mc_iter,spread_factor = spread_factor)
        return float(self.acc(proba,y))*100
    def exhaustive_t_search(self,x,y,high,low,batch_size):
        results = dict()
        ts = np.arange(low,high,20)
        for i,t in tqdm(enumerate(ts),total = len(ts)):
            results[t] = self.eval_(x[0:batch_size],y[0:batch_size], t = torch.tensor([t]).to(device))
        return results

In [6]:
#Load all the models to test
ddpm = DDPM(resnet_scaled,eps_model,100,grad = False)
crm = CertifiedRobustnessModel(resnet_scaled,certified_denoiser,100,grad = False)
x_adv = load_x_adv('PGDL2 vs resnet','pgdl2')

In [74]:
logits = crm.gauss_batch(x_adv[0:1000],y_test[0:1000],400,10,batch_size = 1)

  0%|          | 0/1000 [00:00<?, ?it/s]

Currently trying t = 994

AttributeError: 'tuple' object has no attribute 'numel'

In [79]:
crm.acc(logits[0],y_test[0:1000])

tensor(0.8490, device='cuda:0')

In [80]:
logits,ts = crm.gauss_batch(x_adv[0:1000],y_test[0:1000],400,10,batch_size = 10)
crm.acc(logits,y_test[0:1000])

  0%|          | 0/100 [00:00<?, ?it/s]

Currently trying t = 130

tensor(0.7890, device='cuda:0')

In [107]:
logits = crm.gauss_batch(x_adv[0:1000],y_test[0:1000],400,10)
crm.acc(logits,y_test[0:1000])

  0%|          | 0/90 [00:00<?, ?it/s]

Currently trying t = 157

  0%|          | 0/90 [00:00<?, ?it/s]

Currently trying t = 103

  0%|          | 0/90 [00:00<?, ?it/s]

Currently trying t = 148

  0%|          | 0/90 [00:00<?, ?it/s]

Currently trying t = 130

  0%|          | 0/90 [00:00<?, ?it/s]

Currently trying t = 121

  0%|          | 0/90 [00:00<?, ?it/s]

Currently trying t = 526

  0%|          | 0/90 [00:00<?, ?it/s]

Currently trying t = 126

  0%|          | 0/90 [00:00<?, ?it/s]

Currently trying t = 153

  0%|          | 0/90 [00:00<?, ?it/s]

Currently trying t = 406

  0%|          | 0/90 [00:00<?, ?it/s]

Currently trying t = 260

tensor(0.7710, device='cuda:0')

-Compare fixed t vs randomized sampling versus bayesian
-Prepare results on the radius
-

In [19]:
logits = crm.algo_1(x_adv[0:1000],num_iter = 100, t = 120)
crm.acc(logits,y_test[0:1000])

  0%|          | 0/100 [00:00<?, ?it/s]

tensor(0.7690, device='cuda:0')

In [7]:
preds, radii = crm.certify(x_adv[0:100], n0 = 100, n = 3_000, alpha = .05, t = 120)

  0%|          | 0/100 [00:00<?, ?it/s]

  0%|          | 0/3000 [00:00<?, ?it/s]

In [8]:
print(crm.acc(preds,y_test[0:100]))
print(torch.mean(radii))

tensor(0.7700, device='cuda:0')
tensor(0.5327, dtype=torch.float64)


In [7]:
preds2, radii2 = crm.certify_rand(x_adv[0:100], n0 = 100, n = 3_000, alpha = .05)

  0%|          | 0/100 [00:00<?, ?it/s]

  0%|          | 0/3000 [00:00<?, ?it/s]

tensor(204, device='cuda:0')


In [9]:
print(crm.acc(preds2,y_test[0:100]))
print(torch.mean(radii2))

tensor(0.7600, device='cuda:0')
tensor(0.2181, dtype=torch.float64)
