In [1]:
# randomly sampling and label spreading
import open3d as o3d
import torch
import networkx as nx
import numpy as np
import pdb
from torch_geometric.nn import GCNConv
from torch_geometric.data import Data
from sklearn.neighbors import NearestNeighbors
import matplotlib.pyplot as plt

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [2]:
synlidar2kitti = np.array(['car', 'bicycle', 'motorcycle',  'truck', 'other-vehicle', 'person',
                           'bicyclist', 'motorcyclist',
                           'road', 'parking', 'sidewalk', 'other-ground',
                           'building', 'fence', 'vegetation', 'trunk',
                           'terrain', 'pole', 'traffic-sign'])
synlidar2kitti_color = np.array([(255, 255, 255),  # unlabelled
                                    (25, 25, 255),  # car
                                    (187, 0, 255),  # bicycle
                                    (187, 50, 255),  # motorcycle
                                    (0, 247, 255),  # truck
                                    (50, 162, 168),  # other-vehicle
                                    (250, 178, 50),  # person
                                    (255, 196, 0),  # bicyclist
                                    (255, 196, 0),  # motorcyclist
                                    (0, 0, 0),  # road
                                    (148, 148, 148),  # parking
                                    (255, 20, 60),  # sidewalk
                                    (164, 173, 104),  # other-ground
                                    (233, 166, 250),  # building
                                    (255, 214, 251),  # fence
                                    (157, 234, 50),  # vegetation
                                    (107, 98, 56),  # trunk
                                    (78, 72, 44),  # terrain
                                    (83, 93, 130),  # pole
                                    (173, 23, 121)])/255.   # traffic-sign

# 根据类别索引返回颜色
def get_color_from_label(label_index):
    """
    根据类别索引返回相应的颜色
    :param label_index: 类别索引
    :return: 对应的 RGB 颜色（值在 0 到 1 之间）
    """
    return synlidar2kitti_color[0] if label_index == -1 else synlidar2kitti_color[label_index+1]

# Tools

In [3]:
def vis(disp=False):
    def _decorate(func):
        def calculate_vis_scope(pcd):
            import numpy as np
            import open3d as o3d
            # 获取点云数据的中心点
            if isinstance(pcd, tuple):
                pcd = pcd[0]
            pcd_legacy = pcd.to_legacy()
            center = pcd_legacy.get_center()
            # 计算点云数据的范围
            points = np.asarray(pcd_legacy.points)
            min_bound = np.min(points, axis=0)
            max_bound = np.max(points, axis=0)
            extent = max(max_bound - min_bound) / 2
            # 设置视角为正前方
            lookat = center
            up = [0, 1, 1]
            front = [0, 0, 1]
            zoom = 0.3
            return zoom, front, lookat, up

        def visualization(pcd):
            vis = o3d.visualization.Visualizer()
            if isinstance(pcd,tuple):
                pcd = pcd[0]
            vis.create_window()
            # 将点云添加到可视化窗口
            vis.add_geometry(pcd.to_legacy())
            ctr = vis.get_view_control()
            ctr.set_lookat(pcd.to_legacy().get_center())
            ctr.set_up([0, 1, 1])
            ctr.set_front([0, 0, 1])
            ctr.set_zoom(0.3)
            vis.run()
            vis.destroy_window()

        def _wrap(*args, **kwargs):
            pcd = func(*args, **kwargs)
            if disp:
                print(
                    f"[INFO] Visualization Information: The point cloud from the function: {func.__name__}"
                )
                visualization(pcd)
            return pcd

        return _wrap

    return _decorate


