In [90]:
import re
from copy import deepcopy

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pygad
from aiida.manage.configuration import load_profile
from aiida.orm import ArrayData, QueryBuilder
from aiida_aurora.calculations import BatteryCyclerExperiment
from aiida_aurora.data import BatterySampleData, CyclingSpecsData
from aiida_aurora.utils.parsers import get_data_from_results
from aiida_aurora.workflows import CyclingSequenceWorkChain
from sklearn.linear_model import RANSACRegressor
from sklearn.preprocessing import MinMaxScaler, PolynomialFeatures

load_profile()

Profile<uuid='14f418767fa44f6f9d65ac40c9d45d39' name='default'>

In [None]:
pd.options.display.max_rows = 200


In [None]:
import matplotlib as mpl

# Set global font size for labels, ticks, and other elements
mpl.rcParams['font.size'] = 10


In [None]:
%matplotlib widget
# %matplotlib inline


# Global params

In [None]:
generations = {
    "231012": {
        "formation_cycles": 3,
        "additive": {
            "type": "VC",
            "amount": "2",
        },
    },
    "231018": {
        "formation_cycles": 2,
        "additive": {
            "type": "VC",
            "amount": "2",
        },
    },
    "231116": {
        "formation_cycles": 1,
        "additive": {
            "type": "VC",
            "amount": "2",
        },
    },
    "231121": {
        "formation_cycles": 3,
        "additive": {
            "type": "VC",
            "amount": "2",
        },
    },
    "231124": {
        "formation_cycles": 3,
        "additive": {
            "type": "FEC",
            "amount": "2",
        },
    },
    "231128": {
        "formation_cycles": 2,
        "additive": {
            "type": "FEC",
            "amount": "2",
        },
    },
    "231129": {
        "formation_cycles": 1,
        "additive": {
            "type": "FEC",
            "amount": "2",
        },
    },
    "231130": {
        "formation_cycles": 3,
        "additive": {
            "type": "FEC",
            "amount": "4",
        },
    },
    "231201": {
        "formation_cycles": 2,
        "additive": {
            "type": "FEC",
            "amount": "4",
        },
    },
    "231204": {
        "formation_cycles": 1,
        "additive": {
            "type": "FEC",
            "amount": "4",
        },
    },
}


In [None]:
E_CYCLE = 75
C_CYCLE = 50
CYCLE_SHIFT = 25
E_KEY = "E term"
T_KEY = "t term"
VOLTAGES = (4.2, )


# Fetch Data from AiiDA

In [None]:
qb = QueryBuilder().append(
    CyclingSequenceWorkChain,
    tag="workflow",
).append(
    BatterySampleData,
    with_outgoing="workflow",
    filters={
        "and": [
            {
                "attributes.metadata.batch": {"in": list(generations.keys())},
                # "attributes.metadata.batch": "231128",
            },
            {
                "attributes.metadata.batch": {"!==": "231012"},
            },
            {
                "attributes.metadata.name": {
                    "!in": [
                        "231012-11",
                        "231018-17",
                        "231124-13",
                        "231124-14",
                        "231124-18",
                    ],
                }
            }
        ],
    },
    tag="battery",
).append(
    BatteryCyclerExperiment,
    with_incoming="workflow",
    filters={
        "label": {
            "and": [
                {
                    "like": r"%long_term%",
                },
                # {
                #     "!like": r"%42%",
                # },
                {
                    "!like": r"%44%",
                },
                {
                    "!like": r"%46%",
                },
                {
                    "!like": r"%48%",
                },
            ]
        }
    },
    tag="long_term_job",
).append(
    CyclingSpecsData,
    with_outgoing="long_term_job",
    tag="long_term_protocol",
).append(
    ArrayData,
    with_incoming="long_term_job",
    tag="results",
)

qb.add_projection("battery", [
    "attributes.metadata.name",
    "attributes.specs.composition.cathode.weight.net",
])
qb.add_projection("long_term_protocol", "attributes.method")
qb.add_projection("results", "*")

items: list[tuple[str, float, list[dict], list[dict]]] = qb.all()


In [None]:
collected = {
    "batch": [],
    "number": [],
    "voltage": [],
    "additive_type": [],
    "additive_amount": [],
    "formation_cycles": [],
    # "long_term_cycles": [],
    # "start": [],
    # "end": [],
    E_KEY: [],
    T_KEY: [],
}

