## PLA with naive cycles

In [13]:
import numpy as np 
import pandas as pd 

# Load the training set: whitespace-separated columns, four features followed by the label.
# The separator is a raw string so the regex `\s+` reaches pandas unchanged instead of
# being parsed as an (invalid) string escape sequence.
data = pd.read_csv('hw1_15_train.dat.txt', sep=r'\s+', header=None, names=['x1', 'x2', 'x3', 'x4', 'y'])

y = data['y'].to_numpy()                        # label column as a 1-D array
X = data[['x1', 'x2', 'x3', 'x4']].to_numpy()   # feature columns, one row per example

In [14]:
# Quick sanity check of the loaded arrays: container type of the labels,
# then the first ten labels and the first ten feature rows, one item per line.
print(type(y), y[:10], X[:10], sep='\n')

<class 'numpy.ndarray'>
[ 1  1  1  1  1  1 -1  1 -1 -1]
[[0.97681  0.10723  0.64385  0.29556 ]
 [0.67194  0.2418   0.83075  0.42741 ]
 [0.20619  0.23321  0.81004  0.98691 ]
 [0.51583  0.055814 0.92274  0.75797 ]
 [0.70893  0.10836  0.33951  0.77058 ]
 [0.55743  0.67804  0.061044 0.72689 ]
 [0.15654  0.75584  0.01122  0.42598 ]
 [0.50462  0.15137  0.33878  0.41881 ]
 [0.22657  0.59272  0.24103  0.46221 ]
 [0.49174  0.65115  0.24622  0.24796 ]]


In [15]:
def PLA_cyclic(X, y):
    """
    Perceptron learning algorithm that sweeps the examples cyclically, in the
    order they are stored in (X, y).

    Starting from zero weights, every misclassified example triggers the update
    w <- w + y_t * x_t. Training ends as soon as n consecutive examples have been
    classified correctly; there is no other exit from the loop.

    Args:
        X: numpy array(n, d), feature matrix
        y: numpy array(n, ), labels
    Returns:
        w         : numpy array(d+1, ), final weights including bias w0
        update_cnt: the total number of updates
    """
    n, d = X.shape

    # Prepend the constant feature x0 = 1 so the bias is learned as w[0]
    X = np.c_[np.ones((n, 1)), X]

    w = np.zeros(d + 1)
    update_cnt = 0
    streak = 0      # consecutive correct classifications since the last update
    idx = 0         # position of the example to visit next

    while True:
        sample, label = X[idx], y[idx]
        idx = (idx + 1) % n     # wrap around to start the next cycle

        if np.sign(w @ sample) != label:
            # Mistake: correct it and restart the streak, since the new w is untested
            w += label * sample
            update_cnt += 1
            streak = 0
            continue

        streak += 1
        if streak == n:
            # A full pass without mistakes: every example is classified correctly
            return w, update_cnt

In [16]:
# Train with the naive cycle on the full data set, then show the learned
# weights followed by the number of updates (one per line).
w, t = PLA_cyclic(X, y)
print(w, t, sep='\n')

[-3.         3.0841436 -1.583081   2.391305   4.5287635]
45


## PLA by visiting examples in fixed, pre-determined random cycles

In [22]:
def PLA_random(X, y, num_experiments=2000, seed=None):
    """
    Repeat PLA on randomly permuted data and average the number of updates.

    Each experiment shuffles the examples, then runs PLA_cyclic, which visits
    them cyclically in that fixed shuffled order.

    Args:
        X              : numpy array(n, d), feature matrix
        y              : numpy array(n, ), labels
        num_experiments: int, how many times the experiment is repeated (default 2000)
        seed           : int or None. When given, the shuffles are driven by a private
                         generator seeded with this value, so results are reproducible.
                         When None, the global NumPy random state is used.
    Returns:
        w         : numpy array(d+1, ), weights (including bias w0) of the LAST experiment
        update_cnt: int, the average number of updates, truncated to an integer
    Raises:
        ValueError: if num_experiments is smaller than 1
    """
    if num_experiments < 1:
        raise ValueError("num_experiments must be at least 1")

    # The np.random module and a RandomState instance both expose `shuffle`
    rng = np.random if seed is None else np.random.RandomState(seed)

    t_list = []
    n = X.shape[0]
    indices = np.arange(n)

    for _ in range(num_experiments):
        # Shuffle X and y together using the same random indices. The index array and
        # the data are both re-permuted every round, which composes permutations but
        # still leaves the examples in a random order.
        rng.shuffle(indices)
        X = X[indices]
        y = y[indices]
        w, t = PLA_cyclic(X, y)
        t_list.append(t)
    return w, int(np.mean(t_list))

