# Для работы в Google Colab
_______________

In [2]:
# !pip uninstall -y numpy
# !pip uninstall -y setuptools

# !pip install setuptools
# !pip install numpy

In [3]:
# !wget https://github.com/HSE-LAMBDA/IDAO-2022/archive/refs/heads/main.zip
# !unzip main.zip

# from google.colab import drive
# drive.mount('/content/drive/')

# %cd /content/drive/MyDrive/DLS/IDAO_2022/

# !tar -C "/IDAO_2022/data" -xzvf /content/drive/MyDrive/IDAO_2022/data/dichalcogenides_private.tar.gz
# !tar -C "/IDAO_2022/data" -xzvf /content/drive/MyDrive/IDAO_2022/data/dichalcogenides_public.tar.gz

In [4]:
# !pip install megnet
# !pip install pymatgen

_________________

In [6]:
import yaml
import json

import pandas as pd
import numpy as np
import tensorflow as tf

from pathlib import Path
from pymatgen.core import Structure
from sklearn.model_selection import train_test_split
from megnet.models import MEGNetModel
from megnet.data.crystal import CrystalGraph

In [15]:
def cation_vacancy(pymatgen_dict: Structure,
                   coord_a: float = 0.041667,
                   coord_b: float = 0.083333):
    """Mark every unoccupied position of the 8x8 cation grid with a 'Cr' site.

    The grid is generated in fractional coordinates: ``coord_a + 0.125 * i``
    along a, ``coord_b + 0.125 * j`` along b (``i, j`` in ``0..7``) at a fixed
    ``c = 0.25``.  Any grid position that no existing site occupies is treated
    as a vacancy and a placeholder 'Cr' site is appended there.

    Args:
        pymatgen_dict: Structure to fill.  Despite the name, this is a
            ``Structure`` object (iterated site by site), not a dict.
        coord_a: Fractional a-coordinate of the first grid position.
        coord_b: Fractional b-coordinate of the first grid position.

    Returns:
        The same ``pymatgen_dict`` object; it is modified in place.
    """
    # Round the generated positions to the same 6 decimals used for the site
    # coordinates below.  Without this, a float sum such as 0.041667 + 0.375
    # may differ from the rounded site value in the last bit, and an occupied
    # position would wrongly be reported as a vacancy.
    lattice_positions = [
        [round(coord_a + 0.125 * i, 6), round(coord_b + 0.125 * j, 6), 0.25]
        for i in range(8)
        for j in range(8)
    ]

    # Snapshot of the occupied positions, taken before any site is appended.
    occupied = {
        (round(float(site.a), 6), round(float(site.b), 6), round(float(site.c), 6))
        for site in pymatgen_dict
    }

    vacancy_coords_list = [
        coords for coords in lattice_positions if tuple(coords) not in occupied
    ]

    # Third positional argument is False, as in the original call
    # (presumably ``coords_are_cartesian`` -- coordinates are fractional).
    for coords in vacancy_coords_list:
        pymatgen_dict.append('Cr', coords, False)
    return pymatgen_dict


def anion_vacancy(pymatgen_dict: Structure,
                  coord_a: float = 0.083333,
                  coord_b: float = 0.041667,
                  coord_c: float = 0.144826,
                  first_second_layer_distance: float = 0.210348):
    """Mark every unoccupied position of the two 8x8 anion grids with an 'O' site.

    Two layers are generated in fractional coordinates, both with
    ``a = coord_a + 0.125 * i`` and ``b = coord_b + 0.125 * j``
    (``i, j`` in ``0..7``): the first at ``c = coord_c`` and the second at
    ``c = coord_c + first_second_layer_distance``.  Each grid position that no
    existing site occupies is treated as a vacancy and a placeholder 'O' site
    is appended there (first layer first, then the second layer).

    Args:
        pymatgen_dict: Structure to fill.  Despite the name, this is a
            ``Structure`` object (iterated site by site), not a dict.
        coord_a: Fractional a-coordinate of the first grid position.
        coord_b: Fractional b-coordinate of the first grid position.
        coord_c: Fractional c-coordinate of the first layer.
        first_second_layer_distance: Fractional c-offset of the second layer.

    Returns:
        The same ``pymatgen_dict`` object; it is modified in place.
    """
    # Site coordinates are compared after rounding to 6 decimals, so the
    # generated grid must be rounded identically.  In particular
    # coord_c + first_second_layer_distance is a float sum that is not
    # guaranteed to equal the 6-decimal value stored for the sites.
    layer_heights = (
        round(coord_c, 6),
        round(coord_c + first_second_layer_distance, 6),
    )

    # Snapshot of the occupied positions, taken before any site is appended.
    occupied = {
        (round(float(site.a), 6), round(float(site.b), 6), round(float(site.c), 6))
        for site in pymatgen_dict
    }

    vacancies = [
        [round(coord_a + 0.125 * i, 6), round(coord_b + 0.125 * j, 6), height]
        for height in layer_heights
        for i in range(8)
        for j in range(8)
    ]

    # Third positional argument is False, as in the original call
    # (presumably ``coords_are_cartesian`` -- coordinates are fractional).
    for coords in vacancies:
        if tuple(coords) not in occupied:
            pymatgen_dict.append('O', coords, False)

    return pymatgen_dict


