In [None]:
%matplotlib inline

import numpy as np
from argparse import Namespace
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
import plotly.colors as pc

from sklearn.preprocessing import MinMaxScaler
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler

from sklearn.decomposition import PCA
import phate

# Prepared embedded data

In [None]:
"""load saved trajectories data for npz file
"""
SEQ = "PT4"
# SEQ = "PT4_hairpin"

# Load the pre-training archive that belongs to the selected sequence.
fnpz_data = "./data/pretraining/pretraining_{}.npz".format(SEQ)
data_npz = np.load(fnpz_data)

# Publish every array stored in the archive as a notebook-level variable and
# echo its shape. At cell (module) scope locals() and globals() are the same
# mapping; globals() is used to match the other loader cells of this notebook.
for var in data_npz.files:
    globals()[var] = data_npz[var]
    print(var, globals()[var].shape)

In [None]:
# Open the k-NN shortest-path archive once, copy both arrays out of it and
# close the file handle again.
# allow_pickle=True lets NumPy unpickle object arrays, which can execute
# arbitrary code: only load archives from a trusted source.
with np.load(f'./data/graph/{SEQ}/shortestpath_knn=100.npz', allow_pickle=True) as graph_npz:
    X_j = np.array(graph_npz["X_j"], dtype=int)      # used below as integer row indices
    D_ij = np.array(graph_npz["D_ij"], dtype=float)  # distances paired element-wise with X_j
print("X_j", X_j.shape)
print("D_ij", D_ij.shape)

In [None]:
# Count, for every unique state, how many simulated trajectories visit it,
# then turn the counts into a visiting frequency.
# NOTE(review): the normalisation below hard-codes 100; confirm that every
# dataset really contains 100 trajectories (i.e. len(trj_id) == 100).
split_id = trj_id + 1  # exclusive end row of each trajectory inside SIMS_dict
P_tot = np.zeros(len(SIMS_dict_uniq))

traj_starts = [0, *split_id[:-1]]
for start, stop in zip(traj_starts, split_id):
    # Column 4 is used as an index into P_tot (presumably the unique-state id).
    # The set makes each trajectory count a state at most once.
    trj = set(SIMS_dict[start:stop, 4].astype(int))
    P_tot[list(trj)] += 1

P_tot = P_tot / 100

print("P_tot", P_tot.shape, P_tot.max(), P_tot.min())

In [None]:
%%script false --no-raise-error

# load embedding WITHOUT vida plot data
# SEQ = "PT0"
fnpz_noViDa = f"./data/vida_data/noViDa-noEnergy/{SEQ}_noViDa.npz"
data_noViDa = np.load(fnpz_noViDa,allow_pickle=True)
for var in data_noViDa.files:
    globals()[var] = data_noViDa[var]
    print(var, globals()[var].shape)

In [None]:
# Load the arrays saved for the ViDa embedding plots of this sequence.

fnpz_data_embed = f"./data/vida_data/{SEQ}.npz"
# fnpz_data_embed = f"./data/vida_data/{SEQ}_usePT4_03040216.npz"
data_npz_embed = np.load(fnpz_data_embed, allow_pickle=True)

# Publish every stored array as a notebook-level variable and echo its shape.
for var in data_npz_embed.files:
    globals()[var] = data_npz_embed[var]
    print(var, globals()[var].shape)

# Summary statistics (max, min, mean, std) of the latent embedding.
print("data_embed:", *(getattr(data_embed, stat)() for stat in ("max", "min", "mean", "std")))

In [None]:
%%script false --no-raise-error

# load GSAE embedding plot data

fnpz_data_embed = f"./data/gsae_data/helix_assoc_{SEQ}.npz"

data_npz_embed = np.load(fnpz_data_embed,allow_pickle=True)
# asssign data to variables
for var in data_npz_embed.files:
    globals()[var] = data_npz_embed[var]
    print(var, globals()[var].shape)
print("data_embed", data_embed.max(), data_embed.min(), data_embed.mean(), data_embed.std())

## Evaluate embedding

### Smoothness

In [None]:
def get_trajec_embed(trj_id, embedding):
    """Split a concatenated embedding into one array per trajectory.

    Parameters
    ----------
    trj_id : numpy.ndarray of int
        For each trajectory, the (inclusive) index of its last row inside
        ``embedding``. ``trj_id + 1`` is used as the exclusive slice end.
    embedding : numpy.ndarray
        Row-wise concatenation of all trajectories; must be sliceable as
        ``embedding[a:b][:, :2]``.

    Returns
    -------
    list of numpy.ndarray
        One slice per entry of ``trj_id``, restricted to the first two
        columns. Trajectory ``i`` covers rows ``trj_id[i-1]+1 .. trj_id[i]``
        (the first one starts at row 0). Rows after ``trj_id[-1]`` are not
        returned.
    """
    stops = trj_id + 1
    # Every trajectory starts where the previous one stopped; the first at 0.
    starts = [0, *stops[:-1]]
    return [embedding[start:stop][:, :2] for start, stop in zip(starts, stops)]

In [None]:
def calculate_1trj_distance(arr):
    """Return the path length of a single trajectory.

    The path length is the sum of the Euclidean distances between each pair
    of consecutive points in ``arr``. With fewer than two points the result
    is ``0``.
    """
    path_length = 0

    for previous_point, current_point in zip(arr[:-1], arr[1:]):
        displacement = current_point - previous_point
        path_length += np.sqrt(np.sum(displacement**2))

    return path_length

In [None]:
# calculate the expected total distance over all trajectories
def calculate_avg_total_distance(embedding, trj_id):
    """Smoothness score of an embedding along the simulated trajectories.

    The embedding is min-max scaled per column, split into trajectories with
    ``get_trajec_embed`` and the path lengths of all trajectories are summed.
    The sum is divided by ``trj_id[-1] + 1``.

    NOTE(review): ``trj_id[-1] + 1`` is the exclusive end row of the last
    trajectory, not the number of trajectories (``len(trj_id)``); confirm
    which normalisation is intended.
    """
    # Rescale first so that scores of different embeddings are comparable.
    scaled_embedding = MinMaxScaler().fit_transform(embedding)

    summed_length = 0
    for one_trajectory in get_trajec_embed(trj_id, scaled_embedding):
        summed_length += calculate_1trj_distance(one_trajectory)

    return summed_length / (trj_id[-1] + 1)

In [None]:
print(f"ViDa PCA: {calculate_avg_total_distance(pca_all_coords, trj_id):.3f}")
print(f"ViDa PHATE: {calculate_avg_total_distance(phate_all_coords, trj_id):.3f}")

