# Prophecy paper example

### 1. Pytorch model

In [1]:
import os
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

In [2]:
# device = (
#   "cuda"
#   if torch.cuda.is_available()
#   else "mps"
#   if torch.backends.mps.is_available()
#   else "cpu"
# )
# print(f"Using {device} device")

In [3]:
class ProphecyExampleNetwork(nn.Module):
  def __init__(self):
    super().__init__()
    self.flatten = nn.Flatten()    
    self.linear_relu_stack = nn.Sequential(
      nn.Linear(2, 2, bias=False), # First hidden layer, with relu activation
      nn.ReLU(),
      nn.Linear(2, 2, bias=False), # Second hidden layer, with relu activation 
      nn.ReLU(),
    )
    self.final_output = nn.Linear(2, 2, bias=False)

  def forward(self, x):
    x = self.flatten(x)
    relu_stack_outputs = self.linear_relu_stack(x)
    logits = self.final_output(relu_stack_outputs)
    return logits

In [4]:
model = ProphecyExampleNetwork()
print(model)

ProphecyExampleNetwork(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear_relu_stack): Sequential(
    (0): Linear(in_features=2, out_features=2, bias=False)
    (1): ReLU()
    (2): Linear(in_features=2, out_features=2, bias=False)
    (3): ReLU()
  )
  (final_output): Linear(in_features=2, out_features=2, bias=False)
)


In [5]:
# set weight of each layer to be the same as in the paper
with torch.no_grad():
  model.linear_relu_stack[0].weight = nn.Parameter(torch.tensor([[1.0, -1.0], [1.0, 1.0]], dtype=torch.float))
  model.linear_relu_stack[2].weight = nn.Parameter(torch.tensor([[0.5, -0.2], [-0.5, 0.1]], dtype=torch.float))
  model.final_output.weight = nn.Parameter(torch.tensor([[1.0, -1.0], [-1.0, 1.0]], dtype=torch.float))

In [6]:
# test example input provided in paper background section
input_data = [[1,-1]]
X = torch.tensor(input_data, dtype=torch.float)
logits = model(X)
print(logits)
prediction_prob = nn.Softmax(dim=1)(logits)
y_pred = prediction_prob.argmax(1)
print(f"Predicted class: {y_pred}")

# test example input [1, -3], which will yield [2, -2]
input_data = [[1,-3]]
X = torch.tensor(input_data, dtype=torch.float)
logits = model(X)
print(logits)
prediction_prob = nn.Softmax(dim=1)(logits)
y_pred = prediction_prob.argmax(1)
print(f"Predicted class: {y_pred}")

tensor([[ 1., -1.]], grad_fn=<MmBackward0>)
Predicted class: tensor([0])
tensor([[ 2., -2.]], grad_fn=<MmBackward0>)
Predicted class: tensor([0])


### 2. Forward hook to get activation signatures in hidden layers

In [7]:
activation = {}
def get_relu_activation(name):
  def hook(model, inputs, outputs):
    activation_signature = ["ON" if val > 0 else "OFF" for val in outputs[0]]
    activation[name] = activation_signature
  return hook

# keep track of handles so that we can remove hooks later 
# (by calling handle.remove() for each handle in handles)
relu_handles = []
for idx, layer in enumerate(model.linear_relu_stack):
  if isinstance(layer, torch.nn.ReLU):
    handle = layer.register_forward_hook(get_relu_activation(idx))
    relu_handles.append(handle)

In [8]:
layer_outputs = {}
def get_layer_outputs(name):
  def hook(model, inputs, outputs):
    layer_outputs[name] = outputs.detach()
  return hook

layer_output_handles = []
for idx, layer in enumerate(model.linear_relu_stack):
  handle = layer.register_forward_hook(get_layer_outputs(idx))
  layer_output_handles.append(handle)

handle = model.final_output.register_forward_hook(get_layer_outputs("final_output"))
layer_output_handles.append(handle)

In [9]:
# forward pass -- getting the outputs
# input_data = [[1,-1]]
input_data = [[1, -1.0]]
X = torch.tensor(input_data, dtype=torch.float)
logits = model(X)
prediction_prob = nn.Softmax(dim=1)(logits)
y_pred = prediction_prob.argmax(1)
print(f"Predicted class: {y_pred}")
print(activation)
print(layer_outputs)

