In [1]:
'''
基于PointNet++的牙预备体评分模型。
模型的输入为：若干预备体的点云和评分。
输出为：训练好的神经网络，可用于对新预备体点云评分。
目前在使用train on batch方法训练，在运算速度方面尚有优化余地。

如果要在输入中加入预备前完整牙齿的图像，用算量更大的体素模型更为自然，但需要彻底另写程序（我知道得太晚了）。
点云模型是从预备体外表面上的采样点提取特征，而体素模型是把预备体扩充为一个三维矩阵，用这个矩阵提取特征；显然点云方法数据量更小。
如果同时输入完整牙齿和预备体的图像，体素方法可以在同一个矩阵格子里（同一个空间坐标）用两个值分别表示完整牙齿和预备体是否出现；
点云方法则不能在同一个坐标处同时给出完整牙齿和预备体的特征，需要单独为完整牙齿准备一套采样点。
考虑到完整牙齿和预备体的对比确实也可能包含一部分评分依据的信息，我们暂时保留同时输入这两种图像的思路。

实际应用时，向训练集录入数据需要经过专家的监督，随后数据上传至总机。
模型的训练可以只会发生在官方的总机上。各地分机只需要读取稳定版模型参数，不必独自训练模型。
这也就是说，训练模型所需的计算只需要总机承担，分机的性能不需要很高。
如果没有即时更新数据库的需求，实际上总机的性能也不需要很高。
'''

'\n基于PointNet++的牙预备体评分模型。\n模型的输入为：若干预备体的点云和评分。\n输出为：训练好的神经网络，可用于对新预备体点云评分。\n目前在使用train on batch方法训练，在运算速度方面尚有优化余地。\n\n如果要在输入中加入预备前完整牙齿的图像，用算量更大的体素模型更为自然，但需要彻底另写程序（我知道得太晚了）。\n点云模型是从预备体外表面上的采样点提取特征，而体素模型是把预备体扩充为一个三维矩阵，用这个矩阵提取特征；显然点云方法数据量更小。\n如果同时输入完整牙齿和预备体的图像，体素方法可以在同一个矩阵格子里（同一个空间坐标）用两个值分别表示完整牙齿和预备体是否出现；\n点云方法则不能在同一个坐标处同时给出完整牙齿和预备体的特征，需要单独为完整牙齿准备一套采样点。\n考虑到完整牙齿和预备体的对比确实也可能包含一部分评分依据的信息，我们暂时保留同时输入这两种图像的思路。\n\n实际应用时，向训练集录入数据需要经过专家的监督，随后数据上传至总机。\n模型的训练可以只会发生在官方的总机上。各地分机只需要读取稳定版模型参数，不必独自训练模型。\n这也就是说，训练模型所需的计算只需要总机承担，分机的性能不需要很高。\n如果没有即时更新数据库的需求，实际上总机的性能也不需要很高。\n'

In [2]:
'''
# 安装环境，和上次发的代码一样的操作方式
!pip install --upgrade tensorflow
'''

'\n# 安装环境，和上次发的代码一样的操作方式\n!pip install --upgrade tensorflow\n'

In [3]:
'''
# 更新环境，以解决numpy和tensorflow的兼容性问题
pip install --upgrade numpy tensorflow
'''

'\n# 更新环境，以解决numpy和tensorflow的兼容性问题\npip install --upgrade numpy tensorflow\n'

In [4]:
import json
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.model_selection import train_test_split

2024-07-07 02:27:14.076762: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-07-07 02:27:14.080949: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-07-07 02:27:14.093870: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:479] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-07 02:27:14.118850: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:10575] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-07-07 02:27:14.118883: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1442] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-07-07 02:27:14.135178: I tensorflow/core/platform/cpu_feature_guard.cc:

In [5]:
# 加载输入信息

def read_scores(score_file_path):
    # 读全部牙的序号和评分
    indices = []
    scores = []
    with open(score_file_path, 'r') as f:
        for line in f:
            index, score = line.strip().split()
            indices.append(int(index))
            scores.append(float(score))
    return indices, scores

def read_obj_file(obj_file_path):
    # 读单颗牙的3D模型
    points = []
    with open(obj_file_path, 'r') as f:
        for line in f:
            if line.startswith('v '):
                _, x, y, z = line.strip().split()
                points.append([float(x), float(y), float(z)])
    return np.array(points)

