# Autoencoder 

**Creating an AE that can learn an information rich spectral profiles embeddings to reconstruct the image/spetrum**

In [2]:
import numpy as np
X = np.load('../LPS/LPS_ST_1-1658_x.npy')
# intensities of each spectrum

## Autoencoder Architecture for Mass Spectrometry Data

This is the implementation of a baseline **Autoencoder**. The autoencoder learns to encode spectral profiles into a lower-dimensional latent space representation, then reconstructs the original spectra from these compressed embeddings.

### Key Components:

#### 1. **Architecture Design**
- **Encoder**: Compresses spectral data from ~1400 peaks → 512 → 256 → 64 latent dimensions
- **Decoder**: Reconstructs from 64 latent dimensions → 256 → 512 → 1500 peaks
- **Activation Functions**: ReLU for hidden layers, linear output for intensity reconstruction





In [5]:
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras import layers


from sklearn.model_selection import train_test_split



class SpectrumAutoencoder(Model):
    def __init__(self, latent_dim, n_peaks):
        super(SpectrumAutoencoder, self).__init__()
        self.latent_dim = latent_dim
        self.n_peaks = n_peaks
        
        self.encoder = tf.keras.Sequential([
            layers.Dense(512, activation='relu'),
            layers.Dense(256, activation='relu'),
            layers.Dense(latent_dim, activation='relu'),
        ])

        self.decoder = tf.keras.Sequential([
            layers.Dense(256, activation='relu'),
            layers.Dense(512, activation='relu'),
            layers.Dense(n_peaks, activation='linear'),  
        ])

    def call(self, intensities):
        encoded = self.encoder(intensities)
        decoded_intensities = self.decoder(encoded)
        return decoded_intensities


#### 3. **Training Strategy**
- **Loss Function**: Mean Squared Error (MSE) for reconstruction fidelity
- **Optimization**: Adam optimizer
- **Regularization**: Early stopping to prevent overfitting
- **Validation**: 80/20 train-test split for performance evaluation


In [6]:
# Split data into train and test sets
X_train, X_test = train_test_split(X, test_size=0.2, random_state=42)
print(f"Training set shape: {X_train.shape}")
print(f"Test set shape: {X_test.shape}")

latent_dim = 64  
input_dim = X_train.shape[1]  
print(f"Input dimension: {input_dim}, Latent dimension: {latent_dim}")

autoencoder = SpectrumAutoencoder(latent_dim=latent_dim, n_peaks=input_dim)

# Compile the model
autoencoder.compile(
    optimizer='adam',
    loss='mse',  # Mean Squared Error for reconstruction
    metrics=['mae','rmse','mse']  # Mean Absolute Error as additional metric
)

print(f"Autoencoder created with latent_dim={latent_dim}, input_dim={input_dim}")

Training set shape: (1308, 92500)
Test set shape: (328, 92500)
Input dimension: 92500, Latent dimension: 64
Autoencoder created with latent_dim=64, input_dim=92500


In [7]:
# Training the autoencoder
from tensorflow.keras.callbacks import EarlyStopping

early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True
)

# Check data before training
print(f"Training data stats:")
print(f"  Shape: {X_train.shape}")
print(f"  Min: {X_train.min():.6f}, Max: {X_train.max():.6f}")
print(f"  Mean: {X_train.mean():.6f}, Std: {X_train.std():.6f}")


print("Starting training...")
history = autoencoder.fit(
    X_train, X_train,  
    epochs=1,  
    batch_size=32,
    validation_data=(X_test, X_test),
    callbacks=[early_stopping],
    verbose=1

)

print("Training completed!")

Training data stats:
  Shape: (1308, 92500)
  Min: 0.000000, Max: 0.081360
  Mean: 0.000011, Std: 0.000244
Starting training...


MemoryError: Unable to allocate 462. MiB for an array with shape (1308, 92500) and data type float32

