In [None]:
import pandas as pd

# Experiment-plan workbook and the columns read from it.
cols = ["보압시간", "사출속도1~4", "보압1~2"]
level_col_name = "변수 수준"
file_path = r"./data/사출 실험계획표 27.xlsx"

# Keep only the level column plus the process-parameter columns.
excel_data = pd.read_excel(file_path)[[level_col_name] + cols]

# The first missing value in the level column marks where the level table
# ends. Fail with a readable message instead of a bare IndexError when the
# sheet has no such row.
null_positions = excel_data.index[excel_data.iloc[:, 0].isna()].tolist()
if not null_positions:
    raise ValueError(
        f"No empty '{level_col_name}' cell found in {file_path}; cannot "
        "locate the boundary between the level table and the data table"
    )
null_idx = null_positions[0]

# Rows above the boundary: the level table (level label + the parameter
# values associated with that level).
level_table = excel_data.iloc[:null_idx, :].copy()
# Rows starting two past the boundary: the experiment cases. The two skipped
# rows are presumably the blank separator and a repeated header row --
# TODO confirm against the workbook.
data_table = (
    excel_data[cols].iloc[null_idx + 2:, :].copy().reset_index(drop=True)
)
levels = level_table[level_col_name].tolist()

# Sanity check on the column selection, then a quick look at the result.
assert data_table.shape[1] == len(
    cols
), "The number of columns is not correct"
print(f"Levels: {levels}")
print(f"Data table shape: {data_table.shape}")
level_table.head()

In [None]:
def map_to_level(value, column):
    """Return the level label whose entry in ``column`` equals ``value``.

    The lookup is done against the module-level ``level_table``; the label is
    read from its ``level_col_name`` column (the original note describes the
    labels as -1, 0, 1). When several rows match, the first one is used.

    Args:
        value: Actual parameter value taken from the data table.
        column: Name of the ``level_table`` column to search.

    Returns:
        The level label of the first matching row.

    Raises:
        IndexError: If no row of ``level_table`` holds ``value`` in
            ``column``. The message names the offending value and column.
    """
    matches = level_table.loc[level_table[column] == value, level_col_name]
    if matches.empty:
        # Same exception type as the former ``.values[0]`` failure, but with
        # enough context to find the bad cell in the spreadsheet.
        raise IndexError(
            f"Value {value!r} in column {column!r} does not match any level "
            "in level_table"
        )
    return matches.iloc[0]


# Translate every raw setting in data_table into its level, column by column.
# The default argument pins the column name for each lambda.
mapped_data = pd.DataFrame(
    {
        name: data_table[name].apply(
            lambda raw, name=name: map_to_level(raw, name)
        )
        for name in cols
    },
    columns=cols,
    index=range(len(data_table)),
).astype(int)

# Cases are numbered from 1, and the index is labelled "Case".
mapped_data.index = (mapped_data.index + 1).rename("Case")

# Sanity check on the column count, then preview the mapped levels.
assert mapped_data.shape[1] == len(
    cols
), "The number of columns is not correct"
mapped_data.head()

In [None]:
from pathlib import Path
from nn.schemas import _read_ss_curves, group_ss_curves

# Read the raw stress-strain curves from ./data and group them.
ss_curves = group_ss_curves(_read_ss_curves(raw_data_path=Path("data")))

# The case number is the second "-"-separated token of each index label.
ss_curves["Case"] = [int(label.split("-")[1]) for label in ss_curves.index]

# Attach the level-coded process parameters of the matching case, then drop
# the helper column. The default inner join keeps only curves whose case
# number exists in mapped_data.
ss_curves = (
    ss_curves.reset_index(drop=True)
    .merge(mapped_data, left_on="Case", right_index=True)
    .drop(columns=["Case"])
)

ss_curves.head()

In [None]:
import random
from typing import Tuple
import numpy as np
from scipy.interpolate import interp1d
from nn.inference import inference
from nn.schemas import normalize_1d_sequence


