In [1]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from ase.io import read,write
from ase import Atoms, Atom
from ase.visualize import view

import numpy as np
from pymatgen.core.structure import Structure
from pymatgen.io.ase import AseAtomsAdaptor
import pymatgen as mg

from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
from torch.autograd import Variable
import torch.optim as optim
from torchtext import data # torchtext.data 임포트
from torchtext.data import Iterator
from torch.utils.data import Dataset, DataLoader


import csv
import pandas as pd

# from data import CIFData, AtomCustomJSONInitializer, GaussianDistance
import os
import csv
import random


In [2]:
import pandas as pd
df = pd.read_csv('/home/cut6089/research/GASpy/data/co_motif.csv')

target_df = df[['name','target' ]]

# Make DataSet

## Utils

In [3]:

def remove_adsorbates(atoms):
    binding_sites = []
    adsorbates_index = []
    
    for atom in atoms:
        if atom.tag == 1:
            binding_sites.append([atom.symbol,list(atom.position)])
            adsorbates_index.append(atom.index)
    del atoms[adsorbates_index]
    bare_slab = atoms.copy()
    
    return binding_sites, bare_slab

def get_nearest_atoms(atoms):
    # view(atoms)
    binding_sites, slab = remove_adsorbates(atoms)
    binding_sites.sort(key = lambda x: x[1][2] )    
    
    copied_atom = slab.copy()
    copied_atom += Atom(binding_sites[0][0],binding_sites[0][1],tag =1 )
    copied_atom = copied_atom.repeat((3,3,1))
    ads_index = np.where((copied_atom.get_tags()) ==1)[0][4]
    
    structure = AseAtomsAdaptor.get_structure(copied_atom)
    # nn = structure.get_neighbors(site=structure[ads_index] , r= min(structure.lattice.abc))
    nn = structure.get_neighbors(site=structure[ads_index] , r= 10)
    nn.sort(key = lambda x : x[1]) # sort nearest atoms
    nn_index = [nn[i][2] for i in range(len(nn))]
    nn_distances =[nn[i][1] for i in range(len(nn))]
    
    
    # view(copied_atom)
    return copied_atom, nn_index,nn_distances

def get_atom_property(atoms):
    global feature
    feature_df = pd.DataFrame(columns=[f'feat{i}' for i in range(10)])
    atom, nn_index,nn_distances = get_nearest_atoms(atoms)
    elements = [atom[i].symbol for i in nn_index[:15]]
    distances = nn_distances[:15]
    def block_to_num(block):
        """
        Convert blokc to number

        Args:
            block (str) : 's', 'p', 'd' or 'f'

        Return:
            int : 1, 2, 3, 4
        """

        if block == 's':
            return 1
        elif block == 'p':
            return 2
        elif block == 'd':
            return 3
        elif block == 'f':
            return 4

    for el, distance in zip(elements, distances):
        e = mg.core.Element(el)
        atomic_number = e.Z
        average_ionic_radius = e.average_ionic_radius.real

        # Lowest oxidiation state of the element is used as common oxidation state
        common_oxidation_states = e.common_oxidation_states[0]
        Pauling_electronegativity = e.X
        row = e.row
        group = e.group
        thermal_conductivity = e.thermal_conductivity.real
        boiling_point = e.boiling_point.real
        melting_point = e.melting_point.real
        block = block_to_num(e.block)
        IE = e.ionization_energy
        
        feature_df.loc[len(feature_df)] = [atomic_number, common_oxidation_states, Pauling_electronegativity, 
                                           row, group, thermal_conductivity, boiling_point,
                                           melting_point, block, IE]
        feature = feature_df.to_numpy()
        feature = feature/distance
        feature = torch.Tensor(feature)
        # feature.unsqueeze_(0).shape
    return feature


In [4]:
class make_dataset(Dataset):
    def __init__(self, root_dir, dmin = 0, step = 0.2, random_seed = 123):
        self.root_dir = root_dir
        target_file = os.path.join(self.root_dir, 'data/co_target.csv')
        target_df = pd.read_csv(f'{self.root_dir}data/co_target.csv')
        random.seed(random_seed)
        self.target_df = target_df.sample(frac=1).reset_index(drop= True)
    
    def __len__(self):
        return len(self.target_df)
    
    def __getitem__(self, idx):
        traj_id = self.target_df['name'][idx]
        target = self.target_df['target'][idx]
        atoms = read(f'{self.root_dir}final_CO_slab/{traj_id}.traj')
        feature = get_atom_property(atoms)

        
        return feature, target
    
