In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
# os.chdir('./drive/MyDrive/566/tise-toolbox')
os.chdir('./drive/MyDrive/566')

#Fine tune Clip

In [None]:
!pip install ftfy regex tqdm
!pip install git+https://github.com/openai/CLIP.git

In [4]:
from PIL import Image
import torch
from torch import nn, optim
import glob
import os
import pandas as pd
import json
import numpy as np
import clip
from torch.utils.data import Dataset, DataLoader, BatchSampler
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm
import random
from matplotlib.pyplot import imshow
import torchtext
import nltk, re, string, collections
from nltk.util import ngrams
import collections
from pathlib import Path

In [5]:
class MyDataset(Dataset):
    def __init__(self, data, labels, captions, preprocess):
        self.preprocess = preprocess

        self.images = []
        self.labels = labels
        self.captions = captions
        for img in data:
            self.images.append(self.preprocess(img))
        
    def __len__(self):
        return len(self.captions)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]
        caption = self.captions[idx]
        return image, caption, label

def getAllImage(base):
    files = []
    images = []
    for root, ds, fs in os.walk(base):
        for f in fs:
            files.append(f)
    files.sort(key = lambda x : int(x[:-4]))
    for f in files:
        image = Image.open(base + f)
        images.append(image)
    return images

def getAllCaption(base):
    files = []
    captions = []
    for root, ds, fs in os.walk(base):
        for f in fs:
            files.append(f)
    files.sort(key = lambda x : int(x[:-4]))

    for fi in files:
        f = open(base + fi, 'r')
        lines = f.readlines()
        for line in lines:
            captions.append(lines[0][0:-1])
    return captions

In [6]:
imageset = getAllImage('./sample/')
captionset = getAllCaption('./captions/')

f = open('./label.csv', 'r')
lines = f.readlines()

labelset = []
for line in lines:
    labelset.append(int(line[0]))

idxList = [x for x in range(1000)]

In [7]:
train_idxs, test_idxs = train_test_split(idxList, test_size=0.2, random_state=42)

images_train = [imageset[k] for k in train_idxs]
images_test = [imageset[k] for k in test_idxs]

labels_train = [labelset[k] for k in train_idxs]
labels_test = [labelset[k] for k in test_idxs]

captions_train = [captionset[k] for k in train_idxs]
captions_test = [captionset[k] for k in test_idxs]

len(captions_train), len(captions_test)

(800, 200)

In [8]:
device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device=device, jit=False)

100%|████████████████████████████████████████| 338M/338M [00:01<00:00, 282MiB/s]


In [9]:
train_dataset = MyDataset(images_train, labels_train, captions_train, preprocess)
test_dataset = MyDataset(images_test, labels_test, captions_test, preprocess)
len(train_dataset), len(test_dataset), train_dataset[0]

