In [2]:
import json, math, re, unicodedata, uuid, random
from dataclasses import dataclass
from pathlib import Path
from datetime import date, datetime, time, timedelta
import re
import pandas as pd
import numpy as np

### BLOQUE 1

Definicion de constantes, la semilla, el rango de fechas, número de empresas, usuarios por empresa y tickets por usuario, pesos de la tarjeta, threshold de potencia de la estacion, offset y ruido diario, rango de kw/h, así como rutas de archivos y parámetros de simulación.

In [3]:
PY_SEED = 42
np.random.seed(PY_SEED)
random.seed(PY_SEED)

YEAR = 2025

from datetime import date
FECHA_INI = date(YEAR, 1, 1)
FECHA_FIN = date(YEAR, 7, 31)

N_EMPRESAS_TRANSP = 3
USUARIOS_POR_EMPRESA = 3
TICKETS_POR_USUARIO = 50
PESO_TARJETA = 0.85

DC_THRESHOLD_KW = 20

STATION_OFFSET_MIN, STATION_OFFSET_MAX = -0.02, 0.02
DAILY_NOISE_MIN, DAILY_NOISE_MAX       = -0.01, 0.01

KWH_MIN, KWH_MAX, KWH_MODE = 10.0, 80.0, 35.0

PRODUCTO = "Electricidad"

from pathlib import Path
DATA_DIR = Path(r"data")

PATH_PUNTOS = DATA_DIR / "PuntosCarga.csv"
PATH_OCM    = DATA_DIR / "ocm_agg_2025.csv"
PATH_CIFS   = DATA_DIR / "CIFs_puntos_carga.csv"

COL_STATION_ID      = "station_id"
COL_EMPRESA         = "empresa"
COL_POT_MAX_KW      = "potencia_max_kw"
COL_PRECIO_AC       = "precio_ac_eur_kwh"
COL_PRECIO_DC       = "precio_dc_eur_kwh"
COL_LAT             = "lat"
COL_LON             = "lon"

COL_EMPRESA_CIF_EMP = "empresa"
COL_EMPRESA_CIF_CIF = "cif"

OUT_JSONL = DATA_DIR / "tickets_ev_sinteticos.jsonl"
OUT_JSON  = DATA_DIR / "tickets_ev_sinteticos.json"

### BLOQUE 2

Bloque de funciones auxiliares.

Normalización de texto (norm_txt)

Redondeos (round2, round3)

Generación de fecha y hora aleatoria (random_fecha, random_hora)

Generación de kWh según distribución triangular (random_kwh)

Método de pago aleatorio (elegir_metodo_pago)

Conversión de fecha y hora a string (str_fecha, str_hora)

In [4]:
def norm_txt(x: str) -> str:
    if pd.isna(x): return ""
    x = str(x).strip()
    x = "".join(c for c in unicodedata.normalize("NFD", x) if unicodedata.category(c) != "Mn")
    return x.lower()

def round2(x: float) -> float:
    return float(np.round(x + 1e-12, 2))

def round3(x: float) -> float:
    return float(np.round(x + 1e-12, 3))

def random_fecha(fecha_ini: date, fecha_fin: date) -> date:
    delta = (fecha_fin - fecha_ini).days
    return fecha_ini + timedelta(days=int(np.random.randint(0, delta + 1)))

def random_hora() -> time:
    return (datetime.min + timedelta(seconds=int(np.random.randint(0, 24*3600)))).time()

def random_kwh() -> float:
    return round2(np.random.triangular(KWH_MIN, KWH_MODE, KWH_MAX))

def elegir_metodo_pago() -> str:
    return "Tarjeta crédito" if random.random() < PESO_TARJETA else "Efectivo"

def str_fecha(d: date) -> str:
    return d.strftime("%Y-%m-%d")

def str_hora(t: time) -> str:
    return t.strftime("%H:%M:%S")

### BLOQUE 3

Bloque donde se definen las funciones de limpieza del CSV generando est_std con todas las columnas relevantes:

id_estacion, empresa, provincia, municipio, direccion, lat, lon, potencia_max_kw

Normaliza nombres de provincia (provincia_norm) y empresa (empresa_norm)

Detecta columnas automáticamente, incluso si vienen empaquetadas con "|"