In [4]:
# 修改过的 visualize_graph 函数
def visualize_graph(graph_data,labels):
    # 创建 NetworkX 图
    G = nx.Graph()  # 初始化图
    edge_index = graph_data.edge_index.numpy()  # PyTorch Geometric 的 edge_index 转换为 numpy 数组
    coords = graph_data.x.numpy()  # 节点坐标
    # labels = graph_data.y.numpy()  # 节点标签
    labels = labels.numpy()

    # 添加节点和边
    for edge in edge_index.T:
        G.add_edge(edge[0], edge[1])

    # 设置节点位置和颜色
    pos = {i: (coords[i][0], coords[i][1]) for i in range(len(coords))}  # 使用节点的 2D 坐标
    node_colors = [get_color_from_label(labels[i]) for i in range(len(labels))]  # 节点颜色

    # 绘制图
    plt.figure(figsize=(10, 8))
    nx.draw(G, pos, with_labels=False, node_color=node_colors, node_size=50, edge_color='gray')  # 使用节点颜色
    plt.title("Graph Visualization")
    plt.show()

In [5]:
# 定义Open 3D可视化函数
def visualize_point_cloud_with_edges(graph_data):
    coords = graph_data.x.numpy()  # 节点坐标
    edge_index = graph_data.edge_index.numpy() if graph_data.edge_index is not None else None # 边
    labels = graph_data.y.numpy()  # 节点标签

    # 创建 Open3D 点云
    point_cloud = o3d.geometry.PointCloud()  
    point_cloud.points = o3d.utility.Vector3dVector(coords)  # 设置点云坐标

    # 为点云设置颜色
    colors = [get_color_from_label(label) for label in labels]
    point_cloud.colors = o3d.utility.Vector3dVector(colors)  # 设置点云颜色

    if edge_index is not None:
        # 创建 Open3D 线集合
        lines = []
        for edge in edge_index.T:
            lines.append([edge[0], edge[1]])  # 添加边

        # 创建线集合的边和颜色
        line_set = o3d.geometry.LineSet(
            points=o3d.utility.Vector3dVector(coords),
            lines=o3d.utility.Vector2iVector(lines),
        )
        line_set.paint_uniform_color([0.5, 0.5, 0.5])  # 设置边颜色为灰色


        # 创建可视化
        # o3d.visualization.draw_geometries([point_cloud, line_set], window_name="Point Cloud with Edges")
        vis = o3d.visualization.Visualizer()
        vis.create_window()
        # 将点云添加到可视化窗口
        vis.add_geometry(point_cloud)
        vis.add_geometry(line_set)
        ctr = vis.get_view_control()
        ctr.set_lookat(point_cloud.get_center())
        ctr.set_up([0, 1, 1])
        ctr.set_front([0, 0, 1])
        ctr.set_zoom(0.3)
        vis.run()
        vis.destroy_window()
    else:
        o3d.visualization.draw_geometries([point_cloud], window_name="Point Cloud with Edges")


In [6]:
# load point cloud

def load_pcd(adv_point_path):
    return o3d.t.io.read_point_cloud(adv_point_path)

In [7]:
# load point cloud as array
@vis(True)
def load_pcd_array(pcd_path):
    pcd = o3d.t.io.read_point_cloud(pcd_path)
    pcd_legacy = pcd.to_legacy()
    pcd_coords = np.asarray(pcd_legacy.points) 
    pcd_colors = np.asarray(pcd_legacy.colors) if pcd_legacy.has_normals else None
    pcd_normals = np.asarray(pcd_legacy.normals) if pcd_legacy.has_normals() else None

    return pcd,pcd_coords,pcd_colors,pcd_normals

In [8]:
# load point cloud labels
def load_labels_cloud(adv_label_path):
    return torch.load(adv_label_path).int()

In [9]:
# Uniform down sampling (对于点云均匀随机采样)
@vis(disp=True)
def pcd_unisampling(pcd=None,every_k_point=5):
    return pcd.uniform_down_sample(every_k_points=every_k_point)

