In [None]:
from tqdm import tqdm
import torch
import clip
import torchvision.transforms as transforms
import numpy as np
from utils import *
import random
import torch
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from args import Args
import torch
import torch.nn.functional as F
from utils import cls_acc, pre_load_features
from loralib.utils import mark_only_lora_as_trainable, apply_lora, get_lora_parameters, lora_state_dict, save_lora, \
    load_lora



def pre_load_features(clip_model, loader):
    """Encode every batch of ``loader`` with ``clip_model`` and gather the results on the CPU.

    Note: this local definition replaces the ``pre_load_features`` name that
    is imported from ``utils`` at the top of the file.

    Args:
        clip_model: object exposing ``encode_image(images)``.
        loader: iterable yielding ``(images, target)`` batches.

    Returns:
        ``(features, labels)``: the concatenated, L2-normalised image features
        and the concatenated targets, both moved to the CPU.
    """
    feature_chunks = []
    label_chunks = []
    with torch.no_grad():
        for images, target in tqdm(loader):
            images = images.cuda()
            target = target.cuda()
            encoded = clip_model.encode_image(images)
            # Normalise in place so every feature vector has unit length.
            encoded /= encoded.norm(dim=-1, keepdim=True)
            feature_chunks.append(encoded.cpu())
            label_chunks.append(target.cpu())
        features = torch.cat(feature_chunks)
        labels = torch.cat(label_chunks)

    return features, labels

def set_random_seed(seed):
    """Seed Python's ``random``, NumPy, and PyTorch (CPU and all CUDA devices) with ``seed``."""
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

def main():
    """Seed the RNGs and load the CLIP backbone named by ``Args.backbone``.

    Returns:
        ``(clip_model, preprocess)`` exactly as returned by ``clip.load``,
        with the model switched to eval mode.
    """
    # ``Args`` is used directly as a namespace of settings (it is not instantiated).
    args = Args
    set_random_seed(args.seed)
    clip_model, preprocess = clip.load(args.backbone)
    clip_model.eval()
    return clip_model, preprocess


def clip_classifier(clip_model):
    """Build text-classifier weights for the five hard-coded class names.

    For each class one of the two prompt templates is picked at random (via
    Python's ``random`` module), encoded with ``clip_model.encode_text`` and
    L2-normalised.

    Args:
        clip_model: object exposing ``encode_text(tokens)``.

    Returns:
        A CUDA tensor with the per-class embeddings stacked along ``dim=1``
        (one column per class, in ``classnames`` order).
    """
    classnames = ["生", "旦", "净", "末", "丑"]
    template = ["这张图片的皮影头茬行当类别是{}.", "这张图片的皮影头茬类别属于{}."]

    columns = []
    with torch.no_grad():
        for classname in classnames:
            # One randomly chosen prompt per class.
            prompt = random.choice(template).format(classname)
            tokens = clip.tokenize(prompt).cuda()
            class_embeddings = clip_model.encode_text(tokens)
            class_embeddings /= class_embeddings.norm(dim=-1, keepdim=True)
            # Average over dim 0 (only one prompt is encoded per class here),
            # then renormalise the result.
            class_embedding = class_embeddings.mean(dim=0)
            class_embedding /= class_embedding.norm()
            columns.append(class_embedding)
        clip_weights = torch.stack(columns, dim=1).cuda()

    return clip_weights