In [5]:
def norm_txt(x: str) -> str:
    if pd.isna(x): return ""
    x = str(x).strip()
    x = "".join(c for c in unicodedata.normalize("NFD", x) if unicodedata.category(c) != "Mn")
    return x.lower()

def prov_key(x: str) -> str:
    s = norm_txt(x)
    if "/" in s: s = s.split("/")[0].strip()
    repl = {
        "vizcaya":"bizkaia","guipuzcoa":"gipuzkoa","guipuzcoa.":"gipuzkoa",
        "la coruna":"a coruna","coruna":"a coruna","coruña":"a coruna",
        "orense":"ourense","lerida":"lleida","gerona":"girona",
        "valencia/valencia":"valencia","alicante/alacant":"alicante","castello":"castellon",
        "santa cruz de tenerife":"santa cruz de tenerife","tenerife":"santa cruz de tenerife",
        "araba alava":"alava","araba/alava":"alava","alava":"alava"
    }
    return repl.get(s, s)

def to_float_locale(series: pd.Series) -> pd.Series:
    return pd.to_numeric(series.astype(str).str.replace(",", ".", regex=False), errors="coerce")

def load_csv_guess(path: Path):
    encodings = ["cp1252", "utf-8", "latin1"]
    seps = [",", ";", "|", "\t"]
    last_err = None
    for enc in encodings:
        for sep in seps:
            try:
                df = pd.read_csv(path, encoding=enc, sep=sep)
                if df.shape[1] >= 2:
                    return df
            except Exception as e:
                last_err = e
                continue
    raise RuntimeError(f"No pude leer {path} — último error: {last_err}")

raw_est = load_csv_guess(PATH_PUNTOS).copy()
est = raw_est.copy()
est.columns = [norm_txt(c) for c in est.columns]

def _simp(s: str) -> str:
    return re.sub(r"[^a-z0-9]+", "", s)

cols = list(est.columns)
simp_map = {c: _simp(c) for c in cols}

def pick(*patterns):
    pats = [p.lower() for p in patterns]
    for c in cols:
        sc = simp_map[c]
        if any(p in sc for p in pats):
            return c
    return None

pack_col = None
for c in cols:
    s = est[c].astype(str)
    if s.str.contains(r"\|").mean() > 0.7:
        lens = s.str.split("|").map(len)
        if lens.quantile(0.5) >= 8:
            pack_col = c
            break

if pack_col is not None:
    parts = est[pack_col].astype(str).str.split("|", expand=True)
    parts = parts.rename(columns=lambda i: f"p{i}")
    est = pd.concat([est, parts], axis=1)
    col_id  = "p0"
    col_dir = "p2"
    col_prov = "p4"
    col_muni = "p5"
    col_lon = "p7"
    col_lat = "p8"
    col_pot = "p9"
    op_col = next((c for c in ["operador","operator","operatorinfo","operatorinfotitle","operador_nombre","operatorname"] if c in est.columns), None)
    if op_col is None and "p1" in est.columns:
        op_col = "p1"
else:
    col_id  = pick("idestacion","stationid","poi","ocmid","codigo","id","station","puntoid") or "_idgen"
    if col_id == "_idgen":
        est[col_id] = np.arange(1, len(est)+1)
    col_dir      = pick("direccion","addressline1","addressinfoaddressline1","address")
    col_prov     = pick("provincia","stateorprovince","addressinfostateorprovince","province","prov")
    col_muni     = pick("municipio","localidad","poblacion","ciudad","town","addressinfotown")
    col_lat      = pick("latitud","lat","addressinfolatitude","latitude")
    col_lon      = pick("longitud","lon","lng","long","addressinfolongitude","longitude")
    col_pot      = pick("potencia","powerkw","connections1powerkw","connectionspowerkw","kw","potenciamax","maxpowerkw")
    op_col = next((c for c in ["operador","operator","operatorinfo","operatorinfotitle","operador_nombre","operatorname"] if c in est.columns), None)

lat_series = to_float_locale(est[col_lat]) if col_lat in est.columns else pd.Series(np.nan, index=est.index)
lon_series = to_float_locale(est[col_lon]) if col_lon in est.columns else pd.Series(np.nan, index=est.index)
if (lat_series.lt(-20).mean() > 0.2) or (lon_series.gt(20).mean() > 0.2):
    lat_series, lon_series = lon_series, lat_series
