In [58]:
from torch import nn
import torch
import numpy as np
import random 

class LinearNet(nn.Module):
    def __init__(self, hidden_size=2):
        super().__init__()
        self.last_layer = nn.Linear(2, 2)
        self.rll = nn.ReLU()
        self.classifier = nn.Linear(2, 2)
        self.softmax = nn.Softmax(dim=0)
        
    def forward(self, x):
        x = self.last_layer(x)
        x = self.rll(x)
        x = self.classifier(x)
        x = self.softmax(x)
        return x    

In [15]:
input =torch.tensor([0.0,1.0])
label = torch.tensor(1)

torch.manual_seed(0)
np.random.seed(0)
random.seed(0)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(0)
model = LinearNet()
for n,m in model.named_modules():
    m.register_forward_hook(lambda m,i,o: print(o))

model.classifier.register_full_backward_pre_hook(lambda m,o: print(m,o))
def break_graph(module, grad_output):
    print(grad_output)
model.classifier.register_full_backward_pre_hook(break_graph,prepend=True)
model.classifier.register_full_backward_hook(lambda m,i,o: print(m,i,o))

model.last_layer.register_full_backward_hook(lambda m,i,o: print(m,i,o))

criterion = nn.CrossEntropyLoss()


logits = model(input)

# print(logits)
loss = criterion(logits, label)
loss.backward(retain_graph=True)
# print(loss)


for n,p in model.named_parameters():
    print(n,p.grad)
    # print(n,p.grad_fn)
    # print(n,p.grad_fn.next_functions)
    # print(n,p.grad_fn.next_functions[0][0].next_functions)
    # print(n,p.grad_fn.next_functions[0][0].next_functions[0][0].next_functions)
    # print(n,p.grad_fn.next_functions[0][0].next_functions[0][0].next_functions[0][0].next_functions)
    # print(n,p.grad_fn.next_functions[0][0].next_functions[0][0].next_functions[0][0].next_functions[0][0].next_functions

tensor([ 0.1070, -0.3308], grad_fn=<ViewBackward0>)
tensor([0.1070, 0.0000], grad_fn=<ReluBackward0>)
tensor([-0.2152, -0.1457], grad_fn=<ViewBackward0>)
tensor([-0.2152, -0.1457], grad_fn=<BackwardHookFunctionBackward>)
(tensor([ 0.4826, -0.4826]),)
Linear(in_features=2, out_features=2, bias=True) (tensor([ 0.4826, -0.4826]),)
Linear(in_features=2, out_features=2, bias=True) (tensor([0.0235, 0.1803]),) (tensor([ 0.4826, -0.4826]),)
Linear(in_features=2, out_features=2, bias=True) (None,) (tensor([0.0235, 0.0000]),)
last_layer.weight tensor([[0.0000, 0.0235],
        [0.0000, 0.0000]])
last_layer.bias tensor([0.0235, 0.0000])
classifier.weight tensor([[ 0.0516,  0.0000],
        [-0.0516, -0.0000]])
classifier.bias tensor([ 0.4826, -0.4826])


In [16]:
model.zero_grad()
relevant_layers = [np for np in model.named_parameters()][1:]
print("Relevant Layers")
print(relevant_layers)
grad_top = torch.autograd.grad(loss, [np[1] for np in relevant_layers], retain_graph=True)
print("Grad Top")
print(grad_top)
print("Gradients")
for n,p in model.named_parameters():
    print(n,p.grad)


Relevant Layers
[('last_layer.bias', Parameter containing:
tensor([-0.2723,  0.1896], requires_grad=True)), ('classifier.weight', Parameter containing:
tensor([[-0.0140,  0.5607],
        [-0.0628,  0.1871]], requires_grad=True)), ('classifier.bias', Parameter containing:
tensor([-0.2137, -0.1390], requires_grad=True))]
(tensor([ 0.4826, -0.4826]),)
Linear(in_features=2, out_features=2, bias=True) (tensor([ 0.4826, -0.4826]),)
Linear(in_features=2, out_features=2, bias=True) (tensor([0.0235, 0.1803]),) (tensor([ 0.4826, -0.4826]),)
Linear(in_features=2, out_features=2, bias=True) (None,) (tensor([0.0235, 0.0000]),)
Grad Top
(tensor([0.0235, 0.0000]), tensor([[ 0.0516,  0.0000],
        [-0.0516, -0.0000]]), tensor([ 0.4826, -0.4826]))