def evaluate_lora(args, clip_model, loader):
    """Compute the sample-weighted mean of ``cls_acc`` over ``loader``.

    Text features are rebuilt on every call from one randomly chosen prompt
    template per class; image and text encoders run under fp16 autocast.

    Args:
        args: accepted for interface compatibility; not read by this function.
        clip_model: object exposing ``encode_text`` and ``encode_image``.
        loader: iterable yielding ``(images, target)`` batches.

    Returns:
        The per-batch ``cls_acc`` values averaged with batch-size weights.
    """
    clip_model.eval()
    classnames = ["生", "旦", "净", "末", "丑"]
    templates = ["这张图片的皮影头茬行当类别是{}.", "这张图片的皮影头茬类别属于{}."]

    with torch.no_grad():
        prompts = [random.choice(templates).format(name) for name in classnames]
        with torch.amp.autocast(device_type="cuda", dtype=torch.float16):
            tokens = clip.tokenize(prompts).cuda()
            class_embeddings = clip_model.encode_text(tokens)
        text_features = class_embeddings / class_embeddings.norm(dim=-1, keepdim=True)

        weighted_acc = 0.
        seen = 0
        for images, target in loader:
            images = images.cuda()
            target = target.cuda()
            with torch.amp.autocast(device_type="cuda", dtype=torch.float16):
                image_features = clip_model.encode_image(images)
            image_features = image_features / image_features.norm(dim=-1, keepdim=True)
            cosine_similarity = image_features @ text_features.t()
            batch_size = len(cosine_similarity)
            # Weight each batch's accuracy by its size so the last, possibly
            # smaller, batch is not over-counted.
            weighted_acc += cls_acc(cosine_similarity, target) * batch_size
            seen += batch_size

    return weighted_acc / seen


def run_lora(args, clip_model, train_loader, test_loader):
    """Report zero-shot accuracy, then fine-tune CLIP with LoRA and report test accuracy.

    Args:
        args: settings object. This function reads ``eval_only``, ``n_iters``,
            ``shots``, ``lr``, ``encoder`` and ``save_path`` and forwards
            ``args`` to the ``loralib`` helpers.
        clip_model: CLIP model exposing ``encode_image`` / ``encode_text``.
        train_loader: iterable yielding ``(images, target)`` training batches.
        test_loader: iterable yielding ``(images, target)`` test batches.

    Returns:
        None. Results are printed; LoRA weights are saved when
        ``args.save_path`` is not None.

    Raises:
        ValueError: if ``args.encoder`` is not one of ``'text'``, ``'vision'``,
            ``'both'``, or if ``train_loader`` yields no batches.
    """
    logit_scale = 100
    classnames = ["生", "旦", "净", "末", "丑"]
    templates = ["这张图片的皮影头茬行当类别是{}.", "这张图片的皮影头茬类别属于{}."]

    # Textual features
    print("\nGetting textual features as CLIP's classifier.")
    textual_features = clip_classifier(clip_model)

    # Pre-load test features
    print("\nLoading visual features and labels from test set.")
    test_features, test_labels = pre_load_features(clip_model, test_loader)

    test_features = test_features.cuda()
    test_labels = test_labels.cuda()

    # Zero-shot CLIP
    clip_logits = logit_scale * test_features @ textual_features
    zs_acc = cls_acc(clip_logits, test_labels)
    print("\n**** Zero-shot CLIP's test accuracy: {:.2f}. ****\n".format(zs_acc))

    # Rebind to CPU copies so the GPU copies are no longer referenced.
    test_features = test_features.cpu()
    test_labels = test_labels.cpu()

    list_lora_layers = apply_lora(args, clip_model)
    clip_model = clip_model.cuda()

    if args.eval_only:
        load_lora(args, list_lora_layers)
        acc_test = evaluate_lora(args, clip_model, test_loader)
        print("**** Test accuracy: {:.2f}. ****\n".format(acc_test))
        return

    # Fail early with a clear message instead of hitting an undefined
    # ``text_features`` inside the training loop.
    if args.encoder not in ('text', 'vision', 'both'):
        raise ValueError(
            "args.encoder must be 'text', 'vision' or 'both', got {!r}".format(args.encoder)
        )
    train_text = args.encoder in ('text', 'both')
    train_vision = args.encoder in ('vision', 'both')

    mark_only_lora_as_trainable(clip_model)
    total_iters = args.n_iters * args.shots

    optimizer = torch.optim.AdamW(get_lora_parameters(clip_model), weight_decay=1e-2, betas=(0.9, 0.999), lr=args.lr)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, total_iters, eta_min=1e-6)

    # training LoRA
    scaler = torch.cuda.amp.GradScaler()

    if not train_text:
        # The text encoder is not trained in this mode, so the text features
        # computed above are reused for every step.
        text_features = textual_features.t().half()

    print("total_iters: ", total_iters)
    count_iters = 0
    while count_iters < total_iters:
        clip_model.train()
        acc_train = 0
        tot_samples = 0
        loss_epoch = 0.
        for images, target in tqdm(train_loader):
            images, target = images.cuda(), target.cuda()

            if train_text:
                # Re-sample one prompt template per class at every step.
                texts = [random.choice(templates).format(name) for name in classnames]
                with torch.amp.autocast(device_type="cuda", dtype=torch.float16):
                    tokens = clip.tokenize(texts).cuda()
                    class_embeddings = clip_model.encode_text(tokens)
                text_features = class_embeddings / class_embeddings.norm(dim=-1, keepdim=True)

            if train_vision:
                with torch.amp.autocast(device_type="cuda", dtype=torch.float16):
                    image_features = clip_model.encode_image(images)
            else:
                # Vision encoder is not trained: skip building its graph.
                with torch.no_grad():
                    with torch.amp.autocast(device_type="cuda", dtype=torch.float16):
                        image_features = clip_model.encode_image(images)
            image_features = image_features / image_features.norm(dim=-1, keepdim=True)

            cosine_similarity = logit_scale * image_features @ text_features.t()
            loss = F.cross_entropy(cosine_similarity, target)
            acc_train += cls_acc(cosine_similarity, target) * target.shape[0]
            loss_epoch += loss.item() * target.shape[0]
            tot_samples += target.shape[0]
            optimizer.zero_grad()
            scaler.scale(loss).backward()
            scaler.step(optimizer)

            scaler.update()
            scheduler.step()

            count_iters += 1

            if count_iters == total_iters:
                break

        if tot_samples == 0:
            # Without this guard an empty loader would never advance
            # ``count_iters`` and the while-loop would spin forever.
            raise ValueError("train_loader yielded no batches; cannot train.")

        if count_iters < total_iters:
            acc_train /= tot_samples
            loss_epoch /= tot_samples
            current_lr = scheduler.get_last_lr()[0]
            print('LR: {:.6f}, Acc: {:.4f}, Loss: {:.4f}'.format(current_lr, acc_train, loss_epoch))

    acc_test = evaluate_lora(args, clip_model, test_loader)
    print("**** Final test accuracy: {:.2f}. ****\n".format(acc_test))

    if args.save_path is not None:
        save_lora(args, list_lora_layers)
    return

