# Imports

In [None]:
import copy
import cv2
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import os
from PIL import ImageColor
from skimage import exposure
from skimage.morphology import area_opening, disk, square
import sys

# Helper Functions

In [1]:
# from PIL import ImageColor
import seaborn as sb

def rgb2hex(rgb):
    """Format the first three components of ``rgb`` as a ``#rrggbb`` string.

    Each component is rendered as zero-padded, lowercase, two-digit hex, so
    integer components in 0-255 produce a standard 7-character color code.
    Any components beyond the third are ignored.
    """
    channels = (rgb[0], rgb[1], rgb[2])
    return "#" + "".join(format(channel, "02x") for channel in channels)

def plot_images(image_list, title_list=None, is_barchart=False, barchart_colors=None, grid='off', size_per_image=10, display_images=True, save_images=False, file_name='test.png'):
    """Draw one row of images (or bar charts) and optionally save the figure.

    Args:
        image_list: Sequence of items to plot, one subplot each. In image mode
            every item is converted with ``cv2.COLOR_BGR2RGB`` before display,
            i.e. it is treated as a BGR image. In bar-chart mode every item is
            a sequence of bar heights.
        title_list: Optional titles, indexed by subplot position. When empty
            or ``None`` no titles are set.
        is_barchart: Plot bar charts (with a shared y axis) instead of images.
        barchart_colors: Optional bar colors for bar-chart mode. Each color is
            reversed and converted to a hex string with ``rgb2hex``. When
            ``None`` or empty, seaborn's current defaults are used.
        grid: Value forwarded to ``ax.axis`` for every subplot.
        size_per_image: Width and height, in figure units, of each subplot.
        display_images: When false the figure is closed after drawing/saving.
        save_images: When true the figure is written to
            ``../report_images/<file_name>``.
        file_name: File name used when ``save_images`` is true.
    """
    if title_list is None:
        title_list = []

    # A sequence longer than 20 is assumed to be a single bare image/series
    # rather than a list of them (fewer than 20 plots are ever expected).
    if len(image_list) > 20:
        image_list = [image_list]

    num_plots = len(image_list)
    fig, axes = plt.subplots(
        nrows=1,
        ncols=num_plots,
        figsize=(size_per_image * num_plots, size_per_image * 1),
        sharey=bool(is_barchart),
    )
    if num_plots == 1:
        # plt.subplots returns a bare Axes for a single subplot.
        axes = [axes]

    palette = None
    if is_barchart and barchart_colors is not None and len(barchart_colors) > 0:
        # Convert colors to RGB hex (inputs are reversed first, e.g. BGR -> RGB).
        palette = sb.color_palette([rgb2hex(c[::-1]) for c in barchart_colors])
        # set_palette returns None, so keep our own reference for barplot while
        # still updating the global palette as before.
        sb.set_palette(palette)

    for i, ax in enumerate(axes):
        if is_barchart:
            sb.barplot(x=np.arange(len(image_list[i])), y=image_list[i], ax=ax, palette=palette)
            ax.set_xlabel('dirt/dead grass <-- Score --> healthy grass')
            ax.set_ylabel('%')
        else:
            ax.imshow(cv2.cvtColor(image_list[i], cv2.COLOR_BGR2RGB))

        if len(title_list) > 0:
            ax.set_title(title_list[i])
        ax.axis(grid)

    # Saving and closing apply to both image and bar-chart figures.
    if save_images:
        # os.path.join keeps the path valid on non-Windows platforms too.
        save_path = os.path.join('..', 'report_images', file_name)
        print('Save:', save_path)
        fig.savefig(save_path)

    if not display_images:
        plt.close(fig)

In [None]:
# COLOR GRADIENT