def data_preprocessing(pymatgen_dict: Structure, cation: int = 0, anion: int = 0):
    """Count cations/anions from the formula and fill vacancies when some are missing.

    The structure's ``formula`` string is split on whitespace; each token is
    assumed to be an element symbol followed by an integer count (for example
    ``"Mo63"``).  Counts of Mo and W are added to ``cation``; counts of S and
    Se are added to ``anion``.  Tokens for any other element are ignored.

    Args:
        pymatgen_dict: Structure to inspect.  Despite the name, this is a
            ``Structure`` object, not a dict.
        cation: Starting value of the cation counter.
        anion: Starting value of the anion counter.

    Returns:
        The structure, passed through ``cation_vacancy`` when fewer than 64
        cations were counted and through ``anion_vacancy`` when fewer than 128
        anions were counted.

    Raises:
        ValueError: If the count of a Mo/W/S/Se token is not an integer.
    """
    for token in str(pymatgen_dict.formula).split():
        # Split "Se12" into the exact symbol "Se" and the count "12".
        # Matching on the exact symbol (rather than a substring test followed
        # by a character-set lstrip) keeps elements such as Os, Cs or As from
        # being mistaken for sulfur.
        symbol = token.rstrip('0123456789.')
        count = token[len(symbol):]
        if symbol in ('Mo', 'W'):
            cation += int(count)
        elif symbol in ('S', 'Se'):
            anion += int(count)

    # 64 = 8 * 8 cation grid positions, 128 = 2 layers * 8 * 8 anion positions.
    if cation < 64:
        pymatgen_dict = cation_vacancy(pymatgen_dict)
    if anion < 128:
        pymatgen_dict = anion_vacancy(pymatgen_dict)
    return pymatgen_dict


def read_pymatgen_dict(file):
    """Load one structure from a JSON file and run it through ``data_preprocessing``.

    Args:
        file: Path of a JSON file holding a serialized structure dictionary.

    Returns:
        Whatever ``data_preprocessing`` returns for the decoded structure.
    """
    with open(file, "r") as handle:
        raw_structure = json.load(handle)
    structure = Structure.from_dict(raw_structure)
    return data_preprocessing(structure)


def energy_within_threshold(prediction, target):
    """Return the fraction of entries whose absolute error is below 0.02.

    Args:
        prediction: Tensor of predicted values.
        target: Tensor of reference values.

    Returns:
        Number of entries with ``|target - prediction| < 0.02`` divided by the
        total number of entries in ``target``.
    """
    e_thresh = 0.02
    abs_error = tf.math.abs(target - prediction)

    # Count the entries inside the tolerance, then normalise by the entry count.
    n_within = tf.math.count_nonzero(abs_error < e_thresh)
    n_total = tf.cast(tf.size(target), tf.int64)
    return n_within / n_total

def prepare_dataset(dataset_path):
    """Load structures and targets from ``dataset_path`` and split them 75/25.

    Expects ``dataset_path / "targets.csv"`` (first column used as index) and
    a ``dataset_path / "structures"`` directory of ``<id>.json`` files.

    Args:
        dataset_path: Root directory of the dataset (str or path-like).

    Returns:
        The ``train_test_split`` result: a (train, test) pair of DataFrames
        with ``structures`` and ``targets`` columns, indexed by structure id.
    """
    dataset_path = Path(dataset_path)
    targets = pd.read_csv(dataset_path / "targets.csv", index_col=0)

    # ``item.stem`` drops exactly the ".json" extension.  ``str.strip(".json")``
    # strips the *characters* '.', 'j', 's', 'o', 'n' from both ends and would
    # corrupt any id that starts or ends with one of them.
    # Globbing skips non-JSON entries (e.g. ".ipynb_checkpoints"), and sorting
    # makes the row order -- and therefore the seeded split below --
    # independent of the filesystem's directory listing order.
    struct = {
        item.stem: read_pymatgen_dict(item)
        for item in sorted((dataset_path / "structures").glob("*.json"))
    }

    data = pd.DataFrame(columns=["structures"], index=struct.keys())
    data = data.assign(structures=struct.values(), targets=targets)

    return train_test_split(data, test_size=0.25, random_state=666)


