In [4]:
import findspark
findspark.init()

In [7]:
# Original Author: Amit Kumar Mondal
# Modified: Habib Sabiu
# Date: September 4, 2017
#
# Description: A Spark application to cluster still camera images. The script read
#              images data from HDFS and writes it's output to HDFS. The output is a
#              text file containing image name and the cluster to which it belongs.
#
# Usage: spark-submit --master [spark master] [file name] [input path] [output_path] [job name]
#        [spark master] = Can be Spark's Standalone, Mesos, or YARN
#        To run on:-
#                 Standalone: spark://discus-p2irc-master:7077
#                 Mesos: mesos://discus-p2irc-master:5050
#                 YARN: yarn
#        [file name]   = Full path to the python script (../imageClustering.py)
#        [input_path]  = Full HDFS path to input images
#        [output_path] = Full HDFS path to save results. Please note that all contents of this
#                        folder will be deleted if it already exist
#        [job_name]    = A nice name for the job. This will be displayed on the web UI
#
# Example usage: spark-submit --master spark://discus-p2irc-master:7077 imageClustering.py \
#                hdfs://discus-p2irc-master:54310/user/hduser/habib/still_camera_images/ \
#                hdfs://discus-p2irc-master:54310/user/hduser/habib/flower_counter_output/ imageClustering

import os
import sys
import cv2
import pyspark
import subprocess
import numpy as np
import skimage.io as io

from skimage import *
from time import time
from PIL import ImageFile
from pyspark import SparkConf
from pyspark import SparkContext
from io import StringIO, BytesIO
from scipy.spatial import distance
from pyspark.mllib.clustering import KMeans, KMeansModel

    
def images_to_descriptors(rawdata):
    ImageFile.LOAD_TRUNCATED_IMAGES = True
    try:
        fname = rawdata[0]
        #imgbytes = np.array(io.imread(StringIO(rawdata[1]))) 
        imgbytes = np.array(io.imread(BytesIO(rawdata[1]))) 
        extractor = cv2.xfeatures2d.SIFT_create()
        kp, descriptors = extractor.detectAndCompute(img_as_ubyte(imgbytes), None)
        return [fname, descriptors]

    except (ValueError, IOError, SyntaxError) as e:
        pass


def assign_pooling(data):

    image_name, feature_matrix = data[0]
    clusterCenters = data[1]

    feature_matrix = np.array(feature_matrix)

    model = KMeansModel(clusterCenters)
    bow = np.zeros(len(clusterCenters))

    for x in feature_matrix:
        k = model.predict(x)
        dist = distance.euclidean(clusterCenters[k], x)
        bow[k] = max(bow[k], dist)

    clusters = bow.tolist()
    group = clusters.index(min(clusters)) + 1
    return [image_name, group]


if __name__ == "__main__":

    application_start_time =  time()

    
    input_path = "file:///Users/habib/Desktop/still_images_small/"
    output_path = "file:///Users/habib/Desktop/outputs/"
    job_name = "imageClustering"
    
    import shutil
    if os.path.exists(output_path[7:]):
        shutil.rmtree(output_path[7:])
        
       
    #input_path = sys.argv[1]
    #output_path = sys.argv[2]
    #job_name = sys.argv[3]
    
    #subprocess.call(["hadoop", "fs", "-rm", "-r", output_path])

    sc = SparkContext(appName = job_name)

    build_start_time = time()

    images_rdd = sc.binaryFiles(input_path) \
        .map(images_to_descriptors) \
        .filter(lambda x: x[1].all() != None) \
        .map(lambda x: (x[0], x[1])) 
     
    features = images_rdd.flatMap(lambda x: x[1])      

    model = KMeans.train(features, 3, maxIterations=5, initializationMode="random")
    clusterCenters = model.clusterCenters

    build_end_time = time() - build_start_time
    
    processing_start_time = time()

    data_to_cluster = images_rdd.map(lambda x: [x, clusterCenters])

    features_bow = data_to_cluster.map(assign_pooling)
    features_bow.coalesce(1, shuffle=True).saveAsTextFile(output_path)

        
    processing_end_time = time() - processing_start_time
    application_end_time = time() - application_start_time
   
    sc.stop()

    print("---------------------------------------------")
    print("SUCCESS: Model built in {} seconds".format(round(build_end_time, 3)))
    print("SUCCESS: Images processed in {} seconds".format(round(processing_end_time, 3)))
    print("SUCCESS: Total time spent = {} seconds".format(round(application_end_time, 3)))
    print("---------------------------------------------")



---------------------------------------------
SUCCESS: Model built in 19.998 seconds
SUCCESS: Images processed in 25.593 seconds
SUCCESS: Total time spent = 45.748 seconds
---------------------------------------------


In [79]:
# Original Author: Keenan
# Author: Habib Sabiu
# Date: August 24, 2017
#
# Description: A Spark application to register drone images. Images should be in
#              in a group of 5 chennels. For example, IMG_OOO1 group should have
#              5 images representing various chennels e.g IMG_OOO1_1.png to IMG_OOO1_5.png.
#              The output is a set of 5 registered images for each input group, and RGB of the
#              location, croped version of the RGB, and an NDVI.
#
# Usage: spark-submit --master [spark master] [file name] [input path] [output_path] [job name]
#        [spark master] = Can be Spark's Standalone, Mesos, or YARN
#        To run on:-
#                 Standalone: spark://discus-p2irc-master:7077
#                 Mesos: mesos://discus-p2irc-master:5050
#                 YARN: yarn
#        [file name]   = Full path to the python script (../imageRegistration.py)
#        [input_path]  = Full HDFS path to input images
#        [output_path] = A network directory such as NFS3 that is accessible on all the worker nodes
#        [job_name]    = A nice name for the job. This will be displayed on the web UI
#
# Example usage: spark-submit --master spark://discus-p2irc-master:7077 imageRegistration.py \
#                hdfs://discus-p2irc-master:54310/user/hduser/habib/drone_images_png/ \
#                /data/mounted_hdfs_path/user/hduser/habib/registered_images_output/ imageRegistration


import os
import cv2
import sys
import math
import string
import random
import pyspark
import os.path
import warnings
import argparse
import numpy as np
import skimage.io as io

from time import time
from operator import add
from io import StringIO, BytesIO
from skimage import img_as_ubyte
from pyspark import SparkContext
from PIL import Image, ImageFile
from matplotlib import pyplot as plt


# Set numpy array to print all it values instead of 3 dots in the middle
#np.set_printoptions(threshold=np.nan)

# Ignore all user warnings
warnings.filterwarnings("ignore")

# Ignore divide by zero warning
np.seterr(divide='ignore', invalid='ignore')


def find_keypoints_and_features(image):

    # Check that image is not invalid
    if image is None:
        raise TypeError("Invalid image in find_keypoints_and_features")

    descriptor = cv2.xfeatures2d.SIFT_create(nfeatures=100000)

    #if fails means can't find similarities between two images
    (key_points, features) = descriptor.detectAndCompute(image, None)

    # IF YOU HAVE CV2 VERSION 2 USE THIS STUFF, INSTEAD OF THE ABOVE TWO LINES
    # turn the image into greyscale to work with

    #grey = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    #detector = cv2.FeatureDetector_create("SURF")
    #key_points = detector.detect(grey)
    #extractor = cv2.DescriptorExtractor_create("SURF")
    #(key_points, features) = extractor.compute(grey, key_points)

    # Convert key_points from KeyPoint objects to numpy arrays
    key_points = np.float32([key_point.pt for key_point in key_points])
    return (key_points, features)

def match_key_points(right_key_points, left_key_points, right_features, left_features, ratio, reproj_thresh):

    # A cv2 class that matches keypoint descriptors
    # FLANN is a much faster method for large datasets, so it may be a good
    # idea to switch to that. However it is a very different code set up
    # that uses a couple dictionaries, so there's a bit that'll have to
    # change
    matcher = cv2.DescriptorMatcher_create("BruteForce")
    # knnMatch makes a whole bunch of matches (as a DMatch class)
    # The k stands for how large the tuple will be (because that's
    # basically what DMatches are)
    # i picked two because straight lines
    raw_matches = matcher.knnMatch(right_features, left_features, 2)

    # Turns the raw_matches into tuples we can work with, while also
    # filtering out matches that occurred on the outside edges of the
    # pictures where matches really shouldn't have occurred
    # Is equivalent to the following for loop
    #        matches = []
    #        for m in raw_matches:
    #            if len(m) == 2 and m[0].distance < m[1].distance * ratio:
    #                matches.append((m[0].trainIdx, m[0].queryIdx))
    matches = [(m[0].trainIdx, m[0].queryIdx) for m in raw_matches if len(m) == 2 and m[0].distance < m[1].distance * ratio]

    # Converts the tuples into a numpy array (for working with the
    # homograph), while also splitting up the right and left points
    # We are making a homograph of the matches to apply a ratio test, and
    # determine which of the matches are of a high quality. Typical ratio
    # values are between 0.7 and 0.8
    # Computing a homography requires at least 4 matches
    if len(matches) > 4:
        # Split right and left into numphy arrays
        src_pts = np.float32([right_key_points[i] for (_, i) in matches])
        dst_pts = np.float32([left_key_points[i] for (i, _) in matches])

        # Use the cv2 to actually connect the dots between the two pictures
        (H, status) = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, reproj_thresh)

        src_t = np.transpose(src_pts)
        dst_t = np.transpose(dst_pts)
        back_proj_error = 0
        inlier_count = 0

        for i in range(0, src_t.shape[1]):
            x_i = src_t[0][i]
            y_i = src_t[1][i]
            x_p = dst_t[0][i]
            y_p = dst_t[1][i]
            num1 = (H[0][0] * x_i + H[0][1] * y_i + H[0][2])
            num2 = (H[1][0] * x_i + H[1][1] * y_i + H[1][2])
            dnm = (H[2][0] * x_i + H[2][1] * y_i + H[2][2])

            tmp = (x_p - (num1 / dnm))**2 + (y_p - (num2 / dnm))**2
            if status[i] == 1:
                back_proj_error += tmp
                inlier_count += 1

        return (matches, H, status, back_proj_error, inlier_count)
    else:
        return None

