# Implementation of SMO algorithm from scratch

In [1]:

# File Name: SMOfromScratch.py
# Last Updated: Feb 27, 2018
# Purpose:
# Implementation of SMO algorithm.

import numpy as np
import matplotlib.pyplot as plt

class SVM :
    def __init__ ( self, x, y, C = 3.0 ) :
        self.C = C
        self.x = x
        self.y = y
        # initialize ohter parameters
        self.numExamples = x.shape [ 0 ]
        self.b = 0.0
        self.alphas = np.zeros ( x.shape [ 0 ] )
        self.w = ( self.x.T * y * self.alphas ).sum ( axis = 1 )
        self.tol = 1e-5
        self.isAlphaZero = self.alphas == 0.
        self.isAlphaC = self.alphas == self.C
        self.isAlphaBetween = np.logical_not ( self.isAlphaZero + self.isAlphaC )
        self.isAlphaNonZero = self.isAlphaC + self.isAlphaBetween
        self.errors = np.dot ( self.w, self.x.T ) - self.b - self.y
        self.doesViolateKKT = self.violateKKT ( np.arange ( self.numExamples ) )

    def fit ( self ) :
        # main function for lerning.

        # How many update had been conducted for the last while loop.
        howManyUpdated = 0
        self.howManyWhileLoop = 0
        examineAll = True
        while ( ( howManyUpdated > 0 ) or ( examineAll ) ) :
            ### print "[ fit ] Entered while loop"
            howManyUpdated = 0
            if ( examineAll ) :
                ### print "[ fit ] Examine all"
                # iterate over the entire training examples, searching
                # for an alpha, which violates the KKT.
                # The indexes for the examples that violate the KKT.
                kktViolaterIdxs = \
                  np.arange ( self.numExamples ) [ self.doesViolateKKT ]
                for firstAlphaIdx in kktViolaterIdxs :
                    ### print "[ fit ] Entered for loop"
                    if self.firstHierarchyUpdate ( firstAlphaIdx ) :
                        # the first hierarchy searches a second example
                        # from the nonBound examples, that is, the examples
                        # that have 0 < alpha < C. The
                        # example that brings the maximum difference of
                        # E1 and E2.
                        howManyUpdated += 1
                        continue
                    if self.secondHierarchyUpdate ( firstAlphaIdx ) :
                        # the second example takes a nonBound example at
                        # random and see wheter the joint update succeeds
                        # or not. When it fails, then choose anohter one
                        # from the nonBound set.
                        howManyUpdated += 1
                        continue
                    if self.thirdHierarchyUpdate ( firstAlphaIdx ) :
                        # the third hierarchy is carried out over the
                        # bound examples. Bound examples are examples
                        # that have alpha = C.
                        howManyUpdated += 1
                        continue
                    if self.fourthHierarchyUpdate ( firstAlphaIdx ) :
                        # the last hierarchy is carried out over the non-
                        # support examples. They have alpha = 0.
                        howManyUpdated += 1
                        continue

            else :
                ### print "[ fit ] Examine nonbounds"
                marginKKTViolaterIdxs = \
                  np.arange ( self.numExamples ) [ np.logical_and \
                   ( self.isAlphaBetween, self.doesViolateKKT ) ]
                # iterate over the examples that have margin alphas.
                for firstAlphaIdx in marginKKTViolaterIdxs :
                    ### print "[ fit ] Entered for loop"
                    if self.firstHierarchyUpdate ( firstAlphaIdx ) :
                        howManyUpdated += 1
                        continue
                    if self.secondHierarchyUpdate ( firstAlphaIdx ) :
                        howManyUpdated += 1
                        continue
                    if self.thirdHierarchyUpdate ( firstAlphaIdx ) :
                        howManyUpdated += 1
                        continue
                    if self.fourthHierarchyUpdate ( firstAlphaIdx ) :
                        howManyUpdated += 1
                        continue


            if ( examineAll ) :
                examineAll = False
            elif ( howManyUpdated == 0 ) :
                examineAll = True
            self.howManyWhileLoop += 1

        print "Larning completed.\n"

    def update ( self, firstAlphaIdx, secondAlphaIdx ) :
        ### print "[ update ] Start"
        # returns 1 if the update suceeds, ohterwise 0.
        if ( firstAlphaIdx == secondAlphaIdx ) :
            ### print "[ update ] End. firstAlphaIdx == secondAlphaIdx"
            return 0

        self.oldAlpha1 = self.alphas [ firstAlphaIdx ]
        self.oldAlpha2 = self.alphas [ secondAlphaIdx ]
        x1 = self.x [ firstAlphaIdx ]
        x2 = self.x [ secondAlphaIdx ]
        y1 = self.y [ firstAlphaIdx ]
        y2 = self.y [ secondAlphaIdx ]
        s = y1 * y2
        E1 = self.errors [ firstAlphaIdx ]
        E2 = self.errors [ secondAlphaIdx ]

        L2, H2 = self.calculateL2H2 ( y1, y2  )
        if ( np.abs ( H2 - L2 ) < 1e-5 ) :
            ### print "[ update ] L2 = H2 End"
            return 0
        k11 = np.dot ( x1, x1 )
        k22 = np.dot ( x2, x2 )
        k12 = np.dot ( x1, x2 )
        eta = ( k12 - k11 ) + ( k12 - k22 )
        ### print "[ udate ] eta = %f" % eta
        # when eta is psitive, we cannot maximize W by making its first
        # derivative with respect to alpha2 to be zero. Howerver this does
        # not occur when our kernel is a Mearcer kernel.
        if ( np.abs ( eta ) < 1e-5 ) :
            # eta becomes zero when x1 and x2 share the same features
            ### print "[ update ] eta < 1e-3"
            return 0
        self.newAlpha2 = self.calculateNewAlpha2 ( E1, E2, eta, L2, H2, y2 )
        if  np.abs ( self.newAlpha2 - self.oldAlpha2 ) < 1e-5 :
            # when there is no change in alpha2, the update fails
            ### print "[ update ] newAlpha2 - oldAlpha2 < 1e-5"
            return 0
        self.newAlpha1 = self.calculateNewAlpha1 ( s )

        # update self.alphas
        self.alphas [ firstAlphaIdx ] = self.newAlpha1
        self.alphas [ secondAlphaIdx ] = self.newAlpha2
        # update isAlpha series booleans.
        self.updateAlphaBoolean ( firstAlphaIdx, secondAlphaIdx )
        # update w, b, and erors
        self.w = self.calculateW ( )
        self.b = self.calculateB ( E1, E2, k11, k22, k12,
                              firstAlphaIdx, secondAlphaIdx, y1, y2 )
        self.errors = self.calculateErrors ()
        self.doesViolateKKT = self.violateKKT ( np.arange ( self.numExamples ) )

        # update suceeds
        ### print "[ update ] End. Update succeeded"
        return 1

    def calculateErrors ( self ) :
        ### print "[ calculateErrors ] Start"
        svmouts = np.dot ( self.w, self.x.T ) - self.b
        errors = svmouts - self.y
        return errors

    def calculateB ( self, E1, E2, k11, k22, k12, firstAlphaIdx,
                     secondAlphaIdx, y1, y2 ) :
        ### print "[ calculateB ] Start"
        # b1 and b2 are bs that make the functional margins of the first
        # example and the second example exactly 1.
        #     When both the first and second examples become the margin
        # examples or nonbound examples after the update, the calculated v
        # alues of b1 and b2 become the same.
        #     When one exxample becomes a nonbound example and the other
        # is bound, we take b of the nonbound one.
        #     When both the examples becom nonbond after the update, we take
        # the average of b1 and b2.
        oldb = self.b
        ### print "[ calculateB ] oldb = %f" % oldb
        ### print "[ calculateB ] E1 = %f, E2 = %f, y1 = %f, y2 = %f" % \
                ### ( E1, E2, y1, y2 )
        ### print "[ calculateB ] k11 = %f, k22 = %f, k12 = %f" % ( k11, k22, k12 )
        b1 = E1 + oldb + y1 * ( self.newAlpha1 - self.oldAlpha1 ) * k11 + \
                       + y2 * ( self.newAlpha2 - self.oldAlpha2 ) * k12
        b2 = E2 + oldb + y1 * ( self.newAlpha1 - self.oldAlpha1 ) * k12 + \
                       + y2 * ( self.newAlpha2 - self.oldAlpha2 ) * k22
        ### print "[ calculateB ] b1 = %f, b2 = %f" % ( b1, b2 )
        if self.isAlphaBetween [ firstAlphaIdx ] and \
           self.isAlphaBetween [ secondAlphaIdx ] :
            newb = b1 # or b2. both is ok.
            ### print "[ calculateB ] take b1 or b2"
            ### x1 = self.x [ firstAlphaIdx, : ]
            ### x2 = self.x [ secondAlphaIdx, : ]
            ### print "[ calculateB ] FM1 = %f" % ( ( self.w.dot ( x1 ) - newb ) * y1 )
            ### print "[ calculateB ] FM2 = %f" % ( ( self.w.dot ( x2 ) - newb ) * y2 )
        elif self.isAlphaBetween [ firstAlphaIdx ] :
            ### print "[ calculateB ] take b1"
            newb = b1
            ### x1 = self.x [ firstAlphaIdx, : ]
            ### print "[ calculateB ] FM1 = %f" % ( ( self.w.dot ( x1 ) - newb ) * y1 )
        elif self.isAlphaBetween [ secondAlphaIdx ] :
            ### print "[ calculateB ] take b2"
            newb = b2
            ### x2 = self.x [ secondAlphaIdx, : ]
            ### print "[ calculateB ] FM2 = %f" % ( ( self.w.dot ( x2 ) - newb ) * y2 )

        else :
            newb = ( b1 + b2 ) / 2


        return newb

    def calculateW ( self ) :
        ### print "[ calculateW ] Start"
        # w can be calculated only with non-zero alphas.
        betweenAlphaIdxs = np.arange ( self.numExamples )\
                           [ self.isAlphaBetween + self.isAlphaC ]
        nonBoundX = self.x [ betweenAlphaIdxs ]
        nonBoundY = self.y [ betweenAlphaIdxs ]
        nonBoundAlphas = self.alphas [ betweenAlphaIdxs ]
        w = ( nonBoundX .T * nonBoundY * nonBoundAlphas ).sum ( axis = 1 )
        # w = ( self.x.T * self.y * self.alphas ).sum ( axis = 1 )
        return w



    def updateAlphaBoolean ( self, firstAlphaIdx, secondAlphaIdx ) :
        ### print "[ updateAlphaBoolean ] Start"

        self.isAlphaZero [ firstAlphaIdx ] = False
        self.isAlphaC [ firstAlphaIdx ] = False
        self.isAlphaBetween [ firstAlphaIdx ] = False
        if self.newAlpha1 == 0. :
            self.isAlphaZero [ firstAlphaIdx ] = True
        elif self.newAlpha1 == self.C  :
            self.isAlphaC [ firstAlphaIdx ] = True
        else :
            self.isAlphaBetween [ firstAlphaIdx ] = True

        self.isAlphaZero [ secondAlphaIdx ] = False
        self.isAlphaC [ secondAlphaIdx ] = False
        self.isAlphaBetween [ secondAlphaIdx ] = False
        if self.newAlpha2 == 0. :
            self.isAlphaZero [ secondAlphaIdx ] = True
        elif self.newAlpha2 == self.C  :
            self.isAlphaC [ secondAlphaIdx ] = True
        else :
            self.isAlphaBetween [ secondAlphaIdx ] = True

    def calculateNewAlpha1 ( self, s ) :
        ### print "[ calculateNewAlpha1 ] Start"
        newAlpha1 = self.oldAlpha1 + s * ( self.oldAlpha2 - self.newAlpha2 )
        # newAlpha1 may become -0.0000. We have to fix it.
        if newAlpha1 <= 0 :
            newAlpha1 = 0.
        ### print "[ calculateNewAlpha1 ] newAlpha1 = %f" % newAlpha1
        return newAlpha1

    def calculateNewAlpha2 ( self, E1, E2, eta, L2, H2, y2 ) :
        ### print "[ calculateNewAlpha2 ] Start"
        newAlpha2 = self.oldAlpha2 - y2 * ( E1 - E2 ) / eta
        if ( newAlpha2 < L2 ) :
            newAlpha2 = L2
        if ( newAlpha2 > H2 ) :
            newAlpha2 = H2
        ### print "[ calculateNewAlpha2 ] newAlpha2 = %f" % newAlpha2
        return newAlpha2

    def calculateL2H2 ( self, y1, y2 ) :
        ### print "[ calculateL2H2 ] Start"
        # calculate the lower and uppor bound of the new alpha2
        ### print "[ calculateL2H2 ] oldAlpha1 = %f, oldAlpha2 = %f" \
              ### % ( self.oldAlpha1, self.oldAlpha2 )
        if ( y1 != y2 ) :
            # L2 = np.max ( [ 0., self.oldAlpha2 - self.oldAlpha1 ] )
            # H2 = np.min ( [ self.C, self.C + self.oldAlpha2 - self.oldAlpha1 ] )
            if 0. >= self.oldAlpha2 - self.oldAlpha1 :
                L2 = 0
            else :
                L2 = self.oldAlpha2 - self.oldAlpha1
            if self.C >= self.C + self.oldAlpha2 - self.oldAlpha1 :
                H2 = self.C + self.oldAlpha2 - self.oldAlpha1
            else :
                H2 = self.C

        else :
            # L2 = np.max ( [ 0., self.oldAlpha1 + self.oldAlpha2 - self.C ] )
            # H2 = np.min ( [ self.C, self.oldAlpha1 + self.oldAlpha2 ] )
            if 0. >= self.oldAlpha1 + self.oldAlpha2 - self.C :
                L2 = 0.
            else :
                L2 = self.oldAlpha1 + self.oldAlpha2 - self.C
            if self.C >= self.oldAlpha1 + self.oldAlpha2 :
                H2 = self.oldAlpha1 + self.oldAlpha2
            else :
                H2 = self.C
        ### print "[ calculateL2H2 ] L2 = %f, H2 = %f" % ( L2, H2 )
        ### print "[ calculateL2H2 ] End"
        return L2, H2

    def violateKKT ( self, idx ) :
        ### print "[ violateKKT ] Start"
        err = self.errors [ idx ]
        y = self.y [ idx ]
        alpha = self.alphas [ idx ]
        tol = 1e-5
        b1 = np.logical_and ( y * err < - tol, alpha < self.C )
        b2 = np.logical_and ( y * err > tol, alpha > 0 )
        # b1 =  (  y * err  < - tol ) and ( alpha < self.C )
        # b2 = ( y * err > tol ) and ( alpha > 0 )
        return np.logical_or ( b1, b2 )


    def firstHierarchyUpdate ( self, firstAlphaIdx ) :
        ### print "[ firstHierarchyUpdate ] Start"
        # find a second alpha for joint update, and when the update
        # succeeds this fundtion returns 1, otherwise 0. When this
        # first hierarchy update fails, then we proceed to the second
        # hierarchy.

        if self.isAlphaBetween.sum () == 0 :
            # if there is no unbound exmaple, we can not conduct first
            # hierarchy update nor second hierarchy update. Cheak whether
            # there is at least one unbound example.
            return 0

        # search for a second example for joint optimization from
        # the unbound examples. When
        # the error of the first example is positive, we want the
        # second example to have a negative erorr. When the first
        # error is negative, the second one must be positive.
        E1 = self.errors [ firstAlphaIdx ]
        if ( E1 > 0 ) :
            ### print "[ firstHierarchyUpdate ] E1 > 0"
            # we have to apply a filter on errors array so that it only
            # contains the erorrs of the unbound examples.
            if self.errors [ self.isAlphaBetween ].min () > 0 :
                # no second example for joint update was found, and thus
                # the update fails.
                return 0
            else :
                secondAlphaIdx = \
                 np.argmin ( self.errors [ self.isAlphaBetween ] )
                # There are three patterns that the update fundtion
                # fails. Fist, when the first and the second example
                # are the same, that is their indexes are the same.
                # Second, when the lower bound L2 equals H2. And third,
                # when the difference between the old alpha2 and the
                # new one is less than tolerance. The update function
                # returns 1 when the update succeeds, otherwise 0.
                return self.update ( firstAlphaIdx, secondAlphaIdx )

        elif ( E1 < 0 ) :
            ### print "[ firstHierarchyUpdae ] E1 < 0"
            if self.errors [ self.isAlphaBetween ].max () < 0 :
                return 0
            else :
                secondAlphaIdx = \
                 np.argmax ( self.errors [ self.isAlphaBetween ] )
                return self.update ( firstAlphaIdx, secondAlphaIdx )
        return 0

    def secondHierarchyUpdate ( self, firstAlphaIdx ) :
        ### print "Entered: secondHierarchyUpdate()"
        if self.isAlphaBetween.sum () == 0 :
            return 0
        E1 = self.errors [ firstAlphaIdx ]
        # the second example is choosen at random from the nonbound
        # examples.
        shuffledBetweenAlphaIdxs = \
          np.arange ( self.numExamples ) [ self.isAlphaBetween ]
        np.random.shuffle ( shuffledBetweenAlphaIdxs )
        for secondAlphaIdx in shuffledBetweenAlphaIdxs :
            # iterate over the nonbound examples until the update succeeds.
            if self.update ( firstAlphaIdx, secondAlphaIdx ) :
                return 1
        return 0

    def thirdHierarchyUpdate ( self, firstAlphaIdx ) :
        ### print "Entered: thirdHierarchyUpdate()"
        # the next iteration is carried out over the bound xamples.
        if self.isAlphaC.sum () == 0 :
            return 0
        shuffledCAlphaIdxs = \
          np.arange ( self.numExamples ) [ self.isAlphaC ]
        np.random.shuffle ( shuffledCAlphaIdxs )
        for secondAlphaIdx in shuffledCAlphaIdxs  :
            if self.update ( firstAlphaIdx, secondAlphaIdx ) :
                return 1
        return 0

    def fourthHierarchyUpdate ( self, firstAlphaIdx ) :
        ### print "[ fourthHierarchyUpdate ] Start"
        # the fourth hierarchy iterates over the examples whose alphas
        # are zero.
        if self.isAlphaZero.sum () == 0 :
            ### print "[ fourthHierarchyUpdae ] End"
            return 0
        shuffledZeroAlphaIdxs = \
          np.arange ( self.numExamples ) [ self.isAlphaZero ]
        np.random.shuffle ( shuffledZeroAlphaIdxs )
        for secondAlphaIdx in shuffledZeroAlphaIdxs :
            ### print "[ fourthHierarchyUpdate ] Entered for loop"
            if self.update ( firstAlphaIdx, secondAlphaIdx ) :
                ### print "[ foruthHierarchyUpdae ] End"
                return 1
        ### print "[ fourthHierarchyUpdata ] End"
        return 0

    def plot ( self ) :
        plt.close ()
        fig1 = plt.figure ( figsize = ( 9, 7 ), facecolor = 'w',
                            edgecolor = 'k' )
        ax1 = fig1.add_subplot ( 111 )

        ## plot the training examples
        group1 = ( self.x [ :, 0 ][ self.y == -1 ], self.x [ :, 1 ][ self.y == -1 ] )
        group2 = ( self.x [ :, 0 ][ self.y == 1 ], self.x [ :, 1 ][ self.y == 1 ] )
        ax1.scatter ( group1 [ 0 ], group1 [ 1 ], label = 'Calss-1', c = 'blue' )
        ax1.scatter ( group2 [ 0 ], group2 [ 1 ], label = 'Class+1', c = 'red' )


        ## plot the hypterplane
        t = np.array ( [ 0., 3. ] )
        slope = - self. w [ 0 ] / self.w [ 1 ]
        intercept =  self.b / self.w [ 1 ]
        ax1.plot ( t, slope * t + intercept, 'k-' )
        plt.show ()

