In [None]:
import numpy as np
import pandas as pd
from pathlib import Path
import os

# Data locations, relative to the directory the notebook is run from.
_INPUT_DIR = Path("kaggle/input")
_WORKING_DIR = Path("kaggle/working")
# parents=True: do not fail when the parent `kaggle/` directory is missing.
_WORKING_DIR.mkdir(parents=True, exist_ok=True)
# NOTE(review): not referenced by any cell visible in this notebook.
_NO_VAL = True

import zipfile

# Unpack the plates archive into the working directory.
with zipfile.ZipFile(_INPUT_DIR / "plates.zip", "r") as zip_obj:
    zip_obj.extractall(_WORKING_DIR)

# Later cells read the train/ and test/ images from below this directory.
_DATA_ROOT = _WORKING_DIR / "plates"

In [None]:
# Hugging Face repo id of the LLaVA checkpoint used throughout the notebook.
_LLAVA_MODEL = "liuhaotian/llava-v1.5-7b"

def download_model(model: str):
    """Download the files of a Hugging Face model repo into ``./<model>/``.

    The repo's ``tree/main`` page is scraped for download links and every
    linked file, except ``README.md`` and ``.gitattributes``, is streamed to
    disk in 8 KiB chunks.

    Args:
        model: Repo id in ``owner/name`` form; also used as the relative
            path of the output directory.

    Raises:
        requests.HTTPError: If the listing page or a file request returns
            an error status.
    """
    import requests
    from bs4 import BeautifulSoup
    from pathlib import Path

    _BASE_URL = "https://huggingface.co"
    _HUGGING_FACE_URL = f"{_BASE_URL}/{model}"
    _UNNEEDED_FILES = {f"/{model}/resolve/main/{file}?download=true" for file in ("README.md", ".gitattributes")}
    # Seconds without a server response before a request is abandoned.
    _TIMEOUT = 60

    response = requests.get(f"{_HUGGING_FACE_URL}/tree/main", timeout=_TIMEOUT)
    # Explicit status check; an `assert` would be skipped under `python -O`.
    response.raise_for_status()
    response.encoding = 'utf-8'
    soup = BeautifulSoup(response.text, 'html.parser')
    # href=True skips anchors that carry `download` but no link target.
    links = [
        a["href"]
        for a in soup.find_all("a", download=True, href=True)
        if a["href"] not in _UNNEEDED_FILES
    ]

    # Store under the requested repo id rather than the module-level default.
    path = Path(model)
    path.mkdir(parents=True, exist_ok=True)
    for link in links:
        # Last path segment without the query string; tolerates hrefs that
        # contain no "?" or more than one.
        local_filename = link.split('/')[-1].split('?', 1)[0]
        print(f"downloading file: {local_filename}")
        with requests.get(_BASE_URL + link, stream=True, timeout=_TIMEOUT) as resp:
            resp.raise_for_status()
            with open(path / local_filename, "wb") as file:
                for chunk in resp.iter_content(chunk_size=8192):
                    file.write(chunk)


In [None]:
# Fetch the checkpoint files into a local directory named after the repo id.
download_model(_LLAVA_MODEL)

In [None]:
# Clone over HTTPS: the SSH form requires a GitHub SSH key on the machine.
!git clone https://github.com/haotian-liu/LLaVA.git

In [None]:
# %pip installs into the environment of the running kernel.
%pip install -e LLaVA

In [None]:
# %pip installs into the environment of the running kernel.
%pip install protobuf

In [None]:
from llava.model.builder import load_pretrained_model
from transformers import TextIteratorStreamer
from llava.mm_utils import process_images, load_image_from_base64, tokenizer_image_token
from llava.constants import IMAGE_TOKEN_INDEX, DEFAULT_IMAGE_TOKEN, DEFAULT_IM_START_TOKEN, DEFAULT_IM_END_TOKEN
from threading import Thread

# Short model name: the part of the repo id after the last "/".
model_name = _LLAVA_MODEL.rpartition("/")[2]