def color_fader(cs, mix=0):
    """Linearly interpolate along a gradient defined by two or more colors.

    The colors in ``cs`` are treated as evenly spaced stops on the interval
    [0, 1]; ``mix`` selects a position on that interval and the two
    neighbouring stops are blended linearly.

    Args:
        cs: Sequence of at least two colors in any form accepted by
            ``mpl.colors.to_rgb`` (e.g. ``'#RRGGBB'`` hex strings, which
            matplotlib reads as RGB).
        mix: Position along the gradient; 0 yields the first color and 1 the
            last one.

    Returns:
        The blended color as a hex string produced by ``mpl.colors.to_hex``.

    Raises:
        ValueError: If fewer than two colors are given.
    """
    if len(cs) < 2:
        raise ValueError('color_fader needs at least two colors, got %d' % len(cs))

    stops = [np.array(mpl.colors.to_rgb(c)) for c in cs]
    num_segments = len(stops) - 1

    # Rescale mix so that every segment between two stops spans a 0-1 range.
    position = mix * num_segments
    # Clamp so that mix == 1 (and any overshoot) uses the last segment.
    segment = min(max(int(position), 0), num_segments - 1)
    local_mix = position - segment

    return mpl.colors.to_hex((1 - local_mix) * stops[segment] + local_mix * stops[segment + 1])

# ----------------------------------------
        
def create_color_buckets(colors=('#FF0000', '#00FF00'), num_buckets=10, display_buckets=False):
    """Sample ``num_buckets`` evenly spaced colors from a gradient.

    Args:
        colors: Gradient stops as RGB hex strings, forwarded to
            ``color_fader``.
        num_buckets: Number of colors to sample. With a single bucket the
            first gradient color is used.
        display_buckets: When true, draw each bucket as a vertical line in a
            small matplotlib figure and show it.

    Returns:
        A list of ``num_buckets`` color tuples. Each is obtained from
        ``ImageColor.getcolor(..., "RGB")`` and then reversed, i.e. converted
        from RGB to BGR order.
    """
    # With one bucket there is no span to divide; avoid dividing by zero and
    # sample the gradient at position 0.
    last_index = max(num_buckets - 1, 1)
    hex_buckets = [color_fader(colors, i / last_index) for i in range(num_buckets)]

    if display_buckets:
        fig, ax = plt.subplots(figsize=(4, 2))
        for i, hex_color in enumerate(hex_buckets):
            ax.axvline(i, color=hex_color, linewidth=4)
        plt.axis('off')
        plt.show()

    # Convert RGB to BGR.
    return [ImageColor.getcolor(hex_color, "RGB")[::-1] for hex_color in hex_buckets]

# Shadow Removal

In [None]:
def create_shadow_masks(mission_numbers, mission_images_hsv, display_images=False, lower_shadow=(69, 20, 1), upper_shadow=(129, 102, 34), area_threshold=50, dilation_radius=5):
    """Build per-mission shadow masks and their union.

    For each mission, pixels whose three channels fall inside
    ``[lower_shadow, upper_shadow]`` are marked via ``cv2.inRange``. Each raw
    mask is then cleaned with ``area_opening`` and grown with a disk-shaped
    dilation. All smoothed masks are finally OR-ed into one combined mask.

    Args:
        mission_numbers: Mission identifiers; its length determines how many
            images are processed, and the values are used in plot titles.
        mission_images_hsv: Images indexed in step with ``mission_numbers``
            (presumably HSV, per the parameter name — confirm with caller).
        display_images: When true, plot the raw, smoothed and combined masks.
        lower_shadow: Inclusive lower bound passed to ``cv2.inRange``.
        upper_shadow: Inclusive upper bound passed to ``cv2.inRange``.
        area_threshold: ``area_threshold`` passed to ``area_opening``.
        dilation_radius: Radius of the disk used to dilate each mask.

    Returns:
        Tuple ``(shadow_masks, smoothed_shadow_masks, complete_shadow_mask)``
        where the first two are lists with one mask per mission and the last
        is a single ``uint8`` mask combining all smoothed masks.

    Raises:
        IndexError: If ``mission_numbers`` is empty (there is no mask to take
            the combined mask's shape from).
    """
    # Loop-invariant inputs are built once.
    lower_bound = np.array(lower_shadow)
    upper_bound = np.array(upper_shadow)
    dilation_kernel = disk(radius=dilation_radius)

    shadow_masks = []
    smoothed_shadow_masks = []

    for i in range(len(mission_numbers)):
        raw_mask = cv2.inRange(mission_images_hsv[i], lower_bound, upper_bound)
        shadow_masks.append(raw_mask)

        # area_opening and cv2.dilate both return new arrays, so the raw mask
        # stored above is left untouched.
        smoothed = area_opening(raw_mask, area_threshold=area_threshold)
        smoothed = cv2.dilate(smoothed, dilation_kernel, iterations=1)
        smoothed_shadow_masks.append(smoothed)

    # ----------------------------------------

    # Union of all smoothed masks, accumulated directly in uint8.
    complete_shadow_mask = np.zeros(smoothed_shadow_masks[0].shape, dtype=np.uint8)
    for smoothed in smoothed_shadow_masks:
        complete_shadow_mask = cv2.bitwise_or(complete_shadow_mask, smoothed.astype(np.uint8))

    # ----------------------------------------

    if display_images:
        plot_images(shadow_masks, ['shadow_masks ' + str(mission_number) for mission_number in mission_numbers])
        plot_images(smoothed_shadow_masks, ['smoothed_shadow_masks ' + str(mission_number) for mission_number in mission_numbers])
        plot_images([complete_shadow_mask], ['complete_shadow_mask'])

    return shadow_masks, smoothed_shadow_masks, complete_shadow_mask