In [None]:
print(f"GSAE PCA: {calculate_avg_total_distance(pca_all_coords, trj_id):.3f}")
print(f"GSAE PHATE: {calculate_avg_total_distance(phate_all_coords, trj_id):.3f}")

In [None]:
print(f"Direct PCA: {calculate_avg_total_distance(pca_all_coords_direct, trj_id):.3f}")
print(f"Direct PHATE: {calculate_avg_total_distance(phate_all_coords_direct, trj_id):.3f}")
# print(f"Direct MDS: {calculate_avg_total_distance(mds_all_coords, trj_id):.3f}")

### PDF

In [None]:
def calculate_step_length(trj_id, embedding):
    """Return the Euclidean length of every step of every trajectory.

    The embedding is split with ``get_trajec_embed``; for each trajectory the
    distance between consecutive points is collected. Steps that would join
    two different trajectories are therefore not included.

    Returns a 1-D ``numpy.ndarray`` with one entry per step.
    """
    step_length = [
        np.sqrt(np.sum((arr[j] - arr[j - 1]) ** 2))
        for arr in get_trajec_embed(trj_id, embedding)
        for j in range(1, len(arr))
    ]
    return np.array(step_length)
            

In [None]:
# def make_pdf(steps, num_bins):
#     # Sort the data
#     data = np.sort(steps)
    
#     # Group the data into the bins   
#     counts, bin_edges = np.histogram(data, bins=num_bins)
    
#     # Calculate the density of data points in each bin
#     density = counts / sum(counts)

#     # Plot the PDF
#     bin_widths = np.diff(bin_edges)
#     bin_centers = bin_edges[:-1] + bin_widths / 2
#     bin_centers = bin_centers / bin_centers.max() * 100

#     plt.plot(bin_centers, density)
#     plt.xticks(np.arange(0, 101, 10))
#     plt.xlabel('Step length percentage (%)')
#     plt.ylabel('Density')
#     plt.title('PDF of step length for embedding')
#     return plt.show()


In [None]:
def make_pdf(steps, num_bins):
    """Plot the distribution of min-max-normalised step lengths.

    The histogram results are also stored as attributes on the function
    (``make_pdf.counts``, ``make_pdf.bin_edges``, ``make_pdf.density`` and
    ``make_pdf.bin_centers``) because other cells of the notebook read them
    after the call.

    Parameters
    ----------
    steps : numpy.ndarray
        Step lengths; reshaped to a single column before scaling.
    num_bins : int or sequence
        Passed to ``numpy.histogram`` as ``bins``.

    Returns
    -------
    The return value of ``plt.show()``.
    """
    # Rescale the step lengths to [0, 1].
    norm_steps = MinMaxScaler().fit_transform(steps.reshape(-1, 1))

    # np.histogram flattens its input and does not require sorted data, so the
    # scaled column is binned directly.
    make_pdf.counts, make_pdf.bin_edges = np.histogram(norm_steps, bins=num_bins)

    # Fraction of all steps that falls into each bin.
    make_pdf.density = make_pdf.counts / make_pdf.counts.sum()

    # Bin centres, rescaled so that the last centre sits at 100 %.
    bin_widths = np.diff(make_pdf.bin_edges)
    centers = make_pdf.bin_edges[:-1] + bin_widths / 2
    make_pdf.bin_centers = centers / centers.max() * 100

    plt.plot(make_pdf.bin_centers, make_pdf.density)
    plt.xticks(np.arange(0, 101, 10))
    plt.xlabel('Step length percentage (%)')
    plt.ylabel('Density')
    plt.title('PDF of step length for embedding')
    return plt.show()


In [None]:
steps = calculate_step_length(trj_id, phate_all_coords)
# steps = calculate_step_length(trj_id, phate_all_coords_direct)

make_pdf(steps, num_bins=10)

In [None]:
make_pdf.bin_edges, make_pdf.counts

In [None]:
# Keep the histogram counts of the most recent make_pdf() call under the name
# of the embedding that was just analysed.
# NOTE(review): vida_pca, vida_phate, direct_pca, direct_phate and gsae_pca are
# not assigned anywhere in the visible notebook; presumably this cell was
# re-run by hand with a different left-hand name after each make_pdf() call.
# On a fresh kernel the tuple below raises NameError.
gsae_phate = make_pdf.counts
vida_pca, vida_phate, direct_pca, direct_phate, gsae_pca, gsae_phate

In [None]:
vida_pca, vida_phate, direct_pca, direct_phate, gsae_pca, gsae_phate

In [None]:
# make_pdf() already stores the bin centres of its last call; rescale them so
# that the largest centre is 100 %. Recomputing the centres from
# make_pdf.bin_edges first is unnecessary because that value was immediately
# overwritten by this expression.
bin_centers = make_pdf.bin_centers / make_pdf.bin_centers.max() * 100
bin_centers

In [None]:
# Save the step-length histograms of all embeddings for later plotting.
# NOTE: this rebinds fnpz_data, which the first loader cell uses for the
# pre-training archive path.
fnpz_data = f"data/vida_data/pdf_plot_{SEQ}.npz"
pdf_arrays = {
    "bin_centers": bin_centers,
    "vida_pca": vida_pca,
    "vida_phate": vida_phate,
    "gsae_pca": gsae_pca,
    "gsae_phate": gsae_phate,
    "direct_pca": direct_pca,
    "direct_phate": direct_phate,
    # "direct_mds": direct_mds,
}
with open(fnpz_data, 'wb') as f:
    np.savez(f, **pdf_arrays)

In [None]:
## Overlay the step-length distributions of all embeddings in one figure

# (legend label, histogram counts) for every curve, in plotting order.
pdf_curves = [
    ('ViDa PCA', vida_pca),
    ('ViDa PHATE', vida_phate),
    ('GSAE PCA', gsae_pca),
    ('GSAE PHATE', gsae_phate),
    ('Direct PCA', direct_pca),
    ('Direct PHATE', direct_phate),
    # ('Direct MDS', direct_mds),
]

fig, ax = plt.subplots(figsize =(15, 8))

# Each curve is normalised by its own total so the y-axis is a fraction.
for curve_label, curve_counts in pdf_curves:
    ax.plot(bin_centers, curve_counts/sum(curve_counts), label=curve_label)

ax.legend()

plt.xticks(np.arange(0, 101, 10))
plt.xlabel('Step length percentage (%)')
plt.ylabel('Density')
plt.title(f'PDF of step length for {SEQ}')