# Build the tokenizer, model and image processor, requesting 4-bit loading.
tokenizer, model, image_processor, context_len = load_pretrained_model(
    model_name=model_name,
    model_path=_LLAVA_MODEL,
    model_base=None,
    load_4bit=True,
)

In [None]:
from llava.constants import IMAGE_TOKEN_INDEX, DEFAULT_IMAGE_TOKEN, DEFAULT_IM_START_TOKEN, DEFAULT_IM_END_TOKEN
from llava.conversation import conv_templates, SeparatorStyle
from llava.model.builder import load_pretrained_model
from llava.utils import disable_torch_init
from llava.mm_utils import process_images, tokenizer_image_token, get_model_name_from_path

from PIL import Image

import requests
from PIL import Image
from io import BytesIO
from transformers import TextStreamer
import torch

def load_image(image_file):
    """Load an image from a local path or an HTTP(S) URL as an RGB PIL image.

    Args:
        image_file: Filesystem path, or a URL starting with ``http://`` or
            ``https://``.

    Returns:
        The image converted to RGB mode.

    Raises:
        requests.HTTPError: If the URL answers with an error status.
    """
    if image_file.startswith(('http://', 'https://')):
        response = requests.get(image_file, timeout=30)
        # Surface 4xx/5xx here instead of handing an error page to PIL.
        response.raise_for_status()
        image = Image.open(BytesIO(response.content)).convert('RGB')
    else:
        image = Image.open(image_file).convert('RGB')
    return image


def _select_conv_mode(model_name: str) -> str:
    """Return the conversation-template key matching a LLaVA model name."""
    lowered = model_name.lower()
    if "llama-2" in lowered:
        return "llava_llama_2"
    if "mistral" in lowered:
        return "mistral_instruct"
    if "v1.6-34b" in lowered:
        return "chatml_direct"
    if "v1" in lowered:
        return "llava_v1"
    if "mpt" in lowered:
        return "mpt"
    return "llava_v0"


def process(
    image_file: str,
    prompt: str,
    do_sample=True,
    temperature=0.5,
    max_new_tokens=4048,
):
    """Ask the loaded LLaVA model a single question about one image.

    Uses the module-level ``tokenizer``, ``model`` and ``image_processor``.
    The generated text is also streamed to stdout through ``TextStreamer``.

    Args:
        image_file: Local path or HTTP(S) URL of the image.
        prompt: Question or instruction for the model.
        do_sample: Forwarded to ``model.generate``.
        temperature: Forwarded to ``model.generate``.
        max_new_tokens: Forwarded to ``model.generate``.

    Returns:
        ``tokenizer.decode(output_ids[0])`` with surrounding whitespace
        stripped. ``skip_special_tokens`` is not passed to ``decode``, so
        tokenizer special tokens may be present in the text.
    """
    model_name = get_model_name_from_path(_LLAVA_MODEL)
    conv = conv_templates[_select_conv_mode(model_name)].copy()

    image = load_image(image_file)
    image_size = image.size
    image_tensor = process_images([image], image_processor, model.config)
    # process_images may return a list of tensors or a single tensor.
    if isinstance(image_tensor, list):
        image_tensor = [tensor.to(model.device, dtype=torch.float16) for tensor in image_tensor]
    else:
        image_tensor = image_tensor.to(model.device, dtype=torch.float16)

    # Prepend the image placeholder token(s) to the user prompt.
    if model.config.mm_use_im_start_end:
        prompt = DEFAULT_IM_START_TOKEN + DEFAULT_IMAGE_TOKEN + DEFAULT_IM_END_TOKEN + '\n' + prompt
    else:
        prompt = DEFAULT_IMAGE_TOKEN + '\n' + prompt

    # One user turn followed by an empty assistant turn for the model to fill.
    conv.append_message(conv.roles[0], prompt)
    conv.append_message(conv.roles[1], None)
    prompt = conv.get_prompt()

    input_ids = tokenizer_image_token(prompt, tokenizer, IMAGE_TOKEN_INDEX, return_tensors='pt').unsqueeze(0).to(model.device)
    streamer = TextStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)

    with torch.inference_mode():
        output_ids = model.generate(
            input_ids,
            images=image_tensor,
            image_sizes=[image_size],
            do_sample=do_sample,
            temperature=temperature,
            max_new_tokens=max_new_tokens,
            streamer=streamer,
            use_cache=True)

    return tokenizer.decode(output_ids[0]).strip()

