In [None]:
import logging
import os
import queue
import signal
import sys
import traceback
from multiprocessing import Process, Queue

import click
import pybamm
from core import ModelParameters
from core.Parameters.DegradationParameters import DegradationParameters
from core.Parameters.NonDegradationParameters import NonDegradationParameters
from core.PyBammWrapper import PyBammWrapper
from core.utils import *
from core.utils import get_log_path
from databases.DatasetEV import BatteryIDEV
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
import os
%load_ext autoreload
%autoreload 2

In [None]:
import os
os.getenv("PYBAMM_RESULTS_PATH")

In [None]:
dataset_path = "EV_DATAPATH"
result_path = "RESULT_PATH"
inputs_path = "INPUTS_PATH"
id_dataset = "EV"
id_battery = "W04"
seed  = 1
id_instance = 100
n_cycles_per_experiments = 2
n_experiments = 2
id_configuration = 53
seed = 123456789

In [None]:
parameter_dataset = "param.json"
dataset_path = os.environ.get(dataset_path)
result_path = os.environ.get(result_path)
save_result_path = f"{result_path}/{seed}"

inputs_path = os.getenv(inputs_path)


# Create a dictionary to easily map variable names to their values
env_vars = {
    "dataset_path": dataset_path,
    "inputs_path": inputs_path,
    "result_path": result_path,
}

# Check each environment variable and collect the names of those that are not set
missing_vars = [
    var_name for var_name, var_value in env_vars.items() if var_value is None
]

if missing_vars:
    # Raise an error with the names of the missing environment variables
    missing_vars_str = ", ".join(
        missing_vars
    )  # Join the list of missing variable names into a string
    raise ValueError(
        f"Required environment variable(s) not set: {missing_vars_str}"
    )

# Ensure common folder exists
common_folder_path = ensure_common_folder_exists(
    inputs_path, id_dataset, id_battery
)

result_path = get_result_path(
    common_folder_path, id_configuration=id_configuration)

state_path = get_state_path(
    common_folder_path, id_configuration=id_configuration)

log_path = get_log_path(
    common_folder_path, id_configuration=id_configuration)


print(common_folder_path, dataset_path, result_path, inputs_path)


In [None]:
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

In [None]:
"""
Initializes model parameters and PyBaMM wrapper for the simulation.
"""
#parameter_file_path = "/home/ccanodom/data/PyBamm/41_param.json"
parameter_file_path = "/home/ccanodom/dev/nextbat_ai/phase_1/param_test/no_degradation.json"
external_param = pybamm.ParameterValues("OKane2022")
model_parameters = ModelParameters.ModelParameters(
    file_path=parameter_file_path, external_param=external_param)

In [None]:
os.environ["EV_DATAPATH"]

In [None]:
model_parameters.degradation_parameters = None

In [None]:
current_pybamm = PyBammWrapper(
    id_configuration=id_configuration,
    seed=seed,
    dataset_path=dataset_path,
    result_path=result_path,
    state_path=state_path,
    instance_id=id_instance,
    id_dataset=id_dataset,
    id_battery=id_battery,
    model_parameters=model_parameters,
)
os.environ["PYBAMM_TO_PLOT"] = "1"
sol =  None
current_pybamm.initialize_state()


In [None]:
results = current_pybamm.run_pybamm(6)

In [None]:
for i in range(0,400):
    print(f"Cycle {i}")
    results = current_pybamm.run_pybamm(i)

In [None]:
for i in range(25,400):
    print(f"Cycle {i}")
    results = current_pybamm.run_pybamm(i)


In [None]:
results = current_pybamm.run_pybamm(0)

In [None]:
results

In [None]:
results.plot()

In [None]:

non_degradation_params = NonDegradationParameters()
degradation_params = DegradationParameters()
model_parameters = ModelParameters(non_degradation_params, None)
dataset_id = get_dataset_id(dataset_id)
battery_id = get_battery_by_name(battery_id)
dataset_path = os.environ.get(dataset_path)
result_path = os.environ.get(result_path)
save_result_path = f"{result_path}/{seed}"
current_pybamm = PyBammWrapper.PyBammWrapper(
    dataset_path=dataset_path,
    result_path=save_result_path,
    id_configuration=id_configuration,
    instance_id=id_instance,
    dataset_id=dataset_id,
    battery_id=battery_id,
    seed = seed,
    n_cycles_per_experiments=n_cycles_per_experiments,
)
if dataset_path is None:
    print(f"Dataset path is not a valid environmental variable")
if os.path.isfile(dataset_path) is False:
    print(f"Dataset path is not a file path - {dataset_path}")

current_pybamm.initiate_param(model_parameters)
current_pybamm.initiate_model()
current_pybamm.initiate_var_pts()
current_pybamm.initiate_solver(atol=1e-3, rtol=1e-3)
current_pybamm.initiate_experiment()
current_pybamm.add_custom_variables()


In [None]:
solution = current_pybamm.run_pybamm_non_degradation(0)

In [None]:
for i in range(1,6):
    print(f"metric of cycle {i} is {current_pybamm.run_pybamm_non_degradation(i)}")
    

In [None]:
solution = current_pybamm.run_pybamm_non_degradation(7)

In [None]:
solution

In [None]:
from databases.DatasetEV import calc_error_non_degradation, get_df_simulated, get_df_experimental, get_metrics_non_degradation
df_exp = get_df_experimental(battery_id, "/home/ccanodom/data/EV_battery/battery.h5", 0)