for (
        label,
        cathode_weight,
        long_term_protocol,
        results,
) in items:

    data = get_data_from_results(results)

    if len(data["cycle-number"]) < E_CYCLE:
        continue

    energy = np.sum(data["Ed"][:E_CYCLE]) / cathode_weight

    shift = np.argmax(data["Qd"])  # maximum capacity cycle

    C0 = data["Qd"][shift]  # [mAh]

    c_cycle = C_CYCLE + shift
    C = data["Qd"][c_cycle - 1]  # [mAh]

    idx = data["cycle-index"]
    start, end = idx[shift], idx[c_cycle]
    ts, te = data["time"][start] / 3600, data["time"][end] / 3600  # [h]
    td = te - ts
    t80 = 0.2 * (C0 * td) / (C0 - C) + ts  # [h]

    long_term_protocol = next(
        filter(
            lambda tech: tech["technique"] == "constant_current",
            long_term_protocol,
        ))

    voltage = long_term_protocol["parameters"]["limit_voltage_max"]["value"]

    batch, number = label.split("-")
    additive = generations[batch]["additive"]

    n_formation_cycles = generations[batch]["formation_cycles"]

    collected["batch"].append(batch)
    collected["number"].append(int(number))
    collected["voltage"].append(float(voltage))
    collected["additive_type"].append(additive["type"])
    collected["additive_amount"].append(float(additive["amount"]))
    collected["formation_cycles"].append(n_formation_cycles)
    # collected["long_term_cycles"].append(len(data["cycle-number"]))
    # collected["start"].append(shift)
    # collected["end"].append(c_cycle)
    collected[E_KEY].append(energy)
    collected[T_KEY].append(1/t80)

df = pd.DataFrame(collected)
# df = df[~df["voltage"].isin((4.4, 4.6, 4.8))]

df = df.sort_values(["batch", "voltage", "number"]).reset_index(drop=True)

# df


# Process EC-lab data

In [None]:
ROOT = "/home/jovyan/data_files/ga/"

ec_lab_batches = [
    "231116",
    "231201",
]


In [None]:
# excel_data = {
#     batch: {
#         "I": pd.read_excel(f"{ROOT}/{batch}.xlsx", sheet_name=f"I_vs_time"),
#         "Q": pd.read_excel(f"{ROOT}/{batch}.xlsx", sheet_name=f"Q_vs_cycle"),
#         "E": pd.read_excel(f"{ROOT}/{batch}.xlsx", sheet_name=f"E_vs_cycle")
#     } for batch in ec_lab_batches
# }


In [None]:
excel_data = {
    batch: pd.read_excel(f"{ROOT}/{batch}_I_VvsTime.xlsx")
    for batch in ec_lab_batches
}


In [None]:
from aiida_aurora.utils.parsers import post_process_data


def process_batch(dataset: pd.DataFrame):

    pattern = r"\d{6}-(\d+)"
    sample = dataset.columns.str.extract(pattern).dropna().values.flatten()

    t, Ewe, I = np.split(dataset[1:].dropna().to_numpy(), 3, axis=1)
    t = t.flatten() - t[0]
    Ewe, I = Ewe.flatten(), I.flatten() / 1000

    data = post_process_data(t, Ewe, I)

    energy = np.sum(data["Ed"][:E_CYCLE])

    shift = np.argmax(data["Qd"])  # maximum capacity cycle

    C0 = data["Qd"][shift]  # [mAh]

    c_cycle = C_CYCLE + shift
    C = data["Qd"][c_cycle - 1]  # [mAh]

    idx = data["cycle-index"]
    start, end = idx[shift], idx[c_cycle]
    ts, te = data["time"][start] / 3600, data["time"][end] / 3600  # [h]
    td = te - ts

    t80 = 0.2 * (C0 * td) / (C0 - C) + ts  # [h]

    return pd.DataFrame({
        "number": sample,
        "long_term_cycles": data["cycle-number"][-1],
        E_KEY: [energy],
        T_KEY: [1 / t80],
    }).T


In [None]:
for batch in ec_lab_batches:

    dataframe = excel_data[batch]

    terms = dataframe.groupby(
        np.arange(len(dataframe.columns)) // 3,
        axis=1,
    ).apply(process_batch).T.reset_index(drop=True)

    terms: pd.DataFrame = terms[terms["long_term_cycles"] >= E_CYCLE]

    meta = dataframe.columns.str.extract(r"(\d{6}-\d+-\d+)").dropna()
    meta = meta[0].str.split("-", expand=True)
    meta.rename({0: "batch", 1: "number", 2: "voltage"}, axis=1, inplace=True)
    meta["voltage"] = meta["voltage"].apply(lambda v: f"4.{v}").astype(float)
    meta = meta[~meta["voltage"].isin((4.4, 4.6, 4.8))]
    meta = meta.sort_values(["batch", "voltage"]).reset_index(drop=True)
    additive = generations[batch]["additive"]
    meta = meta.assign(
        additive_type=additive["type"],
        additive_amount=float(additive["amount"]),
        formation_cycles=generations[batch]["formation_cycles"],
    )
    weights = pd.read_csv(
        f"{ROOT}/robot_outputs/{batch}_robot_output.csv",
        sep=";",
    )[[
        "Battery_Number",
        "Cathode Weight (mg)",
        "Cathode Current Collector Weight (mg)",
    ]].astype({"Battery_Number": str})
    weights.rename({"Battery_Number": "number"}, axis=1, inplace=True)
    cat_weight_tot = weights["Cathode Weight (mg)"]
    cat_weight_cc = weights["Cathode Current Collector Weight (mg)"]
    weights["weight"] = (cat_weight_tot - cat_weight_cc) / 1000
    terms = terms.merge(weights[["number", "weight"]])
    terms[E_KEY] /= terms["weight"]

    final = meta.merge(terms[["number", E_KEY, T_KEY]])

    df = pd.concat((df, final)).reset_index(drop=True)