def sample_points(points, num_samples):
    # 采样
    # 由于每颗牙的3D模型顶点数不同（约2100+），不方便直接用作神经网络的输入，需要统一输入的格式
    # 如果某颗牙的顶点数大于采样点数，则采样不可重复，否则采样可重复
    # 为了和vertices做变量名上的区分，这个函数中使用了points这个变量名，但实际上就是vertices的数据
    # 此处使用的是点云模型，如果用体素模型则有其他的配套手段
    if len(points) >= num_samples:
        # 如果点云中的点数大于等于采样点数，先进行非重复抽样
        sampled_indices = np.random.choice(len(points), num_samples, replace=False)
        sampled_points = points[sampled_indices]
    else:
        # 如果点云中的点数少于采样点数，先取所有的点，再进行可重复抽样补足数量
        sampled_points = points.copy()
        additional_samples = points[np.random.choice(len(points), num_samples - len(sampled_points), replace=True)]
        sampled_points = np.concatenate((sampled_points, additional_samples), axis=0)
    
    return sampled_points

In [6]:
# Set Abstraction模块，用于提取点云的局部特征
# 相较05July版本，用class重写了每个层
# 已经能用测试数据无报错运行全部代码

# 定义Sampling层
class SamplingLayer(tf.keras.layers.Layer):
    '''
    最远点采样 Farthest Point Sample，通过“让采样点之间的最小距离最大化”来用尽可能少的点代表尽可能多的特征。
    
    算法是：
        （1）先随机选取一个点，计算其余点到这个点的距离，选取距离最大的点做第二个点；
        （2）已采样n个点时，计算所有剩余点到每个已采样点的距离，对单个剩余点总有一个已采样点在采样点中距离此剩余点最近，取前述最近距离最大的剩余点做为第n+1个采样点；
        （3）重复此过程直至达到采样数目要求。
    
    输入是：
        点云（每个点的前三个维度是其空间坐标，其余维度是特征）、采样数。
    输出是：
        采样出的点云（空间坐标维度+特征维度）。
    '''
    def __init__(self, N_prime, **kwargs):
        super(SamplingLayer, self).__init__(**kwargs)
        self.N_prime = N_prime

    def call(self, points):
        # 确认点数和采样数
        B = tf.shape(points)[0]  # 获取批次大小
        N = tf.shape(points)[1]  # 获取点的数量
        # 初始化返回的采样点列表
        sampled_points_batch = []
        
        # 遍历每个batch中的点云
        for b in range(B):
            # 初始化距离数组，设置为一个较大的数值
            distances = tf.ones((N,), dtype=tf.float32) * 1e10
            # 初始化当前batch的采样点列表
            sampled_points = []

            # 随机选择第一个点
            first_point_index = tf.random.uniform(shape=[], minval=0, maxval=N, dtype=tf.int32)
            first_point = tf.expand_dims(tf.gather(points[b], first_point_index, axis=0), axis=0)
            sampled_points.append(first_point)

            # 更新所有点到第一个采样点的距离，只考虑前三个特征维度作为空间坐标
            distances = tf.norm(points[b, :, :3] - tf.expand_dims(points[b, first_point_index, :3], axis=0), axis=-1)

            for i in range(1, self.N_prime):
                # 选择距离当前所有采样点最远的点
                farthest_point_index = tf.argmax(distances)
                farthest_point = tf.expand_dims(tf.gather(points[b], farthest_point_index, axis=0), axis=0)
                sampled_points.append(farthest_point)
                
                # 更新所有点到新加入的采样点的距离，只考虑前三个维度作为空间坐标
                distances = tf.minimum(distances, tf.norm(points[b, :, :3] - tf.expand_dims(points[b, farthest_point_index, :3], axis=0), axis=-1))

            # 将当前batch的采样点列表转换为张量，并添加到batch维度的列表中
            sampled_points_batch.append(tf.concat(sampled_points, axis=0))

        return tf.stack(sampled_points_batch, axis=0)  # 将采样点列表输出为张量
    

