In [None]:
!pip install dioptra

In [None]:
!pip install datasets torchvision transformers evaluate

In [None]:
####
#
# Fetch an open-source dataset from the Hugging Face Hub
#
####

from datasets import load_dataset
import random

# NOTE: the variable name has a typo ("gods"); it is kept because later cells use it.
cats_vs_gods_dataset = load_dataset('cats_vs_dogs')['train']

# Integer class label for every image, indexed by row position
# (interpreted through `reverse_ontology` below).
groundtruth = [label for label in cats_vs_gods_dataset['labels']]

# Deterministic (seeded) 80/20 split over dataset row positions.
all_indexes = list(range(len(cats_vs_gods_dataset)))
random.Random(1234).shuffle(all_indexes)

n_train_indexes = int(0.8 * len(all_indexes))
training_indexes = sorted(all_indexes[:n_train_indexes])
# Slice to the end (`[n:]`, not `[n:-1]`) so the last shuffled index is kept.
testing_indexes = sorted(all_indexes[n_train_indexes:])

# Class id -> class name, used to interpret the integer labels in `groundtruth`.
reverse_ontology = {0: "cat", 1: "dog"}

# Class name -> class id; derived from the mapping above so the two never disagree.
ontology = {class_name: class_id for class_id, class_name in reverse_ontology.items()}

In [None]:
####
#
# Set some env variables
#
####

import os
# S3 locations used by this demo; replace the bucket name with your own.
img_bucket = 'my_bucket'
img_dir = f's3://{img_bucket}/end_to_end_test/imgs'

# Prefer a key already exported in the environment; the literal is only a
# placeholder. Avoid pasting a real key here, where it would be saved with the notebook.
os.environ.setdefault('DIOPTRA_API_KEY', 'my_api_key')
os.environ['DIOPTRA_UPLOAD_BUCKET'] = img_bucket
os.environ['DIOPTRA_UPLOAD_PREFIX'] = 'end_to_end_test/logs/'

In [None]:
####
#
# Define a labeling provider (simulated from the dataset's ground-truth labels)
#
####

class LabelProvider():
    """Simulated labeling provider backed by the dataset's ground truth.

    Each row's ``tags.datapoint_id`` is used as an index into ``groundtruth``,
    and the resulting class id is translated to a class name through
    ``reverse_ontology``.
    """

    def __init__(self, groundtruth, reverse_ontology):
        """
        Args:
            groundtruth: sequence of class ids, indexed by datapoint id.
            reverse_ontology: mapping from class id to class name.
        """
        self.groundtruth = groundtruth
        self.reverse_ontology = reverse_ontology

    def label_data(self, dataframe):
        """Return a labeled copy of ``dataframe``.

        Args:
            dataframe: pandas DataFrame with a ``tags.datapoint_id`` column.

        Returns:
            A new DataFrame equal to the input plus a ``groundtruth.class_name``
            column. The input DataFrame is not modified.
        """
        labeled = dataframe.copy()
        # int() lets ids that arrive as floats or numpy scalars index the list.
        labeled['groundtruth.class_name'] = [
            self.reverse_ontology[self.groundtruth[int(datapoint_id)]]
            for datapoint_id in labeled['tags.datapoint_id']
        ]
        return labeled
        

In [None]:
####
#
# Upload the imgs to S3
#
####

import os
# exist_ok=True makes the cell safe to re-run (os.mkdir raises FileExistsError).
os.makedirs('imgs', exist_ok=True)
# One JPEG per dataset row, named by row position (matches the URIs built for the lake).
for index in range(len(cats_vs_gods_dataset)):
    cats_vs_gods_dataset[index]['image'].save(f'./imgs/{index}.jpg', format='jpeg')

In [None]:
!aws s3 cp imgs {img_dir} --recursive --quiet

In [None]:
####
#
# Upload the metadata to dioptra
#
####

from dioptra.lake.utils import upload_to_lake

