<a href="https://colab.research.google.com/github/PashaLysyi321/CSC-Hackathon-2021/blob/main/ImageSortNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import subprocess

# Ask the CUDA compiler for its version banner and keep the comma-separated
# field that starts with "release"; its last word is the version number.
_nvcc_banner = subprocess.check_output(["nvcc", "--version"]).decode("UTF-8")
_release_fields = [field for field in _nvcc_banner.split(", ") if field.startswith("release")]
CUDA_version = _release_fields[0].split(" ")[-1]
print("CUDA version:", CUDA_version)

# Wheel suffix used by the (commented-out) pip install line below.
# Any CUDA version not listed here falls back to the cu110 build.
_SUFFIX_BY_CUDA_VERSION = {
    "10.0": "+cu100",
    "10.1": "+cu101",
    "10.2": "",
}
torch_version_suffix = _SUFFIX_BY_CUDA_VERSION.get(CUDA_version, "+cu110")

# ! pip install torch==1.7.1{torch_version_suffix} torchvision==0.8.2{torch_version_suffix} -f https://download.pytorch.org/whl/torch_stable.html ftfy regex

import numpy as np
import torch

print("Torch version:", torch.__version__)

CUDA version: 11.0
Torch version: 1.9.0+cu102


# Загружаем CLIP

Скачиваем CLIP, предобученный на 400 млн пар «изображение — текст». Его можно использовать в режиме zero-shot, то есть без дообучения (например, ViT-B/32 CLIP). После запуска блока будет скачан файл model.pt с моделью CLIP: Visual Transformer "ViT-B/32" + Text Transformer.

In [None]:
# Download URLs of the pre-trained CLIP checkpoints, keyed by architecture name.
MODELS = {
    "ViT-B/32":  "https://openaipublic.azureedge.net/clip/models/40d365715913c9da98579312b702a82c18be219cc2a73407c4526f58eba950af/ViT-B-32.pt",
}

# IPython shell magic: fetch the checkpoint into the working directory as model.pt.
! wget {MODELS["ViT-B/32"]} -O model.pt

# Load the TorchScript archive, move it to the GPU and switch it to eval mode.
model = torch.jit.load("model.pt").cuda().eval()
# The model exposes these hyper-parameters as tensors; .item() turns them into Python numbers.
input_resolution = model.input_resolution.item()
context_length = model.context_length.item()
vocab_size = model.vocab_size.item()

# Total parameter count = sum of the element counts of every parameter tensor.
print("Model parameters:", f"{np.sum([int(np.prod(p.shape)) for p in model.parameters()]):,}")
print("Input resolution:", input_resolution)
print("Context length:", context_length)
print("Vocab size:", vocab_size)

from torchvision.transforms import Compose, Resize, CenterCrop, ToTensor, Normalize
from PIL import Image

# Image pipeline: resize to the model's input resolution, center-crop to a square
# of that size, then convert to a tensor. Normalization is NOT part of this pipeline;
# main_processing applies image_mean / image_std by hand.
# NOTE(review): the captured cell output shows torchvision warning that
# `interpolation` should be an InterpolationMode rather than an int — consider
# switching when the torchvision version is pinned.
preprocess = Compose([
    Resize(input_resolution, interpolation=Image.BICUBIC),
    CenterCrop(input_resolution),
    ToTensor()
])

# Per-channel mean and standard deviation (three values each), kept on the GPU so
# they can be applied directly to image batches that already live there.
image_mean = torch.tensor([0.48145466, 0.4578275, 0.40821073]).cuda()
image_std = torch.tensor([0.26862954, 0.26130258, 0.27577711]).cuda()


# IPython shell magics: install the tokenizer's dependencies and download the
# BPE merges file that SimpleTokenizer reads by default.
! pip install ftfy regex
! wget https://openaipublic.azureedge.net/clip/bpe_simple_vocab_16e6.txt.gz -O bpe_simple_vocab_16e6.txt.gz

--2021-07-05 13:50:42--  https://openaipublic.azureedge.net/clip/models/40d365715913c9da98579312b702a82c18be219cc2a73407c4526f58eba950af/ViT-B-32.pt
Resolving openaipublic.azureedge.net (openaipublic.azureedge.net)... 13.107.246.71, 13.107.213.71, 2620:1ec:bdf::71, ...
Connecting to openaipublic.azureedge.net (openaipublic.azureedge.net)|13.107.246.71|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 353976522 (338M) [application/octet-stream]
Saving to: ‘model.pt’