def pick_random_data(
    train_inputs: np.ndarray,
    train_outputs: np.ndarray,
    n: int = 1,
) -> Tuple[np.ndarray, np.ndarray]:
    """Draw ``n`` distinct, randomly chosen rows from paired arrays.

    Args:
        train_inputs: Array whose first axis indexes samples.
        train_outputs: Array with the same number of samples.
        n: How many rows to draw (without replacement).

    Returns:
        ``(inputs, outputs)`` restricted to the same ``n`` row indices.

    Raises:
        AssertionError: If either argument is not an ndarray or the sample
            counts differ.
        ValueError: From ``random.sample`` when ``n`` exceeds the row count.
    """
    both_are_arrays = isinstance(train_inputs, np.ndarray) and isinstance(
        train_outputs, np.ndarray
    )
    assert both_are_arrays, f"{type(train_inputs)} & {type(train_outputs)}"

    n_rows = train_inputs.shape[0]
    assert (
        n_rows == train_outputs.shape[0]
    ), f"{train_inputs.shape} != {train_outputs.shape}"

    # Same indices for both arrays so the pairs stay aligned.
    chosen = random.sample(range(n_rows), n)
    return train_inputs[chosen], train_outputs[chosen]


def make_equal_interval_data(
    ss_curves: pd.DataFrame, seq_len: int, normalize: bool = True
) -> Tuple[np.ndarray, np.ndarray]:
    """Resample every curve in ``ss_curves`` to ``seq_len`` points.

    Each row must provide array-like ``"strain"`` and ``"stress"`` entries.
    The stress sequence comes from ``normalize_1d_sequence`` applied to the
    stacked (strain, stress) pairs; the strain axis is an evenly spaced grid,
    either 0..1 (``normalize=True``) or min..max of the row's strain.

    Args:
        ss_curves: One curve per row.
        seq_len: Number of points per resampled curve.
        normalize: Forwarded to ``normalize_1d_sequence`` and selects the
            strain grid described above.

    Returns:
        ``(strain, stress)``, each reshaped to ``(-1, seq_len, 1)``.
    """
    strain_grids = []
    stress_sequences = []
    for _, curve in ss_curves.iterrows():
        strain = curve["strain"].astype(float)
        stress = curve["stress"].astype(float)

        # presumably yields seq_len stress values per curve (the reshape
        # below relies on that) -- TODO confirm in nn.schemas
        paired_points = np.column_stack((strain, stress))
        stress_sequences.append(
            normalize_1d_sequence(paired_points, seq_len, normalize=normalize)
        )

        if normalize:
            lower, upper = 0, 1
        else:
            lower, upper = strain.min(), strain.max()
        strain_grids.append(np.linspace(lower, upper, seq_len))

    strain_out = np.array(strain_grids).reshape(-1, seq_len, 1)
    stress_out = np.array(stress_sequences).reshape(-1, seq_len, 1)
    return strain_out, stress_out


def inference_lstm(
    test_data: Tuple[np.ndarray, np.ndarray],
    model_path: str,
    tolerance: float = 0.5,
    n: int = 5,
) -> np.ndarray:
    """Run the saved sequence model and spot-check it against the truth.

    ``n`` positions are sampled along axis 1 of the first sample (index 0)
    of the prediction and of the target. The first sampled position is
    skipped; each remaining pair must agree within a relative ``tolerance``
    of the true value's magnitude.

    Args:
        test_data: ``(x_test, y_test)``; ``y_test`` is indexed as
            ``[sample, step, 0]``.
        model_path: Path handed to ``inference``.
        tolerance: Allowed relative deviation ``|pred - true| / |true|``.
        n: Number of positions sampled along the sequence. With ``n < 2``
            no position remains after skipping the first, so nothing is
            checked.

    Returns:
        The prediction returned by ``inference``.

    Raises:
        AssertionError: If prediction and target shapes differ or a sampled
            point is outside the tolerance.
    """
    x_test, y_test = test_data
    y_pred = inference(model_path=model_path, input_data=x_test)
    assert y_pred.shape == y_test.shape, f"{y_pred.shape} != {y_test.shape}"
    seq_len = y_pred.shape[1]

    # max(..., 1) avoids a ZeroDivisionError when n == 1.
    gap = seq_len // max(n - 1, 1)
    last_idx = seq_len - 1

    def extract_points(y: np.ndarray):
        # Clamp so the final sample never runs past the end of the sequence.
        return tuple(y[0, min(i * gap, last_idx), 0] for i in range(n))

    y_pred_points = extract_points(y_pred)[1:]
    y_test_points = extract_points(y_test)[1:]
    print(f"prediction: {y_pred_points}, true: {y_test_points}")
    for yp, yt in zip(y_pred_points, y_test_points):
        # abs(yt): with a negative target the bound itself would be negative
        # and the check could never pass.
        assert abs(yp - yt) <= abs(yt) * tolerance, f"{yp} != {yt}"
    return y_pred


