### ***PACKAGES***

In [1]:
!pip install uvicorn fastapi python-multipart
!wget -q https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64.deb
!dpkg -i cloudflared-linux-amd64.deb

Selecting previously unselected package cloudflared.
(Reading database ... 117540 files and directories currently installed.)
Preparing to unpack cloudflared-linux-amd64.deb ...
Unpacking cloudflared (2026.1.2) ...
Setting up cloudflared (2026.1.2) ...
Processing triggers for man-db (2.10.2-1) ...


### ***PARSER***

In [2]:
%%writefile gcode_parser.py
import re
import math
import pandas as pd


ACC_LINEAR = 100.0


def parse_lavorazioni_regex(file_path: str) -> pd.DataFrame:
    start_re = re.compile(r'\*{3}\s*Esecuzione lavorazione')
    end_re = re.compile(r'\*{7}\s*FINE INFO VOLUMI ASPORTATI')

    line_re = re.compile(
        r'(?:\bN(?P<N>\d+))?'
        r'(?:.*?\bG(?P<G>[0123]))?'
        r'(?:.*?X(?P<X>-?\d+(?:[.,]\d+)?))?'
        r'(?:.*?Y(?P<Y>-?\d+(?:[.,]\d+)?))?'
        r'(?:.*?Z(?P<Z>-?\d+(?:[.,]\d+)?))?'
        r'(?:.*?I(?P<I>-?\d+(?:[.,]\d+)?))?'
        r'(?:.*?J(?P<J>-?\d+(?:[.,]\d+)?))?'
        r'(?:.*?\bF(?P<F>\d+(?:[.,]\d+)?))?'
    )

    def to_float(val):
        return float(val.replace(',', '.')) if val is not None else None

    def versor(value: float) -> int:
        if value > 0:
            return 1
        elif value < 0:
            return -1
        else:
            return 0

    records = []
    in_block = False

    last_x = last_y = last_z = None
    prev_x = prev_y = prev_z = None

    esecuzione_idx = 0
    esecuzione_nome = None

    current_diametro = None
    block_diametro = None
    current_vol_tot = None
    block_start_idx = None
    current_s = None
    cum_t_ideal = 0.0

    with open(file_path, 'r', encoding='latin-1') as f:
        for raw_line in f:
            original_line = raw_line.rstrip('\n')
            line = original_line.strip()

            # Lettura S
            m_s = re.search(r'\bS([0-9.,]+)', line)
            if m_s:
                current_s = to_float(m_s.group(1))

            # Lettura diametro utensile
            m_diam = re.compile(
                r'DIAMETRO\s+UTENSILE:\s*([0-9.,]+)', re.IGNORECASE
            ).search(line)
            if m_diam:
                current_diametro = to_float(m_diam.group(1))

            # Lettura VOL_TOT (solo dentro al blocco)
            m_vol = re.search(r"VOL_TOT:\s*([0-9.,]+)", line, re.IGNORECASE)
            if m_vol and in_block:
                current_vol_tot = to_float(m_vol.group(1))
                if block_start_idx is not None:
                    for rec in records[block_start_idx:]:
                        rec['VOL_TOT'] = current_vol_tot

            # Inizio blocco lavorazione
            if start_re.search(line):
                in_block = True
                esecuzione_idx += 1

                last_x = last_y = last_z = None
                prev_x = prev_y = prev_z = None

                block_diametro = current_diametro
                current_vol_tot = None
                block_start_idx = len(records)
                cum_t_ideal = 0.0

                m_nome = re.search(r'\*{3}\s*Esecuzione lavorazione\s*(.*)', line)
                esecuzione_nome = (
                    m_nome.group(1).strip()
                    if m_nome and m_nome.group(1).strip()
                    else f"Lavorazione_{esecuzione_idx}"
                )
                continue

            # Fine blocco lavorazione
            if end_re.search(line):
                in_block = False
                esecuzione_nome = None
                block_diametro = None
                current_vol_tot = None
                block_start_idx = None
                continue

            if not in_block:
                continue

            # Salta commenti / righe vuote
            if not line or line.startswith(('(', ';')):
                continue

            m = line_re.search(line)
            if not m:
                continue

            g_str = m.group('G')
            if g_str is None:
                continue

            movimento = f"G{g_str}"

            if movimento in ('G2', 'G3'):
                i_val = to_float(m.group('I'))
                j_val = to_float(m.group('J'))
            else:
                i_val = j_val = None

            n_str = m.group('N')
            x_val = to_float(m.group('X'))
            y_val = to_float(m.group('Y'))
            z_val = to_float(m.group('Z'))
            f_val = to_float(m.group('F')) if m.group('F') else 0.0

            # Mantieni l'ultimo valore noto di X,Y,Z se omessi
            x_val = last_x if x_val is None and last_x is not None else x_val
            y_val = last_y if y_val is None and last_y is not None else y_val
            z_val = last_z if z_val is None and last_z is not None else z_val

            if x_val is not None:
                last_x = x_val
            if y_val is not None:
                last_y = y_val
            if z_val is not None:
                last_z = z_val

            cur_x = x_val or 0.0
            cur_y = y_val or 0.0
            cur_z = z_val or 0.0

            # Delta rispetto alla riga precedente
            if prev_x is None:
                delta_x = delta_y = delta_z = 0.0
            else:
                delta_x = cur_x - prev_x
                delta_y = cur_y - prev_y
                delta_z = cur_z - prev_z

            xv = versor(delta_x)
            yv = versor(delta_y)
            zv = versor(delta_z)

            # Raggio per G2
            if movimento in ('G2') and i_val is not None and j_val is not None:
                R_val = math.sqrt((cur_x - i_val) ** 2 + (cur_y - j_val) ** 2)
            else:
                R_val = 0.0

            theta_val = None
            dist_lin = None
            dist_circ = None
            dist = None

            # Distanza (lineare o circolare)
            if (
                movimento in ('G2')
                and i_val is not None and j_val is not None
                and prev_x is not None and prev_y is not None
            ):
                vx_prev = prev_x - i_val
                vy_prev = prev_y - j_val
                vx_curr = cur_x - i_val
                vy_curr = cur_y - j_val

                num = vx_prev * vx_curr + vy_prev * vy_curr
                den = math.sqrt(vx_prev ** 2 + vy_prev ** 2) * math.sqrt(vx_curr ** 2 + vy_curr ** 2)

                if den != 0:
                    cos_theta = max(-1.0, min(1.0, num / den))
                    theta_val = math.acos(cos_theta)
                    dist_circ = R_val * theta_val
                    dist = dist_circ
            else:
                if prev_x is not None and prev_y is not None and prev_z is not None:
                    dist_lin = math.sqrt(
                        (cur_x - prev_x) ** 2 +
                        (cur_y - prev_y) ** 2 +
                        (cur_z - prev_z) ** 2
                    )
                else:
                    dist_lin = 0.0
                dist = dist_lin

            # Tempo ideale del segmento con profilo trapezoidale (ACC_LINEAR)
            if dist is not None and dist > 0 and f_val not in (None, 0):
                v_cmd = f_val / 60.0  # mm/min -> mm/s
                a = ACC_LINEAR
                t_acc = v_cmd / a
                s_acc = 0.5 * v_cmd * v_cmd / a
                if dist >= 2.0 * s_acc:
                    seg_t_ideal = 2.0 * t_acc + (dist - 2.0 * s_acc) / v_cmd
                else:
                    seg_t_ideal = 2.0 * math.sqrt(dist / a)
            else:
                seg_t_ideal = 0.0

            cum_t_ideal += seg_t_ideal
            t_ideal = cum_t_ideal

            # Estraggo la linea di codice G completa
            m_code = re.search(r'\bG[012].*', original_line)
            code_line = m_code.group(0).strip() if m_code else None

            record = {
                'esecuzione_n': esecuzione_idx,
                'esecuzione_nome': esecuzione_nome,
                'N': int(n_str) if n_str else None,
                'movimento': movimento,
                'X': x_val,
                'Y': y_val,
                'Z': z_val,
                'delta_x': delta_x,
                'delta_y': delta_y,
                'delta_z': delta_z,
                'xv': xv,
                'yv': yv,
                'zv': zv,
                'I': i_val,
                'J': j_val,
                'F': f_val,
                'S': current_s,
                'diametro_utensile': block_diametro,
                'VOL_TOT': current_vol_tot,
                'code': code_line,
                'R': R_val,
                'theta': theta_val,
                'dist_lineare': dist_lin,
                'dist_circolare': dist_circ,
                'dist': dist,
                't_ideal': t_ideal,
            }

            records.append(record)
            prev_x, prev_y, prev_z = cur_x, cur_y, cur_z

    # -----------------------------------------
    # COSTRUZIONE DATAFRAME BASE
    # -----------------------------------------
    df = pd.DataFrame(
        records,
        columns=[
            'esecuzione_n', 'esecuzione_nome',
            'N', 'movimento',
            'X', 'Y', 'Z', 'I', 'J',
            'delta_x', 'delta_y', 'delta_z',
            'xv', 'yv', 'zv',
            'R',
            'F', 'S',
            'diametro_utensile', 'VOL_TOT',
            'code',
            'theta', 'dist_lineare', 'dist_circolare', 'dist',
            't_ideal',
        ],
    )

    # ----------------------------------------------------------
    # CALCOLO MRR_IDEAL
    # ----------------------------------------------------------
    ESEC_COL = "esecuzione_n"
    FEED_COL = "F"
    VOL_COL = "VOL_TOT"
    TIME_COL = "t_ideal"

    # Conversione numerica
    df[FEED_COL] = pd.to_numeric(df[FEED_COL], errors="coerce")
    df[VOL_COL] = pd.to_numeric(df[VOL_COL], errors="coerce")
    df[TIME_COL] = pd.to_numeric(df[TIME_COL], errors="coerce")

    # Feed unici
    unique_F = sorted(df[FEED_COL].dropna().unique().tolist())

    # -----------------------------------------
    # ideal_count_block_F*
    # -----------------------------------------
    for idx_f, _ in enumerate(unique_F):
        df[f"ideal_count_block_F{idx_f}"] = 0.0

    for _, block in df.groupby(ESEC_COL, sort=False):
        for idx_f, f_val in enumerate(unique_F):
            mask = (block[FEED_COL] == f_val)
            df.loc[block.index, f"ideal_count_block_F{idx_f}"] = mask.astype(int).cumsum()

    # -----------------------------------------
    # ideal_T*
    # -----------------------------------------
    for idx_f in range(len(unique_F)):
        df[f"ideal_T{idx_f}"] = 0.0

    for _, block in df.groupby(ESEC_COL, sort=False):
        idx = block.index
        if block[TIME_COL].isna().all():
            T_tot = 0
        else:
            T_tot = block[TIME_COL].iloc[-1]

        final_counts = [
            float(block[f"ideal_count_block_F{idx_f}"].iloc[-1])
            for idx_f in range(len(unique_F))
        ]
        sum_counts = sum(final_counts)

        if T_tot > 0 and sum_counts > 0:
            for idx_f in range(len(unique_F)):
                df.loc[idx, f"ideal_T{idx_f}"] = T_tot * (final_counts[idx_f] / sum_counts)

    # -----------------------------------------
    # ideal_MRR*
    # -----------------------------------------
    active_idx = [i for i, f in enumerate(unique_F) if f > 0]

    for idx_f in range(len(unique_F)):
        df[f"ideal_MRR{idx_f}"] = 0.0

    for _, block in df.groupby(ESEC_COL, sort=False):
        idx = block.index
        vol_valid = block[VOL_COL].dropna()
        vol = vol_valid.iloc[0] if not vol_valid.empty else 0.0

        T_vec = [block[f"ideal_T{idx_f}"].iloc[0] for idx_f in range(len(unique_F))]
        denom = sum(T_vec[i] * unique_F[i] for i in active_idx)

        k = vol / denom if denom > 0 else 0.0

        for idx_f, f_val in enumerate(unique_F):
            df.loc[idx, f"ideal_MRR{idx_f}"] = k * f_val if f_val > 0 else 0.0

    # ideal_MRR attivo per riga
    feed_to_idx = {f: i for i, f in enumerate(unique_F)}
    def _pick_mrr(row):
        f = row[FEED_COL]
        if f in feed_to_idx:
            idx_f = feed_to_idx[f]
            return row[f"ideal_MRR{idx_f}"]
        return 0.0

    df["ideal_MRR"] = df.apply(_pick_mrr, axis=1)

    # -----------------------------------------
    # TEMPO DI SEGMENTO E CUMULATA GLOBALE
    # -----------------------------------------
    df["ideal_seg_time"] = 0.0

    for _, block in df.groupby(ESEC_COL, sort=False):
        seg = block[TIME_COL].diff().fillna(block[TIME_COL])
        df.loc[block.index, "ideal_seg_time"] = seg

    df["t_ideal_cumul"] = df["ideal_seg_time"].cumsum()

    return df


