Hoeffding Inequality

In [1]:
import numpy as np
import random as rd
import math
from scipy.stats import bernoulli
from numpy.linalg import inv

In [2]:
class Coin:
    """A single, possibly biased, coin that can be flipped repeatedly."""

    def __init__(self, p=0.5):
        """Set up the coin.
        Args:
        p (float): Probability that a single flip lands heads, 0.0 <= p <= 1.0
        """
        self.p = p

    def flip(self, n=1):
        """Draw n independent Bernoulli(p) flips.
        Args:
        n (int): How many times to flip the coin
        Returns:
        np.ndarray of length n holding 1 for heads and 0 for tails
        """
        outcomes = bernoulli.rvs(self.p, size=n)
        return outcomes

    @staticmethod
    def fraction_of_heads(coin):
        """Share of heads in a recorded sequence of flips.
        Args:
        coin : array-like of flip outcomes (1 = heads, 0 = tails), e.g. the value returned by flip()
        Returns:
        Mean of the outcomes, i.e. the fraction of heads (float)
        """
        heads_fraction = np.mean(coin)
        return heads_fraction

In [3]:
class Coins:
    """A group of N identical coins that are all flipped the same number of times."""

    def __init__(self, N=1, p=0.5):
        """Create a group of coins.
        Args:
        N (int): Number of coins to create
        p (float): Probability of flipping a head 0.0 <= p <= 1.0
        """
        self.p = p
        self.N = N
        self.flips = None  # filled in by flip(); None until the coins have been flipped

    def flip(self, n=1):
        """Flip every coin n times independently
        Args:
        n (int): Number of flips per coin
        Returns:
        np.ndarray of shape (N, n): each row is one coin and each element is
        either heads (1) or tails (0). The same array is stored in self.flips.
        """
        # A single (N, n) draw replaces N separate scipy calls; all entries are
        # still independent Bernoulli(p) samples.
        self.flips = bernoulli.rvs(self.p, size=(self.N, n))
        return self.flips

    def _require_flips(self):
        """Return the recorded flips, raising a clear error if flip() was never called."""
        if self.flips is None:
            raise RuntimeError('flip() must be called before querying head fractions.')
        return self.flips

    def coin_first_head_fraction(self):
        """Fraction of heads in first coin flipped
        Returns:
        Fraction of heads for the first coin flipped (float)
        Raises:
        RuntimeError if flip() has not been called yet
        """
        flips = self._require_flips()
        return Coin.fraction_of_heads(flips[0])

    def coin_random_head_fraction(self):
        """Fraction of heads in a coin chosen uniformly at random
        Returns:
        Fraction of heads for a random coin flipped (float)
        Raises:
        RuntimeError if flip() has not been called yet
        """
        flips = self._require_flips()
        # Pick a row index instead of calling rd.choice on the ndarray: choice()
        # is meant for plain sequences and some Python releases apply a
        # truthiness test that multi-element arrays reject.
        row = rd.randrange(len(flips))
        return Coin.fraction_of_heads(flips[row])

    def coin_min_head_fraction(self):
        """Fraction of heads in coin with minimum head frequency flipped
        Returns:
        Fraction of heads for the first coin with the minimum
        frequency of heads flipped (float)
        Raises:
        RuntimeError if flip() has not been called yet
        """
        flips = self._require_flips()
        # argmin returns the first index on ties, hence "first coin with the minimum"
        row = np.argmin(flips.sum(axis=1))
        return Coin.fraction_of_heads(flips[row])

In [4]:
# Repeat the experiment n_trials times: flip 1000 fair coins 10 times each and
# record the head fraction of the first coin, of a random coin and of the coin
# with the fewest heads.
n_trials = 1000
nu_1 = np.empty(n_trials)
nu_rand = np.empty(n_trials)
nu_min = np.empty(n_trials)
for i in range(n_trials):
    coins = Coins(N=1000, p=0.5)
    coins.flip(n=10)
    nu_1[i] = coins.coin_first_head_fraction()
    nu_rand[i] = coins.coin_random_head_fraction()
    nu_min[i] = coins.coin_min_head_fraction()

