<a href="https://colab.research.google.com/github/dmamur/elembert/blob/main/notebooks/elembert_classification_matbenchV0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install icc_rt --no-deps
!pip install matbench



In [2]:
!pip install ase
!git clone https://github.com/dmamur/elementsem.git
!git clone https://github.com/hackingmaterials/matbench
!pip install ./matbench
!pip install pymatgen
!pip install pydantic==1.10.12 --force-reinstall

fatal: destination path 'elementsem' already exists and is not an empty directory.
fatal: destination path 'matbench' already exists and is not an empty directory.
Processing ./matbench
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: matbench
  Building wheel for matbench (pyproject.toml) ... [?25l[?25hdone
  Created wheel for matbench: filename=matbench-0.6-py3-none-any.whl size=5449480 sha256=0228b282c053c5227925593bd8b60fdacfe5bc8172802f2c7c5e2c2f8190660b
  Stored in directory: /tmp/pip-ephem-wheel-cache-7shj9nvn/wheels/0d/71/69/39ae3b7c60edd18e46ed290a09bc2fe0c344372520cd165c70
Successfully built matbench
Installing collected packages: matbench
  Attempting uninstall: matbench
    Found existing installation: matbench 0.6
    Uninstalling matbench-0.6:
      Successfully uninstalled matbench-0.6
Successfully installed m

Restart the session and run again.

In [3]:
import pickle,random
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Embedding,Dense,Dropout,Input,Lambda
from tensorflow.keras.models import Model
import tensorflow.keras.backend as K
import re, glob,os,sys
from collections import defaultdict
from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import load_model
from tensorflow.keras.utils import to_categorical

def bert_module(query, key, value, i, config):
    # Multi headed self-attention
    attention_output = layers.MultiHeadAttention(
        num_heads=config.NUM_HEAD,
        key_dim=config.EMBED_DIM // config.NUM_HEAD,
        name="encoder_{}/multiheadattention".format(i),
        kernel_initializer=tf.keras.initializers.GlorotUniform(seed=config.seed)
    )(query, key, value)
    attention_output = layers.Dropout(0.1, name="encoder_{}/att_dropout".format(i))(
        attention_output
    )
    attention_output = layers.LayerNormalization(
        epsilon=1e-6, name="encoder_{}/att_layernormalization".format(i)
    )(query + attention_output)

    # Feed-forward layer
    ffn = tf.keras.Sequential(
        [
            layers.Dense(config.FF_DIM, activation="relu",kernel_initializer=tf.keras.initializers.GlorotUniform(seed=config.seed)),
            layers.Dense(config.EMBED_DIM,kernel_initializer=tf.keras.initializers.GlorotUniform(seed=config.seed)),
        ],
        name="encoder_{}/ffn".format(i),
    )
    ffn_output = ffn(attention_output)
    ffn_output = layers.Dropout(0.1, name="encoder_{}/ffn_dropout".format(i))(
        ffn_output
    )
    sequence_output = layers.LayerNormalization(
        epsilon=1e-6, name="encoder_{}/ffn_layernormalization".format(i)
    )(attention_output + ffn_output)
    return sequence_output


def get_pos_encoding_matrix(max_len, d_emb):
    pos_enc = np.array(
        [
            [pos / np.power(10000, 2 * (j // 2) / d_emb) for j in range(d_emb)]
            if pos != 0
            else np.zeros(d_emb)
            for pos in range(max_len)
        ]
    )
    pos_enc[1:, 0::2] = np.sin(pos_enc[1:, 0::2])  # dim 2i
    pos_enc[1:, 1::2] = np.cos(pos_enc[1:, 1::2])  # dim 2i+1
    return pos_enc

def create_elembert_model(inputs,config):
    word_embeddings = layers.Embedding(config.VOCAB_SIZE, config.EMBED_DIM, mask_zero=True,name="element_embdgs",
                                       embeddings_initializer=tf.keras.initializers.GlorotUniform(seed=config.seed))(inputs)
    position_embeddings = layers.Embedding(input_dim=config.MAX_LEN,output_dim=config.EMBED_DIM,
                                           weights=[get_pos_encoding_matrix(config.MAX_LEN, config.EMBED_DIM)],
                                           #embeddings_initializer=tf.keras.initializers.GlorotUniform(seed=config.seed),
                                           name="position_embedding",)(tf.range(start=0, limit=config.MAX_LEN, delta=1))
    embeddings = word_embeddings + position_embeddings
    encoder_output = embeddings
    for i in range(config.NUM_LAYERS):
        encoder_output = bert_module(encoder_output, encoder_output, encoder_output, i, config)
    mlm_model = Model(inputs, encoder_output, name="masked_bert_model")

    return mlm_model


In [4]:
class Config:
    MAX_LEN = 128
    BATCH_SIZE = 128
    LR = 0.001
    VOCAB_SIZE = 128
    EMBED_DIM = 32
    NUM_HEAD = 2 # used in bert model
    FF_DIM = 32 # used in bert model
    NUM_LAYERS = 2
    MNAME = 'elembertR_'
    MVER = 'V0'
    DSPATH="/content/elementsem/data/"
    PREPATH="/content/elementsem/models/pretrained/"
    PATH="/content/elementsem/models/"
config = Config()
seed=12345
tf.random.set_seed(seed)
np.random.seed(seed)

In [5]:
with open(config.PREPATH+'/el2id'+config.MVER+'.pkl', 'rb') as f:
    db = pickle.load(f)

element2id = db['el2id']
config.VOCAB_SIZE = len(element2id)
print('vocabSize: ', config.VOCAB_SIZE)

def getTypesXYZ(file):
    xyz,types=[],[]
    lattice=file['lattice']['matrix']
    for n in file['sites']:
        types.append(n['label'])
        xyz.append(n['xyz'])
    return ['[CLS]']+types+['[SEP]'],lattice,np.asarray(xyz)

def getModelInputs(inputs):
    typesLst,pdfAtoms = [],[]
    for i0,i in enumerate(inputs):
        types,lattice,coords=getTypesXYZ(i.as_dict())
        typesLst.append([element2id[i] for i in types])
    return typesLst

vocabSize:  101


# Define model

In [6]:
inputC = Input((config.MAX_LEN), dtype=tf.int64,name='types')
def scheduler(epoch):
    initial_lrate = config.LR
    drop = 0.92
    epochs_drop = 16
    lr = initial_lrate * np.power(drop, np.floor((1+epoch)/epochs_drop))
    if lr<0.0001:
        lr = 0.0001
    return lr

# Matbench: is_metal task

In [7]:
from matbench.bench import MatbenchBenchmark
from ase import Atoms
from ase.io import read,write

mb = MatbenchBenchmark(autoload=True,subset=["matbench_mp_is_metal"])


2024-01-28 11:42:53 INFO     Initialized benchmark 'matbench_v0.1' with 1 tasks: 
['matbench_mp_is_metal']


INFO:matbench:Initialized benchmark 'matbench_v0.1' with 1 tasks: 
['matbench_mp_is_metal']


In [None]:
randseed = np.asarray([38005, 26930, 57873, 37766, 62593])
for task in mb.tasks:
    task.load()
    for fold in task.folds:
        print(fold)
        # Inputs are either chemical compositions as strings
        # or crystal structures as pymatgen.Structure objects.
        # Outputs are either floats (regression tasks) or bools (classification tasks)
        train_inputs, train_outputs = task.get_train_and_val_data(fold)
        typesLst = getModelInputs(train_inputs)
        x = tf.keras.preprocessing.sequence.pad_sequences(typesLst,dtype='int64',padding= 'post',truncating='post',maxlen=config.MAX_LEN)
        # train and validate your model
        labels=to_categorical(np.asarray(train_outputs.astype(int)))
        p = np.random.RandomState(seed=seed).permutation(len(labels))
        n = len(p)
        trainidx=p[:int(n*0.9)]
        validx = p[int(n*0.9):]
        config.seed = randseed[fold]
        z = create_elembert_model(inputC,config)
        e = Lambda(lambda x: x[:,0],name='clsTokenEmb')(z.output)
        f = Dense(2, activation="softmax",name='out_tox',kernel_initializer=tf.keras.initializers.GlorotUniform(config.seed))(e)
        # Create and compile the model (assuming create_elembert_model function is available)
        model = Model(inputs=z.input, outputs=f)

        optimizer = tf.keras.optimizers.Adam(learning_rate=config.LR)
        model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=[tf.keras.metrics.AUC()])
        trainlog = model.fit(x=x[trainidx], y=labels[trainidx], validation_data = (x[validx],labels[validx]),
                             verbose = 0,epochs = config.BATCH_SIZE, batch_size = config.BATCH_SIZE,
                             callbacks = tf.keras.callbacks.LearningRateScheduler(scheduler))

        # Get testing data
        test_inputs,test_outputs = task.get_test_data(fold, include_target=True)

        # Predict on the testing data
        # Your output should be a pandas series, numpy array, or python iterable
        # where the array elements are floats or bools
        typesLstTest = getModelInputs(test_inputs)
        xtest = tf.keras.preprocessing.sequence.pad_sequences(typesLstTest,dtype='int64',padding= 'post',truncating='post',maxlen=config.MAX_LEN)
        predictions = model.predict(xtest,batch_size=config.BATCH_SIZE)
        m = tf.keras.metrics.AUC()
        m.update_state(to_categorical(np.asarray(test_outputs.astype(int))),predictions)
        print('Fold:', fold, ' AUC:',m.result().numpy(),'binary tf-acc:',tf.keras.metrics.binary_accuracy(np.asarray(test_outputs.astype(int)),predictions.argmax(axis=-1), threshold=0.5).numpy())
        # Record your data!
        task.record(fold, np.asarray(predictions.argmax(axis=-1),dtype=bool))


2024-01-28 11:42:53 INFO     Dataset matbench_mp_is_metal already loaded; not reloading dataset.


INFO:matbench.task:Dataset matbench_mp_is_metal already loaded; not reloading dataset.


0
Fold: 0  AUC: 0.94794345 binary tf-acc: 0.8823446
2024-01-28 12:08:09 INFO     Recorded fold matbench_mp_is_metal-0 successfully.


INFO:matbench.task:Recorded fold matbench_mp_is_metal-0 successfully.


1
Fold: 1  AUC: 0.9451462 binary tf-acc: 0.8816379
2024-01-28 12:33:25 INFO     Recorded fold matbench_mp_is_metal-1 successfully.


INFO:matbench.task:Recorded fold matbench_mp_is_metal-1 successfully.


2
Fold: 2  AUC: 0.9447557 binary tf-acc: 0.87829244
2024-01-28 12:58:42 INFO     Recorded fold matbench_mp_is_metal-2 successfully.


INFO:matbench.task:Recorded fold matbench_mp_is_metal-2 successfully.


3
Fold: 3  AUC: 0.94661325 binary tf-acc: 0.8812553
2024-01-28 13:23:58 INFO     Recorded fold matbench_mp_is_metal-3 successfully.


INFO:matbench.task:Recorded fold matbench_mp_is_metal-3 successfully.


4


In [None]:
mb.to_file('elembert_matbenchmark.json.gz')

# Save results

In [None]:
extractorEmb = Model(inputs=model.inputs,outputs=model.get_layer(name="clsTokenEmb").output)

dbresults={}
dbresults['pred_emb'] = extractorEmb.predict(xtest,batch_size = 32)
dbresults['y_cls'] = to_categorical(np.asarray(test_outputs.astype(int)))
dbresults['types'] = xtest
dbresults['pred_cls'] = predictions
dbresults['trainidx'] = trainidx
dbresults['testidx'] = np.arange(len(xtest))
dbresults['validx'] = validx

# Postprocessing

In [None]:
from matplotlib import cm
from sklearn.manifold import TSNE
from sklearn import metrics
import matplotlib.pyplot as plt
import seaborn as sns
tsne = TSNE(n_components=2, verbose=0, random_state=123)
z = tsne.fit_transform(dbresults['pred_emb'])

In [None]:
dfr = pd.DataFrame()
dfr["y"] = np.asarray(dbresults['y_cls'].argmax(axis=-1))
dfr["yp"] = np.asarray(dbresults['pred_cls'].argmax(axis=-1))
dfr["tSNE1"] = z[:,0]
dfr["tSNE2"] = z[:,1]

In [None]:
import matplotlib.pyplot as plt
plt.rcParams.update({'font.size': 16})
fig, axes = plt.subplots(1, 3, figsize=(15, 4.8))
y_pclasses = dbresults['pred_cls'].argmax(axis=-1)
y_classes = dbresults['y_cls'].argmax(axis=-1)
print('binary tf-acc:',tf.keras.metrics.binary_accuracy(y_classes, y_pclasses, threshold=0.5).numpy())
m = tf.keras.metrics.AUC()
m.update_state(dbresults['y_cls'], dbresults['pred_cls'])
print('Fold: ',fold, ' AUC:',m.result().numpy())
confusion_matrix = metrics.confusion_matrix(y_classes, y_pclasses)
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix)
                                            #display_labels=['False','True'])
cm_display.plot(ax=axes[0],cmap=plt.cm.Blues)
dfr=dfr.sort_values(by=['y'])
sns.scatterplot(ax=axes[1],x="tSNE1", y="tSNE2", hue=dfr.y.tolist(),data=dfr,alpha=0.3)#.set(title=title+" data T-SNE projection")
axes[1].set_xlabel('$tSNE_1$')
axes[1].set_ylabel('$tSNE_2$',labelpad=1)
axes[1].set_title('Reference')
#axes[1].legend('',frameon=False)
dfr=dfr.sort_values(by=['yp'])
sns.scatterplot(ax=axes[2],x="tSNE1", y="tSNE2", hue=dfr.yp.tolist(),data=dfr,alpha=0.3)#.set(title=title+" data T-SNE projection")
axes[2].set_title('Predicted')
axes[2].set_xlabel('$tSNE_1$')
axes[2].set_ylabel('$tSNE_2$',labelpad=1)
axes[2].legend('',frameon=False)
fig.tight_layout()
plt.show()