def register_channels(C, idx=0, ratio=.75, reproj_thresh=4):

    # Check that the images in C are good images and not empty
    if C is None:
        raise TypeError("Invalid image set in register_channels")
    for i in C:
        if len(i.shape) > 2:
            raise TypeError("Images have greater depth than 1!")

    # Compute SIFT features for each channel.
    # Channel images are converted to unsigned byte.  All proper scaling
    # is done by image_as_ubyte regardless of dtype of the input images.
    keypoints_and_features = [find_keypoints_and_features(img_as_ubyte(chan)) for chan in C]

    # Generate list of indices excluding the target channel index.
    channels_to_register = list(range(len(C)))
    del channels_to_register[idx]

    # Generate keypoint matches between each channel to be registered
    # and the target image.
    matched_key_points = [match_key_points(keypoints_and_features[i][0], keypoints_and_features[idx][0], keypoints_and_features[i][1],
                                           keypoints_and_features[idx][1], ratio=ratio, reproj_thresh=reproj_thresh) for i in channels_to_register]

    # extract the homography matrices from 'matched_key_points'.
    H = [x[1] for x in matched_key_points]
    BPError = [x[3] for x in matched_key_points]
    Inliers = [x[4] for x in matched_key_points]
    # Add the identity matrix for the target channel.
    H.insert(idx, np.identity(3))
    return H, BPError, Inliers

def warp_image(I, H):
    return cv2.warpPerspective(I, H, (I.shape[1], I.shape[0]))

def transform_channels(C, H):
    return [warp_image(C[i], H[i]) for i in range(len(C))]

def decompose_homography(H):

    if H is None:
        raise TypeError("Invalid homogrpahy input in decompose_homogrphy")
    if H.shape != (3, 3):
        raise TypeError("Invalid homogrpahy shape in decompose_homogrphy")

    a = H[0, 0]
    b = H[0, 1]
    c = H[0, 2]
    d = H[1, 0]
    e = H[1, 1]
    f = H[1, 2]

    p = math.sqrt(a * a + b * b)
    r = (a * e - b * d) / (p)
    q = (a * d + b * e) / (a * e - b * d)

    translation = (c, f)
    scale = (p, r)
    shear = q
    theta = math.atan2(b, a)

    return (translation, theta, scale, shear)

def register_group(images_group):

    images_key = images_group[0]
    images_values = images_group[1]
    images_values = sorted(zip(images_values[0::2], images_values[1::2]))

    keys = [x[0] for x in images_values]
    values = [x[1] for x in images_values]

    # Get the images and store them in an array, then calculate their homographies and transform the images.
    # H, Back-proj-error and the inlier points are all calculated
    C = np.array(values, dtype=float)  / 65535

    H, BPError, Inliers = register_channels(C)
    # Add a 0 to the start of the list of back projection errors, since the
    # first image always has a BPError of 0 (This is for later where we need to print the BPErrors)

    BPError.insert(0, 0)
    T = transform_channels(C, H)

    # Decompose the homogrpahy and calculate the bounding box of the good data, where all 5 channels are present
    max_x = []
    max_y = []
    max_theta = []

    for j in H:
        max_x.append(abs(decompose_homography(j)[0][0]))
        max_y.append(abs(decompose_homography(j)[0][1]))
        max_theta.append(abs(decompose_homography(j)[1]))

    rot = math.ceil(math.sin(max(max_theta)) * C[0].shape[1])
    crop_x = math.ceil(max(max_x))
    crop_y = math.ceil(max(max_y))

    border_x = (crop_x + rot, C[0].shape[1] - crop_x - rot)
    border_y = (crop_y + rot, C[0].shape[0] - crop_y - rot)

    # Loop through each subset of images and re-save them now that they are registered
    for j in range(len(T)):

        output_image_path = os.path.abspath(os.path.join(OUTPUT_FILE_PATH, "IMG_" + images_key + "_" + str(j + 1) + OUTPUT_FILE_TYPE))

        # Different ways to save the numpy array as image
        #io.imsave(output_image_path, T[j])

        # Here the array is first converted into a cv2 image and then saved
        cv_image = np.array(T[j]*255)
        cv2.imwrite(output_image_path, cv_image)

        # Here the array is first converted into a PIL image and then saved
        #im = Image.fromarray(T[j])
        #im.save(output_image_path)

    # Create and save the RGB image
    rgb = np.dstack([T[2], T[1], T[0]])
    output_rgb_path = os.path.abspath(os.path.join(OUTPUT_PROCESSED_PATH, "IMG_" + images_key + "_RGB" + OUTPUT_FILE_TYPE))

    #io.imsave(output_rgb_path, rgb)

    cv_image = np.array(rgb*255)
    cv2.imwrite(output_rgb_path, cv_image)

    #im = Image.fromarray(rgb)
    #im.save(output_rgb_path)

    # Crop images
    crop_img = np.dstack([T[2], T[1], T[0]])
    crop_img = crop_img[int(border_y[0]):int(border_y[1]), int(border_x[0]):int(border_x[1])]
    output_crop_path = os.path.abspath(os.path.join(OUTPUT_PROCESSED_PATH, "IMG_" + images_key  + "_RGB_CROPPED" + OUTPUT_FILE_TYPE))

    #io.imsave(output_crop_path, crop_img)

    cv_image = np.array(crop_img*255)
    cv2.imwrite(output_crop_path, cv_image)

    #im = Image.fromarray(crop_img)
    #im.save(output_crop_path)

    # Create and save the NDVI image
    num = np.subtract(T[3], T[2])
    dnm = np.add(T[3], T[2])

    ndvi_img = np.divide(num, dnm)

    original_ndvi = ndvi_img

    output_ndvi_path = os.path.abspath(os.path.join(OUTPUT_PROCESSED_PATH, "IMG_" + images_key  + "_NDVI" + OUTPUT_FILE_TYPE))

    #io.imsave(output_ndvi_path, original_ndvi)

    cv_image = np.array(original_ndvi*255)
    cv2.imwrite(output_ndvi_path, cv_image)

    #im = Image.fromarray(original_ndvi)
    #im.save(output_ndvi_path)

def read_images(image_rawdata):
    #return image_rawdata[0], np.array(io.imread((StringIO(image_rawdata[1])), as_grey=True) / 65535)
    return image_rawdata[0], np.array(io.imread(BytesIO(image_rawdata[1]), as_grey=True))


if __name__ == "__main__":

    application_start_time =  time()
    
    
    input_path = "file:///Users/habib/Desktop/drone_images_small/"
    output_root_path = "/Users/habib/Desktop/outputs/"
    job_name = "imageRegistration"


    #input_path = sys.argv[1]
    #output_root_path = sys.argv[2]
    #job_name = sys.argv[3]
    
    OUTPUT_FILE_TYPE = ".png"
    # Directory to store registered images
    OUTPUT_FILE_PATH = output_root_path
    # Directory to store processed registered images
    OUTPUT_PROCESSED_PATH = output_root_path + "/processed/"
    
    
    # Set spark configurations
    sc = SparkContext(appName = job_name)

    reading_start_time = time()

    # When reading from local file system
    #images_rdd = sc.binaryFiles('file:///sparkdata/registration_images')
    
    # When reading from HDFS
    images_rdd = sc.binaryFiles(input_path)
    
    index = images_rdd.first()[0].find("IMG_")+4

    images_group_rdd = images_rdd.map(read_images) \
        .map(lambda rawdata: (rawdata[0][index:rawdata[0].rfind('_')], (rawdata[0][index:], rawdata[1]))) \
        .reduceByKey(lambda first_image, second_image: (first_image + second_image))

    reading_end_time = time() - reading_start_time

    processing_start_time = time()

    images_group_rdd.foreach(register_group)

    processing_end_time = time() - processing_start_time

    application_end_time = time() - application_start_time
    
    sc.stop()
    
    print("------------------------------------------------")
    print("SUCCESS: Images read from HDFS in {} seconds".format(round(reading_end_time, 3)))
    print("SUCCESS: Images processed in {} seconds".format(round(processing_end_time, 3)))
    print("SUCCESS: Total time spent = {} seconds".format(round(application_end_time, 3)))
    print("------------------------------------------------")
   

