# Prodigy Recipe


In [None]:
# default_exp custom_recipe
# default_cls_lvl 2 

In [None]:
#hide
from nbdev.showdoc import *

## Imports

In [None]:
#export
import numpy as np
import copy
import io
import torch
import os
import fastai
from PIL import Image
import PIL
from time import time
import json
from pathlib import Path
from dotenv import load_dotenv

from prodigy.components.loaders import get_stream, JSONL
from prodigy.components.preprocess import fetch_images
from prodigy.core import recipe, recipe_args
from prodigy.util import log, b64_uri_to_bytes, split_string, read_image_b64, write_jsonl, read_jsonl
from prodigy.components.loaders import Images
from prodigy.components.sorters import prefer_uncertain, prefer_high_scores, prefer_low_scores
from prodigy.components.loaders import JSONL
import prodigy


from fastai.vision import *
from pathlib import Path
from fastscript import *

from datetime import datetime
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

In [None]:
# InfluxDB connection settings, looked up in the process environment.
# Each constant is None when the corresponding variable is not set.
INFLUX_TOKEN = os.environ.get('INFLUX_TOKEN')
INFLUX_ORG = os.environ.get('INFLUX_ORG')
INFLUX_BUCKET = os.environ.get('INFLUX_BUCKET')
INFLUX_URL = os.environ.get('INFLUX_URL')

## fastaimodel recipe

In [None]:
#export
@recipe(
    "fastaimodel",
    dataset=("The dataset to use", "positional", None, str),
    source=("Path to a directory of images", "option", "source", str),
    model_path=("Path to the fastai model", "option", "model", str),
    target_folder=("Path to the target folder where the pictures are " +
                   "in the labled folders", 
                   "option", 
                   "target_folder", 
                   str),
    sort_by_score_type=("choose which order you want to receive the predictions. " +
                        "The availiable orders are prefer_uncertain, prefer_high_scores, prefer_low_scores.", 
                        "option", 
                        "sort", 
                        str),
    label=("One or more comma-separated labels", "option", "label", str)    
)
def fastai_recipe(dataset, source, model_path, target_folder, sort_by_score_type, label='horse_poo'):
    """Stream model-scored images for annotation and file the answers into folders.

    Every image found in `source` is scored with the fastai model at
    `model_path`. Accepted images are saved to `target_folder/<label>`,
    rejected ones to `target_folder/no_<label>`; any other answer is not saved.

    Parameters
    ----------
    dataset : name of the Prodigy dataset the annotations are stored in.
    source : directory with the images to annotate.
    model_path : path to the exported fastai model file.
    target_folder : directory that receives the two label folders.
    sort_by_score_type : 'prefer_uncertain', 'prefer_high_scores',
        'prefer_low_scores' or None. With None the images are served in the
        order the loader yields them.
    label : label name; used for the folder names and the UI label.

    Returns
    -------
    dict with the recipe components (dataset, view_id, stream, update, config).

    Raises
    ------
    ValueError : if `sort_by_score_type` is not one of the supported names.
    """
    # Create the output folders up front so `update` can write into them.
    create_folders(target_folder, label)
    target_folder = Path(target_folder)
    target_folder_pos = target_folder / label
    target_folder_neg = target_folder / ('no_' + label)

    def update(examples):
        # This function is triggered when Prodigy receives annotations
        print(f"type of examples = {type(examples)}")
        for example in examples:
            answer = example['answer']
            if answer == 'accept':
                destination = target_folder_pos
            elif answer == 'reject':
                destination = target_folder_neg
            else:
                # Any other answer is not written to disk.
                continue
            save_base64_image(str(destination), example['text'] + '.jpg', example['image'])

    # score_stream loads the model itself, so no separate load is needed here.
    stream = score_stream(Images(source), model_path)

    sorters = {
        'prefer_high_scores': prefer_high_scores,
        'prefer_low_scores': prefer_low_scores,
        'prefer_uncertain': prefer_uncertain,
    }
    if sort_by_score_type is None:
        # No sorter requested: drop the scores so the stream contains plain
        # task dicts instead of (score, example) tuples.
        stream = (example for _, example in stream)
    elif sort_by_score_type in sorters:
        stream = sorters[sort_by_score_type](stream)
        # NOTE(review): assumes the sorter object exposes a settable `first_n`
        # attribute - confirm against the installed Prodigy version.
        stream.first_n = 20000
    else:
        raise ValueError(
            f"Unknown sort order '{sort_by_score_type}'. "
            f"Expected one of: {', '.join(sorted(sorters))}."
        )

    return {
        "dataset": dataset,
        "view_id": "image_manual",
        "stream": stream,
        "update": update,
        "config": {  # Additional config settings, mostly for app UI
            "label": label
        }
    }