pot_series = to_float_locale(est[col_pot]) if col_pot in est.columns else pd.Series(np.nan, index=est.index)
empresa_series = est[op_col].astype(str) if op_col in est.columns else pd.Series("", index=est.index)

est_std = pd.DataFrame({
    "id_estacion": est[col_id].astype(str),
    "empresa": empresa_series,
    "provincia": est[col_prov].astype(str) if col_prov in est.columns else "",
    "municipio": est[col_muni].astype(str) if col_muni in est.columns else "",
    "direccion": est[col_dir].astype(str) if col_dir in est.columns else "",
    "lat": lat_series,
    "lon": lon_series,
    "potencia_max_kw": pot_series
})

est_std["empresa"] = est_std["empresa"].astype(str).replace({"nan": "", "none": "", "None": ""}).str.strip()
est_std["provincia_norm"] = est_std["provincia"].map(prov_key)
est_std["empresa_norm"] = est_std["empresa"].map(norm_txt)



### BLOQUE 4

Segundo bloque de limpieza:

Busca posibles columnas de operador/empresa (operador, operator, etc.)

Si encuentra una, la usa para rellenar est_std["empresa"]; si no, deja vacío

Normaliza nombres y elimina valores nulos o espacios en blanco

Actualiza est_std["empresa_norm"] usando norm_txt

In [6]:
op_candidates = [c for c in est.columns if re.search(r"(?:^|_)(operador|operator)(?:$|_)", c, flags=re.I)]
if op_candidates:
    pref = [c for c in op_candidates if c in ("operador","operator","operador_nombre","operatorname","operatorinfo","operatorinfotitle")]
    op_col = pref[0] if pref else op_candidates[0]
else:
    op_col = "p1" if "p1" in est.columns else None

if op_col:
    est_std["empresa"] = est[op_col].astype(str).str.strip()
else:
    est_std["empresa"] = est_std.get("empresa", pd.Series([""], index=est_std.index)).astype(str).str.strip()

est_std["empresa"] = est_std["empresa"].fillna("").astype(str).str.strip()
est_std["empresa_norm"] = est_std["empresa"].map(norm_txt)

### BLOQUE 6

Bloque de generación de CIFs.

cif_generate() crea un CIF aleatorio siguiendo las reglas de letras/dígitos de control.

get_cif(empresa) busca en map_empresa_to_cif un CIF existente para la empresa; si no existe, genera uno aleatorio.

In [7]:
CONTROL_LETTERS = "JABCDEFGHI"
LETTER_GROUP = set("PQRSNW")
DIGIT_GROUP  = set("ABEH")
ANY_GROUP    = set("CDFGJUVXYZ")

def _sum_digits(n: int) -> int:
    return n if n < 10 else n//10 + n%10

def cif_generate() -> str:
    first = random.choice(list(LETTER_GROUP | DIGIT_GROUP | ANY_GROUP))
    digits = [random.randint(0,9) for _ in range(7)]
    sum_even = digits[1] + digits[3] + digits[5]
    sum_odd = sum(_sum_digits(2*d) for d in (digits[0], digits[2], digits[4], digits[6]))
    total = sum_even + sum_odd
    cd_num = (10 - (total % 10)) % 10
    if first in LETTER_GROUP:
        control = CONTROL_LETTERS[cd_num]
    elif first in DIGIT_GROUP:
        control = str(cd_num)
    else:
        control = CONTROL_LETTERS[cd_num] if random.random() < 0.5 else str(cd_num)
    body = "".join(str(d) for d in digits)
    return f"{first}{body}{control}"

def get_cif(empresa: str) -> str:
    k = norm_txt(empresa)
    return map_empresa_to_cif.get(k) or cif_generate()



### BLOQUE 7

Bloque de enlace de la función nif_de_grupo con get_cif. --> Se usa para asignar un CIF/NIF a la empresa de cada ticket.

In [8]:
def nif_de_grupo(grupo_texto: str) -> str:
    return get_cif(grupo_texto)

### BLOQUE 8

Bloque que define funciones de cálculo de precios:

station_offset(id_estacion): offset fijo por estación.

band_by_power(potencia_max_kw): determina AC o DC según potencia.

precio_base_mes(prov_norm, mes, band, id_estacion): obtiene precio base mensual por estación o provincia.

precio_diario(prov_norm, fecha, id_estacion, band): añade offset y ruido al precio base.

In [9]:
def station_offset(id_estacion: str) -> float:
    rnd = random.Random(hash(id_estacion) & 0xffffffff)
    return float(rnd.uniform(STATION_OFFSET_MIN, STATION_OFFSET_MAX))

def band_by_power(potencia_max_kw: float | int | None) -> str:
    try:
        v = float(potencia_max_kw)
    except (TypeError, ValueError):
        v = np.nan
    return "DC" if pd.notna(v) and v > DC_THRESHOLD_KW else "AC"

def precio_base_mes(prov_norm: str, mes: int, band: str, id_estacion: str | None = None) -> float | None:
    val = None
    if id_estacion is not None and (id_estacion, mes, band) in precios_station_idx:
        val = precios_station_idx.get((id_estacion, mes, band))
    if (val is None or pd.isna(val)) and (prov_norm, mes, band) in precios_prov_idx:
        val = precios_prov_idx.get((prov_norm, mes, band))
    return float(val) if val is not None and not pd.isna(val) else None

def precio_diario(prov_norm: str, fecha: date, id_estacion: str, band: str) -> float:
    base = precio_base_mes(prov_norm, fecha.month, band, id_estacion)
    if base is None or pd.isna(base):
        candidatos = []
        if (prov_norm, fecha.month, band) in precios_prov_idx:
            v = precios_prov_idx.get((prov_norm, fecha.month, band))
            if v is not None and not pd.isna(v):
                candidatos.append(v)
        if not candidatos:
            candidatos = [v for (p, m, b), v in precios_prov_idx.items() if m == fecha.month and b == band and v is not None and not pd.isna(v)]
        base = float(np.mean(candidatos)) if candidatos else 0.45
    off = station_offset(id_estacion)
    noise = random.uniform(DAILY_NOISE_MIN, DAILY_NOISE_MAX)
    return round3(max(0.15, base + off + noise))

### BLOQUE 9

Bloque donde se crean las empresas y usuarios:

Se generan empresas: EMP04, EMP05, EMP06

Se generan 3 usuarios por empresa 

Se asignan IDs de usuario secuenciales de U28 a U36 y se realiza un check rapido

In [10]:
@dataclass
class Empresa:
    id: str
    nombre: str

@dataclass
class Usuario:
    id: str
    empresa_id: str
    nombre: str


empresas = [
    Empresa(id=f"EMP{i+1:03d}", nombre=f"Transporte_{i+1:02d} S.L.") 
    for i in range(N_EMPRESAS_TRANSP)
]

usuarios = []
usuario_cont = 28  
for e in empresas:
    for j in range(USUARIOS_POR_EMPRESA):
        u_id = f"{e.id}-U{usuario_cont}"
        usuarios.append(Usuario(
            id=u_id,
            empresa_id=e.id,
            nombre=f"Usuario_{usuario_cont:02d}_{e.id}"
        ))
        usuario_cont += 1


for u in usuarios:
    print(u.id, u.nombre)

EMP001-U28 Usuario_28_EMP001
EMP001-U29 Usuario_29_EMP001
EMP001-U30 Usuario_30_EMP001
EMP002-U31 Usuario_31_EMP002
EMP002-U32 Usuario_32_EMP002
EMP002-U33 Usuario_33_EMP002
EMP003-U34 Usuario_34_EMP003
EMP003-U35 Usuario_35_EMP003
EMP003-U36 Usuario_36_EMP003


### BLOQUE 10

Bloque prepara el pool de estaciones de carga para generar tickets:

Resetea el índice y asegura que id_estacion sea string.

Convierte lat, lon y potencia_max_kw a numérico, reemplazando comas por puntos.

Filtra estaciones con coordenadas válidas (lat entre -90 y 90, lon entre -180 y 180).

Normaliza la provincia (provincia_norm) y asigna la banda de carga (band = AC/DC según potencia).

Lanza error si no queda ninguna estación válida.