------------------------------------------------
SUCCESS: Images read from HDFS in 1.637 seconds
SUCCESS: Images processed in 234.653 seconds
SUCCESS: Total time spent = 236.35 seconds
------------------------------------------------


In [3]:
# Original Author: Javier Garcia
# Modified: Habib Sabiu
# Date: August 24, 2017
#
# Description: An application to detect and count canola flowers from still camera images.
#              The script reads input images from a local directory. 
#
# Example usage: python imageFlowerCounterNewSquencial.py
#
# Note: Make sure to update the "imagesBasePath" with the path to the directory where the
#       input images can be found

import os
import cv2
import sys
import glob
#import pyspark
import operator
import subprocess
import numpy as np

import skimage.io as io
from enum import Enum
from time import time
from io import StringIO, BytesIO
from PIL import Image, ImageFile
from skimage.feature import blob_doh
#from pyspark import SparkConf, SparkContext
from sklearn.cluster import KMeans as skKMeans



class CanolaObject:
    def __str__(self):
        strObject = ''
        for k, v in self.__dict__.items():
            strObject += '{} : {}\n'.format(k, v)
        return strObject


class CanolaPlotRegionObject(CanolaObject):
    def __init__(self):
        self.__id = None
        self.__plot = None
        self.__year = None
        self.__corners = None
        self.__allocatedPlotMask = None
        self.__allocatedRegionMask = None

    def setPlot(self, plot):
        assert self.__plot is None
        self.__plot = plot

    def setYear(self, year):
        assert self.__year is None
        self.__year = year

    def setCorners(self, corners):
        assert self.__corners is None
        self.__corners = corners

    def setId(self, id):
        assert self.__id is None
        self.__id = id

    def setPlotMask(self, plotMask):
        assert self.__allocatedPlotMask is None
        self.__allocatedPlotMask = plotMask

    def setRegionMask(self, regionMask):
        assert self.__allocatedRegionMask is None
        self.__allocatedRegionMask = regionMask


    def getPlot(self):
        return self.__plot

    def getYear(self):
        return self.__year

    def getCorners(self):
        return self.__corners

    def getId(self):
        return self.__id

    def getPlotMask(self):
        return self.__allocatedPlotMask

    def getRegionMask(self):
        return self.__allocatedRegionMask


class CanolaImageClassifierObject(CanolaObject):
    def __init__(self):
        self.__id = None
        self.__imageId = None
        self.__cluster = None
        self.__numberOfYellowPixels = None
        self.__corrupted = False
        self.__regionObject = None

    def setCluster(self, cluster):
        assert self.__cluster is None
        self.__cluster = cluster

    def setRegionObject(self, regionObject):
        assert self.__regionObject is None
        self.__regionObject = regionObject

    def setNumberOfYellowPixels(self, numberOfYellowPixels):
        assert self.__numberOfYellowPixels is None
        self.__numberOfYellowPixels = numberOfYellowPixels

    def setId(self, id):
        assert self.__id is None
        self.__id = id

    def setImageId(self, imageId):
        assert self.__imageId is None
        self.__imageId = imageId

    def flagCorrupted(self):
        self.__corrupted = True


    def getCluster(self):
        return self.__cluster

    def getRegionObject(self):
        return self.__regionObject

    def getNumberOfYellowPixels(self):
        return self.__numberOfYellowPixels

    def getId(self):
        return self.__id

    def getImageId(self):
        return self.__imageId

    def isCorrupted(self):
        return self.__corrupted


class HistogramObject(CanolaObject):
    def __init__(self):
        self.__id = None
        self.__imageId = None
        self.__histogramB = None
        self.__histogramG = None
        self.__histogramBShift = None
        self.__regionObject = None

    def setId(self, id):
        assert self.__id is None
        self.__id = id

    def setImageId(self, imageId):
        assert self.__imageId is None
        self.__imageId = imageId

    def setHistogramB(self, histogramB):
        assert self.__histogramB is None
        self.__histogramB = histogramB

    def setHistogramG(self, histogramG):
        assert self.__histogramG is None
        self.__histogramG = histogramG

    def setHistogramBShift(self, histogramBShift):
        assert self.__histogramBShift is None
        self.__histogramBShift = histogramBShift

    def setRegionObject(self, regionObject):
        assert self.__regionObject is None
        self.__regionObject = regionObject


    def getId(self):
        return self.__id

    def getImageId(self):
        return self.__imageId

    def getHistogramB(self):
        return self.__histogramB

    def getHistogramG(self):
        return self.__histogramG

    def getHistogramBShift(self):
        return self.__histogramBShift

    def getRegionObject(self):
        return self.__regionObject


class PipelineObject(CanolaObject):
    def __init__(self, pipeline=None):
        self.__id = None
        self.__pipeline = pipeline

    def setPipeline(self, pipeline):
        assert self.__pipeline is None
        self.__pipeline = pipeline

    def setId(self, id):
        assert self.__id is None
        self.__id = id


    def getPipeline(self):
        return self.__pipeline

    def getId(self):
        return self.__id


class FlowerCountObject(CanolaObject):
    def __init__(self):
        self.__id = None
        self.__imageId = None
        self.__blobs = None
        self.__regionObject = None
        self.__pipelineObject = PipelineObject(CanolaObject)

    def setBlobs(self, blobs):
        assert self.__blobs is None
        self.__blobs = blobs

    def setRegionObject(self, regionObject):
        assert self.__regionObject is None
        self.__regionObject = regionObject

    def setId(self, id):
        assert self.__id is None
        self.__id = id

    def setImageId(self, imageId):
        assert self.__imageId is None
        self.__imageId = imageId


    def getPipelineObject(self):
        return self.__pipelineObject

    def getBlobs(self):
        return self.__blobs

    def getRegionObject(self):
        return self.__regionObject

    def getId(self):
        return self.__id

    def getImageId(self):
        return self.__imageId


class CanolaTimelapseImage(CanolaObject):
    def __init__(self):
        self.__id = None
        self.__imageId = None
        self.__path = None
        self.__timestamp = None
        self.__imageSize = None
        self.__plot = None
        self.__allocatedImageArray = None
        self.__corrupted = False
        self.__classifierObject = CanolaImageClassifierObject()
        self.__histogramObject = HistogramObject()
        self.__flowerCountObject = FlowerCountObject()

    def readImage(self):
        if self.__allocatedImageArray is None:
            ImageFile.LOAD_TRUNCATED_IMAGES = True
            pil_image = Image.open(self.__path).convert('RGB')
            open_cv_image = np.array(pil_image)
            self.__allocatedImageArray = open_cv_image[:, :, ::-1].copy()
        return self.__allocatedImageArray

    def showDetectedBlobs(self):
        blobs = self.__flowerCountObject.getBlobs()
        assert blobs is not None
        img = np.copy( self.readImage() )
        for x, y, r in blobs:
            cv2.circle(img,center=(int(y), int(x)),radius=int(r),color=(0, 255, 255),thickness=2)
        cv2.imshow("{} {}".format( self.__plot, self.__timestamp ),img )
        cv2.waitKey( 0 )
        cv2.destroyAllWindows()

    def getPlotMask(self):
        regionObject = self.getRegionObject()
        plotMask = regionObject.getPlotMask()

        if plotMask is None:
            regionObject.setPlotMask(Mask.generate(self.__imageSize,regionObject.getCorners()))
        return regionObject.getPlotMask()

    def getRegionMask(self):
        regionObject = self.getRegionObject()
        regionMask = regionObject.getRegionMask()
        if regionMask is None:
            regionManager = RegionManager()
            regionObject.setRegionMask(regionManager.getRegionMask(self.__imageSize,regionObject.getCorners()))
        return regionObject.getRegionMask()

    # SETTERS

    def setPath(self, path):
        assert self.__path is None
        self.__path = path

    def setTimestamp(self, timestamp):
        assert self.__timestamp is None
        if isinstance( timestamp, str ):
            timestamp = stringToDatetime( timestamp )
        self.__timestamp = timestamp

    def setImageId(self, imageId):
        assert self.__imageId is None
        self.__imageId = imageId
        self.__classifierObject.setImageId( imageId )
        self.__histogramObject.setImageId( imageId )
        self.__flowerCountObject.setImageId( imageId )

    def setImageSize(self, imageSize):
        assert self.__imageSize is None
        if isinstance( imageSize, str ):
            imageSize = tuple([int(k) for k in imageSize.split('x')[::-1]])
        self.__imageSize = imageSize

    def setPlot(self, plot):
        assert self.__plot is None
        regionObject = self.getRegionObject()
        if regionObject is not None:
            regionPlot = regionObject.getPlot()
            if regionPlot is not None:
                assert plot == regionPlot
        self.__plot = plot

    def setId(self, id):
        assert self.__id is None
        self.__id = id

    def setRegionObject(self, regionObject):
        self.__classifierObject.setRegionObject( regionObject )
        self.__histogramObject.setRegionObject( regionObject )
        self.__flowerCountObject.setRegionObject( regionObject )


    # GETTERS

    def getPath(self):
        return self.__path

    def getTimestamp(self):
        return self.__timestamp

    def getImageId(self):
        return self.__imageId

    def getImageSize(self):
        return self.__imageSize

    def getPlot(self):
        return self.__plot

    def getRegionObject(self):
        return self.__histogramObject.getRegionObject()

    def getPipelineObject(self):
        return self.__flowerCountObject.getPipelineObject()

    def getClassifierObject(self):
        return self.__classifierObject

    def getHistogramObject(self):
        return self.__histogramObject

    def getFlowerCountObject(self):
        return self.__flowerCountObject

    def getId(self):
        return self.__id

    def isCorrupted(self):
        return self.__classifierObject.isCorrupted()


