In [2]:
# Colab-only: mount Google Drive at /content/drive. Later cells read the pose
# prediction files from, and read/write model checkpoints to, paths below it.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import os
from time import time

import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from IPython.display import HTML
from keras import Sequential
from keras import layers
from keras.callbacks import ModelCheckpoint
from keras.models import Model
from matplotlib.animation import FuncAnimation
from mpl_toolkits.mplot3d import Axes3D
from scipy.signal import savgol_filter
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import MinMaxScaler, StandardScaler

In [4]:
# Subjects that have an entry in Y_true below (same order as Y_true).
subjects_PD=['S01','S02','S03','S04','S05','S06','S07','S09',
'S10','S11','S12','S13','S14','S16','S17','S18','S19',
'S21','S22','S23','S24','S28','S29','S30','S31',
'S32','S33','S34','S35']

# Every subject whose prediction file is loaded by the data-preparation cell.
subjects_All=['S01','S02','S03','S04','S05','S06','S07','S08','S09',
'S10','S11','S12','S13','S14','S16','S17','S18','S19','S20',
'S21','S22','S23','S24','S25','S26','S27','S28','S29','S30','S31',
'S32','S33','S34','S35']

# NOTE(review): this list appears to hold 35 entries while subjects_All holds
# 34, and '26' / '202108' do not look like complete dates -- presumably the
# list is corrupted or misaligned; verify before relying on it.
subjects_All_date=['20210223','20191114','20191120','20191112','20191119','20200220','20191121',
'20191126','20191128','20191203','20191204','20200108','20200109','20200121','20200122','20200123',
'20200124','20200127','20200130','20200205','20200206_9339','20200207','20200213','20200214','20200218',
'26','20200221','20210706','20210804','20200206_9629','202108','20191210','20191212','20191218','20200227']

# Subjects that the data-preparation cell labels 0 without consulting Y_true.
healthy_controls=['S08','S20','S27','S25','S26']

# Binary label per entry of subjects_PD (looked up by position).
# NOTE(review): the clinical meaning of 0/1 is not stated in this notebook.
Y_true=np.asarray([1,1,0,1,1,0,0,0,1,0,1,0,1,1,1,1,0,0,0,1,1,0,0,1,1,0,1,0,1])

# Sanity check: subjects_PD and Y_true must have the same length.
print(np.array(subjects_All).shape)
print(np.array(subjects_PD).shape)
print(Y_true.shape)

(34,)
(29,)
(29,)


In [5]:
# Build fixed-length pose windows for every subject, split them into train and
# test sets per class, shuffle the training set and standardise both sets.
scaler = StandardScaler()

X = []
y = []

window_len = 30

# The subject id alone locates the prediction file. The previous
# zip(subjects_All, subjects_All_date) would silently stop at the shorter list
# and never used the date, so only the subject list is iterated here.
for subj in subjects_All:

    # Resolve the label first so a subject missing from both lists fails
    # loudly instead of silently appending None to y.
    if subj in healthy_controls:
        label = 0
    elif subj in subjects_PD:
        label = Y_true[subjects_PD.index(subj)]
    else:
        raise ValueError('No label defined for subject ' + subj)

    data = np.load('/content/drive/MyDrive/outputs_finetuned/Predictions_'+subj+'.npy')

    # Smooth a copy so the loaded array is never modified through an alias.
    data_normal = data.copy()

    # Savitzky-Golay smoothing (window 11, order 3) along time for each of the
    # 15 joints and 3 coordinates.
    for ii in range(15):
        for jj in range(3):
            data_normal[:,ii,jj] = savgol_filter(data_normal[:,ii,jj], 11, 3)

    # Per-frame body axes: joint 1 minus joint 4, and joint 7 minus joint 0.
    x_vec = data_normal[:,1] - data_normal[:,4]
    y_vec = data_normal[:,7] - data_normal[:,0]

    x_vec /= np.linalg.norm(x_vec, keepdims=True, axis=-1)
    y_vec /= np.linalg.norm(y_vec, keepdims=True, axis=-1)

    # NOTE(review): x_vec and y_vec are normalised separately but never
    # orthogonalised, so this 3x3 basis is presumably not exactly orthonormal.
    z_vec = np.cross(x_vec, y_vec)

    # Every entry is overwritten below, so no particular fill value is needed.
    rotation_matrix = np.empty((len(x_vec), 3, 3))
    rotation_matrix[:,:,0] = x_vec
    rotation_matrix[:,:,1] = y_vec
    rotation_matrix[:,:,2] = z_vec

    # Express every joint in the per-frame body basis.
    data_normal = np.matmul(data_normal, rotation_matrix)

    # Flatten (joints, coords) into one feature vector per frame.
    data_normal = data_normal.reshape([data_normal.shape[0], data_normal.shape[1]*data_normal.shape[2]])

    # NOTE(review): the range stops at len - 1, so the final frame of each
    # recording never appears in a window; kept as-is to preserve the dataset.
    for ii in range(window_len, len(data_normal)):
        X.append(data_normal[ii-window_len:ii, :])
        y.append(label)

