In [1]:
%matplotlib notebook
import cvxpy as cp
import dccp
import torch
import numpy as np
from cvxpylayers.torch import CvxpyLayer
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn import svm
from sklearn.metrics import zero_one_loss, confusion_matrix
import time
import torch.optim as optim


torch.set_default_dtype(torch.float64)
XDIM = 5
COST = 0.5

# Utils

In [2]:
def get_data(N, w, b):
        X = torch.randn(N, XDIM)
        Y = torch.sign(X @ w + b) # + np.random.randn(N))
        return X, Y
    
def split_data(X, Y, percentage):
    num_val = int(len(X)*percentage)
    return X[num_val:], Y[num_val:], X[:num_val], Y[:num_val]

def optimize_X(X, w, b, problem):
    problem.w.value = w
    problem.b.value = b
    Xp = np.zeros_like(X)
    for i, sample in enumerate(X):
        problem.x.value = sample
        result = problem.prob.solve(method = 'dccp')
        Xp[i] = problem.z.value
    return Xp

def fit(loss, params, X, Y, Xval, Yval, epochs=100, verbose=False):
    opt = optim.Adam(params)
    train_losses = []
    val_losses = []
    for epoch in range(epochs):
        with torch.no_grad():
            val_losses.append([])
            for x, y in zip(Xval, Yval):
                l = loss(x, y)
                val_losses[-1].append(l.item())
            val_losses.append(np.mean(val_losses[-1]))
        if verbose:
            print("----- epoch %03d / %03d | %3.5f" % (epoch + 1, epochs, val_losses[-1]))
        train_losses.append([])
        for x, y in zip(X, Y):
            opt.zero_grad()
            l = loss(x, y)
            l.backward()
            opt.step()
            train_losses[-1].append(l.item())
    return val_losses, train_losses

In [3]:
class Problem():
    
    def __init__(self, f, g, c):
        self.f = f
        self.g = g
        self.c = c
        
        self.z = cp.Variable(XDIM)
        self.z.value = np.zeros(XDIM)
        y = cp.Variable(1)
        self.x = cp.Parameter(XDIM)
        self.w = cp.Parameter(XDIM)
        self.b = cp.Parameter(1)

        target =  self.f(self.z, self.w, self.b) - y
        constraints = [y == self.g(self.z, self.w, self.b) + self.c(self.z, self.x), self.z >= -500, self.z <= 500]
        self.prob = cp.Problem(cp.Maximize(target), constraints)

        print("problem is DCP:", self.prob.is_dcp())   # false
        print("problem is DCCP:", dccp.is_dccp(self.prob))  # true


# Gain & Cost functions

In [4]:
def f(x, w, b):
    return 0.5*cp.norm(cp.hstack([1, (x@w + b + 1)]), 2)
    
def g(x, w, b):
    return 0.5*cp.norm(cp.hstack([1, (x@w + b - 1)]), 2)

def c(x, r):
    return COST*cp.sum_squares(x-r)

def f_derivative(x, w, b):
    return 0.5*((x@w + b + 1)/cp.sqrt((x@w + b + 1)**2 + 1))*w

def g_derivative(x, w, b):
    return 0.5*((x@w + b - 1)/cp.sqrt((x@w + b - 1)**2 + 1))*w

def c_derivative(x, r):
    return 2*COST*(x-r)

def pred(X, w, b):
    return np.sign(X @ w + b)

# My ccp

In [5]:
def ccp(w, b, r):
    x = cp.Variable(XDIM)
    xt = cp.Parameter(XDIM)
    
    target =  f_derivative(xt, w, b)@x - g(x, w, b) - c(x, r)
    constraints = [x >= -500, x <= 500]
    prob = cp.Problem(cp.Maximize(target), constraints)
    
    xt.value = r
    result = prob.solve()
    diff = np.linalg.norm(xt.value - x.value)
    while diff > 0.001:
        # print("xt+1: {}\ndiff from xt: {}".format(x.value, diff))
        xt.value = x.value
        result = prob.solve()
        diff = np.linalg.norm(x.value - xt.value)
    return torch.from_numpy(x.value)

In [6]:
# w = np.random.randn(XDIM)
# r = np.random.randn(XDIM)
# print(w, r)
# x_total = ccp(w, r)
# print(x_total)

