# Prosumers – Full replication (Excel logic) + Dashboards (COMPLETE)
This notebook is designed to run end‑to‑end in **Google Colab** (or Jupyter).

It:
1) Reads the input Excel workbook.
2) Replicates the **Utility Function plus20%** sheet logic in Python (best‑effort via formula extraction + explicit SoC recursion).
3) Produces **interactive, high-quality dashboards** (Plotly + widgets).
4) Exports all computed outputs to a new Excel file.

✅ Fix included: `import re` and Colab widget manager enable.


In [None]:
# ===== Colab / environment setup (safe to run in Jupyter too) =====
try:
    import sys, subprocess
    # Install missing deps quietly (Colab already has many of these)
    subprocess.check_call([sys.executable, "-m", "pip", "-q", "install", "ipywidgets", "plotly", "openpyxl"])
except Exception as e:
    print("Install step skipped/failed (often OK in local Jupyter):", e)

# Enable ipywidgets in Colab (no-op in local Jupyter)
try:
    from google.colab import output
    output.enable_custom_widget_manager()
except Exception:
    pass


In [None]:
import pandas as pd
import numpy as np
from pathlib import Path

# Dashboards
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

import ipywidgets as widgets
from IPython.display import display

# Excel (formulas + values)
import openpyxl

# Helpers
import re
import warnings
import ast
import math

# ===== INPUTS =====
EXCEL_PATH = Path("/content/Solar_availability_per_month_and_Hour.xlsx")  # update if your file name/path differs
OUTPUT_XLSX = Path("/content/prosumers_full_outputs.xlsx")

EXCEL_PATH, EXCEL_PATH.exists()


(PosixPath('/content/Solar_availability_per_month_and_Hour.xlsx'), True)

## 0) If running on Colab: upload the Excel file
If `EXCEL_PATH.exists()` is False, run the next cell to upload your Excel file.

In [None]:
# Colab upload helper (optional)
try:
    from google.colab import files
    if not EXCEL_PATH.exists():
        up = files.upload()
        # pick the first uploaded file as EXCEL_PATH (unless you want to set it manually)
        if len(up) > 0:
            first = next(iter(up.keys()))
            EXCEL_PATH = Path("/content") / first
            print("Using:", EXCEL_PATH)
except Exception as e:
    print("Upload cell is only for Colab. In local Jupyter, place the file next to notebook and set EXCEL_PATH.")


## 1) Helpers: read formulas + evaluate a subset of Excel formulas
Supported: `IF`, `MIN`, `MAX`, `ABS`, arithmetic, and basic comparisons.

Cell references like `E2`, `$E$2` in the **same row** are supported.

**SoC** is computed explicitly (recursive):
`SoC[t] = SoC[t-1] + Charge[t] − Discharge[t]`.


In [None]:
def load_sheet_wb(path: Path, sheet_name: str):
    wb = openpyxl.load_workbook(path, data_only=False)  # keep formulas
    ws = wb[sheet_name]
    return wb, ws

def sheet_to_df_values(path: Path, sheet_name: str):
    # values as displayed (requires the workbook to have been calculated/saved in Excel)
    wb = openpyxl.load_workbook(path, data_only=True)
    ws = wb[sheet_name]
    rows = list(ws.iter_rows(values_only=True))
    header = rows[0]
    data = rows[1:]
    return pd.DataFrame(data, columns=header)

CELL_RE = re.compile(r"(\$?[A-Z]{1,3}\$?\d+)")

FUNC_MAP = {"IF":"if_", "MIN":"min_", "MAX":"max_", "ABS":"abs_"}

def if_(cond, a, b):
    return a if cond else b
def min_(*args):
    return min(args)
def max_(*args):
    return max(args)
def abs_(x):
    return abs(x)

SAFE_GLOBALS = {
    "__builtins__": {},
    "if_": if_,
    "min_": min_,
    "max_": max_,
    "abs_": abs_,
    "np": np,
    "math": math
}

