In [1]:
import picamera
from threading import Thread
from multiprocessing import Process
from queue import Queue, Empty
from time import time, sleep
import numpy as np
import cv2
import imutils
import math

In [2]:
class MyOutput():
    '''
    Class used by PiCamera object to record the frames coming from the camera. 
    
    The PiCamera class assumes there is a write method (and optionally a flush method).
    Since we've got to pass the data to the image processor by the means of a queue and 
    since we are actually recording data (not taking photos), setting markers for an individual
    frame is rather troublesome - the solution is to define a custom output class.
    
    Check https://picamera.readthedocs.io/en/release-1.13/recipes2.html#custom-outputs
    
    '''
    def __init__(self, queue, resolution, state):
        '''
        Initialize MyOutput object.
        
        Init params:
        queue -- The queue onto which raw frames are pushed to.
        resolution -- The resolution of the pushed frames. 2-element tuple of (width, height).
        state -- A dictionary containing important markers about the current frame: time, red/blue/green GB; TL;DR metadata.
        '''
        self._queue = queue
        self._resolution = resolution[::-1] + (3,)
        self._state = state
        self.decouple_queue = False
    
    def write(self, s):
        '''
        Write method required by PiCamera class.
        
        Keyword params:
        s -- A bytes object containing the actual image that got captured.
        '''
        self._buffer = np.frombuffer(s, np.uint8).reshape(*self._resolution)
        self.flush()
    
    def flush(self):
        '''
        Flush method that needs to be called to push the data to the queue. 
        Apart from being called by write method every time, it also gets called at the end of the recording by the PiCamera object.
        '''
        
        if self.decouple_queue is False and self._buffer is not None:
            self._queue.put([
                self._buffer,
                self._state
            ], block = False)
            self._buffer = None
        
    @property
    def state(self):
        '''
        Returns the dictionary containing important markers about the current frame(s).
        '''
        return self._state
    
    @state.setter
    def state(self, state):
        '''
        Set important markers about the current frame(s).
        
        Contains information about the current environment that the camera "sees".
        The state variable is necessary for validating the data later on.
        '''
        self._state = state
        
    @property
    def decouple_queue(self):
        '''
        Returns if the queue is getting filled with frames.
        '''
        return self._disengage
    
    @decouple_queue.setter
    def decouple_queue(self, val):
        '''
        Set whether to stop pushing to the queue or not. Can be set dynamically. 
        
        Keyword params:
        val -- True to put a halt on pushing frames and False for the opposite.
        '''
        self._disengage = val
        

In [3]:
class CameraSource(Thread):
    '''
    Class used to gather BGR frames from the PiCamera on a separate thread while pushing them
    in a queue that gets consumed on the fly by a process which analyses the data.
    
    Only one instance of this class can exist at a moment.
    
    The format of each pushed object into the queue is tailored for the GiggleBotQAValidation class. That is, each object (list)
    contains a (captured) frame and metadata about it stored as a dictionary. This class can be adapted to accept any format for
    these pushed objects.
    '''
    def __init__(self, queue, camera_settings, state = {}, output_resize = (480, 272)):
        '''
        Initialise the CameraSource object.
        
        Init params:
        queue -- The queue into which frames are pushed in one by one. The queue needs to be infinite in size.
        camera_settings -- Settings of the PiCamera that can be accessed as an attribute after having initialized the object.
        state -- A dictionary containing important markers necessary for validating the data.
        output_resize -- The actual resolution of the frames that get pushed into the queue. Must be >= than what's specified in camera_settings param. 2-element tuple of (width, height).
        
        The constructor waits 2 seconds for the camera to initialize 
        after the PiCamera object is created. This is a suggestion found in PiCamera's documentation.
        
        The output_resize param has to be a 2-element tuple containing the width and height of the recorded frames.
        '''
        super(CameraSource, self).__init__(group = None, target = None, name = 'CameraSource')
        
        self._queue = queue
        self._camera_settings = camera_settings
        self._output_resize = output_resize
        self._output = MyOutput(queue, output_resize, state)
        
        self._stop_thread = False
        self._terminated = False
        
        self.camera = picamera.PiCamera()
        self.camera.start_preview()
        for setting in list(camera_settings.keys()):
           setattr(self.camera, setting, camera_settings[setting])
        sleep(2.0)
        
    def run(self):
        '''
        This method must not be called from the user space. This gets called by the start method,
        which in turn, is used to trigger this method.
        
        This method continuously pushes frames into the queue. To stop it, call the stop method.
        '''
        try:
            self.camera.start_recording(self._output,
                                        format = 'bgr',
                                        resize = self._output_resize)
            while self._stop_thread is False:
                sleep(0.001)
            self.camera.close()
        except:
            pass
        finally:
            self._terminated = True
            
    def stop(self, blocking = True):
        '''
        Stops the run method that got called by start method.
        
        Keyword params:
        blocking -- Boolean to specify if it awaits for the run method to stop.
        '''
        self._stop_thread = True
        if blocking is True:
            while self._terminated is False:
                sleep(0.001)
                
    @property
    def state(self):
        return self._output.state
    
    @state.setter
    def state(self, state):
        self._output.state = state
        
    @property
    def pause(self):
        '''
        Checks if recording is paused or not.
        '''
        return self._output.decouple_queue
    
    @pause.setter
    def pause(self, val):
        '''
        Pauses the recording process or resumes it.
        
        Keyword params:
        val -- True to pause it and false to resume it.
        '''
        self._output.decouple_queue = val
    