In [None]:
#export 
def create_folders(path:str, label:str) -> None:
    """Make sure `path` exists and contains the folders `label` and `'no_' + label`.

    Folders that already exist are left untouched.
    """
    root = Path(path)
    # Root first, then the positive and the negative label folder.
    for folder in (root, root / label, root / ('no_' + label)):
        folder.mkdir(parents=True, exist_ok=True)

In [None]:
# Start clean: remove label folders left over from a previous run
# (rmdir only succeeds on empty directories).
for stale in ('horse_poo', 'no_horse_poo'):
    if os.path.exists(stale):
        Path(stale).rmdir()
create_folders('.', 'horse_poo')
# Both label folders must exist after the call.
assert Path('horse_poo').exists()
assert Path('no_horse_poo').exists()

In [None]:
#export
def load_fastai_model(path, test_folder:[Path, str]=None):
    """Load the exported fastai learner stored in the file `path`.

    When `test_folder` is given, the images found in it are passed to the
    learner as its test set.
    """
    model_file = Path(path)
    if test_folder is None:
        return load_learner(str(model_file.parent), str(model_file.name))
    test_items = ImageList.from_folder(test_folder)
    return load_learner(path=model_file.parent, file=model_file.name, test=test_items)
    

In [None]:
# Smoke test for load_fastai_model; skipped when no exported model is present.
if os.path.exists('data/export.pkl'):
    model = load_fastai_model('data/export.pkl')
    if os.path.exists('test_data/sample/20181216093008.jpg'):
        # Predict one sample image to check that the loaded learner is usable.
        prediction = model.predict(fastai.vision.open_image('test_data/sample/20181216093008.jpg'))        
        assert type(model) == fastai.basic_train.Learner
     
    # Reload with the sample folder attached as test set and predict it.
    model = load_fastai_model('data/export.pkl', 'test_data/sample')
    preds = model.get_preds(ds_type=DatasetType.Test)
    # get_preds is expected to return a two-element list here.
    assert type(preds) == list
    assert len(preds) == 2
    # The first row of the first element must hold exactly two values.
    assert preds[0][0,:].shape == torch.Size([2])
        
     

In [None]:
#export 
def save_base64_image(path, filename, uri):
    """Decode the base64 data URI `uri` and save the image as `path`/`filename`."""
    destination = Path(path) / filename
    raw_bytes = b64_uri_to_bytes(uri)
    # Pillow is given the destination as a string and writes the file there.
    PIL.Image.open(io.BytesIO(raw_bytes)).save(str(destination))

In [None]:
#export
def score_stream(stream, model_path):
    """Yield a ``(score, example)`` tuple for every example in `stream`.

    Parameters
    ----------
    stream : iterable of task dicts with an "image" key holding a base64
        data URI and a "text" key.
    model_path : path to the exported fastai model used for scoring.

    Yields
    ------
    (float, dict) : the model score and the unchanged example.

    Raises
    ------
    ValueError : if an example's "image" value does not start with "data".
    """
    learn = load_fastai_model(model_path)
    for example in stream:
        if not example["image"].startswith("data"):
            msg = "Expected base64-encoded data URI, but got: '{}'."
            raise ValueError(msg.format(example["image"][:100]))

        pil_image = PIL.Image.open(io.BytesIO(b64_uri_to_bytes(example["image"])))
        # Force three RGB channels: grayscale, palette or RGBA images would
        # otherwise break the 3-axis transpose or yield a wrong channel count.
        pixels = np.asarray(pil_image.convert('RGB'))
        # (height, width, channel) -> (channel, height, width)
        pixels = np.transpose(pixels, (2, 0, 1))
        x = torch.from_numpy(pixels.astype(np.float32, copy=False))
        # Scale the 0..255 pixel values to 0..1.
        x = x.div_(255)
        # NOTE(review): `Image` is expected to resolve to fastai's image class
        # (via `from fastai.vision import *`), not `PIL.Image` - confirm.
        # NOTE(review): [2][0] presumably is the probability of class index 0;
        # verify that this is the class of interest.
        score = learn.predict(Image(x))[2][0].numpy().item()
        print(f"socre={score}, id={example['text']}")
        yield (score, example)

