In [None]:
# Import necessary modules
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import warnings
import requests
from io import StringIO
warnings.filterwarnings('ignore')
import gc
import joblib

!pip install dscribe

import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
from torch.optim import Adam
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error


from dscribe.descriptors import ACSF
from ase import Atoms
from ase.io import read

In [None]:
# --- CONFIGURATION CELL ---
# DO NOT CHANGE THE VARIABLE NAMES BELOW

# Saved model weights; the path is relative to this notebook.
MODEL_PATH = "model.pt"

# Test CSV file name.
# Note: for classification/regression tasks this file carries the same columns
# as train.csv, except that the target column is absent.
INPUT_CSV_NAME = "test.csv"

# Folder holding external files (only applicable for Domain A: Inorganic).
# For A1/A2 this folder contains the test.csv and all corresponding .cif files.
INPUT_FOLDER_NAME = "test_dataset"


In [None]:
# Load the test table. The CSV may sit in the working directory or inside
# INPUT_FOLDER_NAME (next to the .cif files), so try the working directory
# first and fall back to the data folder.
try:
    master_df = pd.read_csv(INPUT_CSV_NAME)
except FileNotFoundError:
    master_df = pd.read_csv(f"{INPUT_FOLDER_NAME}/{INPUT_CSV_NAME}")

# Columns are renamed positionally, so fail with a readable message if the
# file does not have exactly the expected number of columns.
expected_columns = ['cif_files', 'formula', 'pld', 'lcd', 'density', 'energy']
if master_df.shape[1] != len(expected_columns):
    raise ValueError(
        f"{INPUT_CSV_NAME} has {master_df.shape[1]} columns, "
        f"expected {len(expected_columns)}: {expected_columns}"
    )
master_df.columns = expected_columns
len(master_df)

In [None]:
# Pull the integer ID out of each CIF name (pattern "MOF_<digits>") and order
# the rows by it.
# NOTE(review): after this sort + reset_index the row positions no longer match
# the order of the input CSV unless it was already sorted by ID — confirm that
# the "Row Number" written at the end is meant to refer to this sorted order.
mof_ids = master_df["cif_files"].str.extract(r"MOF_(\d+)").astype(int)
master_df["ID_num"] = mof_ids
master_df = master_df.sort_values(by="ID_num").reset_index(drop=True)

# Tabular (per-structure) inputs that accompany the ACSF vector.
features = ['energy', 'pld', 'lcd', 'density']

X_global = master_df[features]
master_df.head()

In [None]:
# Collect every chemical symbol that occurs in the given structures
def get_global_species(cif_files):
    """Return the sorted list of distinct chemical symbols found in the CIF files.

    Each entry of ``cif_files`` is a file stem; the structure is read from
    ``{INPUT_FOLDER_NAME}/{stem}.cif``. Files that cannot be read are reported
    on stdout and skipped.

    NOTE(review): the result is later used as the ACSF ``species`` list, so it
    determines the descriptor width — presumably it has to equal the species
    list used when the scalers/model were fitted; confirm.
    """
    symbols_seen = set()

    for stem in cif_files:
        try:
            structure = read(f'{INPUT_FOLDER_NAME}/{stem}.cif')
            symbols_seen |= set(structure.get_chemical_symbols())
        except Exception:
            print(f"Failed to read CIF file: {stem}.cif")

    ordered_symbols = sorted(symbols_seen)
    return ordered_symbols

# Species present across all structures listed in the table
cif_names = master_df['cif_files']
atom_set = get_global_species(cif_names)

## **Build and use the symmetry function**

In [None]:
# Compute the ACSF descriptors of a single structure
def generate_acsf(cif_file, acsf_params):
    """Read one CIF file and return its ACSF descriptors.

    Args:
        cif_file: Path of the CIF file to read with ASE.
        acsf_params: Keyword arguments forwarded to ``ACSF(...)``.

    Returns:
        The output of ``ACSF.create`` for the structure (per the original
        comment an ``n_atoms x n_features`` array), or ``None`` when the file
        could not be read.
    """
    try:
        atoms = read(f'{cif_file}')  # Read structure directly from CIF file using ASE
    except Exception as e:
        # Report why the read failed instead of discarding the reason.
        print(f"Failed to read CIF file: {cif_file} ({e})")
        return None

    # Build the descriptor object from the supplied parameters
    acsf = ACSF(**acsf_params)

    # One descriptor row per atom
    return acsf.create(atoms)