# Set membership is O(1); testing against the list made this loop O(n^2).
training_index_set = set(training_indexes)

# One lake record per image: its S3 URI plus the datapoint id and split tag.
initial_metadata = [
    {
        'image_metadata': {
            'uri': f'{img_dir}/{index}.jpg'
        },
        'tags': {
            'datapoint_id': index,
            'data_split': 'train' if index in training_index_set else 'test'
        }
    }
    for index in range(len(all_indexes))
]

# Upload in chunks of 1000 records per call.
UPLOAD_BATCH_SIZE = 1000
for start in range(0, len(initial_metadata), UPLOAD_BATCH_SIZE):
    upload_to_lake(initial_metadata[start:start + UPLOAD_BATCH_SIZE])

In [None]:
####
#
# Create a test dataset
#
####

from dioptra.lake.utils import download_from_lake
from dioptra.lake.datasets import Dataset as DioptraDataset

# Every lake record tagged as part of the held-out test split.
test_split_filter = {'left': 'tags.data_split', 'op': '=', 'right': 'test'}
test_df = download_from_lake(
    filters=[test_split_filter],
    fields=['uuid', 'request_id', 'tags.datapoint_id'],
)

# Register those records as a versioned Dioptra dataset.
test_dataset = DioptraDataset()
test_dataset.create('test_cast_vs_dogs')
test_uuids = list(test_df['uuid'])
test_dataset.add(test_uuids)
test_dataset.commit('initial commit')

In [None]:
####
#
# Get the data labeled by the provider and update the lake with the new groundtruth
#
####

my_label_provider = LabelProvider(groundtruth, reverse_ontology)
test_df = my_label_provider.label_data(test_df)

# Groundtruth updates keyed by request_id.
update_dataset = [
    {'request_id': request_id, 'groundtruth': {'class_name': class_name}}
    for request_id, class_name in zip(test_df['request_id'], test_df['groundtruth.class_name'])
]

# Push the updates to the lake 1000 records at a time.
for start in range(0, len(update_dataset), 1000):
    upload_to_lake(update_dataset[start:start + 1000])

In [None]:
####
#
# Download training unlabeled data as a dataset
#
####

from dioptra.lake.utils import download_from_lake
from dioptra.lake.torch.object_store_datasets import ImageDataset

# All training-split records; they are still unlabeled at this point.
train_split_filter = {'left': 'tags.data_split', 'op': '=', 'right': 'train'}
unlabeled_df = download_from_lake(
    filters=[train_split_filter],
    fields=['image_metadata.uri', 'tags.datapoint_id', 'request_id'],
)

unlabeled_dataset = ImageDataset(unlabeled_df)

# One metadata record per datapoint, tagging it with the 'initial' inference run.
first_run_metadata = [
    {'request_id': row['request_id'], 'tags': {'run_id': 'initial'}}
    for row in unlabeled_dataset
]

In [None]:
####
#
# Let's define the transform pipeline and prefetch the images (optional)
#
####

import io
import smart_open
import torch
from torchvision import transforms

from torch.utils.data import DataLoader

from dioptra.inference.torch.classifier_runner import ClassifierRunner

