In [None]:
##library required
from wildlife_datasets.datasets import MacaqueFaces
from wildlife_tools.data import WildlifeDataset
import torchvision.transforms as T
import timm
from wildlife_tools.features import DeepFeatures
from wildlife_tools.inference import KnnMatcher
from wildlife_tools.data import FeatureDatabase
from PIL import Image
import os
import torch
# Expose only the GPU with index 1 to CUDA-aware libraries in this process.
# NOTE(review): torch is already imported above; this presumably still takes effect
# because CUDA is initialised lazily, but setting it before the import is safer — confirm.
os.environ['CUDA_VISIBLE_DEVICES'] = '1'

In [None]:
from ultralytics import YOLO

# Path of the trained weights file
# NOTE(review): `model` is rebound to a timm feature extractor in a later cell, so this
# YOLO detector is no longer reachable under that name once that cell has run.
model_path = "/data/Jar/projects/ultralytics/ultralytics/runs/detect/yolov8_large/weights/best.pt"
model = YOLO(model_path)  # load the custom model

# Run prediction on a single image
original_image = '/data/Elio/data/Donkeys/trainset/origin_Data/images/Daphne_00012.jpg'
results = model.predict(original_image, conf=0.25)  # image is passed positionally, not via the `source` keyword

# Print the raw results
print(results)

# Extract and print the details of every bounding box
for result in results:
    boxes = result.boxes  # all bounding boxes of this result
    for box in boxes:
        # Coordinates of the bounding box
        bbox = box.xyxy[0].tolist()  # converted to [x_min, y_min, x_max, y_max] format
        confidence = box.conf[0].item()  # confidence score
        class_id = int(box.cls[0].item())  # class ID
        print(f"BBox: {bbox}, Confidence: {confidence}, Class ID: {class_id}")

In [None]:
import cv2
import os
from torchvision import transforms