# Build one fixed-length ACSF vector per structure, streamed into a disk-backed array
def create_streamed(df, acsf_params, n_features, out_path="/content/acsf_descriptors.dat", size_aware=True):
    """Compute atom-averaged ACSF descriptors for every row of ``df``.

    For each entry of ``df["cif_files"]`` the structure
    ``{INPUT_FOLDER_NAME}/{entry}.cif`` is read, its per-atom ACSF descriptors
    are computed and averaged over atoms, and the result is stored as one row
    of a float32 ``np.memmap`` at ``out_path``.

    Args:
        df: Table with a ``cif_files`` column of file stems.
        acsf_params: Keyword arguments forwarded to ``ACSF(...)``.
        n_features: Length of one descriptor vector (number of memmap columns).
        out_path: File backing the memmap; it is created/overwritten.
        size_aware: Selects ``sum / n_atoms`` (True) or ``mean`` (False); both
            are the per-atom average and are kept separate only for
            backward compatibility.

    Returns:
        The ``np.memmap`` of shape ``(len(df), n_features)``. Rows for
        structures that could not be read/described, or that have no atoms,
        are all zeros; such failures are reported on stdout.
    """
    n_samples = len(df)

    output = np.memmap(
        out_path,
        dtype="float32",
        mode="w+",
        shape=(n_samples, n_features)
    )

    # One descriptor object is reused for every structure
    acsf = ACSF(**acsf_params)

    failed = []

    for i, cif_file in enumerate(df["cif_files"]):
        try:
            atoms = read(f"{INPUT_FOLDER_NAME}/{cif_file}.cif")
            desc = acsf.create(atoms)
        except Exception as exc:
            # Keep the best-effort zero row, but make the failure visible:
            # a zero row silently turns into a prediction downstream.
            print(f"ACSF generation failed for {cif_file}.cif: {exc}")
            failed.append(cif_file)
            output[i] = np.zeros(n_features, dtype=np.float32)
            continue

        if desc.shape[0] == 0:
            output[i] = np.zeros(n_features, dtype=np.float32)
        elif size_aware:
            output[i] = desc.sum(axis=0) / desc.shape[0]
        else:
            output[i] = desc.mean(axis=0)

        # Release the per-structure objects before the next iteration
        del atoms, desc

    if failed:
        print(f"{len(failed)} of {n_samples} structures were replaced by zero vectors")

    # Push pending writes to the backing file before handing the array out
    output.flush()
    return output

In [None]:
# ACSF parameters for descriptor generation.
# NOTE(review): presumably these must equal the parameters used when
# X_acsf_scaler.pkl and the model were fitted — confirm.
acsf_params = {
        "species": atom_set,  # List of atomic species to consider.
        "r_cut": 5.0,  # Cutoff radius for ACSF calculations.
        "g2_params": [[1, 1], [1, 3]],  # Parameters for G2 symmetry functions.
        "g4_params": [[1, 1, 1]],  # Parameters for G4 symmetry functions.
    }

# Probe the first structure to learn the descriptor width
sample_desc = generate_acsf(f"{INPUT_FOLDER_NAME}/{master_df['cif_files'].iloc[0]}.cif", acsf_params)
if sample_desc is None:
    # generate_acsf returns None on a read failure; stop with a clear message
    # instead of an AttributeError on `.shape`.
    raise RuntimeError(
        f"Could not read {INPUT_FOLDER_NAME}/{master_df['cif_files'].iloc[0]}.cif "
        "to determine the ACSF feature size"
    )
n_features = sample_desc.shape[1]
del sample_desc

# Generate X_values for the model
X = create_streamed(master_df, acsf_params, n_features)

# Scale the input values in batches with the scaler saved on disk.
# NOTE: joblib.load unpickles the file; only load scaler files from a trusted source.
scaler = joblib.load("X_acsf_scaler.pkl")
batch_size = 50

X_scaled = np.memmap("/content/acsf_scaled.dat", dtype="float32", mode="w+", shape=X.shape)

# Single pass: transform each batch and store it. (An earlier loop that called
# scaler.transform and discarded the result was removed.)
for i in range(0, X.shape[0], batch_size):
    X_scaled[i:i+batch_size] = scaler.transform(X[i:i+batch_size])

# Push pending writes to the backing file
X_scaled.flush()

In [None]:
# Standardise the tabular features with the scaler stored in X_global_scaler.pkl
X_glob_scaler = joblib.load("X_global_scaler.pkl")
X_glob_scaled = X_glob_scaler.transform(X_global)

class IOChemInferenceDataset(Dataset):
    """Inference-only dataset pairing a descriptor row with its global-feature row.

    Args:
        X_memmap: Indexable array of descriptor vectors (one row per sample).
        X_global: Indexable array of global features, aligned row-wise with ``X_memmap``.
        indices: Optional sequence of row positions to expose; defaults to all rows.
    """

    def __init__(self, X_memmap, X_global, indices=None):
        self.X = X_memmap
        self.Xg = X_global
        if indices is None:
            indices = np.arange(len(X_memmap))
        self.indices = indices

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, i):
        # Translate the dataset position into a row of the underlying arrays
        row = self.indices[i]
        descriptor = torch.from_numpy(self.X[row]).float()
        global_feats = torch.from_numpy(self.Xg[row]).float()
        return descriptor, global_feats