# ----------------------------------------

def apply_shadow_masks(mission_numbers, histogram_mission_images_hsv, complete_shadow_mask, display_images=False):
    """Return copies of the images with every masked pixel set to zero.

    One image is processed per entry of ``mission_numbers``, taken from
    ``histogram_mission_images_hsv`` at the same index. The inputs are not
    modified; each output is a deep copy in which all channels of the pixels
    where ``complete_shadow_mask`` is non-zero are zeroed. If that assignment
    raises ``ValueError`` the copy is kept unchanged and 'not modified' is
    printed. When ``display_images`` is true the results are plotted.
    """
    def _blank_shadows(source_image):
        # Work on a private copy so the caller's image stays intact.
        blanked = copy.deepcopy(source_image)
        try:
            blanked[complete_shadow_mask.astype(bool), :] = 0  # set to black
        except ValueError:
            print('not modified')
        return blanked

    modified_mission_images_hsv = [
        _blank_shadows(histogram_mission_images_hsv[idx])
        for idx in range(len(mission_numbers))
    ]

    if display_images:
        titles = []
        for mission_number in mission_numbers:
            titles.append('modified_mission_images_hsv ' + str(mission_number))
        plot_images(modified_mission_images_hsv, titles)

    return modified_mission_images_hsv

In [None]:
def run_shadows(mission_numbers, mission_images_hsv, histogram_mission_images_hsv, display_images=False):
    """Compute the combined shadow mask and apply it to the histogram images.

    The mask is derived from ``mission_images_hsv`` via
    ``create_shadow_masks``; only its combined mask is used, and it is applied
    to ``histogram_mission_images_hsv`` via ``apply_shadow_masks``, whose
    result is returned.
    """
    # Only the combined mask is needed; the per-mission masks are discarded.
    _, _, combined_mask = create_shadow_masks(mission_numbers, mission_images_hsv, display_images)
    return apply_shadow_masks(mission_numbers, histogram_mission_images_hsv, combined_mask, display_images)

# Anomaly Detection

In [None]:
def differencing(img1, img2):
    """Return a contrast-stretched difference of channel 1 of two images.

    The channel at index 1 of each image (presumably saturation if the inputs
    are HSV — confirm with caller) is subtracted, ``img1 - img2``, and the
    result is min-max scaled to 0-255. Values below 75 are then forced to 0
    and values above 175 to 255.

    If every pixel has the same difference (for example, identical images)
    there is no range to stretch; a uniform mid-gray image (127) is returned
    so that no pixel lands on either extreme.

    Args:
        img1: Array indexable as ``[:, :, 1]``.
        img2: Array of the same shape as ``img1``.

    Returns:
        A 2-D ``uint8`` array. The inputs are not modified.
    """
    # Widen to int16 so that the subtraction can go negative without wrapping.
    # astype already returns a new array, so no explicit copy is needed.
    diff = img1[:, :, 1].astype(np.int16) - img2[:, :, 1].astype(np.int16)

    lowest = int(np.min(diff))
    highest = int(np.max(diff))
    if highest == lowest:
        # Avoid dividing by zero: a constant difference carries no contrast.
        return np.full(diff.shape, 127, dtype=np.uint8)

    # Min-max scaling is shift-invariant, so no offset is needed beforehand.
    result = (diff - lowest) * (255 / (highest - lowest))

    result[result < 75] = 0  # darken
    result[result > 175] = 255  # lighten

    return result.astype(np.uint8)  # convert back to uint8 to be displayed