class ImageProcessingModule:

    def processImage( self, imageArray, *argv ):
        assert self._isValidImage(imageArray), self.__class__.__name__
        return self._process( imageArray, *argv )

    def getParamsFromCanolaTimelapseImage(self, canolaTimelapseImage):
        return None

    def _process( self, imageArray, *argv ):
        pass

    def _isValidImage(self, imageArray):
        return isinstance( imageArray, np.ndarray ) and imageArray.dtype == np.uint8


class ImageProcessingPipeline:
    def __init__(self):
        self.__modules = []

    def addModule(self, moduleInstance):
        assert isinstance( moduleInstance, ImageProcessingModule )
        self.__modules.append( moduleInstance )

    def run(self, canolaTimelapseImage):
        res = canolaTimelapseImage.readImage()
        for module in self.__modules:
            res = module.processImage( res,module.getParamsFromCanolaTimelapseImage(canolaTimelapseImage ) )
        return res

    def __str__(self):
        return ','.join([k.__class__.__name__ for k in self.__modules])


class FlowerCountImageProcessor:
    def run( self, canolaTimelapseImages ):

        result = (self.runSingleImage(image) for image in canolaTimelapseImages)
        result = dict( [k for k in result if k is not None] )
        for img in canolaTimelapseImages:
            path = img.getPath()
            if path in result.keys():
                img.getFlowerCountObject().setBlobs(result[img.getPath()])

    def runSingleImage(self, canolaTimelapseImage):
        blobs = canolaTimelapseImage.getFlowerCountObject().getBlobs()
        if blobs is not None:
            return (None, None)

        flowerCountPipeline = ImageProcessingPipeline()
        flowerCountPipeline.addModule(CIELabColorspaceModule())
        flowerCountPipeline.addModule(SigmoidMapping())
        flowerCountPipeline.addModule(BlobDetection())
        
        blobs = flowerCountPipeline.run( canolaTimelapseImage )

        return (canolaTimelapseImage.getPath(),blobs)


class Transformation3D(ImageProcessingModule):
    def _isValidImage(self, imageArray):
        return ImageProcessingModule._isValidImage(self, imageArray) and np.ndim( imageArray ) == 3


class Transformation2D(ImageProcessingModule):
    def _isValidImage(self, imageArray):
        return ImageProcessingModule._isValidImage(self, imageArray) and np.ndim( imageArray ) == 2


class CIELabColorspaceModule(Transformation3D):
    def _process( self, imageArray, *argv ):
        """
        Shift image from BGR to CIELab colorspace and return channel B
        """
        colorspace = cv2.cvtColor( imageArray, cv2.COLOR_BGR2Lab )
        return colorspace[:,:,2]


class OtsuThreshold(Transformation2D):
    def _process( self, imageArray, *argv ):
        """
        Apply Otsu threshold. Output image has zero on pixels with value
        lower than threshold, and their original value otherwise
        """
        blur = cv2.GaussianBlur( imageArray, (5,5), 0 )
        return cv2.threshold( blur, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU )


class SigmoidMapping(Transformation2D):
    def getParamsFromCanolaTimelapseImage(self, canolaTimelapseImage):
        return canolaTimelapseImage.getHistogramObject().getHistogramBShift()

    def _process( self, imageArray, *argv ):
        """
        Map every pixel's intensity to an output value given by a sigmoid function.
        f(x) = K / ( 1 + e^(t-d*x) )
        """
        yellowThreshMapValue = 0.99
        yellowThreshFromZero = 6
        yellowThreshold = 155

        hist_b_shift = argv[0]
        imageArrayFloat = np.array( imageArray, dtype=np.float32 )
        t_exp = (yellowThreshold + hist_b_shift - yellowThreshFromZero) % 256
        k_exp = np.log(1 / yellowThreshMapValue - 1) / yellowThreshFromZero
        imgAsFloat = 1 / (1 + np.exp(k_exp * (imageArrayFloat - t_exp)))
        return ( imgAsFloat * 255 ).astype( np.uint8 )


class IntensityMapping(Transformation2D):
    def getParamsFromCanolaTimelapseImage(self, canolaTimelapseImage):
        return canolaTimelapseImage.getHistogramObject().getHistogramBShift()

    def _process( self, imageArray, *argv ):
        """
        Perform mapping on grayscale channel to highlight flowers
        """
        hist_b_shift = argv[0]
        imageArrayFloat = np.array( imageArray, dtype=np.float )
        threshold = ( self._params["flowerIntensityThreshold"] + hist_b_shift ) % 256
        imageArrayFloat = np.clip( imageArrayFloat, 0, threshold + 10 )
        imageArrayFloat -= threshold - 8
        imageArrayFloat /= threshold + 10
        imageArrayFloat = np.clip( imageArrayFloat, 0, 1 )
        return np.array( imageArrayFloat, dtype=np.uint8 )


class BlobDetection(Transformation2D):
    def getParamsFromCanolaTimelapseImage(self, canolaTimelapseImage):
        return [canolaTimelapseImage.getPlotMask(),canolaTimelapseImage.getClassifierObject().getNumberOfYellowPixels()]

    def _process( self, imageArray, *argv ):
        """
        Count blobs
        :param *argv: Mask
        :return: List of tuples with (x, y, size) information of every blob detected
        """
        mask = argv[0][0]
        yellowPixels = argv[0][1]
        if yellowPixels == 0:
            return []
        mask = mask > 0
        maskedImage = np.copy( imageArray )
        maskedImage[ mask == 0 ] = 0
        blobs = blob_doh(maskedImage,max_sigma=8,min_sigma=3 )
        return [[int(x), int(y), float(z)] for x, y, z in blobs]


class ImageClassifier:
    def classify( self, canolaTimelapseImages ):
        kmeans = KMeans()
        histogramManager = HistogramManager()
        corruptedImagesDetector = CorruptedImagesDetector()
        histogramManager.runImages( canolaTimelapseImages )        
        corruptedImagesDetector.flagCorruptedImages(canolaTimelapseImages)
        kmeans.clusterImages( canolaTimelapseImages )


class KMeans:

    def clusterImages( self, canolaTimelapseImages ):
        nonCorruptedImages = [img for img in canolaTimelapseImages if not img.isCorrupted()]
        if self.__isClusteringAlreadyDone( canolaTimelapseImages ):
            return
        fileToFlowerPixelsDict = self.__calculateNumberOfFlowerPixels(nonCorruptedImages)
        numClusters = 4
        kmeans_fwperc = self.__kmeans( fileToFlowerPixelsDict, nClusters=numClusters)
        kmeans_copy = dict( (k, v) for k, v in fileToFlowerPixelsDict.items( ) if kmeans_fwperc[k] == 0 )
        kmeans_fwperc_2 = self.__kmeans( kmeans_copy, nClusters=2 )
        
        # For every image originally on cluster 0, reassign to cluster 1 or keep on cluster 0 depending on the second
        # K-means result
        for img in canolaTimelapseImages:
            classifierObject = img.getClassifierObject()
            if kmeans_fwperc.__contains__( img ):
                cluster = kmeans_fwperc[ img ]
                if cluster == 0:
                    classifierObject.setCluster(kmeans_fwperc_2[img])
                else:
                    classifierObject.setCluster(cluster)
            else:
                classifierObject.setCluster(4)

    def __isClusteringAlreadyDone( self, canolaTimelapseImages ):
        clusters = [k.getClassifierObject().getCluster() for k in canolaTimelapseImages]
        return not np.any( np.equal( clusters, None ) )

    def __kmeans( self, km_points_orig, nClusters ):
        assert isinstance(km_points_orig, dict)
        assert isinstance(nClusters, int) and nClusters > 1

        km = skKMeans(n_clusters=nClusters)

        # Get the ordered set of points (i.e. flower pixel percentages of each image)
        km_points = np.array([k[1] for k in sorted(km_points_orig.items(), key=operator.itemgetter(1))]).reshape((-1, 1))
                
        # Compute KMeans
        km.fit(km_points)

        # Get the centroids ordered
        km_centroids = list(km.cluster_centers_)        
        km_centroids.sort()
        
        # Assign each image to a cluster
        final_img_clusters = {}
        for k, v in km_points_orig.items():
            # Compute distance to each of the centroids
            dist = np.array([abs(v - q) for q in km_centroids])

            # Get the closest centroid
            final_img_clusters[k] = int(dist.argmin())

        return final_img_clusters

    def __calculateNumberOfFlowerPixels(self, canolaTimelapseImages):
        fileToFlowerPixelsDict = {}
        for img in canolaTimelapseImages:
            histObject = img.getHistogramObject()
            hist_b = histObject.getHistogramB()
            hist_b_shift = histObject.getHistogramBShift()
            threshold = ( 155 + hist_b_shift ) % 256
            n_flower_pixels = np.sum(hist_b[threshold:])
            img.getClassifierObject().setNumberOfYellowPixels( n_flower_pixels )
            fileToFlowerPixelsDict[ img ] = n_flower_pixels
        return fileToFlowerPixelsDict


