In [3]:
from PIL import Image
import torch
import numpy as np
import matplotlib.pyplot as plt
import os
import scipy.io
import numpy as np
import pandas as pd
from tqdm import tqdm
from PIL import Image
import matplotlib.pyplot as plt

import dgl
import torch
import pickle
import networkx as nx
from dgl.data import DGLDataset
import torchvision.transforms as T
from sklearn.preprocessing import OneHotEncoder

from transformers import OwlViTProcessor, OwlViTForObjectDetection

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

processor = OwlViTProcessor.from_pretrained("google/owlvit-base-patch32")
model = OwlViTForObjectDetection.from_pretrained("google/owlvit-base-patch32")
# model = model.to('cuda')

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def predict(img_path, texts, threshold):
    image = Image.open(img_path)

    inputs = processor(text=texts, images=image, return_tensors="pt")
    outputs = model(**inputs)

    # Target image sizes (height, width) to rescale box predictions [batch_size, 2]
    target_sizes = torch.Tensor([image.size[::-1]])
    # Convert outputs (bounding boxes and class logits) to COCO API
    results = processor.post_process_object_detection(outputs=outputs, threshold=threshold, target_sizes=target_sizes)

    i = 0  # Retrieve predictions for the first image for the corresponding text queries
    text = texts[i]
    boxes, scores, labels = results[i]["boxes"].cpu().detach().numpy(), results[i]["scores"], results[i]["labels"]
   
    # Print detected objects and rescaled box coordinates
    for box, score, label in zip(boxes, scores, labels):
        box = [round(i, 2) for i in box.tolist()]
        print(f"Detected {text[label]} with confidence {round(score.item(), 3)} at location {box}")

    fig, ax = plt.subplots(1, 1, figsize=(8, 8))
    ax.imshow(image)
    ax.set_axis_off()

    for score, box, label in zip(scores, boxes, labels):
        # if score < score_threshold:
        #     continue
        x, y, w, h = box
        rect = plt.Rectangle((x, y), w, h, fill=False, 
                             edgecolor='red', 
                             linewidth=2)
        ax.add_patch(rect)
        ax.text(x, y - 2, 
                f"{text[label]}: {score:.2f}", 
                color='red', 
                fontsize=12, 
                ha='left', 
                va='top')
    plt.show()
    # return boxes, scores, labels

In [4]:
from transformers.image_utils import ImageFeatureExtractionMixin
mixin = ImageFeatureExtractionMixin()

def predictions(image, text_queries):
    inputs = processor(text=text_queries, images=image, return_tensors="pt")
    outputs = model(**inputs)
    # Load example image
    image_size = model.config.vision_config.image_size
    image = mixin.resize(image, image_size)
    input_image = np.asarray(image).astype(np.float32) / 255.0

    # Threshold to eliminate low probability predictions
    score_threshold = 0.1

    # Get prediction logits
    logits = torch.max(outputs["logits"][0], dim=-1)
    scores = torch.sigmoid(logits.values).cpu().detach().numpy()

    # Get prediction labels and boundary boxes
    labels = logits.indices.cpu().detach().numpy()
    boxes = outputs["pred_boxes"][0].cpu().detach().numpy()

    # # 绘制图
    # fig, ax = plt.subplots(1, 1, figsize=(8, 8))
    # ax.imshow(input_image, extent=(0, 1, 1, 0))
    # ax.set_axis_off()

    boxes_node = []
    labels_node = []

    for score, box, label in zip(scores, boxes, labels):
      if score < score_threshold:
        continue
      boxes_node.append(box)
      labels_node.append(label)
      # cx, cy, w, h = box
      
      # ax.plot([cx-w/2, cx+w/2, cx+w/2, cx-w/2, cx-w/2],
      #         [cy-h/2, cy-h/2, cy+h/2, cy+h/2, cy-h/2], "r")
      # ax.text(
      #     cx - w / 2,
      #     cy + h / 2 + 0.015,
      #     f"{text_queries[label]}: {score:1.2f}",
      #     ha="left",
      #     va="top",
      #     color="red",
      #     bbox={
      #         "facecolor": "white",
      #         "edgecolor": "red",
      #         "boxstyle": "square,pad=.3"
      #     })
    return boxes_node,labels_node