#### 5. **Evaluation Metrics**
- **Reconstruction Error**: MSE and MAE between original and reconstructed spectra
- **Visual Assessment**: Direct comparison plots of original vs. reconstructed profiles

In [None]:
# Evaluate the model
test_loss, test_mae = autoencoder.evaluate(X_test, X_test, verbose=1)
print(f"Test Loss (MSE): {test_loss:.6f}")
print(f"Test MAE: {test_mae:.6f}")

# Plot training history
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['mae'], label='Training MAE')
plt.plot(history.history['val_mae'], label='Validation MAE')
plt.title('Model MAE')
plt.xlabel('Epoch')
plt.ylabel('MAE')
plt.legend()

plt.tight_layout()
plt.show()

In [None]:
# Plot original (after preprocessing) vs reconstructed spectra
import random
def plot_reconstruction_comparison(autoencoder, X_test, common_mzs):
    # Plot the 3 random samples vs thier reconstruction
    
    spectrum1, spectrum2, spectrum3 = random.sample(range(0,len(X_test)),3)
    spectra_indexes = [spectrum1,spectrum2,spectrum3]
    reconstructed = autoencoder(np.array([X_test[spectrum1],X_test[spectrum2],X_test[spectrum3]]))
    
    fig, axes = plt.subplots(3, 1, figsize=(12, 4*3))
    
    for i in range(3):
        axes[i].plot(common_mzs, X_test[spectra_indexes[i]], label='Original', alpha=0.7)
        axes[i].plot(common_mzs, reconstructed[i], label='Reconstructed', alpha=0.7)
        axes[i].set_xlabel('m/z')
        axes[i].set_ylabel('Intensity')
        axes[i].set_title(f'Spectrum {i+1} - Reconstruction Comparison')
        axes[i].legend()
        axes[i].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()


plot_reconstruction_comparison(autoencoder, X_test, common_mzs)

reconstructed_test = autoencoder(X_test)
reconstruction_errors = np.mean(np.square(X_test - reconstructed_test), axis=1)

plt.figure(figsize=(10, 4))
plt.subplot(1, 2, 1)
plt.hist(reconstruction_errors, bins=30, alpha=0.7)
plt.xlabel('Reconstruction Error (MSE)')
plt.ylabel('Frequency')
plt.title('Distribution of Reconstruction Errors')

plt.subplot(1, 2, 2)
plt.plot(reconstruction_errors)
plt.xlabel('Sample Index')
plt.ylabel('Reconstruction Error')
plt.title('Reconstruction Error per Sample')
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"Mean reconstruction error: {np.mean(reconstruction_errors):.6f}")

In [None]:
# plot various recreations of spectra vs each other
# plt.plot(common_mzs,X_processed[2])
# plt.show()

# print(np.array(X_processed[0]))
r = autoencoder(np.array([X_processed[0]]))
plt.figure(figsize=(10,4))
plt.plot(common_mzs,r[0])
plt.plot(common_mzs,X_processed[0],alpha=0.5)

plt.show()

# It seems to be true that there are some spectra that are not reconstrcted well by the AE , e.g index 0,1,2
# My hypothesis is that such points have a low amount of the total number of mz actually recorded - high interpolation
len(my_spectra[20][0])


In [None]:
# I want export an array of the interopolated OG data and the reconstructed data.
# print(X_processed.shape)

reconstructions = np.array(autoencoder(X_processed))

with open("HIV_Common_mzs.npy",'wb') as file:
    np.save(file,common_mzs)

with open("HIV_OG_Normalized.npy",'wb') as file:
    np.save(file,X_processed)

with open("HIV_Recon_Normalized.npy",'wb') as file:
    np.save(file,reconstructions)
    

In [None]:
print(np.array(reconstructions))


In [None]:
count =0
indexes = []
for index in range(len(my_spectra)):
    if len(my_spectra[index][0]) < 250:
        count+=1
        indexes.append(index)
print(count)
print(len(my_spectra[indexes[3968]][0])) 
print(indexes[3968])       

In [None]:
common_mzs