class HistogramManager:
    def runImages(self, canolaTimelapseImages):
        if not self.__areHistogramsAlreadyComputed( canolaTimelapseImages ):
            self.computeHistograms( canolaTimelapseImages )
            self.computeHistogramShifts([ img.getHistogramObject() for img in canolaTimelapseImages ] )

    def computeHistograms(self, canolaTimelapseImages):
        """
        Compute the average A and B histograms over all images
        """
        assert len(canolaTimelapseImages) > 0
        histResult = dict(self.computeHistogramsOnSingleImage(image) for image in canolaTimelapseImages)

        for img in canolaTimelapseImages:
            histObject = img.getHistogramObject()
            path = img.getPath()
            histObject.setHistogramB( histResult[path]['histogramB'] )
            histObject.setHistogramG( histResult[path]['histogramG'] )

    def computeHistogramsOnSingleImage( self, canolaTimelapseImage ):
        plotMask = canolaTimelapseImage.getPlotMask()
        im_bgr = canolaTimelapseImage.readImage()
        im_gray = cv2.cvtColor(im_bgr, cv2.COLOR_BGR2GRAY)
        im_lab_plot = cv2.cvtColor(im_bgr, cv2.COLOR_BGR2Lab)
        im_gray = im_gray[plotMask > 0]
        im_lab_plot = im_lab_plot[plotMask > 0]
        hist_G, _ = np.histogram(im_gray, 256, [0, 256])
        hist_b, _ = np.histogram(im_lab_plot[:, 2], 256, [0, 256])
        
        return (canolaTimelapseImage.getPath(),{'histogramB' : hist_b,'histogramG' : hist_G})

    def computeHistogramShifts(self, histogramObjects):
        refHistObject = histogramObjects[0]
        refHist = refHistObject.getHistogramB()
        allHistsDict = {refHistObject : 0}
            
        for histObject in histogramObjects[1:]:
            correlation_b = np.correlate(histObject.getHistogramB(), refHist, "full")
            x_shift_b = correlation_b.argmax().astype( np.int8 )
            allHistsDict[histObject] = x_shift_b

        allHistograms = [k.getHistogramB() for k in histogramObjects]
        correlation_reference = np.correlate(refHist,np.average( allHistograms, axis=0),"full")        
        additionalShift = correlation_reference.argmax().astype(np.uint8)
        
        for histObject in histogramObjects:
            histObject.setHistogramBShift(allHistsDict[histObject] + additionalShift )

    def __areHistogramsAlreadyComputed( self, canolaTimelapseImages ):
        histShifts = [k.getHistogramObject().getHistogramBShift() for k in canolaTimelapseImages]
        return not np.any( np.equal( histShifts, None ) )


class CorruptedImagesDetector:

    def flagCorruptedImages(self, canolaTimelapseImages):
        assert len( canolaTimelapseImages ) > 0
        for img in canolaTimelapseImages:
            hist_g = img.getHistogramObject().getHistogramG()
            if max( hist_g ) / np.sum( hist_g ) >= 0.2:
                img.getClassifierObject().flagCorrupted()


class Mask:
    @staticmethod
    def generate( imageShape, boundCorners ):
        if len( imageShape ) == 3:
            modShape = imageShape[:-1]
        else:
            modShape = imageShape

        def __crossProduct( p1, p2, p3 ):
            v1 = [p2[0] - p1[0], p2[1] - p1[1]]
            v2 = [p3[0] - p2[0], p3[1] - p2[1]]
            return v1[0] * v2[1] - v1[1] * v2[0]

        mask = np.zeros( modShape )
        minX = max( [min( [x[0] for x in boundCorners] ), 0] )
        minY = max( [min( [y[1] for y in boundCorners] ), 0] )
        maxX = min( max( [x[0] for x in boundCorners] ), modShape[1] )
        maxY = min( max( [y[1] for y in boundCorners] ), modShape[0] )

        # Iterate through the containing-square and eliminate points
        # that are out of the ROI
        for x in range( minX, maxX ):
            for y in range( minY, maxY ):
                h1 = __crossProduct( boundCorners[2], boundCorners[0], (x, y) )
                h2 = __crossProduct( boundCorners[3], boundCorners[1], (x, y) )
                v1 = __crossProduct( boundCorners[0], boundCorners[1], (x, y) )
                v2 = __crossProduct( boundCorners[2], boundCorners[3], (x, y) )
                if h1 > 0 > h2 and v1 > 0 > v2:
                    mask[y, x] = 255
        return mask

    
def findImagesLocally(imagesBasePath):

    imgs = []

    regionObject = CanolaPlotRegionObject()
    regionObject.setPlot('1237')
    regionObject.setCorners([(10, 10), (1270, 10), (10, 710), (1270, 710)])

    imagesFullPath = glob.glob(imagesBasePath+'*.jpg')
    
    for imgPath in imagesFullPath:
        img = CanolaTimelapseImage()
        img.setPath(imgPath)
        img.setRegionObject(regionObject)
        img.setImageSize((720,1280))

        imgs.append(img)
    
    return imgs


if __name__ == "__main__":

    application_start_time = time()
        
    imagesBasePath = '/Users/habib/Desktop/still_images_small/'

    canolaTimelapseImages = findImagesLocally(imagesBasePath)
    
    imageClassifier = ImageClassifier()
    flowerCountImageProcessor = FlowerCountImageProcessor()

    imageClassifier.classify(canolaTimelapseImages)
    imagesOnClusterZero = [img for img in canolaTimelapseImages if img.getClassifierObject().getCluster() == 0]
    sortedByYellowPixels = sorted(imagesOnClusterZero,key=lambda x: x.getClassifierObject().getNumberOfYellowPixels())
    flowerCountImageProcessor.run(sortedByYellowPixels)

    application_end_time = time() - application_start_time

    print("---------------------------------------------------")
    print("SUCCESS: Images procesed in {} seconds".format(round(application_end_time, 3)))
    print("---------------------------------------------------")
    

---------------------------------------------------
SUCCESS: Images procesed in 31.924 seconds
---------------------------------------------------


In [None]:
#sortedByYellowPixels[0].showDetectedBlobs()
#len(sortedByYellowPixels[0].getFlowerCountObject().getBlobs())

In [None]:
# Original Author: Javier Garcia
# Modified: Habib Sabiu
# Date: August 24, 2017
#
# Description: A Spark application to detect and count canola flowers from still camera images.
#              The script reads input images from a HDFS directory. The output is a text file
#              containing the image file name and estimate of flower count. This file is saved to
#              a specified HDFS directory
#
# Usage: spark-submit --master [spark master] --py-files [classes file] [file name] [input path] [output_path] [job name]
#        [spark master] = Can be Spark's Standalone, Mesos, or YARN
#        To run on:-
#                 Standalone: spark://discus-p2irc-master:7077
#                 Mesos: mesos://discus-p2irc-master:5050
#                 YARN: yarn
#        [classes file] = Full path to classes definitions (../canola_timelapse_image.py)
#        [file name]    = Full path to the python script (../imageFlowerCounterNew.py)
#        [input_path]   = Full HDFS path to input images
#        [output_path]  = Full HDFS path to save results. Please note that all contents of this
#                        folder will be deleted if it already exist
#        [job_name]     = A nice name for the job. This will be displayed on the web UI
#
# Example usage: spark-submit --master spark://discus-p2irc-master:7077 --py-files canola_timelapse_image.py newImageFlowerCounter.py \
#                hdfs://discus-p2irc-master:54310/user/hduser/habib/still_camera_images/ \
#                hdfs://discus-p2irc-master:54310/user/hduser/habib/flower_counter_output/ imageFlowerCounter 


import os
import cv2
import sys
import glob
import pyspark
import operator
import subprocess
import numpy as np
import skimage.io as io

