### dengjunli 的测试代码

#### PCA分解

In [None]:
import torch

def pca(X, n_components):
    """
    对给定的数据集执行PCA并返回前n个主成分。
    
    :param X: 数据集，形状为(num_samples, num_features)
    :param n_components: 保留的主成分数目
    :return: 主成分矩阵，形状为(num_features, n_components)
    """
    # 中心化数据
    X_centered = X - X.mean(dim=0)
    
    # 计算协方差矩阵
    covariance_matrix = torch.mm(X_centered.t(), X_centered) / (X_centered.size(0) - 1)
    
    # 求解特征值和特征向量
    eigenvalues, eigenvectors = torch.linalg.eigh(covariance_matrix, UPLO='U')
    
    # 选择主成分
    idx = eigenvalues.argsort(descending=True)[:n_components]
    principal_components = eigenvectors[:, idx]
    
    return principal_components

def project_to_basis(X, basis):
    """
    将数据集X投影到基向量basis上，并计算投影权重。
    
    :param X: 要投影的数据集，形状为(num_samples, num_features)
    :param basis: 基向量，形状为(num_features, n_components)
    :return: 投影权重，形状为(num_samples, n_components)
    """
    X_centered = X - X.mean(dim=0)
    weights = torch.mm(X_centered, basis)
    return weights

def reconstruct_from_weights(weights, basis):
    """
    使用给定的权重和基向量重构数据。
    
    :param weights: 投影权重，形状为(num_samples, n_components)
    :param basis: 基向量，形状为(num_features, n_components)
    :return: 重构后的数据，形状为(num_samples, num_features)
    """
    return torch.mm(weights, basis.t())



In [None]:
# 假设A_flame和B_flame是两个FLAME参数矩阵，形状分别为(240, 120)和(num_samples, 120)
# 选择要保留的主成分数目
A_flame = torch.randn(240, 120)
B_flame = torch.randn(1, 120)

n_components = 28

# 步骤1: 对A的FLAME参数执行PCA
A_principal_components = pca(A_flame, n_components)

# 步骤2: 计算B的FLAME参数在A的基向量上的投影权重
B_weights = project_to_basis(B_flame, A_principal_components)

# 步骤3: 使用权重重构B的FLAME参数
B_reconstructed = reconstruct_from_weights(B_weights, A_principal_components)


print("B_reconstructed.shape:", B_reconstructed.shape)  # 输出: (100, 120) - 重构后的B的FLAME参数矩阵



In [None]:
def project_to_basis_single(X, basis, mean):
    """
    将单帧数据X投影到基向量basis上，并计算投影权重。
    
    :param X: 要投影的单帧数据，形状为(1, num_features)或(num_features,)
    :param basis: 基向量，形状为(num_features, n_components)
    :param mean: 数据集的均值，用于中心化，形状为(num_features,)
    :return: 投影权重，形状为(1, n_components)
    """
    # 确保X是二维的
    if X.dim() == 1:
        X = X.unsqueeze(0)
    X_centered = X - mean
    weights = torch.mm(X_centered, basis)
    return weights

def reconstruct_from_weights_single(weights, basis, mean):
    """
    使用给定的权重和基向量重构单帧数据。
    
    :param weights: 投影权重，形状为(1, n_components)
    :param basis: 基向量，形状为(num_features, n_components)
    :param mean: 数据集的均值，用于重构后的数据中心化，形状为(num_features,)
    :return: 重构后的单帧数据，形状为(1, num_features)
    """
    reconstruction = torch.mm(weights, basis.t()) + mean
    return reconstruction


### 特定文件夹中的所有PNG图像按时间戳顺序拼接成MP4视频

In [None]:
'''
可视化点云图像，mesh
拼接 output 文件夹中的图像，生成视频
'''

import cv2
import os
from natsort import natsorted
from tqdm import tqdm

def convert_images_to_video(base_folder):
    """
    将每个子目录中的图像转换成视频文件。

    参数:
    - base_folder: 包含子目录的基础目录路径。
    """
    # 遍历 base_folder 下的每个条目
    for entry in os.listdir(base_folder):
        subdir = os.path.join(base_folder, entry)
        # 确保条目是一个目录
        if os.path.isdir(subdir):
            video_name = os.path.join(subdir, f'FLAME_driven_animation_{entry}.mp4')

            images = [img for img in os.listdir(subdir) if img.endswith(".png")]
            images = natsorted(images)  # 根据名称自然排序

            if not images:
                continue  # 如果没有图像，则跳过当前目录

            frame = cv2.imread(os.path.join(subdir, images[0]))
            height, width, layers = frame.shape

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # 定义编解码器
            video = cv2.VideoWriter(video_name, fourcc, 25, (width, height))

            for image in tqdm(images, desc=entry):
                video.write(cv2.imread(os.path.join(subdir, image)))

            video.release()  # 释放 VideoWriter 对象

