In [None]:
from mp_api.client import MPRester
import numpy as np
from mendeleev import element
import re
import inspect 
from IPython.utils import io
from monty.serialization import dumpfn, loadfn
import time
import gzip
import json
from PIL import Image 
import matplotlib.pyplot as plt
import matplotlib as mpl
mpl.rcParams.update(
    {
        'text.usetex': False,
        'font.family': 'stixgeneral',
        'mathtext.fontset': 'stix',
    })
import cv2
from sklearn.metrics import confusion_matrix
import seaborn as sns
import heapq
import dtreeviz

from pymatgen.analysis.diffraction.xrd import XRDCalculator
from pymatgen.symmetry.analyzer import SpacegroupAnalyzer
from pymatgen.analysis.diffraction.tem import TEMCalculator

import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten, BatchNormalization
from keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.optimizers import schedules

import sklearn
from sklearn import metrics
from sklearn.metrics import confusion_matrix
from sklearn.ensemble import ExtraTreesClassifier, ExtraTreesRegressor, RandomForestClassifier
from sklearn.tree import plot_tree


In [None]:
chemical_elements = ['H', 'He', 'Li', 'Be', 'B', 'C', 'N', 'O', 'F', 'Ne', 'Na', 'Mg', 'Al', 'Si', 'P', 'S', 'Cl', 'Ar', 'K', 'Ca', 'Sc', 'Ti', 'V', 'Cr', 'Mn', 'Fe', 'Co', 'Ni', 'Cu', 'Zn', 'Ga', 'Ge', 'As', 'Se', 'Br', 'Kr', 'Rb', 'Sr', 'Y', 'Zr', 'Nb', 'Mo', 'Tc', 'Ru', 'Rh', 'Pd', 'Ag', 'Cd', 'In', 'Sn', 'Sb', 'Te', 'I', 'Xe', 'Cs', 'Ba', 'La', 'Ce', 'Pr', 'Nd', 'Pm', 'Sm', 'Eu', 'Gd', 'Tb', 'Dy', 'Ho', 'Er', 'Tm', 'Yb', 'Lu', 'Hf', 'Ta', 'W', 'Re', 'Os', 'Ir', 'Pt', 'Au', 'Hg', 'Tl', 'Pb', 'Bi', 'Po', 'At', 'Rn', 'Fr', 'Ra', 'Ac', 'Th', 'Pa', 'U', 'Np', 'Pu', 'Am', 'Cm', 'Bk', 'Cf', 'Es', 'Fm', 'Md', 'No', 'Lr', 'Rf', 'Db', 'Sg', 'Bh', 'Hs', 'Mt', 'Ds', 'Rg', 'Cn', 'Nh', 'Fl', 'Mc', 'Lv', 'Ts', 'Og']
atomic_masses = [element(elem).mass for elem in chemical_elements]

def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i + n]
        
def gzip_load(file):
    with gzip.open(file, "r") as f:
        data = f.read()
        data = json.loads(data.decode('utf-8'))  
        return [AttrDict(i) for i in data]

