# Library

In [1]:
import os
import random
import numpy as np
import pandas as pd
from ase.io import read
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset
from tqdm.auto import tqdm
import matplotlib.pyplot as plt

np.set_printoptions(threshold=np.inf)  # never abbreviate printed arrays with "..."


def seed_everything(seed):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random``, NumPy's legacy global generator and PyTorch,
    and switches cuDNN to deterministic mode so GPU runs are repeatable too.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    np.random.seed(seed)
    # NOTE(review): set at runtime, so this presumably only influences
    # subprocesses started afterwards, not the already-running kernel.
    os.environ["PYTHONHASHSEED"] = str(seed)
    torch.manual_seed(seed)
    # Safe no-op without CUDA; kept explicit for older PyTorch versions.
    torch.cuda.manual_seed_all(seed)
    # Without these flags cuDNN may pick non-deterministic kernels, so a fixed
    # seed alone does not make GPU training reproducible.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


seed_everything(42)  # fix the seed

# Pre-processing

In [2]:
# Load the train/test structures and the sample submission file.
# The data directory defaults to the original local folder but can be
# overridden with the SAMSUNG_DATA_DIR environment variable, so the notebook
# can run on another machine without editing this cell.
data_dir = os.environ.get('SAMSUNG_DATA_DIR', r'C:\Users\james\DATA_LAB\samsung\data')
train_file_path = os.path.join(data_dir, 'Train.xyz')
test_file_path = os.path.join(data_dir, 'Test.xyz')
sample_file_path = os.path.join(data_dir, 'sample_submission.csv')

train = read(train_file_path, format='extxyz', index=':')  # every structure in the train file
test = read(test_file_path, format='extxyz', index=':')  # every structure in the test file
sample = pd.read_csv(sample_file_path)  # sample submission file

# Quick look at the training data
print(f"The number of data: {len(train)}")
print(train[0])  # first structure

The number of data: 1500
Atoms(symbols='Hf32O64', pbc=True, cell=[10.07550514, 9.97216976, 10.40475547], calculator=SinglePointCalculator(...))


In [3]:
# Accumulators: atoms per structure, per-atom coordinates/forces, per-structure energies
sequence_train, symbols, energies = [], [], []
positions_x, positions_y, positions_z, forces = [], [], [], []

# Flatten every training structure into per-atom rows
for mole in train:
    sequence_train.append(len(mole))  # number of atoms in this structure

    position = mole.get_positions()  # atomic positions
    force = mole.get_forces()  # forces - label 1
    energies.append(mole.get_total_energy())  # energy - label 2

    # One entry per atom: coordinates are split per axis, the force vector is kept whole
    for coords, atom_force in zip(position, force):
        positions_x.append(coords[0])
        positions_y.append(coords[1])
        positions_z.append(coords[2])
        forces.append(atom_force)

# Assemble the per-atom training frame
train_df = pd.DataFrame({
    'position_x': positions_x,
    'position_y': positions_y,
    'position_z': positions_z,
    'force': forces,
})
train_df.head()  # preview

Unnamed: 0,position_x,position_y,position_z,force
0,2.230816,8.155257,6.39114,"[0.08813055, -0.90894865, 1.04011568]"
1,5.820498,5.539081,6.063752,"[0.49469689, -0.23481429, 1.14418526]"
2,0.649109,8.043429,9.16234,"[-0.47646964, 1.67774442, -1.52065335]"
3,7.276341,7.946647,9.368211,"[0.70496183, 1.83900631, 1.37385827]"
4,7.695766,7.129786,3.224149,"[-0.09108712, -1.1645404, 1.44755996]"


In [4]:
# Accumulators for the test set (the coordinate lists are re-created here)
sequence_test = []
positions_x, positions_y, positions_z = [], [], []