# 使用示例
base_folder = './output'
convert_images_to_video(base_folder)


#### wav2vec 预测音素

In [None]:

from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC 
# from datasets import load_dataset
import torch
import soundfile as sf

# load model and processor
processor = Wav2Vec2Processor.from_pretrained("./wav2vec2-xls-r-300m-timit-phoneme")
model = Wav2Vec2ForCTC.from_pretrained("./wav2vec2-xls-r-300m-timit-phoneme")

# Read and process the input
audio_input, sample_rate = sf.read("./test_data/英文tts.wav")
inputs = processor(audio_input, sampling_rate=16_000, return_tensors="pt", padding=True)

with torch.no_grad():
    logits = model(inputs.input_values, attention_mask=inputs.attention_mask).logits

# Decode id into string
predicted_ids = torch.argmax(logits, axis=-1)      
predicted_sentences = processor.batch_decode(predicted_ids)
print(predicted_sentences)




#### 输入视频，选取与特定音素相匹配的帧



In [None]:
from moviepy.editor import VideoFileClip
import librosa
import soundfile as sf
import os
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC
import torch
import cv2
from tqdm.auto import tqdm

# 视频文件路径
video_path = './test_data/初一-11.mp4'
video_name = os.path.splitext(os.path.basename(video_path))[0]
audio_output_path = f'./test_data/extracted_audio_{video_name}.wav'
resampled_audio_output_path = f'./test_data/resampled_audio_{video_name}_16000Hz.wav'
phoneme_of_interest = 'ʃ'

# 加载视频并提取音频
video_clip = VideoFileClip(video_path)
audio_clip = video_clip.audio
audio_clip.write_audiofile(audio_output_path, codec='pcm_s16le')

# 使用librosa重新采样音频文件至16000Hz
audio, sr = librosa.load(audio_output_path, sr=16000)
sf.write(resampled_audio_output_path, audio, 16000)

print("音频长度:", librosa.get_duration(y=audio, sr=16000), "秒")

# 使用Wav2Vec2进行音频处理和模型预测
processor = Wav2Vec2Processor.from_pretrained("./wav2vec2-xls-r-300m-timit-phoneme")
model = Wav2Vec2ForCTC.from_pretrained("./wav2vec2-xls-r-300m-timit-phoneme")
audio_input, sample_rate = sf.read(resampled_audio_output_path)
inputs = processor(audio_input, sampling_rate=sample_rate, return_tensors="pt", padding=True)

with torch.no_grad():
    logits = model(inputs.input_values, attention_mask=inputs.attention_mask).logits

predicted_ids = torch.argmax(logits, axis=-1)
predicted_phonemes = processor.batch_decode(predicted_ids)
print("Predicted phonemes:", predicted_phonemes)

# 提取特定音素对应的帧并保存
timestamps_for_phoneme = []
cap = cv2.VideoCapture(video_path)
frame_rate = cap.get(cv2.CAP_PROP_FPS)
output_folder = f'./test_data/{video_name}_phoneme_{phoneme_of_interest}_frames'
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

for i, phoneme in enumerate(predicted_phonemes[0]):
    if phoneme_of_interest in phoneme:
        timestamps_for_phoneme.append(i * 20)  # Assuming each timestep corresponds to 20ms


print(f"Extracting frames for phoneme '{phoneme_of_interest}' at timestamps: {timestamps_for_phoneme}")

for timestamp in tqdm(timestamps_for_phoneme, desc='Extracting frames'):
    frame_id = int((timestamp / 1000) * frame_rate)
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
    ret, frame = cap.read()
    if ret:
        cv2.imwrite(os.path.join(output_folder, f'frame_{frame_id}.jpg'), frame)

cap.release()


In [None]:
predicted_phonemes[0].split()

#### 检查可视化 FLAME 参数

In [None]:
import os
import torch