def excel_to_python_expr(formula: str):
    f = str(formula).strip()
    if f.startswith("="):
        f = f[1:]
    f = f.replace("^", "**")
    f = f.replace(";", ",")  # Greek locale often uses ';'
    for k,v in FUNC_MAP.items():
        f = re.sub(rf"\b{k}\s*\(", f"{v}(", f, flags=re.IGNORECASE)
    # Replace '=' with '==' for equality (but not >=, <=, !=)
    f = re.sub(r"(?<![<>!])=(?!=)", "==", f)
    return f

def build_col_letter_map(ws):
    headers = [c.value for c in ws[1]]
    col_letter_map = {}
    for idx, h in enumerate(headers, start=1):
        if h is None:
            continue
        col_letter = openpyxl.utils.get_column_letter(idx)
        col_letter_map[col_letter] = h
    return col_letter_map

def build_cell_map_for_row(df: pd.DataFrame, col_letter_map: dict, row_idx_excel: int):
    m = {}
    for col_letter, col_name in col_letter_map.items():
        if col_name not in df.columns:
            continue
        key = f"{col_letter}{row_idx_excel}"
        v = df.at[row_idx_excel-2, col_name]  # df row 0 corresponds to excel row 2
        m[key] = v
        m[f"${col_letter}${row_idx_excel}"] = v
        m[f"${col_letter}{row_idx_excel}"] = v
        m[f"{col_letter}${row_idx_excel}"] = v
    return m

def eval_excel_formula(formula: str, cell_map: dict):
    py = excel_to_python_expr(formula)

    def repl(m):
        ref = m.group(1)
        if ref in cell_map:
            v = cell_map[ref]
            if v is None or (isinstance(v,float) and np.isnan(v)):
                return "np.nan"
            if isinstance(v,(int,float,np.integer,np.floating)):
                return f"({float(v)})"
            return f"({repr(v)})"
        return "np.nan"

    py = CELL_RE.sub(repl, py)

    node = ast.parse(py, mode="eval")
    allowed = (ast.Call, ast.Name, ast.Load, ast.Expression, ast.BinOp, ast.UnaryOp,
               ast.Compare, ast.BoolOp, ast.IfExp, ast.Constant, ast.Subscript,
               ast.Attribute, ast.Tuple, ast.List)
    for n in ast.walk(node):
        if isinstance(n, allowed):
            continue
        raise ValueError(f"Unsupported AST node: {type(n).__name__}")

    return eval(compile(node, "<excel_formula>", "eval"), SAFE_GLOBALS, {})


## 2) Read core sheets (Demand + PV)
Replicates the earlier pipeline: hourly demand matrix + PV generation vector.

In [None]:
# --- System Load / Demand ---
df_load = pd.read_excel(EXCEL_PATH, sheet_name="System Load")
df_daily = pd.read_excel(EXCEL_PATH, sheet_name="Daily Demand per household")

load_shape = pd.to_numeric(df_load["Load_Shape"], errors="coerce").to_numpy().reshape(-1, 1)
years = df_daily.iloc[0, :-1].astype(int).to_numpy()
row_array = df_daily.iloc[1, :-1].astype(float).to_numpy().reshape(1, -1)

hourly_demand = (load_shape @ row_array) / 100.0
hourly_demand_df = pd.DataFrame(hourly_demand, columns=[int(y) for y in years])

# --- Solar / PV generation ---
PV_AREA_M2 = 20
PV_EFFICIENCY = 0.18
pv_factor = PV_AREA_M2 * PV_EFFICIENCY

df_solar = pd.read_excel(EXCEL_PATH, sheet_name="Solar Availability and Gener GR")
solar_kw_m2 = pd.to_numeric(df_solar["Solar_Availability_kW/m^2"], errors="coerce").to_numpy().reshape(-1, 1)

pv_df = pd.DataFrame({"PV_kWh": (solar_kw_m2 * pv_factor).flatten()})

hourly_demand_df.shape, pv_df.shape


((290, 7), (294, 1))

## 3) Utility Function sheet: values + formulas
Loads `Utility Function plus20%` in two ways:
- values (`data_only=True`) and
- formulas (`data_only=False`).

In [None]:
SHEET_UF = "Utility Function plus20%"

