In [1]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

In [2]:
def parse_xyz(filename):
    with open(filename, 'r') as file:
        lines = file.readlines()
    
    data = []
    start_idx = 0
    
    while start_idx < len(lines):
        num_atoms = int(lines[start_idx].strip())
        properties_line = lines[start_idx + 1].strip()
        
        energy = float(properties_line.split('energy=')[1].split()[0])
        
        atom_data = []
        for i in range(start_idx + 2, start_idx + 2 + num_atoms):
            parts = lines[i].split()
            atom_type = parts[0]
            position = [float(parts[1]), float(parts[2]), float(parts[3])]
            force = [float(parts[4]), float(parts[5]), float(parts[6])]
            atom_data.append((atom_type, position, force))
        
        data.append((energy, atom_data))
        start_idx += 2 + num_atoms
    
    return data


def create_dataframe(data):
    rows = []
    for block_id, (energy, atom_data) in enumerate(data):
        positions_x = []
        positions_y = []
        positions_z = []
        atom_types = []
        forces_x = []
        forces_y = []
        forces_z = []
        for atom in atom_data:
            atom_type, position, force = atom
            atom_types.append(atom_type)
            positions_x.append(position[0])
            positions_y.append(position[1])
            positions_z.append(position[2])
            forces_x.append(force[0])
            forces_y.append(force[1])
            forces_z.append(force[2])
        
        row = {
            'block_id': block_id,  # 블록 ID 추가
            'positions_x': positions_x,
            'positions_y': positions_y,
            'positions_z': positions_z,
            'atom_types': atom_types,
            'forces_x': forces_x,
            'forces_y': forces_y,
            'forces_z': forces_z,
            'energy': energy,
        }
        rows.append(row)
    return pd.DataFrame(rows)

def encode_atom_types(atom_types):
    encoded = []
    for atom in atom_types:
        encoded.append(atom_type_to_index[atom])
    return encoded


# 특징 벡터 생성
def create_feature_vector(df):
    X_positions_x = np.array([np.array(pos) for pos in df['positions_x']])
    X_positions_y = np.array([np.array(pos) for pos in df['positions_y']])
    X_positions_z = np.array([np.array(pos) for pos in df['positions_z']])
    X_atom_types = np.array([np.array(atom_types) for atom_types in df['encoded_atom_types']])
    y_energy = df['energy'].values
    y_forces_x = np.array([np.array(forces) for forces in df['forces_x']])
    y_forces_y = np.array([np.array(forces) for forces in df['forces_y']])
    y_forces_z = np.array([np.array(forces) for forces in df['forces_z']])
    return X_positions_x, X_positions_y, X_positions_z, X_atom_types, y_energy, y_forces_x, y_forces_y, y_forces_z



class LatticeDataset(Dataset):
    def __init__(self, X_positions_x, X_positions_y, X_positions_z, X_atom_types, y_energy, y_forces_x, y_forces_y, y_forces_z):
        self.X_positions_x = torch.tensor(X_positions_x, dtype=torch.float32)
        self.X_positions_y = torch.tensor(X_positions_y, dtype=torch.float32)
        self.X_positions_z = torch.tensor(X_positions_z, dtype=torch.float32)
        self.X_atom_types = torch.tensor(X_atom_types, dtype=torch.float32)
        self.y_energy = torch.tensor(y_energy, dtype=torch.float32)
        self.y_forces_x = torch.tensor(y_forces_x, dtype=torch.float32)
        self.y_forces_y = torch.tensor(y_forces_y, dtype=torch.float32)
        self.y_forces_z = torch.tensor(y_forces_z, dtype=torch.float32)
    
    def __len__(self):
        return len(self.X_positions_x)
    
    def __getitem__(self, idx):
        return (self.X_positions_x[idx], self.X_positions_y[idx], self.X_positions_z[idx], self.X_atom_types[idx],
                self.y_energy[idx], self.y_forces_x[idx], self.y_forces_y[idx], self.y_forces_z[idx])


In [3]:
# 파일에서 데이터 추출
train_data = parse_xyz('data/Train.xyz')
test_data = parse_xyz('data/Test.xyz')

# 데이터프레임으로 변환 및 병합
train_df = create_dataframe(train_data)
test_df = create_dataframe(test_data)

# 원자 종류를 인코딩
unique_atom_types = sorted(set([atom for sublist in train_df['atom_types'] for atom in sublist]))
atom_type_to_index = {atom: idx for idx, atom in enumerate(unique_atom_types)}

