In [1]:
from transformers import AutoProcessor, Wav2Vec2Model
import torch
from sklearn.model_selection import train_test_split
import librosa
import json
import os

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

Using device: cpu


In [3]:
def process_folder(folder_path, model_name="facebook/wav2vec2-base-960h"):
    # Load processor and model
    processor = AutoProcessor.from_pretrained(model_name)
    model = Wav2Vec2Model.from_pretrained(model_name)

    # Get a list of all .wav files in the folder
    wav_files = [f for f in sorted(os.listdir(folder_path)) if f.endswith(".wav")]

    # Initialize an empty list to store last_hidden_states
    all_hidden_states = []

    jawopen_values_list = []

    json_files = [f for f in sorted(os.listdir(folder_path)) if f.endswith(".json")]

    # Process each .wav file in the folder
    for i in range(len(wav_files)):
        # Load audio file
        audio_input, rate = librosa.load(os.path.join(folder_path, wav_files[i]), sr=16000)

        # Process audio input with the processor
        inputs = processor(audio_input, sampling_rate=16000, return_tensors="pt")

        # Forward pass through the model
        with torch.no_grad():
            outputs = model(**inputs)

        # Get the last hidden states
        last_hidden_states = outputs.last_hidden_state

        with open(os.path.join(folder_path, json_files[i]), 'r') as file:
            # Load the JSON data
            data = json.load(file)

            # Check if "blendshapes" key is present in the JSON
            if "blendshapes" in data:
                # Check if "jawOpen" key is present in the blendshapes
                if "jawOpen" in data["blendshapes"]:
                    # Append the "jawOpen" values to the list

                    jawopen_values_list.extend(data["blendshapes"]["jawOpen"])
                    n = len(data["blendshapes"]["jawOpen"])
        # 使用切片进行降采样
        downsampled_last_hidden_states = last_hidden_states[:, ::2, :]


        # 删除最后len(last_hidden_states)-n个元素
        downsampled_last_hidden_states = downsampled_last_hidden_states[:, :-(len(last_hidden_states)//2-n), :]

        # Append to the list
        all_hidden_states.append(downsampled_last_hidden_states)

    return all_hidden_states,jawopen_values_list
def normalize_hidden_states(hidden_states_list):
    normalized_states_list = []

    for last_hidden_states in hidden_states_list:
        # Find min and max values along the second dimension (dim=1)
        min_values, _ = torch.min(last_hidden_states, dim=1, keepdim=True)
        max_values, _ = torch.max(last_hidden_states, dim=1, keepdim=True)

        # Normalize the hidden states
        normalized_states = (last_hidden_states - min_values) / (max_values - min_values)

        # Append to the list
        normalized_states_list.append(normalized_states)

    return normalized_states_list

In [5]:
path = "final"

hidden_states_list,jawopen_values = process_folder(path)
normalized_hidden_states_list = torch.cat(normalize_hidden_states(hidden_states_list),dim=1)

print(jawopen_values)
print(normalized_hidden_states_list)

Some weights of Wav2Vec2Model were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['wav2vec2.masked_spec_embed', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


[0.0009436695254407823, 0.0011430016020312905, 0.0011635582195594907, 0.0012455241521820426, 0.0012643779627978802, 0.0013309929054230452, 0.0014180643483996391, 0.001282856916077435, 0.0011750399135053158, 0.0017058108933269978, 0.0018952676327899098, 0.0020567947067320347, 0.002019671956077218, 0.00200592540204525, 0.0020072946790605783, 0.001820303383283317, 0.001478254096582532, 0.0017344166990369558, 0.001344162505120039, 0.0016099934000521898, 0.0012315207859501243, 0.002006629016250372, 0.0013606301508843899, 0.0007916360627859831, 0.0019211273174732924, 0.003436148865148425, 0.0022839384619146585, 0.0018202333012595773, 0.0015261132502928376, 0.003827120643109083, 0.00639472296461463, 0.009122451767325401, 0.0056002577766776085, 0.0015445853350684047, 0.0010020016925409436, 0.00128310383297503, 0.0011276990408077836, 0.010537778027355671, 0.016478057950735092, 0.0211445614695549, 0.0175151526927948, 0.007202859502285719, 0.001301087555475533, 0.0015131084946915507, 0.0007178063

In [6]:
print(len(jawopen_values))

107189


In [7]:
jawopen_tensor = torch.tensor(jawopen_values, dtype=torch.float32)
jawopen_tensor.shape

torch.Size([107189])

In [8]:
# 重塑 x 和 y 的形状以适应线性回归
x_reshaped = normalized_hidden_states_list.reshape(-1, 768)
y_reshaped = jawopen_tensor.reshape(-1, 1)

print(x_reshaped.shape)

print(y_reshaped.shape)

torch.Size([107189, 768])
torch.Size([107189, 1])


In [9]:

# 划分数据集为训练集和测试集和验证集
# 划分数据为训练集（80%）和临时集（20%）
x_train, x_temp, y_train, y_temp = train_test_split(x_reshaped, y_reshaped, test_size=0.2, random_state=42)

# 再次划分临时集为验证集（50%）和测试集（50%）
x_val, x_test, y_val, y_test = train_test_split(x_temp, y_temp, test_size=0.5, random_state=42)

# 现在 x_train, x_val, x_test 是训练集、验证集和测试集，y_train, y_val, y_test 是相应的标签

# 训练集
print(x_train.shape, y_train.shape)
# 验证集
print(x_val.shape, y_val.shape)
# 测试集
print(x_test.shape, y_test.shape)


torch.Size([85751, 768]) torch.Size([85751, 1])
torch.Size([10719, 768]) torch.Size([10719, 1])
torch.Size([10719, 768]) torch.Size([10719, 1])


In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import mean_squared_error
from torch.utils.data import TensorDataset, DataLoader



X_train_tensor = torch.tensor(x_train, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(device)
X_val_tensor = torch.tensor(x_val, dtype=torch.float32).to(device)
y_val_tensor = torch.tensor(y_val, dtype=torch.float32).to(device)
X_test_tensor = torch.tensor(x_test, dtype=torch.float32).to(device)

# 创建 DataLoader 对象
batch_size = 64
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)

# 定义深层神经网络模型
class DeepRegressionModel(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size):
        super(DeepRegressionModel, self).__init__()
        layers = []
        for i in range(len(hidden_sizes)):
            if i == 0:
                layers.append(nn.Linear(input_size, hidden_sizes[i]))
            else:
                layers.append(nn.Linear(hidden_sizes[i-1], hidden_sizes[i]))
            layers.append(nn.ReLU())
        layers.append(nn.Linear(hidden_sizes[-1], output_size))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)

# 实例化神经网络模型
input_size = X_train_tensor.shape[1]
hidden_sizes = [256, 128, 64]
output_size = 1
model = DeepRegressionModel(input_size, hidden_sizes, output_size).to(device)

# 定义损失函数和优化器
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.0005)

# 训练模型
num_epochs = 2000
for epoch in range(num_epochs):
    for batch_x, batch_y in train_loader:
        # 前向传播
        outputs = model(batch_x)
        loss = criterion(outputs, batch_y)
        loss = loss.to(device)
        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if (epoch+1) % 100 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# 模型预测
with torch.no_grad():
    y_pred_tensor = model(X_test_tensor)

# 转换为NumPy数组
y_pred = y_pred_tensor.cpu().numpy()

# 评估性能
mse_nn = mean_squared_error(y_test, y_pred)
print(f"Mean Squared Error (Neural Network): {mse_nn}")


  X_train_tensor = torch.tensor(x_train, dtype=torch.float32).to(device)
  y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(device)
  X_val_tensor = torch.tensor(x_val, dtype=torch.float32).to(device)
  y_val_tensor = torch.tensor(y_val, dtype=torch.float32).to(device)
  X_test_tensor = torch.tensor(x_test, dtype=torch.float32).to(device)


Epoch [100/2000], Loss: 0.0005
Epoch [200/2000], Loss: 0.0007
Epoch [300/2000], Loss: 0.0010
Epoch [400/2000], Loss: 0.0012
Epoch [500/2000], Loss: 0.0006
Epoch [600/2000], Loss: 0.0006
Epoch [700/2000], Loss: 0.0004
Epoch [800/2000], Loss: 0.0005
Epoch [900/2000], Loss: 0.0004
Epoch [1000/2000], Loss: 0.0012
Epoch [1100/2000], Loss: 0.0003
Epoch [1200/2000], Loss: 0.0006
Epoch [1300/2000], Loss: 0.0002
Epoch [1400/2000], Loss: 0.0009
Epoch [1500/2000], Loss: 0.0003
Epoch [1600/2000], Loss: 0.0009
Epoch [1700/2000], Loss: 0.0007
Epoch [1800/2000], Loss: 0.0004
Epoch [1900/2000], Loss: 0.0005
Epoch [2000/2000], Loss: 0.0004
Mean Squared Error (Neural Network): 0.001003466546535492


In [11]:
y_pred[y_pred<0] = 0

In [12]:

print(y_pred)
print(y_test)


[[0.0569189 ]
 [0.02442421]
 [0.0124113 ]
 ...
 [0.03028581]
 [0.03290699]
 [0.01407313]]
tensor([[0.1389],
        [0.0447],
        [0.0014],
        ...,
        [0.0036],
        [0.0182],
        [0.0134]])


In [13]:

from sklearn.metrics import r2_score

# 计算 R²
r_squared = r2_score(y_test, y_pred)

r_squared


0.605017971948717