# 定义Grouping层
class GroupingLayer(tf.keras.layers.Layer):
    '''
    Ball query，以FPS采样结果的每个点为球心，找到采样前总点云中距离每个球心最近的K个点（含球心），以便在后续步骤中提取局部特征。
    
    算法是：
        （1）预设搜索区域的半径R与子区域的点数K；
        （2）FPS采样函数已经提取出来了N'个点，以这N'个点为球心，画半径为R的球体（叫做query ball，也就是搜索区域）；
        （3）以每个样本点为球心，按照给定搜索半径R得到一个球形搜索区域，然后从该区域提取前K个最邻近点。如果搜索区域内点数不够，则重复采样；
        （4）得到N'个采样点各自对应的包含K个点的子区域，这是一个N'*K*（空间维度+特征维度）的点云。
    
    输入是：
        FPS采样前的点云、FPS采样后的点云、搜索半径R、子区域点数K。
    输出是：
        N'个采样点各自对应的包含K个点的子区域点云。
    '''
    def __init__(self, R, K, **kwargs):
        super(GroupingLayer, self).__init__(**kwargs)
        self.R = R
        self.K = K

    def call(self, sampled_points, points):
        # 读输入点云形状
        B, N_prime, D = sampled_points.shape
        B, N, D = points.shape

        # 初始化返回的全部子区域点云列表
        local_regions = []

        for b in range(B):
            # 初始化当前batch的子区域点云列表
            local_region_b = []

            for i in range(N_prime):
                # 获取当前球心坐标
                center_point = sampled_points[b, i]
                # 计算所有点到球心的距离
                distances = tf.norm(points[b, :, :3] - tf.expand_dims(center_point[:3], axis=0), axis=-1)
                # 找到半径R内的点的索引
                within_radius_indices = tf.where(distances <= self.R)[:, 0]
                # 找到半径R内点的数量
                num_within_radius = tf.size(within_radius_indices)
                
                # 如果半径R内点的数量少于K个，则随机采样补充至K个
                nearest_indices = tf.cond(
                    num_within_radius < self.K,
                    # if分支，先按距离升序给所有点排序并保留全部点，随后用有放回抽样补足K个点数
                    lambda: tf.concat(
                        [
                            tf.gather(within_radius_indices, tf.argsort(tf.gather(distances, within_radius_indices))), 
                            tf.gather(within_radius_indices, tf.random.uniform(
                                shape=[self.K - num_within_radius], 
                                minval=0, 
                                maxval=num_within_radius, 
                                dtype=tf.int32)
                            )
                        ], 
                        axis=0
                    )[:self.K],
                    # else分支，先按距离升序给所有点排序，然后取前K个
                    lambda: tf.gather(within_radius_indices, tf.argsort(tf.gather(distances, within_radius_indices))[:self.K])
                )

                # 确保nearest_indices的形状为(K,)
                nearest_indices = tf.reshape(nearest_indices, (self.K,))  # 调试时这里的K总是变成NoneType占位符，所以这里要强制固定其格式

                # 将该球心的子区域点云列表转为张量形式，并保存
                local_region = tf.gather(points[b], nearest_indices, axis=0)
                local_region_b.append(local_region)

            # 保存返回的全部子区域点云列表
            local_regions.append(tf.stack(local_region_b, axis=0))

        return tf.stack(local_regions, axis=0)  # 将全部子区域点云列表输出为张量