In [None]:
# df = df.assign(t80=1 / df[T_KEY])
# df["t80"].min(), df["t80"].max()


# Normalization

In [None]:
# # normalization
# for key in ("formation_cycles"):
#     df[key] /= df[key].max()
#     # df[key] = MinMaxScaler().fit_transform(df[key].to_numpy().reshape(-1, 1))


# Plotting

In [None]:
# _, ax = plt.subplots(figsize=(8, 6))

# # cost functions
# dfs = df.assign(cost_t=1 / df[T_KEY] /
#                 24).sort_values("batch").reset_index(drop=True)

# avg = dfs.groupby(["batch", "voltage"])["cost_t"].mean().reset_index()

# dfs.plot.scatter(x="batch", y="cost_t", ax=ax)
# ax.scatter(avg["batch"], avg["cost_t"], color='k', marker="x")
# # dfs

# ax.set_ylim(0, 120)
# ax.set_xlabel("Batch", fontsize=14)
# ax.set_ylabel(r"Time to 80% of $C_0$ (in days)", fontsize=14)


In [None]:
from ipywidgets import FloatSlider

fig, ax = plt.subplots(figsize=(8, 6))
plt.show()


def plot(w):
    """docstring"""

    global avg, best

    cost = df["formation_cycles"] + w * df[T_KEY]
    dfs = df.assign(fitness=1 / cost)

    colors = ("y", "g")
    min_colors = ("r", "b")

    ax.clear()

    for i, V in enumerate((4.2, )):
        c = colors[i]
        mc = min_colors[i]

        dfs_V = dfs[dfs["voltage"] == V]
        avg = dfs_V.groupby("batch")["fitness"].mean().reset_index()
        best = dfs_V.groupby("batch")["fitness"].max().reset_index()
        minimum = avg.iloc[avg["fitness"].idxmin()]

        # plot averages
        ax.scatter(
            avg["batch"],
            avg["fitness"],
            color=c,
            marker="x",
            label=f"{V} V | avg",
        )

        # plot bests
        ax.scatter(
            best["batch"],
            best["fitness"],
            color="violet",
            marker="o",
            label=f"{V} V | best",
        )

        # mark minimum average
        ax.scatter(
            minimum["batch"],
            minimum["fitness"],
            color=mc,
            marker=".",
            label=f"{V} V | min",
        )

        # dfs_V.plot.scatter(x="batch", y="fitness", ax=ax)

    s, e, step = 0, 1, 0.1
    ax.set_yticks(np.arange(s, e + step, step))
    ax.set_xlabel("Batch")
    ax.set_ylabel("Score")
    # ax.grid(True)
    ax.legend(loc="upper left")


# Create a slider widget
slider = FloatSlider(
    value=500,
    min=0,
    max=10000.0,
    step=0.1,
    continuous_update=True,
    description="w",
    orientation='horizontal',
    readout=True,
    readout_format='.2f',
)


# Define a function to update the plot when the slider value changes
def update_plot(change):
    plot(change.new)


# Attach the update function to the slider's value change event
slider.observe(update_plot, 'value')

# Display the slider and initial plot
display(slider)
plot(slider.value)


# Fitting

In [None]:
# _, ax = plt.subplots()

# xt, yt = [], []

# for (
#         label,
#         cathode_weight,
#         formation_protocol,
#         long_term_protocol,
#         results,
# ) in items:

#     Q_data = get_data_from_results(results)

#     x, y = Q_data["cycle-number"], Q_data["Qd"]

#     xt.append(x)
#     yt.append(y)

#     ax.plot(x, y, '.', label=label)

# xc = np.concatenate(xt)
# yc = np.concatenate(yt)

# poly = PolynomialFeatures(degree=7, include_bias=False)
# poly_features = poly.fit_transform(xc.reshape(-1, 1))

# best_fit_line = RANSACRegressor().fit(poly_features, yc)
# prediction = best_fit_line.predict(poly_features)

# ax.plot(xc, prediction, "k.", markersize=1, label="fitted")
# plt.legend()


In [None]:
# _, ax = plt.subplots()