Predicted class: tensor([0])
{1: ['ON', 'OFF'], 3: ['ON', 'OFF']}
{0: tensor([[2., 0.]]), 1: tensor([[2., 0.]]), 2: tensor([[ 1., -1.]]), 3: tensor([[1., 0.]]), 'final_output': tensor([[ 1., -1.]])}


### 3. Use MarabouCore as Decision Procedure (DP)

In [10]:
import sys
import os
import numpy as np
sys.path.append('/Users/khoanguyen-cp/gmu/Marabou')

from maraboupy import Marabou
from maraboupy import MarabouCore
from maraboupy.Marabou import createOptions

Instructions for updating:
non-resource variables are not supported in the long term


In [11]:
activation

{1: ['ON', 'OFF'], 3: ['ON', 'OFF']}

In [12]:
layer_outputs

{0: tensor([[2., 0.]]),
 1: tensor([[2., 0.]]),
 2: tensor([[ 1., -1.]]),
 3: tensor([[1., 0.]]),
 'final_output': tensor([[ 1., -1.]])}

In [13]:
model.linear_relu_stack[0].weight

Parameter containing:
tensor([[ 1., -1.],
        [ 1.,  1.]], requires_grad=True)

In [14]:
def set_boundary_for_relu_vars(relu_vars, layer_activations, inputQuery):
  for idx, var in enumerate(relu_vars):
    if layer_activations[idx] == "ON": # var > 0
      inputQuery.setLowerBound(var, 1e-6)
      inputQuery.setUpperBound(var, 100)
      
    elif layer_activations[idx] == "OFF":
      inputQuery.setLowerBound(var, 0)
      inputQuery.setUpperBound(var, 0)
      
  return inputQuery
      
  
def set_boundary_for_linear_vars(linear_vars, inputQuery):
  for var in linear_vars:
    inputQuery.setLowerBound(var, -100)
    inputQuery.setUpperBound(var, 100)
  return inputQuery

In [15]:
def set_linear_constraints(layer_vars, layer, inputQuery):
  # in dense layers, each neuron is connected to every neuron in the preceding layer
  # we can check the current layer's in_features to see the size of the preceding layer
  prev_layer_vars = list(range(layer.out_features - layer.in_features, layer.in_features))
  
  # Ex: x2 = w0*x0 + w1*x1 + b
  # <=> w0*x0 + w1*x1 - x2 = -b
  for idx, var in enumerate(layer_vars):
    coefficients = layer.weight[idx]
    equation = MarabouCore.Equation()
    equation.addAddend(-1, var)
    for prev_idx, prev_var in enumerate(prev_layer_vars):
      equation.addAddend(coefficients[prev_idx], prev_var)
      
    scalar = 0.0 if layer.bias == None else (-layer.bias)
    equation.setScalar(scalar)
    inputQuery.addEquation(equation)
      
  return inputQuery


def set_relu_constraints(relu_vars, inputQuery):
  # each relu step is accompanied by a preceding linear layer of the same size
  linear_vars = [var - len(relu_vars) for var in relu_vars]
  for idx, relu_var in enumerate(relu_vars):
    corresponding_linear_var = linear_vars[idx]
    MarabouCore.addReluConstraint(inputQuery, corresponding_linear_var, relu_var)
  return inputQuery


def set_classification_constraints(layer_vars, layer, classification, inputQuery):
  # Create disjunction pairs for classification_var and other vars in the output layer
  classification_var = layer_vars[classification]
  disjunction = []
  for var in layer_vars: 
    if var == classification_var: continue
    equation_type = MarabouCore.Equation.EquationType(2) # type 2 is Less than or equal (LE inequality)
    equation = MarabouCore.Equation(equation_type) 
    equation.addAddend(1, classification_var)
    equation.addAddend(-1, var)
    equation.setScalar(0)
    disjunction.append([equation])
    
  MarabouCore.addDisjunctionConstraint(inputQuery, disjunction)
  return inputQuery