In [None]:
# Load CLIP, build the datasets/loaders, then run LoRA fine-tuning.
clip_model, preprocess = main()

# Random crop + flip augmentation; intended for the training split only.
train_preprocess = transforms.Compose([
    transforms.RandomResizedCrop(size=224, scale=(0.08, 1), interpolation=transforms.InterpolationMode.BICUBIC),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.48145466, 0.4578275, 0.40821073), std=(0.26862954, 0.26130258, 0.27577711))
])

TRAIN_DIR = r"C:\Users\fanta\code\shadow_puppet\CLIP-LoRA-main\piying\train"
TEST_DIR = r"C:\Users\fanta\code\shadow_puppet\CLIP-LoRA-main\piying\test"

trainX_img = datasets.ImageFolder(TRAIN_DIR, transform=train_preprocess)
# Evaluate with the preprocessing returned by ``clip.load`` rather than the
# random training augmentation, so test accuracy is not perturbed by random
# crops and flips.
testX_img = datasets.ImageFolder(TEST_DIR, transform=preprocess)

loader_kwargs = dict(batch_size=256, num_workers=8, pin_memory=True)

# Unshuffled loader kept under its original name: the KNN feature-extraction
# cell iterates ``train_loader`` and expects dataset order.
train_loader = torch.utils.data.DataLoader(trainX_img, shuffle=False, **loader_kwargs)
# ImageFolder lists samples class by class, so an unshuffled loader would feed
# (nearly) single-class batches to the optimiser; shuffle for training.
lora_train_loader = torch.utils.data.DataLoader(trainX_img, shuffle=True, **loader_kwargs)
test_loader = torch.utils.data.DataLoader(testX_img, shuffle=False, **loader_kwargs)

run_lora(Args, clip_model, lora_train_loader, test_loader)