print('The average value of ν_1 is equal to ', np.mean(nu_1))
print('The average value of ν_rand is equal to ', np.mean(nu_rand))
print('The average value of ν_min is equal to ', np.mean(nu_min))

The average value of ν_1 is equal to  0.4894
The average value of ν_rand is equal to  0.49310000000000004
The average value of ν_min is equal to  0.039200000000000006


Linear Regression

In [5]:
class RandomPoint:
    """ A point with uniformly random coordinates in n dimensions. """

    def __init__(self, dim=1, lower=0.0, upper=1.0):
        """ Draw a new point; every coordinate is sampled uniformly from [lower, upper).
        Args:
        dim (int): Number of coordinates.
        lower (float): Lower bound (inclusive).
        upper (float): Upper bound (exclusive).
        """
        # remember how the point was generated
        self.dim, self.lower, self.upper = dim, lower, upper
        # coordinates as a numpy.ndarray of length dim
        self.point = np.random.uniform(low=lower, high=upper, size=dim)

In [6]:
class TargetFunction:
    """TargetFunction class: labels points using either a random line or the fixed curve x1**2 + x2**2 - 0.6.

    Only the first two coordinates of a point are ever read.
    """
    def __init__(self, dim=1, lower=0.0, upper=1.0, line=True):
        """ Create a new target function.
        Args:
        dim (int): Dimension of the two random points that define the line (unused for the curve).
                   evaluate() reads coordinates 0 and 1, so a line target needs dim >= 2.
        lower (float): Lower bound for the line's defining points (unused for the curve).
        upper (float): Upper bound for the line's defining points (unused for the curve).
        line (bool): True if requesting to draw a line or false to generate the curve x1**2 + x2**2 - 0.6
        """
        self.line = line

        # Always define `points` so the attribute exists for both kinds of target;
        # the curve has no defining points.
        self.points = [RandomPoint(dim, lower, upper) for i in range(0, 2)] if line else []

    def sign(self, randomPoint):
        """ Classify a point.
        Args:
        randomPoint (RandomPoint): Point to classify (any object with a `point` array works).
        Returns:
        np.sign of evaluate(randomPoint): +1 or -1 depending on the side, 0 exactly on the boundary.
        """
        return np.sign(self.evaluate(randomPoint))

    def evaluate(self, randomPoint):
        """ Evaluate the target function at a point.
        Args:
        randomPoint (RandomPoint): Point to evaluate.
        Returns:
        For a line: the cross product (x - x1) * (y2 - y1) - (y - y1) * (x2 - x1), whose sign
        tells on which side of the line through the two defining points the point lies.
        For the curve: x1**2 + x2**2 - 0.6.
        """
        if self.line:
            x_minus_x1  = randomPoint.point[0] - self.points[0].point[0]
            y_minus_y1  = randomPoint.point[1] - self.points[0].point[1]
            y2_minus_y1 = self.points[1].point[1] - self.points[0].point[1]
            x2_minus_x1 = self.points[1].point[0] - self.points[0].point[0]
            return x_minus_x1 * y2_minus_y1 - y_minus_y1 * x2_minus_x1

        x1 = randomPoint.point[0]
        x2 = randomPoint.point[1]
        return x1**2 + x2**2 - 0.6

    def weights(self):
        """ Weights of the line, such that evaluate(p) == w0 + w1 * x + w2 * y.
        Returns:
        np.ndarray [w0, w1, w2]
        Raises:
        ValueError if this target is the curve (line=False), which has no such weight vector.
        """
        if not self.line:
            raise ValueError('weights() is only defined for a line target (line=True).')

        y1_times_x2 = self.points[0].point[1] * self.points[1].point[0]
        x1_times_y2 = self.points[0].point[0] * self.points[1].point[1]
        y2_minus_y1 = self.points[1].point[1] - self.points[0].point[1]
        x2_minus_x1 = self.points[1].point[0] - self.points[0].point[0]

        return np.array([y1_times_x2 - x1_times_y2, y2_minus_y1, -x2_minus_x1])