In [23]:
texts = [[
        "a photo of people",
        "a photo of building",
        "a photo of sky",
        "a photo of car",
        "a photo of road",
        "a photo of tree"
          ]]
i = 0  # Retrieve predictions for the first image for the corresponding text queries
text = texts[i]

img_path = "E:/Dataset/xxxx/test_img/1.326936_103.891537_50f562ddfdc9f065f0005b17_Singapore.JPG"
image = Image.open(img_path)

# predictions(image, text)

In [None]:
import networkx as nx
import matplotlib.pyplot as plt

# 将19类别转化为one-hot编码
def label_to_onehot(label_id, num_classes):
    # 创建一个所有元素都是0的向量
    one_hot = np.zeros(num_classes, dtype=np.float32)
    one_hot[label_id] = 1.0
    return one_hot

def build_graph(boxes, labels, numclas):

    # 创建图对象
    graph = nx.Graph()

    # 添加框作为节点
    for i, (box, label) in enumerate(zip(boxes, labels)):
        one_hot_label = label_to_onehot(label, num_classes=numclas)
        graph.add_node(i, 
                       box=box,
                       label_class=one_hot_label)

    # 根据接触或包含关系添加边
    for i in range(len(boxes)):
        for j in range(i+1, len(boxes)):
            if is_touching_or_contained(boxes[i], boxes[j]):
                graph.add_edge(i, j)

    return graph

def is_touching_or_contained(box1, box2):
    # 判断box1与box2是否有接触或包含关系
    cx1, cy1, w1, h1 = box1
    cx2, cy2, w2, h2 = box2
    return abs(cx1 - cx2) <= (w1 + w2) / 2 and abs(cy1 - cy2) <= (h1 + h2) / 2

# 调用函数构建图
boxes_node, labels_node = predictions(image, text)
graph = build_graph(boxes_node, labels_node, numclas=len(text))

# 绘制图
plt.figure(figsize=(8, 8))
pos = nx.spring_layout(graph, seed=42)

nx.draw_networkx(graph, pos, node_color='red', node_size=200, font_size=8, with_labels=False)
nx.draw_networkx_labels(graph, pos, font_color='white', font_size=8)
plt.axis('off')
plt.show()

In [5]:
# 将类别转化为one-hot编码
def label_to_onehot(label_id, num_classes):
    # 创建一个所有元素都是0的向量
    one_hot = np.zeros(num_classes, dtype=np.float32)
    one_hot[label_id] = 1.0
    return one_hot

def create_graph(image_path, text):
    image = Image.open(image_path)
    boxes, labels = predictions(image, text)
    # 创建图对象
    G = nx.Graph()

    # 添加框作为节点
    for i, (box, label) in enumerate(zip(boxes, labels)):
        one_hot_label = label_to_onehot(label, num_classes=len(text))
        G.add_node(i, 
                       box=box,
                       label_class=one_hot_label)

    # 根据接触或包含关系添加边
    for i in range(len(boxes)):
        for j in range(i+1, len(boxes)):
            if is_touching_or_contained(boxes[i], boxes[j]):
                G.add_edge(i, j)

    return G

def is_touching_or_contained(box1, box2):
    # 判断box1与box2是否有接触或包含关系
    cx1, cy1, w1, h1 = box1
    cx2, cy2, w2, h2 = box2
    return abs(cx1 - cx2) <= (w1 + w2) / 2 and abs(cy1 - cy2) <= (h1 + h2) / 2

# 将图转化为dgl格式
def networkx_to_dgl(G):
    G = G.to_undirected()
    g = dgl.from_networkx(G)

    for node in G.nodes():
        g.nodes[node].data['features'] = torch.cat(
            [torch.tensor([G.nodes[node]['label_class']], dtype=torch.float)], dim=1)
    g = dgl.add_self_loop(g)  # 添加自循环，考虑到自身的特征
    return g