# 定义PointNEt层
class PointNetLayer(tf.keras.layers.Layer):
    '''
    PointNet卷积层，用卷积对采样点进行局部特征提取。
    
    算法是：
        （1）使用局部坐标来抗平移、旋转；
        （2）多层感知机（MLP）* 1维卷积。
    
    输入是：
        Ball query得到的N'个采样点各自对应的包含K个点的子区域点云。
    输出是：
        包含新特征的点云（形状是B*N'*(3+新特征数量)）
    '''
    def __init__(self, conv_layers, **kwargs):
        super(PointNetLayer, self).__init__(**kwargs)
        # 传入
        self.conv_layers = conv_layers

    def call(self, local_regions):
        B, N_prime, K, D = tf.unstack(tf.shape(local_regions))
        
        # 计算局部坐标，即令每个局部区域的所有点的坐标减掉该区域的球心坐标
        center_point = local_regions[:, :, 0, :3]  # Shape: (B, N', 3)
        local_coords = local_regions[:, :, :, :3] - tf.expand_dims(center_point, axis=2)
        
        # 把局部坐标（前3个特征维度）和特征（后续特征维度）拼接到一起
        reshape_points = tf.concat([local_coords, local_regions[:, :, :, 3:]], axis=-1)
        
        # 将局部区域点云变形为 (B, N', K * D) 的形状，并转换为 float32 类型，便于进行卷积运算
        reshape_points = tf.reshape(reshape_points, (B, N_prime, K * D))
        reshape_points = tf.cast(reshape_points, dtype=tf.float32)

        # 用1维卷积做特征提取。
        # 注意，不同于传统2维卷积的功能，1维卷积只能将同一点的不同特征做混合，不能提取几何形状（即不同点相对位置）中的特征。
        # 提取几何形状特征是靠前面两层即对点云的采样和分组实现的。
        new_points = reshape_points
        for layer in self.conv_layers:
            # MLP和卷积运算，参数来自传入
            new_points = layer(new_points)
        
        # 将原始点的坐标与提取的特征拼接
        new_points = tf.concat([center_point, new_points], axis=-1)
        
        return new_points


# 定义完整SA模块
class SAModule(tf.keras.layers.Layer):
    '''
    完整的Set Abstraction模块，由Sampling层、Grouping层、PointNet层串联组成。
    '''
    def __init__(self, sampling_layer, grouping_layer, pointnet_layer, **kwargs):
        super(SAModule, self).__init__(**kwargs)
        self.sampling_layer = sampling_layer
        self.grouping_layer = grouping_layer
        self.pointnet_layer = pointnet_layer

    def call(self, points):
        # 所有print都是为了显示中间结果，便于调试，正式版本会删除它们
        sampled_points = self.sampling_layer(points)
        # print(f"SamplingLayer output shape: {sampled_points.shape}")
        grouped_points = self.grouping_layer(sampled_points, points)
        # print(f"GroupingLayer output shape: {grouped_points.shape}")
        new_points = self.pointnet_layer(grouped_points)
        # print(f"PointNetLayer output shape: {new_points.shape}")
        return new_points

In [7]:
# 定义完整神经网络

class PointNetPlusPlus(tf.keras.Model):
    def __init__(self, input_shape, **kwargs):
        super(PointNetPlusPlus, self).__init__(**kwargs)
        # 三个采样和特征提取层，其中第三层不做采样。需调用SA模块
        self.block1 = SAModule(
            SamplingLayer(N_prime=256),
            GroupingLayer(R=5, K=10),
            PointNetLayer(conv_layers=[layers.Conv1D(64, 1, activation='relu'),
                                       layers.Conv1D(64, 1, activation='relu'),
                                       layers.Conv1D(128, 1, activation='relu')])
        )
        self.block2 = SAModule(
            SamplingLayer(N_prime=128),
            GroupingLayer(R=5, K=8),
            PointNetLayer(conv_layers=[layers.Conv1D(128, 1, activation='relu'),
                                       layers.Conv1D(128, 1, activation='relu'),
                                       layers.Conv1D(256, 1, activation='relu')])
        )
        self.block3 = SAModule(
            SamplingLayer(N_prime=64),
            GroupingLayer(R=5, K=8),
            PointNetLayer(conv_layers=[layers.Conv1D(256, 1, activation='relu'),
                                       layers.Conv1D(256, 1, activation='relu'),
                                       layers.Conv1D(512, 1, activation='relu')])
        )
        # 全局最大池化层，用于压缩数据量，也实现了数据扁平化
        self.global_feature_layer = layers.GlobalAveragePooling1D()
        # 全连接层和Dropout层，前者用于进行特征的整合和最终的输出，后者用于减少神经网络的过拟合
        self.dense1 = layers.Dense(256, activation='relu')
        self.dropout1 = layers.Dropout(0.5)
        self.dense2 = layers.Dense(128, activation='relu')
        self.dropout2 = layers.Dropout(0.5)
        # 输出层
        self.output_layer = layers.Dense(1, activation='sigmoid')

    def call(self, inputs):
        #输入层
        points = inputs
        # 三个采样和特征提取层，其中第三层不做采样。需调用SA模块
        points = self.block1(points)
        points = self.block2(points)
        points = self.block3(points)
        # 全局最大池化层，用于压缩数据量，也实现了数据扁平化
        global_features = self.global_feature_layer(points)
        # 全连接层和Dropout层，前者用于进行特征的整合和最终的输出，后者用于减少神经网络的过拟合
        x = self.dense1(global_features)
        x = self.dropout1(x)
        x = self.dense2(x)
        x = self.dropout2(x)
        # 输出层
        output = self.output_layer(x)
        return output
    
    def get_config(self):
        config = super(PointNetPlusPlus, self).get_config()
        return config
    
    @classmethod
    def from_config(cls, config):
        return cls(input_shape=config['input_shape'])