In [10]:
# 创建输入输出Data
def point_cloud_to_graph(point_coords, point_labels, k_neighbors=5):
    """
    将点云数据转换为图结构。
    
    :param point_coords: 点云坐标的 numpy 数组，形状为 (N, 3)
    :param point_labels: 点云标签的 numpy 数组，形状为 (N,)
    :param k_neighbors: 每个点的 K 近邻数
    :return: PyTorch Geometric 图数据对象
    """
    # 创建 K-近邻结构
    nbrs = NearestNeighbors(n_neighbors=k_neighbors, algorithm='ball_tree').fit(point_coords)
    distances, indices = nbrs.kneighbors(point_coords)

    # 创建 NetworkX 图
    G = nx.Graph()  # 初始化图
    for i, neighbors in enumerate(indices):
        for j in neighbors:
            if i != j:  # 避免自环
                G.add_edge(i, j)  # 添加边

    # 将 NetworkX 图转换为 PyTorch Geometric 图数据
    edge_index = torch.tensor(list(G.edges)).t().contiguous()  # 转置并连续化
    x = torch.tensor(point_coords, dtype=torch.float32)  # 节点特征
    y = torch.tensor(point_labels, dtype=torch.long)  # 标签

    # 创建 PyTorch Geometric 的 Data 对象
    graph_data = Data(x=x, edge_index=edge_index, y=y)

    return graph_data  # 返回图数据

# label diffusion using Gnn

In [11]:
#1. get access to point cloud data (记载点云数据)
attacked_source_pcd_path = 'attackedLidar_Kitti100/adv_preds/04/4989_18.ply'
attacked_source_labels_path = 'attackedLidar_Kitti100/adv_labels_pt/04/4989_71.pt'

source_pcd_path = 'attackedLidar_Kitti100/ori_preds/04/4989_71.ply'
source_labels_path = 'attackedLidar_Kitti100/ori_labels_pt/04/4989_71.pt'

test_pcd_path = 'attackedLidar_Kitti100/ori_preds/06/35166_76.ply'
test_labels_path = 'attackedLidar_Kitti100/ori_labels_pt/06/35166_76.pt'

# Data preparation

In [12]:
# source_pcd dataset
_,source_pcd_coords, source_pcd_colors, source_pcd_normals = load_pcd_array(pcd_path=source_pcd_path)
source_pcd_labels = load_labels_cloud(adv_label_path=source_labels_path)
source_pcd_labels = source_pcd_labels.numpy()

# target_pcd dataset
_,test_pcd_coords, test_pcd_colors, test_pcd_normals = load_pcd_array(pcd_path=test_pcd_path)
test_pcd_labels = load_labels_cloud(adv_label_path=test_labels_path)
test_pcd_labels = test_pcd_labels.numpy()

# attacked_source_pcd dataset
_, attacked_source_pcd_coords, attacked_source_pcd_colors, attacked_source_pcd_normals = load_pcd_array(pcd_path=attacked_source_pcd_path)
attacked_source_pcd_labels = load_labels_cloud(adv_label_path=attacked_source_labels_path)
attacked_source_pcd_labels = attacked_source_pcd_labels.numpy()



# 创建训练测试数据
Source_Data = point_cloud_to_graph(source_pcd_coords,source_pcd_labels,k_neighbors=5)
Attacked_Source_Data = point_cloud_to_graph(attacked_source_pcd_coords,attacked_source_pcd_labels,k_neighbors=5)
Test_Data = point_cloud_to_graph(test_pcd_coords,test_pcd_labels,k_neighbors=5)

print(f'Source Data points: {Source_Data.x}')
print(f'Test Data points: {Test_Data.x}')

visualize_point_cloud_with_edges(Source_Data)
visualize_point_cloud_with_edges(Test_Data)
visualize_point_cloud_with_edges(Attacked_Source_Data)

[INFO] Visualization Information: The point cloud from the function: load_pcd_array
[INFO] Visualization Information: The point cloud from the function: load_pcd_array
[INFO] Visualization Information: The point cloud from the function: load_pcd_array
Source Data points: tensor([[ 170.,  -57.,  -14.],
        [-958., -714.,  -87.],
        [  25.,  194.,  -35.],
        ...,
        [-465., -988.,  -79.],
        [ 159.,  -82.,  -16.],
        [ 273., -172.,   32.]])