# Flatten every test structure into per-atom rows
for mole in test:
    sequence_test.append(len(mole))  # number of atoms in this structure

    position = mole.get_positions()  # atomic positions

    for coords in position:
        positions_x.append(coords[0])
        positions_y.append(coords[1])
        positions_z.append(coords[2])

# Assemble the per-atom test frame; 'force' is a placeholder filled in after inference
test_df = pd.DataFrame({
    'position_x': positions_x,
    'position_y': positions_y,
    'position_z': positions_z,
    'force': None,
})

# Preview
test_df.head()

Unnamed: 0,position_x,position_y,position_z,force
0,3.434929,4.600871,6.329366,
1,10.632562,3.27695,6.474922,
2,3.43848,2.931912,2.621738,
3,3.042868,6.105789,3.277181,
4,2.52715,0.367885,0.119567,


In [8]:
# Split the per-atom force vector into scalar fx, fy, fz columns.
# Guarded on the presence of 'force' so the cell can be re-run safely: after
# the first run the column has already been replaced and there is nothing to split.
if 'force' in train_df.columns:
    # Reuse train_df's index so the column-wise concat aligns row for row
    force_array = pd.DataFrame(train_df['force'].tolist(),
                               columns=['fx', 'fy', 'fz'],
                               index=train_df.index)

    # Replace the vector column with the three scalar columns
    train_df = pd.concat([train_df.drop(columns=['force']), force_array], axis=1)

# Preview the split frame
train_df.head()

Unnamed: 0,position_x,position_y,position_z,fx,fy,fz
0,2.230816,8.155257,6.39114,0.088131,-0.908949,1.040116
1,5.820498,5.539081,6.063752,0.494697,-0.234814,1.144185
2,0.649109,8.043429,9.16234,-0.47647,1.677744,-1.520653
3,7.276341,7.946647,9.368211,0.704962,1.839006,1.373858
4,7.695766,7.129786,3.224149,-0.091087,-1.16454,1.44756


# [Force] Hyperparameter Setting

In [9]:
# Hyperparameters for the force model
input_size = 3  # number of input features
hidden_size = 256  # passed to ForceModel as the width of its first hidden layer
output_size = 3 # number of targets; NOTE(review): not referenced by the visible force cells
num_epochs = 3  # full passes over train_loader
batch_size = 256  # batch size used by both DataLoaders
learning_rate = 0.001  # Adam learning rate

# [Force] Dataset

In [10]:
class ForceDataset(Dataset):
    """Per-atom dataset mapping a position (x, y, z) to a force (fx, fy, fz).

    The needed columns are converted to float32 tensors once, at construction
    time, and items are then fetched by position. This avoids several
    label-based ``DataFrame.loc`` lookups per sample and works for frames whose
    index is not ``0..n-1``. Changes made to ``df`` after construction are not
    reflected in the items.

    Args:
        df: Frame with ``position_x``, ``position_y``, ``position_z`` columns
            and, unless ``mode == 'test'``, ``fx``, ``fy``, ``fz`` columns.
        mode: ``'test'`` yields inputs only; any other value yields
            ``(inputs, label)`` pairs.
    """

    _POSITION_COLUMNS = ['position_x', 'position_y', 'position_z']
    _FORCE_COLUMNS = ['fx', 'fy', 'fz']

    def __init__(self, df, mode='test'):
        self.df = df
        self.mode = mode

        # Shape (len(df), 3), float32
        self._inputs = torch.tensor(
            df[self._POSITION_COLUMNS].to_numpy(dtype='float32'))

        # Labels exist only for non-test data
        self._labels = None
        if mode != 'test':
            self._labels = torch.tensor(
                df[self._FORCE_COLUMNS].to_numpy(dtype='float32'))

    def __len__(self):
        return len(self._inputs)

    def __getitem__(self, idx):
        """Return ``inputs`` in test mode, otherwise ``(inputs, label)``."""
        inputs = self._inputs[idx]

        # Non-test data also returns the force label
        if self._labels is not None:
            return inputs, self._labels[idx]
        return inputs