def load_flame_file(file_path):
    """
    加载 FLAME 文件并打印内容

    参数:
    - file_path: FLAME 文件的路径
    """
    # 确保文件存在
    if not os.path.exists(file_path):
        print(f"文件 {file_path} 不存在。")
        return

    # 加载文件
    payload = torch.load(file_path)

    # # 打印文件内容的摘要
    # print(f"已加载文件：{file_path}")
    # print("文件内容包括：")
    # for key in payload.keys():
    #     print(f" - {key}: 类型 {type(payload[key])}")
    #     # 这里可以根据需要进一步打印内容或内容的摘要

    # 如果需要查看特定内容，可以直接访问
    # 例如，如果文件中包含了 'flame' 键:
    if 'flame' in payload:
        print("\nFLAME 参数详情：")
        flame_params = payload['flame']
        for param, value in flame_params.items():
            print(f"  - {param}: {value}")  # 可以根据实际情况调整以适当方式打印或处理这些值

# 示例用法
file_path = './metrical-tracker/output/justin/checkpoint/00341.frame'  # 替换为实际文件路径
load_flame_file(file_path)


In [None]:
import os
import torch

def load_and_print_opencv_params(file_path):
    """
    加载 FLAME 文件并打印其中的 opencv 参数

    参数:
    - file_path: FLAME 文件的路径
    """
    # 确保文件存在
    if not os.path.exists(file_path):
        print(f"文件 {file_path} 不存在。")
        return

    # 加载文件
    payload = torch.load(file_path)

    # 检查并打印 opencv 参数
    if 'opencv' in payload:
        print("\nOpenCV 参数详情：")
        opencv_params = payload['opencv']
        if 'K' in opencv_params:
            K = opencv_params['K'][0]  # 假设 K 存在并获取第一个（如果有多个）
            print(f"相机矩阵 K: \n{K}")
        else:
            print("相机矩阵 K 不存在于 opencv 参数中。")
    else:
        print("opencv 参数不存在于文件中。")

    # 检查并打印图像尺寸 img_size
    if 'img_size' in payload:
        img_size = payload['img_size']
        print(f"\n图像尺寸: {img_size}")
    else:
        print("图像尺寸 img_size 不存在于文件中。")

# 示例用法
file_path = './metrical-tracker/output/justin/checkpoint/00350.frame'  # 替换为实际文件路径
load_and_print_opencv_params(file_path)


#### 训练过程合成为视频

In [None]:
from moviepy.editor import ImageSequenceClip
import os

# 图片所在的文件夹
image_folder = '/root/autodl-tmp/WassersteinGS/dataset/head_move/INITIAL_ROT_ALONG_FACE_2D_UNet2D_noNorm_noSigmoid_SinCos_5_chronological/train'
# 视频的名称
video_name = 'output_chronological.mp4'

images = [img for img in os.listdir(image_folder) if img.endswith(".jpg")]
# 如果图片是按照数字顺序命名的，可以使用以下代码来对图片进行排序
images.sort(key=lambda x: int(x.split('.')[0]))

# 创建一个ImageSequenceClip对象，fps参数表示每秒的帧数
clip = ImageSequenceClip([os.path.join(image_folder, img) for img in images], fps=25)

# 将视频保存为mp4格式
clip.write_videofile(video_name)

#### 删除多余文件

In [None]:
import os
import glob
import re
from tqdm import tqdm

# 获取所有文件
files = glob.glob('/root/autodl-tmp/WassersteinGS/dataset/**', recursive=True)

# 定义文件名模式
patterns = [r'chkpnt(\d+)\.pth$', r'point_cloud_3dgs(\d+)\.ply$']

for file in tqdm(files):
    for pattern in patterns:
        match = re.search(pattern, file)
        print("match:", match)
        if match and int(match.group(1)) < 145000:
            os.remove(file)
            print("删除",file)

#### attention

In [21]:
import torch
from torch import nn
import torch.nn.functional as F

import torch
import torch.nn as nn
import torch.nn.functional as F