In [23]:
# Run the randomized-order experiment and show the weights of the last run
# followed by the average update count (one per line).
w, t = PLA_random(X, y)
print(w, t, sep='\n')

[-3.         2.266872  -1.595303   2.735574   3.9278546]
39


## PLA by visiting examples in fixed, pre-determined random cycles throughout the algorithm and a new updating rule

In [24]:
def PLA_random_eta(X, y, eta=1.0, num_experiments=2000, seed=None):
    """
    Repeat PLA with learning rate eta on randomly permuted data and average
    the number of updates.

    Each experiment shuffles the examples, then runs PLA_cyclic_eta, which visits
    them cyclically in that fixed shuffled order.

    Args:
        X              : numpy array(n, d), feature matrix
        y              : numpy array(n, ), labels
        eta            : double, learning rate (default 1.0)
        num_experiments: int, how many times the experiment is repeated (default 2000)
        seed           : int or None. When given, the shuffles are driven by a private
                         generator seeded with this value, so results are reproducible.
                         When None, the global NumPy random state is used.
    Returns:
        w         : numpy array(d+1, ), weights (including bias w0) of the LAST experiment
        update_cnt: int, the average number of updates, truncated to an integer
    Raises:
        ValueError: if num_experiments is smaller than 1
    """
    if num_experiments < 1:
        raise ValueError("num_experiments must be at least 1")

    # The np.random module and a RandomState instance both expose `shuffle`
    rng = np.random if seed is None else np.random.RandomState(seed)

    t_list = []
    n = X.shape[0]
    indices = np.arange(n)

    for _ in range(num_experiments):
        # Shuffle X and y together using the same random indices. The index array and
        # the data are both re-permuted every round, which composes permutations but
        # still leaves the examples in a random order.
        rng.shuffle(indices)
        X = X[indices]
        y = y[indices]
        w, t = PLA_cyclic_eta(X, y, eta)
        t_list.append(t)
    return w, int(np.mean(t_list))


def PLA_cyclic_eta(X, y, eta=1.0):
    """
    PLA with learning rate eta, visiting examples in the naive cycle using the
    order of examples in the data set (X, y).

    Starting from zero weights, every misclassified example triggers the update
    w <- w + eta * y_t * x_t. Training ends once n consecutive examples have been
    classified correctly; there is no other exit from the loop.

    Args:
        X  : numpy array(n, d), feature matrix
        y  : numpy array(n, ), labels
        eta: double, learning rate; must be positive
    Returns:
        w         : numpy array(d+1, ), final weights including bias w0
        update_cnt: the total number of updates
    Raises:
        ValueError: if eta is not positive. With eta == 0 the weights never change and
                    with eta < 0 every update moves away from the misclassified
                    example, so the loop could not terminate.
    """
    # Written as `not eta > 0` so that NaN is rejected as well
    if not eta > 0:
        raise ValueError("eta must be positive")

    n, d = X.shape
    # Add a column of ones as first column (x0), so w[0] acts as the bias
    X = np.c_[np.ones((n, 1)), X]
    # Initialize w to 0, with one extra entry for w0
    w = np.zeros(d + 1)

    update_cnt = 0      # number of updates performed so far
    correct_num = 0     # consecutive correct classifications since the last update
    t = 0               # index of the example to visit next
    is_finished = False

    while not is_finished:
        x_t, y_t = X[t], y[t]

        if np.sign(w @ x_t) == y_t:     # Correctly classify the current example
            correct_num += 1
        else:                           # Find a mistake
            w += eta * y_t * x_t        # Correct the mistake
            update_cnt += 1
            correct_num = 0             # The new w is untested: restart the count

        t = (t + 1) % n                 # Wrap around to start the next cycle
        is_finished = correct_num == n  # All examples classified correctly in a row

    return w, update_cnt

In [25]:
# Run the randomized-order experiment with learning rate 0.5 and show the
# weights of the last run followed by the average update count (one per line).
w, t = PLA_random_eta(X, y, 0.5)
print(w, t, sep='\n')

[-1.5        1.162222  -0.5386015  1.1827945  2.1648815]
40


## PLA by using pocket