In [7]:
class RandomLine:
    """ A random line, defined by two random points, used to split the plane into two sides. """

    def __init__(self, dim=1, lower=0.0, upper=1.0):
        """ Draw the two points that define the line, each uniform between the bounds.
        Args:
        dim (int): Dimension of the defining points.
        lower (float): Lower bound.
        upper (float): Upper bound.
        """
        first = RandomPoint(dim, lower, upper)
        second = RandomPoint(dim, lower, upper)
        self.points = [first, second]  # the two defining points (list(RandomPoint))

    def target_function(self, randomPoint):
        """ Tell on which side of the line a point lies, using the symmetric form of the line equation.
        https://math.stackexchange.com/questions/274712/calculate-on-which-side-of-a-straight-line-is-a-given-point-located.
        https://en.wikipedia.org/wiki/Linear_equation.
        Args:
        randomPoint (RandomPoint): Point to classify; only coordinates 0 and 1 are read.
        Returns:
        np.sign of (x - x1) * (y2 - y1) - (y - y1) * (x2 - x1): +1 on one side of the line,
        -1 on the other side, 0 for a point exactly on the line.
        """
        (x1, y1), (x2, y2) = ((p.point[0], p.point[1]) for p in self.points)
        x, y = randomPoint.point[0], randomPoint.point[1]
        return np.sign((x - x1) * (y2 - y1) - (y - y1) * (x2 - x1))

    def weights(self):
        """ Weights of the line in the form w0 * 1 + w1 * x + w2 * y = 0.
        https://math.stackexchange.com/questions/274712/calculate-on-which-side-of-a-straight-line-is-a-given-point-located.
        https://en.wikipedia.org/wiki/Linear_equation.
        Returns:
        np.ndarray [w0, w1, w2] = [y1 * x2 - x1 * y2, y2 - y1, -(x2 - x1)]
        """
        (x1, y1), (x2, y2) = ((p.point[0], p.point[1]) for p in self.points)
        return np.array([y1 * x2 - x1 * y2, y2 - y1, -(x2 - x1)])
    

In [8]:
class DataSet:
    """ DataSet class for representing and manipulating random datasets. """
    def __init__(self, N=10, dim=1, lower=0.0, upper=1.0):
        """ Create a new data set assuming a uniform distribution between the
            upper and lower bounds, labelled by one random line.
        Args:
        N (int): Number of points.
        dim (int): Dimension of points. Labelling reads coordinates 0 and 1 of
                   each point, so dim must be >= 2 whenever N > 0.
        lower (float): Lower bound.
        upper (float): Upper bound.
        """
        self.inputs = [RandomPoint(dim, lower, upper) for i in range(0, N)] # inputs (list(RandomPoint))
        # A single target line, drawn from the same domain as the inputs, labels
        # every point. Drawing a fresh random line per point would make the
        # labels mutually inconsistent.
        self.targetFunction = TargetFunction(dim, lower, upper, True)
        self.outputs = [self.targetFunction.sign(point) for point in self.inputs] # outputs: sign of the target at each input