2021-07-05 13:50:48 (56.1 MB/s) - ‘model.pt’ saved [353976522/353976522]

Model parameters: 151,277,313
Input resolution: 224
Context length: 77
Vocab size: 49408


  "Argument interpolation should be of type InterpolationMode instead of int. "


--2021-07-05 13:50:51--  https://openaipublic.azureedge.net/clip/bpe_simple_vocab_16e6.txt.gz
Resolving openaipublic.azureedge.net (openaipublic.azureedge.net)... 13.107.246.71, 13.107.213.71, 2620:1ec:bdf::71, ...
Connecting to openaipublic.azureedge.net (openaipublic.azureedge.net)|13.107.246.71|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1356917 (1.3M) [application/octet-stream]
Saving to: ‘bpe_simple_vocab_16e6.txt.gz’


2021-07-05 13:50:51 (23.6 MB/s) - ‘bpe_simple_vocab_16e6.txt.gz’ saved [1356917/1356917]



# Препроцессинг текста и изображений


In [None]:
#@title

import gzip
import html
import os
from functools import lru_cache

import ftfy
import regex as re


@lru_cache()
def bytes_to_unicode():
    """
    Build a reversible mapping from every byte value (0-255) to a single unicode character.

    Bytes in the ranges "!".."~", "¡".."¬" and "®".."ÿ" map to the character with the
    same code point. Every remaining byte (whitespace, control characters, ...) is
    assigned the next free code point starting at 256, so that no byte ever maps to a
    whitespace/control character the BPE code cannot handle.

    Returns a dict {byte value: one-character string} with 256 distinct values.
    The result is memoized by lru_cache.
    """
    self_mapped = [
        *range(ord("!"), ord("~") + 1),
        *range(ord("¡"), ord("¬") + 1),
        *range(ord("®"), ord("ÿ") + 1),
    ]
    already_mapped = set(self_mapped)

    byte_values = list(self_mapped)
    code_points = list(self_mapped)
    shifted = 0
    for byte in range(256):
        if byte in already_mapped:
            continue
        # Relocate this byte above the single-byte range, in ascending order.
        byte_values.append(byte)
        code_points.append(256 + shifted)
        shifted += 1

    return {byte: chr(point) for byte, point in zip(byte_values, code_points)}


def get_pairs(word):
    """Collect every pair of adjacent symbols in a word.

    `word` is a non-empty sequence of symbols (each symbol being a variable-length
    string). Returns a set of (left, right) tuples; a one-symbol word yields an
    empty set. An empty word raises IndexError.
    """
    _ = word[0]  # an empty word is a caller error: fail with IndexError right away
    return set(zip(word[:-1], word[1:]))


def basic_clean(text):
    """Repair the text with ftfy, undo HTML escaping twice, and trim the ends."""
    repaired = ftfy.fix_text(text)
    # Two passes so that doubly-escaped entities (e.g. "&amp;lt;") are fully decoded.
    once_unescaped = html.unescape(repaired)
    return html.unescape(once_unescaped).strip()


def whitespace_clean(text):
    """Collapse every run of whitespace into one space and trim both ends."""
    return re.sub(r'\s+', ' ', text).strip()