In [8]:
# 读序号和评分
'''此处为text文件地址，应当改为：’文件夹路径 + scores.txt'''
score_file_path = 'scores.txt'
indices, scores = read_scores(score_file_path)

# 读3D模型，并完成采样
'''设置采样点数'''
num_samples = 2000 
Vertices = []
for i in indices:
    obj_file_path = f'{i}.obj'  # 此处为obj文件地址，应当改为：f’ + 文件夹路径 + {i}.obj
    points = read_obj_file(obj_file_path)
    sampled_points = sample_points(points, num_samples)
    Vertices.append(sampled_points)
Vertices = np.array(Vertices)
Scores = np.array(scores)

# 划分训练集和测试集
'''此处用的4：1随机分配，以后可以改成k折'''
Vertices_train, Vertices_test, Scores_train, Scores_test = train_test_split(Vertices, Scores, test_size=0.2, random_state=42)

In [9]:
# 定义模型
model = PointNetPlusPlus(input_shape=(num_samples, 3))

# 定义优化器和损失函数
optimizer = tf.keras.optimizers.Adam()
loss_fn = tf.keras.losses.MeanSquaredError()

# 定义批量大小和训练迭代次数
batch_size = 10
epochs = 3

# 手动实现批量训练（Train on batch），如果用fit就会出现“静态计算图”、“B = None”等问题，暂且绕过
num_batches_train = len(Vertices_train) // batch_size
for epoch in range(epochs):
    print(f"Epoch {epoch+1}/{epochs}")
    epoch_loss = 0.0
    for batch in range(num_batches_train):
        start_idx = batch * batch_size
        end_idx = start_idx + batch_size
        batch_vertices = Vertices_train[start_idx:end_idx]
        batch_scores = Scores_train[start_idx:end_idx]

        with tf.GradientTape() as tape:
            predictions = model(batch_vertices, training=True)
            loss = loss_fn(batch_scores, predictions)

        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        epoch_loss += loss.numpy()

    print(f"Training Loss: {epoch_loss / num_batches_train}")

# 模型评估
predictions_test = model(Vertices_test, training=False)
test_loss = loss_fn(Scores_test, predictions_test).numpy()
print(f"Test Loss: {test_loss}")

Epoch 1/3
Training Loss: 4561.5655517578125
Epoch 2/3
Training Loss: 4557.3916422526045
Epoch 3/3
Training Loss: 4557.3916422526045
Test Loss: 5065.10009765625


In [10]:
# 保存模型权重
model.save_weights('model_weights.weights.h5')

# 保存模型配置
model_config = {
    'input_shape': (num_samples, 3)  # 根据你的模型定义调整
}
with open('model_config.json', 'w') as f:
    json.dump(model_config, f)

print("模型配置和权重保存成功。")


模型配置和权重保存成功。


In [11]:
# 加载模型配置
with open('model_config.json', 'r') as f:
    model_config = json.load(f)

# 定义模型
model = PointNetPlusPlus(**model_config)

# 加载模型权重
model.load_weights('model_weights.weights.h5')

print("模型加载成功。")

模型加载成功。


In [12]:
# 设置采样点数
num_samples = 2000

# 读取和采样点云数据
obj_file_path = '22.obj'
points = read_obj_file(obj_file_path)
sampled_points = sample_points(points, num_samples)
sampled_points = np.expand_dims(sampled_points, axis=0)  # 扩展维度以匹配模型输入

In [13]:
# 使用模型进行预测
prediction = model(sampled_points, training=False)
score = prediction.numpy()[0][0]

print(f"预测评分: {score}")

预测评分: 0.11882804334163666