In [9]:
class LinearRegression:
    """ Linear Regression class: least-squares fit of +/-1 labels, used as a classifier via sign(w . z). """

    def __init__(self, trainingSet, nonLinearTransform=False):
        """ Create a new Linear Regression.
        Args:
        trainingSet (DataSet): Training set; only its `inputs` are read.
        nonLinearTransform (bool): If false, use linear feature vector (1, x1, x2)
                                   If true, use non-linear feature vector (1, x1, x2, x1*x2, x1**2, x2**2)
        """
        self.trainingSet = trainingSet
        self.nonLinearTransform = nonLinearTransform
        self.xn = self.transformPoints(trainingSet.inputs)  # feature matrix, one row per training point

    def run(self, targetFunction, fractNoise = 0.0):
        """Run of Linear Regression Algorithm.
        Args:
        targetFunction (TargetFunction): f function used to label the training inputs (needs a sign(point) method)
        fractNoise (float): Fraction of noise to generate in the output (0.0 <= fracNoise <= 1.0)
        Returns:
        Tuple(g, yn): fitted weights and the (possibly noisy) training labels
        """
        # calculate outputs and add noise
        yn = np.array([targetFunction.sign(point) for point in self.trainingSet.inputs])
        self.addNoise(yn, fractNoise)

        # g = X_dagger . y. pinv equals inv(X^T X) X^T for full column rank and
        # still gives the minimum-norm least-squares solution when X^T X is
        # singular, where an explicit inverse would raise.
        return np.matmul(np.linalg.pinv(self.xn), yn), yn

    def Ein(self, weights, yn):
        """Evaluate fraction of in-sample points which are misclassified
        Args:
        weights (np.ndarray): g function
        yn (np.ndarray): f function (outputs)
        Returns:
        Fraction of in-sample points which are misclassified (0.0 <= float <= 1.0)
        """
        return self._misclassifiedFraction(weights, self.xn, yn)

    def Eout(self, weights, targetFunction, fractNoise=0.0, npoints=1000):
        """Evaluate fraction of out-of-sample points which are misclassified
        Args:
        weights (np.ndarray): g function
        targetFunction (TargetFunction): f function
        fractNoise (float): Fraction of noise to generate in the output (0.0 <= fracNoise <= 1.0)
        npoints (int): number of points to generate
        Returns:
        Fraction of out-of-sample points which are misclassified (0.0 <= float <= 1.0)
        """
        # generate fresh random points; always 2-D in [-1, 1), independent of the training set's bounds
        estPoints = [RandomPoint(2, -1.0, 1.0) for i in range(0, npoints)]
        transPoints = self.transformPoints(estPoints)

        # compute outputs and add noise
        yn = np.array([targetFunction.sign(estPoint) for estPoint in estPoints])
        self.addNoise(yn, fractNoise)

        return self._misclassifiedFraction(weights, transPoints, yn)

    def Eout_closest(self, weights, hypotheses, targetFunction, fractNoise=0.0, npoints=1000):
        """Find the candidate hypothesis that agrees most often with g on fresh random points
        Args:
        weights (np.ndarray): g function
        hypotheses (List(np.ndarray)): potential g functions
        targetFunction (TargetFunction): unused; kept for signature compatibility
        fractNoise (float): unused; kept for signature compatibility
        npoints (int): number of points to generate
        Returns:
        Tuple(int, float): Index of the hypothesis in hypotheses that is the best match to weights
        (first one on ties) and the fraction of points on which it agrees with weights
        """
        # generate fresh random points; always 2-D in [-1, 1)
        estPoints = [RandomPoint(2, -1.0, 1.0) for i in range(0, npoints)]
        transPoints = self.transformPoints(estPoints)

        # classification of every point by g, computed once
        predictions = np.sign(np.matmul(transPoints, weights))

        # number of points on which each hypothesis agrees with g
        agreements = np.array(
            [np.count_nonzero(np.sign(np.matmul(transPoints, hypothesis)) == predictions)
             for hypothesis in hypotheses],
            dtype=float)

        return np.argmax(agreements), float(np.max(agreements) / len(transPoints))

    # privates
    def transformPoints(self, points):
        """Build the feature matrix (one row per point) for the configured feature vector."""
        if self.nonLinearTransform:
            featureVectors = []
            for pnt in points:
                x1 = pnt.point[0]
                x2 = pnt.point[1]
                featureVectors.append([1.0, x1, x2, x1*x2, x1**2, x2**2])
            return np.array(featureVectors)
        # linear case: prepend the constant coordinate 1.0 to each point
        return np.array([np.insert(pnt.point, 0, 1.0) for pnt in points])

    def addNoise(self, outputs, fractNoise):
        """Flip the sign of int(fractNoise * len(outputs)) randomly chosen entries of outputs, in place."""
        if fractNoise > 0.0:
            numFlips = int(fractNoise * len(outputs))
            for idx in rd.sample(range(len(outputs)), numFlips):
                outputs[idx] = -outputs[idx] # flip the output in fracNoise of output

    @staticmethod
    def _misclassifiedFraction(weights, features, labels):
        """Fraction of rows of features for which sign(features . weights) differs from labels."""
        predictions = np.sign(np.matmul(features, weights))
        return float(np.count_nonzero(predictions != np.asarray(labels)) / len(features))