In [11]:
# Wrap the per-atom frames in datasets
train_dataset = ForceDataset(train_df, mode='train')  # yields (inputs, label)
test_dataset = ForceDataset(test_df, mode='test')  # yields inputs only

# Batch them; only the training data is shuffled
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)

# Sanity check: pull a single batch and report its shapes
inputs, labels = next(iter(train_loader))
print(f"Input batch shape: {inputs.shape}")
print(f"Label batch shape: {labels.shape}")

Input batch shape: torch.Size([256, 3])
Label batch shape: torch.Size([256, 3])


# Model

In [12]:
class ForceModel(nn.Module):
    """MLP regressor: three Linear -> BatchNorm -> activation -> Dropout stages
    (hidden_size, 128, 64 units) followed by a linear output layer.

    Args:
        input_size: Number of input features per sample.
        hidden_size: Width of the first hidden layer.
        output_size: Number of regression targets. Defaults to 3, the value
            that was previously hard-coded, so existing calls are unaffected.
    """

    def __init__(self, input_size, hidden_size, output_size=3):
        super().__init__()

        self.layers = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.BatchNorm1d(hidden_size),
            nn.ReLU(),
            nn.Dropout(0.5),

            nn.Linear(hidden_size, 128),
            nn.BatchNorm1d(128),
            nn.LeakyReLU(0.01),
            nn.Dropout(0.5),

            nn.Linear(128, 64),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Dropout(0.5),

            nn.Linear(64, output_size)
        )

    def forward(self, x):
        """Map a (batch, input_size) tensor to (batch, output_size) predictions."""
        return self.layers(x)

In [13]:
# Prefer the GPU when one is visible to PyTorch, otherwise stay on the CPU
device = 'cpu'
if torch.cuda.is_available():
    device = 'cuda'
print(f"current device is {device}")

# Force model, mean-squared-error loss and Adam optimizer
model = ForceModel(input_size, hidden_size)
model = model.to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

current device is cpu


# [Force] Train

In [None]:
print("Training Start!")

# NOTE(review): this is the same optimisation loop as the loss-tracking cell
# that follows; running both trains the model for num_epochs twice.
model.train()  # training mode: dropout active, batch-norm statistics updated
for epoch in range(num_epochs):
    print(f"{epoch+1}/{num_epochs} epoch..")
    for batch_inputs, batch_labels in tqdm(train_loader):
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)

        # Clear stale gradients, then forward / backward / update
        optimizer.zero_grad()
        loss = criterion(model(batch_inputs), batch_labels)
        loss.backward()
        optimizer.step()

print("Training Complete!")

Training Start!
1/3 epoch..


  0%|          | 0/563 [00:00<?, ?it/s]

In [33]:
# Mean training loss of every epoch, in order
train_losses = []

print("Training Start!")