In [None]:
np.sum(df_exp["Step"] == 1)

In [None]:
df_sim = get_df_simulated(solution)
df_sim

In [None]:
metric = calc_error_non_degradation(df_exp, df_sim, 4, "C")
print(metric)

In [None]:
var = "C"
from scipy.interpolate import interp1d

interp_EV = interp1d(df_exp["relative_time"], df_exp[var])


In [None]:
get_metrics_non_degradation(battery_id, dataset_path, cycle_number=0, solution=solution)

In [None]:
time

In [None]:
df = PyBammWrapper.read_cycle_from_hdf5(10, "W04", os.getenv("PYBAMM_RESULTS_PATH") + "/1")

In [None]:
import pickle
import h5py
import numpy as np
serialized_object = pickle.dumps(results.last_state)
# Save the serialized bytes to an HDF5 file
with h5py.File('compressed_data.hdf5', 'w') as hdf:
    # Create a dataset with LZF compression
    hdf.create_dataset('compressed_dataset', data=np.void(serialized_object), compression='lzf')

In [None]:
with open('my_object.pickle', 'wb') as file_handle:
    pickle.dump(results.last_state, file_handle)

In [None]:
df[df["Step"] == 5].plot(x = "Time", y = ["V"])

In [None]:
df_simulated["relative_time"] = df_simulated.Time - min(df_simulated.Time)

In [None]:
from databases.DatasetEV import get_fit_linear_metric, get_df_experimental,fit_udds_profile_linear, BatteryIDEV, ResultFit, get_integral_metric, get_integral_value
from scipy.optimize import curve_fit
import numpy as np
import pandas as pd
to_plot = True

df_simulated = pd.read_csv("here.csv")
df_simulated["relative_time"] = df_simulated.Time - min(df_simulated.Time)
datapath = "/home/ccanodom/data/EV_battery/test.h5"
df_experimental_W04 = get_df_experimental(BatteryIDEV.W04, datapath,10)
df_experimental_W10 = get_df_experimental(BatteryIDEV.W10, datapath,10)
print(f"Metric with W04 = {get_integral_metric(df_experimental_W04, df_simulated)}\tMetric with W10 = {get_integral_metric(df_experimental_W10, df_simulated)}")

In [None]:
get_integral_value(df_experimental_W04, df_simulated, 1, True)

In [None]:
get_integral_value(df_experimental_W10, df_simulated, 1, True)

In [None]:
from scipy.integrate import trapz

df = df_experimental_W04

In [None]:
a = fit_udds_profile_linear(df_experimental_W04, True)

In [None]:
a = fit_udds_profile_linear(df_simulated, True)

In [None]:
get_fit_linear_metric(df_experimental_W04, df_simulated)

In [None]:
step_ussd = 5
# Filter data by step
selected_df = df[df["Step"] == step_ussd].copy()
selected_df.loc[:, "relative_time"] = (
    selected_df["relative_time"] - selected_df["relative_time"].min()
)

# Extraer datos
x = selected_df["relative_time"].values
V = selected_df["V"].values
C = selected_df["C"].values

# Definir el modelo personalizado
def custom_model(x_I, a, b, k):
    x, I = x_I
    return a * x + b + k * I

# Ajustar el modelo
popt, pcov = curve_fit(custom_model, (x, C), V, maxfev=10000)
V_fit = custom_model((x, C), *popt)
# Calcular R^2
ss_res = np.sum((V - V_fit) ** 2)
ss_tot = np.sum((V - np.mean(V)) ** 2)
r_squared = 1 - (ss_res / ss_tot)
# Mostrar los coeficientes
print("Coefficients (a, b, k):")
print(popt)
if to_plot:

    # Graficar los datos originales y el ajuste
    plt.figure()
    plt.plot(x, custom_model((x, C), *popt), "--r", label="V Fit")
    plt.plot(x, V, "-b", label="V Original")
    plt.legend()
    plt.xlabel("x")
    plt.ylabel("V")
    plt.title("Polynomial Fit of V = a*x + b + k*I - p * x^-n")
    plt.show()
result_fit = ResultFit(
    rsquared=r_squared, parameters={"a": popt[0], "b": popt[1], "k": popt[2]}
)
result_fit

In [None]:
pcov

In [None]:
popt

In [None]:
os.getenv(dataset_path)

In [None]:
f"/{string_id}_Cycle{cycle:04d}"

In [None]:
string_id = "decay"
cycle = 10.0

In [None]:
f"/{string_id}_Cycle{int(cycle):04d}"

In [None]:
def read_cycle_from_hdf5(cycle_number, string_id, filepath):
    """
    Reads and returns the DataFrame of a specific cycle from an HDF5 file.
    
    Parameters:
    - cycle_number: Integer representing the cycle number to read.
    - string_id: String ID used as part of the key to identify the cycle data.
    - filepath: Path to the HDF5 file from which to read the data.
    
    Returns:
    - A pandas DataFrame containing the data for the specified cycle, or None if the cycle data does not exist.
    """
    key = f'/{string_id}_Cycle{int(cycle_number):04d}'
    print(key)
    try:
        cycle_data = pd.read_hdf(filepath, key=key)
        return cycle_data
    except KeyError:
        print(f"No data found for cycle {cycle_number} with ID '{string_id}' in the file.")
        return None