# Pair the scaled ACSF vectors with the scaled global features
infer_dataset = IOChemInferenceDataset(X_scaled, X_glob_scaled)

# shuffle=False keeps batches in dataset order
infer_loader = DataLoader(infer_dataset, batch_size=64, shuffle=False, num_workers=0)

## **Define the Autoencoder**

In [None]:
# Size of the latent vector produced by the encoder
LATENT_DIM = 384

# Training hyper-parameters
BATCH_SIZE = 128
EPOCHS = 200
LEARNING_RATE = 0.001
ALPHA = 0.4    # reconstruction-loss weight
BETA = 0.6     # property-prediction-loss weight
PATIENCE = 10  # early-stopping patience

# Use the GPU when one is available, otherwise the CPU
_device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
DEVICE = torch.device(_device_name)

In [None]:
# Define Autoencoder Model
class MolecularAutoencoder(nn.Module):
    """
    Autoencoder with an auxiliary property-prediction head.

    Architecture:
    - Encoder: compresses the input descriptor vector to a latent representation
    - Decoder: reconstructs the descriptor vector from the latent space
    - Property Predictor: predicts ``num_properties`` values from the latent
      representation concatenated with the global feature vector ``xg``

    Args:
        input_dim: Length of the input descriptor vector.
        latent_dim: Size of the latent representation.
        num_properties: Number of values produced by the predictor head.
        num_global_features: Length of the global feature vector ``xg`` that is
            concatenated to the latent vector. Defaults to 4, the width that
            was previously hard-coded, so existing checkpoints still load.
    """

    def __init__(self, input_dim, latent_dim=384, num_properties=1, num_global_features=4):
        super().__init__()

        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.num_properties = num_properties
        self.num_global_features = num_global_features

        # Encoder Network
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            nn.Dropout(0.2),

            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.1),

            nn.Linear(512, latent_dim),
            nn.BatchNorm1d(latent_dim),
            nn.LeakyReLU()
        )

        # Decoder Network
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.1),

            nn.Linear(512, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            nn.Dropout(0.2),

            nn.Linear(1024, input_dim),
        )

        # Property Prediction Network (input = latent vector + global features)
        self.property_predictor = nn.Sequential(
            nn.Linear(latent_dim + num_global_features, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Dropout(0.1),

            nn.Linear(128, 64),
            nn.BatchNorm1d(64),
            nn.ReLU(),

            nn.Linear(64, num_properties)
        )

    def encode(self, x):
        """Encode input to latent representation"""
        return self.encoder(x)

    def decode(self, z):
        """Decode latent representation to reconstruction"""
        return self.decoder(z)

    def forward(self, x, xg):
        """
        Forward pass through autoencoder

        Args:
            x: Batch of input descriptor vectors.
            xg: Batch of global feature vectors, concatenated to the latent
                vectors along dim 1.

        Returns:
            reconstructed: Reconstructed input vectors
            predicted_properties: Output of the property predictor
            latent: Latent representation
        """
        latent = self.encode(x)
        reconstructed = self.decode(latent)
        combined = torch.cat([latent, xg], dim=1)
        predicted_properties = self.property_predictor(combined)

        return reconstructed, predicted_properties, latent

print("Autoencoder model class defined")

In [None]:
# Initialize model
input_dim = X.shape[1]
num_properties = 1

model = MolecularAutoencoder(
    input_dim=input_dim,
    latent_dim=LATENT_DIM,
    num_properties=num_properties
).to(DEVICE)

# map_location lets a checkpoint saved on a GPU load on a CPU-only machine
model.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE))
print("Best model loaded for evaluation")
model.eval()
predictions = []
row_numbers = []

with torch.no_grad():
    row_count = 0
    for x, xg in infer_loader:
        x = x.to(DEVICE)
        xg = xg.to(DEVICE)

        _, preds, _ = model(x, xg)   # autoencoder output
        # reshape(-1) always yields a 1-D array; squeeze() would collapse a
        # final batch of size 1 to a 0-d array and make len() fail.
        preds = preds.reshape(-1).cpu().numpy()

        # Separate name so the scaling cell's `batch_size` is not overwritten
        n_in_batch = len(preds)
        predictions.extend(preds.tolist())
        row_numbers.extend(range(row_count, row_count + n_in_batch))
        row_count += n_in_batch

## **Saving the predictions**

In [None]:
# Map the model outputs back through the target scaler saved in y_scaler.pkl
y_scaler = joblib.load("y_scaler.pkl")
scaled_column = np.array(predictions).reshape(-1, 1)
predictions = y_scaler.inverse_transform(scaled_column).flatten()

# One output row per prediction, written without the DataFrame index
df_preds = pd.DataFrame(
    {"Row Number": row_numbers, "Predicted Value": predictions}
)
df_preds.to_csv("10_inference_predictions_A1.csv", index=False)