class ImageGraphDataset(DGLDataset):
    def __init__(self, image_paths, labels, texts, raw_dir=None, force_reload=False, verbose=False):
        self.image_paths = image_paths
        self.labels = labels
        self.texts = texts
        super(ImageGraphDataset, self).__init__(name='ImageGraph',
                                               raw_dir=raw_dir,
                                               force_reload=force_reload,
                                               verbose=verbose)

    def process(self):
        if not os.path.exists(self.raw_dir):
            os.makedirs(self.raw_dir)

        graph_path = os.path.join(self.raw_dir, 'place_graphs.dat') # 定义保存的名称
        label_path = os.path.join(self.raw_dir, 'place_labels.dat')

        if os.path.exists(graph_path) and os.path.exists(label_path):
            with open(graph_path, 'rb') as f:
                self.graphs = pickle.load(f)

            with open(label_path, 'rb') as f:
                self.processed_labels = pickle.load(f)

        else:
            numbbb = len(self.image_paths)
            print(numbbb)

            self.graphs = []
            self.processed_labels = []

            with tqdm(range(numbbb)) as pbar:
                for i, image_path in enumerate(self.image_paths):
                    G = create_graph(image_path, self.texts)

                    if G.number_of_edges() > 0:
                        g = networkx_to_dgl(G)
                        self.graphs.append(g)

                        label = torch.tensor(self.labels[i])
                        self.processed_labels.append(label)

                    pbar.update(1)

            with open(graph_path, 'wb') as f:
                pickle.dump(self.graphs, f)
            
            with open(label_path, 'wb') as f:
                pickle.dump(self.processed_labels, f) 

    @property
    def num_tasks(self):
        return len(self.labels.unique())

    def __getitem__(self, i):
        return self.graphs[i], self.processed_labels[i]

    def __len__(self):
        return len(self.graphs)

In [9]:
# 处理数据，生成图分类数据集（dgl库）***

texts = [[
        # "a photo of people",
        "a photo of building",
        "a photo of sky",
        "a photo of car",
        "a photo of road",
        # "a photo of tree"
          ]]
i = 0  # Retrieve predictions for the first image for the corresponding text queries
text = texts[i]

# proccess后的文件路径
root_dir = 'E:/Dataset/xxxx/test'

# 加载文件，包含path和label*
img_name = pd.read_csv("E:/Dataset/xxxx/image_paths_placepulse_test.csv")

image_path = img_name['path']
labels = img_name['label']

dataset = ImageGraphDataset(raw_dir=root_dir, 
                            image_paths=image_path, 
                            labels=labels,
                            texts=text)

51


 47%|████▋     | 24/51 [00:33<00:40,  1.50s/it]

In [6]:
texts = [[
        "a photo of people",
        "a photo of building",
        "a photo of sky",
        "a photo of car",
        "a photo of road",
        "a photo of tree"
          ]]
i = 0  # Retrieve predictions for the first image for the corresponding text queries
text = texts[i]

img_path = "E:/Dataset/xxxx/test_img/1.326936_103.891537_50f562ddfdc9f065f0005b17_Singapore.JPG"

G = networkx_to_dgl(create_graph(img_path, text))

  [torch.tensor([G.nodes[node]['label_class']], dtype=torch.float)], dim=1)


In [13]:
G

Graph(num_nodes=16, num_edges=108,
      ndata_schemes={'features': Scheme(shape=(6,), dtype=torch.float32)}
      edata_schemes={})

In [None]:
# 查看生成的图数据集
print(f'Dataset: {dataset}:')
print('====================')
print(f'Number of graphs: {len(dataset)}')
print(f'Number of classes: {dataset.num_tasks}')

data_first = dataset[0][0]
print()
print(data_first)
print('=============================================================')
print(f'Number of nodes: {data_first.num_nodes()}')
print(f'Number of edges: {data_first.num_edges()}')
print(f'Average node degree: {data_first.num_edges() / data_first.num_nodes():.2f}')