# Download the ARERA file and parse the Excel workbook

In [None]:
# === DOWNLOAD FILE FROM WEB IF AVAILABLE ===

import requests
from typing import Optional
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import os

# Page whose HTML is fetched and scanned for the download link; also used as
# the base when resolving a relative href into an absolute URL.
BASE_URL = "https://www.arera.it/area-operatori/prezzi-e-tariffe"
# Substring that an <a> tag's href must contain to be picked as the file link.
PATTERN = "smt.xlsx"
TIMEOUT = 30 # seconds; passed as `timeout` to every requests.get call

# Fetch function
def fetch_html(url: str) -> str:
    """Fetch ``url`` with an HTTP GET and return the decoded response body.

    The request uses the module-level ``TIMEOUT``. An unsuccessful HTTP
    status is turned into an exception by ``raise_for_status()``.
    """
    page = requests.get(url, timeout=TIMEOUT)
    page.raise_for_status()
    body = page.text
    return body

# Find function
def find_excel_link(html: str, base_url: str, pattern: str) -> Optional[str]:
    """Return the absolute URL of the first link whose href contains ``pattern``.

    Args:
        html: Markup to search.
        base_url: Base used to resolve a relative href.
        pattern: Substring the href must contain.

    Returns:
        The href joined onto ``base_url``, or ``None`` when no ``<a>`` tag
        matches.
    """
    def _href_matches(href):
        # The guard keeps a missing/empty href from reaching the `in` test.
        return bool(href) and pattern in href

    anchor = BeautifulSoup(html, "html.parser").find("a", href=_href_matches)
    return urljoin(base_url, anchor["href"]) if anchor else None

# Download function
def download_file(url: str, output_path: str) -> None:
    """Download ``url`` and write the response body to ``output_path``.

    The request uses the module-level ``TIMEOUT``. The HTTP status is checked
    with ``raise_for_status()`` before the output file is opened, so a failed
    request does not create or truncate the file.
    """
    reply = requests.get(url, timeout=TIMEOUT)
    reply.raise_for_status()
    payload = reply.content

    with open(output_path, "wb") as target:
        target.write(payload)

def main():
    """Locate the Excel link on ``BASE_URL`` and download it to the working directory.

    The page is fetched, the first link whose href contains ``PATTERN`` is
    resolved to an absolute URL, and the file is saved under the last path
    component of that URL. If no link matches, a message is printed and
    nothing is downloaded.
    """
    from urllib.parse import unquote, urlsplit

    html = fetch_html(BASE_URL)
    file_url = find_excel_link(html, BASE_URL, PATTERN)

    if not file_url:
        print(f"No file found containing '{PATTERN}'")
        return

    # Build the local name from the URL *path* only: a query string or
    # fragment (e.g. "...smt.xlsx?v=2") must not end up in the file name, and
    # percent-escapes are decoded. If the path has no final component, fall
    # back to PATTERN so that open() is never handed an empty name.
    url_path = unquote(urlsplit(file_url).path)
    file_name = os.path.basename(url_path) or PATTERN

    download_file(file_url, file_name)

    print(f"Download completed: {file_name}")

# Run the download only when this code is executed directly, not on import.
if __name__ == "__main__":
    main()

In [None]:
# === LOAD DATA FROM LOCAL XLSX FILE ===

import pandas as pd
import json
from numbers import Number
import sqlite3

# === CONSTANTS ===
# Workbook read by load_raw_data (path relative to the working directory).
EXCEL_FILE = "E2025-3_smt.xlsx"
# Passed as `sheet_name` to pd.read_excel; the integer 0 selects the first sheet.
SHEET_NAME = 0
# Positional (0-based) index of the row holding the column headers (df.iloc).
HEADER_ROW = 16
# Positional (0-based) column indices 2..20 read from the header and data rows.
COLS = list(range(2, 21))

# Row label -> positional (0-based) row index of that row in the sheet, for the
# group exported under "Abitazioni di residenza anagrafica".
RESIDENTIAL_ROWS = {
    "EN €/kWh": 19,
    "FIX €/Y": 20,
    "POT €/kW/Y": 21
}
# Same mapping for the group exported under
# "Abitazioni diverse dalla residenza anagrafica".
NON_RESIDENTIAL_ROWS = {
    "EN €/kWh": 28,
    "FIX €/Y": 29,
    "POT €/kW/Y": 30
}

# Keys assigned, in order, to the first three values of every extracted row
# (they form the nested "PE" entry).
PE_KEYS = ["F0", "F1", "F23"]

# Normalizing values from - to null
def normalize_value(v):
    """Map a lone dash placeholder to ``None``; return anything else unchanged.

    A value counts as a placeholder when it is a string that equals ``"-"``
    after surrounding whitespace is stripped.
    """
    is_dash_placeholder = isinstance(v, str) and v.strip() == "-"
    return None if is_dash_placeholder else v