Test Data points: tensor([[   4., -175.,  -37.],
        [ -82., -160.,  -52.],
        [ 140.,   46.,  -28.],
        ...,
        [ 205., -171.,   -9.],
        [ 185.,    2.,  -25.],
        [ 397.,  257.,    3.]])


In [13]:
# 定义图卷积神经网络模型，用于标签扩散
class SimpleGCN(torch.nn.Module):
    def __init__(self, input_dim,hidden_dim,output_dim):
        super(SimpleGCN, self).__init__()
        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.conv2 = GCNConv(hidden_dim, output_dim)
    def forward(self,data):
        x, edge_index = data.x, data.edge_index
        x = self.conv1(x, edge_index)
        x = torch.relu(x)
        x = self.conv2(x, edge_index)
        return x       

# training model

In [14]:
# 定义模型
input_dim = 3  # 输入维度：点云的三维坐标
hidden_dim = 3*24
output_dim = 19  # 两个类别
model = SimpleGCN(input_dim, hidden_dim, output_dim)
# 定义损失函数和优化器
criterion  = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(),lr=0.01)



In [17]:
# 训练模型
flag_adv_pretrain = False
if flag_adv_pretrain:
    Source_Data = Attacked_Source_Data
else:
    Source_Data = Source_Data

num_epochs = 100
for epoch in range(num_epochs):
    optimizer.zero_grad()  # 重置优化器梯度
    out = model(Source_Data)  # 前向传递

    # 仅使用已知标签进行损失计算
    labeled_indices = (Source_Data.y >= 0).nonzero().view(-1)
    loss = criterion(out[labeled_indices], Source_Data.y[labeled_indices])  # 计算损失

    loss.backward()  # 反向传播
    optimizer.step()  # 优化器更新参数

    if epoch % 10 == 0:
        # 计算训练准确率
        _, predicted = torch.max(out, 1)
        correct = (predicted == Source_Data.y).sum().item()
        accuracy = correct / len(Source_Data.y)
        print(f"Epoch {epoch}, Loss: {loss.item()}, Training Accuracy: {accuracy}")


Epoch 0, Loss: 0.773408055305481, Training Accuracy: 0.7906653576013897
Epoch 10, Loss: 0.7843232750892639, Training Accuracy: 0.7811947738086247
Epoch 20, Loss: 0.740896999835968, Training Accuracy: 0.790499207008534
Epoch 30, Loss: 0.7508715391159058, Training Accuracy: 0.7892304206630919
Epoch 40, Loss: 0.8064696192741394, Training Accuracy: 0.7782795861339777
Epoch 50, Loss: 0.9630712866783142, Training Accuracy: 0.7689298391360169
Epoch 60, Loss: 0.7920063138008118, Training Accuracy: 0.7673740654029152
Epoch 70, Loss: 1.0644770860671997, Training Accuracy: 0.7489464541953025
Epoch 80, Loss: 0.9378944635391235, Training Accuracy: 0.7635979155652897
Epoch 90, Loss: 0.7220717668533325, Training Accuracy: 0.8080205422551167


In [18]:
# 在测试数据上进行评估
model.eval()  # 设置为评估模式
with torch.no_grad():  # 禁用梯度计算
    test_out = model(Test_Data)  # 测试数据的预测
    _, test_predicted = torch.max(test_out, 1)
    test_correct = (test_predicted == Test_Data.y).sum().item()
    test_accuracy = test_correct / len(Test_Data.y)
    print(f"Test Accuracy: {test_accuracy}")

# 可视化预测结果
visualize_point_cloud_with_edges(graph_data=Data(x=Test_Data.x,y=test_predicted,edge_index=None))
# 可视化测试样本的结果
visualize_point_cloud_with_edges(graph_data=Data(x=Test_Data.x,y=Test_Data.y,edge_index=None))

# 保存训练好的模型
torch.save(model.state_dict(), "simple_gcn_model.pth")  # 保存模型参数

Test Accuracy: 0.3463456310679612