In [16]:
# For now, only supports postconditions that are network outputs being a particular class
def dp(network_activations, model, postcondition):
  num_of_vars = 0
  inputQuery = MarabouCore.InputQuery()
  
  # INPUT CONSTRAINTS
  # Input layer isn't part of the network, so we'll use the first layer's in_features
  input_features = list(range(0, model.linear_relu_stack[0].in_features))
  num_of_vars += len(input_features)
  print(f"input: {input_features}")
  inputQuery.setNumberOfVariables(num_of_vars)
  inputQuery = set_boundary_for_linear_vars(input_features, inputQuery)
  
  # HIDDEN LAYER CONSTRAINTS
  for relu_layer_idx, activation_values in network_activations.items():
    # each relu step is accompanied by a preceding linear layer of the same size
    relu_layer = model.linear_relu_stack[relu_layer_idx]
    linear_layer = model.linear_relu_stack[relu_layer_idx - 1]
    
    # constraints for linear layer
    linear_vars = list(range(num_of_vars, num_of_vars + linear_layer.out_features))
    num_of_vars += linear_layer.out_features
    print(f"linear: {linear_vars}")
    inputQuery.setNumberOfVariables(num_of_vars)
    inputQuery = set_boundary_for_linear_vars(linear_vars, inputQuery)
    inputQuery = set_linear_constraints(linear_vars, linear_layer, inputQuery)
    
    # constraints for relu layer
    relu_vars = list(range(num_of_vars, num_of_vars + linear_layer.out_features))
    num_of_vars += linear_layer.out_features
    print(f"relu: {relu_vars}")
    inputQuery.setNumberOfVariables(num_of_vars)
    inputQuery = set_boundary_for_relu_vars(relu_vars, activation_values, inputQuery)
    inputQuery = set_relu_constraints(relu_vars, inputQuery)
    
  # OUTPUT CONSTRAINTS 
  # if we want to check for postcondition of a class, we have to encode its inverse.
  # Ex: given 3 classes, y1, y2, y3. If postcondition is y1, then we have to encode y != y1, i.e. y1 <= y2 or y1 <= y3.
  layer = model.final_output
  layer_vars = list(range(num_of_vars, num_of_vars + layer.out_features))
  num_of_vars += layer.out_features
  print(f"output: {layer_vars}")
  
  inputQuery.setNumberOfVariables(num_of_vars)
  inputQuery = set_boundary_for_linear_vars(layer_vars, inputQuery)
  inputQuery = set_linear_constraints(layer_vars, layer, inputQuery)
  inputQuery = set_classification_constraints(layer_vars, layer, postcondition, inputQuery)
  
  ## Run Marabou to solve the query
  print("solving with Marabou...")
  options = createOptions()
  return MarabouCore.solve(inputQuery, options, "")

In [17]:
activation = {1: ['ON', 'OFF'], 3: ['ON', 'OFF']}
dp(activation, model, 0)

input: [0, 1]
linear: [2, 3]
relu: [4, 5]
linear: [6, 7]
relu: [8, 9]
output: [10, 11]
solving with Marabou...


('unsat', {}, <maraboupy.MarabouCore.Statistics at 0x2914bd7f0>)

In [18]:
dp(activation, model, 1)

input: [0, 1]
linear: [2, 3]
relu: [4, 5]
linear: [6, 7]
relu: [8, 9]
output: [10, 11]
solving with Marabou...


('sat',
 {0: -0.0,
  1: -100.0,
  2: 100.0,
  3: -100.0,
  4: 100.0,
  5: 0.0,
  6: 20.000000298023224,
  7: -10.000000149011612,
  8: 20.000000298023224,
  9: 0.0,
  10: 100.0,
  11: -100.0},
 <maraboupy.MarabouCore.Statistics at 0x291600630>)

In [21]:
new_activation = {1: ['ON', 'OFF']}
dp(new_activation, model, 0)

input: [0, 1]
linear: [2, 3]
relu: [4, 5]
output: [6, 7]
solving with Marabou...


('unsat', {}, <maraboupy.MarabouCore.Statistics at 0x105cf7b70>)

In [20]:
input_data = [[0.0, -100.0]]
X = torch.tensor(input_data, dtype=torch.float)
logits = model(X)
prediction_prob = nn.Softmax(dim=1)(logits)
y_pred = prediction_prob.argmax(1)
print(f"Predicted class: {y_pred}")

Predicted class: tensor([0])