X = np.asarray(X)
y = np.asarray(y)

# Split the data into two separate arrays: one for label 0 and one for label 1.
y_0 = y[y == 0]
y_1 = y[y == 1]
X_0 = X[y == 0]
X_1 = X[y == 1]

# Split each label's data into training and test sets.
# NOTE(review): consecutive windows of one recording share window_len - 1
# frames, so this random window-level split puts near-duplicates of training
# windows into the test set. Consider holding out whole subjects instead.
test_size = 0.10  # 10% for the test data

X_0_train, X_0_test, y_0_train, y_0_test = train_test_split(
    X_0, y_0, test_size=test_size, random_state=42)

X_1_train, X_1_test, y_1_train, y_1_test = train_test_split(
    X_1, y_1, test_size=test_size, random_state=42)

# Combine the training and test sets for both labels.
X_train = np.concatenate((X_0_train, X_1_train))
y_train = np.concatenate((y_0_train, y_1_train))
X_test = np.concatenate((X_0_test, X_1_test))
y_test = np.concatenate((y_0_test, y_1_test))

# Seeded shuffle so that re-running the cell reproduces the same ordering
# (same seed as the train/test split above).
rp = np.random.RandomState(42).permutation(len(y_train))

X_train = X_train[rp, : ,:]
y_train = y_train[rp]

orig_shape_train = X_train.shape
orig_shape_test = X_test.shape

# StandardScaler works on 2-D input: flatten each window, fit on the training
# windows only, then restore the (window, features) layout.
X_train_flatten = X_train.reshape(orig_shape_train[0], orig_shape_train[1]*orig_shape_train[2])
X_test_flatten = X_test.reshape(orig_shape_test[0], orig_shape_test[1]*orig_shape_test[2])

X_train_flatten_normal = scaler.fit_transform(X_train_flatten)
X_test_flatten_normal = scaler.transform(X_test_flatten)

X_train = X_train_flatten_normal.reshape(orig_shape_train[0], orig_shape_train[1], orig_shape_train[2])
X_test = X_test_flatten_normal.reshape(orig_shape_test[0], orig_shape_test[1], orig_shape_test[2])

print(X_train.shape)
print(y_train.shape)

print(X_test.shape)
print(y_test.shape)

(10925, 30, 45)
(10925,)
(1215, 30, 45)
(1215,)