In [27]:
# Load the training and test sets: whitespace-separated columns, four features followed by the label.
# The separator is a raw string so the regex `\s+` reaches pandas unchanged instead of
# being parsed as an (invalid) string escape sequence.
train_data = pd.read_csv('hw1_18_train.dat.txt', sep=r'\s+', header=None, names=['x1', 'x2', 'x3', 'x4', 'y'])
test_data = pd.read_csv('hw1_18_test.dat.txt', sep=r'\s+', header=None, names=['x1', 'x2', 'x3', 'x4', 'y'])

# Split each frame into a feature matrix (one row per example) and a 1-D label array
X_train = train_data[['x1', 'x2', 'x3', 'x4']].to_numpy()
y_train = train_data['y'].to_numpy()

X_test = test_data[['x1', 'x2', 'x3', 'x4']].to_numpy()
y_test = test_data['y'].to_numpy()

In [29]:
def PLA_pocket(X, y, num_update=50):
    """
    Pocket variant of PLA: run ordinary perceptron updates while remembering
    ("keeping in the pocket") the weights with the lowest training error seen so far.

    Examples are visited cyclically in stored order. The run stops after
    num_update updates of w, or earlier once n consecutive examples are
    classified correctly.

    Args:
        X         : numpy array(n, d), feature matrix
        y         : numpy array(n, ), labels
        num_update: int, maximum number of updates of w to perform

    Returns:
        w_pocket: numpy array(d + 1, ), best weights including bias w0
                  (all zeros if no update achieved an error rate below 1.0)
    """
    n, d = X.shape

    # Prepend the constant feature x0 = 1 so the bias is learned as weights[0]
    X = np.c_[np.ones((n, 1)), X]

    weights = np.zeros(d + 1)
    best_weights = np.zeros(d + 1)
    best_err = 1.0

    updates_done = 0
    streak = 0      # consecutive correct classifications since the last update
    pos = 0         # position of the example to visit next

    while updates_done < num_update and streak < n:
        sample, label = X[pos], y[pos]
        pos = (pos + 1) % n     # wrap around to start the next cycle

        if np.sign(weights @ sample) == label:
            streak += 1
            continue

        # Mistake: apply the perceptron update, then re-evaluate on the whole set
        weights += label * sample
        updates_done += 1
        streak = 0

        err = error_rate(X, y, weights)
        if err < best_err:
            # NOTE: copy is required; `weights` keeps being modified in place,
            # so a plain assignment would make the pocket alias it
            best_weights, best_err = weights.copy(), err

    return best_weights


################ Helper functions ################

# Element-wise sign helper: np.sign wrapped with np.vectorize
sign_vec = np.vectorize(pyfunc=np.sign)


def error_rate(X, y, w):
    """
    Calculate the current error rate with the given weights w and examples (X, y).

    An example counts as an error when sign(x . w) differs from its label; a score
    of exactly 0 has sign 0 and is therefore counted as an error for any nonzero label.

    Args:
        X: numpy array(n, d + 1), feature matrix including a column of ones as first column
        y: numpy array(n, ), labels
        w: numpy array(d + 1, ), current weight
    Returns:
        err: double, error rate (fraction of misclassified examples)
    Raises:
        ValueError: if the data set is empty
    """
    if y.shape[0] == 0:
        raise ValueError("cannot compute an error rate on an empty data set")

    # np.sign already works element-wise on arrays, so no np.vectorize wrapper is
    # needed. The mean of the boolean mismatch mask is the misclassified fraction.
    return np.mean(np.sign(X @ w) != y)


def PLA_pocket_test_random(X_train, y_train, X_test, y_test, num_updates=50, num_experiments=2000):
    """
    Train with the pocket algorithm on the training set and evaluate on the test set.
    Repeat the experiment and return the average test error rate.

    Note: before each experiment the training examples are shuffled; PLA_pocket then
    visits them cyclically in that shuffled order.

    Args:
        X_train        : numpy array(n, d), training feature matrix
        y_train        : numpy array(n, ), training labels
        X_test         : numpy array(m, d), test feature matrix
        y_test         : numpy array(m, ), test labels
        num_updates    : int, number of updates passed to PLA_pocket for each run
        num_experiments: int, how many times the experiment is repeated (default 2000)
    Returns:
        avg_error: the average of the test error rate over all experiments
    Raises:
        ValueError: if num_experiments is smaller than 1
    """
    if num_experiments < 1:
        raise ValueError("num_experiments must be at least 1")

    # error_rate expects the bias column, so add x0 = 1 to the test features once
    m = X_test.shape[0]
    X_test = np.c_[np.ones((m, 1)), X_test]

    # The shuffle indices must range over the TRAINING examples. Sizing them from the
    # test set would drop training rows (m < n) or index out of bounds (m > n).
    indices = np.arange(X_train.shape[0])
    total_error = 0.0

    for _ in range(num_experiments):
        # Shuffle training features and labels together with the same indices
        np.random.shuffle(indices)
        X_train = X_train[indices]
        y_train = y_train[indices]
        w = PLA_pocket(X_train, y_train, num_updates)
        total_error += error_rate(X_test, y_test, w)

    avg_error = total_error / num_experiments

    return avg_error