from enum import Enum
from time import time
from io import StringIO, BytesIO
from PIL import Image, ImageFile
from skimage.feature import blob_doh
from pyspark import SparkConf, SparkContext
from sklearn.cluster import KMeans as skKMeans

from canola_timelapse_image import * 



class CanolaObject:
    def __str__(self):
        strObject = ''
        for k, v in self.__dict__.items():
            strObject += '{} : {}\n'.format(k, v)
        return strObject


class CanolaPlotRegionObject(CanolaObject):
    def __init__(self):
        self.__id = None
        self.__plot = None
        self.__year = None
        self.__corners = None
        self.__allocatedPlotMask = None
        self.__allocatedRegionMask = None

    def setPlot(self, plot):
        assert self.__plot is None
        self.__plot = plot

    def setYear(self, year):
        assert self.__year is None
        self.__year = year

    def setCorners(self, corners):
        assert self.__corners is None
        self.__corners = corners

    def setId(self, id):
        assert self.__id is None
        self.__id = id

    def setPlotMask(self, plotMask):
        assert self.__allocatedPlotMask is None
        self.__allocatedPlotMask = plotMask

    def setRegionMask(self, regionMask):
        assert self.__allocatedRegionMask is None
        self.__allocatedRegionMask = regionMask


    def getPlot(self):
        return self.__plot

    def getYear(self):
        return self.__year

    def getCorners(self):
        return self.__corners

    def getId(self):
        return self.__id

    def getPlotMask(self):
        return self.__allocatedPlotMask

    def getRegionMask(self):
        return self.__allocatedRegionMask


class CanolaImageClassifierObject(CanolaObject):
    def __init__(self):
        self.__id = None
        self.__imageId = None
        self.__cluster = None
        self.__numberOfYellowPixels = None
        self.__corrupted = False
        self.__regionObject = None

    def setCluster(self, cluster):
        self.__cluster = cluster

    def setRegionObject(self, regionObject):
        self.__regionObject = regionObject

    def setNumberOfYellowPixels(self, numberOfYellowPixels):
        self.__numberOfYellowPixels = numberOfYellowPixels

    def setId(self, id):
        self.__id = id

    def setImageId(self, imageId):
        self.__imageId = imageId

    def flagCorrupted(self):
        self.__corrupted = True


    def getCluster(self):
        return self.__cluster

    def getRegionObject(self):
        return self.__regionObject

    def getNumberOfYellowPixels(self):
        return self.__numberOfYellowPixels

    def getId(self):
        return self.__id

    def getImageId(self):
        return self.__imageId

    def isCorrupted(self):
        return self.__corrupted


class HistogramObject(CanolaObject):
    def __init__(self):
        self.__id = None
        self.__imageId = None
        self.__histogramB = None
        self.__histogramG = None
        self.__histogramBShift = None
        self.__regionObject = None

    def setId(self, id):
        assert self.__id is None
        self.__id = id

    def setImageId(self, imageId):
        assert self.__imageId is None
        self.__imageId = imageId

    def setHistogramB(self, histogramB):
        assert self.__histogramB is None
        self.__histogramB = histogramB

    def setHistogramG(self, histogramG):
        assert self.__histogramG is None
        self.__histogramG = histogramG

    def setHistogramBShift(self, histogramBShift):
        assert self.__histogramBShift is None
        self.__histogramBShift = histogramBShift

    def setRegionObject(self, regionObject):
        assert self.__regionObject is None
        self.__regionObject = regionObject


    def getId(self):
        return self.__id

    def getImageId(self):
        return self.__imageId

    def getHistogramB(self):
        return self.__histogramB

    def getHistogramG(self):
        return self.__histogramG

    def getHistogramBShift(self):
        return self.__histogramBShift

    def getRegionObject(self):
        return self.__regionObject


class PipelineObject(CanolaObject):
    def __init__(self, pipeline=None):
        self.__id = None
        self.__pipeline = pipeline

    def setPipeline(self, pipeline):
        assert self.__pipeline is None
        self.__pipeline = pipeline

    def setId(self, id):
        assert self.__id is None
        self.__id = id


    def getPipeline(self):
        return self.__pipeline

    def getId(self):
        return self.__id


class FlowerCountObject(CanolaObject):
    def __init__(self):
        self.__id = None
        self.__imageId = None
        self.__blobs = None
        self.__regionObject = None
        self.__pipelineObject = PipelineObject(CanolaObject)

    def setBlobs(self, blobs):
        assert self.__blobs is None
        self.__blobs = blobs

    def setRegionObject(self, regionObject):
        assert self.__regionObject is None
        self.__regionObject = regionObject

    def setId(self, id):
        assert self.__id is None
        self.__id = id

    def setImageId(self, imageId):
        assert self.__imageId is None
        self.__imageId = imageId


    def getPipelineObject(self):
        return self.__pipelineObject

    def getBlobs(self):
        return self.__blobs

    def getRegionObject(self):
        return self.__regionObject

    def getId(self):
        return self.__id

    def getImageId(self):
        return self.__imageId


class CanolaTimelapseImage(CanolaObject):
    def __init__(self):
        self.__id = None
        self.__imageId = None
        self.__path = None
        self.__timestamp = None
        self.__imageSize = None
        self.__plot = None
        self.__allocatedImageArray = None
        self.__corrupted = False
        self.__classifierObject = CanolaImageClassifierObject()
        self.__histogramObject = HistogramObject()
        self.__flowerCountObject = FlowerCountObject()

    def readImage(self):
        """
        if self.__allocatedImageArray is None:
            ImageFile.LOAD_TRUNCATED_IMAGES = True
            pil_image = Image.open(self.__path).convert('RGB')
            open_cv_image = np.array(pil_image)
            self.__allocatedImageArray = open_cv_image[:, :, ::-1].copy()
        """
        return self.__allocatedImageArray

    def showDetectedBlobs(self):
        blobs = self.__flowerCountObject.getBlobs()
        assert blobs is not None
        img = np.copy( self.readImage() )
        for x, y, r in blobs:
            cv2.circle(img,center=(int(y), int(x)),radius=int(r),color=(0, 255, 255),thickness=2)
        cv2.imshow("{} {}".format( self.__plot, self.__timestamp ),img )
        cv2.waitKey( 0 )
        cv2.destroyAllWindows()

    def getPlotMask(self):
        regionObject = self.getRegionObject()
        plotMask = regionObject.getPlotMask()

        if plotMask is None:
            regionObject.setPlotMask(Mask.generate(self.__imageSize,regionObject.getCorners()))
        return regionObject.getPlotMask()

    def getRegionMask(self):
        regionObject = self.getRegionObject()
        regionMask = regionObject.getRegionMask()
        if regionMask is None:
            regionManager = RegionManager()
            regionObject.setRegionMask(regionManager.getRegionMask(self.__imageSize,regionObject.getCorners()))
        return regionObject.getRegionMask()

    # SETTERS

    # I added this method for setting image data using Spark
    def setImageArray(self, arrayData):
        assert self.__allocatedImageArray is None
        self.__allocatedImageArray = arrayData       

    def setPath(self, path):
        assert self.__path is None
        self.__path = path

    def setTimestamp(self, timestamp):
        assert self.__timestamp is None
        if isinstance( timestamp, str ):
            timestamp = stringToDatetime( timestamp )
        self.__timestamp = timestamp

    def setImageId(self, imageId):
        assert self.__imageId is None
        self.__imageId = imageId
        self.__classifierObject.setImageId( imageId )
        self.__histogramObject.setImageId( imageId )
        self.__flowerCountObject.setImageId( imageId )

    def setImageSize(self, imageSize):
        assert self.__imageSize is None
        if isinstance( imageSize, str ):
            imageSize = tuple([int(k) for k in imageSize.split('x')[::-1]])
        self.__imageSize = imageSize

    def setPlot(self, plot):
        assert self.__plot is None
        regionObject = self.getRegionObject()
        if regionObject is not None:
            regionPlot = regionObject.getPlot()
            if regionPlot is not None:
                assert plot == regionPlot
        self.__plot = plot

    def setId(self, id):
        assert self.__id is None
        self.__id = id

    def setRegionObject(self, regionObject):
        self.__classifierObject.setRegionObject( regionObject )
        self.__histogramObject.setRegionObject( regionObject )
        self.__flowerCountObject.setRegionObject( regionObject )


    # GETTERS

    def getPath(self):
        return self.__path

    def getTimestamp(self):
        return self.__timestamp

    def getImageId(self):
        return self.__imageId

    def getImageSize(self):
        return self.__imageSize

    def getPlot(self):
        return self.__plot

    def getRegionObject(self):
        return self.__histogramObject.getRegionObject()

    def getPipelineObject(self):
        return self.__flowerCountObject.getPipelineObject()

    def getClassifierObject(self):
        return self.__classifierObject

    def getHistogramObject(self):
        return self.__histogramObject

    def getFlowerCountObject(self):
        return self.__flowerCountObject

    def getId(self):
        return self.__id

    def isCorrupted(self):
        return self.__classifierObject.isCorrupted()