# problem = Problem(f, g, c)
# problem.x.value = r
# problem.w.value = w
# res = problem.prob.solve(method='dccp')
# print(problem.z.value)

# Data generation

In [6]:
N = 1000
_w_true = torch.randn(XDIM)
_b_true = torch.randn(1)

X, Y = get_data(N, _w_true, _b_true)
print("percent of positive samples: {}%".format(100 * len(Y[Y == 1]) / len(Y)))
X, Y, Xval, Yval = split_data(X, Y, 0.2)

percent of positive samples: 47.5%


# Train

In [7]:
x = cp.Variable(XDIM)
w = cp.Parameter(XDIM, value = np.random.randn(XDIM))
b = cp.Parameter(1, value = np.random.randn(1))
r = cp.Parameter(XDIM, value = np.random.randn(XDIM))
f_der = cp.Parameter(XDIM, value = np.random.randn(XDIM))

target = f_der@x - g(x, w, b) - c(x, r)
constraints = [x >= -10, x <= 10]
objective = cp.Maximize(target)
problem = cp.Problem(objective, constraints)
print(problem.is_dcp())
print(problem.is_dpp())
layer = CvxpyLayer(problem, parameters=[f_der, w, b, r], variables=[x])   

True
True


In [8]:
_w = torch.randn(XDIM, requires_grad=True)
_b = torch.randn(1, requires_grad=True)

EPOCHS=5

def loss(x, y, w, b):
    xt = ccp(w.detach().numpy(), b.detach().numpy(), x.numpy())
    f_der = 0.5*((xt@w + b + 1)/torch.sqrt((xt@w + b + 1)**2 + 1))*w
    output = w@(layer(f_der, w, b, x)[0]) + b
    loss = torch.mean(torch.clamp(1 - output * y, min=0))
    return loss

val_losses, train_losses = fit(lambda x, y: loss(x, y, _w, _b), [_w, _b], X, Y, Xval, Yval, EPOCHS, True)

	https://www.cvxpy.org/tutorial/advanced/index.html#disciplined-parametrized-programming


----- epoch 001 / 005 | 1.31627


KeyboardInterrupt: 

# Test results

In [10]:
problem = Problem(f, g, c)
w_eval = _w.detach().numpy()
b_eval = _b.detach().numpy()

w_true = _w_true.detach().numpy()
b_true = _b_true.detach().numpy()

print(w_eval, b_eval, w_true, b_true)

_Xval = Xval.numpy()
Xval_opt = optimize_X(_Xval, w_eval, b_eval, problem)

FpXp = pred(Xval_opt, w_eval, b_eval)
FXp = pred(Xval_opt, w_true, b_true)
FpX = pred(_Xval, w_eval, b_eval)
FX = pred(_Xval, w_true, b_true)

num_of_samples = len(Yval)
res = confusion_matrix(Yval, FX, labels=[-1, 1])*100/num_of_samples
acc = np.trace(res)
print("y vs f(x):\n", res, "\naccuracy: ", acc)
res = confusion_matrix(Yval, FXp, labels=[-1, 1])*100/num_of_samples
acc = np.trace(res)
print("y vs f(x\'):\n", res, "\naccuracy: ", acc)
res = confusion_matrix(Yval, FpX, labels=[-1, 1])*100/num_of_samples
acc = np.trace(res)
print("y vs f\'(x):\n", res, "\naccuracy: ", acc)
res = confusion_matrix(Yval, FpXp, labels=[-1, 1])*100/num_of_samples
acc = np.trace(res)
print("y vs f\'(x\'):\n", res, "\naccuracy: ", acc)

problem is DCP: False
problem is DCCP: True
[0.20251378 0.75438478 0.43561403 0.68897852 0.37830942] [-1.05709947] [0.21756379 0.65316635 0.34598884 0.56987381 0.3458043 ] [-0.20622603]
y vs f(x):
 [[58.5  0. ]
 [ 0.  41.5]] 
accuracy:  100.0
y vs f(x'):
 [[29.5 29. ]
 [ 0.  41.5]] 
accuracy:  71.0
y vs f'(x):
 [[58.5  0. ]
 [26.  15.5]] 
accuracy:  74.0
y vs f'(x'):
 [[53.   5.5]
 [ 0.  41.5]] 
accuracy:  94.5