In [11]:
EST_POOL = est_std.reset_index(drop=True).copy()
EST_POOL["id_estacion"] = EST_POOL["id_estacion"].astype(str)

EST_POOL["lat"] = pd.to_numeric(EST_POOL["lat"].astype(str).str.replace(",", ".", regex=False), errors="coerce")
EST_POOL["lon"] = pd.to_numeric(EST_POOL["lon"].astype(str).str.replace(",", ".", regex=False), errors="coerce")

mask_coords = EST_POOL["lat"].between(-90, 90) & EST_POOL["lon"].between(-180, 180)
if mask_coords.any():
    EST_POOL = EST_POOL.loc[mask_coords].reset_index(drop=True)

EST_POOL["potencia_max_kw"] = pd.to_numeric(EST_POOL["potencia_max_kw"], errors="coerce")
EST_POOL["provincia_norm"] = EST_POOL["provincia"].map(prov_key)
EST_POOL["band"] = EST_POOL["potencia_max_kw"].map(band_by_power)

if len(EST_POOL) == 0:
    raise ValueError("EST_POOL vacío: revisa columnas de lat/lon en PuntosCarga.csv o el mapeo de columnas.")


### BLOQUE 11

Bloque calcula los importes teniendo en cuenta impuestos:

Define tipos impositivos: IVA_TIPO = 21 % y IEE_TIPO = 5,11 %.

Función calcular_importes(kwh, precio_unit, unit_price_includes_taxes):

    Si unit_price_includes_taxes=True, separa el precio unitario en base, IVA e IEE.

    Si unit_price_includes_taxes=False, calcula el total sumando IVA e IEE al precio unitario.

Ajusta redondeos para que base + IVA + IEE = total.

Devuelve un diccionario con:

    precio_unitario sin impuestos

    precio_unitario_con_impuestos

    base_imponible

    iee

    iva

    importe_total

In [12]:
IVA_TIPO = 0.21
IEE_TIPO = 0.0511

def calcular_importes(kwh: float, precio_unit: float, unit_price_includes_taxes: bool = False) -> dict:
    kwh = max(0.0, float(kwh))
    precio_unit = round3(float(precio_unit))
    if unit_price_includes_taxes:
        total = round2(kwh * precio_unit)
        mult = (1 + IEE_TIPO) * (1 + IVA_TIPO)
        base = round2(total / mult)
        iee = round2(base * IEE_TIPO)
        iva = round2((base + iee) * IVA_TIPO)
        ajuste = round2(total - (base + iee + iva))
        if ajuste != 0:
            iva = round2(iva + ajuste)
        precio_unit_con_imp = precio_unit
        precio_unit_sin_imp = round3(precio_unit_con_imp / mult)
    else:
        mult = (1 + IEE_TIPO) * (1 + IVA_TIPO)
        base = round2(kwh * precio_unit)
        iee = round2(base * IEE_TIPO)
        iva = round2((base + iee) * IVA_TIPO)
        total = round2(base + iee + iva)
        ajuste = round2((kwh * precio_unit * mult) - total)
        if ajuste != 0:
            iva = round2(iva + ajuste)
            total = round2(base + iee + iva)
        precio_unit_sin_imp = precio_unit
        precio_unit_con_imp = round3(precio_unit_sin_imp * mult)
    return {
        "precio_unitario": precio_unit_sin_imp,
        "precio_unitario_con_impuestos": precio_unit_con_imp,
        "importe_total": total,
        "base_imponible": base,
        "iee": iee,
        "iva": iva
    }


### BLOQUE 12

Bloque de generacion de un ticket sintético de electricidad para un usuario y empresa:

Selecciona fecha y hora aleatorias.

Elige una estación válida y determina coordenadas, provincia y banda de carga (AC/DC).

Calcula precio unitario, kWh consumidos y desglosa importes con IVA e IEE.

Asigna método de pago y CIF del operador.

Devuelve un diccionario con todos los datos del ticket, línea de consumo y totales.

In [13]:
def _to_float_locale(val):
    if pd.isna(val):
        return np.nan
    s = str(val).strip().replace(",", ".")
    try:
        return float(s)
    except Exception:
        return np.nan

