In [17]:
import numpy as np
import skimage.io as io
from time import time
import pandas as pd
import matplotlib.pyplot as plt
import cv2
%matplotlib notebook

In [None]:
class TestDetectors:
    
    # This method passes a small sequence of input images to a family detectors
    # if they need to be initialized to track a target object
    # Parameters:
    # (1) detectorFamily: a list of detector objects to test
    # (2) inputImgs or inputImgNames: 
    #       One of these must not be None
    #       inputImgs is input sequence of image data as a list
    #       inputImgNames is input sequence of image filenames as a list
    @staticmethod
    def prepareDetectors(detectorFamily, inputImgs=None, inputImgNames=None):
        if inputImgs is not None:
            TestDetectors._prepareDetectorsImgs(detectorFamily, inputImgs)
        else:
            TestDetectors._prepareDetectorsNames(detectorFamily, inputImgNames)
        
    @staticmethod
    def _prepareDetectorsImgs(detectorFamily, inputImgs):
        for detector in detectorFamily:
            for i, inputImg in enumerate(inputImgs):
                detector.prepare(i, inputImg)

    @staticmethod
    def _prepareDetectorsNames(detectorFamily, inputImgNames):
        for i, inputImgName in enumerate(inputImgNames):
            inputImg = io.imread(inputImgName)
            for detector in detectorFamily:
                detector.prepare(i, inputImg)
    
    # This method is one of two tests; assumes detector returns coordinate immediately
    # but blocks itself for some number of subsequent frames 
    # Parameters:
    # (1) detectorFamily: a list of detector objects to test
    # (2) fps: frames per second of input sequence
    # (3) inputGT: the ground truth coordinates of the tracked object (optional)
    # (4) inputImgs or inputImgNames: 
    #       One of these must not be None
    #       inputImgs is input sequence of image data as a list
    #       inputImgNames is input sequence of image filenames as a list
    @staticmethod
    def testDetectorsDelayForward(detectorFamily, fps, inputGT=None, inputImgs=None, inputImgNames=None):
        readImgLambda = lambda x: io.imread(x) if inputImgs is None else x
        imgList = inputImgs if inputImgNames is None else inputImgs
        T_s = 1/(1.*fps)
        
        # for each detector in detector family, store an array (to be modified)
        # where each element in the array indicates if the detector should be run
        # on the frame with corresponding index
        detectorThreadBlocked = {detector.name(): np.ones(len(imgList), dtype=np.int32) for detector in detectorFamily}
        
        # The return of the test
        resultsDf = pd.DataFrame(columns=[detector.name() for detector in detectorFamily] + ['GT'])
        
        # Kalman filters per detector
        kf = {detector.name(): KF() for detector in detectorFamily}

        # outer loop over images so that we load an image once and share between detectors
        for i, img in enumerate(imgList):
            img = readImgLambda(img)
            
            # results are the Kalman filter predictions
            resultDict = {detector.name(): 0 for detector in detectorFamily}
            resultDict['GT'] = inputGT[i] if inputGT is not None else None
            
            # inner loop over detectors in family
            for detector in detectorFamily:
                # The object detector observation
                y = None
                
                # The Kalman filter corrected estimate
                x_k_k = None
                
                # The Kalman filter prediction
                x_kplus_k = None
                
                # In forward delay, let the detector find the newest object location,
                # block the detector from being called for a number of frames equal
                # to the ceiling of the fraction of detector runtime to T_s=1/fps
                if detectorThreadBlocked[detector.name()][i] == 1:
                    start = time()
                    y = detector.detect(i, inputImg)
                    runtime = time()-start()
                    delayedFrames = int(np.ceil(runtime/T_s))
                    detectorThreadBlocked[detector.name()][i:i+delayedFrames] = 0
                
                # We can only correct if 'y' has valid data,
                # and more importantly, we can only correct if the detector was not blocked
                if y is not None:
                    x_k_k = kf[detector.name()].correct(i, y)
                
                # We can always predict; This is the 'filtering' in Kalman filtering
                x_kplus_k = kf[detector.name()].predict(i)
                
                # Save the result for this detector
                resultDict[detector.name()] = x_kplus_k
                
            resultsDf.loc[i+1] = resultDict
        
        return resultsDf
                 
    # This method is one of two tests; assumes detector returns coordinate after some
    # number of frames. This creates an information delay
    # Parameters:
    # (1) detectorFamily: a list of detector objects to test
    # (2) fps: frames per second of input sequence
    # (3) inputGT: the ground truth coordinates of the tracked object (optional)
    # (4) inputImgs or inputImgNames: 
    #       One of these must not be None
    #       inputImgs is input sequence of image data as a list
    #       inputImgNames is input sequence of image filenames as a list
    @staticmethod
    def testDetectorsDelayBackward(detectorFamily, fps, inputGT, inputImgs=None, inputImgNames=None):
        readImgLambda = lambda x: io.imread(x) if inputImgs is None else x
        imgList = inputImgs if inputImgNames is None else inputImgs
        T_s = 1/(1.*fps)
        
        # for each detector in detector family, store an array (to be modified)
        # where each element in the array indicates if the detector should be run
        # on the frame with corresponding index
        detectorThreadBlocked = {detector.name(): np.ones(len(imgList), dtype=np.int32) for detector in detectorFamily}
        detectorResult = {detector.name(): -1 for detector in detectorFamily}

        # The return of the test
        resultsDf = pd.DataFrame(columns=[detector.name() for detector in detectorFamily] + ['GT'])
        
        # Kalman filters per detector
        kf = {detector.name(): KF() for detector in detectorFamily}

        # outer loop over images so that we load an image once and share between detectors
        for i, img in enumerate(imgList):
            img = readImgLambda(img)
            
            # results are the Kalman filter predictions
            resultDict = {detector.name(): 0 for detector in detectorFamily}
            resultDict['GT'] = inputGT[i] if inputGT is not None else None
            
            # inner loop over detectors in family
            for detector in detectorFamily:
                # The object detector observation, which may or may not have arrived
                y = detectorResult[detector.name()] if detectorThreadBlocked[detector.name()][i] == 1 else None
                
                # The Kalman filter corrected estimate
                x_k_k = None
                
                # The Kalman filter prediction
                x_kplus_k = None
                
                # We can only correct if detector result has arrived
                if y is not None:
                    x_k_k = kf[detector.name()].correct(i, y)
                
                # We can always predict; This is the 'filtering' in Kalman filtering
                x_kplus_k = kf[detector.name()].predict(i)
                
                # Save the result for this detector
                resultDict[detector.name()] = x_kplus_k
                
                # In backward delay, let the detector find the newest object location,
                # block the detector from being called for a number of frames equal
                # to the ceiling of the fraction of detector runtime to T_s=1/fps,
                # and show the Kalman filter the result only after those frames have passed
                if detectorThreadBlocked[detector.name()][i] == 1:
                    start = time()
                    detectorResult[detector.name()] = detector.detect(i, inputImg)
                    runtime = time()-start()
                    delayedFrames = int(np.ceil(runtime/T_s))
                    detectorThreadBlocked[detector.name()][i:i+delayedFrames] = 0
                
            resultsDf.loc[i+1] = resultDict
        
        return resultsDf