model.train()  # training mode: dropout active, batch-norm statistics updated
for epoch in range(num_epochs):
    print(f"{epoch+1}/{num_epochs} epoch..")
    running_loss = 0.0  # sum of the per-batch losses of this epoch

    for batch_inputs, batch_labels in tqdm(train_loader):
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)

        # Clear stale gradients, then forward / backward / update
        optimizer.zero_grad()
        predictions = model(batch_inputs)
        loss = criterion(predictions, batch_labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Average of the batch losses (each batch counts equally)
    avg_loss = running_loss / len(train_loader)
    train_losses.append(avg_loss)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

print("Training Complete!")

# Plot the loss curve
plt.plot(train_losses, label="Training Loss")
plt.title("Training Loss Over Time")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.show()

Training Start!
1/3 epoch..


  0%|          | 0/563 [00:00<?, ?it/s]


KeyboardInterrupt



# [Force] Inference

In [None]:
print("Inference Start!")

model.eval()  # inference mode: dropout off, batch-norm uses running statistics

# One length-3 prediction array per test atom, in test_loader order
preds = []
with torch.no_grad():
    for batch in tqdm(test_loader):
        outputs = model(batch.to(device))
        # Move to host memory and append row by row
        batch_preds = outputs.detach().cpu().numpy()
        for row in batch_preds:
            preds.append(row)

print("Inference Complete!")
len(preds)

# [Force] Submission

In [None]:
test_df['force'] = preds  # store the predicted force vector of every test atom


def _row_spans(sizes):
    """Turn consecutive group sizes into (start, end) half-open row ranges."""
    spans = []
    start = 0
    for size in sizes:
        stop = start + size
        spans.append((start, stop))
        start = stop
    return spans


# Row range that each structure occupies in the flattened per-atom data
bundles_train = _row_spans(sequence_train)
bundles_test = _row_spans(sequence_test)

# Regroup the flat per-atom predictions into one 2-D array per test structure
preds_force = [np.vstack(preds[start:end]) for start, end in bundles_test]

sample['forces'] = preds_force
sample

# [Energy] Pre-processing

In [None]:
def _expand_force_column(df):
    """Replace a vector-valued 'force' column with force0, force1, force2.

    Frames without a 'force' column are returned unchanged. That covers
    train_df, whose force vector was already split into fx / fy / fz by an
    earlier cell (indexing 'force' there raises KeyError), and it makes this
    cell safe to re-run.
    """
    if 'force' not in df.columns:
        return df
    # Building the frame from a list of vectors avoids a slow per-row pd.Series
    force_df = pd.DataFrame(df['force'].tolist(),
                            index=df.index,
                            columns=[f'force{i}' for i in range(3)])
    return df.drop('force', axis=1).join(force_df)


# Both frames end up with three position columns followed by three force columns
train_df = _expand_force_column(train_df)
test_df = _expand_force_column(test_df)

# One (atoms, features) array per structure, sliced by each structure's row range
sequences_train = [train_df.iloc[start:end].values for start, end in bundles_train]
sequences_test = [test_df.iloc[start:end].values for start, end in bundles_test]

# [Energy] Hyperparameter Setting

In [None]:
# Hyperparameters for the energy model (these overwrite the force-model values)
input_size = 6  # number of features per atom
hidden_size = 256  # LSTM hidden size passed to EnergyModel
output_size = 1 # number of targets; NOTE(review): not referenced by the visible energy cells
num_epochs = 3  # full passes over train_loader
batch_size = 64  # batch size used by both DataLoaders
learning_rate = 0.001  # Adam learning rate

# [Energy] Dataset

In [None]:
def _pad_to_longest(sequences):
    """Zero-pad every (atoms, 6) array at its end up to the longest length.

    Returns the longest length and the list of padded arrays.
    """
    longest = max(seq.shape[0] for seq in sequences)
    padded = []
    for seq in sequences:
        filler = np.zeros((longest - seq.shape[0], 6))
        padded.append(np.vstack([seq, filler]))
    return longest, padded


# Train: pad, stack into one array, pair with the energies
max_len, padded_sequences = _pad_to_longest(sequences_train)
padded_array_train = np.stack(padded_sequences)
X_tensor_train = torch.tensor(padded_array_train, dtype=torch.float32)
y_tensor_train = torch.tensor(energies, dtype=torch.float32).view(-1, 1)
train_dataset = TensorDataset(X_tensor_train, y_tensor_train)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)

# Test: padded to the longest test sequence (computed separately from train), no labels
max_len, padded_sequences = _pad_to_longest(sequences_test)
padded_array_test = np.stack(padded_sequences)
X_tensor_test = torch.tensor(padded_array_test, dtype=torch.float32)
test_dataset = TensorDataset(X_tensor_test)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)

# [Energy] Model