def prepare_model(cutoff, lr):
    """Build the MEGNet model used for training and prediction.

    Args:
        cutoff: Cutoff passed to ``CrystalGraph``; it also sets the upper end
            (``cutoff + 1``) of the 100 evenly spaced Gaussian centers.
        lr: Learning rate forwarded to ``MEGNetModel``.

    Returns:
        A ``MEGNetModel`` configured with MAE loss, two passes and the
        ``energy_within_threshold`` metric.
    """
    n_centers = 100
    centers = np.linspace(0, cutoff + 1, n_centers)

    model_kwargs = dict(
        graph_converter=CrystalGraph(cutoff=cutoff),
        centers=centers,
        width=0.8,
        loss=["MAE"],
        npass=2,
        lr=lr,
        metrics=energy_within_threshold,
    )
    return MEGNetModel(**model_kwargs)


def main_to_train(config):
    """Train a model on the dataset described by ``config`` and return it.

    Args:
        config: Mapping with a ``"datapath"`` entry and a ``"model"`` section
            providing ``cutoff``, ``lr``, ``epochs`` and ``batch_size``.

    Returns:
        The trained model.
    """
    train, test = prepare_dataset(config["datapath"])

    model_cfg = config["model"]
    model = prepare_model(float(model_cfg["cutoff"]), float(model_cfg["lr"]))

    # The held-out split produced by prepare_dataset serves as validation data.
    model.train(
        train.structures,
        train.targets,
        validation_structures=test.structures,
        validation_targets=test.targets,
        epochs=int(model_cfg["epochs"]),
        batch_size=int(model_cfg["batch_size"]),
    )
    return model


# Path of the YAML file holding the dataset location and model hyper-parameters.
config_path = './dataset/config.yaml'

# Load the configuration and start training when this code runs as the main
# module (always the case inside a notebook kernel).
# NOTE(review): the two cells that follow define `config_path` and run the
# same training again, so a top-to-bottom run trains twice -- confirm which
# copy should stay.
if __name__ == "__main__":
    with open(config_path) as file:
        config = yaml.safe_load(file)
    main_to_train(config)

In [9]:
# Path of the YAML file holding the dataset location and model hyper-parameters.
config_path = './dataset/config.yaml'

In [13]:
# Read the YAML configuration from `config_path` and train the model.
# NOTE(review): the output recorded for this cell is
# "TypeError: Trainer.compile() got an unexpected keyword argument
# 'sample_weight_mode'"; presumably the installed Keras/TensorFlow is newer
# than this megnet release supports -- confirm and pin compatible versions.
if __name__ == "__main__":
    with open(config_path) as file:
        config = yaml.safe_load(file)
    main_to_train(config)

TypeError: Trainer.compile() got an unexpected keyword argument 'sample_weight_mode'

In [None]:
def main_to_predict(config):
    """Predict targets for the held-out split and write them to a CSV file.

    The dataset at ``config["datapath"]`` is split exactly as in training and
    predictions are produced for the 25% test part only.  Results are written
    to ``./submission111.csv`` with an ``id`` index column and a
    ``predictions`` column.

    Args:
        config: Mapping with a ``"datapath"`` entry, a ``"model"`` section
            providing ``cutoff`` and ``lr``, and optionally a
            ``"checkpoint_path"`` pointing at saved model weights.
    """
    import warnings

    # Only the held-out part is scored; the training part is not needed here.
    _, test = prepare_dataset(config["datapath"])

    model = prepare_model(
        float(config["model"]["cutoff"]), float(config["model"]["lr"])
    )

    # A freshly built model carries untrained weights, so predictions are
    # meaningless unless trained weights are restored first.
    checkpoint_path = config.get("checkpoint_path")
    if checkpoint_path and Path(checkpoint_path).exists():
        model.load_weights(checkpoint_path)
    else:
        warnings.warn(
            "No usable 'checkpoint_path' in config; predicting with an "
            "untrained model."
        )

    private_test = pd.DataFrame(columns=['id', 'structures'], index=test.index.values.tolist())
    private_test = private_test.assign(structures=test.structures.values.tolist())
    private_test = private_test.assign(predictions=model.predict_structures(private_test.structures))
    private_test[['predictions']].to_csv('./submission111.csv', index_label='id')


In [None]:
# Read the YAML configuration and write predictions for the held-out split.
# NOTE(review): this opens "config.yaml" in the working directory, whereas the
# training cells read './dataset/config.yaml' via `config_path` -- confirm that
# the two are meant to differ.
if __name__ == "__main__":
    with open("config.yaml") as file:
        config = yaml.safe_load(file)
    main_to_predict(config)