def collate_fn(dataset_list):
    """
    list of tuples for each data point.
    """
    batch_feature = []
    batch_target = []
    for  i, (feature, target) in enumerate(dataset_list):
        batch_feature.append(feature)
        batch_target.append(target)
    batch_feature = torch.nn.utils.rnn.pad_sequence(batch_feature, batch_first = True) 
    batch_target = torch.Tensor(batch_target)
    return batch_feature,batch_target
    

In [75]:
dataset= make_dataset(root_dir= '/home/cut6089/research/GASpy/')
total_size = len(dataset)
indices = list(range(total_size))
train_ratio = 0.7
valid_ratio = 0.15
train_size = int(total_size * train_ratio)
valid_size = int(total_size * valid_ratio)

train_sampler =SubsetRandomSampler(indices[:train_size])
# valid_sampler = SubsetRandomSampler(indices[:valid_size])
train_loader = DataLoader(dataset, batch_size = 50,sampler=train_sampler, num_workers=1, collate_fn = collate_fn )
dataiter = iter(train_loader)
features, targets = next(dataiter)
# features.shape
targets.shape
features.shape

torch.Size([50, 15, 10])

In [55]:
struct_files = '/home/cut6089/research/GASpy/final_CO_slab/'
           
atoms = read(struct_files+'mp-10010_5d83020d30582ea2977a314b.traj') 
structure = AseAtomsAdaptor.get_structure(atoms)
R   = atoms.get_positions()
D_ij = np.linalg.norm(R[:, None, :] - R[None,:,:], axis =-1)
D  = D_ij[0,:15].reshape(1,-1)
D.shape


(1, 15)

In [None]:
torch.matmul(features[0],torch.Tensor(D)).shape

# Model

In [64]:
def initialize_weight(x):
    nn.init.xavier_uniform_(x.weight)
    if x.bias is not None:
        nn.init.constant_(x.bias, 0)
        

class MultiHeadAttention(nn.Module):
    def __init__(self, hidden_size, dropout_rate , head_size = 8):
        super(MultiHeadAttention, self).__init__()
        
        self.hidden_size = hidden_size
        self.dropout_rate = dropout_rate
        self.head_size = head_size
        self.att_size = att_size = hidden_size // head_size
        self.scale = att_size ** -0.5
        
        self.linear_q = nn.Linear(hidden_size, head_size * att_size, bias = False)
        self.linear_k = nn.Linear(hidden_size, head_size * att_size, bias = False)
        self.linear_v = nn.Linear(hidden_size, head_size * att_size, bias = False)
        initialize_weight(self.linear_q)
        initialize_weight(self.linear_k)
        initialize_weight(self.linear_v)
        
        self.att_dropout = nn.Dropout(dropout_rate)
        self.output_layer = nn.Linear(head_size * att_size, hidden_size, bias = False)
        initialize_weight(self.output_layer)
        
    def forward(self, q, k, v):
        orig_q_size = q.size()
        
        d_k = self.att_size
        d_v = self.att_size
        batch_size = q.size(0)
        
        # head_i 
        q = self.linear_q(q).view(batch_size, -1, self.head_size, d_k)
        k = self.linear_k(k).view(batch_size, -1 , self.head_size, d_k)
        v = self.linear_v(v).view(batch_size, -1, self.head_size, d_v)
        
        q= q.transpose(1,2)
        v = v.transpose(1, 2)
        k = k.transpose(1, 2).transpose(2,3)
        
        # scaled dot product
        q.mul_(self.scale)
        x = torch.matmul(q,k)
        x = torch.softmax(x, dim = 3)
        x = self.att_dropout(x)
        x= x.matmul(v)
        
        x= x.transpose(1,2).contiguous()
        x = x.view(batch_size, -1, self.head_size * d_v)
        
        x = self.output_layer(x)
        return (x)

class FeedForwardNetwork(nn.Module):
    def __init__(self, hidden_size, filter_size, dropout_rate):
        super(FeedForwardNetwork, self).__init__()

        self.layer1 = nn.Linear(hidden_size, filter_size)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout_rate)
        self.layer2 = nn.Linear(filter_size, hidden_size)

        initialize_weight(self.layer1)
        initialize_weight(self.layer2)

    def forward(self, x):
        x = self.layer1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.layer2(x)
        return x

    