def inference_ann(
    test_data: Tuple[np.ndarray, np.ndarray],
    model_path: str,
    tolerance: float = 0.5,
) -> np.ndarray:
    """Run the saved ANN and spot-check its first prediction.

    Only the first sample is verified: output column 0 (strength) and
    column 1 (elongation) must each lie within a relative ``tolerance`` of
    the true value's magnitude.

    Args:
        test_data: ``(x_test, y_test)``; ``y_test[0]`` holds at least the
            two target values.
        model_path: Path handed to ``inference``.
        tolerance: Allowed relative deviation ``|pred - true| / |true|``.

    Returns:
        The prediction returned by ``inference``.

    Raises:
        AssertionError: If either checked value is outside the tolerance.
    """
    x_test, y_test = test_data
    y_pred = inference(model_path, x_test)
    print(f"prediction: {y_pred}, true: {y_test}")

    # Column 0 is strength, column 1 is elongation.
    for column in (0, 1):
        predicted = float(y_pred[0][column])
        expected = float(y_test[0][column])
        # abs(expected): a negative target must not produce a negative
        # (unsatisfiable) bound.
        assert (
            abs(predicted - expected) <= abs(expected) * tolerance
        ), f"{predicted} != {expected}"
    return y_pred

In [None]:
import multiprocessing
from typing import List
from uuid import uuid4

from nn.ann import ANN
from nn.config import ANNModelConfig
from nn.dataloader import DataLoader
from nn.train import Trainer
from nn.utils.logger import ApiLogger

# Logger from the project's logging helper, named after this module.
logger = ApiLogger(__name__)

# Training schedule for the ANN.
epochs, patience = 200, 50
batch_size = 1
print_per_epoch = 1

# Hyperparameter grid passed to hyper_train; one candidate per key means a
# single configuration. n1..n3 are presumably layer widths -- TODO confirm
# in nn.ann.
ann_hyper_params = dict(n1=[40], n2=[30], n3=[10])

# Work on a copy so the columns added below never touch ss_curves.
ann_df = ss_curves.copy()


def get_max(x: np.ndarray) -> float:
    """Return the largest element of ``x`` after casting it to float."""
    return x.astype(float).max()


# Reduce every curve to two scalar targets: peak stress ("strength") and
# peak strain ("elongation").
ann_df["strength"] = ann_df["stress"].apply(get_max)
ann_df["elongation"] = ann_df["strain"].apply(get_max)

# The raw curves are no longer needed for the ANN.
ann_df = ann_df.drop(columns=["strain", "stress"])

# Inputs: level-coded process parameters. Outputs: the two scalar targets.
ann_x_data = ann_df[cols].astype(float).to_numpy()
ann_y_data = ann_df[["strength", "elongation"]].astype(float).to_numpy()

dim_out = ann_y_data.shape[1]
ann_model_config = ANNModelConfig(
    # Fresh output directory per run.
    output_path=f".tmp/{uuid4().hex}",
    metrics=["mse", "mae", "mape"],
    kfold_splits=0,
    print_per_epoch=print_per_epoch,
    batch_size=batch_size,
    epochs=epochs,
    patience=patience,
    # One MAPE loss per output, weighted equally.
    loss_funcs=["mape"] * dim_out,
    loss_weights=[1 / dim_out] * dim_out,
    l1_reg=None,
    l2_reg=None,
    dropout_rate=0.0,
    normalize_layer=False,
    dim_out=dim_out,
    dim_in=len(cols),
)
ann_data_loader = DataLoader(
    train_inputs=ann_x_data,
    train_outputs=ann_y_data,
    train_input_params=cols,
    train_output_params=["strength", "elongation"],
)
ann_trainer = Trainer(
    data_loader=ann_data_loader,
    model_class=ANN,
    model_name=ANN.__name__,
    model_config=ann_model_config,
    workers=multiprocessing.cpu_count(),
    use_multiprocessing=False,
)