Writing gcode_parser.py


In [3]:

%%writefile parser_module.py
import pandas as pd
from gcode_parser import parse_lavorazioni_regex

def parse_gcode(file_path: str):
    df = parse_lavorazioni_regex(file_path)
    return df.to_dict(orient="records")


Writing parser_module.py


### ***PREDICTOR***

In [4]:
from google.colab import drive
drive.mount("/content/drive")



Mounted at /content/drive


In [5]:
%%writefile power_predictor.py
import numpy as np
import pandas as pd
import tensorflow as tf
import joblib
import os

# ============================
# BASE PATH (GOOGLE DRIVE)
# ============================
BASE_PATH = "/content/drive/MyDrive/models"

def p(name):
    return os.path.join(BASE_PATH, name)

# ============================
# LOAD MODELS
# ============================
custom_objects = {
    "mse": tf.keras.losses.MeanSquaredError(),
    "MeanSquaredError": tf.keras.losses.MeanSquaredError(),
}

model_G0 = tf.keras.models.load_model(p("model_G0.h5"), custom_objects=custom_objects, compile=False)
model_G1 = tf.keras.models.load_model(p("model_G1.h5"), custom_objects=custom_objects, compile=False)
model_G2 = tf.keras.models.load_model(p("model_G2.h5"), custom_objects=custom_objects, compile=False)