class DonkeyDataset:
    """Index-based image dataset over a flat directory, with optional YOLO-label cropping.

    Args:
        root: Directory that contains the image files (``.jpg``, ``.jpeg``, ``.png``).
        label: Directory that contains one YOLO ``.txt`` label file per image, named
            after the image's base name. Only read when ``register`` is true.
        transform: Optional callable applied to the PIL image before it is returned.
        register: When true, each image is cropped to the bounding box found in its
            label file before the transform is applied. When false, the full image
            is returned.

    Each item is a ``(image, label)`` pair; see ``__getitem__`` for how ``label`` is derived.
    """

    def __init__(self, root, label, transform=None, register=False):
        self.root = root
        self.transform = transform
        # Collect the image file names; sorted so that indices are deterministic.
        self.image_files = sorted([f for f in os.listdir(root) if f.endswith(('.jpg', '.jpeg', '.png'))])
        self.register = register
        self.label_path = label

    def __len__(self):
        """Return the number of image files found in ``root``."""
        return len(self.image_files)

    def get_image(self, path):
        """Load ``path`` with OpenCV and return it as an RGB PIL image.

        Raises:
            OSError: If OpenCV cannot read the file (``cv2.imread`` returned ``None``).
        """
        img = cv2.imread(path)
        if img is None:
            # Fail with the offending path instead of an opaque error inside cvtColor.
            raise OSError(f"Could not read image: {path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        return Image.fromarray(img)

    def yolo_to_pixel(self, bbox, img_width, img_height):
        """Convert a normalised YOLO box to clamped integer pixel corners.

        Args:
            bbox: ``[x_center, y_center, width, height]`` in normalised coordinates.
            img_width: Image width in pixels.
            img_height: Image height in pixels.

        Returns:
            Tuple ``(x_min, y_min, x_max, y_max)`` rounded to integers and clamped
            to ``[0, img_width]`` / ``[0, img_height]``.
        """
        x_center, y_center, w, h = bbox

        # De-normalise to pixel coordinates.
        x_center *= img_width
        y_center *= img_height
        w *= img_width
        h *= img_height

        # Corner coordinates of the box.
        x_min = x_center - w / 2
        y_min = y_center - h / 2
        x_max = x_center + w / 2
        y_max = y_center + h / 2

        # Keep the corners inside the image.
        x_min = max(0, min(img_width, round(x_min)))
        y_min = max(0, min(img_height, round(y_min)))
        x_max = max(0, min(img_width, round(x_max)))
        y_max = max(0, min(img_height, round(y_max)))

        return x_min, y_min, x_max, y_max

    def _read_bbox(self, label_file):
        """Return the bounding box of the last annotation line in a YOLO label file.

        Blank lines are skipped. When the file holds several annotations the last
        one wins, which is what the original inline parsing loop did.

        Raises:
            ValueError: If the file contains no annotation, or a line has fewer
                than five fields, or a field is not numeric.
        """
        bbox = None
        with open(label_file, 'r') as file:
            for line in file:
                values = line.strip().split()
                if not values:
                    continue  # tolerate blank / trailing empty lines
                if len(values) < 5:
                    raise ValueError(f"Malformed label line in {label_file}: {line!r}")
                int(values[0])  # first field is the class id; validated but not used here
                bbox = list(map(float, values[1:5]))  # remaining four fields are the box
        if bbox is None:
            raise ValueError(f"No bounding box found in label file: {label_file}")
        return bbox

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the image at position ``idx``.

        ``image`` is the full picture, or the bounding-box crop when ``register``
        is true, passed through ``transform`` if one was given.
        """
        image_name = self.image_files[idx]
        label_name = os.path.splitext(image_name)[0] + '.txt'
        img_path = os.path.join(self.root, image_name)

        # NOTE(review): splitting on ',' leaves the file name unchanged unless it contains
        # a comma, so the label is normally the full file name. Later cells split it on
        # '_' themselves; kept as-is so that those cells keep working.
        label = image_name.split(',')[0]

        if not self.register:
            img = self.get_image(img_path)
        else:
            original_image = Image.open(img_path).convert("RGB")
            width, height = original_image.size
            bbox = self._read_bbox(os.path.join(self.label_path, label_name))
            x_min, y_min, x_max, y_max = self.yolo_to_pixel(bbox, width, height)
            # Crop the annotated region out of the original image.
            img = original_image.crop((x_min, y_min, x_max, y_max))

        # The transform is optional in both modes.
        if self.transform:
            img = self.transform(img)

        return img, label

In [None]:
# Build the dataset that the smoke test and the feature-extraction cell operate on
root = '/data/Elio/data/Donkeys/trainset/origin_Data/images'  # directory that contains the image files
label = '/data/Elio/data/Donkeys/donkey_labels_rec'
dataset_root_path = root  # path of the image folder

transform = T.Compose([T.Resize([224, 224]), 
                       T.ToTensor(), 
                       T.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))])
donkey_dataset = DonkeyDataset(root=root, label=label, transform=transform, register=True)  # register=True: crop each image to its labelled box, then apply the transform

# 测试函数
def test_dataset(dataset):
    print("Testing dataset...")
    
    try:
        # 检查长度
        assert len(dataset) == len(os.listdir(root)), "Dataset length mismatch."

        # 测试前3个样本
        for i in range(3):
            img, label = dataset[i]
            # 检查图像和标签是否存在
            assert img is not None, f"Image at index {i} is None."
            assert label is not None, f"Label at index {i} is None."
            #print(f"Sample {i}: Image size - {img.size() if hasattr(img, 'size') else img.shape}, Label - {label}")
        
        print("Dataset test passed successfully!")
    except AssertionError as e:
        print("Dataset test failed:", e)

# Run the smoke test on the dataset built above
test_dataset(donkey_dataset)

In [None]:
class SimpleDeepFeatures:
    """Run a model over a dataset of ``(image, label)`` pairs and collect its outputs.

    Args:
        model: Torch module that is called on each image batch.
        batch_size: Batch size handed to the ``DataLoader``.
        num_workers: Number of ``DataLoader`` worker processes.
        device: Device the model and every batch are moved to.
    """

    def __init__(
        self,
        model,
        batch_size: int = 128,
        num_workers: int = 1,
        device: str = "cpu",
    ):
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.device = device
        self.model = model

    def __call__(self, dataset):
        """Extract features for every sample of ``dataset``.

        The model is moved to ``self.device`` and switched to eval mode; both
        changes persist on the model object after the call.

        Returns:
            Tuple ``(features, label_list)``. ``features`` is the NumPy array obtained
            by concatenating all batch outputs; ``label_list`` holds one entry per
            batch, exactly as the ``DataLoader`` collated the labels, so callers
            must flatten it themselves.
        """
        self.model = self.model.to(self.device)
        self.model = self.model.eval()

        loader = torch.utils.data.DataLoader(
            dataset,
            num_workers=self.num_workers,
            batch_size=self.batch_size,
            shuffle=False,  # keep outputs aligned with the dataset order
        )
        outputs = []
        label_list = []
        # Inference only: no autograd graph is needed for any batch.
        with torch.no_grad():
            for image, label in tqdm(loader, mininterval=1, ncols=100):
                label_list.append(label)
                output = self.model(image.to(self.device))
                outputs.append(output.cpu())

        return torch.cat(outputs).numpy(), label_list

    def run_and_save(self, dataset, save_path):
        """Extract features and pickle them to ``<save_path>/<ClassName>.pkl``.

        The pickled dict has the keys ``name``, ``features`` (the feature array),
        ``labels`` (per-batch labels as returned by ``__call__``) and ``metadata``
        (``dataset.metadata`` when present, else ``None``).

        Returns:
            Path of the written file.
        """
        # Local import: no cell of the notebook imports pickle at module level.
        import pickle

        # __call__ returns a (features, labels) pair; store the two parts separately
        # instead of placing the whole tuple under "features".
        features, labels = self(dataset)

        os.makedirs(save_path, exist_ok=True)
        name = self.__class__.__name__
        data = {
            "name": name,
            "features": features,
            "labels": labels,
            "metadata": getattr(dataset, "metadata", None),
        }

        file_name = os.path.join(save_path, name + ".pkl")
        with open(file_name, "wb") as file:
            pickle.dump(data, file, protocol=pickle.HIGHEST_PROTOCOL)
        return file_name

In [None]:
import torch
from tqdm import tqdm
import numpy as np
from PIL import Image
import PIL

# Lift Pillow's pixel-count limit so that very large images can be opened.
# NOTE(review): this also turns off Pillow's decompression-bomb guard — only do this for trusted images.
PIL.Image.MAX_IMAGE_PIXELS = None
name = 'hf-hub:BVRA/MegaDescriptor-T-224'
# NOTE(review): rebinds `model`, which an earlier cell bound to the YOLO detector.
# num_classes=0 presumably makes timm return features without a classification head — confirm.
model = timm.create_model(name, num_classes=0, pretrained=True)
extractor = SimpleDeepFeatures(model)
# `labels` comes back as one entry per batch; it is flattened into `merged_list` below.
features, labels = extractor(donkey_dataset)
print(features.shape)
merged_list = [item for sublist in labels for item in sublist]

# Save features and labels together in one .npz archive
#np.save('features.npy', features)
np.savez('features_and_labels.npz', features=features, labels=merged_list)

In [None]:
# Reload the cached features and labels. The archive is opened as a context manager
# so that its file handle is closed once both arrays have been read into memory.
with np.load('features_and_labels.npz') as data:
    features = data['features']
    # Back to a plain list of str, the same type the extraction cell produces.
    merged_list = data['labels'].tolist()
print(features.shape)


In [None]:
import torch.nn.functional as F
import numpy as np
from collections import defaultdict
import pandas as pd

class SimpleKnnClassifier:
    """k-nearest-neighbour majority vote over a similarity matrix.

    Args:
        database_labels: Label of every database entry, in the same order as the
            database axis of the similarity matrix. When ``None``, the vote is
            taken over the raw database indices instead.
        k: Number of nearest neighbours that take part in the vote.
    """

    def __init__(self, database_labels, k: int = 1):
        self.k = k
        # np.array(None) would be a 0-d object array and defeat the `is not None`
        # check in __call__, so keep None as None.
        self.database_labels = None if database_labels is None else np.array(database_labels)

    def __call__(self, similarity):
        """Predict one label per query.

        Args:
            similarity: Similarities of shape ``[n_query, n_database]``, or a 1-D
                vector ``[n_database]`` for a single query. Array-likes and
                tensors are both accepted.

        Returns:
            NumPy array of shape ``[n_query]`` holding the prediction for ``k``.

        Raises:
            ValueError: If ``similarity`` has more than two dimensions or fewer
                than ``k`` database entries.
        """
        similarity = torch.as_tensor(similarity, dtype=torch.float64).detach()
        if similarity.ndim == 1:
            similarity = similarity.unsqueeze(0)  # single query -> one row
        if similarity.ndim != 2:
            raise ValueError("similarity must have shape [n_query, n_database] or [n_database].")
        if self.k > similarity.shape[1]:
            raise ValueError(
                f"k={self.k} exceeds the number of database entries ({similarity.shape[1]})."
            )

        _, idx = similarity.topk(k=self.k, dim=1)
        neighbours = idx.cpu().numpy()
        # Map indices to labels BEFORE voting: database indices are unique, so a vote
        # over indices would tie at every k > 1 and always fall back to the nearest one.
        if self.database_labels is not None:
            neighbours = self.database_labels[neighbours]
        return self.aggregate(neighbours)[:, self.k - 1]

    def aggregate(self, predictions):
        """
        Aggregates array of nearest neighbours to single prediction for each k.
        If there is tie at given k, prediction from k-1 is used.

        Args:
            array of with shape [n_query, k] of nearest neighbours, ordered from the
            nearest to the farthest. A 1-D input is treated as a single query.
        Returns:
            array with predictions [n_query, k]. Column dimensions are predictions for [k=1,...,k=k]
        """
        predictions = np.asarray(predictions)
        if predictions.ndim == 1:
            predictions = predictions[np.newaxis, :]

        results = np.empty_like(predictions)
        for query_index, row in enumerate(predictions):
            for k in range(1, row.shape[0] + 1):
                vals, counts = np.unique(row[:k], return_counts=True)
                winners = vals[counts == counts.max()]
                if len(winners) == 1:
                    results[query_index, k - 1] = winners[0]
                else:
                    # Tie for first place: reuse the prediction for k-1. This cannot
                    # happen at k == 1, where exactly one value is present.
                    results[query_index, k - 1] = results[query_index, k - 2]
        return results


class SimpleKnnMatcher:
    """
    Find nearest match to query in existing database of features.
    Combines CosineSimilarity and KnnClassifier.

    Args:
        features: Database features of shape ``[n_database, dim]``.
        labels: Label of every database entry, in the same order as ``features``.
        k: Number of neighbours; forwarded to ``SimpleKnnClassifier``.
    """

    def __init__(self, features, labels, k=1):
        self.features = features
        self.labels = labels
        self.classifier = SimpleKnnClassifier(
            database_labels=self.labels, k=k
        )

    def __call__(self, query):
        """Return the label of the database entry that best matches ``query``.

        An earlier revision returned the raw similarity vector from a leftover early
        ``return``, which made the lookup below unreachable. The label is returned
        now, as the class description states.

        Args:
            query: Feature vector of a single query, shape ``[dim]`` or ``[1, dim]``.

        Returns:
            For ``k == 1`` the label of the most similar database entry; for
            ``k > 1`` the first prediction of ``self.classifier``.
        """
        query = torch.as_tensor(query)
        if query.ndim == 1:
            query = query.unsqueeze(0)
        database = torch.as_tensor(self.features, dtype=query.dtype, device=query.device)

        # Cosine similarity of the query against every database row -> shape [n_database].
        sim_matrix = F.cosine_similarity(query, database)

        if self.classifier.k > 1:
            # Majority voting over several neighbours is the classifier's job.
            return self.classifier(sim_matrix)[0]

        _, idx = sim_matrix.topk(k=1, dim=0)
        return self.labels[int(idx[0])]

In [None]:
def yolo_to_pixel(bbox, img_width, img_height):
    """Turn a normalised YOLO box into integer pixel corners clamped to the image.

    Args:
        bbox: ``[x_center, y_center, width, height]`` in normalised coordinates.
        img_width: Image width in pixels.
        img_height: Image height in pixels.

    Returns:
        Tuple ``(x_min, y_min, x_max, y_max)``.
    """
    cx_norm, cy_norm, w_norm, h_norm = bbox

    # Scale the centre and the half extents up to pixel units.
    cx = cx_norm * img_width
    cy = cy_norm * img_height
    half_w = (w_norm * img_width) / 2
    half_h = (h_norm * img_height) / 2

    def _clamp(value, upper):
        # Round to the nearest pixel, then keep it inside [0, upper].
        return max(0, min(upper, round(value)))

    return (
        _clamp(cx - half_w, img_width),
        _clamp(cy - half_h, img_height),
        _clamp(cx + half_w, img_width),
        _clamp(cy + half_h, img_height),
    )

In [None]:
from tqdm import tqdm
import warnings
import sys
from PIL import Image
import PIL
# Lift Pillow's pixel-count limit so that very large images can be opened.
PIL.Image.MAX_IMAGE_PIXELS = None
warnings.filterwarnings('ignore')

image_path = "/data/Elio/data/Donkeys/trainset/origin_Data/images"
label_path = "/data/Elio/data/Donkeys/donkey_labels_rec"
answer_txt_path = "/data/Elio/data/Donkeys/donkey_labels_rec/classes.txt"
success = 0
total = 0

# --- Loop-invariant setup -------------------------------------------------------
# Class names, indexed by the class id stored in each YOLO label file.
with open(answer_txt_path, 'r') as file:
    answer_list = [line.strip() for line in file]

# Database features as a tensor, and the class of every database entry (the part
# of its label before the first '_').
database = torch.as_tensor(features)
database_classes = [str(entry).split('_')[0] for entry in merged_list]

# Same extension filter as DonkeyDataset, sorted for a deterministic order.
image_names = sorted(f for f in os.listdir(image_path) if f.endswith(('.jpg', '.jpeg', '.png')))

# The model may still be in training mode if the extraction cell was skipped in
# favour of reloading the cached features.
model.eval()

# NOTE(review): the queries come from the same folder the feature database was built
# from, so every query presumably finds its own embedding in the database and the
# reported accuracy is optimistic — confirm whether a held-out split is intended.
for i in tqdm(image_names):
    label_txt = os.path.join(label_path, os.path.splitext(i)[0] + '.txt')

    # Reset per image so that an empty label file cannot silently reuse the values
    # parsed for the previous image.
    class_id, bbox = None, None
    with open(label_txt, 'r') as file:
        for line in file:
            values = line.strip().split()
            if not values:
                continue  # skip blank lines
            class_id = int(values[0])  # first field is the class id
            bbox = list(map(float, values[1:5]))  # remaining four fields are the box
    if bbox is None:
        raise ValueError(f"No bounding box found in label file: {label_txt}")

    # Ground-truth class name
    answer = answer_list[class_id]

    with Image.open(os.path.join(image_path, i)) as raw_image:
        width, height = raw_image.size
        x_min, y_min, x_max, y_max = yolo_to_pixel(bbox, width, height)
        # Force three channels (the transform normalises with 3-channel statistics),
        # then crop the annotated region.
        cropped_image = raw_image.convert("RGB").crop((x_min, y_min, x_max, y_max))

    # Embed the crop and compare it with every database entry.
    with torch.no_grad():
        query = model(transform(cropped_image).unsqueeze(0))
    sim_matrix = F.cosine_similarity(query, database)

    # Group the similarity scores by class.
    class_scores = defaultdict(list)
    for class_name, score in zip(database_classes, sim_matrix.tolist()):
        class_scores[class_name].append(score)

    # Average score per class; the prediction is the class with the highest average.
    class_avg_scores = {cls: sum(scores) / len(scores) for cls, scores in class_scores.items()}
    final_class = max(class_avg_scores, key=class_avg_scores.get)

    total += 1
    if answer.strip() == final_class.strip():
        success += 1

# Overall accuracy
if total:
    print(success / total)
else:
    print("No images found in", image_path)