# Load function
def load_raw_data(file_path, sheet, header_row, cols):
    """Read one sheet without a header row and pull out the column labels.

    Args:
        file_path: Excel file handed to ``pd.read_excel``.
        sheet: Value passed as ``sheet_name``.
        header_row: Positional index of the row that holds the labels.
        cols: Positional column indices to take labels from.

    Returns:
        ``(df, headers)``: the raw DataFrame and the list of label strings,
        each converted with ``astype(str)`` and stripped of surrounding
        whitespace.
    """
    sheet_df = pd.read_excel(file_path, sheet_name=sheet, header=None)
    header_cells = sheet_df.iloc[header_row, cols]
    labels = header_cells.astype(str).str.strip().tolist()
    return sheet_df, labels

# Extract function
def extract_grouped_data(df, rows_map, cols, headers):
    """Turn selected sheet rows into nested dictionaries keyed by header.

    For every ``label -> row index`` pair in ``rows_map`` the cells at
    ``cols`` are read, passed through ``normalize_value`` and grouped:

    * the first three values go under ``"PE"``, keyed by ``PE_KEYS``;
    * values 3 up to the "Materia energia" column are keyed by their header;
    * at the "Materia energia" column, the ``"EN €/kWh"`` row takes three
      consecutive values keyed ``"Fascia unica"``, ``"F1"``, ``"F23"``, while
      any other row takes the single value at that position;
    * values starting three columns after "Materia energia" are keyed by
      their header.

    Header-keyed entries whose label (stripped, upper-cased) is
    "MATERIA ENERGIA" or "TOTALE" are left out.

    Assumes ``headers`` is aligned one-to-one with ``cols``.

    Returns:
        A list of ``{"descrizione": label, "valori": grouped_dict}``, in
        ``rows_map`` order.

    Raises:
        ValueError: if no header equals "Materia energia" (case-insensitive).
    """
    def _label(key):
        return str(key).strip().upper()

    me_idx = None
    for position, header in enumerate(headers):
        if _label(header) == "MATERIA ENERGIA":
            me_idx = position
            break
    if me_idx is None:
        raise ValueError("Intestazione 'Materia energia' non trovata.")

    middle_keys = headers[3:me_idx]
    tail_start = me_idx + 3
    tail_keys = headers[tail_start:]
    skipped_labels = {"MATERIA ENERGIA", "TOTALE"}

    extracted = []
    for descrizione, row_idx in rows_map.items():
        cells = df.iloc[row_idx, cols].tolist()
        values = [normalize_value(cell) for cell in cells]

        row_dict = {"PE": dict(zip(PE_KEYS, values[:3]))}

        if descrizione == "EN €/kWh":
            band_names = ["Fascia unica", "F1", "F23"]
            row_dict["Materia energia"] = dict(zip(band_names, values[me_idx:me_idx + 3]))
        else:
            row_dict["Materia energia"] = values[me_idx]

        # Header-keyed groups: the columns before "Materia energia" first,
        # then the trailing ones, so the key order matches the sheet.
        keyed_groups = (
            (middle_keys, values[3:3 + len(middle_keys)]),
            (tail_keys, values[tail_start:]),
        )
        for group_keys, group_values in keyed_groups:
            for key, value in zip(group_keys, group_values):
                if _label(key) in skipped_labels:
                    continue
                row_dict[key] = value

        extracted.append({"descrizione": descrizione, "valori": row_dict})

    return extracted

# Round function
def round_values(obj, decimals=5):
    """Recursively round every real number inside nested dicts and lists.

    Args:
        obj: A dict, list, number or any other value.
        decimals: Number of decimal places passed to ``round``.

    Returns:
        A new dict/list structure with the same shape in which real numbers
        are rounded; every other value (strings, ``None``, booleans, complex
        numbers, ...) is returned unchanged.
    """
    if isinstance(obj, dict):
        return {k: round_values(v, decimals) for k, v in obj.items()}
    if isinstance(obj, list):
        return [round_values(elem, decimals) for elem in obj]
    # bool is a subclass of int, so round(True, n) would silently turn it
    # into the integer 1; complex is a Number but round() raises TypeError
    # on it. Both are passed through untouched.
    if isinstance(obj, Number) and not isinstance(obj, (bool, complex)):
        return round(obj, decimals)
    return obj

# Save json function
def save_json(data, filename):
    """Serialize ``data`` as JSON into ``filename`` and print a confirmation.

    The file is written as UTF-8 with a 4-space indent, and non-ASCII
    characters are kept as-is rather than escaped.
    """
    with open(filename, "w", encoding="utf-8") as out_file:
        json.dump(data, out_file, indent=4, ensure_ascii=False)
    print(f"File JSON salvato in {filename}")