In [None]:
# using modified_mission_images_hsv[0]

def compute_anomaly_masks(mission_numbers, modified_mission_images_hsv, display_images=False):
    """Build one anomaly mask per pair of consecutive missions.

    Each consecutive image pair is passed to ``differencing``. Pixels whose
    difference value is exactly 0 or 255 (the two extremes that
    ``differencing`` saturates to) form the initial mask, which is then eroded
    twice with a radius-1 disk and dilated once with a radius-5 disk.

    Args:
        mission_numbers: Mission identifiers; ``len(mission_numbers) - 1``
            pairs are processed and the values are used in plot titles.
        modified_mission_images_hsv: Images indexed in step with
            ``mission_numbers``.
        display_images: When true, plot the difference images and every mask
            stage, each titled with its mission pair (e.g. ``'1-2'``).

    Returns:
        List of final ``uint8`` masks (0 or 255), one per consecutive pair.
        Empty when fewer than two missions are given.
    """
    num_pairs = len(mission_numbers) - 1

    # ----------------------------------------

    adjacent_regions_change_hsv = [
        differencing(modified_mission_images_hsv[i], modified_mission_images_hsv[i + 1])
        for i in range(num_pairs)
    ]

    # ----------------------------------------

    # Structuring elements do not depend on the image; build them once.
    erosion_kernel = disk(radius=1)
    dilation_kernel = disk(radius=5)

    masks_initial = []
    masks_2 = []
    masks_final = []

    for change in adjacent_regions_change_hsv:
        at_extreme = np.bitwise_or(change == 0, change == 255)
        initial = (at_extreme * 255).astype(np.uint8)
        masks_initial.append(initial)

        eroded = cv2.erode(initial, erosion_kernel, iterations=2).astype(np.uint8)
        masks_2.append(eroded)

        dilated = cv2.dilate(eroded, dilation_kernel, iterations=1).astype(np.uint8)
        masks_final.append(dilated)

    # ----------------------------------------

    # Skip plotting when there are no pairs: there is nothing to draw.
    if display_images and num_pairs > 0:
        # Every list below has one entry per mission pair, so all of them are
        # labelled with the pair rather than a single mission number.
        pair_labels = [
            str(mission_numbers[i]) + '-' + str(mission_numbers[i + 1])
            for i in range(num_pairs)
        ]
        stages = (
            ('adjacent_regions_change_hsv', adjacent_regions_change_hsv),
            ('masks_initial', masks_initial),
            ('masks_2', masks_2),
            ('masks_final', masks_final),
        )
        for stage_name, stage_images in stages:
            plot_images(stage_images, [stage_name + ' ' + label for label in pair_labels])

    return masks_final

In [None]:
def anomaly_detection_1(mission_numbers, mission_images_hsv, histogram_mission_images_hsv, display_images=False, save_images=False):
    """Run shadow removal followed by anomaly-mask computation.

    Progress messages are printed before each step and on completion.
    ``save_images`` is accepted but not used by this function.

    Returns:
        The value returned by ``compute_anomaly_masks`` for the shadow-masked
        images.
    """
    print('Anomaly Detection 1')

    print('Step 1 - detecting shadows')
    shadow_free_images = run_shadows(
        mission_numbers,
        mission_images_hsv,
        histogram_mission_images_hsv,
        display_images,
    )

    print('Step 2 - detecting anomalies')
    detected_masks = compute_anomaly_masks(mission_numbers, shadow_free_images, display_images)

    print('Done')
    return detected_masks