In [47]:
# Average test error of the pocket algorithm: first with the default update
# budget, then with a budget of 100 updates.
print(PLA_pocket_test_random(X_train, y_train, X_test, y_test))
print(PLA_pocket_test_random(X_train, y_train, X_test, y_test, num_updates=100))


0.13117699999999954
0.11434500000000018


## PLA returning w50 directly

In [45]:
def PLA_fixed_pocket(X, y, num_update=50):
    """
    Plain PLA with a cap on the number of updates; no pocket is kept, the
    current weights are returned as they are when the run stops.

    Examples are visited cyclically in stored order. The run stops after
    num_update updates of w, or earlier once n consecutive examples are
    classified correctly.

    Args:
        X         : numpy array(n, d), feature matrix
        y         : numpy array(n, ), labels
        num_update: int, maximum number of updates of w to perform

    Returns:
        w: numpy array(d + 1, ), the weights (including bias w0) at the moment the run stops
    """
    n, d = X.shape

    # Prepend the constant feature x0 = 1 so the bias is learned as weights[0]
    X = np.c_[np.ones((n, 1)), X]

    weights = np.zeros(d + 1)
    updates_done = 0
    streak = 0      # consecutive correct classifications since the last update
    pos = 0         # position of the example to visit next

    while updates_done < num_update and streak < n:
        sample, label = X[pos], y[pos]
        pos = (pos + 1) % n     # wrap around to start the next cycle

        if np.sign(weights @ sample) == label:
            streak += 1
            continue

        # Mistake: apply the perceptron update and restart the streak
        weights += label * sample
        updates_done += 1
        streak = 0

    return weights


def PLA_fixed_updates_test_random(X_train, y_train, X_test, y_test, num_updates=50, num_experiments=2000):
    """
    Train with PLA_fixed_pocket (plain PLA capped at a fixed number of updates, no
    pocket) on the training set and evaluate on the test set.
    Repeat the experiment and return the average test error rate.

    Note: before each experiment the training examples are shuffled; PLA_fixed_pocket
    then visits them cyclically in that shuffled order.

    Args:
        X_train        : numpy array(n, d), training feature matrix
        y_train        : numpy array(n, ), training labels
        X_test         : numpy array(m, d), test feature matrix
        y_test         : numpy array(m, ), test labels
        num_updates    : int, number of updates passed to PLA_fixed_pocket for each run
        num_experiments: int, how many times the experiment is repeated (default 2000)
    Returns:
        avg_error: the average of the test error rate over all experiments
    Raises:
        ValueError: if num_experiments is smaller than 1
    """
    if num_experiments < 1:
        raise ValueError("num_experiments must be at least 1")

    # error_rate expects the bias column, so add x0 = 1 to the test features once
    m = X_test.shape[0]
    X_test = np.c_[np.ones((m, 1)), X_test]

    # The shuffle indices must range over the TRAINING examples. Sizing them from the
    # test set would drop training rows (m < n) or index out of bounds (m > n).
    indices = np.arange(X_train.shape[0])
    total_error = 0.0

    for _ in range(num_experiments):
        # Shuffle training features and labels together with the same indices
        np.random.shuffle(indices)
        X_train = X_train[indices]
        y_train = y_train[indices]
        w = PLA_fixed_pocket(X_train, y_train, num_updates)
        total_error += error_rate(X_test, y_test, w)

    avg_error = total_error / num_experiments

    return avg_error

In [46]:
# Average test error when the weights after 50 updates are used directly (no pocket)
print(PLA_fixed_updates_test_random(X_train, y_train, X_test, y_test, num_updates=50))


0.36468299999999976