ann_x_data.shape, ann_y_data.shape

In [None]:
from functools import reduce
import json


# Size of the hyperparameter grid: product of the candidate counts per key.
num_hyper_params = reduce(
    lambda x, y: x * len(y), ann_hyper_params.values(), 1
)
ann_fstem = ""


# hyper_train yields one (file stem, history) pair per trained configuration.
for ann_fstem, phist in ann_trainer.hyper_train(ann_hyper_params):
    num_hyper_params -= 1

    # Show the training summary; previously the JSON string was built and
    # silently discarded.
    print(json.dumps(phist["train_output"], indent=4))

    # Smoke test on one randomly chosen *training* sample -- this checks that
    # the saved model loads and predicts, it is not a held-out evaluation.
    inference_ann(
        model_path=ann_fstem + ".keras",
        test_data=pick_random_data(ann_x_data, ann_y_data, n=1),
    )

# Every grid point must have produced a model.
assert ann_fstem != "", "fstem is empty"
assert num_hyper_params == 0, f"{num_hyper_params} != 0"
ann_fstem

In [None]:
from functools import reduce
import json
import multiprocessing
from uuid import uuid4

from nn.config import LSTMModelConfig
from nn.dataloader import DataLoader
from nn.lstm import EmbeddingAttentionLSTMRegressor
from nn.train import Trainer


# Training schedule for the LSTM (rebinds the names used by the ANN cells).
epochs, patience = 500, 50
batch_size = 1
print_per_epoch = 1
seq_len = 64

# Hyperparameter grid passed to hyper_train: a single sequence length.
lstm_hyper_params = dict(seq_len=[seq_len])

# Inputs: level-coded process parameters. Targets: the resampled stress
# sequences (second element of the returned pair), built with normalize=True.
lstm_x_data = ss_curves[cols].astype(float).to_numpy()
lstm_y_data = make_equal_interval_data(ss_curves, seq_len, normalize=True)[1]

# Note: an earlier variant built the targets by applying
# normalize_1d_sequence directly and fed one-step-shifted decoder inputs
# (teacher forcing); it is no longer used.

assert lstm_x_data.shape[0] == lstm_y_data.shape[0], (
    f"Encoder input shape {lstm_x_data.shape} and decoder output shape {lstm_y_data.shape} "
    f"do not match"
)

lstm_model_config = LSTMModelConfig(
    # Fresh output directory per run.
    output_path=f".tmp/{uuid4().hex}",
    # The ANN saved by the training cell above is handed to the LSTM config.
    ann_model_path=ann_fstem + ".keras",
    dim_in=len(cols),
    dim_out=1,
    # Training schedule.
    epochs=epochs,
    patience=patience,
    batch_size=batch_size,
    print_per_epoch=print_per_epoch,
    kfold_splits=0,
    # Single MSE loss on the stress sequence.
    loss_funcs=["mse"],
    loss_weights=[1.0],
    metrics=["mse", "mae"],
    # No regularisation.
    l1_reg=None,
    l2_reg=None,
    dropout_rate=0.0,
    normalize_layer=False,
)
lstm_data_loader = DataLoader(
    train_inputs=lstm_x_data,
    train_outputs=lstm_y_data,
    train_input_params=cols,
    train_output_params=["stress"],
)
lstm_trainer = Trainer(
    data_loader=lstm_data_loader,
    model_class=EmbeddingAttentionLSTMRegressor,
    model_name=EmbeddingAttentionLSTMRegressor.__name__,
    model_config=lstm_model_config,
    workers=multiprocessing.cpu_count(),
    use_multiprocessing=False,
)
lstm_x_data.shape, lstm_y_data.shape

In [None]:
# Size of the hyperparameter grid: product of the candidate counts per key.
num_hyper_params = reduce(
    lambda x, y: x * len(y), lstm_hyper_params.values(), 1
)

lstm_fstem = ""

# hyper_train yields one (file stem, history) pair per trained configuration.
for lstm_fstem, phist in lstm_trainer.hyper_train(lstm_hyper_params):
    num_hyper_params -= 1

    # Show the training summary; previously the JSON string was built and
    # silently discarded.
    print(json.dumps(phist["train_output"], indent=4))

    # Smoke test on one randomly chosen *training* sample -- this checks that
    # the saved model loads and predicts, it is not a held-out evaluation.
    inference_lstm(
        model_path=lstm_fstem + ".keras",
        test_data=pick_random_data(lstm_x_data, lstm_y_data, n=1),
    )