class ImageProcessingModule:

    def processImage( self, imageArray, *argv ):
        assert self._isValidImage(imageArray), self.__class__.__name__
        return self._process( imageArray, *argv )

    def getParamsFromCanolaTimelapseImage(self, canolaTimelapseImage):
        return None

    def _process( self, imageArray, *argv ):
        pass

    def _isValidImage(self, imageArray):
        return isinstance( imageArray, np.ndarray ) and imageArray.dtype == np.uint8


class ImageProcessingPipeline:
    def __init__(self):
        self.__modules = []

    def addModule(self, moduleInstance):
        assert isinstance( moduleInstance, ImageProcessingModule )
        self.__modules.append( moduleInstance )

    def run(self, canolaTimelapseImage):
        res = canolaTimelapseImage.readImage()
        for module in self.__modules:
            res = module.processImage( res,module.getParamsFromCanolaTimelapseImage(canolaTimelapseImage ) )
        return res

    def __str__(self):
        return ','.join([k.__class__.__name__ for k in self.__modules])


class Transformation3D(ImageProcessingModule):
    def _isValidImage(self, imageArray):
        return ImageProcessingModule._isValidImage(self, imageArray) and np.ndim( imageArray ) == 3


class Transformation2D(ImageProcessingModule):
    def _isValidImage(self, imageArray):
        return ImageProcessingModule._isValidImage(self, imageArray) and np.ndim( imageArray ) == 2


class CIELabColorspaceModule(Transformation3D):
    def _process( self, imageArray, *argv ):
        """
        Shift image from BGR to CIELab colorspace and return channel B
        """
        colorspace = cv2.cvtColor( imageArray, cv2.COLOR_BGR2Lab )
        return colorspace[:,:,2]


class OtsuThreshold(Transformation2D):
    def _process( self, imageArray, *argv ):
        """
        Apply Otsu threshold. Output image has zero on pixels with value
        lower than threshold, and their original value otherwise
        """
        blur = cv2.GaussianBlur( imageArray, (5,5), 0 )
        return cv2.threshold( blur, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU )


class SigmoidMapping(Transformation2D):
    def getParamsFromCanolaTimelapseImage(self, canolaTimelapseImage):
        return canolaTimelapseImage.getHistogramObject().getHistogramBShift()

    def _process( self, imageArray, *argv ):
        """
        Map every pixel's intensity to an output value given by a sigmoid function.
        f(x) = K / ( 1 + e^(t-d*x) )
        """
        yellowThreshMapValue = 0.99
        yellowThreshFromZero = 6
        yellowThreshold = 155

        hist_b_shift = argv[0]
        imageArrayFloat = np.array( imageArray, dtype=np.float32 )
        t_exp = (yellowThreshold + hist_b_shift - yellowThreshFromZero) % 256
        k_exp = np.log(1 / yellowThreshMapValue - 1) / yellowThreshFromZero
        imgAsFloat = 1 / (1 + np.exp(k_exp * (imageArrayFloat - t_exp)))
        return ( imgAsFloat * 255 ).astype( np.uint8 )


class IntensityMapping(Transformation2D):
    def getParamsFromCanolaTimelapseImage(self, canolaTimelapseImage):
        return canolaTimelapseImage.getHistogramObject().getHistogramBShift()

    def _process( self, imageArray, *argv ):
        """
        Perform mapping on grayscale channel to highlight flowers
        """
        hist_b_shift = argv[0]
        imageArrayFloat = np.array( imageArray, dtype=np.float )
        threshold = ( self._params["flowerIntensityThreshold"] + hist_b_shift ) % 256
        imageArrayFloat = np.clip( imageArrayFloat, 0, threshold + 10 )
        imageArrayFloat -= threshold - 8
        imageArrayFloat /= threshold + 10
        imageArrayFloat = np.clip( imageArrayFloat, 0, 1 )
        return np.array( imageArrayFloat, dtype=np.uint8 )


class BlobDetection(Transformation2D):
    def getParamsFromCanolaTimelapseImage(self, canolaTimelapseImage):
        return [canolaTimelapseImage.getPlotMask(),canolaTimelapseImage.getClassifierObject().getNumberOfYellowPixels()]

    def _process( self, imageArray, *argv ):
        """
        Count blobs
        :param *argv: Mask
        :return: List of tuples with (x, y, size) information of every blob detected
        """
        mask = argv[0][0]
        yellowPixels = argv[0][1]
        if yellowPixels == 0:
            return []
        mask = mask > 0
        maskedImage = np.copy( imageArray )
        maskedImage[ mask == 0 ] = 0
        blobs = blob_doh(maskedImage,max_sigma=8,min_sigma=3 )
        return [[int(x), int(y), float(z)] for x, y, z in blobs]


class Mask:
    @staticmethod
    def generate( imageShape, boundCorners ):
        if len( imageShape ) == 3:
            modShape = imageShape[:-1]
        else:
            modShape = imageShape

        def __crossProduct( p1, p2, p3 ):
            v1 = [p2[0] - p1[0], p2[1] - p1[1]]
            v2 = [p3[0] - p2[0], p3[1] - p2[1]]
            return v1[0] * v2[1] - v1[1] * v2[0]

        mask = np.zeros( modShape )
        minX = max( [min( [x[0] for x in boundCorners] ), 0] )
        minY = max( [min( [y[1] for y in boundCorners] ), 0] )
        maxX = min( max( [x[0] for x in boundCorners] ), modShape[1] )
        maxY = min( max( [y[1] for y in boundCorners] ), modShape[0] )

        # Iterate through the containing-square and eliminate points
        # that are out of the ROI
        for x in range( minX, maxX ):
            for y in range( minY, maxY ):
                h1 = __crossProduct( boundCorners[2], boundCorners[0], (x, y) )
                h2 = __crossProduct( boundCorners[3], boundCorners[1], (x, y) )
                v1 = __crossProduct( boundCorners[0], boundCorners[1], (x, y) )
                v2 = __crossProduct( boundCorners[2], boundCorners[3], (x, y) )
                if h1 > 0 > h2 and v1 > 0 > v2:
                    mask[y, x] = 255
        return mask


    
def computeHistogramsOnSingleImage(canolaTimelapseImage):
    plotMask = canolaTimelapseImage.getPlotMask()
    im_bgr = canolaTimelapseImage.readImage()
    im_gray = cv2.cvtColor(im_bgr, cv2.COLOR_BGR2GRAY)
    im_lab_plot = cv2.cvtColor(im_bgr, cv2.COLOR_BGR2Lab)
    im_gray = im_gray[plotMask > 0]
    im_lab_plot = im_lab_plot[plotMask > 0]
    hist_G, _ = np.histogram(im_gray, 256, [0, 256])
    hist_b, _ = np.histogram(im_lab_plot[:, 2], 256, [0, 256])

    histObject = canolaTimelapseImage.getHistogramObject()
    histObject.setHistogramB(hist_b)
    histObject.setHistogramG(hist_G)

    return canolaTimelapseImage


def computeHistogramShifts(canolaTimelapseImages):

    refHistObject = canolaTimelapseImages[0].getHistogramObject()
    refHist = refHistObject.getHistogramB()

    first_canolaTimelapseImage_rdd = sc.parallelize([canolaTimelapseImages[0]]) \
        .map(lambda x: {x: 0})

    rest_canolaTimelapseImages_rdd = sc.parallelize(canolaTimelapseImages[1:]) \
        .map(lambda x: {x: (np.correlate(x.getHistogramObject().getHistogramB(), refHist, "full")).argmax().astype(np.int8)}) 
                                               
    all_canolaTimelapseImage_rdd = first_canolaTimelapseImage_rdd.union(rest_canolaTimelapseImages_rdd)

    average_histogram = sc.parallelize(canolaTimelapseImages) \
        .map(lambda x: x.getHistogramObject().getHistogramB()) \
        .reduce(lambda first_element, second_element: np.average([first_element + second_element], axis=0))

    correlation_reference = np.correlate(refHist,average_histogram,"full")
    additionalShift = correlation_reference.argmax().astype(np.uint8)
    
    histogramBShift_rdd = all_canolaTimelapseImage_rdd.map(lambda x: udpateHistogramBShift(x, additionalShift))

    return histogramBShift_rdd


def udpateHistogramBShift(canolaTimelapseImageDic, additionalShift):
    key = next(iter(canolaTimelapseImageDic))
    value = canolaTimelapseImageDic[key]
    key.getHistogramObject().setHistogramBShift(value + additionalShift)
    return key


def CorruptedImagesDetector(canolaTimelapseImage):
        
    hist_g = canolaTimelapseImage.getHistogramObject().getHistogramG()
    if max( hist_g ) / np.sum( hist_g ) >= 0.2:
        canolaTimelapseImage.getClassifierObject().flagCorrupted()

    return canolaTimelapseImage