plt.show()

In [None]:
# steps = calculate_step_length(trj_id, pca_all_coords_direct)
steps = calculate_step_length(trj_id, pca_all_coords)

make_pdf(steps, num_bins=1000)

### RMSD

In [None]:
def rmsd(X_j, D_ij, P_tot, z):
    """Weighted squared mismatch between embedding and reference distances.

    For every point ``i`` and every listed neighbour ``j = X_j[i, k]`` the
    Euclidean distance ``||z[i] - z[j]||`` is compared with ``D_ij[i, k]``.
    The result is::

        sum_{i,k} P_tot[i]**2 * P_tot[j] * (||z_i - z_j|| - D_ij[i,k])**2 / sum(P_tot)

    NOTE(review): despite the name no square root is taken; the computation
    is the same as ``metric_dist``. The original cell contained an unfinished
    ``root =`` statement (a SyntaxError) that has been removed; add the root
    here if an actual RMSD is wanted.

    Parameters
    ----------
    X_j : integer array, shape (n, k)
        Neighbour indices into ``z`` for every point.
    D_ij : array, shape (n, k)
        Reference distance for every (point, neighbour) pair.
    P_tot : array, shape (n,)
        Per-point weight.
    z : array, shape (n, d)
        Embedding coordinates.

    Returns
    -------
    float

    Side effects
    ------------
    Stores the (n, k) matrix of embedding distances in the module-level
    variable ``l2_zizj``.
    """
    global l2_zizj

    z_re = z.reshape(-1, 1, z.shape[-1])  # (n, 1, d): broadcasts against z[X_j]
    zj = z[X_j]                           # (n, k, d): coordinates of the neighbours
    l2_zizj = np.sqrt(np.sum((z_re - zj)**2, axis=-1))

    dist_diff = (l2_zizj - D_ij)**2
    wij = (P_tot.reshape(-1, 1) * P_tot[X_j])
    dist_loss = ((wij * dist_diff) * P_tot.reshape(-1, 1) / P_tot.sum()).sum()

    return dist_loss

### Metric distance

In [None]:
def metric_dist(X_j, D_ij, P_tot, z):
    """Weighted squared mismatch between embedding and reference distances.

    For each point ``i`` and neighbour ``j = X_j[i, k]`` the embedding
    distance ``||z[i] - z[j]||`` is compared with ``D_ij[i, k]``; the squared
    differences are weighted by ``P_tot[i]**2 * P_tot[j] / sum(P_tot)`` and
    summed over all pairs.

    As a side effect the matrix of embedding distances is stored in the
    module-level variable ``l2_zizj``.
    """
    global l2_zizj

    # Pairwise embedding distances between every point and its neighbours.
    anchors = z.reshape(-1, 1, z.shape[-1])
    neighbours = z[X_j]
    offsets = anchors - neighbours
    l2_zizj = np.sqrt(np.sum(offsets**2, axis=-1))

    # Squared error against the reference distances and the pair weights.
    squared_error = (l2_zizj - D_ij)**2
    point_weight = P_tot.reshape(-1, 1)
    pair_weight = point_weight * P_tot[X_j]

    weighted_error = (pair_weight * squared_error) * point_weight / P_tot.sum()
    return weighted_error.sum()

In [None]:
# Log-compress the reference distances and add |min(log D)|, which moves a
# negative minimum up to zero.
log_raw = np.log(D_ij)
log_Dij = log_raw + np.abs(log_raw.min())
print("log_Dij", log_Dij.shape, log_Dij.max(), log_Dij.min())

# Score both ViDa projections against the shifted log-distances.
pca_dist, phate_dist = (
    metric_dist(X_j, log_Dij, P_tot, projection)
    for projection in (pca_coords[:,:2], phate_coords)
)
print (f'ViDa PCA distance loss: {pca_dist:.4f}')
print (f'ViDa PHATE distance loss: {phate_dist:.4f}')

In [None]:
# Log-compress the reference distances and add |min(log D)|, which moves a
# negative minimum up to zero.
log_raw = np.log(D_ij)
log_Dij = log_raw + np.abs(log_raw.min())

# Score the projections computed directly on the input features (no AE).
pca_dist, phate_dist = (
    metric_dist(X_j, log_Dij, P_tot, projection)
    for projection in (pca_coords_direct[:,:2], phate_coords_direct)
)
print (f'Direct PCA distance loss: {pca_dist:.4f}')
print (f'Direct PHATE distance loss: {phate_dist:.4f}')

In [None]:
# Distance loss of the GSAE projections.
# NOTE(review): pca_coords / phate_coords are expected to hold the GSAE
# embedding here, i.e. the GSAE loader cell must have been run last - confirm.
log_Dij = np.log(D_ij)+np.abs(np.log(D_ij).min())

pca_dist = metric_dist(X_j, log_Dij, P_tot, pca_coords[:,:2])
# The PHATE score is computed from the PHATE coordinates (it was previously
# computed from pca_coords, which made the printed value a second PCA score).
phate_dist = metric_dist(X_j, log_Dij, P_tot, phate_coords)
print (f'GSAE PCA distance loss: {pca_dist:.4f}')
print (f'GSAE PHATE distance loss: {phate_dist:.4f}')

### Neighboring preservation rate

In [None]:
from sklearn.neighbors import NearestNeighbors

def neighboring_preservation_rate(X, X_j, P_tot, k):
    """Weighted share of reference neighbours that survive in the embedding.

    For every point the ``k`` nearest neighbours in ``X`` are compared with
    the first ``k`` reference neighbours ``X_j[i, :k]``; the per-point rate is
    the size of the intersection divided by ``k``. The per-point rates are
    averaged with weights ``P_tot / P_tot.sum()``.

    Parameters
    ----------
    X : array, shape (n, d)
        Embedding coordinates.
    X_j : integer array, shape (n, >= k)
        Reference neighbour indices of every point.
    P_tot : numpy.ndarray, shape (n,)
        Per-point weight.
    k : int
        Number of neighbours to compare.
    """
    # Ask for k+1 neighbours and drop the first column: the first hit of each
    # query is assumed to be the query point itself.
    knn_model = NearestNeighbors(n_neighbors=k + 1).fit(X)
    embed_neighbours = knn_model.kneighbors(X, return_distance=False)[:, 1:]

    # Fraction of shared neighbours for each point.
    rate_list = [
        len(np.intersect1d(own_neighbours, X_j[i, :k])) / k
        for i, own_neighbours in enumerate(embed_neighbours)
    ]

    # rate = np.mean(rate_list)  # unweighted alternative
    rate = (rate_list * P_tot / P_tot.sum()).sum()

    return rate