# Flatten function
def flatten_json(json_data):
    """Flatten the nested result structure into database-ready rows.

    Each top-level key must start (case-insensitively, after stripping) with
    "abitazioni di residenza" or "abitazioni diverse"; it is mapped to the
    flag "residenziale" or "non_residenziale" respectively. Every item under
    it becomes one row with "tipo_cliente", "categoria" and one column per
    value. Nested dicts are expanded to "<key>_<subkey>" columns, and spaces
    in column names are replaced by underscores.

    Returns:
        ``(flat_rows, columns_ordered)``: the list of row dicts, and the
        column names in order of first appearance.

    Raises:
        ValueError: if a top-level key matches neither prefix.
    """
    flat_rows = []
    columns_ordered = ["tipo_cliente", "categoria"]
    columns_seen = set(columns_ordered)

    def _store(row, colname, value):
        # Write the cell and remember the column the first time it shows up.
        row[colname] = value
        if colname not in columns_seen:
            columns_seen.add(colname)
            columns_ordered.append(colname)

    prefix_flags = (
        ("abitazioni di residenza", "residenziale"),
        ("abitazioni diverse", "non_residenziale"),
    )

    for tipo_cliente, items in json_data.items():
        normalized = tipo_cliente.lower().strip()
        tipo_flag = next(
            (flag for prefix, flag in prefix_flags if normalized.startswith(prefix)),
            None,
        )
        if tipo_flag is None:
            raise ValueError(f"Tipo cliente non riconosciuto: {tipo_cliente}")

        for item in items:
            row = {"tipo_cliente": tipo_flag, "categoria": item["descrizione"]}

            for key, value in item["valori"].items():
                if not isinstance(value, dict):
                    _store(row, key.replace(" ", "_"), value)
                    continue
                for sub_key, sub_value in value.items():
                    nested_name = f"{key.replace(' ', '_')}_{sub_key.replace(' ', '_')}"
                    _store(row, nested_name, sub_value)

            flat_rows.append(row)

    return flat_rows, columns_ordered

# create db function
def create_database(db_name, flat_rows, columns_ordered):
    """(Re)create the ``tariffe`` table with one TEXT column per name.

    Any existing ``tariffe`` table in ``db_name`` is dropped first.

    Args:
        db_name: Path of the SQLite database file.
        flat_rows: Unused; accepted so the signature mirrors
            ``save_to_database``.
        columns_ordered: Column names, in the order they should be created.
    """
    def _quote_identifier(name):
        # Standard SQL identifier quoting: wrap in double quotes and double
        # any embedded one. Single quotes denote string literals, and a
        # column name containing an apostrophe would break the statement.
        return '"' + str(name).replace('"', '""') + '"'

    columns_sql = ", ".join(
        f"{_quote_identifier(col)} TEXT" for col in columns_ordered
    )

    conn = sqlite3.connect(db_name)
    try:
        cur = conn.cursor()
        cur.execute("DROP TABLE IF EXISTS tariffe;")
        cur.execute(f"CREATE TABLE tariffe ({columns_sql});")
        conn.commit()
    finally:
        # Close the connection even when a statement fails.
        conn.close()
    print(f"Created: {db_name}")

# save db function
def save_to_database(db_name, flat_rows, columns_ordered):
    """Insert every row of ``flat_rows`` into the existing ``tariffe`` table.

    Args:
        db_name: Path of the SQLite database file.
        flat_rows: Row dicts; a column missing from a row is stored as NULL.
        columns_ordered: Column names to insert, in order.

    All rows are inserted in one transaction and committed at the end; if an
    insert fails nothing is committed.
    """
    def _quote_identifier(name):
        # Standard SQL identifier quoting: wrap in double quotes and double
        # any embedded one. Single quotes denote string literals, and a
        # column name containing an apostrophe would break the statement.
        return '"' + str(name).replace('"', '""') + '"'

    colnames_sql = ", ".join(_quote_identifier(c) for c in columns_ordered)
    placeholders = ", ".join(["?"] * len(columns_ordered))
    insert_sql = f"INSERT INTO tariffe ({colnames_sql}) VALUES ({placeholders})"

    conn = sqlite3.connect(db_name)
    try:
        cur = conn.cursor()
        # Values always travel as bound parameters; only identifiers are
        # interpolated into the statement text.
        cur.executemany(
            insert_sql,
            ([row.get(c) for c in columns_ordered] for row in flat_rows),
        )
        conn.commit()
    finally:
        # Close the connection even when an insert fails.
        conn.close()
    print(f"Created database: {db_name}.")

def main():
    """Run the whole pipeline: Excel sheet -> rounded JSON -> SQLite table.

    Reads the configured workbook, extracts the residential and
    non-residential row groups, rounds all numbers to 5 decimals, writes the
    result to "output.json", then flattens it and stores it in the
    ``tariffe`` table of "tariffe.db".
    """
    df_raw, headers = load_raw_data(EXCEL_FILE, SHEET_NAME, HEADER_ROW, COLS)

    # Output label -> row map, processed in this order.
    sections = (
        ("Abitazioni di residenza anagrafica", RESIDENTIAL_ROWS),
        ("Abitazioni diverse dalla residenza anagrafica", NON_RESIDENTIAL_ROWS),
    )
    grouped = {
        label: extract_grouped_data(df_raw, rows_map, COLS, headers)
        for label, rows_map in sections
    }

    final_result = round_values(grouped, decimals=5)
    save_json(final_result, "output.json")

    db_name = "tariffe.db"
    flat_rows, columns_ordered = flatten_json(final_result)
    create_database(db_name, flat_rows, columns_ordered)
    save_to_database(db_name, flat_rows, columns_ordered)

# Run the Excel -> JSON -> SQLite pipeline only when this code is executed
# directly, not on import.
if __name__ == "__main__":
    main()