# Luigi Classification Pipeline

To get started, we will build a small Luigi pipeline. The task is to classify images as either *lemons* or *bananas*.

Write 3 tasks:

1. Check for daily data
1. Preprocess images (convert to grayscale, resize to (100, 100))
1. Classify the image and write the result to a JSON file

## Hints and Tricks for OpenCV

Read an image from disk:
```python
img = cv2.imread("path", cv2.IMREAD_COLOR)
```

Resize an image:
```python
img = cv2.resize(img, (X_SIZE,Y_SIZE))
```

Convert image to grayscale:
```python
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
```

Write an image to disk:
```python
cv2.imwrite("path", img)
```

Find circles to identify lemons:
```python
circles = cv2.HoughCircles(img, 
                           cv2.HOUGH_GRADIENT,
                           dp=2, 
                           minDist=15, 
                           param1=100, 
                           param2=70)
```

## Imports

In [1]:
import json
from datetime import date
import luigi
from luigi.parameter import DateParameter
from luigi import LocalTarget, Task, WrapperTask
from luigi.tools.range import RangeDailyBase
import cv2

## Task 1: Check for daily data

In [2]:
class CheckDailyData(Task):
    date = DateParameter(default=date.today())

    def output(self):
        prefix = self.date.strftime("%m-%d-%Y")
        return LocalTarget("exercise-dataset/daily/%s/image.jpg" % prefix)
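
All three tasks in this pipeline derive their target path from the date in the same way. The sketch below reproduces only that path logic with the standard library, so it can be checked without Luigi. `daily_path` is a hypothetical helper name used for illustration and is not part of the pipeline.

```python
from datetime import date


def daily_path(day, filename):
    """Build the per-day path used by the tasks' output() methods."""
    # Same month-day-year prefix as in CheckDailyData.output()
    prefix = day.strftime("%m-%d-%Y")
    return "exercise-dataset/daily/%s/%s" % (prefix, filename)


print(daily_path(date(2018, 2, 19), "image.jpg"))
# prints: exercise-dataset/daily/02-19-2018/image.jpg
```

Because the path depends only on the date, Luigi can decide whether a task is complete for a given day simply by checking whether that file exists.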

## Task 2: Preprocess input image

In [3]:
class Preprocess(Task):
    date = DateParameter(default=date.today())

    def requires(self):
        return CheckDailyData(self.date)

    def output(self):
        prefix = self.date.strftime("%m-%d-%Y")
        return LocalTarget("exercise-dataset/daily/%s/preprocessed.jpg" % prefix)

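    def run(self):
        img = cv2.imread(self.input().path, cv2.IMREAD_COLOR)
        if img is None:
            # cv2.imread returns None instead of raising when the file cannot be read
            raise IOError("Could not read image: %s" % self.input().path)
        resized = cv2.resize(img, (100, 100))
        gray = cv2.cvtColor(resized, cv2.COLOR_BGR2GRAY)
        cv2.imwrite(self.output().path, gray)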


## Task 3: Classify image

In [4]:
class Classify(Task):
    date = DateParameter(default=date.today())

    def requires(self):
        return Preprocess(self.date)

    def output(self):
        prefix = self.date.strftime("%m-%d-%Y")
        return LocalTarget("exercise-dataset/daily/%s/result.json" % prefix)

    def run(self):
        img = cv2.imread(self.input().path, cv2.IMREAD_GRAYSCALE)
        circles = cv2.HoughCircles(img, 
                                   cv2.HOUGH_GRADIENT,
                                   dp=2, 
                                   minDist=15, 
                                   param1=100, 
                                   param2=70)
        label = "lemon" if circles is not None else "banana"
        with self.output().open("w") as out:
            json.dump({"class": label}, out)
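
The classification rule and the result file can be exercised without OpenCV. The following sketch assumes only what the task above shows: `cv2.HoughCircles` yields `None` when no circle is found, and the result is a JSON object with a single `class` key. `label_from_circles` is a hypothetical helper, and the temporary directory stands in for the dataset folder.

```python
import json
import os
import tempfile


def label_from_circles(circles):
    """Apply the rule from Classify.run(): any detected circle means lemon."""
    return "lemon" if circles is not None else "banana"


with tempfile.TemporaryDirectory() as tmp:
    result_path = os.path.join(tmp, "result.json")

    # Write the result the same way Classify.run() does
    with open(result_path, "w") as out:
        json.dump({"class": label_from_circles(None)}, out)

    # Read it back, e.g. in a downstream step
    with open(result_path) as src:
        result = json.load(src)

print(result["class"])  # prints: banana
```

Keeping the rule in a small function like this is one way to test it separately from image loading.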


## Run the pipeline

In [5]:
luigi.build([Classify(date(2018,2,19))], local_scheduler=True, no_lock=True)

DEBUG: Checking if Classify(date=2018-02-19) is complete
INFO: Informed scheduler that task   Classify_2018_02_19_999079b9db   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=927606712, workers=1, host=05fc4da11f1d, username=root, pid=62) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 Classify(date=2018-02-19)

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====



True

## Daily jobs and backfilling

Now we can classify a single image that is identified by its save date. Luigi is even handier for backfilling: with the *RangeDailyBase* wrapper task, we can run the pipeline we already built over a range of dates and process all 3 images.

```python
RangeDailyBase(of=TASK, start=START_DATE, stop=END_DATE, days_back=ALLOWED_DAYS_INTO_PAST)
```
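
To make the range arguments concrete, this sketch lists the dates that fall between `start` and `stop` using only the standard library. It assumes `stop` is exclusive, as we understand Luigi's parameter description. `dates_in_range` is a hypothetical helper and does not reproduce Luigi's `days_back` window or its completeness checks.

```python
from datetime import date, timedelta


def dates_in_range(start, stop):
    """Return every date d with start <= d < stop (stop assumed exclusive)."""
    n_days = (stop - start).days
    return [start + timedelta(days=i) for i in range(n_days)]


for day in dates_in_range(date(2018, 2, 19), date(2018, 2, 23)):
    print(day.isoformat())
# prints 2018-02-19 through 2018-02-22, one date per line
```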

In [18]:
luigi.build([RangeDailyBase(of=Classify, start=date(2018,2,19), stop=date(2018,2,23), days_back=365)], local_scheduler=True, no_lock=True)

DEBUG: Checking if RangeDailyBase(of=Classify, of_params={}, reverse=False, task_limit=50, now=None, param_name=None, start=2018-02-19, stop=2018-02-23, days_back=365, days_forward=0) is complete
INFO: Informed scheduler that task   RangeDailyBase_365_0_None_0325ca2a07   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=195309940, workers=1, host=05fc4da11f1d, username=root, pid=62) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 RangeDailyBase(...)

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====



True