# Preprocessing for the pretrained ResNet-18: RGB -> resize 256 -> center crop 224
# -> tensor -> per-channel normalization (the usual torchvision ImageNet statistics).
# Converting to RGB first guarantees ToTensor yields 3 channels, so no separate
# grayscale-to-3-channel step is needed.
transform_pipe = transforms.Compose([
    transforms.Lambda(lambda x: x.convert('RGB')),
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

def transform(row):
    """Preprocess one dataset row by running its decoded image through `transform_pipe`."""
    image = row['image']
    return transform_pipe(image)
    
# Attach the preprocessing, enable image loading, then prefetch images.
# NOTE(review): these are attribute sets on an opaque ImageDataset; keep the
# order in case its setters or prefetch_images depend on it.
# NOTE(review): the meaning of 20 in prefetch_images(20) (worker count?) — TODO confirm.
unlabeled_dataset.transform = transform
unlabeled_dataset.load_images = True
unlabeled_dataset.prefetch_images(20)

In [None]:
####
#
# Use a pre-trained model to generate the first embeddings
# Let's use a plain torch model for this one
#
####

# Use the GPU when one is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# shuffle=False keeps the dataset order in which `first_run_metadata` was built.
# NOTE(review): confirm whether ClassifierRunner aligns metadata positionally.
data_loader = DataLoader(
    unlabeled_dataset, batch_size=10, num_workers=4, shuffle=False)

torch_model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=True)
torch_model.to(device)

# Log 'layer4' activations as embeddings for every datapoint.
my_runner = ClassifierRunner(
    model=torch_model,
    embeddings_layers=['layer4'],
    device=device,
    metadata=first_run_metadata
)

my_runner.run(data_loader)

In [None]:
####
#
# Check that we got all the data ingested
#
####

# Display every record tagged with the initial run so ingestion can be eyeballed.
initial_run_filter = {'left': 'tags.run_id', 'op': '=', 'right': 'initial'}
download_from_lake(
    filters=[initial_run_filter],
    fields=['uuid', 'request_id', 'tags.datapoint_id'],
)

In [None]:
####
#
# Kick off a coreset miner to pull the first 100 samples
#
####


import time
from dioptra.miners.coreset_miner import CoresetMiner

# The miner runs asynchronously. Poll it with an upper bound instead of looping
# forever if it never reaches 'SUCCESS'.
MINER_POLL_INTERVAL_S = 10
MINER_TIMEOUT_S = 2 * 60 * 60

my_miner = CoresetMiner(
    select_filters=[{
        'left': 'tags.run_id',
        'op': '=',
        'right': 'initial'}],
    size=100,
    display_name='coreset miner',
    embeddings_field='embeddings',
    skip_caching=True
)

my_miner.run()

deadline = time.monotonic() + MINER_TIMEOUT_S
status = my_miner.get_status()
while status != 'SUCCESS':
    if time.monotonic() > deadline:
        raise TimeoutError(
            f'coreset miner did not succeed within {MINER_TIMEOUT_S}s (last status: {status!r})')
    # Show the status so a failed miner is visible instead of a silent hang.
    print(f'waiting for results (status: {status})')
    time.sleep(MINER_POLL_INTERVAL_S)
    status = my_miner.get_status()

In [None]:
####
#
# Let's create our training dataset and add the miner results to the dataset
#
####

from dioptra.lake.datasets import Dataset as DioptraDataset

# Versioned training dataset, seeded with the coreset miner's selection.
my_dataset = DioptraDataset()
my_dataset.create('training_cast_vs_dogs')

coreset_uuids = my_miner.get_results()
coreset_filter = {'left': 'uuid', 'op': 'in', 'right': coreset_uuids}
coreset_results_df = download_from_lake(
    filters=[coreset_filter],
    fields=['uuid', 'request_id', 'tags.datapoint_id'],
)

my_dataset.add(list(coreset_results_df['uuid']))
my_dataset.commit('initial_version')

In [None]:
####
#
# Use our labeling provider to label the data and update the lake
#
####

from dioptra.lake.utils import upload_to_lake

my_label_provider = LabelProvider(groundtruth, reverse_ontology)

labeled_df = my_label_provider.label_data(coreset_results_df)

# Groundtruth updates keyed by request_id.
update_dataset = [
    {'request_id': request_id, 'groundtruth': {'class_name': class_name}}
    for request_id, class_name in zip(labeled_df['request_id'], labeled_df['groundtruth.class_name'])
]

# Push the updates to the lake 1000 records at a time.
for start in range(0, len(update_dataset), 1000):
    upload_to_lake(update_dataset[start:start + 1000])

In [None]:
####
#
# Let's download and prep our training dataset
#
####

new_training_df = my_dataset.download(fields=['image_metadata.uri', 'groundtruth.class_name'])
# Keep only labeled rows. Treat both NaN and '' as "no label" so the ontology
# lookup in the training transforms never sees an unlabeled row.
has_label = new_training_df['groundtruth.class_name'].fillna('') != ''
new_training_df = new_training_df[has_label].reset_index(drop=True)

In [None]:
####
#
# our training routine ...
#
####

import torch
import torch.nn as nn

import evaluate
from torchvision import transforms

from transformers import (
    AutoConfig,
    AutoImageProcessor,
    AutoModelForImageClassification,
    Trainer,
    TrainingArguments,
    EarlyStoppingCallback
)

import numpy as np

from dioptra.lake.torch.object_store_datasets import ImageDataset

# label2id maps class name -> id and id2label maps id -> class name, matching
# `ontology` and `reverse_ontology` respectively.
config = AutoConfig.from_pretrained(
    'microsoft/resnet-50',
    num_labels=len(ontology),
    label2id=ontology,
    id2label=reverse_ontology,
    finetuning_task="image-classification"
)
# ignore_mismatched_sizes lets the checkpoint load even though its classifier
# head size differs from num_labels; mismatched weights are re-initialized.
model = AutoModelForImageClassification.from_pretrained(
    'microsoft/resnet-50',
    ignore_mismatched_sizes=True,
    config=config
)
image_processor = AutoImageProcessor.from_pretrained(
    'microsoft/resnet-50'
)

# Evaluate, log and checkpoint once per epoch; keep the best checkpoint at the end.
# NOTE(review): newer transformers releases rename `evaluation_strategy` to
# `eval_strategy` — adjust if pinning a newer version.
training_args = TrainingArguments(
    output_dir='test_trainer',
    evaluation_strategy='epoch',
    logging_strategy='epoch',
    save_strategy='epoch',
    num_train_epochs=20,
    learning_rate=5e-4,
    load_best_model_at_end=True
)

metric = evaluate.load('accuracy')


def compute_metrics(eval_pred):
    """Accuracy of argmax predictions, in the form expected by ``Trainer``.

    Args:
        eval_pred: ``(logits, labels)`` pair supplied by the Trainer.
    """
    logits, references = eval_pred
    predicted_classes = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predicted_classes, references=references)