train_df['encoded_atom_types'] = train_df['atom_types'].apply(encode_atom_types)
test_df['encoded_atom_types'] = test_df['atom_types'].apply(encode_atom_types)

X_train_positions_x, X_train_positions_y, X_train_positions_z, X_train_atom_types, y_train_energy, y_train_forces_x, y_train_forces_y, y_train_forces_z = create_feature_vector(train_df)
X_test_positions_x, X_test_positions_y, X_test_positions_z, X_test_atom_types, _, _, _, _ = create_feature_vector(test_df)

In [4]:
class LatticeEnergyForceModel(nn.Module):
    def __init__(self, input_dim_positions, input_dim_atom_types, num_atoms):
        super(LatticeEnergyForceModel, self).__init__()
        self.fc1_x = nn.Linear(input_dim_positions, 128)
        self.fc1_y = nn.Linear(input_dim_positions, 128)
        self.fc1_z = nn.Linear(input_dim_positions, 128)
        self.fc2_x = nn.Linear(128, 64)
        self.fc2_y = nn.Linear(128, 64)
        self.fc2_z = nn.Linear(128, 64)
        self.force_output_x = nn.Linear(64, num_atoms)
        self.force_output_y = nn.Linear(64, num_atoms)
        self.force_output_z = nn.Linear(64, num_atoms)
        self.fc3_energy_x = nn.Linear(64, 32)
        self.fc3_energy_y = nn.Linear(64, 32)
        self.fc3_energy_z = nn.Linear(64, 32)
        self.fc3_energy_atom = nn.Linear(input_dim_atom_types, 32)
        self.energy_output = nn.Linear(64, 1)  # 32 (mean of x, y, z) + 32 (atom types)
    
    def forward(self, positions_x, positions_y, positions_z, atom_types):
        x_x = torch.relu(self.fc1_x(positions_x))
        x_x = torch.relu(self.fc2_x(x_x))
        forces_x = self.force_output_x(x_x)
        
        x_y = torch.relu(self.fc1_y(positions_y))
        x_y = torch.relu(self.fc2_y(x_y))
        forces_y = self.force_output_y(x_y)
        
        x_z = torch.relu(self.fc1_z(positions_z))
        x_z = torch.relu(self.fc2_z(x_z))
        forces_z = self.force_output_z(x_z)
        
        energy_x = torch.relu(self.fc3_energy_x(x_x))
        energy_y = torch.relu(self.fc3_energy_y(x_y))
        energy_z = torch.relu(self.fc3_energy_z(x_z))
        
        energy_xyz_mean = (energy_x + energy_y + energy_z) / 3
        energy_atom = torch.relu(self.fc3_energy_atom(atom_types))
        
        energy_concat = torch.cat((energy_xyz_mean, energy_atom), dim=1)
        energy = self.energy_output(energy_concat)
        
        forces = torch.stack((forces_x, forces_y, forces_z), dim=-1)
        return energy, forces

In [5]:
# CUDA 사용 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 데이터 스케일링
scaler_positions_x = StandardScaler()
scaler_positions_y = StandardScaler()
scaler_positions_z = StandardScaler()
scaler_atom_types = StandardScaler()

X_train_positions_x_scaled = scaler_positions_x.fit_transform(X_train_positions_x)
X_train_positions_y_scaled = scaler_positions_y.fit_transform(X_train_positions_y)
X_train_positions_z_scaled = scaler_positions_z.fit_transform(X_train_positions_z)
X_train_atom_types_scaled = scaler_atom_types.fit_transform(X_train_atom_types)

X_test_positions_x_scaled = scaler_positions_x.transform(X_test_positions_x)
X_test_positions_y_scaled = scaler_positions_y.transform(X_test_positions_y)
X_test_positions_z_scaled = scaler_positions_z.transform(X_test_positions_z)
X_test_atom_types_scaled = scaler_atom_types.transform(X_test_atom_types)

train_dataset = LatticeDataset(X_train_positions_x_scaled, X_train_positions_y_scaled, X_train_positions_z_scaled, X_train_atom_types_scaled, y_train_energy, y_train_forces_x, y_train_forces_y, y_train_forces_z)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

test_dataset = LatticeDataset(X_test_positions_x_scaled, X_test_positions_y_scaled, X_test_positions_z_scaled, X_test_atom_types_scaled, np.zeros(len(X_test_atom_types_scaled)), np.zeros(len(X_test_atom_types_scaled)), np.zeros(len(X_test_atom_types_scaled)), np.zeros(len(X_test_atom_types_scaled)))
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