In [None]:
# ViDa embedding
# NOTE: X_j_all is only assigned by the all-path loader cell in the MDS
# section further down this notebook; that cell has to be run first.
knn = 10000
vida_rate_rows = (
    ("ViDa PCA rate: {:.4f}", pca_coords[:,:2]),
    ("ViDa PHATE rate: {:.4f}", phate_coords),
)
for row_template, row_coords in vida_rate_rows:
    print(row_template.format(neighboring_preservation_rate(row_coords, X_j_all, P_tot, k=knn)))

In [None]:
# Pretrained ViDa embedding
knn = 10000
print("Pretrained ViDa PCA rate: {:.4f}".format(neighboring_preservation_rate(pca_coords[:,:2], X_j_all, P_tot, k=knn)))
print("Pretrained ViDa PHATE rate: {:.4f}".format(neighboring_preservation_rate(phate_coords, X_j_all, P_tot, k=knn)))

In [None]:
# GSAE embedding
knn = 10000
print("GSAE PCA rate: {:.4f}".format(neighboring_preservation_rate(pca_coords[:,:2], X_j_all, P_tot, k=knn)))
print("GSAE PHATE rate: {:.4f}".format(neighboring_preservation_rate(phate_coords, X_j_all, P_tot, k=knn)))

In [None]:
# Direct embedding
knn = 100
print("Direct PCA rate: {:.4f}".format(neighboring_preservation_rate(pca_coords_direct[:,:2], X_j, P_tot, k=knn)))
print("Direct PHATE rate: {:.4f}".format(neighboring_preservation_rate(phate_coords_direct, X_j, P_tot, k=knn)))

In [None]:
# D_ij_all = []
# X_j_all = []

# for i in range(len(SIMS_HT_uniq)):
#     dij = np.array(list(np.load(f'./data/graph/{SEQ}/allpath_{SEQ}/path_{i}.npy',allow_pickle=True)[0].values()), dtype=float)
#     xj = np.array(list(np.load(f'./data/graph/{SEQ}/allpath_{SEQ}/path_{i}.npy',allow_pickle=True)[0].keys()), dtype=int)
    
#     D_ij_all.append(dij)
#     X_j_all.append(xj)

# D_ij_all = np.stack(D_ij_all)
# X_j_all = np.stack(X_j_all)

# # save npz file for shortest path
# with open(f'./data/graph/{SEQ}_allpath.npz', 'wb') as f:
#     np.savez(f,
#              X_j_all = np.stack(X_j_all),
#              D_ij_all = np.stack(D_ij_all),
#          )

### PCA explained variance

In [None]:
# Fit a 25-component PCA on the latent embedding and show how much variance
# the leading components explain cumulatively.
cm = PCA(n_components=25)
cm.fit(data_embed)

cumulative_variance = np.cumsum(cm.explained_variance_ratio_)
PC_values = np.arange(cm.n_components_) + 1  # 1-based component numbers

plt.plot(PC_values, cumulative_variance, 'ro-', linewidth=2)
plt.title('Scree Plot: PCA')
plt.xlabel('Number of principal components')
plt.ylabel('Cumulative explained variance');
# plt.xticks(np.arange(0, data_embed.shape[-1]+1, 1))
plt.show()

print(cumulative_variance)

### MDS for distance matrix

In [None]:
# All-path graph of the selected sequence: open the archive once, copy both
# arrays out of it and close the file handle again.
# allow_pickle=True lets NumPy unpickle object arrays, which can execute
# arbitrary code: only load archives from a trusted source.
with np.load(f'./data/graph/{SEQ}_allpath.npz', allow_pickle=True) as allpath_npz:
    X_j_all = np.array(allpath_npz["X_j_all"])
    D_ij_all = np.array(allpath_npz["D_ij_all"])
print("X_j_all", X_j_all.shape)
print("D_ij_all", D_ij_all.shape)

In [None]:
# Build the dense, precomputed distance matrix for MDS. Entries that are not
# covered by the neighbour lists keep the initial value 1.
n_states = D_ij_all.shape[0]
MDS_dist = np.ones((n_states, n_states))
for row, (neighbour_ids, neighbour_dists) in enumerate(zip(X_j_all, D_ij_all)):
    MDS_dist[row, neighbour_ids] = neighbour_dists

In [None]:
MDS_dist.shape

In [None]:
# not necessary if the distance matrix includes all the neighbors
def makeSymmetric(mat):
    """Symmetrise a square matrix in place with the element-wise minimum.

    Every pair ``mat[i][j]`` / ``mat[j][i]`` is replaced by the smaller of the
    two values. The input object is modified and also returned.

    2-D NumPy arrays are handled with one vectorised ``np.minimum`` call
    instead of a Python double loop (note that ``np.minimum`` propagates NaN).
    Any other container, e.g. a list of lists, is processed element by
    element.
    """
    n = len(mat)

    if isinstance(mat, np.ndarray) and mat.ndim == 2 and mat.shape[1] >= n:
        # Work on a view of the leading n x n square so the caller's array is
        # updated in place; the right-hand side is fully evaluated first.
        square = mat[:n, :n]
        square[...] = np.minimum(square, square.T)
        return mat

    # Generic fallback: visit the strictly lower triangle only.
    for i in range(n):
        for j in range(i):
            mat[i][j] = mat[j][i] = min(mat[i][j], mat[j][i])
    return mat

In [None]:
MDS_dist_symm = makeSymmetric(MDS_dist)
MDS_dist_symm.shape

In [None]:
# %%script false --no-raise-error

from sklearn.manifold import MDS

# MDS_dist has been symmetrised in place when the makeSymmetric cell was run.
# The optimisation starts from a random configuration, so the seed is fixed
# to make the embedding reproducible across kernel restarts.
mds = MDS(n_components=2, dissimilarity='precomputed', random_state=0)
mds_coords = mds.fit_transform(MDS_dist)

In [None]:
# # save for python
# pca_all_coords_direct = pca_coords_direct[coord_id_S]  
# phate_all_coords_direct = phate_coords_direct[coord_id_S]
# mds_all_coords = mds_coords[coord_id_S]

# fnpz_data = f"data/vida_data/noViDa-noEnergy/{SEQ}_noViDa.npz"
# with open(fnpz_data, 'wb') as f:
#     np.savez(f,
#             pca_coords_direct=pca_coords_direct, pca_all_coords_direct=pca_all_coords_direct,
#             phate_coords_direct=phate_coords_direct, phate_all_coords_direct=phate_all_coords_direct,
#             mds_coords=mds_coords, mds_all_coords=mds_all_coords,
#             )