In [10]:
def runLinearRegression(trainingSet, fractNoise=0.0, line=True, nonLinearTransform=False, numRuns=1):
    '''Run linear regression algorithm against numRuns freshly drawn target functions.
    Args:
    trainingSet (DataSet): Training set.
    fracNoise (float): Fraction of noise to generate in the output (0.0 <= fracNoise <= 1.0)
    line (bool): True if requesting to draw a line or false to generate the curve x1**2 + x2**2 - 0.6
    nonLinearTransform (bool): True to fit on the feature vector (1, x1, x2, x1*x2, x1**2, x2**2)
                               instead of (1, x1, x2)
    numRuns (int): number of runs (must be >= 1)
    Returns:
    Tuple(mean_ein, mean_eout): mean fraction of misclassified in-sample points and
    mean fraction of misclassified out-of-sample points over all runs
    Raises:
    ValueError: if numRuns is smaller than 1
    '''
    if numRuns < 1:
        raise ValueError('numRuns must be a positive integer')

    # The regression object depends only on the training inputs, so build it
    # (and its feature matrix) once instead of once per run.
    lr = LinearRegression(trainingSet, nonLinearTransform)

    mean_ein = 0.0
    mean_eout = 0.0
    for i in range(0, numRuns):
        targetFunction = TargetFunction(2, -1.0, 1.0, line)
        weights, yn = lr.run(targetFunction, fractNoise)
        mean_ein += lr.Ein(weights, yn)
        mean_eout += lr.Eout(weights, targetFunction, fractNoise, npoints=1000)

    return mean_ein / numRuns, mean_eout / numRuns

In [11]:
# Linear regression on 100 training points labelled by a random line, averaged over 1000 runs.
mean_Ein, mean_Eout = runLinearRegression(
    trainingSet=DataSet(N=100, dim=2, lower=-1.0, upper=1.0),
    fractNoise=0.0,
    line=True,
    numRuns=1000,
)
print("Fraction of mislassified in-sample points Ein = ", mean_Ein)
print("Fraction of mislassified out-of-sample points Eout = ", mean_Eout)

Fraction of mislassified in-sample points Ein =  0.03826000000000006
Fraction of mislassified out-of-sample points Eout =  0.04862099999999999