def collate_fn(examples):
    """Batch ``{'pixel_values', 'labels'}`` dicts into stacked tensors for the Trainer."""
    pixel_batch = []
    label_batch = []
    for example in examples:
        pixel_batch.append(example['pixel_values'])
        label_batch.append(example['labels'])
    return {'pixel_values': torch.stack(pixel_batch), 'labels': torch.tensor(label_batch)}

# Pieces shared by both pipelines, derived from the checkpoint's image processor.
_crop_size = image_processor.size['shortest_edge']
_to_rgb = transforms.Lambda(lambda x: x.convert('RGB'))
_normalize = transforms.Normalize(mean=image_processor.image_mean, std=image_processor.image_std)

# Training: random resized crop + horizontal flip for augmentation.
_train_transforms = transforms.Compose([
    _to_rgb,
    transforms.RandomResizedCrop(_crop_size),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    _normalize,
])

# Evaluation: deterministic resize + center crop.
_eval_transforms = transforms.Compose([
    _to_rgb,
    transforms.Resize(_crop_size),
    transforms.CenterCrop(_crop_size),
    transforms.ToTensor(),
    _normalize,
])


def _to_example(row, image_transform):
    """Build a Trainer example: transformed image plus integer label via `ontology`."""
    return {
        'pixel_values': image_transform(row['image']),
        'labels': ontology[row['groundtruth.class_name']],
    }


def train_transforms(example_batch):
    """Row transform for the training split (augmented pipeline).

    Despite the parameter name, this indexes a single row's ``'image'`` and
    ``'groundtruth.class_name'``.
    """
    return _to_example(example_batch, _train_transforms)