class SimpleTokenizer(object):
    """Byte-level BPE tokenizer built from a gzip-compressed merges file.

    The vocabulary is assembled as: the 256 byte symbols, the same 256 symbols with an
    end-of-word marker ('</w>'), one entry per merge rule, and finally the two special
    tokens '<|startoftext|>' and '<|endoftext|>'.
    """

    def __init__(self, bpe_path: str = "bpe_simple_vocab_16e6.txt.gz"):
        """Load the merge rules from `bpe_path` and build the encoder/decoder tables."""
        self.byte_encoder = bytes_to_unicode()
        self.byte_decoder = {v: k for k, v in self.byte_encoder.items()}
        # Read inside a context manager so the file handle is closed deterministically.
        with gzip.open(bpe_path) as bpe_file:
            merges = bpe_file.read().decode("utf-8").split('\n')
        # Skip the first line and keep 49152-256-2 merge rules, so that the final
        # vocabulary has 512 + 48894 + 2 = 49408 entries.
        merges = merges[1:49152-256-2+1]
        merges = [tuple(merge.split()) for merge in merges]
        vocab = list(bytes_to_unicode().values())
        vocab = vocab + [v+'</w>' for v in vocab]
        for merge in merges:
            vocab.append(''.join(merge))
        vocab.extend(['<|startoftext|>', '<|endoftext|>'])
        self.encoder = dict(zip(vocab, range(len(vocab))))
        self.decoder = {v: k for k, v in self.encoder.items()}
        # Lower rank = merge rule listed earlier in the file = applied first.
        self.bpe_ranks = dict(zip(merges, range(len(merges))))
        # Pre-seed the cache so the special tokens are never split by bpe().
        self.cache = {'<|startoftext|>': '<|startoftext|>', '<|endoftext|>': '<|endoftext|>'}
        # Pre-tokenizer: special tokens, English contractions, runs of letters,
        # single digits, and runs of any other non-space symbols.
        self.pat = re.compile(r"""<\|startoftext\|>|<\|endoftext\|>|'s|'t|'re|'ve|'m|'ll|'d|[\p{L}]+|[\p{N}]|[^\s\p{L}\p{N}]+""", re.IGNORECASE)

    def bpe(self, token):
        """Apply the merge rules to one pre-token.

        Returns the resulting BPE symbols joined by single spaces; the last symbol
        carries the '</w>' end-of-word marker. Results are memoized in self.cache.
        """
        if token in self.cache:
            return self.cache[token]
        word = tuple(token[:-1]) + ( token[-1] + '</w>',)
        pairs = get_pairs(word)

        if not pairs:
            # Single-character token: nothing to merge.
            return token+'</w>'

        while True:
            # Pick the adjacent pair whose merge rule has the lowest rank.
            bigram = min(pairs, key = lambda pair: self.bpe_ranks.get(pair, float('inf')))
            if bigram not in self.bpe_ranks:
                break
            first, second = bigram
            new_word = []
            i = 0
            while i < len(word):
                try:
                    j = word.index(first, i)
                    new_word.extend(word[i:j])
                    i = j
                except ValueError:
                    # `first` does not occur again: copy the tail unchanged.
                    new_word.extend(word[i:])
                    break

                if word[i] == first and i < len(word)-1 and word[i+1] == second:
                    new_word.append(first+second)
                    i += 2
                else:
                    new_word.append(word[i])
                    i += 1
            new_word = tuple(new_word)
            word = new_word
            if len(word) == 1:
                break
            else:
                pairs = get_pairs(word)
        word = ' '.join(word)
        self.cache[token] = word
        return word

    def encode(self, text):
        """Clean and lower-case `text`, then return its list of BPE token ids.

        The start/end-of-text ids are not added here; callers add them themselves.
        """
        bpe_tokens = []
        text = whitespace_clean(basic_clean(text)).lower()
        for token in re.findall(self.pat, text):
            # Map the token's UTF-8 bytes to the printable stand-in characters.
            token = ''.join(self.byte_encoder[b] for b in token.encode('utf-8'))
            bpe_tokens.extend(self.encoder[bpe_token] for bpe_token in self.bpe(token).split(' '))
        return bpe_tokens

    def decode(self, tokens):
        """Turn token ids back into text; each '</w>' marker becomes a space.

        Byte sequences that are not valid UTF-8 are decoded with errors="replace".
        """
        text = ''.join([self.decoder[token] for token in tokens])
        text = bytearray([self.byte_decoder[c] for c in text]).decode('utf-8', errors="replace").replace('</w>', ' ')
        return text


In [None]:
from google.colab import files
from IPython.display import Image
from PIL import Image
import requests
import os
import skimage
import IPython.display
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
from collections import OrderedDict
import torch
import os
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
import shutil