In [9]:
class GiggleBotQAValidation(Thread):
    
    failed_qa = False
    counter = 0
    
    def __init__(self, process_queue, stop_when_empty = False):
        super(GiggleBotQAValidation, self).__init__(group = None, target = None, name = 'GiggleBotQAValidation')
        
        self._procq = process_queue
        self._stop_when_empty = stop_when_empty
        self._stop_thread = False
        self._terminated = False
        self._boundaries = [
            ('red', [0, 165, 128], [15, 255, 255]),
            ('red', [165, 165, 128], [179, 255, 255]),
            ('green', [35, 165, 128], [75, 255, 255]),
            ('blue', [90, 165, 128], [133, 255, 255])
        ]
        
    def run(self):
        while self._stop_thread is False:
            try:
                frame, metadata = self._procq.get_nowait()
                self._do_qa_on_frame(frame, metadata)
            except Empty:
                if self._stop_when_empty is True:
                    break
            finally:
                sleep(0.001)
        self._terminated = True
    
    def stop(self, blocking = True):
        self._stop_thread = True
        if blocking is True:
            while self._terminated is False:
                sleep(0.001)
                
    def _do_qa_on_frame(self, frame, metadata):
        color_dist, leds = self._do_frame_analysis(frame)
        print(color_dist, leds, self.counter)
    
    def _do_frame_analysis(self, frame):
        (height, width) = frame.shape[0:2]
        
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        blurred = cv2.GaussianBlur(gray, (5,5), 0)
        thresh = cv2.threshold(blurred, 200, 255, cv2.THRESH_BINARY)
        contours = cv2.findContours(thresh[1].copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        contours = contours[0] if imutils.is_cv2() else contours[1]
        
        contours_list = []
        radiuses = []
        centers = []
        for contour in contours:
            perimeter = cv2.arcLength(contour, True)
            approx = cv2.approxPolyDP(contour, 0.01 * perimeter, True, True)
            area = cv2.contourArea(contour)
            
            if (len(approx) > 8) and (len(approx) < 23) and area > 100:
                contours_list.append(contour)
                radiuses.append(perimeter / math.pi / 2)
                M = cv2.moments(contour)
                cX = int(M['m10'] / M['m00'])
                cY = int(M['m01'] / M['m00'])
                centers.append((cX, cY))
        
        scale = 1.7
        intern_radius = np.mean(radiuses)
        extern_radius = intern_radius * scale
        intern_radius = math.ceil(intern_radius)
        extern_radius = math.ceil(extern_radius)
        extern_diam = extern_radius * 2
        
        mask = np.zeros((height, width, 3), np.uint8)
        for center in centers:
            cv2.circle(mask, center, extern_radius, (255, 255, 255), -1)
            cv2.circle(mask, center, intern_radius, (0, 0, 0), -1)
            
        out = np.zeros((height, width, 3), np.uint8)
        cv2.bitwise_and(frame, mask, out)
        writearray(out[:,:,[2,1,0]], "processed/img_{}.png".format(self.counter), 'RGB')
        self.counter += 1
        cv2.cvtColor(out, cv2.COLOR_BGR2HSV, out)
                         
        transpose_list = list(zip(*self._boundaries))
        colors = {}
        for elem in transpose_list[0]:
            colors[elem] = 0
        for (color, lower, upper) in self._boundaries:
            lower = np.array(lower, dtype = np.uint8)
            upper = np.array(upper, dtype = np.uint8)
            
            mask = cv2.inRange(out, lower, upper)
            filtered_channel = cv2.bitwise_and(out, out, mask = mask)
            filtered_channel = filtered_channel.reshape((height * width, 3))
            filtered_channel = filtered_channel[~np.all(filtered_channel == 0, axis = 1)]
            colors[color] += filtered_channel.shape[0]
            
        return colors, len(contours_list)
    
def test(frame, metadata):
    pass

In [10]:
import io
from PIL import Image
import IPython

def imshow(a, fmt='jpeg'):
    '''
    Function to display an image within a Jupyter notebook.
    
    Use 'jpeg' instead of 'png' (~5 times faster)
    
    Keyword params:
    a -- numpy array to print; has width, height and no. of planes.
    fmt -- output format to print; 'jpeg' is the fastest.
    '''
    f = io.BytesIO()
    Image.fromarray(a).save(f, fmt)
    height = a.shape[0]
    width = a.shape[1]
    IPython.display.display(IPython.display.Image(data=f.getvalue(), width = width, height = height))
    
    
def writearray(array, filename, mode = 'RGB'):
    img = Image.fromarray(array, mode)
    if img.mode != 'RGB':
        img = img.convert('RGB')
    img.save(filename)
    
def readarray(filename):
    img = Image.open(filename)
    imarray = np.array(img)
    
    return imarray

In [8]:
settings = {
    'iso': 100,
    'shutter_speed': 48000,
    'preview_alpha': 255,
    'sensor_mode': 1,
    'rotation': 0,
    'framerate': 20,
    'brightness': 50,
    'awb_mode': 'off',
    'awb_gains': 1.5
}
state = {
    'leds': 'red'
}
image_queue = Queue(maxsize = -1)

consumer = GiggleBotQAValidation(image_queue)
producer = CameraSource(image_queue, settings, state)

consumer.start()
producer.start()
sleep(3)
producer.stop()
sleep(15)
consumer.stop()

/home/pi/.env/lib/python3.5/site-packages/picamera/encoders.py:521: PiCameraAlphaStripping: using alpha-stripping to convert to non-alpha format; you may find the equivalent alpha format faster
  "using alpha-stripping to convert to non-alpha "


{'blue': 0, 'green': 0, 'red': 4780} 7 1
{'blue': 0, 'green': 0, 'red': 4767} 7 2
{'blue': 0, 'green': 0, 'red': 4781} 7 3
{'blue': 0, 'green': 0, 'red': 4776} 7 4
{'blue': 0, 'green': 0, 'red': 4781} 7 5
{'blue': 0, 'green': 0, 'red': 4773} 7 6
{'blue': 0, 'green': 0, 'red': 4779} 7 7
{'blue': 0, 'green': 0, 'red': 4770} 7 8
{'blue': 0, 'green': 0, 'red': 4766} 7 9
{'blue': 0, 'green': 0, 'red': 4772} 7 10
{'blue': 0, 'green': 0, 'red': 4771} 7 11
{'blue': 0, 'green': 0, 'red': 4771} 7 12
{'blue': 0, 'green': 2342, 'red': 2193} 7 13
{'blue': 0, 'green': 5496, 'red': 0} 7 14
{'blue': 0, 'green': 5493, 'red': 0} 7 15
{'blue': 0, 'green': 5487, 'red': 0} 7 16
{'blue': 0, 'green': 5497, 'red': 0} 7 17
{'blue': 0, 'green': 5489, 'red': 0} 7 18
{'blue': 0, 'green': 5487, 'red': 0} 7 19
{'blue': 0, 'green': 5486, 'red': 0} 7 20
{'blue': 0, 'green': 5489, 'red': 0} 7 21
{'blue': 0, 'green': 5493, 'red': 0} 7 22
{'blue': 0, 'green': 5487, 'red': 0} 7 23
{'blue': 0, 'green': 5497, 'red': 0} 7 2

In [7]:
# writearray(frame, 'gb_red.png')