input_dim_positions = X_train_positions_x_scaled.shape[1]
input_dim_atom_types = X_train_atom_types_scaled.shape[1]
num_atoms = X_train_positions_x_scaled.shape[1]
model = LatticeEnergyForceModel(input_dim_positions, input_dim_atom_types, num_atoms).to(device)
criterion_energy = nn.MSELoss()
criterion_forces = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [6]:
num_epochs = 2000
model.train()
for epoch in range(num_epochs):
    train_loss = 0.0
    for X_batch_positions_x, X_batch_positions_y, X_batch_positions_z, X_batch_atom_types, y_energy_batch, y_forces_x_batch, y_forces_y_batch, y_forces_z_batch in train_loader:
        X_batch_positions_x, X_batch_positions_y, X_batch_positions_z, X_batch_atom_types = (X_batch_positions_x.to(device), X_batch_positions_y.to(device), X_batch_positions_z.to(device), X_batch_atom_types.to(device))
        y_energy_batch, y_forces_x_batch, y_forces_y_batch, y_forces_z_batch = (y_energy_batch.to(device), y_forces_x_batch.to(device), y_forces_y_batch.to(device), y_forces_z_batch.to(device))
        
        optimizer.zero_grad()
        y_energy_pred, y_forces_pred = model(X_batch_positions_x, X_batch_positions_y, X_batch_positions_z, X_batch_atom_types)
        
        energy_loss = criterion_energy(y_energy_pred.squeeze(), y_energy_batch)
        forces_loss_x = criterion_forces(y_forces_pred[:, :, 0], y_forces_x_batch)
        forces_loss_y = criterion_forces(y_forces_pred[:, :, 1], y_forces_y_batch)
        forces_loss_z = criterion_forces(y_forces_pred[:, :, 2], y_forces_z_batch)
        forces_loss = forces_loss_x + forces_loss_y + forces_loss_z
        
        loss = energy_loss + forces_loss
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * X_batch_positions_x.size(0)
    
    train_loss /= len(train_loader.dataset)
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {train_loss:.4f}')

# 모델 훈련 후, 훈련 데이터로 검증 수행
model.eval()
train_energy_loss = 0.0
train_forces_loss = 0.0
with torch.no_grad():
    for X_batch_positions_x, X_batch_positions_y, X_batch_positions_z, X_batch_atom_types, y_energy_batch, y_forces_x_batch, y_forces_y_batch, y_forces_z_batch in train_loader:
        X_batch_positions_x, X_batch_positions_y, X_batch_positions_z, X_batch_atom_types = (X_batch_positions_x.to(device), X_batch_positions_y.to(device), X_batch_positions_z.to(device), X_batch_atom_types.to(device))
        y_energy_batch, y_forces_x_batch, y_forces_y_batch, y_forces_z_batch = (y_energy_batch.to(device), y_forces_x_batch.to(device), y_forces_y_batch.to(device), y_forces_z_batch.to(device))
        y_energy_pred, y_forces_pred = model(X_batch_positions_x, X_batch_positions_y, X_batch_positions_z, X_batch_atom_types)
        
        energy_loss = criterion_energy(y_energy_pred.squeeze(), y_energy_batch)
        forces_loss_x = criterion_forces(y_forces_pred[:, :, 0], y_forces_x_batch)
        forces_loss_y = criterion_forces(y_forces_pred[:, :, 1], y_forces_y_batch)
        forces_loss_z = criterion_forces(y_forces_pred[:, :, 2], y_forces_z_batch)
        forces_loss = forces_loss_x + forces_loss_y + forces_loss_z
        
        train_energy_loss += energy_loss.item() * X_batch_positions_x.size(0)
        train_forces_loss += forces_loss.item() * X_batch_positions_x.size(0)
    
    train_energy_loss /= len(train_loader.dataset)
    train_forces_loss /= len(train_loader.dataset)
    print(f'Validation Energy Loss: {train_energy_loss:.4f}, Validation Forces Loss: {train_forces_loss:.4f}')