In [12]:
class PLA:
    """ Perceptron Learning Algorithm (PLA) on points extended with a constant 1.0 coordinate. """

    def __init__(self, initialWeights, trainingSet):
        """ Set up a perceptron run.
        Args:
        initialWeights (np.ndarray): Weight vector the iterations start from.
        trainingSet (DataSet): Training set; only its `inputs` are read.
        """
        self.initialWeights = initialWeights
        self.trainingSet = trainingSet
        # training inputs with the constant coordinate 1.0 prepended, one row per point
        self.xn = np.array([np.insert(pnt.point, 0, 1.0) for pnt in trainingSet.inputs], copy=True)

    def run(self, targetFunction, maxIterations=100):
        """Run the perceptron until no training point is misclassified.
        Args:
        targetFunction (TargetFunction): f function; its sign() labels the training
                                         inputs and its weights() is compared against g.
        maxIterations (int): Maximum number of passes (each pass checks for misclassified
                             points and, if any, performs one weight update).
        Returns:
        Tuple(weights, numIterations, disagreementProb): final weights g, number of passes
        performed including the final one that found no misclassified point, and the
        estimated probability that f and g disagree.
        Raises:
        Exception if there are still misclassified points after maxIterations passes.
        """
        # labels of the training inputs and the weights of f
        yn = np.array([targetFunction.sign(pnt) for pnt in self.trainingSet.inputs], copy=True)
        targetWeights = targetFunction.weights()

        weights = np.copy(self.initialWeights)
        numIterations = 0
        for numIterations in range(1, maxIterations + 1):
            wrongIdxs = self.misclassifiedPoints(weights, yn)
            if not wrongIdxs:
                break  # converged: every training point is classified correctly

            # move the weights towards one randomly chosen misclassified point
            chosen = rd.choice(wrongIdxs)
            weights = weights + yn[chosen] * self.xn[chosen]
        else:
            raise Exception('Perceptron algorithm failed to converge after maximum number of iterations.')

        # estimate disagreement probability between f and g on 1000 fresh points
        disagreementProb = self.disagreementProbability(1000, weights, targetWeights)
        return weights, numIterations, disagreementProb

    # private methods
    def misclassifiedPoints(self, weights, yn):
        """Indexes of the training points whose sign(weights . x) differs from the label in yn."""
        return [idx for idx, x in enumerate(self.xn) if np.sign(np.dot(weights, x)) != yn[idx]]

    def disagreementProbability(self, npoints, weights, targetWeights):
        """Fraction of npoints fresh random points in [-1, 1) x [-1, 1) on which the two weight vectors disagree."""
        estPoints = [RandomPoint(2, -1.0, 1.0) for _ in range(0, npoints)]
        points = np.array([np.insert(estPoint.point, 0, 1.0) for estPoint in estPoints])

        def side(w, z):
            # classification of the extended point z by the weight vector w
            return np.sign(w[0] * z[0] + w[1] * z[1] + w[2] * z[2])

        mismatches = sum(1 for z in points if side(weights, z) != side(targetWeights, z))
        return float(mismatches / npoints)

In [13]:
def runPerceptron(initialWeights, trainingSet, numRuns=1, maxIterations=100):
    '''Run the perceptron algorithm repeatedly, each time against a freshly drawn random line.
    Args:
    initialWeights (np.ndarray): weights every run starts from
    trainingSet (DataSet): training set
    numRuns (int): number of runs
    maxIterations (int): iteration limit passed to each PLA run

    Returns:
    Tuple(meanIterations, meanDisagreementProb): mean number of iterations for convergence
    and mean estimated disagreement probability between target and learned weights
    '''
    iterationCounts = []
    disagreementProbs = []
    for _ in range(numRuns):
        pla = PLA(initialWeights, trainingSet)
        targetFunction = TargetFunction(2, -1.0, 1.0, True)
        _weights, numIterations, disagreementProb = pla.run(targetFunction, maxIterations)
        iterationCounts.append(numIterations)
        disagreementProbs.append(disagreementProb)

    return sum(iterationCounts) / numRuns, sum(disagreementProbs, 0.0) / numRuns

In [14]:
def runPerceptronWithLinearRegressionInitialWeights(trainingSet, numRuns=1, maxIterations=100):
    '''Run the perceptron algorithm, seeding each run with the weights found by linear regression
    Args:
    trainingSet (DataSet): training set
    numRuns (int): number of runs
    maxIterations (int): iteration limit passed to each PLA run

    Returns:
    meanIterations (float): mean number of iterations for convergence
    '''
    iterationCounts = []
    for _ in range(numRuns):
        # linear regression on a fresh random line provides the starting weights
        regression = LinearRegression(trainingSet, False)
        targetFunction = TargetFunction(2, -1.0, 1.0, True)
        initialWeights, _labels = regression.run(targetFunction)

        # the perceptron then continues from those weights on the same target
        perceptron = PLA(initialWeights, trainingSet)
        _weights, numIterations, _disagreementProb = perceptron.run(targetFunction, maxIterations)
        iterationCounts.append(numIterations)

    return sum(iterationCounts) / numRuns