def eval_transforms(example_batch):
    """Row transform for the evaluation/test splits (deterministic pipeline)."""
    return _to_example(example_batch, _eval_transforms)

# Seeded shuffle, then a 60/40 train/eval split. Writing to a new name keeps the
# cell idempotent: re-running it reshuffles the same source frame identically.
shuffled_training_df = new_training_df.sample(frac=1, random_state=1234).reset_index(drop=True)
n_train_rows = int(len(shuffled_training_df) * 0.6)

training_data = ImageDataset(
    dataframe=shuffled_training_df.iloc[:n_train_rows],
    transform=train_transforms)
# `[n:]` (not `[n:-1]`) so the last row is included in evaluation.
evaluation_data = ImageDataset(
    dataframe=shuffled_training_df.iloc[n_train_rows:],
    transform=eval_transforms)

# The row transforms read row['image'], so images must be loaded.
training_data.load_images = True
evaluation_data.load_images = True

trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=training_data,
        eval_dataset=evaluation_data,
        compute_metrics=compute_metrics,
        tokenizer=image_processor,
        data_collator=collate_fn,
        callbacks=[EarlyStoppingCallback(early_stopping_patience=10)]
    )

In [None]:
####
#
# Start the training
#
####

# Fine-tune on the labeled coreset; the returned TrainOutput is the cell's output.
train_result = trainer.train()
train_result

In [None]:
####
#
# Get our test dataset
#
####

from dioptra.lake.torch.object_store_datasets import ImageDataset

# By default, evaluate on the test dataset created earlier in this notebook.
# Set TEST_DATASET_ID to the id of an existing Dioptra dataset to use that instead.
TEST_DATASET_ID = None
if TEST_DATASET_ID is not None:
    test_dataset = DioptraDataset()
    test_dataset.dataset_id = TEST_DATASET_ID

test_df = test_dataset.download()
# Drop unlabeled rows; treat both NaN and '' as "no label".
has_label = test_df['groundtruth.class_name'].fillna('') != ''
test_df = test_df[has_label]
test_data = ImageDataset(
    dataframe=test_df,
    transform=eval_transforms)
# eval_transforms reads row['image'], so images must be loaded, as for the
# other ImageDatasets in this notebook.
test_data.load_images = True
test_data.prefetch_images(20)

In [None]:
####
#
# Evaluate ...
#
####


# Score the fine-tuned model on the held-out test split.
trainer.evaluate(eval_dataset=test_data)

In [None]:
####
#
# Get the next batch of data
#
####


from dioptra.lake.utils import download_from_lake
from dioptra.lake.torch.object_store_datasets import ImageDataset

train_split_filter = {'left': 'tags.data_split', 'op': '=', 'right': 'train'}
raw_second_run_df = download_from_lake(
    filters=[train_split_filter],
    fields=['image_metadata.uri', 'tags.datapoint_id', 'request_id'],
)
# Keep a single row per request_id (the first one returned).
second_run_df = raw_second_run_df.drop_duplicates('request_id', keep='first')

second_run_dataset = ImageDataset(second_run_df)

# One metadata record per datapoint, tagging it with the 'second' inference run.
second_run_metadata = [
    {'request_id': row['request_id'], 'tags': {'run_id': 'second'}}
    for row in second_run_dataset
]


In [None]:
####
#
# Kick off a new run ...
#
####

def my_transforms(example_batch):
    """Map a dataset row to its deterministic eval tensor (image only, no label)."""
    image = example_batch['image']
    return _eval_transforms(image)

second_run_dataset.load_images = True
second_run_dataset.transform = my_transforms

second_run_data_loader = DataLoader(
    second_run_dataset, batch_size=10, num_workers=4, shuffle=False)

