In [None]:
# Python Standard libraries
import time
from collections import deque
import copy
import warnings
import csv
# Image processing and numerical calculations
import cv2
import numpy as np

# Generic set up and selection of detector, descriptor and matcher objects
DETECTOR_TYPES = {1: "FAST", 2: "GridFAST", 3: "BRISK"}
DESCRIPTOR_TYPES = {1: "FREAK", 2: "BRISK"}
MATCHER_TYPES = {1: "BRUTEFORCE"}
DETECTOR = DETECTOR_TYPES[2]
DESCRIPTOR = DESCRIPTOR_TYPES[1]
MATCHER = MATCHER_TYPES[1]
MTCH_METHODS = [cv2.NORM_L1, cv2.NORM_L2, cv2.NORM_HAMMING]
MTCH_METHOD = MTCH_METHODS[2] # HAMMING for binary descriptors
MATCH_QTY = 1500 # number keypoints desired per image


In [None]:
# Parameters for Lucas-Kanade optical flow
LK_PARAMS = dict(winSize=(10, 10),
maxLevel=2,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT,10, 0.03))
# This parameter adjusts resilience of backwards calculated points from LK
# optical flow. The higher the number, the more a point will persist from
# lucas kanade. Basically, this means points stick around on objects with
# large motions or have a higher chance to reattach to different pixels.
# The downside is the higher this number is, the more likely flow vectors
# will be incorrect under large motions. Better to keep it low (20 max) and
# introduce new points rather than retain old, often incorrect ones.
LK_BACKCALC_PIXEL_DEVIATION = 25

# Number of (x,y) points retained for keypoint track history
MAX_TRACK_LENGTH = 5

In [None]:
class Image(object):
    """
    Base container class used in the image processing library
    """
    def __init__(self, frame, frame_time=None):
        height = frame.shape[0]
        width = frame.shape[1]
        size = (width, height)
        self.size = size
        if frame_time is None:
            self.frame_time = time.time()
        else:
            self.frame_time = frame_time
            self.clr_img = frame
            self.gry_img = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # Lazily set attributes. Test for existence before use
        self.kp = []
        self.desc = []
        self.tracks = []
    
    def copy(self):
        return copy.deepcopy(self)
    
    def get_color(self):
        return self.clr_img
    
    def get_grey(self):
        return self.gry_img
    
    def get_time(self):
        return self.frame_time
    
    def get_size(self):
        return self.size

    def set_details(self, kp, desc):
        self.kp = kp
        self.desc = desc

    def set_flow_tracks(self, tracks):
        self.tracks = tracks

    def get_kp(self):
        return self.kp

    def get_desc(self):
        return self.desc

    def get_flow_tracks(self):
        return self.tracks