In [15]:
# Perceptron seeded by linear regression on 10 training points, averaged over 1000 runs.
trainingSet = DataSet(N=10, dim=2, lower=-1.0, upper=1.0)
meanIterations = runPerceptronWithLinearRegressionInitialWeights(
    trainingSet=trainingSet,
    numRuns=1000,
    maxIterations=1000,
)
print('Average number of iterations = ', meanIterations)

Average number of iterations =  5.042


Nonlinear Transformation

In [16]:
def runLinearRegressionHypotheses(testHypotheses, trainingSet, fractNoise=0.0, line=True, nonLinearTransform=False, numRuns=1):
    '''Run linear regression algorithm comparing against hypotheses.
    Args:
    testHypotheses (List(np.ndarray)): Candidate weight vectors to compare the fitted weights against
    trainingSet (DataSet): Training set.
    fracNoise (float): Fraction of noise to generate in the output (0.0 <= fracNoise <= 1.0)
    line (bool): True if requesting to draw a line or false to generate the curve x1**2 + x2**2 - 0.6
    nonLinearTransform (bool): True to fit on the feature vector (1, x1, x2, x1*x2, x1**2, x2**2)
                               instead of (1, x1, x2)
    numRuns (int): number of runs (must be >= 1)
    Returns:
    Tuple(index, meanProb): index into testHypotheses of the hypothesis selected as closest
    in the most runs (the earliest selected one on ties), and the mean over all runs of the
    agreement probability reported for each run's closest hypothesis
    Raises:
    ValueError: if numRuns is smaller than 1
    '''
    if numRuns < 1:
        raise ValueError('numRuns must be a positive integer')

    # The regression object depends only on the training inputs, so build it once.
    lr = LinearRegression(trainingSet, nonLinearTransform)

    meanProb = 0.0
    hyps = []
    for i in range(0, numRuns):
        targetFunction = TargetFunction(2, -1.0, 1.0, line)
        weights, yn = lr.run(targetFunction, fractNoise)
        hyp, prob = lr.Eout_closest(weights, testHypotheses, targetFunction, fractNoise, npoints=1000)
        hyps.append(hyp)
        meanProb += prob

    # Runs are random, so they are not guaranteed to pick the same hypothesis;
    # report the one picked most often rather than whatever the first run chose.
    bestHyp = max(hyps, key=hyps.count)
    return bestHyp, meanProb / numRuns

In [17]:
# Candidate weight vectors for the feature vector (1, x1, x2, x1*x2, x1**2, x2**2)
testHypotheses = [
    np.array([-1.0, -0.05, 0.08, 0.13, 1.5, 1.5]),
    np.array([-1.0, -0.05, 0.08, 0.13, 1.5, 15.0]),
    np.array([-1.0, -0.05, 0.08, 0.13, 15.0, 1.5]),
    np.array([-1.0, -1.5, 0.08, 0.13, 0.05, 0.05]),
    np.array([-1.0, -0.05, 0.08, 1.5, 0.15, 0.15]),
]

# Fit the noisy curve target with the non-linear transform and find the closest candidate.
hyp, meanProb = runLinearRegressionHypotheses(
    testHypotheses=testHypotheses,
    trainingSet=DataSet(N=1000, dim=2, lower=-1.0, upper=1.0),
    fractNoise=0.1,
    line=False,
    nonLinearTransform=True,
    numRuns=10,
)

print('The closes hypothesis is ', testHypotheses[hyp], 'with a mean probability of ', meanProb)

The closes hypothesis is  [-1.   -0.05  0.08  0.13  1.5   1.5 ] with a mean probability of  0.9642000000000002


In [18]:
# Out-of-sample error of the non-linear fit on the noisy curve target, averaged over 1000 runs.
mean_Ein, mean_Eout = runLinearRegression(
    trainingSet=DataSet(N=1000, dim=2, lower=-1.0, upper=1.0),
    fractNoise=0.1,
    line=False,
    nonLinearTransform=True,
    numRuns=1000,
)
print("Fraction of mislassified out-of-sample points Eout = ", mean_Eout)

Fraction of mislassified out-of-sample points Eout =  0.11723099999999965