## Visualize

In [None]:
SEQ

### PCA Vis

In [None]:
%matplotlib inline
# First three principal components of the embedding (Z is not plotted here).
X, Y, Z = (pca_coords[:, axis] for axis in range(3))

# PCA: 2 components, coloured by SIMS_G_uniq
fig,ax = plt.subplots(figsize=(8,6))
im = ax.scatter(X, Y, c=SIMS_G_uniq, cmap='plasma')
plt.colorbar(im)

# Highlight the initial (row 0) and final state.
# The final state is looked up through the last entry of SIMS_dict
# (presumably the unique-state index of the last simulated state).
final_state = int(SIMS_dict[-1,-1])
annotations=["I","F"]
x = [X[0], X[final_state]]
y = [Y[0], Y[final_state]]
plt.scatter(x,y,s=150, c="green", alpha=1)
for label, x_pos, y_pos in zip(annotations, x, y):
    plt.annotate(label, (x_pos, y_pos),fontsize=20,c="black")

#### Try using PCA directly without AE

In [None]:
# %%script false --no-raise-error

# PCA applied directly to the input features, without the autoencoder.
pca_coords_direct = PCA(n_components=3).fit_transform(SIMS_scar_uniq)   # multiple trj

X, Y, Z = (pca_coords_direct[:, axis] for axis in range(3))

# PCA: 2 components, coloured by SIMS_G_uniq
fig,ax = plt.subplots(figsize=(8,6))
im = ax.scatter(X, Y, c=SIMS_G_uniq, cmap='plasma')
plt.colorbar(im)

# Highlight the first and the last row as initial / final state; the labels
# are shifted by 0.3 in both directions.
annotations=["I","F"]
x = [X[0], X[-1]]
y = [Y[0], Y[-1]]
plt.scatter(x,y,s=150, c="green", alpha=1)
for label, x_pos, y_pos in zip(annotations, x, y):
    plt.annotate(label, (x_pos-0.3, y_pos-0.3),fontsize=20,c="black")

In [None]:
# Fit a 25-component PCA directly on the input features and show how much
# variance the leading components explain cumulatively.
cm = PCA(n_components=25)
cm.fit(SIMS_scar_uniq)

cumulative_variance = np.cumsum(cm.explained_variance_ratio_)
PC_values = np.arange(cm.n_components_) + 1  # 1-based component numbers

plt.plot(PC_values, cumulative_variance, 'ro-', linewidth=2)
plt.title('Scree Plot: PCA')
plt.xlabel('Number of principal components')
plt.ylabel('Cumulative explained variance');
# plt.xticks(np.arange(0, data_embed.shape[-1]+1, 1))
plt.show()

print(cumulative_variance)

### PHATE Vis

In [None]:
# PHATE coordinates of the embedding, coloured by SIMS_G_uniq.
X_phate, Y_phate = phate_coords[:,0], phate_coords[:,1]

fig,ax = plt.subplots(figsize=(8,6))
im = ax.scatter(X_phate, Y_phate, c=SIMS_G_uniq, cmap='plasma')
plt.colorbar(im)

# Highlight the initial (row 0) and final state.
# The final state is looked up through the last entry of SIMS_dict
# (presumably the unique-state index of the last simulated state).
final_state = int(SIMS_dict[-1,-1])
annotations=["I","F"]
x = [X_phate[0], X_phate[final_state]]
y = [Y_phate[0], Y_phate[final_state]]
plt.scatter(x,y,s=50, c="green", alpha=1)
for label, x_pos, y_pos in zip(annotations, x, y):
    plt.annotate(label, (x_pos, y_pos),fontsize=20,c="black")

#### PHATE without AE

In [None]:
# %%script false --no-raise-error

# PHATE applied directly to the input features, without the autoencoder.
phate_operator = phate.PHATE(n_jobs=-2)
phate_coords_direct = phate_operator.fit_transform(SIMS_scar_uniq)

fig,ax = plt.subplots(figsize=(8,6))
im = ax.scatter(
    phate_coords_direct[:,0],
    phate_coords_direct[:,1],
    c=SIMS_G_uniq,
    cmap='plasma',
)
plt.colorbar(im)

# Highlight the first and the last row as initial / final state.
annotations=["I","F"]
x = [phate_coords_direct[0, 0], phate_coords_direct[-1, 0]]
y = [phate_coords_direct[0, 1], phate_coords_direct[-1, 1]]
plt.scatter(x,y,s=50, c="green", alpha=1)
for label, x_pos, y_pos in zip(annotations, x, y):
    plt.annotate(label, (x_pos, y_pos),fontsize=20,c="black")

### MDS with distance matrix

In [None]:
# MDS coordinates computed from the precomputed distance matrix.
X = mds_coords[:,0]
Y = mds_coords[:,1]
cmap = plt.cm.plasma
# plt.get_cmap is used because matplotlib.cm.get_cmap (plt.cm.get_cmap) has
# been removed from recent Matplotlib releases. cmap_r is not used by this
# cell; it is kept for interactive use.
cmap_r = plt.get_cmap('plasma_r')

fig,ax = plt.subplots(figsize=(8,6))
im = ax.scatter(X, Y,
          c = SIMS_G_uniq,
          cmap=cmap,
          s=10
        )

plt.colorbar(im)

# Highlight the initial (row 0) and final state.
# The final state is looked up through the last entry of SIMS_dict
# (presumably the unique-state index of the last simulated state).
annotations=["I","F"]
x = [X[0],X[int(SIMS_dict[-1,-1])]]
y = [Y[0],Y[int(SIMS_dict[-1,-1])]]
plt.scatter(x,y,s=150, c="green", alpha=1)
for i, label in enumerate(annotations):
    plt.annotate(label, (x[i],y[i]),fontsize=20,c="black")

## Feature Engineering to Find Kinetic Traps

In [None]:
# Row 0 of the unique-state table is used as the initial state.
initial_idx = 0
# initial_idx = np.where(SIMS_dict_uniq[:,0] == '.........................+.........................')[0][0]

# The final state is the first row whose dot-parenthesis string (column 0)
# is the fully paired duplex.
final_idx = np.flatnonzero(SIMS_dict_uniq[:,0] == '(((((((((((((((((((((((((+)))))))))))))))))))))))))')[0]
print(initial_idx, final_idx)

