In [172]:
import glob
import os

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

In [173]:
# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

cuda


In [174]:
# Hyperparameters for the jaw PINN experiment, collected in a single dict.
config = dict(
    # --- training ---
    epochs=500,
    # 1 utterance = 1 ODE
    # NOTE(review): the DataLoader in this notebook is built with batch_size=64,
    # not this value — confirm which one is intended.
    batch_size=1,
    learning_rate=1e-3,
    lambda_phys=1e-3,

    # --- optimizer ---
    optimizer="adam",
    weight_decay=0.0,

    # --- learning-rate scheduler ---
    use_scheduler=True,
    scheduler="reduce_on_plateau",
    lr_patience=20,
    lr_factor=0.5,
    min_lr=1e-6,

    # --- early stopping ---
    early_stopping=True,
    early_patience=40,
    min_delta=1e-6,

    # --- model ---
    # NOTE(review): the loaded word embeddings print as shape (25, 56), so this
    # value does not match their width — confirm what embed_dim refers to.
    embed_dim=48,

    # --- physics constants (I: jaw inertia, r: moment arm) ---
    I=1.0,
    r=1.0,
)

In [None]:
'''
Observed jaw kinematics: I*θ''(t) + b*θ'(t) + k*θ(t) - τ(t) = 0   (τ = torque)
    I = inertia, b = damping, k = stiffness
Learned articulatory embedding: z = f_CNN(θ, x, y)
Input: (x: [theta, w, a], t), embeddings
Output: a(t) (force), and (b, k, f_max)
Constants: I = jaw inertia
           r = moment arm
Torque model (simplified): τ(t) = r * a(t) * f_max
Loss function: L_contrastive + L_kinetics
'''

In [176]:
# Notebook outline:
#   - dataset (ready)
#   - architecture
#   - gradients
#   - kinematics residual
#   - contrastive residual
#   - training loop
#   - model evaluation
#   - plotting residuals

In [177]:
# Load the precomputed per-word embeddings from the .npz archive.
# The context manager closes the archive's file handle once both arrays have
# been read into memory (indexing an NpzFile materializes the array).
with np.load('./../src/feature_extraction/z_word_embeddings.npz') as data:
    word_ids = data["word_ids"]       # (N_words,)
    embeddings = data["embeddings"]   # (N_words, D)

print(len(word_ids))

# Word embeddings as a single float32 tensor
z_word_tensor = torch.tensor(embeddings, dtype=torch.float32)  # shape: (N_words, D)

z_word_tensor.shape


25


torch.Size([25, 56])

In [178]:
# Build the data for the PINN model: every recording CSV is standardized and
# cut into fixed-length chunks of T samples.
# NOTE(review): absolute, machine-specific path — consider a configurable data dir.
src_pattern = r'/workspace/Silent_Speech/dataset_sony/Normalized_dataset/recordings/*/*.csv'
features = ["theta"]
C = len(features)
T = 150  # samples per chunk; trailing samples that do not fill a chunk are dropped
print("1. Loading & Normalizing PER SUBJECT...")


def _standardize(values):
    """Z-score each column of a 2-D array.

    Every column is shifted to zero mean and divided by its population
    standard deviation (ddof=0). NaNs are ignored when computing the
    statistics, and (near-)constant columns are divided by 1 instead of 0.
    Implemented with NumPy so this cell does not depend on a scaler class
    that the notebook never imports.

    Parameters
    ----------
    values : array-like, shape (n_samples, n_features)

    Returns
    -------
    np.ndarray of float64 with the same shape as ``values``.
    """
    values = np.asarray(values, dtype=np.float64)
    mean = np.nanmean(values, axis=0)
    std = np.nanstd(values, axis=0)
    # Guard against division by zero for constant columns.
    std = np.where(std < 10 * np.finfo(np.float64).eps, 1.0, std)
    return (values - mean) / std


# Sorted so the sample order is reproducible; glob order is filesystem-dependent.
files = sorted(glob.glob(src_pattern))

chunks, words, subjects = [], [], []

for fp in files:
    try:
        df = pd.read_csv(fp)
        # Standardize over the whole file (one file = one word/subject recording).
        signal = _standardize(df[features].values)

        n_chunks = len(signal) // T
        if n_chunks == 0:
            # Recording shorter than one chunk: nothing usable.
            continue

        chunks.append(signal[:n_chunks * T].reshape(n_chunks, T, C))

        # Directory name is the word label, file stem is the subject id.
        word = os.path.basename(os.path.dirname(fp))
        subj = os.path.basename(fp).replace(".csv", "")

        words.extend([word] * n_chunks)
        subjects.extend([subj] * n_chunks)

    except Exception as e:
        # Best effort: report an unreadable or malformed file and keep going.
        print("Skipping:", fp, e)

if not chunks:
    raise ValueError(f"No usable recordings matched {src_pattern!r}")

X = np.vstack(chunks)  # (N, T, C)
theta_all = torch.tensor(X[..., 0], dtype=torch.float32)  # (N, T)

1. Loading & Normalizing PER SUBJECT...


In [179]:
# Vocabulary: every distinct word label, in sorted order.
unique_words = sorted(set(words))
num_words = len(unique_words)

# Lookup from word label to its position in the sorted vocabulary.
# NOTE(review): that position is later used as a row index into the embedding
# matrix, which presumes the embeddings are stored in this same order — confirm.
word_to_index = dict(zip(unique_words, range(num_words)))