In [None]:
# KNN
def extract_image_features(model, loader):
    """Encode every batch of ``loader`` under fp16 autocast and return CPU tensors.

    Returns ``(features, labels)``: the concatenated L2-normalised image
    features and the concatenated targets.
    """
    model.eval()
    feature_chunks, label_chunks = [], []
    with torch.no_grad():
        for images, target in tqdm(loader):
            images = images.cuda()
            target = target.cuda()
            with torch.amp.autocast(device_type="cuda", dtype=torch.float16):
                image_features = model.encode_image(images)
            image_features /= image_features.norm(dim=-1, keepdim=True)
            feature_chunks.append(image_features.cpu())
            label_chunks.append(target.cpu())
    return torch.cat(feature_chunks), torch.cat(label_chunks)


# train features
features, labels = extract_image_features(clip_model, train_loader)

# test features
features_test, labels_test = extract_image_features(clip_model, test_loader)

In [None]:
#import ollama
import joblib
import numpy as np
from sklearn.model_selection import train_test_split
import os
import glob

np.random.seed(42)
#import faiss


def read_pickle(path):
    """Open ``path`` in binary mode and return the object ``joblib.load`` reads from it."""
    # NOTE(review): joblib.load unpickles its input; only use on trusted files.
    with open(path, 'rb') as handle:
        return joblib.load(handle)

def save_pickle(path, feats):
    """Write ``feats`` to ``path`` (opened in binary write mode) with ``joblib.dump``."""
    with open(path, 'wb') as handle:
        joblib.dump(feats, handle)


train_root = r"path/to/train"
test_root = r"path/to/test"


def list_class_dirs(root):
    """Return the sorted paths of the sub-directories directly under ``root``.

    Plain files are skipped so they are not mistaken for classes, and the
    result is sorted so the order does not depend on the filesystem.
    """
    entries = glob.glob("%s/*" % glob.escape(root))
    return sorted(p for p in entries if os.path.isdir(p))


def class_name_of(path):
    """Return the last path component of ``path`` for either separator style."""
    # os.path handles the platform's separators; the original split on "\\"
    # only worked for Windows-style paths.
    return os.path.basename(os.path.normpath(path))


def collect_split_paths(root, names):
    """Collect the ``*.jpg`` files of every class folder under ``root``.

    Args:
        root: directory that contains one sub-folder per class.
        names: class (sub-folder) names to scan, in the desired order.

    Returns:
        ``(by_class, paths, labels)``: a dict mapping class name to its sorted
        image paths, the flat list of all paths, and the parallel list holding
        the class name of each path.
    """
    by_class = {}
    paths, labels = [], []
    for name in names:
        pattern = "%s/%s/*.jpg" % (glob.escape(root), glob.escape(name))
        img_p = sorted(glob.glob(pattern))
        by_class[name] = img_p
        paths.extend(img_p)
        labels.extend([name] * len(img_p))
        print(f"{name}有图片{len(img_p)}张...")
    return by_class, paths, labels


# Find every class folder under the training root.
class_names_p = list_class_dirs(train_root)
class_names = [class_name_of(p) for p in class_names_p]

print("训练集：")
train_paths, train_paths_lis, train_paths_labels = collect_split_paths(train_root, class_names)

print("测试集：")
test_paths, test_paths_lis, test_paths_labels = collect_split_paths(test_root, class_names)

print(f"测试集有图片{len(test_paths_lis)}张...")

In [None]:
# KNN;
from sklearn.neighbors import NearestNeighbors

# Invert ImageFolder's name -> index mapping into index -> name.
id_class = {idx: name for name, idx in trainX_img.class_to_idx.items()}

# Per-image class-name labels gathered by the path-scanning cell.
labels_test_trans = test_paths_labels
labels_trans = train_paths_labels

# Fit a 9-nearest-neighbour index on the training features.
neigh = NearestNeighbors(metric="euclidean", n_neighbors=9).fit(list(features))

# With return_distance=True the result is a (distances, indices) pair.
near_exa = neigh.kneighbors(features_test, return_distance=True)

In [None]:
# Rank every training sample for each test sample by asking for as many
# neighbours as there are training labels.
neigh = NearestNeighbors(metric='euclidean', n_neighbors=len(labels)).fit(list(features))

near_exa = neigh.kneighbors(features_test, return_distance=False)

# Keep the last ``unTopK`` indices of each ranking (the tail of the
# neighbour list returned for that test sample).
unTopK = 9
un_near_exa = [ranking[-unTopK:] for ranking in near_exa]