In [None]:
class ImageCompare(object):
    def __init__(self):
        """
        Enacts a number of comparison techniques to track key points between
        frames from a video source or individual pictures.
        Automatically tracks time and retains a history of previous image data.
        Class loads ImageData instances with single instance of OpenCV
        descriptor and detector classes.
        """
        # Data attributes storing frame data
        self.frame_prev = None
        self.frame_curr = None
        # Data attributes storing comparison data
        self.matches_curr = None

        # creates detector descriptor and matcher
        self.detector = cv2.FeatureDetector_create(DETECTOR)

        if "grid" in DETECTOR.lower():
            grid_adapted = cv2.GridAdaptedFeatureDetector
            self.detector = grid_adapted(self.detector, MATCH_QTY/3, 6, 6)
            self.descriptor = cv2.DescriptorExtractor_create(DESCRIPTOR)

        if MATCHER == MATCHER_TYPES[1]: # BruteForce
            self.matcher = cv2.BFMatcher(MTCH_METHOD, crossCheck=False)

        # Data attributes storing time tracking
        self.t0 = None
        self.t_prev = None
        self.t_curr = None
        self.dt = None

        # previous points and tracked points (list of dequoues)
        self.points = []
        self.tracks = []

        # Data attributes storing time tracking
        self.t0 = None
        self.t_prev = None
        self.t_curr = None
        self.dt = None

        # previous points and tracked points (list of dequoues)
        self.points = []
        self.tracks = []
    
    
    def compare(self, image):
        """
        Action: Compares two images using selected method
        Input: Image instance
        Output: (Current keypoints, Previous Keypoints)
        """
        
        # Advance time tracking with new image time
        frame_time = image.get_time()
        self.__track_time(frame_time)
        
        # Advance state machine image storage
        self.frame_prev = self.frame_curr
        self.frame_curr = image
        
        # Calculate optical flow for keypoints. Determine new from old position
        new_points, old_points = self.__optical_flow(tracking=True)
        
        return new_points, old_points
    
    def get_dt(self):
        """
        Action: Returns delta t between two frames
        Input: None
        Output: time difference between current and previous frame
        """
        return self.dt
    
    def __detector_descriptor(self, image, mask=None):
        """
        Action: Add key points and key point descriptors to Image instance
        Input: Image object being tested
        optional mask of areas to test (same shape as frame)
        Output: Updated image object with keypoints and descriptors
        """
        
        # Get grey scale frame and create mask of ones if nothing passed in.
        frame_grey = image.get_grey()
        mask = np.ones_like(frame_grey) if mask is None else mask
        
        # Detect keypoints and time function call
        TIMER.set()
        kp = self.detector.detect(frame_grey, mask=mask)
        TIMER.delta("Detector")
        
        # Create descriptors for keypoints and time function call
        TIMER.set()
        (kp, desc) = self.descriptor.compute(frame_grey, kp)
        TIMER.delta("Descriptor")
        
        # Lazily set keypoint and descriptors on Image object
        image.set_details(kp, desc)
        
        return image
    
    def __match(self):
        """
        Action: Determine matches between two frames
        Input: None
        Output: List of match objects (each object has index of train and query
        descriptors. Use to create list of matching features)
        Note: Call this function after using "__detector_descriptor" method
        """
        
        # Access descriptors from current and previous frame.
        # Need descriptors from both images to create matches.
        desc_curr = self.frame_curr.get_desc()
        desc_prev = self.frame_prev.get_desc()
        
        # If there are no descriptors associated with previous frame default
        # is to return empty list (meaning no matches)
        if desc_prev is None:
            return []
        
        if len(desc_prev) == 0:
            return []
        
        # Try to perform knn (k-nearest-neighbors) match. Return two closest
        # matches for keypoint filtering by Lowe ratio test
        
        TIMER.set()
        knn_kwargs = {'queryDescriptors': desc_curr,
                      'trainDescriptors': desc_prev,
                       'k': 2}
        try:
            raw_matches = self.matcher.knnMatch(**knn_kwargs)
        
        except cv2.error:
            raw_matches = []
        
        else:
            TIMER.delta("Matching")
        
        # Perform Lowe-ratio test to discard 90% of false matches and reject
        # less than 5% of correct matches
        
        matches = self.__lowe_filtering(raw_matches)
        
        return matches
    
    def __optical_flow(self, tracking=False):
        """
        Action: Perform point introductions if fewer keypoints than threshold
        Track keypoints with Lucas-Kanade optical flow w/ filtering
        Input: Optional flag for tracking: retain point correspondences for
        MAX_TRACK_LENGTH number of frames
        Output: returned_new_keypoints - current frame keypoints
        returned_old_keypoints - previous frame keypoints
        """
        
        # global constants which have largest impact on processing time
        global MAX_TRACK_LENGTH
        global MATCH_QTY
        global LK_BACKCALC_PIXEL_DEVIATION
        
        # Frame margin ignored to prevent index out of bounds from descriptors.
        EDGE_MARGIN = 5
        
        # Default is to return empty list of points
        returned_new_points = []
        returned_old_points = []
        
        # if compare function has only been called once, no frame to compare to
        if not self.frame_prev:
            return returned_new_points, returned_old_points
        
        points = self.points
        points_detected = False
        
        # If fewer points than threshold, introduce new keypoints
        # NOTE: If entered, this section may modify points before performing
        # Optical Flow calc under next conditional.
        # NOTE: This section may be entered knowing points will not be matched
        # and points NOT introduced as the descriptors from the current
        # frame need to be readied for the following frame.
        
        if len(points) < MATCH_QTY:
            # Perform detection, description and matching
            self.frame_curr = self.__detector_descriptor(self.frame_curr)
            matches = self.__match()
            points_detected = True
            
            TIMER.set()
            # The remainder of the point introduction block concatenates the
            # newly discovered keypoints onto the previous set of points.
            #
            # *** This is actually slightly tricky ***
            #
            # Because the previous points are all defined relative to the train
            # image we MUST use the keypoints from the train image rather than
            # query image. (despite the query being newer). This way, when the
            # LK-OptFlow algorithm is executed in the next block, there will be
            # agreement between the carried over points which associate with
            # the previous image. If this isn’t done and new points are used,
            # the (x,y) positions which associate with the new frame will be
            # applied to the previous frame and will almost certainly be
            # different keypoints. This means the points will not be as robust
            # as those we’ve just worked so hard to acquire.
            
            kp_prev = self.frame_prev.get_kp()
            
            # Perform indexing to create list of (x,y) coords for best matches
            best_kp_prev = [kp_prev[m.trainIdx].pt for m in matches]
            
            # If tracking, create new deques for newly introduced points
            if tracking:
                for x, y in best_kp_prev:
                    self.tracks.append(deque([(x, y)],maxlen=MAX_TRACK_LENGTH))
            
            # Optical flow algorithm executes next using "points" variable.
            # Conditional makes sure optical flow gets newly introduced AND
            # retained points both associated with the older frame.
            # Also deals with edge cases when no points are introduced.
            #
            # If no old points or new points, points is an empty list
            if len(points) == 0 and len(best_kp_prev) == 0:
                points = []
            # If no old points, just float and return newest points
            elif len(points) == 0:
                points = np.float32(best_kp_prev)
            # If no new points are found, return previous points
            elif len(best_kp_prev) == 0:
                pass
            # Otherwise, concatenate new and old points and return
            else:
                points = np.concatenate((points, np.float32(best_kp_prev)), 0)
            
            returned_new_points = points
            TIMER.delta("Point Insertion")
            
            # Perform optical flow calculations if there are keypoints
            if len(points) > 0:
                # Store references to frame size, current and previous grey scales
                width, height = self.frame_curr.get_size()
                grey_curr = self.frame_curr.get_grey()
                grey_prev = self.frame_prev.get_grey()
                
                # Renamed to shorten most function calls
                kp_0 = points
                
                # Perform optical flow calculations
                TIMER.set()
                lk_args = [grey_prev, grey_curr, kp_0, None]
                kp_1, sts, flg = cv2.calcOpticalFlowPyrLK(*lk_args, **LK_PARAMS)
                TIMER.delta("LK-Flow")
                
                new_points = kp_1
                old_points = kp_0
                
                # Filtering steps
                # Knock off all keypoints within the edge margin. This stops index
                # errors for descriptors outside or on frame margin. Also, this
                # means for moving frame, points moving ’off’ the frame are dropped
                # so new points may be introduced
                
                TIMER.set()
                
                x_good = (new_points[:, 0] > EDGE_MARGIN) & \
                (new_points[:, 0] < width - EDGE_MARGIN)
                y_good = (new_points[:, 1] > EDGE_MARGIN) & \
                (new_points[:, 1] < height - EDGE_MARGIN)
                
                # Performing filtering by running LK-OptFlow ::backwards:: meaning,
                # use the query image as the train image and train image as the
                # query image. Only take points that match in both directions.
                
                lk_args = [grey_curr, grey_prev, kp_1, None]
                kp_0r, sts, flg = cv2.calcOpticalFlowPyrLK(*lk_args, **LK_PARAMS)
                pxl_delta = abs(kp_0-kp_0r).max(1)
                
                r_good = pxl_delta < LK_BACKCALC_PIXEL_DEVIATION
                TIMER.delta("LK-Back-Filter")
                TIMER.set()
                
                # Create a mask which is True for points that are not in margins,
                # and meet filtering requirements
                good = r_good & x_good & y_good
                
                # Perform indexing to select high quality keypoints
                indices = [i for (i, flag) in enumerate(good) if flag]
                
                filtered_new_points = new_points[indices, :]
                filtered_old_points = old_points[indices, :]
                
                # drop tracks whose points have not persisted and update detected
                if tracking:
                    good_tracks = zip(good, self.tracks)
                    new_tracks = [track for (flag, track) in good_tracks if flag]
                    for point, new_track in zip(filtered_new_points, new_tracks):
                        new_track.append(point)
                self.tracks = new_tracks
                
                returned_new_points = filtered_new_points
                returned_old_points = filtered_old_points
                
                TIMER.delta("Track-Updates")
                
                # Code optimization
                # Run detector and descriptor if filtering has caused keypoints to
                # fall below threshold. This way, detection and description are
                # performed preemptively so new points may immediately be
                # introduced in the next iteration. Preferable than calling
                # detection-description twice on next iteration as this causes
                # calculation speeds to fall outside of 30 hz region
                
                if len(filtered_new_points) < MATCH_QTY and not points_detected:
                    self.frame_curr = self.__detector_descriptor(self.frame_curr)
                    
            self.points = returned_new_points
            # New points and old points should always match in length as they
            # should correspond. Simple test that may show errors with changes
            # to code.
            assert len(returned_new_points) == len(returned_old_points)
            
            return returned_new_points, returned_old_points
        
        @staticmethod
        def __lowe_filtering(raw_matches):
            
            # Performs the Lowe ratio test on a matched set of keypoints
            TIMER.set()
            
            # Match tolerance ranges
            low_tol = 0
            hgh_tol = 10
            accepted = 6 # and lower
            
            # buckets matches based on ratio
            match_quality = [[] for _ in xrange(low_tol, hgh_tol+1)]
            try:
                for (m, n) in raw_matches:
                try:
                    bucket = int(10*m.distance/n.distance)
                except ZeroDivisionError as err:
                    # two matches on top of one another. Rare but acceptable
                    bucket = 0
                
                if bucket < hgh_tol:
                    match_quality[bucket].append(m)
            except ValueError:
                match_quality[hgh_tol].extend([match[0] for match in raw_matches])

            matches = []
            num_matches = 0
            
            # Select best matches based on their ratio distance
            for i in xrange(accepted):
                bckt_qty = len(match_quality[i])
                if num_matches > MATCH_QTY:
                    break
                elif bckt_qty < MATCH_QTY - num_matches:
                    matches.extend(match_quality[i])
                    num_matches += bckt_qty
                else:
                    matches.extend(match_quality[i][:MATCH_QTY - num_matches])
                    num_matches += MATCH_QTY - num_matches + 1
            
            TIMER.delta("Lowe-Filtering")
            return matches
        
        def __track_time(self, frame_time=None):
            """
            Private method for tracking dt, current and previous time steps
            relative to an initial starting point
            """
            # Store initial time
            if self.t0 is None:
                self.t0 = frame_time
                t_prev = self.t_curr
                t_curr = frame_time
            
            # frames are assumed to be sequential in time. Error if dt is negative
            if t_prev is not None:
                assert t_curr - t_prev >= 0
                self.dt = t_curr - t_prev
            
            self.t_curr = t_curr
            self.t_prev = t_prev
            
        def create_flow_plot(self, kp_0=None, kp_1=None):
        """
        Action: Creates visualization of keypoint motion between frames. To
        include color coding of keypoints based on optical flow rate,
        kp_0, and kp_1 must be provided: the location of the previous
        and current keypoints.
        Input: Optional - kp_0 - previous keypoint positions
        kp_1 - current keypoint positions
        Output: Image object built from current frame, keypoints and keypoint
        tracks.
        Class Requirements: Current color image object
        Current images Keypoints
        (Uses) Keypoint track history when available.
        """
        
        colr_curr = self.frame_curr.get_color()
        frame_time = self.frame_curr.get_time()
        TIMER.set()
        
        # If user has passed previous and current points in use rate of motion
        # to "color code" tracks based on rate of motion
        
        if kp_0 not in [None, []] and kp_1 not in [None, []]:
            dxy = kp_1 - kp_0
            dist = np.linalg.norm(dxy, axis=1)
            rgb_vals = scalarMap.to_rgba(dist, bytes=True)[:, 0:3].astype(int)   
            rgb_vals = map(tuple, rgb_vals)
        # Otherwise just set all the track colors equally. This is faster.
        else:
            def color_iter():
                while True:
                    yield (0, 255, 0)
            rgb_vals = color_iter()
            
        # plot the current points using a circle of the appropriate color.
        for pt, color in zip(self.points, rgb_vals):
            x, y = pt
            cv2.circle(colr_curr, (x, y), 2, color, 1)
        
        # if tracking is turned on, plot the tracks with appropriate color.
        if self.tracks is not []:
            for track, color in zip(self.tracks, rgb_vals):
                cv2.polylines(colr_curr, [np.int32(track)], False, color)
        
        # Plot the number of points being tracked at the top of the image.
        draw_str(colr_curr, (20, 20), ’track count: %d’ % len(self.points))
        TIMER.delta("Plotting-Tracks")
        
        # Return the new image for display.
        colr_curr = Image(colr_curr, frame_time=frame_time)
        return colr_curr
    
    def create_grid_plot(self, cell_mags, (num_rows, num_cols), max_norm=10.0):
        """
        Action: Creates a bar plot in each cell of an m x n grid. Bar plot will
        be x and y directions.
        Input: cell_mags - list of x-y magnitude [(x1, y1),(x2,y2),...,(xn,yn)]
        num_rows - number of rows bar plots will be applied to.
        num_cols - number of cols bar plots will be applied to.
        Class Requirements: Current color image object.
        """
        
        colr_curr = self.frame_curr.get_color()
        frame_time = self.frame_curr.get_time()
        if cell_mags == [] or cell_mags == None:
            return self.frame_curr
        
        TIMER.set()
        
        width, height = self.frame_curr.get_size()
        
        cols = np.linspace(0, width, num_cols+1)
        rows = np.linspace(0, height, num_rows+1)
        
        col_width = (cols[1]-cols[0])
        row_width = (rows[1]-rows[0])
        
        col_centers = cols[1:] - col_width/2.0
        row_centers = rows[1:] - row_width/2.0
        
        rect_half_width = 5
        rect_max_length = width/num_cols
        rect_max_height = height/num_rows
        
        for i, mags in enumerate(cell_mags):
            col_indx = i % num_cols
            row_indx = i / num_cols
            cent_xy = (col_centers[col_indx], row_centers[row_indx])
            
            # x-directional rectangle
            if np.isnan(mags[0]):
                pass
            else:
                x1 = int(cent_xy[0])
                y1 = int(cent_xy[1] - rect_half_width)
                x2 = int(cent_xy[0] + mags[0]/max_norm*rect_max_length)
                y2 = int(cent_xy[1] + rect_half_width)
                cv2.rectangle(colr_curr, (x1, y1), (x2, y2), (0, 255, 0), -1)
            
            # y-directional rectangle
            if np.isnan(mags[1]):
                pass
            else:
                x1 = int(cent_xy[0] - rect_half_width)
                y1 = int(cent_xy[1])
                x2 = int(cent_xy[0] + rect_half_width)
                y2 = int(cent_xy[1] + mags[1]/max_norm*rect_max_height)
                cv2.rectangle(colr_curr, (x1, y1), (x2, y2), (0, 0, 255), -1)
            
        TIMER.delta("Plotting-Grid")
        return Image(colr_curr, frame_time=frame_time)
    
    