## fastai_jsonl_recipe

In [None]:
#export
def predict_folder(image_folder:[str, Path], path_model:[str, Path]=Path('data/export.pkl'), label:str='horse_poo'):
    """Predict all images of a folder and write them, ranked by score, to tasks.jsonl.

    Parameters
    ----------
    image_folder : folder with the images; `tasks.jsonl` is written into it.
    path_model : path to the exported fastai model file.
    label : name of the model class whose predicted value is used as score.

    Returns
    -------
    (learn, preds, y, jsonl_list) : the loaded learner, the two values returned
    by `get_preds`, and the list of task dicts that was written.

    Raises
    ------
    ValueError : if `label` is not among the classes of the loaded model.
    """
    image_folder = Path(image_folder)
    path_model = Path(path_model)
    learn = load_fastai_model(str(path_model), test_folder=str(image_folder))
    preds, y = learn.get_preds(ds_type=DatasetType.Test)

    classes = learn.data.classes
    if label not in classes:
        raise ValueError(f"label '{label}' is not a class of the model: {list(classes)}")
    scores = preds[:, classes.index(label)].numpy()
    paths = learn.data.test_ds.items

    # Highest score first; ties are ordered by path (tuple comparison).
    ranked = sorted(zip(scores, paths), reverse=True)
    jsonl_list = [
        {"image": str(path), "text": path.stem, "score": str(np.round(score, 3))}
        for score, path in ranked
    ]

    tasks_file = image_folder / 'tasks.jsonl'
    print(f"save results to {str(tasks_file)}")
    write_jsonl(str(tasks_file), jsonl_list)

    return learn, preds, y, jsonl_list

In [None]:
# Smoke test for predict_folder; skipped when no exported model is present.
if Path('data/export.pkl').exists():
    predict_folder(path_model='data/export.pkl', image_folder=Path('test_data/sample/'))

In [None]:
#export
@call_parse
def predict_all_subfolders(path:Param("path of parent folder", str)='data', 
                           skipXmostRecent:Param("skips the nth most recent folders", int)=1,
                           path_model:Param("path to the model to use", str)='data/export.pkl',
                           predict_single_folder:Param("path to single folder", str)=None):
    """predicts all images in subfolders of the given path an creates a tasks.jsonl file"""
    # NOTE(review): docstring kept verbatim - call_parse presumably shows it as
    # the command line help text; confirm before rewording.
    parent = Path(path)

    # Single-folder mode: predict just that folder and stop.
    if predict_single_folder is not None:
        predict_folder(Path(predict_single_folder), path_model)
        return

    # Direct child folders only, sorted by name in descending order.
    _, folder_names, _ = next(os.walk(str(parent)))
    descending = sorted(folder_names, reverse=True)

    # Skip the first `skipXmostRecent` entries of the descending list.
    for name in descending[skipXmostRecent:]:
        folder = parent / name
        print(f'predict {folder}')
        predict_folder(folder, path_model)
        
    