def _coords_from_row(row):
    lat = _to_float_locale(row.get("lat", np.nan))
    lon = _to_float_locale(row.get("lon", np.nan))

    def _ok(la, lo):
        return (27.0 <= la <= 44.5) and (-20.0 <= lo <= 5.5)

    if not _ok(lat, lon) and _ok(lon, lat):
        lat, lon = lon, lat

    if not _ok(lat, lon):
        return np.nan, np.nan
    return lat, lon

def generar_ticket(empresa: Empresa, usuario: Usuario) -> dict:
    f = random_fecha(FECHA_INI, FECHA_FIN)
    h = random_hora()

    lat, lon = np.nan, np.nan
    attempts = 0
    row = None
    while attempts < 5 and (pd.isna(lat) or pd.isna(lon)):
        row = EST_POOL.sample(1).iloc[0]
        lat, lon = _coords_from_row(row)
        attempts += 1

    prov_norm = row["provincia_norm"]
    sid = str(row["id_estacion"])
    band = band_by_power(row.get("potencia_max_kw"))
    punit = precio_diario(prov_norm, f, sid, band)

    kwh = random_kwh()
    metodo = elegir_metodo_pago()
    imp = calcular_importes(kwh, punit)

    nif_operador = get_cif(row.get("empresa", ""))

    ticket = {
        "idTicket": f"T-{uuid.uuid4().hex[:12].upper()}",
        "idEmpresa": empresa.id,
        "empresaNombre": empresa.nombre,
        "idUsuario": usuario.id,
        "fechaEmision": str_fecha(f),
        "horaEmision": str_hora(h),
        "metodoPago": metodo,
        "estacion": {
            "id": sid,
            "provincia": row.get("provincia", ""),
            "municipio": row.get("municipio", ""),
            "direccion": row.get("direccion", ""),
            "lat": None if pd.isna(lat) else float(lat),
            "lon": None if pd.isna(lon) else float(lon),
            "empresa": row.get("empresa", ""),
            "nifEmpresa": nif_operador,
            "potenciaMaxKW": None if pd.isna(row.get("potencia_max_kw")) else float(row.get("potencia_max_kw")),
            "tarifa": band
        },
        "lineas": [
            {
                "producto": PRODUCTO,
                "kwh": kwh,
                "precioUnitarioSinImpuestos": imp["precio_unitario"],
                "precioUnitario": imp["precio_unitario_con_impuestos"],
                "importe": imp["importe_total"]
            }
        ],
        "baseImponible": imp["base_imponible"],
        "iee": imp.get("iee", 0.0),
        "iva": imp["iva"],
        "total": imp["importe_total"],
        "moneda": "EUR",
        "tipoDocumento": "Factura simplificada"
    }
    return ticket


### BLOQUE 13

Depuracion, Inicializa los precios por estación y por provincia para los tickets:

Comprueba si existen los diccionarios precios_station_idx y precios_prov_idx; si no, los crea vacíos.

Si ambos están vacíos, genera precios por defecto para cada provincia y mes (1‑7) y para cada banda (AC/DC):

    AC = 0,42 €/kWh

    DC = 0,55 €/kWh

Si no hay provincias válidas, usa "nacional" como valor por defecto.

In [14]:
try:
    precios_station_idx
except NameError:
    precios_station_idx = {}
try:
    precios_prov_idx
except NameError:
    precios_prov_idx = {}

if (not precios_station_idx) and (not precios_prov_idx):
    meses = range(1, 8)
    if 'EST_POOL' in globals() and isinstance(EST_POOL, pd.DataFrame) and len(EST_POOL) > 0 and "provincia_norm" in EST_POOL.columns:
        provs = EST_POOL["provincia_norm"].dropna().astype(str).unique().tolist()
    else:
        provs = ["nacional"]
    defaults = {"AC": 0.42, "DC": 0.55}
    precios_prov_idx = {(prov, m, band): defaults[band] for prov in provs for m in meses for band in ("AC","DC")}


### BLOQUE 14

Inicializa y gestiona los CIF/NIF de los operadores --> asegura que cada empresa tenga un CIF válido para los tickets, usando datos reales si están disponibles o generando uno sintético.

Comprueba si existe map_empresa_to_cif; si no, lo crea vacío.