class EncoderLayer(nn.Module):
    def __init__(self, hidden_size, filter_size, dropout_rate):
        super(EncoderLayer, self).__init__()

        self.self_attention_norm = nn.LayerNorm(hidden_size, eps=1e-6)
        self.self_attention = MultiHeadAttention(hidden_size, dropout_rate)
        self.self_attention_dropout = nn.Dropout(dropout_rate)

        self.ffn_norm = nn.LayerNorm(hidden_size, eps=1e-6)
        self.ffn = FeedForwardNetwork(hidden_size, filter_size, dropout_rate)
        self.ffn_dropout = nn.Dropout(dropout_rate)

    def forward(self, x):  # pylint: disable=arguments-differ
        y = self.self_attention_norm(x)
        y = self.self_attention(y, y, y)
        y = self.self_attention_dropout(y)
        x = x + y

        y = self.ffn_norm(x)
        y = self.ffn(y)
        y = self.ffn_dropout(y)
        x = x + y
        return x

# class DecoderLayer(nn.Module):
#     def __init__(self, hidden_size, filter_size, dropout_rate):
#         super(DecoderLayer, self).__init__()
        
#         self.self_attention_norm = nn.LayerNorm(hidden_size, eps = 1e-6)
#         self.self_attention = MultiHeadAttentioin(hidden_size=hidden_size, dropout_rate=dropout_rate)
#         self.self_attention_dropout = nn.Dropout(dropout_rate)
        
#         self.enc_dec_attention_norm = nn.LayerNorm(hidden_size, eps=1e-6)
#         self.enc_dec_attention = MultiHeadAttentioin(hidden_size, dropout_rate)
#         self.enc_dec_attention_dropout = nn.Dropout(dropout_rate)
        
#         self.ffn_norm =  nn.LayerNorm(hidden_size, eps = 1e-6)
#         self.ffn =  FeedForwardNetwork(hidden_size, filter_size,  dropout_rate)
#         self.ffn_dropout = nn.Dropout(dropout_rate)
        
#     def forward(self, x, enc_output):
#         y = self.self_attention_norm(x)
#         y = self.self_attention(y,y,y)
#         y = self.self_attention_dropout(y)
#         x = x+y
        
#         y = self.enc_dec_attention_norm(x)
#         y = self.enc_dec_attention_norm(y, enc_output, enc_output)
#         y = self.enc_dec_attention_dropout(y)
#         x= x+y
        
#         y= self.ffn_norm(x)
#         y = self.ffn(y)
#         y = self.ffn_dropout(y)
#         x = x+y
#         return x
    
class DecoderLayer(nn.Module):
    def __init__(self, hidden_size, filter_size, dropout_rate):
        super(DecoderLayer, self).__init__()

        self.self_attention_norm = nn.LayerNorm(hidden_size, eps=1e-6)
        self.self_attention = MultiHeadAttention(hidden_size, dropout_rate)
        self.self_attention_dropout = nn.Dropout(dropout_rate)

        self.enc_dec_attention_norm = nn.LayerNorm(hidden_size, eps=1e-6)
        self.enc_dec_attention = MultiHeadAttention(hidden_size, dropout_rate)
        self.enc_dec_attention_dropout = nn.Dropout(dropout_rate)

        self.ffn_norm = nn.LayerNorm(hidden_size, eps=1e-6)
        self.ffn = FeedForwardNetwork(hidden_size, filter_size, dropout_rate)
        self.ffn_dropout = nn.Dropout(dropout_rate)

    def forward(self, x, enc_output):
        y = self.self_attention_norm(x)
        y = self.self_attention(y, y, y)
        y = self.self_attention_dropout(x)
        x = x + y

        if enc_output is not None:
            y = self.enc_dec_attention_norm(x)
            y = self.enc_dec_attention(y, enc_output, enc_output)
            y = self.enc_dec_attention_dropout(y)
            x = x + y

        y = self.ffn_norm(x)
        y = self.ffn(y)
        y = self.ffn_dropout(y)
        x = x + y
        return x
    
    
