In [6]:
!pip install soundfile
!pip install pandas
!pip install scipy
!pip install librosa
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

from pathlib import Path
import pandas as pd
import numpy as np
import soundfile as sf
import scipy
from scipy import fft
import functools
from torch import nn
import librosa


Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


In [149]:
def stft(filepath, fft_size=348, hop_length = 348, win_length = 348, resample_to=22050, pad_len_in_secs=10):
    wdw = np.hanning(win_length)
    audio, rate = sf.read(filepath)
    target_length = rate * pad_len_in_secs
    if (audio.shape[0] < target_length):
        audio = np.hstack([audio, ([0] * (target_length - audio.shape[0]))])

    audio = librosa.resample(audio, rate, resample_to)
    output = np.zeros((int(fft_size / 2 + 1),audio.shape[0] // hop_length))
    i = 0
    while i < output.shape[0]:
        j = i*hop_length
        k = j+win_length
        if k <= audio.shape[0]:
            windowed = wdw * audio[j:k]
        elif j >= audio.shape[0]:
            break
        else:
            padded = np.zeros(win_length)
            padded[0:audio.shape[0] - j] = audio[j:]
            windowed = wdw * padded
        output[:,i] = fft.rfft(windowed,n=fft_size)
        i += 1
    return output, audio.shape[0]

# 
# The easiest way to align visemes to input features is to choose an FFT/hop length that 
# matches the framerate of the visemes.
#
def preprocess_viseme(csv, fft_windows, blendshapes=None):
    csv = pd.read_csv(csv)
    
    if(csv.shape[0] < fft_windows):
        pad = pd.DataFrame(0, index=[i for i in range(fft_windows - csv.shape[0])], columns=csv.columns)
        pad.pad(inplace=True)
        csv = pd.concat([csv, pad])
    else:
        raise "Visemes exceeded max length, truncate?"
    
    #split = csv["Timecode"].str.split(':')
    #minute = split.str[1].astype(int)
    #second = split.str[2].astype(int)
    #frame = split.str[3].astype(float)
    #minute -= minute[0]
    #ms
    #step = minute * 60 + second
    #csv["step"] = step
    #return csv.drop_duplicates(["step"])[["step", "MouthClose","MouthFunnel","MouthPucker","JawOpen"]]
    if blendshapes is None:
        return csv
    return csv[blendshapes]

# data_dir should be structured as follows:
# - speaker_id_1/
# - speaker_id_1/sample_id1.wav
# - speaker_id_1/sample_id1.csv
# - speaker_id_1/sample_id2.wav
# - speaker_id_1/sample_id2.wav
# - speaker_id_2/sample_id1.wav
# ..
class VisemeDataset(Dataset):
    def __init__(self, data_dir, audio_transform, viseme_transform):
        self.viseme_transform= viseme_transform
        self.audio_transform = audio_transform
        self.audio_files = []
        self.visemes = []
        for file in list(Path(data_dir).rglob("*.wav")):
            self.audio_files.append(file)
            self.visemes.append(str(file).replace("wav", "csv"))

    def __len__(self):
        return len(self.audio_files)

    def __getitem__(self, idx):
        fft, num_samples = self.audio_transform(self.audio_files[idx])
        visemes = torch.tensor(self.viseme_transform(self.visemes[idx],fft.shape[1]).values.astype(np.float32))
        # return STFT, viseme tensor
        return torch.tensor(fft, dtype=torch.float), visemes
training_data = VisemeDataset("./data/training", functools.partial(stft), functools.partial(preprocess_viseme, blendshapes=blendshapes))
test_data = VisemeDataset("./data/test", functools.partial(stft), functools.partial(preprocess_viseme, blendshapes=blendshapes))

train_dataloader = DataLoader(training_data, batch_size=4, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=4, shuffle=True)



In [150]:
fft_size=348
hop_length = 348
win_length = 348
sample_rate = 22050
#data, num_samples = stft("./training_data/speaker_1/1.wav", fft_size=fft_size, hop_length=hop_length, win_length=win_length, resample_to=sample_rate)

blendshapes = ["MouthClose", "MouthFunnel", "MouthPucker", "JawOpen"]
#labels = preprocess_viseme("training_data/speaker_1/MySlate_7_Nic.csv", blendshapes)

framerate=59.97 # source framerate for video

# duration (ms) for each audio window
# this also corresponds to length of time for each output viseme
window_size = (win_length / sample_rate) * 1000


In [234]:
class SeparableConv1d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, bias=False):
        super(SeparableConv1d, self).__init__()
        self.depthwise = nn.Conv1d(in_channels, in_channels, kernel_size=kernel_size, 
                               groups=in_channels, bias=bias, padding=1)
        self.pointwise = nn.Conv1d(in_channels, out_channels, 
                               kernel_size=1, bias=bias)
    def forward(self, x):
        out = self.depthwise(x)
        out = self.pointwise(out)
        return out
    
    
class NeuralNetwork(nn.Module):
    def __init__(self, batch_size, n_ffts=175, num_viseme=4, ks=3):
        super(NeuralNetwork, self).__init__()
        self.linear_relu_stack = nn.Sequential(
            SeparableConv1d(n_ffts,n_ffts,ks),
            #nn.Linear(174, 174),
            nn.ReLU(),
            SeparableConv1d(n_ffts,n_ffts,ks),
            nn.ReLU(),
            #nn.Linear(175, 175),
            #nn.ReLU(),
            #nn.Linear(1, fft_length),
            #nn.Linear(fft_length, num_viseme),
        )
        self.linear_out = nn.Linear(1, num_viseme)
        #self.attn = nn.MultiheadAttention(192, 4)

    def forward(self, x):
        #return self.linear_relu_stack(x)
        o1 = self.linear_relu_stack(x)
        o1 = torch.sum(o1, 1, keepdim=True)
        o1 = torch.transpose(o1, 1,2)
        o1 = self.linear_out(o1)
        return o1
        
        #return 

In [235]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print('Using {} device'.format(device))

   
model = NeuralNetwork(4).to(device)

learning_rate = 0.00005
optimizer = torch.optim.RMSprop(model.parameters(), lr=learning_rate)
num_steps = 2000
print_loss_every = 5
eval_every = 20
for t in range(num_steps):
    train_features, train_labels = next(iter(train_dataloader))
    x = train_features.to(device)      
    #x = torch.unsqueeze(x, dim=3)
    
    y = train_labels.to(device)
    #print(y.shape)
    preds = model(x)
    
    #print(preds.shape)
    
    loss = torch.nn.functional.mse_loss(preds, y)
    if t % print_loss_every == 0:
        print(f"Step {t} Loss: {loss.item()}")
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    if t > 0 and t % eval_every == 0:
        accum_loss = 0
        for test_features, test_labels in iter(test_dataloader):
            x = test_features.to(device)
            #x = torch.unsqueeze(x, dim=3)
            y = test_labels.to(device)
            preds = model(x)
            accum_loss += torch.nn.functional.mse_loss(preds, y).item()
        print(f"Test loss {accum_loss}")
    


#pred_probab = nn.Softmax(dim=1)(logits)
#y_pred = pred_probab.argmax(1)
#print(f"Predicted class: {y_pred}")

Using cuda device




torch.Size([4, 633, 4])
Step 0 Loss: 0.05108717456459999
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 5 Loss: 0.038752950727939606
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 10 Loss: 0.0408959724009037
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 15 Loss: 0.03782924264669418
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 20 Loss: 0.03397971764206886
Test loss 0.08066428080201149
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 25 Loss: 0.03202453628182411
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 30 Loss: 0.03923402

torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 255 Loss: 0.027185605838894844
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 260 Loss: 0.026612630113959312
Test loss 0.05450316146016121
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 265 Loss: 0.02586725726723671
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 270 Loss: 0.026665138080716133
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 275 Loss: 0.02649998478591442
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 280 Loss: 0.024935448542237282
Test loss 0.051960861310362816
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch

torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 505 Loss: 0.025014452636241913
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 510 Loss: 0.022163406014442444
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 515 Loss: 0.02668403647840023
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 520 Loss: 0.02391594834625721
Test loss 0.04819743148982525
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 525 Loss: 0.023756608366966248
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 530 Loss: 0.02350645326077938
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4

torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 760 Loss: 0.02062840387225151
Test loss 0.04219780117273331
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 765 Loss: 0.02193908579647541
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 770 Loss: 0.02057373709976673
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 775 Loss: 0.02068687230348587
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 780 Loss: 0.020675791427493095
Test loss 0.04171869903802872
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
torch.Size([4, 633, 4])
Step 785 Loss: 0.020520396530628204
torch.Si

KeyboardInterrupt: 

In [242]:
test_features, _ = next(iter(test_dataloader))
export_y = model(test_features.to(device)) 
print(export_y.shape)
header = "Timecode,BlendShapeCount,eyeBlinkRight,eyeLookDownRight,eyeLookInRight,eyeLookOutRight,eyeLookUpRight,eyeSquintRight,eyeWideRight,eyeBlinkLeft,eyeLookDownLeft,eyeLookInLeft,eyeLookOutLeft,eyeLookUpLeft,eyeSquintLeft,eyeWideLeft,jawForward,jawRight,jawLeft,jawOpen,mouthClose,mouthFunnel,mouthPucker,mouthRight,mouthLeft,mouthSmileRight,mouthSmileLeft,mouthFrownRight,mouthFrownLeft,mouthDimpleRight,mouthDimpleLeft,mouthStretchRight,mouthStretchLeft,mouthRollLower,mouthRollUpper,mouthShrugLower,mouthShrugUpper,mouthPressRight,mouthPressLeft,mouthLowerDownRight,mouthLowerDownLeft,mouthUpperUpRight,mouthUpperUpLeft,browDownRight,browDownLeft,browInnerUp,browOuterUpRight,browOuterUpLeft,cheekPuff,cheekSquintRight,cheekSquintLeft,noseSneerRight,noseSneerLeft,tongueOut,HeadYaw,HeadPitch,HeadRoll,LeftEyeYaw,LeftEyePitch,LeftEyeRoll,RightEyeYaw,RightEyePitch,RightEyeRoll".split(',')
selected_output_indices = [header.index(x[0].lower() + x[1:]) for x in blendshapes]
print(selected_output_indices)
num_visemes = len(blendshapes)
with open("output.csv", "w") as outfile:
    outfile.write(",".join(header) + "\n")
    timer_ms = 0
    for i in range(export_y.shape[1]):
        output = [str(0)] * len(header)
        for viseme in range(num_visemes): 
            second = str(int(timer_ms // 1000)).zfill(2)
            frame = (timer_ms % 1000) * framerate / 1000
            output[0] = f"00:00:{second}:{frame}"
            output[selected_output_indices[viseme]] = str(export_y[0,i,viseme].item())
        timer_ms += window_size
        outfile.write(",".join(output) + "\n")



torch.Size([4, 633, 4])
[20, 21, 22, 19]


In [106]:
new_header = "Timecode,BlendShapeCount,EyeBlinkLeft,EyeLookDownLeft,EyeLookInLeft,EyeLookOutLeft,EyeLookUpLeft,EyeSquintLeft,EyeWideLeft,EyeBlinkRight,EyeLookDownRight,EyeLookInRight,EyeLookOutRight,EyeLookUpRight,EyeSquintRight,EyeWideRight,JawForward,JawRight,JawLeft,JawOpen,MouthClose,MouthFunnel,MouthPucker,MouthRight,MouthLeft,MouthSmileLeft,MouthSmileRight,MouthFrownLeft,MouthFrownRight,MouthDimpleLeft,MouthDimpleRight,MouthStretchLeft,MouthStretchRight,MouthRollLower,MouthRollUpper,MouthShrugLower,MouthShrugUpper,MouthPressLeft,MouthPressRight,MouthLowerDownLeft,MouthLowerDownRight,MouthUpperUpLeft,MouthUpperUpRight,BrowDownLeft,BrowDownRight,BrowInnerUp,BrowOuterUpLeft,BrowOuterUpRight,CheekPuff,CheekSquintLeft,CheekSquintRight,NoseSneerLeft,NoseSneerRight,TongueOut,HeadYaw,HeadPitch,HeadRoll,LeftEyeYaw,LeftEyePitch,LeftEyeRoll,RightEyeYaw,RightEyePitch,RightEyeRoll"
remap = {h:(h[0].lower() + h[1:]) if h not in ["Timecode","BlendShapeCount","HeadYaw","HeadPitch","HeadRoll","LeftEyeYaw","LeftEyePitch","LeftEyeRoll","RightEyeYaw","RightEyePitch","RightEyeRoll"]  else h for h in new_header.split(",") }
for oh in remap.values():
    if oh not in header:
        print(oh)

#print(remap)
def new_to_old(csv_df):
    return csv_df.rename(columns=remap)
df = preprocess_viseme("training_data/speaker_1/MySlate_7_Nic.csv")
#print(df)
new_to_old(df)[header].to_csv("original.csv", index=False)