In [None]:
## Combine all per-state features into one array
pos_data, energy_data, time_data, frq_data = data_embed, SIMS_G_uniq, SIMS_HT_uniq, P_tot

# Bring position, energy and time onto the common range [0, 1]. The scaler is
# re-fitted by every fit_transform call, so each feature block is scaled
# independently. frq_data is appended without rescaling.
scaler = MinMaxScaler(feature_range=(0,1))

norm_pos_data = scaler.fit_transform(pos_data)
norm_energy_data = scaler.fit_transform(energy_data.reshape(-1,1))
norm_time_data = scaler.fit_transform(time_data.reshape(-1,1))

# Column-wise stack: [position..., energy, time, frequency]
combined_data = np.hstack(
    (norm_pos_data, norm_energy_data, norm_time_data, frq_data.reshape(-1,1))
)

In [None]:
## Filter out data points with low visiting frequency

# Per-sequence minimum P_tot, tuned together with the DBSCAN eps used later
# (min_samples = 4 in all cases):
##   PT0: eps 0.07 | PT3: eps 0.14 | PT4: eps 0.18
##   PT3_hairpin: eps 0.08 | PT4_hairpin: eps 0.15
FILTER_THRESHOLDS = {
    "PT0": 0.13,
    "PT3": 0.15,
    "PT4": 0.2,
    "PT3_hairpin": 0.2,
    "PT4_hairpin": 0.5,
}

# Fail loudly for an unknown sequence instead of silently reusing a stale
# filter_threshold from an earlier run (or hitting a NameError below).
if SEQ not in FILTER_THRESHOLDS:
    raise ValueError(f"No filter threshold configured for SEQ={SEQ!r}")
filter_threshold = FILTER_THRESHOLDS[SEQ]

filter_idx = np.where(P_tot>=filter_threshold)[0]
filter_idx.shape

In [None]:
## Project the combined feature matrix onto its first two principal components

# comb_pca_coords = pca_coords[:,:2]
comb_pca_coords = PCA(n_components=2).fit_transform(combined_data)

# #######################
# Alternative projection with TICA (disabled):
# import deeptime
# from deeptime.decomposition import TICA
# tica = TICA(lagtime=1,dim=2)
# comb_pca_coords = tica.fit_transform(combined_data)
# #######################

## Keep only the rows selected by filter_idx
filter_comb_pca_coords = np.take(comb_pca_coords, filter_idx, axis=0)
filter_comb_pca_coords.shape

### DBSCAN

In [None]:
## Elbow method to find eps for DBSCAN
from sklearn.neighbors import NearestNeighbors

n_neighbors = 4  # Number of neighbors to find
nbrs = NearestNeighbors(n_neighbors=n_neighbors).fit(filter_comb_pca_coords)
distances, indices = nbrs.kneighbors(filter_comb_pca_coords)

# Sum of the distances to the returned neighbours, largest first.
# NOTE(review): the query points are the training points, so the returned
# neighbours normally include the point itself at distance 0; the classic
# k-dist plot uses the distance to the k-th neighbour instead of the sum.
four_dist = np.sum(distances,axis=1)
sorted_four_dist = np.sort(four_dist)[::-1]

# The x-axis is the rank of each point in the sorted curve. Using
# indices[:, 0] (the id of each point's first neighbour, in the original row
# order) does not correspond to the sorted y-values.
point_rank = np.arange(len(sorted_four_dist))

# Create a figure
fig = go.Figure()
# Add a line trace
fig.add_trace(go.Scatter(x=point_rank, y=sorted_four_dist,
                         mode='lines', name='Line Plot'))
# Set labels and title
fig.update_layout(xaxis_title='points', yaxis_title='4-dist', title='Elbow')
# Show the plot
fig.show()


In [None]:
from sklearn.cluster import DBSCAN

# Per-sequence DBSCAN radius (min_samples = 4 in all cases), tuned together
# with the frequency filter applied earlier:
##   PT0: filter 0.13 | PT3: filter 0.15 | PT4: filter 0.2
##   PT3_hairpin: filter 0.2 | PT4_hairpin: filter 0.5
DBSCAN_EPS = {
    'PT0': 0.07,
    'PT3': 0.14,
    'PT4': 0.18,
    'PT3_hairpin': 0.08,
    'PT4_hairpin': 0.15,
}

# Fail loudly for an unknown sequence instead of silently reusing a stale eps
# from an earlier run (or hitting a NameError below).
if SEQ not in DBSCAN_EPS:
    raise ValueError(f"No DBSCAN eps configured for SEQ={SEQ!r}")
eps = DBSCAN_EPS[SEQ]

X = filter_comb_pca_coords
clusters = DBSCAN(eps = eps, min_samples = 4).fit(X)
# get cluster labels
labels = clusters.labels_

# Number of clusters in labels, ignoring noise if present.
n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0)
n_noise_ = int(np.count_nonzero(labels == -1))

print("Estimated number of clusters: %d" % n_clusters_)
print("Estimated number of noise points: %d" % n_noise_)

# Show the distinct cluster labels; -1 marks noisy points that could not be
# assigned to any cluster.
set(clusters.labels_)
# # -1 value represents noisy points could not assigned to any cluster

### Remove non-trap clusters

In [None]:
# Relabel as noise (-1) every cluster whose lowest-energy member is not a
# plausible kinetic trap.
real_labels = labels.copy()
for k_clust in np.unique(labels):
    if k_clust == -1:
        # -1 is the DBSCAN noise label, not a cluster: nothing to decide.
        continue

    members = np.where(labels==k_clust)[0]
    min_index = np.argmin(SIMS_G_uniq[filter_idx][members])
    # Column 0 of the lowest-energy member is searched for a run of ten "(".
    plausible_trap = SIMS_dict_uniq[filter_idx][members][min_index][0]
    if "("*10 in plausible_trap:
        # A run of ten "(" is treated as "not a trap": drop the cluster.
        real_labels[labels == k_clust] = -1
        print("Cluster {} is NOT a trap".format(k_clust))
    else:
        print("Cluster {} is a trap".format(k_clust))

real_labels = np.array(real_labels)

# Report only real clusters; the noise label is not a trap cluster.
print("\nClusters with trap are: {}".format(np.unique(real_labels[real_labels != -1])))
    

### Visualize to find traps

In [None]:
%%script false --no-raise-error

%matplotlib inline

X = filter_comb_pca_coords[:,0]
Y = filter_comb_pca_coords[:,1]

