In [1]:
import numpy as np
from matplotlib import pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import random
from scipy.optimize import minimize
import time

Implementing the DS algorithm

In [4]:

def generate_points(x_k, sigma_k):
    n= len(x_k)
    I_n = np.eye(n)
    n_I_n = -np.eye(n)
    D = np.concatenate((I_n, n_I_n), axis=0)
    
    points = []
    for d_i in D:
        x_i = x_k + sigma_k * d_i
        points.append(x_i)
    
    return np.array(points)


def choose_xn(points, func):

    fk_values = [func(xi) for xi in points]
    min_index = np.argmin(fk_values)
    min_f = np.min(fk_values)
    x0 = points[min_index]
    
    return x0,min_f,len(fk_values)

def forcing(c,sigma_k):
    return c*sigma_k**2

In [11]:
def euclidean_projectionx(x, max_norm=10):
    return np.clip(x, -max_norm, max_norm)

def euclidean_projectiony(point):
    return(point)

def direct_search(func,x_0,c,T,lamda,sigma_0,sigma_max ):
    sigma = np.zeros(T+1)
    sigma[0] = sigma_0
    x = np.zeros((len(x_0),T+1))
    x[:,0] = x_0
    f_calls = 0
    for k in range(T):
        x_nom = generate_points(x[:,k],sigma[k])
        x_chosen, a,fc = choose_xn(x_nom, func)
        b = func(x[:,k]) - forcing(c,sigma[k])
        f_calls = f_calls + fc
        if a < b:
            x[:,k+1] = x_chosen
            sigma[k+1] = np.min([sigma_max, lamda*sigma[k]])
        else:
            x[:,k+1] = x[:,k]
            sigma[k+1] = sigma[k] / lamda

    return x[:,k+1], f_calls

def one_step_direct_search(func,x_0,c,lamda,sigma_0,sigma_max ):
    sigma = np.min([sigma_max, lamda*sigma_0])
    f_calls = 0
    while True:
        x = x_0
        x_nom = generate_points(x_0,sigma)
        x_chosen, a,fc = choose_xn(x_nom, func)
        b = func(x_0) - forcing(c,sigma)
        f_calls = f_calls + fc
        if a < b:
            x = x_chosen
            break
        else:
            sigma = sigma/lamda
    return x,sigma, f_calls

In [12]:
def fix_variable(f, value, variable):
    if variable == 'x':
        def new_function(y):
            return -f(value, y)
    elif variable == 'y':
        def new_function(x):
            return f(x, value)

    return new_function

def minimax_direct_search(func, x_0,y_0,c,T,lamda,sigma_0,sigma_max ):
    sigma = np.zeros(T+1)
    sigma[0] = sigma_0
    x = np.zeros((len(x_0),T+1))
    x[:,0] = x_0 
    y = np.zeros((len(y_0),T+1))
    y[:,0] = y_0 
    f_calls = np.zeros((1,T+1))
    for k in range(1,T+1):
        func1 = fix_variable(func,x[:,k-1],'x')
        yk,fc1 = direct_search(func1,y[:,k-1],c,5,lamda,sigma[k-1],sigma_max )
        y[:,k] = euclidean_projectiony (yk)
        func2 = fix_variable(func,y[:,k],'y')
        xk,sigma[k],fc2 = one_step_direct_search(func2,x[:,k-1],c,lamda,sigma[k-1],sigma_max )
        x[:,k] = euclidean_projectionx(xk)
        f_calls[:,k] = fc1+fc2
    return x,y,f_calls

Code for Psioning Attack problem. The results here will be used in Low_dim.ipynb file for comparison.

In [None]:
def calculate_accuracy(T, T_new):
    num_correct = np.sum(T == T_new)
    accuracy = num_correct / len(T)
    return accuracy
# Define the logistic function
def logistic_function(z, theta):
    return 1 / (1 + np.exp(-np.dot(z, theta)))

# Define the loss function for training set D_train
def h(x, theta, D):
    loss = 0
    for z, t in D:
        h_x = logistic_function(z + x, theta)
        loss += t * np.log(h_x) + (1 - t) * np.log(1 - h_x)
    return -loss / len(D)

def objective(x, theta):
    F_tr = h(x, theta, D_train_1) + h(np.zeros(n), theta, D_train_2) + 0.001* np.linalg.norm(theta, 2) ** 2
    return -F_tr
    