Si no hay datos, intenta cargar un CSV (PATH_CIFS) con empresas y sus CIFs, construyendo map_empresa_to_cif.

Define funciones para generar un CIF aleatorio (cif_generate) según las reglas de control de letras y dígitos.

get_cif(empresa) devuelve el CIF de la empresa si existe en el mapa; si no, genera uno aleatorio.

In [15]:
try:
    map_empresa_to_cif
except NameError:
    map_empresa_to_cif = {}

if not map_empresa_to_cif:
    try:
        raw_cifs = load_csv_guess(PATH_CIFS).copy()
        cifs = raw_cifs.copy()
        cifs.columns = [norm_txt(c) for c in cifs.columns]

        def _simp(s: str) -> str:
            return re.sub(r"[^a-z0-9]+", "", norm_txt(s))

        cols = list(cifs.columns)
        cols_simpl = {c: _simp(c) for c in cols}

        def pick_col(keys):
            for c in cols:
                sc = cols_simpl[c]
                if any(k in sc for k in keys):
                    return c
            return None

        col_emp = pick_col(["empresa","operador","operator","compania","compañia","grupo","marca","proveedor","razon","razonsocial"])
        col_cif = pick_col(["cif","nif","nifcif","cifnif","vat","vatid"])
        if col_emp and col_cif:
            for _, r in cifs.iterrows():
                emp = norm_txt(str(r[col_emp]))
                cif = str(r[col_cif]).strip().upper()
                if emp and cif:
                    map_empresa_to_cif[emp] = cif
    except Exception:
        pass

if "CONTROL_LETTERS" not in globals():
    CONTROL_LETTERS = "JABCDEFGHI"
    LETTER_GROUP = set("PQRSNW")
    DIGIT_GROUP  = set("ABEH")
    ANY_GROUP    = set("CDFGJUVXYZ")

def _sum_digits(n: int) -> int:
    return n if n < 10 else n//10 + n%10

def cif_generate() -> str:
    first = random.choice(list(LETTER_GROUP | DIGIT_GROUP | ANY_GROUP))
    digits = [random.randint(0,9) for _ in range(7)]
    sum_even = digits[1] + digits[3] + digits[5]
    sum_odd = sum(_sum_digits(2*d) for d in (digits[0], digits[2], digits[4], digits[6]))
    total = sum_even + sum_odd
    cd_num = (10 - (total % 10)) % 10
    if first in LETTER_GROUP:
        control = CONTROL_LETTERS[cd_num]
    elif first in DIGIT_GROUP:
        control = str(cd_num)
    else:
        control = CONTROL_LETTERS[cd_num] if random.random() < 0.5 else str(cd_num)
    body = "".join(str(d) for d in digits)
    return f"{first}{body}{control}"

def get_cif(empresa: str) -> str:
    k = norm_txt(empresa)
    return map_empresa_to_cif.get(k) or cif_generate()


### BLOQUE 15

Bloque de generacion de todos los tickets eléctricos y gestiona posibles errores:

Recorre todas las empresas y sus usuarios.

Para cada usuario, genera TICKETS_POR_USUARIO tickets llamando a generar_ticket(emp, u).

Captura excepciones para no interrumpir la ejecución:

Muestra hasta 10 errores por pantalla.

Cuenta el total de errores ocurridos.

Imprime un resumen: tickets generados y errores.

Devuelve la lista completa de tickets en tickets.

In [16]:
def generar_todos() -> list[dict]:
    tickets = []
    errores = 0
    for emp in empresas:
        us_emp = [u for u in usuarios if u.empresa_id == emp.id]
        for u in us_emp:
            for _ in range(TICKETS_POR_USUARIO):
                try:
                    t = generar_ticket(emp, u)
                    tickets.append(t)
                except Exception as e:
                    if errores < 10:
                        print("Error en generar_ticket:", repr(e))
                    errores += 1
                    continue
    print(f"Tickets generados: {len(tickets)} | Errores: {errores}")
    return tickets

tickets = generar_todos()


Tickets generados: 450 | Errores: 0


Guardado de los tickets

In [17]:
with open(OUT_JSONL, "w", encoding="utf-8") as f:
    for tk in tickets:
        f.write(json.dumps(tk, ensure_ascii=False) + "\n")