# color = SIMS_G_uniq[filter_idx]
color = labels
size = SIMS_HT_uniq[filter_idx]
text = SIMS_dict_uniq[:,0][filter_idx]
prob = P_tot[filter_idx]

color_palette = pc.qualitative.Plotly[:n_clusters_]

fig = go.Figure()
fig.add_trace(go.Scattergl(
    x = X,
    y = Y,
    mode = 'markers',
    marker=dict(
        color = color,
        # size = P_tot,
        size = size,
        # colorscale="Plasma",
        colorscale=color_palette,
        sizeref=3e-10,
        showscale=True,
    ),
    
    customdata = np.stack((SIMS_G_uniq[filter_idx],
                           size,
                           prob,
                           ),axis=-1),

    text = text,
    hovertemplate=
        "X: %{x}   " + "   Y: %{y} <br>"+
        "DP notation: <br> <b>%{text}</b><br>" +  
        "Energy:  %{customdata[0]:.3f} kcal/mol<br>"+
        "Average holding time:  %{customdata[1]:.5g} s<br>"+
        "Probability:  %{customdata[2]:.2g} <br>",
    name="states",
    
))

# label initial and final states
fig.add_trace(
    go.Scattergl(
        x = comb_pca_coords[:,0][[initial_idx, final_idx]],
        y = comb_pca_coords[:,1][[initial_idx, final_idx]],
        mode='markers+text',
        marker_color="lime", 
        marker_size=15,
        text=["I", "F"],
        textposition="middle center",
        textfont=dict(
        family="sans serif",
        size=10,
        color="black"
    ),
        hoverinfo='skip',
        showlegend=False,
        
                    )
)

In [None]:
%%script false --no-raise-error

import plotly.graph_objects as go
import numpy as np

# Sample data
X = filter_comb_pca_coords[:,0]
Y = filter_comb_pca_coords[:,1]
clusters = labels  # Cluster labels

# Get unique cluster labels
unique_clusters = np.unique(clusters)

# Define colors for each cluster
noise  = 'grey'
colors = ['red', 'blue', 'green', 'orange', 'purple', 'yellow', 'cyan', 'magenta', 'lime', 'teal']  # Add more colors as needed

# Create a scatter trace for each cluster
traces = []
for cluster_label in unique_clusters:
    mask = clusters == cluster_label
    if cluster_label == -1:
        # Assign a color for cluster label -1
        color = noise
        name = 'Cluster -1'
        
    else:
        # Assign a color for other cluster labels
        color = colors[cluster_label]
        name = f'Cluster {cluster_label}'
    
    trace = go.Scattergl(
        x=X[mask],
        y=Y[mask],
        mode='markers',
        marker=dict(
            color=color,
            size = SIMS_HT_uniq[filter_idx][mask],
            sizeref=3e-10,
            # sizeref=1e-10,
            
            sizemode='diameter',
            ),
        name=name,
        showlegend=True,
        
        customdata = np.stack((SIMS_G_uniq[filter_idx][mask],
                           SIMS_HT_uniq[filter_idx][mask],
                           P_tot[filter_idx][mask],
                           ),axis=-1),
        text = SIMS_dict_uniq[:,0][filter_idx][mask],
        hovertemplate=
            "X: %{x}   " + "   Y: %{y} <br>"+
            "DP notation: <br> <b>%{text}</b><br>" +  
            "Energy:  %{customdata[0]:.3f} kcal/mol<br>"+
            "Average holding time:  %{customdata[1]:.5g} s<br>"+
            "Probability:  %{customdata[2]:.2g} <br>",
    )

    traces.append(trace)

# label initial and final states
trace = go.Scattergl(
        x = comb_pca_coords[:,0][[initial_idx, final_idx]],
        y = comb_pca_coords[:,1][[initial_idx, final_idx]],
        mode='markers+text',
        
        marker_color="lime", 
        marker_size=10,
        text=["I", "F"],
        textposition="middle center",
        textfont=dict(
        family="sans serif",
        size=10,
        color="black"
    ),
        hoverinfo='skip',
        showlegend=False,
                    )

traces.append(trace)

# label kinetic traps
for k_clust in np.unique(clusters):
    min_index = np.argmin(SIMS_G_uniq[filter_idx][np.where(clusters==k_clust)[0]])
    trace = go.Scattergl(
        x = np.array(X[np.where(clusters==k_clust)[0]][min_index]),
        y = np.array(Y[np.where(clusters==k_clust)[0]][min_index]),
        mode='markers',
        marker=dict(
            color="black",
            symbol='star',
            size=50,
        ),
        showlegend=False,
    )
    traces.append(trace)
    
# legend setting
layout = go.Layout(
    legend=dict(
        # x=0.5,  # Adjust the x position of the legend
        # y=0.5,  # Adjust the y position of the legend
        # font=dict(
        #     size=10  # Adjust the font size of the legend
        # ),
        itemsizing='constant',
    ),
    xaxis=dict(
        range = [min(X)*1.1,max(X)*1.1],
    ),
    yaxis=dict(
        range = [min(Y)*1.1,max(Y)*1.1],
    ),
    title=f"DBSCAN finding Kinetic Traps for sample {SEQ}",
)


# Create a figure
fig = go.Figure(data=traces, layout=layout)

# Show the plot
fig.show()


In [None]:
import plotly.graph_objects as go
import numpy as np

# Interactive scatter of the filtered states, one trace per trap cluster,
# plus markers for the initial/final state and for the trap of each cluster.
X = filter_comb_pca_coords[:,0]
Y = filter_comb_pca_coords[:,1]
# NOTE: this rebinds `clusters`, which the DBSCAN cell uses for the fitted
# model; re-run that cell before accessing clusters.labels_ again.
clusters = real_labels  # Cluster real labels

# Get unique cluster labels
unique_clusters = np.unique(clusters)

# Per-state quantities restricted to the filtered states (computed once).
energy_filt = SIMS_G_uniq[filter_idx]
holding_filt = SIMS_HT_uniq[filter_idx]
prob_filt = P_tot[filter_idx]
dp_filt = SIMS_dict_uniq[:,0][filter_idx]

# Define colors, legend letters and trap symbols for the clusters
noise  = 'grey'
colors = ['red', 'blue', 'green', 'orange', 'purple', 'yellow', 'cyan', 'magenta', 'lime', 'teal']  # Add more colors as needed
name_list = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J']
trap_shape = ['star', 'x', 'triangle-up', 'cross', 'pentagon', 'diamond', 'square', 'triangle-down', 'triangle-left', 'triangle-right']