In [None]:
# BiLSTM model definition
class EnergyModel(nn.Module):
    """Bidirectional LSTM that predicts a mean and a positive variance.

    Args:
        input_size: Number of features per timestep.
        hidden_size: LSTM hidden size per direction; also the width of ``fc1``.
        num_layers: Number of stacked LSTM layers.
        dropout_rate: Dropout probability for the head and, when
            ``num_layers > 1``, between LSTM layers.
    """

    def __init__(self, input_size, hidden_size, num_layers=1, dropout_rate=0.5):
        super().__init__()

        # nn.LSTM only applies dropout between stacked layers; with a single
        # layer a non-zero value does nothing and triggers a UserWarning.
        lstm_dropout = dropout_rate if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True,
                            dropout=lstm_dropout,
                            bidirectional=True)

        # Two directions are concatenated, hence hidden_size * 2 inputs
        self.fc1 = nn.Linear(hidden_size * 2, hidden_size)
        self.fc2_mean = nn.Linear(hidden_size, 1)
        self.fc2_variance = nn.Linear(hidden_size, 1)
        self.relu = nn.ReLU()
        self.batchnorm = nn.BatchNorm1d(hidden_size)
        self.dropout = nn.Dropout(dropout_rate)
        self.softplus = nn.Softplus()

    def forward(self, x):
        """Return ``(mean, variance)``, each of shape (batch, 1).

        ``x`` is a (batch, seq_len, input_size) tensor (the LSTM is batch_first).
        """
        _, (h_n, _) = self.lstm(x)
        # Final hidden state of the last layer for each direction. Taking the
        # last timestep of the output sequence instead would give the reverse
        # direction's state after it had processed only one timestep.
        summary = torch.cat([h_n[-2], h_n[-1]], dim=1)

        x = self.fc1(summary)
        x = self.relu(x)
        x = self.batchnorm(x)
        x = self.dropout(x)

        mean = self.fc2_mean(x)
        # Softplus keeps the predicted variance strictly positive
        variance = self.softplus(self.fc2_variance(x))
        return mean, variance

# Instantiate the energy model, loss function and optimizer
# (these rebind the model / criterion / optimizer names used for the force model)
model = EnergyModel(input_size, hidden_size).to(device)
criterion = nn.GaussianNLLLoss()  # called below as criterion(mean, labels, variance)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# [Energy] Train

In [None]:
print("Training Start!!")

# Training
model.train()  # training mode: dropout active, batch-norm statistics updated
for epoch in range(num_epochs):
    print(f"{epoch+1}/{num_epochs} epoch..")
    for batch_inputs, batch_labels in tqdm(train_loader):
        batch_inputs, batch_labels = batch_inputs.to(device), batch_labels.to(device)

        optimizer.zero_grad()

        # The model returns a mean and a variance; the loss takes (input, target, var)
        pred_mean, pred_var = model(batch_inputs)
        loss = criterion(pred_mean, batch_labels, pred_var)

        loss.backward()
        optimizer.step()

print("Training Complete!")

# [Energy] Inference

In [None]:
print("Inference Start!")

model.eval()  # inference mode: dropout off, batch-norm uses running statistics

# One length-1 array per test structure, in test_loader order
preds_mean, preds_variance = [], []
with torch.no_grad():
    # The test TensorDataset holds a single tensor, so each batch has one element
    for (batch_inputs,) in tqdm(test_loader):
        mean, variance = model(batch_inputs.to(device))

        preds_mean.extend(mean.detach().cpu().numpy())
        preds_variance.extend(variance.detach().cpu().numpy())

print("Inference Complete!")

# [Energy] Submission

In [None]:
# Unwrap the length-1 arrays into plain Python floats
preds_mean = list(map(lambda pred: pred.item(), preds_mean))
preds_variance = list(map(lambda pred: pred.item(), preds_variance))

sample['energy'] = preds_mean
sample['energy_uncertainty'] = preds_variance  # the model's predicted variance

# Serialise each per-structure force array as the string form of a nested list
sample['forces'] = [str(force.tolist()) for force in sample['forces']]

sample.to_csv('./baseline.csv', index=False)