acc_k = np.zeros((331,20))
f_callstotal = []
ptime2 = np.zeros(20)
for j in range(20):
    # Generate the dataset
    k = 500
    n = 20
    poisoning_ratio = 0.15
    v = np.random.normal(loc=0, scale=1e-3, size=k)
    Z = np.random.multivariate_normal(mean=np.zeros(n), cov=np.eye(n), size=k)
    Theta_ast = np.random.normal(size=n)
    T = np.where(1 / (1 + np.exp(-(np.dot(Z, Theta_ast) + v))) >= 0.5, 1, 0)

    # Split the dataset into training and testing sets
    D_train = list(zip(Z[:int(k*0.7)], T[:int(k*0.7)]))
    poisoning_size = int(len(D_train) * poisoning_ratio)
    D_train_1 = D_train[:poisoning_size]
    D_train_2 = D_train[poisoning_size:]
    D_test = list(zip(Z[int(k*0.7):], T[int(k*0.7):]))

    x_init = np.zeros(n)*0.1
    theta_init = Theta_ast
    eps = 0.01
    xt = np.zeros(n)
    yt = np.zeros(n)
    c = 1
    T0 = 330
    lamda = 1.5
    sigma_0 = 1e-3
    sigma_max = 5e-1
    start = time.time()
    xt,yt, f_callst=minimax_direct_search(objective,x_init,theta_init,c,T0,lamda,sigma_0,sigma_max )
    end = time.time()
    ptime2[j] = end - start
    # Calculate the accuracy
    accuracy3 =  np.zeros(np.shape(yt)[1])
    for i in range (np.shape(yt)[1]):
        T_new = np.where(1 / (1 + np.exp(-(np.dot(Z[int(k*0.7):], yt[:,i]) + v[int(k*0.7):]))) >= 0.5, 1, 0)
        accuracy3[i] = calculate_accuracy(T[int(k*0.7):], T_new)
    acc_k[:,j] = accuracy3
    f_callstotal.append(f_callst)

ptime_avg2 = np.mean(ptime2)

In [33]:
np.array(f_callstotal).shape


(20, 1, 331)

In [32]:
np.save('acc_kdr.npy', acc_k)
np.save('ptime_avg2.npy', ptime_avg2)
np.save('f_callstotal.npy',np.array(f_callstotal))

Code for Robust Least Square problem. The results here will be used in Low_dim.ipynb file for comparison.

In [70]:
def euclidean_projectiony(x, max_norm=5):
    norm_x = np.linalg.norm(x)
    if norm_x > max_norm:
        x = x * (max_norm / norm_x)
    return x
def euclidean_projectionx(point):
    return(point)



def minimax_direct_search(func, x_0,y_0,c,T,lamda,sigma_0,sigma_max ):
    sigma = np.zeros(T+1)
    sigma[0] = sigma_0
    x = np.zeros((len(x_0),T+1))
    x[:,0] = x_0 
    y = np.zeros((len(y_0),T+1))
    y[:,0] = y_0 
    f_calls = []
    function_values = []
    F_0 = func(x_0,y_0)
    function_values.append(F_0)
    f_calls.append(0)
    for k in range(1,T+1):
        func1 = fix_variable(func,x[:,k-1],'x')
        y[:,k],fc1 = direct_search(func1,y[:,k-1],c,5,lamda,sigma[k-1],sigma_max )
        y[:,k] = euclidean_projectiony(y[:,k])
        func2 = fix_variable(func,y[:,k],'y')
        x[:,k],sigma[k],fc2 = one_step_direct_search(func2,x[:,k-1],c,lamda,sigma[k-1],sigma_max )
        x[:,k] = euclidean_projectionx(x[:,k])
        function_value = func(x[:,k], y[:,k])
        function_values.append(function_value)
        f_calls.append(fc1+fc2)
        if function_value <=0.005*F_0:
            return function_values,f_calls
    return function_values,f_calls

In [74]:
etime3 = []
fvalues3 = []
f_callstotal = []
for j in range(10):
    start = time.time()
    n = 150
    m = 250

    A = np.random.randn(n, m)
    x_star = np.random.randn(m)
    epsilon = 0.001 * np.random.randn(n)
    y0 = A@x_star + epsilon
    x_initial = np.random.randn(m)
    y_initial = np.random.randn(n)

    def F(x, y):    
        return np.linalg.norm(A@x-y0+y)**2

    c = 1
    T0 = 1500
    lamda = 1.5
    sigma_0 = 1e-3  
    sigma_max = 5e-1
    function_values, f_callst=minimax_direct_search(F,x_initial,y_initial,c,T0,lamda,sigma_0,sigma_max )
    fvalues3.append(function_values)
    end = time.time()
    execution_time = end-start
    etime3.append(execution_time)
    f_callstotal.append(f_callst)


In [77]:
print(len(f_callstotal[0]),len(fvalues3[0]))

413 413


In [79]:
import json
with open('fvalues3.json', 'w') as file:
    json.dump(fvalues3, file)
with open('etime3.json', 'w') as file:
    json.dump(etime3, file)
with open('f_callstotal_RLS.json', 'w') as file:
    json.dump(f_callstotal, file)