# ============================
# LOAD SCALERS
# ============================
scalerX2_G0 = joblib.load(p("scalerX2_G0.pkl"))
scalerY2_G0 = joblib.load(p("scalerY2_G0.pkl"))

scalerX2_G1 = joblib.load(p("scalerX2_G1.pkl"))
scalerY2_G1 = joblib.load(p("scalerY2_G1.pkl"))

scalerX2_G2 = joblib.load(p("scalerX2_G2.pkl"))
scalerY2_G2 = joblib.load(p("scalerY2_G2.pkl"))

# ============================
# PREDICTION
# ============================
def predict_power(df):
    preds = []

    for _, row in df.iterrows():
        mov = row["movimento"]

        if mov == "G0":
            X = np.array([[row["xv"], row["yv"], row["zv"], row["S"]]], float)
            y = scalerY2_G0.inverse_transform(
                model_G0.predict(scalerX2_G0.transform(X), verbose=0)
            )[0, 0]

        elif mov == "G1":
            X = np.array([[row["xv"], row["yv"], row["zv"],
                           row["F"], row["S"], row["ideal_MRR"]]], float)
            y = scalerY2_G1.inverse_transform(
                model_G1.predict(scalerX2_G1.transform(X), verbose=0)
            )[0, 0]

        elif mov == "G2":
            X = np.array([[row["R"], row["F"], row["S"], row["ideal_MRR"]]], float)
            y = scalerY2_G2.inverse_transform(
                model_G2.predict(scalerX2_G2.transform(X), verbose=0)
            )[0, 0]

        else:
            y = 0.0

        preds.append(max(0.0, float(y)))  # vincolo fisico

    df["Power_pred"] = preds
    return df


