In [5]:
import pandas as pd
import datetime
import os
from ortools.linear_solver import pywraplp

# === Configuration ===
input_file = "C:/Users/Administrator/VSCODE/BatteryOptimization/Data/Dataset_Einhundert_2000kWh_20bat.xlsx"
output_file = "output/ResultEinhundert_Rolling_2000kWh_20bat.xlsx"
output_folder = os.path.dirname(output_file)

if not os.path.isdir(output_folder):
    os.makedirs(output_folder)

# === Read Input Data ===
workbook = pd.ExcelFile(input_file)

# Timeseries data
marketDF = workbook.parse("Timeseries data").iloc[:, :5]
marketDF.columns = ["time", "solar", "load", "market_price_1", "feed_in_tariff"]
marketDF = marketDF[~pd.isnull(marketDF["time"])].fillna(0)

marketDF["time"] = pd.to_datetime(marketDF["time"])
print(marketDF["time"].dt.tz)
marketDF["time"] = marketDF["time"].dt.tz_localize("Europe/Berlin", ambiguous="infer")

marketDF.set_index("time", inplace=True)
marketDF.sort_index(inplace=True)

# Battery data
battDF = workbook.parse("Battery").iloc[:, :8]
battDF.columns = ["max_charge_rate", "max_discharge_rate", "capacity", "charge_eff", "discharge_eff", "min_soc", "max_soc", "initial_soc"]
batt = {key: row[0] for key, row in battDF.to_dict().items()}

# === Prepare time steps ===
timeIndex = marketDF.index
dt = (timeIndex[1] - timeIndex[0]).total_seconds() / 3600  # time resolution in hours
all_days = pd.date_range(start=timeIndex[0].normalize(), end=timeIndex[-1], freq="D")

# === Rolling Optimization ===
results_all_days = []
SOC = batt["initial_soc"]

for day_start in all_days:
    day_end = day_start + pd.Timedelta(days=1)
    df_day = marketDF[(marketDF.index >= day_start) & (marketDF.index < day_end)]

    if df_day.empty:
        continue  # skip empty days (e.g. month change)
 # 🔍 Check for duplicate timestamps
    assert df_day.index.is_unique, f"Duplicate timestamps found on {day_start.date()}!"
    
    time_slots = df_day.index
    T = len(time_slots)

    solver = pywraplp.Solver.CreateSolver("CBC")
    inf = solver.infinity()

    # === Variables ===
    vGrid = [solver.NumVar(-inf, inf, f"grid_{i}") for i in range(T)]
    vImport = [solver.NumVar(0, inf, f"import_{i}") for i in range(T)]
    vExport = [solver.NumVar(0, 100, f"export_{i}") for i in range(T)]
    vImExStatus = [solver.BoolVar(f"imex_status_{i}") for i in range(T)]

    vBattPower = [solver.NumVar(-inf, inf, f"batt_power_{i}") for i in range(T)]
    vCharge = [solver.NumVar(-batt["max_charge_rate"], 0, f"charge_{i}") for i in range(T)]
    vDischarge = [solver.NumVar(0, batt["max_discharge_rate"], f"discharge_{i}") for i in range(T)]
    vChargeStatus = [solver.BoolVar(f"charge_status_{i}") for i in range(T)]
    vSOC = [solver.NumVar(batt["min_soc"], batt["max_soc"], f"soc_{i}") for i in range(T)]

    # === Constraints ===
    for i in range(T):
        t = time_slots[i]
        solver.Add(vGrid[i] == df_day.at[t, "load"] - df_day.at[t, "solar"] - vBattPower[i])
        solver.Add(vGrid[i] == vImport[i] - vExport[i])
        solver.Add(vImport[i] <= 1000 * vImExStatus[i])
        solver.Add(vExport[i] <= 1000 * (1 - vImExStatus[i]))

        solver.Add(vBattPower[i] == vCharge[i] + vDischarge[i])
        solver.Add(vCharge[i] >= -batt["max_charge_rate"] * vChargeStatus[i])
        solver.Add(vDischarge[i] <= batt["max_discharge_rate"] * (1 - vChargeStatus[i]))

        if i == 0:
            solver.Add(vSOC[i] == SOC - dt / batt["capacity"] * (
                vCharge[i] * (1 - batt["charge_eff"]) + vDischarge[i] / (1 - batt["discharge_eff"])))
        else:
            solver.Add(vSOC[i] == vSOC[i - 1] - dt / batt["capacity"] * (
                vCharge[i] * (1 - batt["charge_eff"]) + vDischarge[i] / (1 - batt["discharge_eff"])))

    # === Objective Function: Minimize Net Cost (Import cost - Feed-in revenue) ===
    solver.Minimize(solver.Sum([
        vImport[i] * df_day.iloc[i]["market_price_1"] * dt -
        vExport[i] * df_day.iloc[i]["feed_in_tariff"] * dt
        for i in range(T)
    ]))

    status = solver.Solve()

    # === Store results if solution is found ===
    if status == solver.OPTIMAL or status == solver.FEASIBLE:
        for i in range(T):
            results_all_days.append({
                "time": time_slots[i],
                "grid": vGrid[i].solution_value(),
                "import": vImport[i].solution_value(),
                "export": vExport[i].solution_value(),
                "batt_power": vBattPower[i].solution_value(),
                "charge": vCharge[i].solution_value(),
                "discharge": vDischarge[i].solution_value(),
                "soc": vSOC[i].solution_value(),
                "charge_status": vChargeStatus[i].solution_value(),
                "import_cost": vImport[i].solution_value() * df_day.iloc[i]["market_price_1"] * dt,
                "feed_in_revenue": vExport[i].solution_value() * df_day.iloc[i]["feed_in_tariff"] * dt
            })
        # Carry over SOC to next day
        SOC = vSOC[-1].solution_value()
    else:
        print(f"No solution found for {day_start.date()}")

# === Write Results ===
result_df = pd.DataFrame(results_all_days)
result_df.set_index("time", inplace=True)
result_df.sort_index(inplace=True)

result_df["net_cost"] = result_df["import_cost"] - result_df["feed_in_revenue"]

# === Summary Table ===
cashflowDF = pd.DataFrame.from_dict({
    "Total Import Cost (€)": [result_df["import_cost"].sum()],
    "Total Feed-in Revenue (€)": [result_df["feed_in_revenue"].sum()],
    "Net Gain (€)": [result_df["feed_in_revenue"].sum() - result_df["import_cost"].sum()]
}, orient="index", columns=["Total (€)"])


# === Save to Excel ===

# Ensure datetime index is timezone-naive (Excel can't handle timezone-aware datetimes)
if result_df.index.tz is not None:
    result_df.index = result_df.index.tz_convert(None)

with pd.ExcelWriter(output_file, engine="xlsxwriter") as writer:
    result_df.to_excel(writer, sheet_name="Operation")
    cashflowDF.to_excel(writer, sheet_name="Cost Summary")

print("✅ Optimization completed and saved.")

None
✅ Optimization completed and saved.