# Every grid point must have produced a model.
assert lstm_fstem != "", "fstem is empty"
assert num_hyper_params == 0, f"{num_hyper_params} != 0"
lstm_fstem

In [None]:
from keras.models import load_model

n = 5
seq_len = 64

# Stems of previously trained models. NOTE: these deliberately replace the
# stems produced by the training cells above, so the plots below use these
# pinned runs. Forward slashes are used so the paths resolve on POSIX as well
# as on Windows (the former backslash literals only worked on Windows).
ann_fstem = (
    ".tmp/e752431ae6834f93bef7da33e7f29891/ANN_E91[N1=40][N2=30][N3=10]"
)
lstm_fstem = ".tmp/82957e361f6846a2ba5f7d300485264a/EmbeddingAttentionLSTMRegressor_E309[SEQ_LEN=64]"
lstm_model = load_model(lstm_fstem + ".keras")
ann_model = load_model(ann_fstem + ".keras")
assert lstm_model is not None and ann_model is not None, "Model is None"

# Ground truth resampled with normalize=False; the strain grid runs from each
# curve's min to max strain.
strain_true, stress_true = make_equal_interval_data(
    ss_curves, seq_len, normalize=False
)
model_input = ss_curves[cols].astype(float).to_numpy()
assert (
    strain_true.shape[0] == stress_true.shape[0] == model_input.shape[0]
), (
    f"Strain shape {strain_true.shape}, stress shape {stress_true.shape} and model input shape {model_input.shape} "
    f"do not match"
)

# Pick n random curves (unseeded, so the selection differs between runs).
random_indices = random.sample(range(model_input.shape[0]), n)

x = model_input[random_indices]
stress_true = stress_true[random_indices].reshape(n, seq_len, 1)
strain_true = strain_true[random_indices].reshape(n, seq_len, 1)

ann_pred = ann_model.predict(x)  # np.array of shape (n, 2)
lstm_pred = lstm_model.predict(x)  # np.array of shape (n, seq_len, 1)
assert ann_pred.shape == (n, 2) and lstm_pred.shape == (n, seq_len, 1)

strength_pred = ann_pred[:, 0]  # type: np.ndarray
elongation_pred = ann_pred[:, 1]  # type: np.ndarray
normalized_stress_pred = lstm_pred  # type: np.ndarray

# Scale the LSTM output by the ANN's predicted strength, broadcast over the
# sequence axis. This assumes the LSTM output is stress divided by its peak
# -- TODO confirm against normalize_1d_sequence.
stress_pred = (
    normalized_stress_pred * strength_pred[:, np.newaxis, np.newaxis]
)
assert (
    stress_pred.shape == stress_true.shape
), f"stress_pred shape {stress_pred.shape} != stress_true shape {stress_true.shape}"

# Predicted strain axis: evenly spaced from 0 to the predicted elongation.
# Note the true axis starts at each curve's minimum strain, not at 0.
strain_pred = np.array(
    [np.linspace(0, max_strain, seq_len) for max_strain in elongation_pred]
).reshape(n, seq_len, 1)
assert (
    strain_pred.shape == strain_true.shape
), f"strain_pred shape {strain_pred.shape} != strain_true shape {strain_true.shape}"

elongation_pred

In [None]:
import matplotlib.pyplot as plt

# One figure per selected sample: measured curve in blue, prediction in red.
for i in range(n):
    fig, ax = plt.subplots(figsize=(10, 6))

    # Measured curve.
    ax.plot(
        strain_true[i, :, 0],
        stress_true[i, :, 0],
        label="True",
        color="blue",
    )

    # Curve assembled from the ANN and LSTM predictions.
    ax.plot(
        strain_pred[i, :, 0],
        stress_pred[i, :, 0],
        label="Predicted",
        color="red",
    )

    ax.set_xlabel("Strain")
    ax.set_ylabel("Stress")
    ax.set_title(f"Stress-Strain Curve for Sample {i+1}")
    ax.legend()
    plt.show()