Writing power_predictor.py


### ***OVERRIDE***

In [6]:
%%writefile override_module.py
import math
import pandas as pd

ACC_LINEAR = 100.0

# ==========================================================
# OVERRIDE CNC
# ==========================================================
def apply_override(df_base: pd.DataFrame, k: float) -> pd.DataFrame:
    df = df_base.copy()
    df["k"] = k
    df["F"] = df["F"] * k
    df["S"] = df["S"] * k
    return df

# ==========================================================
# RICALCOLO TEMPI
# ==========================================================
def recompute_time(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["ideal_seg_time"] = 0.0

    for _, block in df.groupby("esecuzione_n", sort=False):
        idx = block.index
        seg_times = []

        for d, f in zip(block["dist"], block["F"]):
            if d > 0 and f > 0:
                v = f / 60.0
                a = ACC_LINEAR
                s_acc = v * v / a
                t = (
                    2 * v / a + (d - s_acc) / v
                    if d >= s_acc
                    else 2 * math.sqrt(d / a)
                )
            else:
                t = 0.0
            seg_times.append(t)

        df.loc[idx, "ideal_seg_time"] = seg_times

    df["t_ideal"] = df.groupby("esecuzione_n")["ideal_seg_time"].cumsum()
    df["t_ideal_cumul"] = df["ideal_seg_time"].cumsum()
    return df

# ==========================================================
# RICALCOLO MRR
# ==========================================================
def recompute_MRR(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["ideal_MRR"] = 0.0

    for _, block in df.groupby("esecuzione_n", sort=False):
        idx = block.index
        vol = block["VOL_TOT"].dropna()
        if vol.empty:
            continue
        vol = vol.iloc[0]

        denom = (block["ideal_seg_time"] * block["F"]).sum()
        if denom > 0:
            df.loc[idx, "ideal_MRR"] = (vol / denom) * block["F"]

    return df

Writing override_module.py


### ***API***

In [7]:
%%writefile api_app.py
import os
import base64
import pandas as pd
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt

from fastapi import FastAPI, UploadFile, File, Query, Form
from fastapi.responses import HTMLResponse, FileResponse

from parser_module import parse_gcode
from power_predictor import predict_power
from override_module import apply_override, recompute_time, recompute_MRR

# ==========================================================
# APP
# ==========================================================
app = FastAPI()

UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# ==========================================================
# HOME — INPUT COSTI + FILE
# ==========================================================
@app.get("/", response_class=HTMLResponse)
async def upload_form():
    return """
    <html>
    <body style="font-family:Arial; padding:20px">
        <h2>Carica file G-Code</h2>

        <form action="/upload" enctype="multipart/form-data" method="post">

            <label>Costo orario macchina [€/h]</label><br>
            <input type="number" step="1" name="costo_orario" value="60" required><br><br>

            <label>Costo energia elettrica [€/kWh]</label><br>
            <input type="number" step="0.01" name="costo_kwh" value="0.40" required><br><br>

            <label>Costo utensile [€/utensile]</label><br>
            <input type="number" step="1" name="costo_utensile" value="30" required><br><br>

            <input name="file" type="file" required><br><br>

            <input type="submit" value="Upload">
        </form>
    </body>
    </html>
    """

# ==========================================================
# UPLOAD → SIMULAZIONE BASE (k = 1)
# ==========================================================
@app.post("/upload")
async def upload_gcode(
    file: UploadFile = File(...),
    costo_orario: float = Form(...),
    costo_kwh: float = Form(...),
    costo_utensile: float = Form(...)
):
    file_path = f"{UPLOAD_DIR}/{file.filename}"
    with open(file_path, "wb") as f:
        f.write(await file.read())

    base_name, _ = os.path.splitext(file.filename)

    # -------- PARSER --------
    df_base = pd.DataFrame(parse_gcode(file_path))
    df_base.to_pickle(os.path.join(UPLOAD_DIR, f"{base_name}_base.pkl"))

    # -------- SIMULAZIONE (k = 1) --------
    k = 1.0
    df = apply_override(df_base, k)
    df = recompute_time(df)
    df = recompute_MRR(df)
    df = predict_power(df)

    return render_result(
        df, base_name, file.filename, k,
        costo_orario, costo_kwh, costo_utensile
    )

# ==========================================================
# RECOMPUTE — CAMBIO k
# ==========================================================
@app.get("/recompute", response_class=HTMLResponse)
async def recompute_view(
    base_name: str,
    k: float = 1.0,
    costo_orario: float = 60.0,
    costo_kwh: float = 0.25,
    costo_utensile: float = 50.0
):
    df_base = pd.read_pickle(os.path.join(UPLOAD_DIR, f"{base_name}_base.pkl"))

    df = apply_override(df_base, k)
    df = recompute_time(df)
    df = recompute_MRR(df)
    df = predict_power(df)

    return render_result(
        df, base_name, base_name, k,
        costo_orario, costo_kwh, costo_utensile
    )

# ==========================================================
# RENDER RISULTATI
# ==========================================================
def render_result(df, base_name, filename, k,
                  costo_orario, costo_kwh, costo_utensile):

    import math

    # -------------------------
    # Protezione k
    # -------------------------
    try:
        k_safe = float(k)
    except Exception:
        k_safe = 1.0
    if k_safe <= 0:
        k_safe = 1e-6

    # ================= KPI TEMPO & ENERGIA =================
    T_tot = float(df["t_ideal_cumul"].max() or 0.0)  # [s]
    E_tot = float((df["Power_pred"] * df["ideal_seg_time"]).sum() or 0.0)  # [J]
    P_mean = E_tot / T_tot if T_tot > 0 else 0.0  # [W]

    # ================= COSTO MACCHINA =================
    T_tot_h = T_tot / 3600.0
    C_macchina = T_tot_h * float(costo_orario)

    # ================= COSTO ENERGIA =================
    E_kWh = E_tot / 3.6e6
    C_energia = E_kWh * float(costo_kwh)

    # ================= COSTO UTENSILE =================
    K_TAYLOR = 0.3
    C_TAYLOR = 900.0

    D_max = df["diametro_utensile"].dropna().max()
    if pd.isna(D_max) or D_max <= 0:
        D_max = 1.0

    vita_consumata = 0.0
    df_cut = df[df["movimento"].isin(["G1", "G2"])]

    for _, row in df_cut.iterrows():

        S = row.get("S", 0.0)
        t_seg = row.get("ideal_seg_time", 0.0)

        if pd.isna(S) or pd.isna(t_seg):
            continue
        if S <= 0 or t_seg <= 0:
            continue

        # velocità di taglio [m/min]
        v_t = (S * math.pi * D_max) / 1000.0 * k_safe
        if v_t <= 0:
            continue

        # vita utensile Taylor [s]
        T_ut = (C_TAYLOR / v_t) ** (1.0 / K_TAYLOR) * 60.0
        if T_ut <= 0:
            continue

        vita_consumata += t_seg / T_ut

    # costo utensile proporzionale alla vita consumata
    C_utensile = vita_consumata * float(costo_utensile)

    # ================= COSTO TOTALE =================
    C_totale = C_macchina + C_energia + C_utensile

    # ================= EMISSIONI UTENSILE =================

    # ---- PARAMETRI FISICI ----
    densita_tungsteno = 19e-6          # [kg/mm^3]
    L_ut = 100.0                       # [mm] lunghezza utensile
    fattore_geom = 0.7                 # coefficiente geometrico
    EF_tungsteno = 50.0                # [kgCO2e / kg] ← valore tipico LCA

    # ---- DIAMETRO UTENSILE (massimo usato) ----
    D_max = df["diametro_utensile"].dropna().max()
    if pd.isna(D_max) or D_max <= 0:
        D_max = 1.0   # fallback di sicurezza

    # ---- VOLUME UTENSILE ----
    # V = (π/4) * D^2 * L * coeff
    volume_utensile = (math.pi / 4.0) * (D_max ** 2) * L_ut * fattore_geom   # [mm^3]

    # ---- MASSA UTENSILE ----
    massa_utensile = volume_utensile * densita_tungsteno  # [kg]

    # ---- MASSA CONSUMATA ----
    massa_consumata = massa_utensile * vita_consumata     # [kg]

    # ---- EMISSIONI UTENSILE ----
    Emissioni_utensile = massa_consumata * EF_tungsteno   # [kgCO2e]

    # ================= EMISSIONI MACCHINA =================

    # fattore di emissione elettricità [kgCO2e / kWh]
    EF_ELETTRICITA = 0.25

    # emissioni macchina
    Emissioni_macchina = E_kWh * EF_ELETTRICITA  # [kgCO2e]

    # ================= EMISSIONI TOTALI =================

    Emissioni_tot = Emissioni_macchina + Emissioni_utensile


    # ================= KPI HTML =================
    summary_block = f"""
    <div style="
        margin-top:20px;
        padding:18px;
        border:2px solid #333;
        background:#fafafa;
        width:420px;
        font-size:14px;
    ">

    <h3 style="margin-top:0;">Risultati globali</h3>

    <table style="width:100%; border-collapse:collapse;">
    <tr>
        <td>Tempo totale</td>
        <td style="text-align:right;"><b>{T_tot:.2f} s</b></td>
    </tr>
    <tr>
        <td>Energia totale</td>
        <td style="text-align:right;"><b>{E_tot:.2f} J</b></td>
    </tr>
    <tr>
        <td>Potenza media equivalente</td>
        <td style="text-align:right;"><b>{P_mean:.2f} W</b></td>
    </tr>
    </table>

    <hr>

    <h3>Costi</h3>

    <table style="width:100%; border-collapse:collapse;">
    <tr>
        <td>Costo macchina</td>
        <td style="text-align:right;"><b>{C_macchina:.2f} €</b></td>
    </tr>
    <tr>
        <td>Costo energia</td>
        <td style="text-align:right;"><b>{C_energia:.2f} €</b></td>
    </tr>
    <tr>
        <td>Costo utensile</td>
        <td style="text-align:right;"><b>{C_utensile:.2f} €</b></td>
    </tr>
    </table>

    <hr>

    <table style="width:100%; font-size:15px;">
    <tr>
        <td><b>Costo totale ciclo</b></td>
        <td style="text-align:right;"><b>{C_totale:.2f} €</b></td>
    </tr>
    </table>

    <hr>

    <h3>Impatto ambientale</h3>

    <table style="width:100%; border-collapse:collapse;">
    <tr>
        <td>Emissioni macchina</td>
        <td style="text-align:right;"><b>{Emissioni_macchina:.3f} kgCO2e</b></td>
    </tr>
    <tr>
        <td>Emissioni utensile</td>
        <td style="text-align:right;"><b>{Emissioni_utensile:.3f} kgCO2e</b></td>
    </tr>
    </table>

    <hr>

    <table style="width:100%; font-size:15px;">
    <tr>
        <td><b>Emissioni totali</b></td>
        <td style="text-align:right;"><b>{Emissioni_tot:.3f} kgCO2e</b></td>
    </tr>
    </table>

    </div>
    """
    # ================= PLOT =================
    fig, ax = plt.subplots(figsize=(16, 6))
    x = pd.to_numeric(df["t_ideal_cumul"], errors="coerce")
    y = pd.to_numeric(df["Power_pred"], errors="coerce")
    ax.plot(x, y, linewidth=2)
    ax.set_xlabel("t_ideal_cumul [s]")
    ax.set_ylabel("Power_pred [W]")
    ax.set_title(f"Potenza Predetta — k = {k_safe:.2f}")
    ax.grid(True)

    plot_path = os.path.join(UPLOAD_DIR, f"{base_name}_plot.png")
    fig.savefig(plot_path, bbox_inches="tight")
    plt.close(fig)

    with open(plot_path, "rb") as f:
        img_base64 = base64.b64encode(f.read()).decode()
    img_html = f'<img src="data:image/png;base64,{img_base64}"/>'

    # ================= TABELLA (STYLER + COLORI) =================
    df_disp = df.copy()
    for c in df_disp.select_dtypes(include="number").columns:
        df_disp[c] = df_disp[c].apply(lambda v: f"{v:.3f}" if pd.notnull(v) else "")

    def highlight(row):
        mov = row.get("movimento", "")
        if mov == "G0":
            return ["background-color:#e8f4ff"] * len(row)
        if mov == "G1":
            return ["background-color:#e8ffe8"] * len(row)
        if mov == "G2":
            return ["background-color:#fff3e8"] * len(row)
        return [""] * len(row)

    table_html = (
        df_disp.style
        .apply(highlight, axis=1)
        .set_table_attributes('class="dataframe"')
        .to_html()
    )
    # ================= SALVATAGGIO EXCEL =================
    excel_path = os.path.join(UPLOAD_DIR, f"{base_name}_table.xlsx")
    df.to_excel(excel_path, index=False)


    # ================= HTML =================
    return HTMLResponse(f"""
    <html>
    <head>
    <style>
        .dataframe {{
            border-collapse: collapse;
            width: 100%;
            font-size: 12px;
        }}
        .dataframe th, .dataframe td {{
            border: 1px solid #ccc;
            padding: 6px;
            text-align: center;
            white-space: nowrap;
        }}
        .dataframe thead th {{
            position: sticky;
            top: 0;
            background-color: #f2f2f2;
            z-index: 2;
        }}
        .container {{
            height: 700px;
            overflow-y: scroll;
            border: 1px solid #ddd;
        }}
    </style>
    </head>
    <body style="font-family:Arial; padding:20px">

        <h2>File: {filename}</h2>

        <form action="/recompute">
            <input type="hidden" name="base_name" value="{base_name}">
            <input type="hidden" name="costo_orario" value="{costo_orario}">
            <input type="hidden" name="costo_kwh" value="{costo_kwh}">
            <input type="hidden" name="costo_utensile" value="{costo_utensile}">
            k:
            <input type="number" step="0.1" name="k" value="{k_safe:.2f}">
            <input type="submit" value="Ricalcola">
        </form>

        <br>
        {img_html}
        {summary_block}

        <br><br>
        <div class="container">{table_html}</div>

        <br>
        <a href="/">Nuovo upload</a>

    </body>
    </html>
    """)

# ==========================================================
# DOWNLOAD EXCEL
# ==========================================================
@app.get("/download/excel")
async def download_excel(base_name: str = Query(...)):
    path = os.path.join(UPLOAD_DIR, f"{base_name}_table.xlsx")
    if not os.path.exists(path):
        return HTMLResponse("File Excel non trovato. (Devi salvarlo prima in render_result)", status_code=404)
    return FileResponse(
        path,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        filename=f"{base_name}_table.xlsx"
    )


Writing api_app.py


### ***TUNNEL***

In [8]:
import threading
import uvicorn

def start_api():
    try:
        uvicorn.run(
            "api_app:app",
            host="0.0.0.0",
            port=8000,
            log_level="info"
        )
    except Exception as e:
        print("UVICORN ERROR:", e)

threading.Thread(target=start_api, daemon=True).start()


In [9]:
import subprocess
import re
import time

def start_tunnel():
    process = subprocess.Popen(
        ["cloudflared", "tunnel", "--url", "http://localhost:8000"],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True
    )

    url_regex = re.compile(r"https://[-a-z0-9]+\.trycloudflare\.com")
    url_found = None

    for line in process.stdout:
        print(line.strip())
        match = url_regex.search(line)
        if match:
            url_found = match.group(0)
            print("\nPUBLIC URL:", url_found, "\n")
            break

    return url_found

public_url = start_tunnel()


2026-02-04T19:37:27Z INF Thank you for trying Cloudflare Tunnel. Doing so, without a Cloudflare account, is a quick way to experiment and try it out. However, be aware that these account-less Tunnels have no uptime guarantee, are subject to the Cloudflare Online Services Terms of Use (https://www.cloudflare.com/website-terms/), and Cloudflare reserves the right to investigate your use of Tunnels for violations of such terms. If you intend to use Tunnels in production you should use a pre-created named tunnel by following: https://developers.cloudflare.com/cloudflare-one/connections/connect-apps
2026-02-04T19:37:27Z INF Requesting new quick Tunnel on trycloudflare.com...
2026-02-04T19:37:32Z INF +--------------------------------------------------------------------------------------------+
2026-02-04T19:37:32Z INF |  Your quick Tunnel has been created! Visit it at (it may take some time to be reachable):  |
2026-02-04T19:37:32Z INF |  https://gateway-customs-result-eminem.trycloudflare.c