(800,
 200,
 (tensor([[[ 1.4048,  1.4048,  1.4632,  ...,  1.7844,  1.7406,  1.7552],
           [ 1.4194,  1.4340,  1.4340,  ...,  1.7260,  1.7260,  1.7114],
           [ 1.4048,  1.4194,  1.3902,  ...,  1.6968,  1.7114,  1.6822],
           ...,
           [-0.3470, -0.3616, -0.3032,  ..., -0.0259, -0.0988, -0.2156],
           [ 0.5289,  0.4121,  0.2077,  ...,  0.0909,  0.1055,  0.0909],
           [ 0.3683,  0.3391,  0.1931,  ...,  0.0325,  0.0471, -0.0405]],
  
          [[ 1.5346,  1.5196,  1.5796,  ...,  1.9098,  1.8798,  1.8798],
           [ 1.5196,  1.5346,  1.5346,  ...,  1.8498,  1.8648,  1.8348],
           [ 1.5046,  1.5346,  1.4896,  ...,  1.8198,  1.8198,  1.8047],
           ...,
           [-0.2963, -0.2963, -0.2363,  ...,  0.0789,  0.0038, -0.1163],
           [ 0.5891,  0.4841,  0.2740,  ...,  0.1839,  0.2139,  0.1839],
           [ 0.4390,  0.4240,  0.2589,  ...,  0.1389,  0.1389,  0.0638]],
  
          [[ 1.5344,  1.5487,  1.6055,  ...,  1.9753,  1.9326,  1.9468],

In [10]:
class BalancedBatchSampler(BatchSampler):
    """
    BatchSampler - from a MNIST-like dataset, samples n_classes and within these classes samples n_samples.
    Returns batches of size n_classes * n_samples
    """

    def __init__(self, labels, n_classes, n_samples):
        self.labels = labels
        self.labels_set = list(set(self.labels.numpy()))
        self.label_to_indices = {label: np.where(self.labels.numpy() == label)[0]
                                 for label in self.labels_set}
        for l in self.labels_set:
            np.random.shuffle(self.label_to_indices[l])
        self.used_label_indices_count = {label: 0 for label in self.labels_set}
        self.count = 0
        self.n_classes = n_classes
        self.n_samples = n_samples
        self.n_dataset = len(self.labels)
        self.batch_size = self.n_samples * self.n_classes

    def __iter__(self):
        self.count = 0
        while self.count + self.batch_size < self.n_dataset:
            classes = np.random.choice(self.labels_set, self.n_classes, replace=False)
            indices = []
            for class_ in classes:
                indices.extend(self.label_to_indices[class_][
                               self.used_label_indices_count[class_]:self.used_label_indices_count[
                                                                         class_] + self.n_samples])
                self.used_label_indices_count[class_] += self.n_samples
                if self.used_label_indices_count[class_] + self.n_samples > len(self.label_to_indices[class_]):
                    np.random.shuffle(self.label_to_indices[class_])
                    self.used_label_indices_count[class_] = 0
            yield indices
            self.count += self.n_classes * self.n_samples

    def __len__(self):
        return self.n_dataset // self.batch_size

train_dataloader = DataLoader(train_dataset, batch_size = 10, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size = 10, shuffle=False)


In [11]:
EPOCH = 10

def convert_models_to_fp32(model): 
    for p in model.parameters(): 
        p.data = p.data.float() 
        p.grad.data = p.grad.data.float() 

if device == "cpu":
    model.float()

loss_img = nn.CrossEntropyLoss()
loss_txt = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=5e-5,betas=(0.9,0.98),eps=1e-6,weight_decay=0.2)
# optimizer = optim.Adam(model.parameters(), lr=1e-5)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, len(train_dataloader)*EPOCH)

In [12]:
best_te_loss = 1e5
best_ep = -1
for epoch in range(EPOCH):
    print(f"running epoch {epoch}, best test loss {best_te_loss} after epoch {best_ep}")
    step = 0
    tr_loss = 0
    model.train()
    pbar = tqdm(train_dataloader, leave=False)
    for batch in pbar:
        step += 1
        optimizer.zero_grad()

        images, texts, _ = batch
        images = images.to(device)
        texts = clip.tokenize(texts).to(device)

        logits_per_image, logits_per_text = model(images, texts)
        ground_truth = torch.arange(10).to(device)

        total_loss = (loss_img(logits_per_image,ground_truth) + loss_txt(logits_per_text,ground_truth))/2
        total_loss.backward()
        tr_loss += total_loss.item()
        if device == "cpu":
            optimizer.step()
            scheduler.step()
        else:
            convert_models_to_fp32(model)
            optimizer.step()
            scheduler.step()
            clip.model.convert_weights(model)
        pbar.set_description(f"train batchCE: {total_loss.item()}", refresh=True)
    tr_loss /= step
    
    step = 0
    te_loss = 0
    with torch.no_grad():
        model.eval()
        test_pbar = tqdm(test_dataloader, leave=False)
        for batch in test_pbar:
            step += 1
            images, texts, _ = batch
            images = images.to(device)
            texts = clip.tokenize(texts).to(device)
            logits_per_image, logits_per_text = model(images, texts)
            ground_truth = torch.arange(10).to(device)

            total_loss = (loss_img(logits_per_image,ground_truth) + loss_txt(logits_per_text,ground_truth))/2
            te_loss += total_loss.item()
            test_pbar.set_description(f"test batchCE: {total_loss.item()}", refresh=True)
        te_loss /= step
        
    if te_loss < best_te_loss:
        best_te_loss = te_loss
        best_ep = epoch
        torch.save(model.state_dict(), "best_model.pt")
    print(f"epoch {epoch}, tr_loss {tr_loss}, te_loss {te_loss}")
torch.save(model.state_dict(), "last_model.pt")

running epoch 0, best test loss 100000.0 after epoch -1


  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

epoch 0, tr_loss 2.305029296875, te_loss 2.27802734375
running epoch 1, best test loss 2.27802734375 after epoch 0


  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

epoch 1, tr_loss 2.26259765625, te_loss 2.22568359375
running epoch 2, best test loss 2.22568359375 after epoch 1


  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

epoch 2, tr_loss 2.27442626953125, te_loss 2.30361328125
running epoch 3, best test loss 2.22568359375 after epoch 1


  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

epoch 3, tr_loss 2.3025146484375, te_loss 2.30283203125
running epoch 4, best test loss 2.22568359375 after epoch 1


  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

epoch 4, tr_loss 2.303466796875, te_loss 2.3025390625
running epoch 5, best test loss 2.22568359375 after epoch 1


  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

epoch 5, tr_loss 2.303271484375, te_loss 2.30322265625
running epoch 6, best test loss 2.22568359375 after epoch 1


  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

epoch 6, tr_loss 2.303369140625, te_loss 2.30244140625
running epoch 7, best test loss 2.22568359375 after epoch 1


  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

epoch 7, tr_loss 2.302001953125, te_loss 2.30126953125
running epoch 8, best test loss 2.22568359375 after epoch 1


  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

epoch 8, tr_loss 2.30009765625, te_loss 2.300390625
running epoch 9, best test loss 2.22568359375 after epoch 1


  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

epoch 9, tr_loss 2.2955322265625, te_loss 2.298828125


In [None]:
img = preprocess(Image.open("./sample/0.png")).unsqueeze(0).to(device)
with torch.no_grad():
    image_features = model.encode_image(img)
    print(image_features.to('cpu').numpy()[0])

In [None]:
X = []
for image in imageset:
    print(image)
    img = preprocess(image).unsqueeze(0).to(device)
    with torch.no_grad():
        image_features = model.encode_image(img)
        X.append(image_features.to('cpu').numpy()[0])

In [None]:
train_idxs, test_idxs = train_test_split(idxList, test_size=0.2, random_state=40)

trainset = [(X[k], labelset[k]) for k in train_idxs]
testset = [(X[k], labelset[k]) for k in test_idxs]

train_loader = torch.utils.data.DataLoader(trainset, batch_size=20, shuffle=True)
test_loader = torch.utils.data.DataLoader(testset, batch_size=20, shuffle=False)

In [None]:
class Net(nn.Module):
    def __init__(self, inputSize, outputSize):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(inputSize, 256)
        self.fc2 = nn.Linear(256, 64)
        self.dropout = nn.Dropout(p=0.2)
        self.output = nn.Linear(64, outputSize)

    def forward(self, x):
        x = nn.functional.relu(self.fc1(x))
        x = self.dropout(x)
        x = nn.functional.relu(self.fc2(x))
        x = self.dropout(x)
        x = nn.functional.sigmoid(self.output(x))
        return x

    def predict(self, x):
        pred = nn.functional.sigmoid(self.forward(x))
        ans = []
        for t in pred:
            if t[0]>t[1]:
                ans.append(0)
            else:
                ans.append(1)
        return torch.tensor(ans)

net = Net(len(X[0]), 1)

In [None]:
import torch.optim as optim

criterion = nn.BCELoss()
optimizer = optim.Adam(net.parameters(), lr=0.01)

In [None]:
from torch.autograd import Variable
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("The model will be running on", device, "device")

net.to(device)

for epoch in range(5):  # loop over the dataset multiple times

    running_loss = 0.0
    running_acc = 0.0
    for i, (inputs, labels) in enumerate(train_loader, 0):
        # wrap them in Variable
        inputs = Variable(torch.tensor(inputs).to(device)).float()
        labels = Variable(torch.tensor(labels).to(device)).float()
        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        # print(outputs)
        loss = criterion(outputs, labels[:,None])
        loss.backward()
        optimizer.step()
        
        # print statistics
        running_acc += (outputs.reshape(-1).detach().cpu().numpy().round() == labels.cpu().numpy()).mean()

        print(outputs.reshape(-1).detach().cpu().numpy().round())
        
        running_loss += loss.data
        if i % 5 == 4:    # print every 5 mini-batches
            print('[%d, %5d] loss: %.3f accuracy: %.3f' %
                  (epoch + 1, i + 1, running_loss / 5, running_acc / 5))
            running_loss = 0.0
            running_acc = 0.0

    #accuracy = testAccuracy()
    #print('For epoch', epoch+1,'the test accuracy over the whole test set is %d %%' % (accuracy))

print('Finished Training')

In [14]:
import numpy as np
from sklearn.model_selection import train_test_split

X = np.array(X)
y = np.array(labelset)

X_train, X_test, y_train, y_test = train_test_split(X, y)

In [15]:
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, AdaBoostClassifier, VotingClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn import metrics
import xgboost


clf1 = GradientBoostingClassifier(n_estimators=200)
clf2 = RandomForestClassifier(random_state=0, n_estimators=500)
clf3 = LogisticRegression(random_state=1)
clf4 = GaussianNB()
clf5 = xgboost.XGBClassifier()
clf = VotingClassifier(estimators=[
    ('gbdt',clf1),
    ('rf',clf2),
    ('lr',clf3),
    ('nb',clf4),
    ('xgboost',clf5),
    ],
    voting='soft')

clf.fit(X_train,y_train)
predictions = clf.predict(X_test)
print("GBDT")
print(metrics.classification_report(y_test,predictions))
print("AC", metrics.accuracy_score(y_test,predictions))

GBDT
              precision    recall  f1-score   support

           0       0.38      0.18      0.25        92
           1       0.63      0.82      0.72       158

    accuracy                           0.59       250
   macro avg       0.51      0.50      0.48       250
weighted avg       0.54      0.59      0.54       250

AC 0.588


In [21]:
ypred = clf.predict(X)
print(metrics.classification_report(y, ypred))
print("AC", metrics.accuracy_score(y, ypred))

              precision    recall  f1-score   support

           0       0.82      0.51      0.63       346
           1       0.78      0.94      0.86       654

    accuracy                           0.79      1000
   macro avg       0.80      0.73      0.74      1000
weighted avg       0.80      0.79      0.78      1000

AC 0.792


In [26]:
yprob = clf.predict_proba(X)

In [28]:
yprob[388]

array([0.84779518, 0.15220482])

In [25]:
testx = []
img = preprocess(Image.open("2394.png")).unsqueeze(0).to(device)
with torch.no_grad():
    image_features = model.encode_image(img)
    testx.append(image_features.to('cpu').numpy()[0])

testx = np.array(testx)

print(clf.predict_proba(testx))

[[0.25717761 0.74282239]]


#Generate Data

In [None]:
!pip install diffusers transformers accelerate scipy safetensors

In [None]:
import torch
from diffusers import StableDiffusionPipeline, DPMSolverMultistepScheduler

model_id = "stabilityai/stable-diffusion-2-1"

In [None]:
# Use the DPMSolverMultistepScheduler (DPM-Solver++) scheduler here instead
pipe = StableDiffusionPipeline.from_pretrained(model_id, torch_dtype=torch.float16)
pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)
pipe = pipe.to("cuda")

In [None]:
f = open('./captions.txt', 'r')
lines = f.readlines()

captions = []
for line in lines:
    captions.append(line)

In [None]:
for i in range(500, 750):
    prompt = captions[i].replace("\n", '')
    image = pipe(prompt).images[0]
    image.save("./sample/" + str(i) + ".png")

In [None]:
prompt = captions[0].replace("\n", '')
image = pipe(prompt).images[0]
image.save("astronaut_rides_horse.png")

#test

In [None]:
!pip install transformers

In [None]:
from PIL import Image
import requests

from transformers import CLIPProcessor, CLIPModel

model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

In [None]:
def getAllImage(base):
    files = []
    images = []
    for root, ds, fs in os.walk(base):
        for f in fs:
            files.append(f)
    files.sort(key = lambda x : int(x[:-4]))
    for f in files:
        image = Image.open(base + f)
        images.append(image)
    return images

In [None]:
images = getAllImage('./sample/')

In [None]:
images[0]

In [None]:
X = []
for image in images:
    output = []
    for rule in rules:
        
        inputs = processor(rule, images=image, return_tensors="pt", padding=True)

        outputs = model(**inputs)
        logits_per_image = outputs.logits_per_image  # this is the image-text similarity score
        probs = logits_per_image.softmax(dim=1)  # we can take the softmax to get the label probabilities

        output += probs.tolist()[0]
    X.append(output)

In [None]:
f = open('./label.csv', 'r')
lines = f.readlines()

labels = []
for line in lines:
    labels.append(int(line[0]))

In [None]:
len(X)

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split

X = np.array(X)
y = np.array(labels)

X_train, X_test, y_train, y_test = train_test_split(X, y)

In [None]:
X_train.shape

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn import metrics

lr = LogisticRegression()                                        # 实例化一个LR模型
lr.fit(X_train, y_train)                                          # 训练模型
y_prob = lr.predict_proba(X_test)[:,1]                           # 预测1类的概率
y_pred = lr.predict(X_test)                                      # 模型对测试集的预测结果
fpr_lr, tpr_lr, threshold_lr = metrics.roc_curve(y_test,y_prob)    # 获取真阳率、伪阳率、阈值
auc_lr = metrics.auc(fpr_lr, tpr_lr)                              # AUC得分
score_lr = metrics.accuracy_score(y_test, y_pred)                 # 模型准确率
print([score_lr, auc_lr])

In [None]:
yPB = lr.predict_proba(X)[:,0] 

In [None]:
yPB[69]

In [None]:
rules = [
    ['human with normal head', 'human without head'], #, 'human with twisted head'
    ["human with 2 legs", "human without legs"],
    ["one human with 2 hands", "one human without hands"],
    ["one human with 2 feet", "one human without feet"]
]

In [None]:
from tabulate import tabulate
# text = ["human have 10 fingers", "human without 10 fingers"]

# text = ["one human with 2 hands", "one human without 2 hands"]
# text = ['human with normal head', 'human with twisted head', 'human without head']
text = ["one human with 2 legs", "one human without legs"]
inputs = processor(text, images=images[0], return_tensors="pt", padding=True)

outputs = model(**inputs)
logits_per_image = outputs.logits_per_image  # this is the image-text similarity score
probs = logits_per_image.softmax(dim=1)  # we can take the softmax to get the label probabilities

print(outputs)

pred = probs.argmax(dim=1)
print(tabulate({"prompt": text, "prob": probs.tolist()[0]}, headers="keys"))
print("\nPredict: " + text[pred])

In [None]:
outputs

#DataSet

In [None]:
!pip install -r requirements.txt
!pip install git+https://github.com/openai/CLIP.git
!pip install 'git+https://github.com/facebookresearch/detectron2.git'

In [None]:
!python download_scripts/download_pretrained_models.py

In [None]:
!python download_scripts/download_cub_data.py

In [None]:
!python download_scripts/download_ms_coco_metadata.py
!sh download_scripts/download_ms_coco_images.sh

In [None]:
os.chdir('./semantic_object_accuracy')

In [None]:
os.chdir('./text_to_images_models/AttnGAN++')

In [None]:
!CUDA_VISIBLE_DEVICES=0 

!python coco_gen_soa_input_images.py \
--label_file_dir "../../semantic_object_accuracy/captions" \
--saved_dir "../../semantic_object_accuracy/images/attngan++" \
--batch_size 16

In [None]:
os.chdir('../../semantic_object_accuracy')

In [None]:
!METHOD=attngan++
!GENERATED_IMAGE_DIR=images/"$METHOD"
!DETECTED_RESULTS_DIR=detected_results/"$METHOD"
!SAVED_RESULT_PATH=results/"$METHOD".txt
!GPU_ID=0

!CUDA_VISIBLE_DEVICES="$GPU_ID"
!python SOA.py \
--images="./images/attngan++" \
--detected_results="./detected_results/attngan++" \
--saved_file="./results/attngan++"