In [180]:
# Move the inputs onto the compute device.
# z_word_tensor is (num_words, D); its rows are assumed to already follow the
# order of unique_words.
# NOTE(review): nothing here checks that ordering against word_ids; only the
# row count is verified below.
theta_all = theta_all.to(device)  # (N, T)
z_word_tensor = torch.tensor(embeddings, dtype=torch.float32).to(device)

assert z_word_tensor.shape[0] == num_words

In [181]:
class JawPINNDataset(Dataset):
    """Pairs each theta trace with the embedding of the word it belongs to."""

    def __init__(self, theta_all, words, z_word_tensor, word_to_index):
        """
        theta_all     : (N, T)
        words         : list[str], length N
        z_word_tensor : (num_words, D)
        word_to_index : dict[str -> int]

        Raises
        ------
        ValueError : if ``words`` does not have one entry per row of ``theta_all``.
        KeyError   : if a word in ``words`` has no entry in ``word_to_index``.
        """
        n_traces = theta_all.shape[0]
        if len(words) != n_traces:
            raise ValueError(
                f"words has {len(words)} entries but theta_all has {n_traces} rows"
            )

        # Fail here rather than midway through an epoch inside __getitem__.
        unknown = sorted(set(words) - set(word_to_index))
        if unknown:
            raise KeyError(f"words without an embedding index: {unknown}")

        self.theta_all = theta_all
        self.words = words
        self.z_word_tensor = z_word_tensor
        self.word_to_index = word_to_index

    def __len__(self):
        """Number of traces (rows of ``theta_all``)."""
        return self.theta_all.shape[0]

    def __getitem__(self, idx):
        """Return the ``idx``-th trace as a column plus its word embedding."""
        theta_obs = self.theta_all[idx]   # (T,)
        word = self.words[idx]            # string
        z_idx = self.word_to_index[word]  # int

        return {
            "theta_obs": theta_obs.unsqueeze(1),        # (T,1)
            "z_word": self.z_word_tensor[z_idx]         # (D,)
        }

In [182]:
# Wrap the traces and word embeddings in a dataset, then serve shuffled batches
# of 64; the final short batch is dropped so every batch has the same size.
jaw_dataset = JawPINNDataset(theta_all, words, z_word_tensor, word_to_index)
loader = DataLoader(jaw_dataset, batch_size=64, shuffle=True, drop_last=True)

In [183]:
def make_time_tensor(batch_size, T, t0=0.0, t1=1.0, device=None):
    """Build a batch of identical, differentiable time grids.

    Parameters
    ----------
    batch_size : int
        Number of copies of the grid.
    T : int
        Number of evenly spaced time points per grid.
    t0, t1 : float
        First and last time value (both included).
    device : str | torch.device | None
        Where to allocate the tensor. ``None`` selects CUDA when it is
        available and the CPU otherwise, so the function also works on
        machines without a GPU.

    Returns
    -------
    torch.Tensor of shape (batch_size, T, 1) with ``requires_grad=True``.
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"

    t = torch.linspace(t0, t1, T, device=device)
    # repeat (not expand) gives each batch row its own storage.
    t = t.view(1, T, 1).repeat(batch_size, 1, 1)
    t.requires_grad_(True)
    return t

In [184]:
# Pull a single batch for the shape/device checks that follow. The loader drops
# the last incomplete batch, so every batch has the same shape and there is no
# need to iterate over the whole dataset just to inspect one.
batch = next(iter(loader))

theta_obs = batch["theta_obs"].to(device)  # (B, T, 1)
z_word = batch["z_word"].to(device)        # (B, D)

B, T, _ = theta_obs.shape

In [185]:
# Time grid on the same device as the embeddings it is concatenated with, so
# this cell also runs when the notebook has fallen back to the CPU.
t = make_time_tensor(B, T, device=z_word.device)

# Broadcast each word embedding across the T time steps and prepend time.
z_exp = z_word.unsqueeze(1).expand(B, T, -1)   # (B,T,D)
pinn_input = torch.cat([t, z_exp], dim=-1)     # (B,T,1+D)

In [188]:
# Quick sanity checks on the assembled PINN input. Device checks compare
# against the notebook's selected `device` rather than requiring CUDA, so they
# also pass when running on the CPU fallback.
assert pinn_input.shape == (B, T, 1 + z_word.shape[1])
assert theta_obs.device.type == device.type
assert z_word.device.type == device.type
assert t.device.type == device.type
assert pinn_input.device.type == device.type
assert t.requires_grad

In [191]:
# Embedding width comes from the embedding matrix itself rather than a
# hard-coded number, so the checks stay valid if the embeddings are regenerated.
D = z_word_tensor.shape[1]

#shapes
assert theta_obs.shape == (B, T, 1), theta_obs.shape
assert z_word.shape == (B, D), z_word.shape
assert t.shape == (B, T, 1), t.shape
assert z_exp.shape == (B, T, D), z_exp.shape
assert pinn_input.shape == (B, T, 1 + D), pinn_input.shape

#device (compared with the selected `device`, so a CPU fallback also passes)
assert theta_obs.device.type == device.type, theta_obs.device
assert z_word.device.type == device.type, z_word.device
assert t.device.type == device.type, t.device
assert pinn_input.device.type == device.type, pinn_input.device

#grad flags: only the time grid is differentiated
assert t.requires_grad is True
assert theta_obs.requires_grad is False
assert z_word.requires_grad is False

print("ALL SHAPE + DEVICE CHECKS PASSED")

 ALL SHAPE + DEVICE CHECKS PASSED