class Encoder(nn.Module):
    def __init__(self, hidden_size, filter_size, dropout_rate, n_layers):
        super(Encoder, self).__init__()
        
        encoders = [EncoderLayer(hidden_size= hidden_size, filter_size= filter_size, dropout_rate = dropout_rate)
                   for _ in range(n_layers)]
        self.layers = nn.ModuleList(encoders)
        
        self.last_norm = nn.LayerNorm(hidden_size, eps = 1e-6)
        
    def forward(self, inputs):
        inputs = inputs.view(-1, 150, 512)
        encoder_output = inputs
        for enc_layer in self.layers:
            encoder_output = enc_layer(encoder_output)
        return self.last_norm(encoder_output)
    
class Decoder(nn.Module):
    def __init__(self, hidden_size, filter_size, dropout_rate, n_layers):
        super(Decoder, self).__init__()
        
        decoders = [DecoderLayer(hidden_size, filter_size, dropout_rate)
                   for _ in range(n_layers)]
        self.layers = nn.ModuleList(decoders)
        self.last_norm = nn.LayerNorm(hidden_size, eps = 1e-6 )
        
    def forward(self, targets, enc_output):
        decoder_ouput = targets
        for dec_layer in self.layers:
            decoder_ouput= dec_layer(decoder_ouput, enc_output)
        return self.last_norm(decoder_ouput)
    

    
    
class Transformer(nn.Module):
    def __init__(self, i_vocab_size, t_vocab_size, n_layers = 6, hidden_size = 512,
                filter_size = 2048, dropout_rate = 0.1):
        super(Transformer, self).__init__()
        
        self.hidden_size = hidden_size
        self.emb_scale=  hidden_size ** 0.5
        
        # self.t_vocab_embedding = nn.Embedding(t_vocab_size, hidden_size)
        self.input_normalize = nn.LayerNorm(10,eps =1e-6)
        self.target_normalize = nn.LayerNorm(1, eps= 1e-6)
        
        self.t_vocab_embedding = nn.Embedding(t_vocab_size, hidden_size)
        nn.init.normal_(self.t_vocab_embedding.weight, mean = 0, std = hidden_size ** -0.5)
        self.t_emb_dropout = nn.Dropout(dropout_rate)
        
        self.i_vocab_embedding = nn.Embedding(i_vocab_size, hidden_size)
        nn.init.normal_(self.i_vocab_embedding.weight, mean = 0 , std = hidden_size ** -0.5)
        self.i_emb_dropout = nn.Dropout(dropout_rate)
        self.encoder = Encoder(hidden_size, filter_size, dropout_rate, n_layers)
        self.decoder = Decoder(hidden_size, filter_size, dropout_rate, n_layers)
        # self.out = nn.Linear(t_vocab_size *1, 1)
        self.out = nn.Linear(hidden_size, 1)
    
    def forward(self, inputs, targets):
        # input_normed = self.input_normalize(inputs.float()).long()
        enc_output = self.encode(inputs)
        return  self.decode(targets, enc_output)
    
    def encode(self, inputs):
        inputs = self.input_normalize(inputs.to(torch.float32)).long()
        # Input embedding
        input_embedded = self.i_vocab_embedding(inputs.long())
        input_embedded *- self.emb_scale
        input_embedded = self.i_emb_dropout(input_embedded)
        
        return self.encoder(input_embedded)
    
    
    
    def decode(self, targets, enc_output):
        # target embedding
        targets = self.target_normalize(targets.view(-1,1).to(torch.float32)).long()
        target_embedded = self.t_vocab_embedding(targets.long())
        # target_embedded *= self.emb_scale
        target_embedded = self.t_emb_dropout(target_embedded).to(torch.float32)
        decoder_output = self.decoder(target_embedded, enc_output)
        # output = torch.matmul(decoder_output, self.t_vocab_embedding.weight.transpose(0,1))
        weights =self.t_vocab_embedding.weight.unsqueeze(0).transpose(1,2)
        # output = torch.matmul(decoder_output, weights)
        # output = self.out(output).squeeze()
        output = self.out(decoder_output.squeeze()).view(-1)

        return output
    
        
        

In [76]:
model = Transformer(i_vocab_size= 100, t_vocab_size=100, n_layers=6, hidden_size=512, filter_size=2048,  dropout_rate=0.1)

enc_output = model.encode(features[0])
nn.Linear(512,1)(enc_output).view(1,150).view(15,-1).shape

torch.Size([15, 10])

In [81]:
D_ij[:15,:15]

(15, 15)