def calculateNumberOfFlowerPixels(canolaTimelapseImage):
    
    fileToFlowerPixelsDict = {}

    histObject = canolaTimelapseImage.getHistogramObject()
    hist_b = histObject.getHistogramB()
    hist_b_shift = histObject.getHistogramBShift()
    threshold = (155 + hist_b_shift) % 256
    n_flower_pixels = np.sum(hist_b[threshold:])
    canolaTimelapseImage.getClassifierObject().setNumberOfYellowPixels(n_flower_pixels)
        
    fileToFlowerPixelsDict[canolaTimelapseImage] = n_flower_pixels
    
    return fileToFlowerPixelsDict


def fitKmeans(points, num_clusters):

    km = skKMeans(n_clusters=num_clusters)

    # Compute KMeans
    km.fit(points)

    # Get the centroids ordered
    km_centroids = list(km.cluster_centers_)
    km_centroids.sort()

    return km_centroids


def assignImagesToClusters(canolaTimelapseImage, km_centroids):

    final_img_clusters = {}

    key = next(iter(canolaTimelapseImage))
    value = canolaTimelapseImage[key]

    # Compute distance to each of the centroids
    dist = np.array([abs(value - q) for q in km_centroids])

    # Get the closest centroid
    centroid = int(dist.argmin())

    key.getClassifierObject().setCluster(centroid)

    final_img_clusters[key] = value

    return final_img_clusters


def assignBlobs(canolaTimelapseImage):

    blobs = flowerCountPipeline.run(canolaTimelapseImage)

    canolaTimelapseImage.getFlowerCountObject().setBlobs(blobs)

    return canolaTimelapseImage


def read_images(image_rawdata):
    
    ImageFile.LOAD_TRUNCATED_IMAGES = True

    regionObject = CanolaPlotRegionObject()
    regionObject.setPlot('1237')
    regionObject.setCorners([(10, 10), (1270, 10), (10, 710), (1270, 710)])
    
    imgPath = image_rawdata[0]
    imgData = np.array(io.imread(BytesIO(image_rawdata[1]))) 
    
    img = CanolaTimelapseImage()
    img.setPath(imgPath)
    img.setRegionObject(regionObject)
    img.setImageSize((720,1280))
    img.setImageArray(imgData[:, :, ::-1].copy())
    
    return img


if __name__ == "__main__":

    application_start_time = time()


    #input_path = "file:///Users/habib/Desktop/15072016_1108_images12/"
    input_path = "file:///Users/habib/Desktop/still_images_small"
    output_path = "file:///Users/habib/Desktop/outputs/"
    job_name = "imageFlowerCounter"

    import shutil
    if os.path.exists(output_path[7:]):
        shutil.rmtree(output_path[7:])

    
    #input_path = sys.argv[1]
    #output_path = sys.argv[2]
    #job_name = sys.argv[3]

    # Remove the output directory and all it contents if it already exist
    #subprocess.call(["hadoop", "fs", "-rm", "-r", output_path])

    flowerCountPipeline = ImageProcessingPipeline()
    flowerCountPipeline.addModule(CIELabColorspaceModule())
    flowerCountPipeline.addModule(SigmoidMapping())
    flowerCountPipeline.addModule(BlobDetection())

    sc = SparkContext(appName=job_name)
    
    raw_data_rdd = sc.binaryFiles(input_path)
    
    histResult_rdd = raw_data_rdd.map(read_images) \
        .map(lambda x: computeHistogramsOnSingleImage(x)) 

    refHist = histResult_rdd.first().getHistogramObject().getHistogramB()
    
    """
    # Averaging using reduce(): This is slightly slower and could result in out of memory error on large data
    average_histogram = histResult_rdd.map(lambda x: x.getHistogramObject().getHistogramB()) \
        .reduce(lambda x, y: np.average([x + y], axis=0))
    """

    # Averaging using aggregate()
    seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
    combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]))
    aggregate = histResult_rdd.map(lambda x: x.getHistogramObject().getHistogramB()) \
        .aggregate( (0, 0), seqOp, combOp)
    average_histogram = aggregate[0]/aggregate[1]

    correlation_reference = np.correlate(refHist,average_histogram,"full")
    additionalShift = correlation_reference.argmax().astype(np.uint8)
    
    histogramBShift_rdd = histResult_rdd.zipWithIndex() \
        .map(lambda x: {x[0]: 0} if x[1] == 0 else {x[0]: (np.correlate(x[0].getHistogramObject().getHistogramB(), refHist, "full")).argmax().astype(np.int8)}) \
        .map(lambda x: udpateHistogramBShift(x, additionalShift)) \
        .map(lambda x: CorruptedImagesDetector(x)) \
        .filter(lambda x: x.isCorrupted() == False) \
        .map(lambda x: calculateNumberOfFlowerPixels(x)) \
        .sortBy(lambda x: x[next(iter(x))]) 

    clustering_rdd_1 = histogramBShift_rdd.map(lambda x: [x[next(iter(x))]]) 
    cluster_centers_1 = fitKmeans(clustering_rdd_1, 4)

    canolaImagesWithClusters_rdd = histogramBShift_rdd.map(lambda x: assignImagesToClusters(x, cluster_centers_1)) \
        .filter(lambda x: next(iter(x)).getClassifierObject().getCluster() == 0) 

    clustering_rdd_2 = canolaImagesWithClusters_rdd.map(lambda x: [x[next(iter(x))]]) 
    cluster_centers_2 = fitKmeans(clustering_rdd_2, 2)

    processedCanolaImages_rdd = canolaImagesWithClusters_rdd.map(lambda x: assignImagesToClusters(x, cluster_centers_2)) \
        .map(lambda x: next(iter(x))) \
        .filter(lambda x: x.getClassifierObject().getCluster() == 0) \
        .sortBy(lambda x: x.getClassifierObject().getNumberOfYellowPixels()) \
        .map(lambda x: assignBlobs(x)) \
        .map(lambda x: (x.getPath(),len(x.getFlowerCountObject().getBlobs()))) \
        .coalesce(1, shuffle=True) \
        .saveAsTextFile(output_path)


    application_end_time = time() - application_start_time

    sc.stop()
    
    print("---------------------------------------------------")
    print("SUCCESS: Images procesed in {} seconds".format(round(application_end_time, 3)))
    print("---------------------------------------------------")
    


In [65]:
import numpy as np
from pyspark import SparkContext


sc = SparkContext()

zeros = np.zeros(256)
random_data = np.random.rand(10, 256)

#-----------------numpy array mean---------------------
np_mean = np.average(random_data, axis=0)
#------------------------------------------------------

listRDD = sc.parallelize(random_data, 10)

#----------------------Aggregate-----------------------
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]))
aggregate = listRDD.aggregate( (0, 0), seqOp, combOp)
agg_mean = aggregate[0]/aggregate[1]
#------------------------------------------------------

#----------------------Reduce--------------------------
reduce_mean = listRDD.reduce(lambda x, y: np.average([x] + [y], axis=0))
#------------------------------------------------------

stacked = np.vstack((np_mean,agg_mean,reduce_mean)).T

print(stacked)
#print(listRDD.count(), np.array(listRDD.first()).shape)

sc.stop()

[[ 0.47206665  0.47206665  0.59296169]
 [ 0.43265745  0.43265745  0.42754646]
 [ 0.59078162  0.59078162  0.59329013]
 [ 0.47054758  0.47054758  0.30010388]
 [ 0.42392359  0.42392359  0.44286934]
 [ 0.39879656  0.39879656  0.36610327]
 [ 0.38529554  0.38529554  0.64768795]
 [ 0.47160984  0.47160984  0.47687432]
 [ 0.55114161  0.55114161  0.57399958]
 [ 0.51893213  0.51893213  0.32941311]
 [ 0.47088674  0.47088674  0.71051141]
 [ 0.5708172   0.5708172   0.76748609]
 [ 0.48012217  0.48012217  0.45938178]
 [ 0.51869393  0.51869393  0.46373232]
 [ 0.38125527  0.38125527  0.41838349]
 [ 0.65380819  0.65380819  0.80326801]
 [ 0.24323815  0.24323815  0.06745904]
 [ 0.4198887   0.4198887   0.55632412]
 [ 0.52512965  0.52512965  0.50441899]
 [ 0.41516079  0.41516079  0.27862258]
 [ 0.54880461  0.54880461  0.61731709]
 [ 0.54194504  0.54194504  0.40750671]
 [ 0.51365962  0.51365962  0.42546481]
 [ 0.52847504  0.52847504  0.42806507]
 [ 0.57267408  0.57267408  0.7646705 ]
 [ 0.42316975  0.42316975

In [2]:
import pickle

xx = processedCanolaImages_rdd.collect()
myFile = open("processed_object.pkl", 'wb')
pickle.dump(xx, myFile)
myFile.close()
    
fr = open('processed_object.pkl', 'rb')
canolaImage = pickle.load(fr)
print(canolaImage[0].showDetectedBlobs())