with open(OUT_JSON, "w", encoding="utf-8") as f:
    json.dump(tickets, f, ensure_ascii=False, indent=2)

OUT_JSONL, OUT_JSON

(WindowsPath('data/tickets_ev_sinteticos.jsonl'),
 WindowsPath('data/tickets_ev_sinteticos.json'))

Control de calidad de los archivos generados

In [18]:
df_chk = pd.DataFrame([{
    "empresa":  t.get("idEmpresa"),
    "usuario":  t.get("idUsuario"),
    "fecha":    t.get("fechaEmision"),
    "hora":     t.get("horaEmision"),
    "metodo":   t.get("metodoPago"),

    "producto": (t.get("lineas") or [{}])[0].get("producto"),
    "precio":   (t.get("lineas") or [{}])[0].get("precioUnitario"),
    "precio_sin_imp": (t.get("lineas") or [{}])[0].get("precioUnitarioSinImpuestos"),
    "kwh":      (t.get("lineas") or [{}])[0].get("kwh"),
    "total":    t.get("total"),

    "est_id":     (t.get("estacion") or {}).get("id"),
    "est_empresa":(t.get("estacion") or {}).get("empresa"),
    "est_nif":    (t.get("estacion") or {}).get("nifEmpresa"),
    "provincia":  (t.get("estacion") or {}).get("provincia"),
    "municipio":  (t.get("estacion") or {}).get("municipio"),
    "direccion":  (t.get("estacion") or {}).get("direccion"),
    "lat":        (t.get("estacion") or {}).get("lat"),
    "lon":        (t.get("estacion") or {}).get("lon"),
    "tarifa":     (t.get("estacion") or {}).get("tarifa"),
    "pot_kw":     (t.get("estacion") or {}).get("potenciaMaxKW"),
} for t in tickets])

for c in set(["precio","precio_sin_imp","kwh","total","lat","lon","pot_kw"]).intersection(df_chk.columns):
    df_chk[c] = pd.to_numeric(df_chk[c], errors="coerce")

print("Tickets totales:", len(df_chk))
print(df_chk.head(5))

print("\nPor empresa (transportista):")
print(df_chk.groupby("empresa").size())

print("\nPor usuario:")
print(df_chk.groupby("usuario").size().head(10))

print("\nPor producto:")
print(df_chk.groupby("producto").agg(n=("producto","size"), p_med=("precio","mean")).reset_index())

print("\nPor operador de punto (est_empresa):")
print(
    df_chk.groupby("est_empresa")
          .agg(tickets=("empresa","size"),
               p_med=("precio","mean"),
               kwh=("kwh","sum"),
               gasto=("total","sum"))
          .sort_values("gasto", ascending=False)
          .head(10)
)

print("\nTop 10 estaciones por gasto total:")
print(
    df_chk.groupby(["est_id","est_empresa"])
          .agg(tickets=("empresa","size"), gasto=("total","sum"))
          .sort_values("gasto", ascending=False)
          .head(10)
)


Tickets totales: 450
  empresa     usuario       fecha      hora           metodo      producto  \
0  EMP001  EMP001-U28  2025-04-13  04:23:15  Tarjeta crédito  Electricidad   
1  EMP001  EMP001-U28  2025-07-18  18:28:03  Tarjeta crédito  Electricidad   
2  EMP001  EMP001-U28  2025-01-12  16:21:40  Tarjeta crédito  Electricidad   
3  EMP001  EMP001-U28  2025-06-30  06:55:20  Tarjeta crédito  Electricidad   
4  EMP001  EMP001-U28  2025-04-12  14:23:10  Tarjeta crédito  Electricidad   

   precio  precio_sin_imp    kwh  total                       est_id  \
0   0.692           0.544  76.92  53.22      ES*WEN*ESGASVALDEMORO14   
1   0.675           0.531  44.94  30.35               ES*IBD*E157199   
2   0.683           0.537  45.55  31.11                ES*EDP*E00923   
3   0.698           0.549  63.51  44.34  ES*ESX*E21XP22T3KKJAK006051   
4   0.711           0.559  46.32  32.93                ES*EVC*E14765   

                       est_empresa    est_nif               provincia  \
0   