<a href="https://colab.research.google.com/github/Barbarioli/Pipeline/blob/master/Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Pipeline

## Packages

In [0]:
from skimage.io import imread
from skimage.transform import resize
import numpy as np
import os
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import seaborn as sns
import cv2
from multiprocessing import Process, JoinableQueue, Pool, Manager, cpu_count
from time import sleep
import time

In [6]:
# Load the Google Colab Drive helper.
from google.colab import drive

# Mount Google Drive at /content/drive. This will prompt for authorization.
# Later cells read the photos from "/content/drive/My Drive/Photos".
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/drive


In [0]:
# List the contents of the Photos folder on the mounted Drive.
!ls "/content/drive/My Drive/Photos"

## Loading

In [0]:
def load(path):
  """
  Gather the JPEG photos stored directly inside a folder.

  path: path to the photos to be loaded

  Returns a sorted list with `path` joined onto every entry of the folder
  whose name ends in '.jpg' (the suffix check is case-sensitive).
  """
  jpg_paths = []
  for entry in os.listdir(path):
    if entry.endswith('.jpg'):
      jpg_paths.append(os.path.join(path, entry))
  jpg_paths.sort()
  return jpg_paths

## Operators

In [0]:
class image_loader(object):
  """Load images from a list of paths to a queue in order to be processed.

  Each call to ``next()`` reads one image with ``imread`` and puts it on
  ``queue``. Paths are consumed from the first to the last, and the list
  passed in by the caller is never modified.

  Args:
    data: list of paths to all images.
    queue: queue to where the loaded images will be added.

  Raises:
    StopIteration: from ``next()`` once every path has been loaded.

  """

  def __init__(self, data, queue):
    self.queue = queue
    # Private iterator: progress is tracked per instance instead of relying on
    # (and popping from) a module-level variable that happens to be called `data`.
    self.data = iter(data)

  def __iter__(self):
    return self

  def __next__(self):
    # next() raises StopIteration by itself when the paths run out, which is
    # exactly the end-of-iteration signal __next__ has to give.
    path = next(self.data)
    self.queue.put(imread(path))
    return
      
      
class preprocessing(object):
  """Preprocess images resizing, cropping and converting to tensor

  Each call to ``next()`` takes one image from ``input_queue``, transforms it
  and puts the result on ``output_queue``. When ``input_queue`` reports a size
  of zero the call does nothing and returns ``None``; it never raises
  ``StopIteration``.

  Args:
    input_queue: input queue with the images to be preprocessed.
    output_queue: queue with the output images.
    output_size (tuple): desired image size in a tuple of (height, width).
    crop (boolean): randomly crop the image. If cropped it will not be resized.
    totensor (boolean): if the images should be converted to tensor.

  Raises:
    ValueError: from ``next()`` when ``crop`` is set and the requested size is
      larger than the image taken from the queue.

  """

  def __init__(self, input_queue, output_queue, output_size, crop = False, totensor = False):
    self.input_queue = input_queue
    self.output_queue = output_queue
    self.size = output_size
    self.crop = crop
    self.tensor = totensor

  def __iter__(self):
    return self

  def __next__(self):
    # Nothing waiting: leave both queues untouched.
    if self.input_queue.qsize() == 0:
      return

    image = self.input_queue.get()

    if self.crop:
      h, w = image.shape[:2]
      new_h, new_w = self.size
      if new_h > h or new_w > w:
        raise ValueError(
            "crop size {} is larger than the image size {}".format((new_h, new_w), (h, w)))
      # randint excludes its upper bound, so the +1 keeps the last valid
      # offset reachable and lets a crop of exactly the image size through.
      top = np.random.randint(0, h - new_h + 1)
      left = np.random.randint(0, w - new_w + 1)
      image = image[top: top + new_h,
                    left: left + new_w]

    else:
      image = resize(image, self.size, anti_aliasing = True)

    if self.tensor:
      # torch is not imported by the notebook's import cell; import it only
      # when a tensor is actually requested.
      import torch
      # Move the last axis to the front before handing the array to torch.
      image = image.transpose((2,0,1))
      image = torch.from_numpy(image)

    self.output_queue.put(image)
      
      
      
class inference(object):
  """Inference stage of the pipeline, or a timed stand-in for it.

  Args:
    input_queue: queue this stage takes its items from.
    output_queue: queue this stage puts its results on.
    compression_rate(float): float number between 0-100 which represents the
          compression rate in percentage terms. In simulation mode it is used
          as the sleep time in seconds instead.
    simulation(boolean): if simulation or real run
    model(string): the deep learning model that will be used on the images

  """

  def __init__(self, input_queue, output_queue, compression_rate, simulation = True, model = None):
    self.input_queue, self.output_queue = input_queue, output_queue
    self.simulation = simulation

    if not simulation:
      self.compression_rate = compression_rate
      self.model = model
    else:
      # Simulated runs reuse the rate as the per-item delay.
      self.sleep = compression_rate

  def __iter__(self):
    return self

  def __next__(self):
    # Only the simulated path exists; a real run currently does nothing here.
    if not self.simulation:
      return

    # Take one item (its value is ignored), wait, then emit a placeholder result.
    self.input_queue.get()
    sleep(self.sleep)
    self.output_queue.put('a')
    return
       

## Test

In [0]:
# Sorted list of the '.jpg' file paths found in the Photos folder on the mounted Drive.
data = load("/content/drive/My Drive/Photos")

In [0]:
#Initializing queues
# queue_1 is handed to the loading and preprocessing stages, queue_2 to the
# preprocessing and inference stages, and queue_3 to the inference stage only.

queue_1 = JoinableQueue()
queue_2 = JoinableQueue()
queue_3 = JoinableQueue()

#Loading
# NOTE(review): `target` is the class itself, so the child process only calls
# image_loader(data, queue_1) -- i.e. builds the object -- and then returns;
# nothing in this cell ever calls next() on it. The same applies to the two
# processes started below. TODO confirm whether the workers were meant to
# iterate over the operators.

process_1 = Process(target = image_loader, args = (data, queue_1), daemon=True)
process_1.start()

#Preprocessing
# Output size is (300,300); crop and totensor keep their defaults (False).

process_2 = Process(target = preprocessing, args = (queue_1, queue_2, (300,300)), daemon=True)
process_2.start()

#Inference
# Third argument is compression_rate; simulation defaults to True, where the
# value is used as a sleep time in seconds.

process_3 = Process(target = inference, args = (queue_2, queue_3, 2), daemon=True)
process_3.start()

In [12]:
#Inserting another image
# Each line builds a fresh operator in this (notebook) process and advances it
# by a single step with next(): put one loaded image on queue_1, move one image
# from queue_1 to queue_2 resized to (300,300), then run one simulated
# inference step (2 second sleep) that puts its result on queue_3.

next(image_loader(data, queue_1))
next(preprocessing(queue_1, queue_2, (300,300)))
next(inference(queue_2, queue_3, 2))

  warn("The default mode, 'constant', will be changed to 'reflect' in "


In [13]:
#Number of items in each queue
# NOTE(review): multiprocessing documents qsize() as approximate, so treat
# these numbers as a snapshot rather than an exact count.

print("queue 1 size: ", queue_1.qsize())
print("queue 2 size: ", queue_2.qsize())
print("queue 3 size: ", queue_3.qsize())

queue 1 size:  0
queue 2 size:  0
queue 3 size:  1