In [None]:
#export
@recipe(
    "fastai_jsonl_recipe",
    dataset=("The dataset to use", "positional", None, str),
    path_image_folder=("folder with tasks.jsonl file", "option", "path_image_folder", str),
    path_model=("folder where we can find the deployed model", "option", "path_model", str),
    predict=("wether to predict if there is already a tasks.jsonl or not", "option", "predict", int)
)
def fastai_jsonl_recipe(dataset, path_image_folder, path_model, predict=0):
    """Serve the images listed in `<path_image_folder>/tasks.jsonl` for annotation.

    Parameters
    ----------
    dataset : name of the Prodigy dataset the annotations are stored in.
    path_image_folder : folder that holds the images and the tasks.jsonl file.
    path_model : path to the exported fastai model used for predictions.
    predict : 1 forces a new prediction run; with any other value predictions
        are only made when tasks.jsonl does not exist yet.

    Returns
    -------
    dict with the recipe components (dataset, view_id, on_load, stream, config).
    """
    source = Path(path_image_folder)
    tasks_file = source / 'tasks.jsonl'

    def on_load(controller):
        """Create tasks.jsonl, ordered by prediction, when requested or missing."""
        # Check for the task file itself: an existing image folder without a
        # tasks.jsonl still needs a prediction run.
        if predict == 1 or not tasks_file.exists():
            print(f'make predictions for folder {path_image_folder} and model {path_model}')
            predict_folder(image_folder=source, path_model=Path(path_model))

    # NOTE(review): assumes the JSONL loader reads the file lazily, i.e. only
    # after on_load has had the chance to create it - confirm.
    stream = JSONL(str(tasks_file))
    stream = fetch_images(stream, skip=True)

    return {
        "dataset": dataset,
        "view_id": "image_manual",
        "on_load": on_load,
        "stream": stream,
        "config": {  # Additional config settings, mostly for app UI
            "label": "horse_poo"
        }
    }
            

In [None]:
# Smoke test for fastai_jsonl_recipe; skipped when no exported model is present.
if os.path.exists('data/export.pkl'):
    # predict=1 forces a prediction run when on_load is called.
    res = fastai_jsonl_recipe('test', 'test_data/sample', 'data/export.pkl', 1)
    # on_load does not use its argument, so a placeholder string is enough.
    res['on_load']('controller')
    assert os.path.exists('test_data/sample/tasks.jsonl')
    # Clean up the generated task file.
    os.unlink('test_data/sample/tasks.jsonl')
    

In [None]:
# Run nbdev's notebook2script on this notebook
# (presumably writes the `#export` cells to the library module).
from nbdev.export import *
notebook2script('01_custom_receipe.ipynb')

In [None]:
!prodigy fastai_jsonl_recipe binary_horse_poo \
-path_image_folder /mnt/Data/to_label/20210107/ \
-path_model /home/wilhelm/PooDetector/data/tmp/export.pkl \
-predict 0 \
-F /home/wilhelm/PooDetector/PooDetector/custom_recipe.py

In [None]:
!python PooDetector/custom_recipe.py \
--path /mnt/Data/to_label/ \
--path_model /home/wilhelm/PooDetector/data/tmp/export.pkl 

In [None]:

def create_to_label_jsonl(path_target:[str,Path]=Path('tasks.jsonl'), flux_query:str=None):
    """Create a jsonl task file for the predictions selected by a Flux query.

    Parameters
    ----------
    path_target : file the tasks are written to.
    flux_query : Flux query to run. When None, a default query against the
        "poo_detector" bucket is used (measurement "ai", last 48 hours,
        value >= 0.3).

    The query result must provide the columns `filename`, `path` and `_value`.
    Tasks are written in descending order of `_value`.
    """
    if flux_query is None:
        flux_query = '''
            from(bucket: "poo_detector")
            |> range(start: -48h)
            |> filter(fn: (r) => r["_measurement"] == "ai")
            |> filter(fn: (r) => r["_value"] >= 0.3)            
            '''

    client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN)
    try:
        # NOTE(review): query_data_frame may return a list of data frames for
        # some results; a single data frame is assumed here - confirm.
        df = client.query_api().query_data_frame(flux_query, org=INFLUX_ORG)
    finally:
        # Always release the connection, even if the query fails.
        client.close()

    filenames = df["filename"].to_list()
    paths = df["path"].to_list()
    scores = df["_value"].to_list()

    # Highest score first; ties are ordered by path, then filename.
    ranked = sorted(zip(scores, paths, filenames), reverse=True)
    jsonl_list = [
        {"image": path, "text": filename, "score": str(round(score, 3))}
        for score, path, filename in ranked
    ]

    print(f"save results to {str(path_target)}")
    write_jsonl(str(path_target), jsonl_list)

In [None]:
# Write ./tasks.jsonl using the default Flux query.
create_to_label_jsonl()


In [None]:
!prodigy image.manual binary_horse_poo ./tasks.jsonl --loader jsonl --label horse_poo