def main_processing(classes):
  images = []
  one_per_class = []

  per_class_lists = []

  list_sorted_of_photo = sorted(os.listdir(path="/content/input_images/"))
  for i in list_sorted_of_photo:
    im = Image.open("/content/input_images/"+i).convert('RGB')
    image = preprocess(im)
    if image.shape[0] != 3:
        raise TypeError("Only 3-channel RGB image are allowed")
    images.append(image)
    one_per_class.append(image)

  descriptions = {}

  for my_class in classes:
    descriptions[my_class] = "This is a image of " + str(my_class)

  texts = [descriptions[key] for key in descriptions]
  image_input = torch.tensor(np.stack(one_per_class)).cuda()
  image_input -= image_mean[:, None, None]
  image_input /= image_std[:, None, None]

  tokenizer = SimpleTokenizer()
  text_tokens = [tokenizer.encode(desc) for desc in texts]

  text_input = torch.zeros(len(text_tokens), model.context_length, dtype=torch.long)
  sot_token = tokenizer.encoder['<|startoftext|>']
  eot_token = tokenizer.encoder['<|endoftext|>']

  for i, tokens in enumerate(text_tokens):
      tokens = [sot_token] + tokens + [eot_token]
      text_input[i, :len(tokens)] = torch.tensor(tokens)

  text_input = text_input.cuda()

  with torch.no_grad():
      image_features = model.encode_image(image_input).float()
      text_features = model.encode_text(text_input).float()

  text_descriptions = list(descriptions.values())
  image_input = torch.tensor(np.stack(images)).cuda()
  image_input -= image_mean[:, None, None]
  image_input /= image_std[:, None, None]

  text_tokens = [[sot_token] + tokenizer.encode(desc) + [eot_token] for desc in text_descriptions]
  text_input = torch.zeros(len(text_tokens), model.context_length, dtype=torch.long)

  for i, tokens in enumerate(text_tokens):
      text_input[i, :len(tokens)] = torch.tensor(tokens)

  text_input = text_input.cuda()
  text_input.shape

  with torch.no_grad():
      image_features = model.encode_image(image_input).float()
      image_features /= image_features.norm(dim=-1, keepdim=True) # 512 -> 256 -> 1 (1/0) (N -> 512)
      text_features = model.encode_text(text_input).float()
      text_features /= text_features.norm(dim=-1, keepdim=True)
      text_probs = (100.0 * image_features @ text_features.T).softmax(dim=-1)
      top_probs, top_labels = text_probs.cpu().topk(3, dim=-1)

  plt.figure(figsize=(25, 60))
  param_to_flask = []
  for i, image in enumerate(images):
      plt.subplot(17, 8, 2 * i + 1)
      plt.imshow(image.permute(1, 2, 0))
      plt.axis("off")

      plt.subplot(17, 8, 2 * i + 2)
      y = np.arange(top_probs.shape[-1])
      plt.grid()
      plt.barh(y, top_probs[i])
      plt.gca().invert_yaxis()
      plt.gca().set_axisbelow(True)
      plt.yticks(y, [text_descriptions[index].split(' ')[-1] for index in top_labels[i].numpy()])
      param_to_flask.append([text_descriptions[index].split(' ')[-1] for index in top_labels[i].numpy()][0])

  os.makedirs('/content/output_classes/',exist_ok=True)
  for i in classes:
    os.makedirs('/content/output_classes/'+i,exist_ok=True)
  for i in range(0,len(list_sorted_of_photo)):
    os.replace("/content/input_images/"+list_sorted_of_photo[i], '/content/output_classes/'+str(param_to_flask[i])+'/'+list_sorted_of_photo[i])
  
  !zip -r /content/Result.zip /content/output_classes
  dir = "/content/output_classes/"
  dir = "/content/output_classes/"
  shutil.rmtree(dir)
  return 1

# Веб-сервис для сортировки изображений по выбранным пользователем классам

In [None]:
!pip install flask_ngrok
from flask import Flask, send_file
from flask_ngrok import run_with_ngrok 
from flask import Flask, request, redirect, url_for 
from flask import send_from_directory 
from werkzeug.utils import secure_filename 
import io
import zipfile
from flask import Flask, request, send_file, make_response