def example_generator ( num ) :
    ### print "Entered: exammple_generator()"
    # We use np.random.multivariate_normal to
    # generate n-dimentional gaussian distribution. In this example, we
    # use two-dimentional data as a training data. So 'mean' is a tow-
    # dimentional vector that specifis the coordinate of the center point
    # of the distibution. 'cvmat' is a covariance matrix. In this example,
    # we only use its diagonal elements to specify the variance of the
    # data for each axis.

    # the number of data for each class.
    num_data = num
    x1_mean = np.array ( [ 1.0, 1.0 ] )
    x2_mean = np.array ( [ 3.0, 4.0 ] )
    x1_cvmat = np.array ( [ [ 0.9, 0.0 ],
                            [ 0.0, 0.9 ] ] )
    x2_cvmat = np.array ( [ [ 0.8, 0.0 ],
                            [ 0.0, 0.8 ] ] )
    y1 = np.ones ( num_data ) * ( + 1.0 )
    y2 = np.ones ( num_data ) * ( - 1.0 )
    # the data for the Class+1
    x1_data = np.random.multivariate_normal ( x1_mean, x1_cvmat, num_data )
    # the data for the Class-1
    x2_data = np.random.multivariate_normal ( x2_mean, x2_cvmat, num_data )
    x = np.concatenate ( ( x1_data, x2_data ), axis = 0 ) # ( num_data * 2, 2 )
    y = np.concatenate ( ( y1, y2 ), axis = 0 ) # ( num_data * 2, 1 )
    return x, y