Epoch 1/2000, Loss: 835723.1758
Epoch 2/2000, Loss: 657462.1170
Epoch 3/2000, Loss: 97459.5042
Epoch 4/2000, Loss: 9511.5322
Epoch 5/2000, Loss: 4641.7566
Epoch 6/2000, Loss: 3281.3363
Epoch 7/2000, Loss: 2561.1624
Epoch 8/2000, Loss: 2134.8831
Epoch 9/2000, Loss: 1821.3103
Epoch 10/2000, Loss: 1605.1198
Epoch 11/2000, Loss: 1427.7482
Epoch 12/2000, Loss: 1273.1145
Epoch 13/2000, Loss: 1143.6827
Epoch 14/2000, Loss: 1040.8441
Epoch 15/2000, Loss: 957.1630
Epoch 16/2000, Loss: 877.7633
Epoch 17/2000, Loss: 793.1772
Epoch 18/2000, Loss: 736.3184
Epoch 19/2000, Loss: 689.5359
Epoch 20/2000, Loss: 628.2287
Epoch 21/2000, Loss: 584.9358
Epoch 22/2000, Loss: 539.1956
Epoch 23/2000, Loss: 508.2979
Epoch 24/2000, Loss: 471.7199
Epoch 25/2000, Loss: 442.8803
Epoch 26/2000, Loss: 403.6889
Epoch 27/2000, Loss: 378.6966
Epoch 28/2000, Loss: 350.4204
Epoch 29/2000, Loss: 333.6817
Epoch 30/2000, Loss: 309.3695
Epoch 31/2000, Loss: 292.6963
Epoch 32/2000, Loss: 270.2361
Epoch 33/2000, Loss: 252.2745


In [7]:
def predict_with_uncertainty(model, positions_x, positions_y, positions_z, atom_types, n_iter=50):
    model.train()  # Keep dropout layers in training mode
    predictions = []

    for _ in range(n_iter):
        with torch.no_grad():
            y_energy_pred, _ = model(positions_x, positions_y, positions_z, atom_types)
            predictions.append(y_energy_pred.cpu().numpy())

    predictions = np.array(predictions)
    prediction_mean = predictions.mean(axis=0)
    prediction_std = predictions.std(axis=0)
    
    return prediction_mean, prediction_std

# 예측 수행 (훈련 및 테스트 데이터)
model.eval()
X_train_positions_x_tensor = torch.tensor(X_train_positions_x_scaled, dtype=torch.float32).to(device)
X_train_positions_y_tensor = torch.tensor(X_train_positions_y_scaled, dtype=torch.float32).to(device)
X_train_positions_z_tensor = torch.tensor(X_train_positions_z_scaled, dtype=torch.float32).to(device)
X_train_atom_types_tensor = torch.tensor(X_train_atom_types_scaled, dtype=torch.float32).to(device)

energy_train_mean, energy_train_uncertainty = predict_with_uncertainty(model, X_train_positions_x_tensor, X_train_positions_y_tensor, X_train_positions_z_tensor, X_train_atom_types_tensor, n_iter=50)

with torch.no_grad():
    _, y_train_forces_pred = model(X_train_positions_x_tensor, X_train_positions_y_tensor, X_train_positions_z_tensor, X_train_atom_types_tensor)
y_train_forces_pred = y_train_forces_pred.cpu().numpy()

X_test_positions_x_tensor = torch.tensor(X_test_positions_x_scaled, dtype=torch.float32).to(device)
X_test_positions_y_tensor = torch.tensor(X_test_positions_y_scaled, dtype=torch.float32).to(device)
X_test_positions_z_tensor = torch.tensor(X_test_positions_z_scaled, dtype=torch.float32).to(device)
X_test_atom_types_tensor = torch.tensor(X_test_atom_types_scaled, dtype=torch.float32).to(device)

energy_test_mean, energy_test_uncertainty = predict_with_uncertainty(model, X_test_positions_x_tensor, X_test_positions_y_tensor, X_test_positions_z_tensor, X_test_atom_types_tensor, n_iter=50)

with torch.no_grad():
    _, y_test_forces_pred = model(X_test_positions_x_tensor, X_test_positions_y_tensor, X_test_positions_z_tensor, X_test_atom_types_tensor)
y_test_forces_pred = y_test_forces_pred.cpu().numpy()


In [9]:

# 결과 저장
def save_results(filename, energy_mean, energy_uncertainty, forces_pred):
    results = {
        'ID': [f'TEST_{i:04d}' for i in range(len(energy_mean))],
        'energy': energy_mean.flatten(),
        'energy_uncertainty': energy_uncertainty.flatten(),
        'forces': [forces.tolist() for forces in forces_pred]
    }

    results_df = pd.DataFrame(results)
    results_df.to_csv(filename, index=False)
    print(f'Results saved to {filename}')

# 훈련 데이터에 대한 결과 저장
save_results('train_predictions.csv', energy_train_mean, energy_train_uncertainty, y_train_forces_pred)

# 테스트 데이터에 대한 결과 저장
save_results('test_predictions.csv', energy_test_mean, energy_test_uncertainty, y_test_forces_pred)


Results saved to train_predictions.csv
Results saved to test_predictions.csv