hover_template = (
    "X: %{x}   " + "   Y: %{y} <br>"+
    "DP notation: <br> <b>%{text}</b><br>" +
    "Energy:  %{customdata[0]:.3f} kcal/mol<br>"+
    "Average holding time:  %{customdata[1]:.5g} s<br>"+
    "Probability:  %{customdata[2]:.2g} <br>"
)

traces = []       # one scatter trace per cluster (noise included)
trap_traces = []  # one marker per real cluster, appended after the I/F trace
i = 0             # running index over the real (non-noise) clusters
for cluster_label in unique_clusters:
    mask = clusters == cluster_label
    is_noise = cluster_label == -1
    if is_noise:
        color = noise
        name = 'Noise'
    else:
        # Wrap around the palette instead of raising IndexError when DBSCAN
        # produces labels beyond the number of defined colors.
        color = colors[cluster_label % len(colors)]
        # Letters A-J first; fall back to the running number afterwards.
        tag = name_list[i] if i < len(name_list) else str(i)
        name = f'Cluster {tag}'

    traces.append(go.Scattergl(
        x=X[mask],
        y=Y[mask],
        mode='markers',
        marker=dict(
            color=color,
            size = holding_filt[mask],
            sizeref=3e-10,
            # sizeref=1e-10,
            sizemode='diameter',
            ),
        name=name,
        showlegend=True,

        customdata = np.stack((energy_filt[mask],
                               holding_filt[mask],
                               prob_filt[mask],
                               ),axis=-1),
        text = dp_filt[mask],
        hovertemplate=hover_template,
    ))

    if is_noise:
        continue

    # Mark the lowest-energy member of the cluster as its kinetic trap.
    min_index = np.argmin(energy_filt[mask])
    trap_traces.append(go.Scattergl(
        x = [X[mask][min_index]],
        y = [Y[mask][min_index]],
        mode='markers',
        marker=dict(
            color="black",
            symbol=trap_shape[i % len(trap_shape)],
            size=10,
        ),
        name = f"Trap {tag}",
        showlegend=True,
    ))
    i += 1

# label initial and final states
trace = go.Scattergl(
        x = comb_pca_coords[:,0][[initial_idx, final_idx]],
        y = comb_pca_coords[:,1][[initial_idx, final_idx]],
        mode='markers+text',

        marker_color="lime",
        marker_size=10,
        text=["I", "F"],
        textposition="middle center",
        textfont=dict(
        family="sans serif",
        size=10,
        color="black"
    ),
        hoverinfo='skip',
        showlegend=False,
                    )

traces.append(trace)

# Trap markers go last so they are drawn on top of the cluster points.
traces.extend(trap_traces)

# legend setting
layout = go.Layout(
    legend=dict(
        # x=0.5,  # Adjust the x position of the legend
        # y=0.5,  # Adjust the y position of the legend
        # font=dict(
        #     size=10  # Adjust the font size of the legend
        # ),
        itemsizing='constant',
    ),
    xaxis=dict(
        range = [min(X)*1.1,max(X)*1.1],
    ),
    yaxis=dict(
        range = [min(Y)*1.1,max(Y)*1.1],
    ),
    title=f"DBSCAN finding Kinetic Traps for sample {SEQ}",
)

# Create a figure
fig = go.Figure(data=traces, layout=layout)

# Show the plot
fig.show()


In [None]:
# Print the lowest-energy state (the kinetic trap) of every cluster that was
# kept in real_labels; the noise label -1 is skipped.
trap_cluster_ids = [label for label in np.unique(real_labels) if label != -1]
for k_clust in trap_cluster_ids:
    members = np.where(labels==k_clust)[0]
    min_index = np.argmin(SIMS_G_uniq[filter_idx][members])
    print("Kinetic trap in cluster {} is:".format(k_clust))
    print(SIMS_dict_uniq[filter_idx][members][min_index])

In [None]:
# Print the lowest-energy state of every DBSCAN label, i.e. the plausible
# kinetic trap of each cluster before non-trap clusters are removed. The
# noise label -1 is included in this listing.
for k_clust in np.unique(labels):
    members = np.where(labels==k_clust)[0]
    min_index = np.argmin(SIMS_G_uniq[filter_idx][members])
    print("Kinetic trap in cluster {} is:".format(k_clust))
    print(SIMS_dict_uniq[filter_idx][members][min_index])
    # print("Position in the cluster is:", X[members][min_index], Y[members][min_index], "\n")



### Trajectory analysis

In [None]:
## Extract each trajectory and count which clusters it passes through
split_id = trj_id + 1 # exclusive end row of each trajectory inside SIMS_dict

# Cluster membership does not depend on the trajectory, so the member lists
# are built once instead of once per (trajectory, cluster) pair.
cluster_ids = np.unique(labels)
filtered_dp = SIMS_dict_uniq[:,0][filter_idx]
cluster_members = [filtered_dp[labels == k_clust] for k_clust in cluster_ids]

traj_in_clust = np.zeros(len(cluster_ids), dtype=int)
# Accumulates SIMS_T at the last row of every visiting trajectory; it is
# divided by the trajectory count only when printing.
avg_time_in_clust = np.zeros(len(cluster_ids), dtype=float)

traj_starts = [0, *split_id[:-1]]
for i, (start, stop) in enumerate(zip(traj_starts, split_id)):
    trj_dp = SIMS_dict[start:stop, 0]

    for j, members in enumerate(cluster_members):
        # A trajectory visits a cluster if it shares at least one state
        # (compared through column 0) with the cluster's members.
        if np.size(np.intersect1d(trj_dp, members)) != 0:
            traj_in_clust[j] += 1
            avg_time_in_clust[j] += SIMS_T[trj_id[i]]

print(f"{SEQ}:")
for n_traj, k_clust, summed_time in zip(traj_in_clust, cluster_ids, avg_time_in_clust):
    # A cluster no trajectory visits has no mean time: report nan without
    # triggering a 0/0 RuntimeWarning.
    mean_time = summed_time / n_traj if n_traj else float("nan")
    print("{} trajs in cluster {}. Average time: {:.3e}.".format(n_traj, k_clust, mean_time))


### TICA

In [None]:
import deeptime
from deeptime.decomposition import TICA

tica = TICA(lagtime=1,dim=2)
data = combined_data
tica_coor = tica.fit_transform(data)

In [None]:
data = combined_data
tica_coor = tica.fit_transform(data)
tica_coor

In [None]:
# plt plot for tica
plt.scatter(tica_coor[:,0],tica_coor[:,1],c=SIMS_G_uniq,cmap='viridis')