Gradients
last_layer.weight None
last_layer.bias None
classifier.weight None
classifier.bias None


# Backward pass from gradient

In [17]:
import torch
import torch.nn as nn

# Create a 2-dimensional tensor.
x = torch.randn(1, 2, requires_grad=True)

# Define a linear layer.
linear = nn.Linear(2, 2)

# Apply the linear layer to x to get y.
y = linear(x)

# Apply softmax to y to get z.
softmax = nn.Softmax(dim=1)
z = softmax(y)

# Manually calculate dz/dy.
dz_dy = torch.tensor([[1.0, 0.0]])

# Call backward on y, passing in the manually calculated gradient.
y.backward(dz_dy)

# Print dx/dy.
print(x.grad)

tensor([[0.2795, 0.4243]])


# Dynamically starting the DCG

In [47]:
input =torch.tensor([0.0,1.0])
label = torch.tensor(1)

torch.manual_seed(0)
np.random.seed(0)
random.seed(0)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(0)
model = LinearNet()

for p in model.parameters():
    p.requires_grad = False

for p in model.classifier.parameters():
    p.requires_grad = True

def start_dcg(module, input):
    for tensor in input:
        tensor.requires_grad = True
    return input

def debug_forward(module, input):
    print(input)

def debug_backward(module, grad_input, grad_output):
    print(grad_input)

model.classifier.register_forward_pre_hook(start_dcg)
model.classifier.register_forward_pre_hook(debug_forward)
model.classifier.register_full_backward_hook(debug_backward)


criterion = nn.CrossEntropyLoss()
logits = model(input)

loss = criterion(logits, label)
loss.backward()

for n,p in model.named_parameters():
    print(n,p.grad)

(tensor([0.1070, 0.0000], requires_grad=True),)
(tensor([0.0120, 0.0917]),)
last_layer.weight None
last_layer.bias None
classifier.weight tensor([[ 0.0262,  0.0000],
        [-0.0262, -0.0000]])
classifier.bias tensor([ 0.2454, -0.2454])


# Starting the gradient calculation in the middle of the graph

In [64]:
input =torch.tensor([0.0,1.0])
label = torch.tensor(1)

torch.manual_seed(0)
np.random.seed(0)
random.seed(0)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(0)
model = LinearNet()

for p in model.classifier.parameters():
    p.requires_grad = False
class Catch_Hook():
    def __init__(self, module):
        self.hook = module.register_forward_pre_hook(self.hook_fn)
        self.bhook = module.register_full_backward_hook(self.bhook_fn)

    def hook_fn(self, module, input):
        self.caught_tensor = input

    def bhook_fn(self, module, grad_input, grad_output):
        print('Input grad')
        print(grad_input)

    def close(self):
        self.hook.remove()


hook = Catch_Hook(model.classifier)

criterion = nn.CrossEntropyLoss()
logits = model(input)
print(hook.caught_tensor)
loss = criterion(logits, label)

starting_grad = torch.Tensor([0.0120, 0.0917])
# hook.caught_tensor[0].backward(starting_grad)
loss.backward()

for n,p in model.named_parameters():
    print(n,p.grad)

(tensor([0.1070, 0.0000], grad_fn=<ReluBackward0>),)
Input grad
(tensor([0.0120, 0.0917]),)
last_layer.weight tensor([[0.0000, 0.0120],
        [0.0000, 0.0000]])
last_layer.bias tensor([0.0120, 0.0000])
classifier.weight None
classifier.bias None


In [78]:
for np in model.named_parameters():
    print(np)

print('+'*40)
set_status = True
for m in reversed(model.modules()):
    if model.classifier == m:
        set_status = not set_status
    for p in m.parameters():
        p.requires_grad = set_status

for np in model.named_parameters():
    print(np)

('last_layer.weight', Parameter containing:
tensor([[-0.0053,  0.3793],
        [-0.5820, -0.5204]], requires_grad=True))
('last_layer.bias', Parameter containing:
tensor([-0.2723,  0.1896], requires_grad=True))
('classifier.weight', Parameter containing:
tensor([[-0.0140,  0.5607],
        [-0.0628,  0.1871]]))
('classifier.bias', Parameter containing:
tensor([-0.2137, -0.1390]))
++++++++++++++++++++++++++++++++++++++++


TypeError: 'generator' object is not reversible

Also ich brauche:

1. ich wähle eine Layer
2. 

In [None]:
class Fumbrella():
    def __init__(
            self,
            module,
            all_modules,
            batch_size,
            position : str = 'input',
            stage = 1
            ):
        # position: 'input', 'output' 
        self.position = position
        self.all_modules = all_modules
        self.module = module
        if position == 'input':
            self.fhook = module.register_forward_pre_hook(self.forward_pre_hook_fn)
            self.bhook = module.register_full_backward_hook(self.backward_hook_fn)
        elif position == 'output':
            self.fhook = module.register_forward_hook(self.forward_hook_fn)
            self.bhook = module.register_full_backward_pre_hook(self.backward_pre_hook_fn)

        self.stage1_grad = None
        self.set_stage(stage)

        # metrics
        self.vector_norms = []
        self.rescaled_diffs = []
        self.avg_diff_per_class = []
        self.avg_diff_all_classes = []
        self.batch_size = batch_size

    def forward_pre_hook_fn(self, module, input):
        # stage 1
        # need to activate the gradient calculation
        if self.stage == 1:
            for tensor in input:
                tensor.requires_grad = True
            return input
        
    def forward_hook_fn(self, module, input, output):
        # stage 1
        # need to activate the gradient calculation
        if self.stage == 1:
            for tensor in output:
                tensor.requires_grad = True
            return output

    def backward_hook_fn(self, module, grad_input, grad_output):
        # stage 1
        if self.stage == 1:
            self.stage1_grad = grad_input
        # stage 2
        if self.stage == 2:
            self.add_metrics(self.stage1_grad, grad_input)
            return self.stage1_grad
            
        
    def backward_pre_hook_fn(self, module, grad_input, grad_output):
        # stage 1
        if self.stage == 1:
            self.stage1_grad = grad_output
        # stage 2
        if self.stage == 2:
            self.add_metrics(self.stage1_grad, grad_output)
            return self.stage1_grad

    def set_stage(self, stage):
        self.stage = stage
        if self.position == 'input':
            direction = 'ascending'
        elif self.position == 'output':
            direction = 'descending'
        if stage == 1:
            state = False
        if stage == 2:
            state = True
        self.set_requires_grad(state, direction)

    def set_requires_grad(self, state : bool,direction : str):
        # direction: 'ascending', 'descending'
        module_iterator = self.all_modules
        if direction == 'descending':
            module_iterator = reversed(module_iterator)
        for m in module_iterator:
            if self.module == m:
                state = not state
            for p in m.parameters():
                p.requires_grad = state

    def add_metrics(self,stage1_grad, stage2_grad):
        grad_diff = stage2_grad[0] - stage1_grad[0]
        grad_diff_rescaled = grad_diff *self.batch_size
        self.rescaled_diffs.append(grad_diff_rescaled)
        self.vector_norms.append(torch.linalg.vector_norm(grad_diff_rescaled,dim=1))
        self.avg_diff_per_class.append(grad_diff_rescaled.abs().mean(dim=0))
        self.avg_diff_all_classes.append(self.avg_diff_per_class[-1].mean())

        
    def compute_diff_metrics(self,accelerator):
        diff_metrics = {}
        vector_norms = accelerator.gather_for_metrics(self.vector_norms)
        if isinstance(vector_norms, list):
            vector_norms = torch.cat(vector_norms)
        avg_grad_diffs_per_class = accelerator.gather_for_metrics(self.avg_diff_per_class)
        if isinstance(avg_grad_diffs_per_class, list):
            avg_grad_diffs_per_class = torch.stack(avg_grad_diffs_per_class).mean(dim=0)
        avg_grad_diffs_all_classes = accelerator.gather_for_metrics(self.avg_diff_all_classes)
        if isinstance(avg_grad_diffs_all_classes, list):
            avg_grad_diffs_all_classes = torch.stack(avg_grad_diffs_all_classes).mean()
        self.clear_metrics()
        diff_metrics = {
            "avg_grad_diff_all_classes" : avg_grad_diffs_all_classes,
            "avg_grad_diff_per_class" : avg_grad_diffs_per_class,
            "vector_norms" : vector_norms
        }
        return diff_metrics
    
    def clear_metrics(self):
        self.vector_norms = []
        self.rescaled_diffs = []
        self.avg_diff_per_class = []
        self.avg_diff_all_classes = []

    def close(self):
        self.fhook.remove()
        self.bhook.remove()