# Use the GPU when one is available, otherwise fall back to the CPU.
inference_device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Log pooled embeddings and classifier logits from the fine-tuned model.
# class_names are listed in class-id order, presumably matching logit column order.
my_runner_2 = ClassifierRunner(
    model=model,
    embeddings_layers=['resnet.pooler'],
    logits_layer='classifier',
    device=inference_device,
    metadata=second_run_metadata,
    class_names=[reverse_ontology[class_id] for class_id in sorted(reverse_ontology)]
)

my_runner_2.run(second_run_data_loader)

In [None]:
####
#
# Start a new set of miners
#
####

import time
from dioptra.miners.activation_miner import ActivationMiner
from dioptra.miners.coreset_miner import CoresetMiner
from dioptra.miners.entropy_miner import EntropyMiner

# Candidate pool: every record embedded by the second inference run.
filters = [{'left': 'tags.run_id', 'op': '=', 'right': 'second'}]

# Reference set for the coreset miner: samples already in the training dataset.
current_training_request_ids = list(my_dataset.download()['request_id'])
current_training_filters = [{
    'left': 'request_id',
    'op': 'in',
    'right': current_training_request_ids
}]

# Three complementary miners selecting 33 samples each.
my_miners = [
    ActivationMiner(
        select_filters=filters,
        size=33,
        display_name='activation miner 2',
        embeddings_field='embeddings',
        skip_caching=True
    ),
    CoresetMiner(
        select_filters=filters,
        select_reference_filters=current_training_filters,
        size=33,
        display_name='coreset miner 2',
        embeddings_field='embeddings',
        skip_caching=True
    ),
    EntropyMiner(
        select_filters=filters,
        size=33,
        display_name='entropy miner 2'
    ),
]

for miner in my_miners:
    miner.run()

# Wait until *every* miner reports 'SUCCESS'. The wait is bounded so a failed
# miner cannot hang the notebook forever.
MINER_POLL_INTERVAL_S = 10
MINER_TIMEOUT_S = 2 * 60 * 60

deadline = time.monotonic() + MINER_TIMEOUT_S
statuses = [miner.get_status() for miner in my_miners]
while not all(status == 'SUCCESS' for status in statuses):
    if time.monotonic() > deadline:
        raise TimeoutError(
            f'miners did not all succeed within {MINER_TIMEOUT_S}s (last statuses: {statuses})')
    print(f'waiting for results (statuses: {statuses})')
    time.sleep(MINER_POLL_INTERVAL_S)
    statuses = [miner.get_status() for miner in my_miners]

In [None]:
####
#
# Add the newly mined batch to our training dataset
#
####

# Append each miner's selection to the training dataset, then snapshot a new version.
for miner in my_miners:
    mined_uuids = miner.get_results()
    results_df = download_from_lake(
        filters=[{'left': 'uuid', 'op': 'in', 'right': mined_uuids}],
        fields=['uuid', 'request_id', 'tags.datapoint_id'],
    )
    my_dataset.add(list(results_df['uuid']))

my_dataset.commit('second version')

In [None]:
####
#
# Use our labeling provider to label the data and update the lake
#
####

my_label_provider = LabelProvider(groundtruth, reverse_ontology)

# Label each miner's selection and push the groundtruth back to the lake.
for miner in my_miners:
    mined_uuids = miner.get_results()
    results_df = download_from_lake(
        filters=[{'left': 'uuid', 'op': 'in', 'right': mined_uuids}],
        fields=['uuid', 'request_id', 'tags.datapoint_id'],
    )
    labeled_df = my_label_provider.label_data(results_df)

    update_dataset = [
        {'request_id': request_id, 'groundtruth': {'class_name': class_name}}
        for request_id, class_name in zip(labeled_df['request_id'], labeled_df['groundtruth.class_name'])
    ]

    # Upload 1000 records at a time.
    for start in range(0, len(update_dataset), 1000):
        upload_to_lake(update_dataset[start:start + 1000])

In [None]:
####
#
# Continue ...
#
####