# for (
#         label,
#         cathode_weight,
#         formation_protocol,
#         long_term_protocol,
#         results,
# ) in items:

#     # if label != "231012-2":
#     #     continue

#     Q_data = get_data_from_results(results)
#     x, y = Q_data["cycle-number"], Q_data["Qd"]

#     line, = ax.plot(x, y, ".", label=label)

#     target = .8 * max(y)  # 80% capacity

#     if y[-1] > target:
#         avg_diff = np.mean(y - prediction[:len(y)])
#         idx = np.argmin(np.abs(prediction + avg_diff - target))
#         cycle = xc[idx]
#         ax.plot(xc[:idx], prediction[:idx] + avg_diff, ".", markersize=1, color=line.get_color())
#     else:
#         idx = np.argmin(np.abs(y - target))
#         cycle = x[idx]

#     print(f"{label:9s} | C0 = {max(y):0.2f} mAh -> 80% = {target:0.2f} mAh @ cycle {cycle}")

# plt.legend()


In [None]:
# best_fits = []

# _, ax = plt.subplots()

# xt, yt = [], []

# for (
#         label,
#         cathode_weight,
#         formation_protocol,
#         long_term_protocol,
#         results,
# ) in items:

#     Q_data = get_data_from_results(results)

#     if len(Q_data["Qd"]) < 50:
#         continue

#     x, y = Q_data["cycle-number"], Q_data["Qd"]
#     poly = PolynomialFeatures(degree=3, include_bias=False)
#     poly_features = poly.fit_transform(x.reshape(-1, 1))

#     best_fit_line = RANSACRegressor().fit(poly_features, y)
#     best_fits.append(best_fit_line)
#     # best_fits.append((best_fit_line.coef_[0], best_fit_line.intercept_))

#     pd.DataFrame({"x": x, "y": y}).plot.scatter(x="x", y="y", ylim=(0, 2), ax=ax)
#     plt.plot(x, best_fit_line.predict(poly_features), "red")


# GA

In [201]:
ADDITIVES = {
    generation["additive"]["type"]
    for generation in generations.values()
}
ADDITIVES_ORD = {
    additive: sum(ord(c) for c in additive)
    for additive in ADDITIVES
}
REVERSE_MAP = {v: k for k, v in ADDITIVES_ORD.items()}

In [None]:
def as_int(v):
    return sum(ord(c) for c in v) if isinstance(v, str) else v

def process_batch(batch: pd.DataFrame):
    feature_columns = ["formation_cycles", "additive_type", "additive_amount"]
    features = batch[feature_columns].iloc[0].apply(as_int).to_list()
    outcome = batch[T_KEY].mean()
    # print(features, outcome)
    return features + [outcome]

def process_population(dataframe: pd.DataFrame):
    data = np.array(dataframe.groupby("batch").apply(process_batch).to_list())
    return data[:, :3], data[:, -1]


In [198]:
WEIGHT = 1000

features, t_terms = process_population(df)


def fitness_func(ga_instance, solution, solution_idx):
    formation_cycles = solution[0]
    t_term = t_terms[solution_idx]
    cost = formation_cycles + WEIGHT * t_term
    return 1 / cost


def ord_to_str(additive: int) -> str:
    return "".join()


def output_population(ga, which: str):
    print(f"{which.capitalize()} population:\n")
    dfp = pd.DataFrame(ga.population, columns=["#", "add", "%"])
    dfp["add"] = dfp["add"].apply(lambda add: REVERSE_MAP[int(add)])
    dfp = dfp.astype({"#": int, "add": str, "%": float})
    print(dfp.to_string(index=False))
    print()


def on_start(ga):
    output_population(ga, "initial")


def on_generation(ga):
    output_population(ga, "next")


gene_space = [
    [1, 2, 3],  # formation cycles
    [as_int(additive) for additive in ("VC", "FEC")],  # additive types
    np.arange(0, 4, 0.1).tolist(),  # additive amounts
]

ga_instance = pygad.GA(
    num_generations=1,
    num_parents_mating=2,
    initial_population=features,
    gene_space=gene_space,
    mutation_percent_genes=40,
    fitness_func=fitness_func,
    on_generation=on_generation,
    on_start=on_start,
)

ga_instance.run()

Initial population:

 # add   %
 2  VC 2.0
 1  VC 2.0
 3  VC 2.0
 3 FEC 2.0
 2 FEC 2.0
 1 FEC 2.0
 3 FEC 4.0
 2 FEC 4.0
 1 FEC 4.0

Next population:

 # add   %
 1 FEC 2.0
 1  VC 4.0
 3 FEC 2.0
 3 FEC 4.0
 1 FEC 0.0
 1 FEC 0.5
 1  VC 2.0
 1  VC 4.0
 1  VC 2.0