def get_atom_vector(material, atoms, natoms=True, ndensity=True):
    atom_vector = np.zeros(len(chemical_elements))
    atoms_ = re.findall(r'[A-Z][a-z]*|\d+', re.sub('[A-Z][a-z]*(?![\da-z])', r'\g<0>1', atoms))
    if natoms:
        for i in range( len(atoms_)//2 ): atom_vector[ chemical_elements.index(atoms_[2*i]) ] += int( atoms_[2*i+1]  )
    else:
        for i in range( len(atoms_)//2 ): atom_vector[ chemical_elements.index(atoms_[2*i]) ] += 1
    if ndensity: atom_vector = np.append(atom_vector, material.density)
    return atom_vector

mpr = MPRester("YOUR_API")
calculator = XRDCalculator(wavelength='CuKa')
TEMcalculator = TEMCalculator(voltage = 200, beam_direction = (0, 0, 1))
_p_ = None  
try: _p_ = np.load("_p_.npy") # To shuffle the materials
except: pass

class dotdict(dict):
    """dot.notation access to dictionary attributes"""
    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__

In [None]:
# Get the materials   
try:
    print("Loading materials...")
    start = time.time()
    materials = np.load("materials.npy", allow_pickle=True) # loadfn("materials.json.gz")
    materials = [dotdict(i) for i in materials]
    #for i in materials: i.symmetry = dotdict(i.symmetry)
    end = time.time()
except:
    start = time.time()
    print("Failed.\nRetrieving materials from The Materials Project...")
    materials = mpr.summary.search(volume=(0,100000000), fields=["material_id", "structure", "formula_pretty", "volume", "symmetry", "xas", "elements", "density", "density_atomic"])
    
    for i in materials:
        dictionary = {"crystal_system": str(i.symmetry.crystal_system), "symbol": i.symmetry.symbol,
                 "number": i.symmetry.number, "point_group": i.symmetry.point_group, "symprec": i.symmetry.symprec,
                 "version": i.symmetry.version}
        i.symmetry = dotdict(dictionary)
        i.xas = str(i.xas)
    #dumpfn(materials, "materials.json.gz")
    end = time.time()
    
print(len(materials), " materials in", np.round((end - start)/60,2), "min.")

In [None]:
# Get the conventional structures
try:
    print("Loading conventional structures...")
    start = time.time()
    conventional_structures = np.load("conventional_structures.npy", allow_pickle=True) #  loadfn("conventional_structures.json.gz")
    end = time.time()
except:
    print("Failed.\nGetting conventional structures...")
    start = time.time()
    conventional_structures = []
    check = 0
    for index, material in enumerate(materials):
        if index%10==0: print( "  "+str(np.round(index/len(materials)*100, 3))+"%    "+str(check)+"    ", end='\r' ) 
        try:
            sga = SpacegroupAnalyzer(material.structure)
            conventional_structure = sga.get_conventional_standard_structure()
            conventional_structures.append(conventional_structure)
            check += 1
        except: conventional_structures.append(material.structure)
    dumpfn(conventional_structures, "conventional_structures.json.gz")
    end = time.time()
    
print(np.round((end - start)/60,2), "min.                    ")

In [None]:
space_groups_230 = []
with open("SpaceGroups.txt",'r') as data_file:
    for line in data_file:
        data = (line.replace(" ","").replace("\n","")).split(",")
        for i in data: 
            if len(i)>0: space_groups_230.append(i)
                
try:
    print("Loading patterns...")
    start = time.time()
    patterns = np.load("Xpatterns.npy", allow_pickle=True) # loadfn("Xpatterns.json.gz")
    patterns = [dotdict({"x": pattern[0], "intensity": pattern[1]}) for pattern in patterns]
    crystal_systems = np.load("crystal_systems.npy", allow_pickle=True)
    point_groups = np.load("point_groups.npy", allow_pickle=True)
    
    end = time.time()
    print(np.round((end - start)/60,2), "min.                    ")
except:
    print("Failed.\nGetting patterns...")
    patterns = []
    crystal_systems = []
    point_groups = []
    check = 0

    for i, conventional_structure in enumerate(conventional_structures):
        try:
            if i%10==0: print( "  "+str(np.round(i/len(conventional_structures)*100,3))+"%    "+str(check)+"      ", end='\r' )      
            pattern = calculator.get_pattern(conventional_structure)
            patterns.append(pattern)
            crystal_systems.append( str(materials[i].symmetry.crystal_system) )
            point_groups.append( materials[i].symmetry.symbol )
            check += 1
        except: pass
    np.save("Xpatterns.npy", patterns)

crystal_systems_classes = [*set(crystal_systems)]
point_groups_classes = space_groups_230

In [None]:
Npeaks = 20 #  Number of peaks to analyze
test_train = 0.9 # Proportion of data that will be used for training
include_intensities = True # To include the intensity of each peak in the input
include_n_peaks = False # To include the number of peaks in the input

max_features = 2*Npeaks if include_intensities else Npeaks
XRD_data = []; crystal_system_data = []; crystal_system_data_hot = []; point_group_data = [];  point_group_data_hot = []
for i, pattern in enumerate(patterns):
    new_data_x = 4*np.pi * np.sin( (pattern.x/2)*np.pi/180 )/1.542   # 4*np.pi * np.sin( (pattern.x/2)*np.pi/180 )/1.542  2*np.sin( (pattern.x/2)*np.pi/180 )/1.542   1.542/(2*np.sin( (pattern.x/2)*np.pi/180 ))    (pattern.x)/90              // np.sin( (pattern.x/2) *np.pi/180 )/1.542
    new_data_I = pattern.intensity/100
    how_many_peaks = len(pattern.intensity)
    if len(new_data_x)>=Npeaks:
        if include_intensities: XRD_data.append( np.concatenate((new_data_x[0:Npeaks],new_data_I[0:Npeaks])) ) # .reshape(2,Npeaks)
        else: XRD_data.append( new_data_x[0:Npeaks] )
    else:
        new_data_x = np.pad(new_data_x, (0,Npeaks-len(new_data_x)), constant_values=(-1))
        new_data_I = np.pad(new_data_I, (0,Npeaks-len(new_data_I)), constant_values=(-1))
        if include_intensities: XRD_data.append( np.concatenate((new_data_x,new_data_I)) )   
        else: XRD_data.append( new_data_x )
    if include_n_peaks:
        XRD_data[-1] = np.append(XRD_data[-1], how_many_peaks)
        
    dvar = 1 if crystal_systems[i]=="Cubic" else 0    
        
    crystal_system_data.append( dvar ) # crystal_systems_classes.index(crystal_systems[i])
    hot_vector = np.zeros( len(crystal_systems_classes) );
    hot_vector[crystal_system_data[-1]] = 1
    crystal_system_data_hot.append(hot_vector)
    hot_vector = np.zeros( 230 );
    point_group_data.append( space_groups_230.index(point_groups[i].replace("Cmce","Cmca")) )
    hot_vector[point_group_data[-1]] = 1
    point_group_data_hot.append(hot_vector)
XRD_data = np.array(XRD_data); crystal_system_data = np.array(crystal_system_data); point_group_data = np.array(point_group_data)  
crystal_system_data_hot = np.array(crystal_system_data_hot); point_group_data_hot = np.array(point_group_data_hot)

def unison_shuffled_copies(a, b, c):
    global _p_
    assert len(a) == len(b) and len(a) == len(c)
    if _p_ is not None: return a[_p_], b[_p_], c[_p_]
    p = np.random.permutation(len(a))
    _p_ = p
    return a[p], b[p], c[p]
Xdata, Y1data, Y2data = unison_shuffled_copies(XRD_data, crystal_system_data, point_group_data)

test_train = int(test_train*len(Xdata))
X_train = Xdata[0:test_train]; Y1_train = Y1data[0:test_train]; Y2_train = Y2data[0:test_train]
X_test = Xdata[test_train:]; Y1_test = Y1data[test_train:]; Y2_test = Y2data[test_train:]

# XRD

In [None]:
# ExtraTreesClassifier for crystalline systems
random_state = 42
get_crystal_system = ExtraTreesClassifier(n_estimators=500, # 600
                            max_depth=30, # 40
                            max_features=100, # 20
                            n_jobs=-1, 
                            random_state=random_state,
                            warm_start=False)
get_crystal_system.fit(X_train, Y1_train)

y1_pred0 = get_crystal_system.predict(X_train)
print('Accuracy of crystal system prediction (train): ', metrics.accuracy_score(Y1_train, y1_pred0)*100, "%")
y1_pred = get_crystal_system.predict(X_test)
print('Accuracy of crystal system prediction (test): ', metrics.accuracy_score(Y1_test, y1_pred)*100, "%")

In [None]:
# ExtraTreesClassifier for space groups
random_state = 24
get_point_group = ExtraTreesClassifier(n_estimators=200, # 200
                            max_depth=25, # 20
                            max_features=100, # max_features
                            n_jobs=-1, 
                            random_state=random_state, # 42
                            warm_start=False)
get_point_group.n_classes_ = 230
get_point_group.fit(X_train, Y2_train)

y2_pred0 = get_point_group.predict(X_train)
print('Accuracy of point group prediction (train): ', metrics.accuracy_score(Y2_train, y2_pred0)*100, "%")
y2_pred = get_point_group.predict(X_test)
print('Accuracy of point group prediction (test): ', metrics.accuracy_score(Y2_test, y2_pred)*100, "%")

# TEM

In [None]:
TEMpatterns = []

try:
    TEMfiles = []
    for (_, _, filenames) in os.walk("./TEMpatterns"):
        for filename in filenames:
            TEMpatterns.append( 255-cv2.imread("./TEMpatterns/"+filename, cv2.IMREAD_GRAYSCALE) )
            TEMfiles.append(int(filename.split(".png")[0]))
    TEMpatterns = np.array( [x for _, x in sorted(zip(TEMfiles, TEMpatterns))] ) 
except:
    for i, conventional_structure in enumerate(conventional_structures[0:100]):
        print( "  "+str(np.round(i/len(conventional_structures[0:100])*100,2))+"%  ", end='\r' ) 
        TEMpattern = TEMcalculator.get_plot_2d(conventional_structure)
        file_name = "TEMpatterns/"+str(i)+".png"
        TEMpattern.write_image(file_name)
        im_gray = (255-cv2.imread(file_name, cv2.IMREAD_GRAYSCALE))[100:470, 90:460]
        TEMpatterns.append(im_gray)
        cv2.imwrite(file_name, im_gray)
    TEMpatterns = np.array(TEMpatterns)/255    

In [None]:
pTEM = np.random.permutation(len(TEMpatterns))
TEMpatterns, Ydata_TEM = TEMpatterns[pTEM], crystal_system_data_hot[0:len(TEMpatterns)][pTEM]
TEMpatterns = np.expand_dims(TEMpatterns, axis=-1)
TEM_train_test = 0.1
NTEM = int(TEM_train_test*len(TEMpatterns))
input_shape = TEMpatterns.shape[1:]

In [None]:
TEM_train_test = 0.1
NTEM = int(TEM_train_test*len(TEMpatterns))
Xtrain_TEM, Ytrain_TEM = TEMpatterns[0:8*NTEM], Ydata_TEM[0:8*NTEM]
Xvalidation_TEM, Yvalidation_TEM = TEMpatterns[8*NTEM:9*NTEM], Ydata_TEM[8*NTEM:9*NTEM]
Xtest_TEM, Ytest_TEM = TEMpatterns[9*NTEM:], Ydata_TEM[9*NTEM:]
input_shape = TEMpatterns.shape[1:]

In [None]:
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D

def create_CNN():
    model = Sequential()
    model.add(Conv2D(10, kernel_size=(5, 5),
                     activation='relu',
                     input_shape=input_shape))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Conv2D(20, (5, 5), activation='relu'))
    model.add(Dropout(0.5))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Flatten())
    model.add(Dense(20*4*4, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(len(crystal_systems_classes), activation='softmax'))
    model.compile(loss=keras.losses.categorical_crossentropy,
                  optimizer='Adam',
                  metrics=['accuracy'])
    return model

In [None]:
batch_size = 32
epochs = 10

stopping_callback = tf.keras.callbacks.EarlyStopping(monitor='accuracy', patience=3, restore_best_weights=True)
model_CNN=create_CNN()
model_CNN.fit(TEMpatterns[0:8*NTEM], Ydata_TEM[0:8*NTEM],
          batch_size=batch_size,
          epochs=epochs,
          verbose=1,
          callbacks=[stopping_callback],
          validation_data=(TEMpatterns[8*NTEM:9*NTEM], Ydata_TEM[8*NTEM:9*NTEM]))

score = model_CNN.evaluate(TEMpatterns[9*NTEM:], Ydata_TEM[9*NTEM:], verbose=1)
print('Test loss:', score[0])
print('Test accuracy:', score[1])