uf_vals = sheet_to_df_values(EXCEL_PATH, SHEET_UF)
wb, ws = load_sheet_wb(EXCEL_PATH, SHEET_UF)
col_letter_map = build_col_letter_map(ws)

uf_vals.head(), len(uf_vals)


(   Hour_Index  Battery_Charge_kWh  Battery_Discharge_kWh   SOC_kWh  \
 0         1.0                 1.2               0.000000  1.200000   
 1         2.0                 0.0              -0.295969  0.830039   
 2         3.0                 0.0              -0.295969  0.460078   
 3         4.0                 0.0              -0.295969  0.090116   
 4         5.0                 0.0               0.000000  0.090116   
 
    Spot Price MIN charge MAX charge       None   None  None  ...  \
 0      230.90        0.1        0.9       None   None  None  ...   
 1      268.19       None       None       None   None  None  ...   
 2      229.58     10 kwh       5 kw  per house  n=90%  None  ...   
 3      235.98         12       None       None   None  None  ...   
 4      234.28       None       None        4kw     pv  None  ...   
 
    Buy from Grid or import from Discharge  income   payment  \
 0                               discharge    None      None   
 1                          

## 4) Full replication in Python (row-wise)
Best-effort approach:
- Pull formulas from Excel row 2 and apply row-wise.
- Compute SoC recursively.
- Compute annual KPIs.

In [None]:
def find_col(cols, candidates):
    """Return the first column whose (string) name contains any candidate substring (case-insensitive).
    Skips None/NaN/non-string column names."""
    safe_cols = [c for c in cols if isinstance(c, str) and c.strip() != ""]
    low_map = {c.lower(): c for c in safe_cols}

    for cand in candidates:
        cand_l = str(cand).lower()
        for k, v in low_map.items():
            if cand_l in k:
                return v
    return None


cols = list(uf_vals.columns)

col_spot     = find_col(cols, ["spot price"])
col_pv       = find_col(cols, ["generated", "pv"])
col_dem      = find_col(cols, ["household demand"])
col_sur      = find_col(cols, ["surplus/deficit"])
col_sur_cum  = find_col(cols, ["cumulative"])
col_u_dec    = find_col(cols, ["charge or sell"])
col_v_dec    = find_col(cols, ["buy from grid", "import from discharge"])
col_b_chg    = find_col(cols, ["battery charge"])
col_c_dis    = find_col(cols, ["battery discharge"])
col_soc      = find_col(cols, ["soc"])
col_income   = find_col(cols, ["income"])
col_payment  = find_col(cols, ["payment"])
col_expenses = find_col(cols, ["expences", "expenses"])

(col_spot, col_pv, col_dem, col_sur, col_u_dec, col_v_dec, col_b_chg, col_c_dis, col_soc, col_income, col_payment, col_expenses)


('Spot Price',
 'Generated  from Pv kwh',
 'household Demand Kwh',
 'Surplus/Deficit (Kwh)',
 'Charge or Sell to grid',
 'Buy from Grid or import from Discharge',
 None,
 None,
 'SOC_kWh',
 'income',
 'payment',
 'expences at current regime')

In [None]:
def get_row_formulas(ws, row_excel: int):
    formulas = {}
    for cell in ws[row_excel]:
        if isinstance(cell.value, str) and cell.value.startswith("="):
            col_letter = openpyxl.utils.get_column_letter(cell.column)
            header = col_letter_map.get(col_letter)
            if header:
                formulas[header] = cell.value
    return formulas

row2_formulas = get_row_formulas(ws, 2)
len(row2_formulas), list(row2_formulas.items())[:8]


(9,
 [('Battery_Charge_kWh', '=D2'),
  ('SOC_kWh', '=F5*0.1'),
  ('Generated  from Pv kwh', "='Solar Availability and Gener GR'!F2"),
  ('Surplus/Deficit (Kwh)', '=K2-L2'),
  ('Export to Grid Kwh from PV',
   '=IF(M2<0,"Buy from Grid","sell the surplus or charge")'),
  ('Buy from Grid or import from Discharge',
   '=IF(D2+M2>0,"discharge","buy from the Grid")'),
  ('expences at current regime', '=(L2*E2)/1000'),
  ('Surplus/Deficit cumulative', '=M2')])

In [None]:
uf_py = uf_vals.copy()

# If PV/Demand are blank in UF sheet, fill from computed PV/Demand (common setup)
if col_pv and uf_py[col_pv].isna().all():
    uf_py[col_pv] = pv_df["PV_kWh"].values[:len(uf_py)]

if col_dem and uf_py[col_dem].isna().all():
    # Default to 2023 if present, else first year
    default_year = 2023 if 2023 in hourly_demand_df.columns else int(hourly_demand_df.columns[0])
    uf_py[col_dem] = hourly_demand_df[default_year].values[:len(uf_py)]

# Surplus/Deficit
if col_sur and (uf_py[col_sur].isna().all()) and col_pv and col_dem:
    uf_py[col_sur] = pd.to_numeric(uf_py[col_pv], errors="coerce") - pd.to_numeric(uf_py[col_dem], errors="coerce")

# Cumulative
if col_sur_cum and (uf_py[col_sur_cum].isna().all()) and col_sur:
    uf_py[col_sur_cum] = pd.to_numeric(uf_py[col_sur], errors="coerce").cumsum()

target_cols = [c for c in [col_u_dec, col_v_dec, col_b_chg, col_c_dis, col_income, col_payment, col_expenses] if c]
formulas = row2_formulas

soc_list = []
prev_soc = 0.0

# Initial SoC if present
if col_soc and pd.notna(uf_py.loc[0, col_soc]):
    try:
        prev_soc = float(uf_py.loc[0, col_soc])
    except Exception:
        prev_soc = 0.0

for i in range(len(uf_py)):
    excel_row = i + 2  # df row 0 -> excel row 2
    cell_map = build_cell_map_for_row(uf_py, col_letter_map, excel_row)

    # Evaluate row formulas best-effort
    for c in target_cols:
        f = formulas.get(c)
        if isinstance(f, str) and f.startswith("="):
            try:
                val = eval_excel_formula(f, cell_map)
            except Exception:
                val = uf_py.loc[i, c]  # fallback to Excel value (if present)
        else:
            val = uf_py.loc[i, c]
        uf_py.loc[i, c] = val

    # SoC recursion (explicit)
    chg = float(uf_py.loc[i, col_b_chg]) if col_b_chg and pd.notna(uf_py.loc[i, col_b_chg]) else 0.0
    dis = float(uf_py.loc[i, col_c_dis]) if col_c_dis and pd.notna(uf_py.loc[i, col_c_dis]) else 0.0
    soc = prev_soc + chg - dis
    soc_list.append(soc)
    prev_soc = soc

if col_soc:
    uf_py[col_soc] = soc_list

uf_py[[c for c in [col_spot, col_pv, col_dem, col_sur, col_sur_cum, col_u_dec, col_v_dec, col_b_chg, col_c_dis, col_soc, col_income, col_payment, col_expenses] if c]].head()


Unnamed: 0,Spot Price,Generated from Pv kwh,household Demand Kwh,Surplus/Deficit (Kwh),Surplus/Deficit cumulative,Charge or Sell to grid,Buy from Grid or import from Discharge,SOC_kWh,income,payment,expences at current regime
0,230.9,0.0,0.297217,-0.297217,-0.297217,,discharge,1.2,,,0.068627
1,268.19,0.0,0.295969,-0.295969,-0.593186,,discharge,1.2,,,0.079376
2,229.58,0.0,0.295969,-0.295969,-0.889155,,discharge,1.2,,,0.067949
3,235.98,0.0,0.295969,-0.295969,-1.185124,,buy from the Grid,1.2,,-0.069843,0.069843
4,234.28,0.0,0.295969,-0.295969,-1.481093,,buy from the Grid,1.2,,-0.06934,0.06934


## 5) Annual KPIs

In [None]:
uf_py["net_cashflow"] = pd.to_numeric(uf_py[col_income], errors="coerce").fillna(0) - pd.to_numeric(uf_py[col_payment], errors="coerce").fillna(0)

annual_net_income = uf_py["net_cashflow"].sum()
annual_expenses_consumer = pd.to_numeric(uf_py[col_expenses], errors="coerce").fillna(0).sum()
annual_benefit = annual_net_income + annual_expenses_consumer

kpis = pd.DataFrame([{
    "annual_net_income": annual_net_income,
    "annual_expenses_as_consumer": annual_expenses_consumer,
    "annual_benefit_per_household": annual_benefit
}])

kpis


Unnamed: 0,annual_net_income,annual_expenses_as_consumer,annual_benefit_per_household
0,-14309130000.0,35233.605707,-14309090000.0


## 6) Dashboards (interactive)
Includes KPI cards + multi-panel dashboard with a zoom slider.

In [None]:
plot_df = uf_py.copy()
plot_df["t"] = np.arange(1, len(plot_df)+1)

# KPI cards
kpi1 = go.Figure(go.Indicator(mode="number", value=float(kpis.iloc[0]["annual_net_income"]), title={"text":"Annual net income"}))
kpi2 = go.Figure(go.Indicator(mode="number", value=float(kpis.iloc[0]["annual_expenses_as_consumer"]), title={"text":"Annual expenses as consumer"}))
kpi3 = go.Figure(go.Indicator(mode="number", value=float(kpis.iloc[0]["annual_benefit_per_household"]), title={"text":"Annual benefit per household"}))

for f in (kpi1, kpi2, kpi3):
    f.update_layout(height=140, margin=dict(l=10, r=10, t=35, b=10))

display(widgets.HBox([go.FigureWidget(kpi1), go.FigureWidget(kpi2), go.FigureWidget(kpi3)]))


HBox(children=(FigureWidget({
    'data': [{'mode': 'number',
              'title': {'text': 'Annual net inco…

In [None]:
t_min, t_max = 1, len(plot_df)

slider = widgets.IntRangeSlider(
    value=[t_min, min(t_max, 72)],
    min=t_min,
    max=t_max,
    step=1,
    description="Hours:",
    continuous_update=False,
    layout=widgets.Layout(width="85%")
)

out = widgets.Output()

def dashboard(t_range):
    a, b = t_range
    d = plot_df[(plot_df["t"] >= a) & (plot_df["t"] <= b)]

    fig = make_subplots(
        rows=4, cols=1, shared_xaxes=True, vertical_spacing=0.06,
        subplot_titles=("Spot Price / PV / Demand",
                        "Surplus/Deficit & Cumulative",
                        "SoC / Charge / Discharge",
                        "Income / Payment / Net cashflow")
    )

    # Row 1
    if col_spot:
        fig.add_trace(go.Scatter(x=d["t"], y=pd.to_numeric(d[col_spot], errors="coerce"), name="Spot Price"), row=1, col=1)
    if col_pv:
        fig.add_trace(go.Scatter(x=d["t"], y=pd.to_numeric(d[col_pv], errors="coerce"), name="PV (kWh)"), row=1, col=1)
    if col_dem:
        fig.add_trace(go.Scatter(x=d["t"], y=pd.to_numeric(d[col_dem], errors="coerce"), name="Demand (kWh)"), row=1, col=1)

    # Row 2
    if col_sur:
        fig.add_trace(go.Scatter(x=d["t"], y=pd.to_numeric(d[col_sur], errors="coerce"), name="Surplus/Deficit"), row=2, col=1)
    if col_sur_cum:
        fig.add_trace(go.Scatter(x=d["t"], y=pd.to_numeric(d[col_sur_cum], errors="coerce"), name="Cumulative"), row=2, col=1)

    # Row 3
    if col_soc:
        fig.add_trace(go.Scatter(x=d["t"], y=pd.to_numeric(d[col_soc], errors="coerce"), name="SoC"), row=3, col=1)
    if col_b_chg:
        fig.add_trace(go.Bar(x=d["t"], y=pd.to_numeric(d[col_b_chg], errors="coerce").fillna(0), name="Charge"), row=3, col=1)
    if col_c_dis:
        fig.add_trace(go.Bar(x=d["t"], y=pd.to_numeric(d[col_c_dis], errors="coerce").fillna(0), name="Discharge"), row=3, col=1)

    # Row 4
    if col_income:
        fig.add_trace(go.Scatter(x=d["t"], y=pd.to_numeric(d[col_income], errors="coerce"), name="Income"), row=4, col=1)
    if col_payment:
        fig.add_trace(go.Scatter(x=d["t"], y=pd.to_numeric(d[col_payment], errors="coerce"), name="Payment"), row=4, col=1)
    fig.add_trace(go.Scatter(x=d["t"], y=pd.to_numeric(d["net_cashflow"], errors="coerce"), name="Net cashflow"), row=4, col=1)

    fig.update_layout(height=950, legend_orientation="h", legend_y=-0.08, margin=dict(l=30, r=20, t=60, b=40))
    return fig

def on_change(change):
    if change["name"] == "value":
        with out:
            out.clear_output(wait=True)
            display(dashboard(change["new"]))

slider.observe(on_change, names="value")

display(slider)
with out:
    display(dashboard(slider.value))
display(out)


IntRangeSlider(value=(1, 72), continuous_update=False, description='Hours:', layout=Layout(width='85%'), max=3…

Output()

## 7) Export outputs to Excel

In [None]:
export_cols = [c for c in [col_spot, col_pv, col_dem, col_sur, col_sur_cum, col_u_dec, col_v_dec, col_b_chg, col_c_dis, col_soc, col_income, col_payment, col_expenses] if c]
export_df = uf_py[export_cols].copy()
export_df.insert(0, "HourIndex", np.arange(1, len(export_df)+1))
export_df["net_cashflow"] = uf_py["net_cashflow"]

with pd.ExcelWriter(OUTPUT_XLSX, engine="openpyxl") as writer:
    hourly_demand_df.to_excel(writer, sheet_name="Hourly Demand (all years)", index=False)
    pv_df.to_excel(writer, sheet_name="Hourly PV Generation", index=False)
    export_df.to_excel(writer, sheet_name="UtilityFunction_Python", index=False)
    kpis.to_excel(writer, sheet_name="KPIs", index=False)

OUTPUT_XLSX


PosixPath('/content/prosumers_full_outputs.xlsx')

## 8) Quality check (optional)
Compare Python vs Excel values for key columns (if Excel has stored values).
If the workbook wasn't recalculated/saved in Excel before upload, Excel 'values' may be blank.

In [None]:
def compare(colname, n=12):
    if colname is None:
        return None
    a = pd.to_numeric(uf_vals[colname], errors="coerce")
    b = pd.to_numeric(uf_py[colname], errors="coerce")
    diff = (b - a).abs()
    return pd.DataFrame({"excel": a, "python": b, "abs_diff": diff}).head(n)

checks = {}
for c in [col_u_dec, col_v_dec, col_b_chg, col_c_dis, col_income, col_payment, col_expenses]:
    if c:
        checks[c] = compare(c, n=12)

checks


{'Charge or Sell to grid':     excel  python  abs_diff
 0     NaN     NaN       NaN
 1     NaN     NaN       NaN
 2     NaN     NaN       NaN
 3     NaN     NaN       NaN
 4     NaN     NaN       NaN
 5     NaN     NaN       NaN
 6     NaN     NaN       NaN
 7     NaN     NaN       NaN
 8     NaN     NaN       NaN
 9     NaN     NaN       NaN
 10    NaN     NaN       NaN
 11    NaN     NaN       NaN,
 'Buy from Grid or import from Discharge':     excel  python  abs_diff
 0     NaN     NaN       NaN
 1     NaN     NaN       NaN
 2     NaN     NaN       NaN
 3     NaN     NaN       NaN
 4     NaN     NaN       NaN
 5     NaN     NaN       NaN
 6     NaN     NaN       NaN
 7     NaN     NaN       NaN
 8     NaN     NaN       NaN
 9     NaN     NaN       NaN
 10    NaN     NaN       NaN
 11    NaN     NaN       NaN,
 'income':        excel    python  abs_diff
 0        NaN       NaN       NaN
 1        NaN       NaN       NaN
 2        NaN       NaN       NaN
 3        NaN       NaN       