In [None]:
class KF:
    def __init__(self):
        self.Transition_Matrix=[[1,0,1,0],[0,1,0,1],[0,0,1,0],[0,0,0,1]]
        self.Observation_Matrix=[[1,0,0,0],[0,1,0,0]]

        xinit=0
        yinit=0
        vxinit=0
        vyinit=0
        self.initstate=[xinit,yinit,vxinit,vyinit]
        self.initcovariance=1.0e-3*np.eye(4)
        self.transistionCov=1.0e-4*np.eye(4)
        self.observationCov=1.0e-1*np.eye(2)
        
        self.recentstate = self.initstate
        self.recentcovariance = self.initcovariance
        
        self.kf=KalmanFilter(transition_matrices=self.Transition_Matrix,
                observation_matrices=self.Observation_Matrix,
                initial_state_mean=self.initstate,
                initial_state_covariance=self.initcovariance,
                transition_covariance=self.transistionCov,
                observation_covariance=self.observationCov)
    
    def predict(self):
        self.recentstate, self.recentcovariance = self.kf.filter_update(self.recentstate, self.recentcovariance)
        return self.recentstate
    
    def correct(self, y):
        self.recentstate, self.recentcovariance = self.kf.filter_update(self.recentstate, self.recentcovariance, y)
        return self.recentstate

In [None]:
class Detector:
    def __init__(self):
        return
    
    def prepare(self, i, img):
        return
    
    def detect(self, i, img):
        return
    
    def name(self):
        return 'BaseClass'

In [None]:
class MOGDetector(Detector):
    def __init__(self, d):    
        history = 10
        bgThresh = 0.6
        shadows = True
        self.bgs = cv2.createBackgroundSubtractorMOG2(history,bgThresh,shadows)
        self.d = d
        
        self.detectorName = 'mog_{}'.format(d)
        
    def prepare(self, i, img):
        dsize = img.shape[1]/self.d, img.shape[0]/self.d
        imgResized = cv2.resize(img, dsize=dsize)
        self.bgs.apply(img, learningRate=-1)
        
    def detect(self, i, img):
        dsize = img.shape[1]/self.d, img.shape[0]/self.d
        imgResized = cv2.resize(img, dsize=dsize) 
        foremat=self.bgs.apply(imgResized, learningRate=1/(1.*i))
        ret,thresh = cv2.threshold(foremat,127,255,0)
        contours, hierarchy = cv2.findContours(thresh,cv2.RETR_TREE,cv2.CHAIN_APPROX_SIMPLE)
        m = None
        if len(contours) > 0:
            m = np.mean(contours[0],axis=0)
        return m
            
    def name(self):
        return self.detectorName