# Upload destination (relative to the working directory) and accepted file extensions.
UPLOAD_FOLDER = 'input_images'
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'bmp', 'NEF'}
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# Flask application, wrapped so that app.run() also opens an ngrok tunnel.
app = Flask(__name__)
run_with_ngrok(app)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
 
 
def allowed_file(filename):
    """Return True when `filename` has an extension listed in ALLOWED_EXTENSIONS.

    The comparison ignores case, so "photo.JPG" and "shot.nef" are accepted just
    like "photo.jpg" and "shot.NEF". Names without a dot are rejected.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in {allowed.lower() for allowed in ALLOWED_EXTENSIONS}
 
@app.route('/', methods=['GET', 'POST'])
def upload_file():
    """Serve the upload/sort page and, on POST, store the uploaded images.

    Every file posted under the "file" field whose extension passes allowed_file()
    is saved into app.config['UPLOAD_FOLDER']. The same HTML page is returned for
    both GET and POST.
    """
    if request.method == 'POST':
        for file in request.files.getlist("file"):
            if not file or not file.filename:
                continue
            # The file name comes from the client: keep only its last path component
            # so that names such as "../../x.png" cannot escape the upload folder.
            # (os.path.basename is used rather than secure_filename because the
            # latter drops non-ASCII characters, e.g. Cyrillic file names.)
            filename = os.path.basename(file.filename.replace('\\', '/'))
            if filename and allowed_file(filename):
                file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
    return ''' 
    <!doctype html> 
    <html>
    <script> 
    var intTextBox = 3;  
    function addElement()  
    { 
        intTextBox = intTextBox + 1; 
        var contentID = document.getElementById('content'); 
        var howManyTextBoxes = intTextBox;   
        var newTBDiv = document.createElement('div');            
        newTBDiv.setAttribute('id', 'strText' + intTextBox); 
        newTBDiv.innerHTML += `<p><input type=text name = class${intTextBox}>`;                              
        contentID.appendChild(newTBDiv);    
        return false;                      
    } 
    </script> 
    <head>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-EVSTQN3/azprG1Anm3QDgpJLIm9Nao0Yz1ztcQTwFspd3yD65VohhpuuCOmLASjC" crossorigin="anonymous">
    </head>
    <body>
    <nav class="navbar navbar-light bg-light">
      <div class="container-fluid">
        <span class="navbar-brand mb-0 h1 ">4nn Task 3</span>
      </div>
    </nav>
    <div class="container" align="center">
      <title>4nn Team</title> 
      <h1>Upload new Files</h1> 
      <form action="" method=post enctype=multipart/form-data> 
        <p><input type=file name=file multiple> 
          <input type=submit value=Upload> 
      </form>
    </div>
    <div class="container" align="center">
      <h3>Custom classes</h3> 
        <form id=content action="sort" method=post>
          <p><input type=text name = class1> 
          <p><input type=text name = class2>
          <p><input type=text name = class3 style = "margin-left: 10.5%;">
          <a href="javascript:addElement();"><input type="button"  value="Add class"></a> 
          <input type=submit value=Sort>
        </form>
    </div>
    </body>
    </html>
    ''' 
 
@app.route('/sort/', methods=['GET', 'POST'])
def make_sorted_arhiv():
    """Sort the uploaded images by the submitted classes and return Result.zip.

    POST: every non-blank form field value is taken as a class name, the images are
    processed by main_processing(), and /content/Result.zip is sent as an attachment.
    GET: redirects to the upload page, since there is no class list to work with.
    """
    if request.method != 'POST':
        # Returning None here would make Flask answer with a server error.
        return redirect(url_for('upload_file'))

    # Text boxes left empty are submitted as "" — they are not real classes.
    classes = []
    for key in request.form:
        value = request.form[key].strip()
        if value:
            classes.append(value)
    print(classes)
    if not classes:
        return "Please provide at least one class name.", 400

    # Remove the archive of a previous run, if there is one.
    try:
        os.remove("/content/Result.zip")
    except FileNotFoundError:
        pass

    main_processing(classes)

    # Serve from the directory main_processing writes the archive to. Do not touch
    # app.config['UPLOAD_FOLDER']: upload_file relies on it for saving new uploads.
    return send_from_directory("/content", "Result.zip", as_attachment=True)
 
# Start the Flask development server. Because the app was passed to run_with_ngrok,
# a public ngrok URL is printed alongside the local address.
app.run()

[31mERROR: Operation cancelled by user[0m
 * Serving Flask app "__main__" (lazy loading)
 * Environment: production
[2m   Use a production WSGI server instead.[0m
 * Debug mode: off


 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)


 * Running on http://44e1a63ae11c.ngrok.io
 * Traffic stats available on http://127.0.0.1:4040