class FlattenAndLinear(nn.Module):
    def __init__(self):
        super(FlattenAndLinear, self).__init__(in_channels=128)
        # 假设输入数据的维度为 [batch_size, 10, 128, 256]
        # 我们首先通过一个卷积层来提取特征，这里使用一个简单的一维卷积
        self.conv1 = nn.Conv1d(in_channels=128, out_channels=64, kernel_size=3, stride=1, padding=1)
        # 接着使用一个最大池化层来降低维度
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
        # 然后通过一个全连接层将卷积层的输出映射到目标维度
        self.fc = nn.Linear(64 * 128, 256)

    def forward(self, x):
        # 通过卷积层和激活函数
        x = self.pool(F.relu(self.conv1(x)))
        # 将卷积层输出的维度从 [batch_size, 64, 64] 调整为 [batch_size, 64 * 64]
        x = x.view(x.size(0), -1)  # Flatten the tensor
        # 通过全连接层和激活函数
        x = F.relu(self.fc(x))
        return x


class ImprovedCrossAttentionModel(nn.Module):
    def __init__(self, batch_size):
        super(ImprovedCrossAttentionModel, self).__init__()
        self.batch_size = batch_size
        self.num_frames = 5  # Assuming each "batch" in 5*batchsize contains 5 frames
        
        # Positional Encoding for video features
        self.positional_encoding_video = nn.Parameter(torch.randn((128, 8, 6), dtype=torch.float32))
        self.positional_encoding_audio = nn.Parameter(torch.randn((128, 2), dtype=torch.float32))
        
        # Attention layers
        self.query_proj = nn.Linear(2, 256)  # Project audio features to match the dimensionality
        self.key_proj = nn.Linear(8 * 6, 256)    # Project video features after convolution and reshaping
        self.value_proj = nn.Linear(8 * 6, 256)  # Same as key projection
        
        # Output layers to ensure output dimension is 5*batchsize, 256
        self.final_linear = nn.Linear(256, 256)
        # self.output_reshape = nn.Linear(self.num_frames * self.batch_size, 256)
        self.output_reshape = FlattenAndLinear(128)

    def forward(self, video_features, audio_features):
        # video_features: (5*batch_size, 128, 8, 6)
        # audio_features: (5*batch_size, 128, 2)
        
        # Applying convolutional layer to video features
        video_features = video_features.view(-1, 128, 8, 6)  # Reshape to handle each frame independently
        # video_features = self.conv_layer(batched_frames)

        print("video_features:",video_features.shape) # 10 128 8 6
        print("self.positional_encoding:",self.positional_encoding_video.shape) # 128 8 6
        video_features += self.positional_encoding_video  # Adding positional encoding

        
        # Reshape and maintain spatial structure for attention
        video_features = video_features.view(-1, 128, 8*6)  # Keep spatial dimensions together # 10 128 48
        # video_features = video_features.permute(0, 2, 1).contiguous().view(-1, 10*8*6)  # Reshape for key/value projection
        
        audio_features = audio_features.view(-1, 128, 2)  # Just ensure dimensions are correct # 10 128 2
        print("audio_features:",audio_features.shape)
        audio_features += self.positional_encoding_audio
        
        # Project features for attention
        query = self.query_proj(audio_features) # 10 128 256
        key = self.key_proj(video_features)
        value = self.value_proj(video_features)

        # Scaled dot-product attention
        print("query:",query.shape) # query: torch.Size([10, 128, 256])
        print("key.shape:",key.shape) # key.shape: torch.Size([10, 256])
        print("key.transpose(1, 2).shape:",key.transpose(1, 2).shape)

        attention_scores = torch.bmm(query, key.transpose(1, 2)) / (256 ** 0.5)
        attention = F.softmax(attention_scores, dim=-1)
        attended_features = torch.bmm(attention, value)
        
        # Post-attention processing to match required output dimensions
        attended_features = self.final_linear(attended_features)
        print("attended_features:",attended_features.shape)

        output = self.output_reshape(attended_features)
        
        return output

#### Example usage (commented out)
improved_model = ImprovedCrossAttentionModel(batch_size=2)
video_features = torch.randn(5*2, 128, 8, 6)
audio_features = torch.randn(5*2, 128, 2)
output = improved_model(video_features, audio_features)
print(output.shape)  # Expected shape: (5*batch_size, 256)


conv_video_features: torch.Size([10, 128, 8, 6])
self.positional_encoding: torch.Size([128, 8, 6])
audio_features: torch.Size([10, 128, 2])
query: torch.Size([10, 128, 256])
key.shape: torch.Size([10, 128, 256])
key.transpose(1, 2).shape: torch.Size([10, 256, 128])
attended_features: torch.Size([10, 128, 256])
attended_features: torch.Size([10, 128, 256])
torch.Size([10, 256])


In [9]:
conv_video_features

NameError: name 'conv_video_features' is not defined