In [None]:
# Number of model replies sampled per image.
_SAMPLES = 40
# Instruction sent with every image; the model is asked for a one-word answer.
_PROMPT = (
    "Your task is to separate dirty and cleaned plates using their images. "
    "If the plate in the image is dirty then reply with just one word 'dirty'. "
    "If the plate in the image is cleaned then reply with just one word 'cleaned'."
)


def is_cleaned_plate(image_file: str, samples: int = _SAMPLES) -> int:
    """Count the model replies that contain the word "cleaned".

    The model is queried ``samples`` times with ``_PROMPT``; each reply is
    lower-cased and checked for the substring ``"cleaned"``.

    Args:
        image_file: Path or URL of the plate image, passed to ``process``.
        samples: Number of replies to request.

    Returns:
        The number of matching replies, between 0 and ``samples``. This is
        a vote count, not a boolean.
    """
    return sum(
        "cleaned" in process(image_file, _PROMPT).lower()
        for _ in range(samples)
    )

In [None]:
# One row per training image: file stem, folder-derived label, vote count.
train_data = {"id": [], "label": [], "prediction": []}

# (sub-folder, label) pairs in the order the rows are produced. The "cleaned"
# folder is labelled "clean", which is the value the threshold plot compares
# against.
for folder, label in (("dirty", "dirty"), ("cleaned", "clean")):
    for file in (_DATA_ROOT / "train" / folder).glob("*.jpg"):
        train_data["id"].append(file.stem)
        train_data["label"].append(label)
        train_data["prediction"].append(is_cleaned_plate(str(file)))

train_df = pd.DataFrame(data=train_data)

In [None]:
# Preview the first rows: image id, label, and number of "cleaned" votes.
train_df.head()

In [None]:
# Mean and standard deviation of the vote count for each label.
train_df[["label", "prediction"]].groupby("label").agg(["mean", "std"])

In [None]:
import matplotlib.pyplot as plt

# Training accuracy (%) for every vote threshold in 0 .. _SAMPLES - 1.
# A row counts as correct when it is labelled "clean" and has strictly more
# votes than the threshold, or is labelled "dirty" and has at most that many.
is_clean = train_df["label"] == "clean"
is_dirty = train_df["label"] == "dirty"
votes = train_df["prediction"]

x = list(range(_SAMPLES))
y = []
for threshold in x:
    correct = (is_clean & (votes > threshold)).sum() + (is_dirty & (votes <= threshold)).sum()
    y.append(correct / len(train_df) * 100.)

plt.plot(x, y, lw=2)
# grid() expects a boolean; the string "on" only worked because it is truthy.
plt.grid(True)
plt.xlabel("Порог, значение")
plt.ylabel("Точность, %")
# Assign the return value so the cell does not echo the Text object.
_ = plt.title("Точность в зависимости от порога")

In [None]:
# One row per test image: file stem and number of "cleaned" votes.
test_data = {"id": [], "prediction": []}

for file in (_DATA_ROOT / "test").glob("*.jpg"):
    votes = is_cleaned_plate(str(file))
    test_data["id"].append(file.stem)
    test_data["prediction"].append(votes)

test_df = pd.DataFrame(data=test_data)

In [None]:
# Turn vote counts into labels: strictly more than 24 "cleaned" votes means
# "cleaned", anything else "dirty".
submission_df = pd.DataFrame(
    {
        "id": test_df["id"],
        "label": test_df["prediction"].apply(
            lambda votes: "cleaned" if votes > 24 else "dirty"
        ),
    }
)

In [None]:
# Preview the first rows of the submission (id, label).
submission_df.head()

In [None]:
# Write the submission to submission.csv in the current working directory,
# without the index column.
submission_df.to_csv("submission.csv", index=False)