In [10]:
class Diffusion():

  def __init__(self, input_size, steps = 1000, beta_start = 1e-4, beta_end = 0.02, num_classes = 2):
    self.steps = steps
    self.beta_start = beta_start
    self.beta_end = beta_end
    self.beta = np.linspace(beta_start, beta_end, steps)
    self.alpha = 1-self.beta
    self.alpha_hat = np.cumprod(self.alpha)
    self.input_size = input_size
    self.num_classes = num_classes
    self.model = self.make_net(self.input_size)
    self.discriminator_model = self.make_discriminator_net(self.input_size, self.num_classes)
    self.coeff1 =  tf.Variable(np.divide(1, np.sqrt(self.alpha)))
    self.coeff2 =  tf.Variable(np.divide(self.beta, np.sqrt(1-self.alpha_hat)))

  def forward(self, x_t0, step):
    noise = np.random.randn(*x_t0.shape)
    return noise, np.sqrt(self.alpha_hat[step, np.newaxis, np.newaxis]) * x_t0 + np.sqrt(1-self.alpha_hat[step, np.newaxis, np.newaxis]) * noise

  def load_net(self, checkpoint_name_model, checkpoint_name_discriminator_model, checkpoint_dir = "/content/drive/MyDrive/checkpoints/"):

    self.model.load_weights(checkpoint_dir+checkpoint_name_model)
    self.discriminator_model.load_weights(checkpoint_dir+checkpoint_name_discriminator_model)


  def embedder(self, x, size):
    x = layers.Dense(size)(x)
    x = x_res = layers.Activation('relu')(x)
    x = layers.Dense(size)(x)
    x = layers.Activation('relu')(x)
    x = layers.Dense(size)(x)
    x = layers.Add()([x, x_res])
    x = layers.Activation('relu')(x)
    x = layers.LayerNormalization(epsilon=1e-6)(x)
    return x

  def transformer(self, x, info, num_heads, size):
    att = layers.MultiHeadAttention(num_heads, size)(x, info)
    x = layers.Add()([att, x])
    x = x_res = layers.LayerNormalization(epsilon=1e-6)(x)
    x = self.embedder(x, x.shape[2])
    x = layers.Add()([x_res, x])
    x = layers.LayerNormalization(epsilon=1e-6)(x)
    return x


  def make_net(self, input_size):
    """Build the conditional denoising network.

    The model takes three inputs, in this order: a sequence of shape
    (input_size[0], input_size[1]), a scalar timestep, and a scalar class
    label. It returns one value per input feature at every sequence position
    (final Dense has input_size[1] units).

    NOTE: layers are created in a fixed order; weights saved with
    save_weights are presumably matched by that order on load, so statements
    must not be reordered without retraining.

    Args:
      input_size: indexable pair (sequence length, feature count).

    Returns:
      The uncompiled Keras Model.
    """

    x_input = layers.Input(shape=(input_size[0], input_size[1]))

    x_ts_input = layers.Input(shape=(1,))

    x_cg_input = layers.Input(shape=(1,))

    # Shared embedding width for all three branches.
    size = 64

    num_heads = 40

    # Earlier timestep-embedding variant, kept for reference.
    #x_ts = self.embedder(x_ts_input, size)
    #x_ts = layers.Reshape((size, 1))(x_ts)
    #x_ts = self.embedder(x_ts, input_size[0])
    #x_ts = layers.Permute((2, 1))(x_ts)

    # Timestep branch: embed the scalar, fold it into one row of size//4
    # values per sequence position, then widen every row to `size`.
    x_ts = self.embedder(x_ts_input, size*input_size[0]//4)
    x_ts = layers.Reshape((input_size[0], size//4))(x_ts)
    x_ts = self.embedder(x_ts, size)


    # Label branch: -1.0 is declared as the masked value (the samplers pass
    # -1 for the unconditional prediction). The embedding is reshaped and
    # permuted to (input_size[0], size).
    x_cg = layers.Masking(mask_value=-1.0)(x_cg_input)
    x_cg = self.embedder(x_cg, size)
    x_cg = layers.Reshape((size, 1))(x_cg)
    x_cg = self.embedder(x_cg, input_size[0])
    x_cg = layers.Permute((2, 1))(x_cg)

    # Alternative label-embedding variant, kept for reference.
    #x_cg = layers.Masking(mask_value=-1.0)(x_cg_input)
    #x_cg = self.embedder(x_ts_input, size*input_size[0]//4)
    #x_cg = layers.Reshape((input_size[0], size//4))(x_cg)
    #x_cg = self.embedder(x_cg, size)

    # Sequence branch: per-position embedding plus fixed positional encoding.
    x = self.embedder(x_input, size)

    pos_enc = self.positional_encoding(input_size[0], size)

    x = x + pos_enc

    #x = self.transformer(x, x, num_heads, size)

    # Self-attention over the sequence.
    x = self.transformer(x, x, num_heads, size)

    #x = self.transformer(x, x_ts, num_heads, size)

    # Attend to the timestep embedding, then to the label embedding.
    x = self.transformer(x, x_ts, num_heads, size)

    x = self.transformer(x, x_cg, num_heads, size)

    # Linear projection back to the input feature count (no activation).
    x = layers.Dense(input_size[1])(x)
    #x = layers.Activation('relu')(x)
    #x = layers.Dense(input_size[1])(x)
    #x = layers.Activation('tanh')(x)

    # Side effect: prints the symbolic output shape when the model is built.
    print(x.shape)

    model = Model([x_input, x_ts_input, x_cg_input], x)

    return model

  def make_discriminator_net(self, input_size, num_classes):
    """Build the sequence classifier used as the discriminator.

    NOTE: layers are created in a fixed order; weights saved with
    save_weights are presumably matched by that order on load, so statements
    must not be reordered without retraining.

    Args:
      input_size: indexable pair (sequence length, feature count).
      num_classes: number of softmax outputs.

    Returns:
      The uncompiled Keras Model mapping a sequence to class probabilities.
    """

    x_input = layers.Input(shape=(input_size[0], input_size[1]))

    size = 64

    num_heads = 40

    # Per-position embedding plus fixed positional encoding.
    x = self.embedder(x_input, size)

    pos_enc = self.positional_encoding(input_size[0], size)

    x = x + pos_enc

    # One self-attention block over the sequence.
    x = self.transformer(x, x, num_heads, size)

    # Collapse the sequence axis by averaging.
    x = layers.GlobalAveragePooling1D()(x)

    # Classification head: size//2 -> size//4 -> num_classes (softmax).
    x = layers.Dense(size//2)(x)
    x = layers.Activation('relu')(x)
    x = layers.Dense(size//4)(x)
    x = layers.Activation('relu')(x)
    x = layers.Dense(num_classes, activation='softmax')(x)

    # Side effect: prints the symbolic output shape when the model is built.
    print(x.shape)

    model = Model(x_input, x)

    return model


  def train_discriminator(self, X, y, num_epochs, batch_size=64, learning_rate=0.001):
      # Compile the model
      self.discriminator_model.compile(
          optimizer = tf.keras.optimizers.Adam(learning_rate),
          loss = tf.keras.losses.SparseCategoricalCrossentropy(),
          metrics = ['accuracy']
      )

      # Train the model
      self.discriminator_model.fit(
          X, y,
          epochs=num_epochs,
          batch_size=batch_size,
          verbose=1
      )

      return

  def compute_mu_from_epsilon(self, epsilon, x, t):
    # Use tf.gather to select elements from coeff1 and coeff2
    coeff1_t = tf.gather(self.coeff1, t)
    coeff2_t = tf.gather(self.coeff2, t)

    # Cast coeff1_t and coeff2_t to the same dtype as x and epsilon
    coeff1_t = tf.cast(coeff1_t, dtype=x.dtype)
    coeff2_t = tf.cast(coeff2_t, dtype=x.dtype)

    # Expand dimensions to match the image dimensions
    coeff1_t = tf.expand_dims(tf.expand_dims(coeff1_t, -1), -1)  # Shape becomes [batch_size, 1, 1]
    coeff2_t = tf.expand_dims(tf.expand_dims(coeff2_t, -1), -1)  # Shape becomes [batch_size, 1, 1]

    # Broadcast multiplication and subtraction across the image dimensions
    return coeff1_t * (x - coeff2_t * epsilon)

  def train(self, x, c, epochs, batch_size=64, checkpoint_dir='/content/drive/MyDrive/checkpoints'):
    """Jointly train the denoiser and the discriminator.

    Each step noises a batch to random diffusion steps, predicts the noise,
    converts the prediction with compute_mu_from_epsilon, classifies that
    result with the discriminator, and minimises
    MSE(noise) + 0.01 * cross-entropy(label).

    Args:
      x: clean training samples, sliced along the first axis in order (no
        per-epoch shuffling happens here).
      c: integer class labels aligned with x.
      epochs: number of passes over x.
      batch_size: number of samples per step.
      checkpoint_dir: directory (created if missing) where both models'
        weights are written every 50th epoch.

    Returns:
      1-D array with the loss of the last batch of each epoch.

    NOTE(review): the label-dropout lines below are commented out, so the
    denoiser is never trained with the -1 label that the sampling methods
    pass when w > 0 -- confirm this is intended.
    NOTE(review): if x is empty the epoch loop never assigns `loss` and the
    append after it raises.
    """

    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
    loss_fn = tf.keras.losses.MeanSquaredError()
    loss_crs_ent = tf.keras.losses.SparseCategoricalCrossentropy()

    # Ensure the checkpoint directory exists
    os.makedirs(checkpoint_dir, exist_ok=True)

    # Build the optimizer with all trainable variables
    all_trainable_variables = self.model.trainable_variables + self.discriminator_model.trainable_variables
    optimizer.build(all_trainable_variables)

    # Last-batch loss of every epoch.
    loss_ = []

    @tf.function
    def train_step(x, t, c, epsilon):
        # The tape is persistent because gradients are taken twice from it,
        # once per network.
        with tf.GradientTape(persistent=True) as tape:
          epsilon_pred = self.model([x, t, c])
          loss_simple = loss_fn(epsilon_pred, epsilon)
          x = tf.cast(x, dtype=epsilon_pred.dtype)
          mu_pred = self.compute_mu_from_epsilon(epsilon_pred, x, t)
          c_pred = self.discriminator_model(mu_pred)
          loss_diss = loss_crs_ent(c, c_pred)
          loss_diss = tf.cast(loss_diss, dtype=loss_simple.dtype)
          # The classification term is weighted by 0.01.
          loss = tf.add(loss_simple, 0.01*loss_diss)

        gradients_model = tape.gradient(loss, self.model.trainable_variables)
        gradients_discriminator_model = tape.gradient(loss, self.discriminator_model.trainable_variables)

        # Both networks share one optimizer; it is applied once per network.
        optimizer.apply_gradients(zip(gradients_model, self.model.trainable_variables))
        optimizer.apply_gradients(zip(gradients_discriminator_model, self.discriminator_model.trainable_variables))

        del tape  # Dispose of the persistent tape

        return loss

    for epoch in range(epochs):
      print("Epoch:", epoch + 1)
      for batch_start in range(0, x.shape[0], batch_size):
        batch_end = batch_start + batch_size

        x0_batch = x[batch_start:batch_end, :]

        # Steps are drawn from 1 .. self.steps - 1; step 0 is never sampled.
        t_batch = np.random.randint(low=1, high=self.steps, size=(x0_batch.shape[0],))

        c_batch = c[batch_start:batch_end]

        # Disabled label dropout (would replace the labels by -1 for roughly
        # 20% of the batches).
        #if np.random.uniform(0, 1) < 0.2:
        #    c_batch = -np.ones(c_batch.shape)

        epsilon_batch, noisy_x_batch = self.forward(x0_batch, t_batch)

        loss = train_step(noisy_x_batch, t_batch, c_batch, epsilon_batch)

        # batch_start is a sample offset, not a batch counter, so this prints
        # whenever the offset is a multiple of 10.
        if batch_start % 10 == 0:
            print("Batch:", batch_start, "Loss:", loss.numpy())

      # Overwrite the same two checkpoint files every 50th epoch.
      if (epoch + 1) % 50 == 0:
        #checkpoint_callback.on_epoch_end(epoch, logs={'loss': float(loss)})
        self.model.save_weights(os.path.join(checkpoint_dir, 'model_checkpoint.h50'))
        self.discriminator_model.save_weights(os.path.join(checkpoint_dir, 'discriminator_model_checkpoint.h50'))


      loss_.append(loss.numpy())

    return np.asarray(loss_)

  def positional_encoding(self, seq_length, embedding_dim):
    position = np.arange(seq_length)[:, np.newaxis]
    div_term = np.exp(np.arange(0, embedding_dim, 2) * -(np.log(10000.0) / embedding_dim))
    pos_enc = np.zeros((seq_length, embedding_dim))
    pos_enc[:, 0::2] = np.sin(position * div_term)
    pos_enc[:, 1::2] = np.cos(position * div_term)
    pos_enc = pos_enc[np.newaxis, ...]
    return tf.constant(pos_enc, dtype=tf.float32)

  def sample(self, c = 2, w = 3, plot = 0):
    im_list = []
    xt = np.random.randn(self.input_size, self.input_size, 1)
    xt = xt.reshape(-1, self.input_size, self.input_size, 1)
    xt = tf.convert_to_tensor(xt)
    im_list.append(np.squeeze(xt))
    if plot == 1:
      plt.figure(figsize=(10,8))
    for t in range(self.steps-1, -1, -1):
      if t>0:
        z = np.random.randn(self.input_size, self.input_size, 1)
        z = z.reshape(-1, self.input_size, self.input_size, 1)
      else:
        z = 0
      t_m = tf.convert_to_tensor(np.reshape(t, (1,)))
      pred = self.model.predict([xt, t_m, tf.convert_to_tensor(np.reshape(c, (1,)))], verbose = 0)
      if w > 0:
        pred_ng = self.model.predict([xt, t_m, tf.convert_to_tensor(np.reshape(-1, (1,)))], verbose = 0)
        pred = (1+w)*pred - w*pred_ng
      xt = 1/np.sqrt(self.alpha[t]) * (xt-((1-self.alpha[t])/np.sqrt(1-self.alpha_hat[t]))*pred) + np.sqrt(self.beta[t]) * z
      if plot == 1:
        if np.mod(t,100) == 0:
          plt.subplot(1,10,t//100 + 1)
          plt.imshow(np.squeeze(xt))
          plt.axis("off")
      im_list.append(np.squeeze(xt))
    if plot == 1:
      plt.show()
    return xt, np.asarray(im_list)

  def sample_batch(self, w = 3, size=20):
    xt = np.random.randn(self.input_size, self.input_size, size)
    xt = xt.reshape(-1, self.input_size, self.input_size, 1)
    xt = tf.convert_to_tensor(xt)
    c = np.random.randint(0, 10, size)
    for t in range(self.steps-1, -1, -1):
      if t>0:
        z = np.random.randn(self.input_size, self.input_size, size)
        z = z.reshape(-1, self.input_size, self.input_size, 1)
      else:
        z = 0
      t_m = tf.convert_to_tensor(np.reshape(np.repeat(t, size), (size,)))
      pred = self.model.predict([xt, t_m, tf.convert_to_tensor(np.reshape(c, (size,)))], verbose = 0)
      if w > 0:
        pred_ng = self.model.predict([xt, t_m, tf.convert_to_tensor(np.reshape(np.repeat(-1, size), (size,)))], verbose = 0)
        pred = (1+w)*pred - w*pred_ng
      xt = 1/np.sqrt(self.alpha[t]) * (xt-((1-self.alpha[t])/np.sqrt(1-self.alpha_hat[t]))*pred) + np.sqrt(self.beta[t]) * z
    return xt, c

  def ddim_sample(self, steps, w = 5, size = 20, ddim_eta = 1, plot = 0):

    c = np.random.randint(0, self.num_classes, size)
    tao = np.asarray(list(range(0, self.steps, self.steps//steps))) + 1

    ddim_alpha = self.alpha_hat[tao]
    ddim_alpha_sqrt = np.sqrt(ddim_alpha)
    ddim_alpha_prev = np.concatenate([self.alpha_hat[0:1], self.alpha_hat[tao[:-1]]])
    ddim_sigma = (ddim_eta*((1-ddim_alpha_prev)/(1 -ddim_alpha)*(1-ddim_alpha/ddim_alpha_prev))**.5)
    ddim_sqrt_one_minus_alpha = (1. - ddim_alpha) ** .5

    xt = np.random.randn(self.input_size[0], self.input_size[1], size)
    xt = xt.reshape(-1, self.input_size[0], self.input_size[1])
    xt = tf.convert_to_tensor(xt)

    for i in range(steps-1, -1, -1):
      t_m = tf.convert_to_tensor(np.reshape(np.repeat(tao[i], size), (size,)))
      eps = self.model.predict([xt, t_m, tf.convert_to_tensor(np.reshape(c, (size,)))], verbose = 0)
      if w > 0:
        eps_ng = self.model.predict([xt, t_m, tf.convert_to_tensor(np.reshape(np.repeat(-1, size), (size,)))], verbose = 0)
        eps = (1+w)*eps - w*eps_ng
      noise = np.random.randn(self.input_size[0], self.input_size[1], size)
      noise = noise.reshape(-1, self.input_size[0], self.input_size[1])
      xt = (ddim_alpha_prev[i] ** 0.5) * (xt - ddim_sqrt_one_minus_alpha[i] * eps) / (ddim_alpha[i] ** 0.5) + ((1 - ddim_alpha_prev[i] - ddim_sigma[i] **2 ) ** 0.5) * eps + ddim_sigma[i] * noise

    return xt, c

In [11]:
# Build the denoiser and discriminator for windows of 30 frames x 45 features
# (15 joints x 3 coordinates); each builder prints its output shape.
diffusion = Diffusion(input_size = [30, 45])

(None, 30, 45)
(None, 2)


In [8]:
# Layer-by-layer overview of the denoising network.
diffusion.model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 30, 45)]             0         []                            
                                                                                                  
 dense_12 (Dense)            (None, 30, 64)               2944      ['input_1[0][0]']             
                                                                                                  
 activation_12 (Activation)  (None, 30, 64)               0         ['dense_12[0][0]']            
                                                                                                  
 dense_13 (Dense)            (None, 30, 64)               4160      ['activation_12[0][0]']       
                                                                                              

In [None]:
# Joint training for 400 epochs; weights are checkpointed every 50th epoch.
# `losses` is only assigned once the call returns, so interrupting this cell
# leaves it undefined for the plotting cell below.
losses = diffusion.train(X_train, y_train, 400)

Streaming output truncated to the last 5000 lines.
Batch: 960 Loss: 0.0640061012469414
Batch: 1280 Loss: 0.04436071938021287
Batch: 1600 Loss: 0.0643644503673419
Batch: 1920 Loss: 0.040945458125744495
Batch: 2240 Loss: 0.05056986245296275
Batch: 2560 Loss: 0.03665716370860031
Batch: 2880 Loss: 0.06523302034973896
Batch: 3200 Loss: 0.03498392859295973
Batch: 3520 Loss: 0.05670480821673103
Batch: 3840 Loss: 0.04371689000015907
Batch: 4160 Loss: 0.052864115054337274
Batch: 4480 Loss: 0.04154680887317953
Batch: 4800 Loss: 0.0472204736660971
Batch: 5120 Loss: 0.043762879711617436
Batch: 5440 Loss: 0.05118525604211382
Batch: 5760 Loss: 0.03773885077364563
Batch: 6080 Loss: 0.0528619957806739
Batch: 6400 Loss: 0.03681486774253607
Batch: 6720 Loss: 0.036127410172334146
Batch: 7040 Loss: 0.0653271540847289
Batch: 7360 Loss: 0.05532983949611085
Batch: 7680 Loss: 0.045047529175282215
Batch: 8000 Loss: 0.06758853154822217
Batch: 8320 Loss: 0.06328429481573644
Batch: 8640 Loss: 0.0679

In [None]:
# Per-epoch loss curve. Requires the training cell above to have run to
# completion in this kernel; the recorded NameError means `losses` was not
# defined when this cell was executed.
plt.plot(losses)

NameError: ignored

In [13]:
# Restore the weights written by train() (same file names, default directory).
diffusion.load_net(checkpoint_name_model = "model_checkpoint.h50", checkpoint_name_discriminator_model = "discriminator_model_checkpoint.h50")

In [14]:
# Draw 21 batches of 64 synthetic windows (200 sampling steps, guidance
# weight 0.7) and stack them into single sample and label arrays.
sample_batches = []
label_batches = []

for _ in range(21):
  samples, labels = diffusion.ddim_sample(200, w=0.7, size=64)
  sample_batches.append(samples)
  label_batches.append(labels)

all_samples = np.asarray(np.concatenate(sample_batches, axis=0))
all_labels = np.concatenate(label_batches, axis=0)

In [15]:
# Compare the synthetic set with the held-out real set: both must share the
# trailing (window, features) dimensions.
print(all_samples.shape)
print(all_labels.shape)
print(X_test.shape)
print(y_test.shape)

(1344, 30, 45)
(1344,)
(1215, 30, 45)
(1215,)


In [29]:
# Cache the generated samples and their labels in the current working
# directory so the sampling cell does not have to be re-run.
np.save('all_samples.npy', all_samples)
np.save('all_labels.npy', all_labels)

In [None]:
# Reload the cached samples. NOTE: cells below also need `scaler`, X_test and
# y_test from the data-preparation cell, so this shortcut only replaces the
# sampling step.
all_samples = np.load('all_samples.npy')
all_labels = np.load('all_labels.npy')
window_len = 30

In [None]:
# Skeleton edges as (joint index, joint index) pairs over the 15 joints; each
# pair is drawn as one line segment by update().
connections = [
    (0, 1),   # Connect pelvis to right hip
    (1, 2),   # Connect right hip to right knee
    (2, 3),   # Connect right knee to right ankle
    (0, 4),   # Connect pelvis to left hip
    (4, 5),   # Connect left hip to left knee
    (5, 6),   # Connect left knee to left ankle
    (0, 7),  # Connect pelvis to neck
    (7, 8), # Connect neck to head
    (7, 9),   # Connect neck to right shoulder
    (9, 10),   # Connect right shoulder to right elbow
    (10, 11),  # Connect right elbow to right hand
    (7, 12),   # Connect neck to left shoulder
    (12, 13),  # Connect left shoulder to left elbow
    (13, 14)    # Connect left elbow to left hand
]


def init():
    """Clear the global 3-D axes and fix all three axis ranges to [-2, 2].

    Returns:
        One-element tuple holding the global figure.
    """
    ax.cla()
    # Adjust the limits according to your data
    for set_limit in (ax.set_xlim, ax.set_ylim, ax.set_zlim):
        set_limit(-2, 2)
    return (fig,)

def update(frame):
    """Redraw the skeleton for one frame of the global `two_sec_window`.

    Args:
        frame: index of the frame to draw.

    Returns:
        One-element tuple holding the global figure.
    """
    ax.cla()
    # Adjust the limits according to your data
    for set_limit in (ax.set_xlim, ax.set_ylim, ax.set_zlim):
        set_limit(-2, 2)

    pose = two_sec_window[frame]
    for connection in connections:
        start, end = connection[0], connection[1]
        # Rows: start joint, end joint, end joint again (as before).
        segment = pose[[start, end, end]]
        # Data y and z are swapped so that data y is drawn on the plot's
        # z axis.
        ax.plot(segment[:, 0], segment[:, 2], segment[:, 1], color='b')
    return (fig,)

# Pick one generated window at random and show its label.
i = np.random.randint(len(all_labels))

#two_sec_window = X_train[i,  :]
print(all_labels[i])
# Undo the standardisation: the scaler was fitted on flattened windows, so
# flatten, invert, then restore (frames, joints, coordinates).
# NOTE: `scaler` must have been fitted by the data-preparation cell in this
# kernel.
two_sec_window = all_samples[i, :]
two_sec_window = two_sec_window.reshape((1, window_len*15*3))
two_sec_window = scaler.inverse_transform(two_sec_window)
two_sec_window = two_sec_window.reshape((window_len ,15, 3))

# init() and update() draw on these two globals.
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')

# Define the desired frames per second
fps = 15

# Calculate the interval in milliseconds based on the desired fps
interval = 1000 / fps

ani = FuncAnimation(fig, update, frames=two_sec_window.shape[0], init_func=init, blit=True, interval=interval)

# Optionally save the animation as a .gif file (done in the next cell)
#ani.save('skeleton_animation_' + str(i) + '_' + str(all_labels[i]) +'.gif', writer='ffmpeg')

# Render the animation inline as an interactive HTML player.
HTML(ani.to_jshtml())


0


In [None]:
# Write the animation to a GIF named after the sample index and its label.
# Uses the ffmpeg writer, so ffmpeg must be available in the runtime.
ani.save('skeleton_animation_' + str(i) + '_' + str(all_labels[i]) +'.gif', writer='ffmpeg')

In [None]:
# Continue training the discriminator on the generated samples only (50
# epochs). This updates the same discriminator that load_net restored.
diffusion.train_discriminator(all_samples, all_labels, 50)
# Alternative: train on the held-out real windows instead.
#diffusion.train_discriminator(X_test, y_test, 50)

In [27]:
# (Re)compile the discriminator so evaluate() reports loss and accuracy.
diffusion.discriminator_model.compile(
    optimizer='adam',  # Or any other optimizer
    loss='sparse_categorical_crossentropy',  # Labels are integer class ids
    metrics=['accuracy']  # Optional, but useful for evaluation
)

# Score the held-out real windows first, then the generated samples.
for features, targets in ((X_test, y_test), (all_samples, all_labels)):
    evaluation = diffusion.discriminator_model.evaluate(features, targets, batch_size=64, verbose=1)
    print("Test Loss:", evaluation[0])
    print("Test Accuracy:", evaluation[1])

Test Loss: 3.498963678794098e-06
Test Accuracy: 1.0
Test Loss: 0.6869333386421204
Test Accuracy: 0.8891369104385376


In [18]:
# k-NN needs one flat feature vector per window: collapse the
# (frames, features) axes of both sets into a single axis.
all_samples_knn_input = all_samples.reshape(all_samples.shape[0], -1)
X_test_knn_input = X_test.reshape(X_test.shape[0], -1)

for features, targets in ((all_samples_knn_input, all_labels), (X_test_knn_input, y_test)):
    print(features.shape)
    print(targets.shape)

(1344, 1350)
(1344,)
(1215, 1350)
(1215,)


In [28]:
# "Train on synthetic, test on real": fit a k-NN classifier on the generated
# windows, then report its accuracy on the held-out real windows and on the
# generated windows themselves.
# (KNeighborsClassifier and accuracy_score are imported in the import cell at
# the top of the notebook.)
k = 5
knn_classifier = KNeighborsClassifier(n_neighbors=k)

knn_classifier.fit(all_samples_knn_input, all_labels)

# Real test windows first, then the fitting data itself (a resubstitution
# score, not a generalisation estimate).
for features, targets in ((X_test_knn_input, y_test), (all_samples_knn_input, all_labels)):
    y_pred = knn_classifier.predict(features)
    accuracy = accuracy_score(targets, y_pred)
    print(f"Accuracy: {accuracy * 100:.2f}%")

Accuracy: 89.47%
Accuracy: 97.17%


In [None]:
# WARNING: permanently deletes the checkpoint directory on Drive, i.e. the
# weights that diffusion.load_net() reads. Do not include this cell in a
# "Run all"; run it manually only when the checkpoints are no longer needed.
!rm -r /content/drive/MyDrive/checkpoints