In [None]:
def testTennisMog(start, end, fps, D):
    tennisGT = pd.read_csv('')
    tennisImgNames = ['{}'.format(i) for i in range(start, end)]
    mogDetectorFamily = [MOGDetector(d) for d in range(D)]
    forwardDf = TestDetectors.testDetectorsDelayForward(mogDetectorFamily, 
                                                        fps, 
                                                        tennisGT, 
                                                        inputImgNames=tennisImgNames)
    backwardDf = TestDetectors.testDetectorsDelayBackward(mogDetectorFamily, 
                                                          fps, 
                                                          tennisGT, 
                                                          inputImgNames=tennisImgNames)
    return forwardDf, backwardDf

In [None]:
def runScript():
    import cv2
    import numpy as np
    import matplotlib.pyplot as plt
    
    filename = "singleball.mov"
    capture = cv2.VideoCapture(filename)
    print "\t Width: ",capture.get(cv2.cv.CV_CAP_PROP_FRAME_WIDTH)
    print "\t Height: ",capture.get(cv2.cv.CV_CAP_PROP_FRAME_HEIGHT)
    print "\t FourCC: ",capture.get(cv2.cv.CV_CAP_PROP_FOURCC)
    print "\t Framerate: ",capture.get(cv2.cv.CV_CAP_PROP_FPS)
    numframes=capture.get(7)
    print "\t Number of Frames: ",numframes
    
    count=0
    history = 10
    nGauss = 3
    bgThresh = 0.6
    noise = 20
    bgs = cv2.BackgroundSubtractorMOG(history,nGauss,bgThresh,noise)
    
    plt.figure()
    plt.hold(True)
    plt.axis([0,480,360,0])
    
    measuredTrack=np.zeros((numframes,2))-1
    while count<numframes:
        count+=1
        img2 = capture.read()[1]
        cv2.imshow("Video",img2)
        foremat=bgs.apply(img2)
        cv2.waitKey(100)
        foremat=bgs.apply(img2)
        ret,thresh = cv2.threshold(foremat,127,255,0)
        contours, hierarchy = cv2.findContours(thresh,cv2.RETR_TREE,cv2.CHAIN_APPROX_SIMPLE)
        if len(contours) > 0:
            m= np.mean(contours[0],axis=0)
            measuredTrack[count-1,:]=m[0]
            plt.plot(m[0,0],m[0,1],'ob')
        cv2.imshow('Foreground',foremat)
        cv2.waitKey(80)
    capture.release()
    print measuredTrack
    np.save("ballTrajectory", measuredTrack)
    plt.show()
    
def runKalmanFilter():
    import numpy as np
    from pykalman import KalmanFilter
    from matplotlib import pyplot as plt
    
    Measured=np.load("ballTrajectory.npy")
    while True:
        if Measured[0,0]==-1.:
            Measured=np.delete(Measured,0,0)
        else:
            break
    numMeas=Measured.shape[0]
    
    MarkedMeasure=np.ma.masked_less(Measured,0)
    
    Transition_Matrix=[[1,0,1,0],[0,1,0,1],[0,0,1,0],[0,0,0,1]]
    Observation_Matrix=[[1,0,0,0],[0,1,0,0]]
    
    xinit=MarkedMeasure[0,0]
    yinit=MarkedMeasure[0,1]
    vxinit=MarkedMeasure[1,0]-MarkedMeasure[0,0]
    vyinit=MarkedMeasure[1,1]-MarkedMeasure[0,1]
    initstate=[xinit,yinit,vxinit,vyinit]
    initcovariance=1.0e-3*np.eye(4)
    transistionCov=1.0e-4*np.eye(4)
    observationCov=1.0e-1*np.eye(2)
    kf=KalmanFilter(transition_matrices=Transition_Matrix,
                observation_matrices =Observation_Matrix,
                initial_state_mean=initstate,
                initial_state_covariance=initcovariance,
                transition_covariance=transistionCov,
                observation_covariance=observationCov)
    
    (filtered_state_means, filtered_state_covariances) = kf.filter(MarkedMeasure)
    plt.plot(MarkedMeasure[:,0],MarkedMeasure[:,1],'xr',label='measured')
    plt.axis([0,520,360,0])
        plt.hold(True)
    plt.plot(filtered_state_means[:,0],filtered_state_means[:,1],'ob',label='kalman output')
    plt.legend(loc=2)
    plt.title("Constant Velocity Kalman Filter")
    plt.show()