In [11]:
########################## Proceso ETL - Huawei 4G ##########################
# MA. 20250930.

# Librerías
import pandas as pd
import numpy as np
import re
import glob
import os
#import datetime
from datetime import datetime
from datetime import date
from openpyxl.utils import get_column_letter
from openpyxl.styles import Alignment
from openpyxl import load_workbook


## Variables compartidas

# Ubicación archivos
ruta_carpeta = 'C:/Users/SCaracoza/Documents/AT&T/LST Cell Ran/Huawei/Huawei-Feb'

ruta_ept = 'C:/Users/SCaracoza/Documents/AT&T/LST Cell Ran/Huawei/Huawei-Feb'

ruta_destino = 'C:/Users/SCaracoza/Documents/AT&T/LST Cell Ran/Huawei/Huawei-Feb'

# Fecha para el nombre de los archivos a crear
fecha_ejecucion: str = datetime.now().strftime('%Y%m')

In [12]:
# MA. 20250930
### LST CELL. Lectura y unificación robusta (vertical + horizontal) ###

# Archivos
archivo_prefijo = "MML_Task_Result_LST CELL_c"
archivos_txt = glob.glob(os.path.join(ruta_carpeta, f"{archivo_prefijo}*.txt"))

# Columnas objetivo (FAILED + SUCCEEDED)
columnas = [
    "Gestor", "Seccion", "NE", "Report",
    "Local Cell ID","Cell Name","Csg indicator",
    "Uplink cyclic prefix length","Downlink cyclic prefix length","NB-IoT Cell Flag",
    "Coverage Level Type","Frequency band",
    "Uplink EARFCN indication","Uplink EARFCN","Downlink EARFCN","Uplink bandwidth","Downlink bandwidth",
    "Cell ID","Physical cell ID","Additional spectrum emission","Cell active state",
    "Cell admin state","Cell middle block timer(min)","Cell Blocking Duration(min)","Cell FDD TDD indication",
    "Subframe assignment","Special subframe patterns",
    "SSP6 DwPTS Mode","Cell Standby Mode","Cell specific offset(dB)",
    "Frequency offset(dB)","Root sequence index","High speed flag",
    "Preamble format","Cell radius(m)",
    "Customized bandwidth configure indicator","Customized uplink bandwidth(0.1MHz)",
    "Customized downlink bandwidth(0.1MHz)",
    "Emergency Area Id indicator","Emergency Area ID","Ue max power allowed configure indicator",
    "Max transmit power allowed(dBm)","Flag of Multi-RRU Cell","Mode of Multi-RRU Cell",
    "CPRI Ethernet Compression Ratio","CPRI Compression","Physical Cell Number of SFN Cell",
    "Air Cell Flag","CRS Port Number","Cell transmission and reception mode",
    "CRS Antenna Port Mapping","User label","Work mode",
    "CN Operator Sharing Group ID","Intra Frequency RAN Sharing Indication","IntraFreq ANR Indication",
    "ANR Frequency Priority","Cell Radius Start Location(m)","Specified Cell Flag",
    "Downlink Punctured RB Number","SFN Master Cell Label","Multi Cell Share Mode",
    "Standby Cell SFN Recovery Time(h)","Compact Bandwidth Control Interference Mode",
    "Uplink Punctured RB Number Offset","Ultra High-Speed Cell Root Sequence Index"
]

# --- Parseo robusto para "Display static parameters of cells" ---
def _parse_tabla(cmd_text: str):
    """
    Devuelve lista de dicts. Soporta múltiples tablas "Display static parameters of cells"
    (por 'To be continued...') y formato vertical clave=valor.
    """
    patron = re.compile(
        r"Display static parameters of cells\s*-+\s*\n(.*?)(?=\n\s*\(Number of results|\n---\s*END|\nMML Command-----|$)",
        flags=re.S
    )

    resultados = []
    for m in patron.finditer(cmd_text):
        bloque = m.group(1)
        lineas = [ln.rstrip("\n") for ln in bloque.splitlines() if ln.strip()]

        # Variante vertical clave=valor
        kv_pairs = []
        for ln in lineas:
            mm = re.match(r"^\s*(.*?)\s*=\s*(.*)$", ln)
            if mm:
                kv_pairs.append((mm.group(1).strip(), mm.group(2).strip()))
        if kv_pairs and not any(re.match(r"^\d+\s", ln.strip()) for ln in lineas):
            resultados.append({k: v for k, v in kv_pairs})
            continue

        # Variante tabla ancha
        if not lineas:
            continue
        header_line = lineas[0]
        tokens = re.split(r"\s{2,}", header_line.strip())

        spans, pos = [], 0
        for tok in tokens:
            idx = header_line.find(tok, pos)
            spans.append((tok, idx))
            pos = idx + len(tok)
        spans = [(name, start, (spans[i+1][1] if i+1 < len(spans) else None))
                 for i, (name, start) in enumerate(spans)]

        data_lines = [ln for ln in lineas[1:] if re.match(r"^\d+", ln.strip())]
        for ln in data_lines:
            rec = {name: (ln[start:end].rstrip() if end is not None else ln[start:].rstrip())
                   for name, start, end in spans}
            resultados.append(rec)

    return resultados


dfs = []

for archivo in archivos_txt:
    with open(archivo, "r", encoding="utf-8") as f:
        contenido = f.read()

    # Gestor: p.ej. 'LST CELL_c1'
    gestor = os.path.basename(archivo)
    gestor = "_".join(re.sub(r"^MML_Task_Result_", "", gestor).split("_")[0:2])

    # -------- FAILED --------
    m_failed = re.search(r"=+Failed MML Command=+\s*(.*?)(?=\n=+)", contenido, flags=re.S)
    if m_failed:
        bloque_failed = m_failed.group(1)
        for seg in re.split(r"MML Command-----LST CELL:;", bloque_failed):
            if not seg.strip():
                continue
            m_ne = re.search(r"NE\s*:\s*(.*)", seg)
            m_rp = re.search(r"Report\s*:\s*(.*?)(?:\n---\s*END|\n\n|$)", seg, flags=re.S)
            if not m_ne:
                continue
            ne = m_ne.group(1).strip()
            report = (m_rp.group(1).strip() if m_rp else "")
            if report.startswith("+++"):
                report = ""
            dfs.append({"Gestor": gestor, "Seccion": "FAILED", "NE": ne, "Report": report})

    # -------- SUCCEEDED --------
    m_succ = re.search(r"=+Succeeded MML Command=+\s*(.*)", contenido, flags=re.S)
    if m_succ:
        bloque_succ = m_succ.group(1)
        for cmd in re.split(r"MML Command-----LST CELL:;", bloque_succ):
            if not cmd.strip():
                continue
            m_ne = re.search(r"NE\s*:\s*(\S+)", cmd)
            ne = m_ne.group(1).strip() if m_ne else ""
            m_rp = re.search(r"Report\s*:(.*?)(?:\n---\s*END|\n\n|$)", cmd, flags=re.S)
            report = (m_rp.group(1).strip() if m_rp else "")
            if report.startswith("+++"):
                report = ""

            # Tabla / KV
            for rec in _parse_tabla(cmd):
                fila = {"Gestor": gestor, "Seccion": "SUCCEEDED", "NE": ne, "Report": report}
                fila.update(rec)
                dfs.append(fila)

# DataFrame final, asegura columnas
df_LST_CELL_inicial = pd.DataFrame(dfs)
for col in columnas:
    if col not in df_LST_CELL_inicial.columns:
        df_LST_CELL_inicial[col] = ""
df_LST_CELL_inicial = df_LST_CELL_inicial[columnas].fillna("")

# Se deja solo la información necesaria
df_LST_CELL_inicial = df_LST_CELL_inicial[
    (df_LST_CELL_inicial["Seccion"] == "SUCCEEDED") |
    ((df_LST_CELL_inicial["Seccion"] == "FAILED") &
     (df_LST_CELL_inicial["Report"].str.lower() == "ne is not connected."))
]

# NE -> Sitio
df_LST_CELL_inicial.rename(columns={"NE": "Sitio"}, inplace=True)

# Comentarios
df_LST_CELL_inicial["Comentarios"] = np.where(
    #(df_LST_CELL_inicial["Seccion"] == "FAILED") &
    (df_LST_CELL_inicial["Report"].str.lower() == "ne is not connected."),
    "Offline",
    ""
)

# Exporta a Excel
#salida = os.path.join(ruta_destino, f"LST_CELL_{fecha_ejecucion}.xlsx")
#df_LST_CELL_inicial.to_excel(salida, index=False, engine="openpyxl")

In [13]:
# MA. 20250930
### LST CNOPERATORTA. Lectura y unificación ###

# Archivos
# MA. 20250930
archivo_prefijo = "MML_Task_Result_LST CNOPERATORTA_c"
archivos_txt = glob.glob(os.path.join(ruta_carpeta, f"{archivo_prefijo}*.txt"))

# Inicializa lista de DataFrames
dfs = []

for archivo in archivos_txt:
    with open(archivo, "r", encoding="utf-8") as f:
        contenido = f.read()

    # Obtiene nombre de archivo origen reducido
    origen_match = re.search(r"(LST CNOPERATORTA_c\d+)", os.path.basename(archivo))
    origen = origen_match.group(1) if origen_match else os.path.basename(archivo)

    # Procesa sección FAILED
    bloques_failed = re.split(r"=+Failed MML Command=+", contenido)
    if len(bloques_failed) > 1:
        fallidos = re.findall(r"NE\s*:\s*(.*?)\nReport\s*:\s*(.*?)(?:\n|$)", bloques_failed[1], re.DOTALL)
        for ne, report in fallidos:
            report = report.strip()
            if report.startswith("+++"):
                report = ""
            dfs.append(pd.DataFrame([{
                "Gestor": origen, "Seccion": "FAILED", "NE": ne.strip(), "Report": report
            }]))

    # Procesa sección SUCCEEDED
    bloques_succ = re.split(r"=+Succeeded MML Command=+", contenido)
    if len(bloques_succ) > 1:
        comandos = re.split(r"MML Command-----LST CNOPERATORTA:;", bloques_succ[1])
        for cmd in comandos:
            match_ne = re.search(r"NE\s*:\s*(\S+)", cmd)
            if not match_ne:
                continue
            ne = match_ne.group(1).strip()
            report = re.search(r"Report\s*:(.*?)\n", cmd)
            report = report.group(1).strip() if report else ""
            if report.startswith("+++"):
                report = ""

            if "Local tracking area ID" in cmd:
                partes = cmd.splitlines()
                headers = None
                for i, linea in enumerate(partes):
                    if re.match(r"^\s*Local tracking area ID", linea):
                        headers = re.split(r"\s{2,}", linea.strip())
                        data_lines = partes[i+1:]
                        break
                if headers:
                    # Filtra columnas basura "=" y "0"
                    headers = [h for h in headers if h not in ["=", "0"]]
                    for dl in data_lines:
                        if not dl.strip() or dl.startswith("---") or dl.startswith("RETCODE"):
                            continue
                        valores = re.split(r"\s{2,}", dl.strip())
                        if len(valores) <= 1:
                            continue
                        fila = dict(zip(headers, valores[:len(headers)]))
                        fila.update({"Gestor": origen, "Seccion": "SUCCEEDED", "NE": ne, "Report": report})
                        dfs.append(pd.DataFrame([fila]))
            else:
                dfs.append(pd.DataFrame([{
                    "Gestor": origen, "Seccion": "SUCCEEDED", "NE": ne, "Report": report
                }]))

# Une todos los resultados en un DataFrame con todas las columnas detectadas
df_LST_CNOPERATORTA_inicial = pd.concat(dfs, ignore_index=True).fillna("")

# --- Limpieza cruzada ---
# Sitios presentes en SUCCEEDED
sitios_succ = set(df_LST_CNOPERATORTA_inicial.loc[
    df_LST_CNOPERATORTA_inicial["Seccion"] == "SUCCEEDED", "NE"
])

# Elimina elementos FAILED
df_LST_CNOPERATORTA_inicial = df_LST_CNOPERATORTA_inicial[
    ~(
        (df_LST_CNOPERATORTA_inicial["Seccion"] == "FAILED") & (
            # Regla 1: Sitio en SUCCEEDED + Local tracking area ID vacía/nula
            (
                df_LST_CNOPERATORTA_inicial["NE"].isin(sitios_succ) &
                (
                    df_LST_CNOPERATORTA_inicial["Local tracking area ID"].astype("string").isna() |
                    df_LST_CNOPERATORTA_inicial["Local tracking area ID"].astype("string").str.strip().eq("")
                )
            )
            |
            # Regla 2: Report vacío/nulo
            (
                df_LST_CNOPERATORTA_inicial["Report"].astype("string").isna() |
                df_LST_CNOPERATORTA_inicial["Report"].astype("string").str.strip().eq("")
            )
        )
    )
].reset_index(drop=True)

# Renombra columnas
df_LST_CNOPERATORTA_inicial.rename(columns={"NE": "Sitio", "Tracking area code":"TAC"}, inplace=True)

# Crea campo llaveTAC (Sitio + Local tracking area ID)
df_LST_CNOPERATORTA_inicial["llaveTAC"] = (
    df_LST_CNOPERATORTA_inicial["Sitio"].astype("string").str.strip()
      .str.cat(df_LST_CNOPERATORTA_inicial["Local tracking area ID"].astype("string").str.strip(),
               sep="", na_rep=None)
)

# Convierte TAC a string
df_LST_CNOPERATORTA_inicial["TAC"] = df_LST_CNOPERATORTA_inicial["TAC"].apply(str)

# Exporta a Excel
#salida = os.path.join(ruta_destino, f"LST_CNOPERATORTA_{fecha_ejecucion}.xlsx")
#df_LST_CNOPERATORTA_inicial.to_excel(salida, index=False, engine="openpyxl")

In [14]:
# MA. 20250930
### LST CNOPERATOR. Lectura y unificación (con limpieza cruzada) ###

# Archivos
archivo_prefijo = "MML_Task_Result_LST CNOPERATOR_c"
archivos_txt = glob.glob(os.path.join(ruta_carpeta, f"{archivo_prefijo}*.txt"))

# Inicializa lista de DataFrames
dfs = []

for archivo in archivos_txt:
    with open(archivo, "r", encoding="utf-8") as f:
        contenido = f.read()

    # Origen reducido, p.ej. 'LST CNOPERATOR_c1'
    origen_match = re.search(r"(LST CNOPERATOR_c\d+)", os.path.basename(archivo))
    origen = origen_match.group(1) if origen_match else os.path.basename(archivo)

    # --------- FAILED ---------
    bloques_failed = re.split(r"=+Failed MML Command=+", contenido)
    if len(bloques_failed) > 1:
        fallidos = re.findall(r"NE\s*:\s*(.*?)\nReport\s*:\s*(.*?)(?:\n|$)", bloques_failed[1], re.DOTALL)
        for ne, report in fallidos:
            report = report.strip()
            if report.startswith("+++"):
                report = ""
            # Inicializamos CN Operator ID vacío para poder filtrar después
            dfs.append(pd.DataFrame([{
                "Gestor": origen, "Seccion": "FAILED", "NE": ne.strip(), "Report": report,
                "CN Operator ID": ""
            }]))

    # --------- SUCCEEDED ---------
    bloques_succ = re.split(r"=+Succeeded MML Command=+", contenido)
    if len(bloques_succ) > 1:
        comandos = re.split(r"MML Command-----LST CNOPERATOR:;", bloques_succ[1])
        for cmd in comandos:
            match_ne = re.search(r"NE\s*:\s*(\S+)", cmd)
            if not match_ne:
                continue
            ne = match_ne.group(1).strip()
            report_m = re.search(r"Report\s*:(.*?)\n", cmd)
            report = report_m.group(1).strip() if report_m else ""
            if report.startswith("+++"):
                report = ""

            # Detecta tabla
            if "CN Operator" in cmd:
                partes = cmd.splitlines()
                headers = None
                for i, linea in enumerate(partes):
                    if re.match(r"^\s*CN Operator ID", linea):
                        headers = re.split(r"\s{2,}", linea.strip())
                        data_lines = partes[i+1:]
                        break
                if headers:
                    for dl in data_lines:
                        if not dl.strip() or dl.startswith("---") or dl.startswith("RETCODE"):
                            continue
                        valores = re.split(r"\s{2,}", dl.strip())
                        if len(valores) <= 1:
                            continue
                        fila = dict(zip(headers, valores))
                        fila.update({"Gestor": origen, "Seccion": "SUCCEEDED", "NE": ne, "Report": report})
                        dfs.append(pd.DataFrame([fila]))
            else:
                # Sin tabla -> registro de SUCCEEDED sin detalle
                dfs.append(pd.DataFrame([{
                    "Gestor": origen, "Seccion": "SUCCEEDED", "NE": ne, "Report": report,
                    "CN Operator ID": ""
                }]))

# Une todos los resultados
df_LST_CNOPERATOR_inicial = pd.concat(dfs, ignore_index=True).fillna("")

# Asegura columna clave para la limpieza (por si algún archivo no la trajo)
if "CN Operator ID" not in df_LST_CNOPERATOR_inicial.columns:
    df_LST_CNOPERATOR_inicial["CN Operator ID"] = ""

# --- Limpieza cruzada ---
# Sitios presentes en SUCCEEDED
sitios_succ = set(df_LST_CNOPERATOR_inicial.loc[
    df_LST_CNOPERATOR_inicial["Seccion"] == "SUCCEEDED", "NE"
])

# Elimina FAILED cuando mismo Sitio está en SUCCEEDED y CN Operator ID está vacío/nulo
df_LST_CNOPERATOR_inicial = df_LST_CNOPERATOR_inicial[
    ~(
        (df_LST_CNOPERATOR_inicial["Seccion"] == "FAILED") &
        (df_LST_CNOPERATOR_inicial["NE"].isin(sitios_succ)) &
        (df_LST_CNOPERATOR_inicial["CN Operator ID"].isna() | (df_LST_CNOPERATOR_inicial["CN Operator ID"] == ""))
    )
].reset_index(drop=True)

# Renombra NE -> Sitio
df_LST_CNOPERATOR_inicial.rename(columns={"NE": "Sitio"}, inplace=True)

# Exporta a Excel
#salida = os.path.join(ruta_destino, f"LST_CNOPERATOR_{fecha_ejecucion}.xlsx")
#df_LST_CNOPERATOR_inicial.to_excel(salida, index=False, engine="openpyxl")

In [15]:
# MA. 20251001
### DSP S1INTARFACE. Lectura y unificación ###

# Archivos
archivo_prefijo = "MML_Task_Result_DSP S1INTARFACE_c"
archivos_txt = glob.glob(os.path.join(ruta_carpeta, f"{archivo_prefijo}*.txt"))

dfs = []

for archivo in archivos_txt:
    with open(archivo, "r", encoding="utf-8") as f:
        contenido = f.read()

    # Nombre de archivo reducido (ej: DSP S1INTARFACE_c1)
    origen_match = re.search(r"(DSP S1INTARFACE_c\d+)", os.path.basename(archivo))
    origen = origen_match.group(1) if origen_match else os.path.basename(archivo)

    # --------- FAILED ---------
    bloques_failed = re.split(r"=+Failed MML Command=+", contenido)
    if len(bloques_failed) > 1:
        fallidos = re.findall(r"NE\s*:\s*(.*?)\nReport\s*:\s*(.*?)(?:\n|$)", bloques_failed[1], re.DOTALL)
        for ne, report in fallidos:
            report = report.strip()
            if report.startswith("+++"):
                report = ""
            dfs.append(pd.DataFrame([{
                "Gestor": origen, "Seccion": "FAILED", "NE": ne.strip(), "Report": report,
                "S1 Interface ID": ""  # inicializado vacío porque no aparece en FAILED
            }]))

    # --------- SUCCEEDED ---------
    bloques_succ = re.split(r"=+Succeeded MML Command=+", contenido)
    if len(bloques_succ) > 1:
        comandos = re.split(r"MML Command-----DSP S1IN(?:TER|TAR)FACE:;", bloques_succ[1])
        #comandos = re.split(r"MML Command-----DSP S1INTARFACE:;", bloques_succ[1])
        for cmd in comandos:
            match_ne = re.search(r"NE\s*:\s*(\S+)", cmd)
            if not match_ne:
                continue
            ne = match_ne.group(1).strip()
            report = re.search(r"Report\s*:(.*?)\n", cmd)
            report = report.group(1).strip() if report else ""
            if report.startswith("+++"):
                report = ""

            # Busca tabla
            if "Display S1 Interface Information" in cmd:
                partes = cmd.split("Display S1 Interface Information", 1)[1].splitlines()
                headers = None
                for i, linea in enumerate(partes):
                    if re.match(r"^\s*S1 Interface ID", linea):
                        headers = re.split(r"\s{2,}", linea.strip())
                        data_lines = partes[i+1:]
                        break
                if headers:
                    for dl in data_lines:
                        if not dl.strip() or dl.startswith("---") or dl.startswith("RETCODE"):
                            continue
                        valores = re.split(r"\s{2,}", dl.strip())
                        if len(valores) <= 1:
                            continue
                        fila = dict(zip(headers, valores))
                        fila.update({
                            "Gestor": origen, "Seccion": "SUCCEEDED",
                            "NE": ne, "Report": report
                        })
                        dfs.append(pd.DataFrame([fila]))
            else:
                dfs.append(pd.DataFrame([{
                    "Gestor": origen, "Seccion": "SUCCEEDED",
                    "NE": ne, "Report": report,
                    "S1 Interface ID": ""  # inicializado vacío si no hay tabla
                }]))

# Unifica en DataFrame
df_DSP_S1INTARFACE_inicial = pd.concat(dfs, ignore_index=True).fillna("")

# --- Limpieza cruzada ---
# Sitios presentes en SUCCEEDED
sitios_succ = set(df_DSP_S1INTARFACE_inicial.loc[
    df_DSP_S1INTARFACE_inicial["Seccion"] == "SUCCEEDED", "NE"
])

# Elimina registros FAILED cuando el mismo Sitio está en SUCCEEDED y S1 Interface ID vacío/nulo
df_DSP_S1INTARFACE_inicial = df_DSP_S1INTARFACE_inicial[
    ~(
        (df_DSP_S1INTARFACE_inicial["Seccion"] == "FAILED") &
        (df_DSP_S1INTARFACE_inicial["NE"].isin(sitios_succ)) &
        (df_DSP_S1INTARFACE_inicial["S1 Interface ID"].isna() |
         (df_DSP_S1INTARFACE_inicial["S1 Interface ID"] == ""))
    )
].reset_index(drop=True)

# Descarta registros cuando "S1 Interface ID" contiene "+++" o "O&M"
df_DSP_S1INTARFACE_inicial = df_DSP_S1INTARFACE_inicial[
    ~df_DSP_S1INTARFACE_inicial["S1 Interface ID"].astype(str).str.contains(r"\+\+\+|O&M", na=False)
].reset_index(drop=True)

# Renombramiento columnas
df_DSP_S1INTARFACE_inicial.rename(columns={"NE": "Sitio"}, inplace=True)

# Cuenta la cantidad de veces que se repite CN Operator ID por cada Sitio
df_MMEs = (
    df_DSP_S1INTARFACE_inicial
    .groupby(["Sitio", "CN Operator ID"])
    .size()
    .reset_index(name="MMEs")
)


# Exporta a Excel
#salida = os.path.join(ruta_destino, f"DSP_S1INTARFACE_{fecha_ejecucion}.xlsx")
#df_MMEs.to_excel(salida, index=False, engine="openpyxl")

In [16]:
# MA. 20250930
### LST CELLOP. Lectura y unificación ###

# Archivos
archivo_prefijo = "MML_Task_Result_LST CELLOP_c"
archivos_txt = glob.glob(os.path.join(ruta_carpeta, f"{archivo_prefijo}*.txt"))

dfs = []

for archivo in archivos_txt:
    with open(archivo, "r", encoding="utf-8") as f:
        contenido = f.read()

    # Nombre de archivo reducido
    origen_match = re.search(r"(LST CELLOP_c\d+)", os.path.basename(archivo))
    origen = origen_match.group(1) if origen_match else os.path.basename(archivo)

    # --------- FAILED ---------
    bloques_failed = re.split(r"=+Failed MML Command=+", contenido)
    if len(bloques_failed) > 1:
        fallidos = re.findall(r"NE\s*:\s*(.*?)\nReport\s*:\s*(.*?)(?:\n|$)", bloques_failed[1], re.DOTALL)
        for ne, report in fallidos:
            report = report.strip()
            if report.startswith("+++"):
                report = ""
            dfs.append(pd.DataFrame([{
                "Gestor": origen, "Seccion": "FAILED", "NE": ne.strip(), "Report": report,
                "Local cell ID": ""  # se inicializa vacío porque no se listan en FAILED
            }]))

    # --------- SUCCEEDED ---------
    bloques_succ = re.split(r"=+Succeeded MML Command=+", contenido)
    if len(bloques_succ) > 1:

        # --- Agrupa fragmentos por NE (maneja "To be continued...") ---
        comandos_raw = re.split(r"MML Command-----LST CELLOP:;", bloques_succ[1])
        bloques_por_ne = {}
        for cmd in comandos_raw:
            match_ne = re.search(r"NE\s*:\s*(\S+)", cmd)
            if match_ne:
                ne = match_ne.group(1).strip()
                bloques_por_ne.setdefault(ne, "")
                bloques_por_ne[ne] += cmd  # concatena fragmentos continuados

        # --- Procesa cada NE completo ---
        for ne, cmd in bloques_por_ne.items():
            report = re.search(r"Report\s*:(.*?)\n", cmd)
            report = report.group(1).strip() if report else ""
            if report.startswith("+++"):
                report = ""

            # --- Busca secciones de "List Cell Operator" ---
            if "List Cell Operator" in cmd:
                partes = cmd.split("List Cell Operator")
                for parte in partes[1:]:  # puede haber más de una tabla por NE
                    lineas = parte.splitlines()
                    headers = None
                    for i, linea in enumerate(lineas):
                        if re.match(r"^\s*Local cell ID", linea):
                            headers = re.split(r"\s{2,}", linea.strip())
                            data_lines = lineas[i+1:]
                            break
                    if headers:
                        for dl in data_lines:
                            if not dl.strip() or dl.startswith("---") or dl.startswith("RETCODE"):
                                continue
                            valores = re.split(r"\s{2,}", dl.strip())
                            if len(valores) <= 1:
                                continue
                            fila = dict(zip(headers, valores))
                            fila.update({"Gestor": origen, "Seccion": "SUCCEEDED", "NE": ne, "Report": report})
                            dfs.append(pd.DataFrame([fila]))
            else:
                dfs.append(pd.DataFrame([{
                    "Gestor": origen, "Seccion": "SUCCEEDED", "NE": ne, "Report": report,
                    "Local cell ID": ""
                }]))

# --- Unificación en DataFrame ---
df_LST_CELLOP_inicial = pd.concat(dfs, ignore_index=True).fillna("")

# --- Limpieza cruzada ---
# Sitios presentes en SUCCEEDED
sitios_succ = set(df_LST_CELLOP_inicial.loc[
    df_LST_CELLOP_inicial["Seccion"] == "SUCCEEDED", "NE"
])

# Elimina registros FAILED cuando el mismo Sitio está en SUCCEEDED y Local Cell ID está vacío/nulo
df_LST_CELLOP_inicial = df_LST_CELLOP_inicial[
    ~(
        (df_LST_CELLOP_inicial["Seccion"] == "FAILED") &
        (df_LST_CELLOP_inicial["NE"].isin(sitios_succ)) &
        (df_LST_CELLOP_inicial["Local cell ID"].isna() | (df_LST_CELLOP_inicial["Local cell ID"] == ""))
    )
].reset_index(drop=True)

# --- Descarta registros cuando "Local cell ID" contiene "+++" o "O&M" ---
df_LST_CELLOP_inicial = (
    df_LST_CELLOP_inicial[
        ~df_LST_CELLOP_inicial["Local cell ID"].astype(str).str.contains(r"\+\+\+|O&M", na=False)
    ].reset_index(drop=True)
)

# --- Renombramiento columnas ---
df_LST_CELLOP_inicial.rename(columns={"NE": "Sitio", "Local cell ID": "Local Cell ID"}, inplace=True)

# --- Exporta a Excel ---
#salida = os.path.join(ruta_destino, f"LST_CELLOP_{fecha_ejecucion}.xlsx")
#df_LST_CELLOP_inicial.to_excel(salida, index=False, engine="openpyxl")


In [17]:
# MA. 20251001
### DSP CELL. Lectura y unificación robusta (vertical + horizontal) ###

# Archivos
archivo_prefijo = "MML_Task_Result_DSP CELL_c"
archivos_txt = glob.glob(os.path.join(ruta_carpeta, f"{archivo_prefijo}*.txt"))

# Columnas objetivo (FAILED + SUCCEEDED)
columnas = [
    "Gestor", "Seccion", "NE", "Report",
    "Local Cell ID", "Cell Name", "Cell instance state",
    "Reason for latest state change", "Cell latest setup time",
    "Cell latest setup operate type", "Cell latest remove time",
    "Cell latest remove operate type", "Cell power save state",
    "Symbol shutdown state", "Anti Interfere Status in High Speed Scenario",
    "Primary BBP information", "Cell topology type",
    "Maximum transmit power(0.1dBm)", "Cell PLMN Info",
    "Time of Cell State Change to Unavailable",
    "Time of Last Config Before Cell Unavailable"
]

# --- Parseo robusto para "Display dynamic parameters of cells" ---
def _parse_tabla(cmd_text: str):
    """
    Devuelve una lista de dicts. Soporta:
    1) Tabla horizontal (encabezado + filas con números)
    2) Tabla vertical (clave = valor)
    """
    m = re.search(
        r"Display dynamic parameters of cells\s*-+\s*\n(.*?)(?=\n\s*\(Number of results|\n---\s*END|\nMML Command-----|$)",
        cmd_text, flags=re.S
    )
    if not m:
        return []

    bloque = m.group(1)
    lineas = [ln.rstrip("\n") for ln in bloque.splitlines() if ln.strip()]
    if not lineas:
        return []

    # --- Variante 2: clave = valor ---
    kv_pairs = []
    for ln in lineas:
        mm = re.match(r"^\s*(.*?)\s*=\s*(.*)$", ln)
        if mm:
            kv_pairs.append((mm.group(1).strip(), mm.group(2).strip()))
    if kv_pairs and not any(re.match(r"^\d+\s", ln.strip()) for ln in lineas):
        return [{k: v for k, v in kv_pairs}]

    # --- Variante 1: tabla horizontal ---
    header_line = lineas[0]
    tokens = re.split(r"\s{2,}", header_line.strip())

    spans, pos = [], 0
    for tok in tokens:
        idx = header_line.find(tok, pos)
        spans.append((tok, idx))
        pos = idx + len(tok)
    spans = [(name, start, (spans[i+1][1] if i+1 < len(spans) else None))
             for i, (name, start) in enumerate(spans)]

    data_lines = [ln for ln in lineas[1:] if re.match(r"^\d+", ln.strip())]
    filas = []
    for ln in data_lines:
        rec = {name: (ln[start:end].rstrip() if end is not None else ln[start:].rstrip())
               for name, start, end in spans}
        filas.append(rec)
    return filas

dfs = []

for archivo in archivos_txt:
    with open(archivo, "r", encoding="utf-8") as f:
        contenido = f.read()

    # Gestor: p.ej. 'DSP CELL_c1'
    gestor = os.path.basename(archivo)
    gestor = "_".join(re.sub(r"^MML_Task_Result_", "", gestor).split("_")[0:2])

    # -------- FAILED --------
    m_failed = re.search(r"=+Failed MML Command=+\s*(.*?)(?=\n=+)", contenido, flags=re.S)
    if m_failed:
        bloque_failed = m_failed.group(1)
        for seg in re.split(r"MML Command-----DSP CELL:;", bloque_failed):
            if not seg.strip():
                continue
            m_ne = re.search(r"NE\s*:\s*(.*)", seg)
            m_rp = re.search(r"Report\s*:\s*(.*?)(?:\n---\s*END|\n\n|$)", seg, flags=re.S)
            if not m_ne:
                continue
            ne = m_ne.group(1).strip()
            report = (m_rp.group(1).strip() if m_rp else "")
            if report.startswith("+++"):
                report = ""
            dfs.append({"Gestor": gestor, "Seccion": "FAILED", "NE": ne, "Report": report})

    # -------- SUCCEEDED --------
    m_succ = re.search(r"=+Succeeded MML Command=+\s*(.*)", contenido, flags=re.S)
    if m_succ:
        bloque_succ = m_succ.group(1)
        for cmd in re.split(r"MML Command-----DSP CELL:;", bloque_succ):
            if not cmd.strip():
                continue
            m_ne = re.search(r"NE\s*:\s*(\S+)", cmd)
            ne = m_ne.group(1).strip() if m_ne else ""
            m_rp = re.search(r"Report\s*:(.*?)(?:\n---\s*END|\n\n|$)", cmd, flags=re.S)
            report = (m_rp.group(1).strip() if m_rp else "")
            if report.startswith("+++"):
                report = ""

            # Parse tabla si existe
            recs = _parse_tabla(cmd)
            if recs:
                for rec in recs:
                    fila = {"Gestor": gestor, "Seccion": "SUCCEEDED", "NE": ne, "Report": report}
                    fila.update(rec)
                    dfs.append(fila)
            else:
                dfs.append({"Gestor": gestor, "Seccion": "SUCCEEDED", "NE": ne, "Report": report})

# DataFrame final, asegura columnas
df_DSP_CELL_inicial = pd.DataFrame(dfs)
for col in columnas:
    if col not in df_DSP_CELL_inicial.columns:
        df_DSP_CELL_inicial[col] = ""
df_DSP_CELL_inicial = df_DSP_CELL_inicial[columnas].fillna("")

# NE -> Sitio
df_DSP_CELL_inicial.rename(columns={"NE": "Sitio"}, inplace=True)

# Comentarios
df_DSP_CELL_inicial["Comentarios"] = np.where(
    #(df_DSP_CELL_inicial["Seccion"] == "FAILED") &
    (df_DSP_CELL_inicial["Report"].str.lower() == "ne is not connected."),
    "Offline",
    ""
)

# Exporta a Excel
#salida = os.path.join(ruta_destino, f"DSP_CELL_{fecha_ejecucion}.xlsx")
#df_DSP_CELL_inicial.to_excel(salida, index=False, engine="openpyxl")

In [18]:
# MA. 20251002.
### Información archivo mes anterior ###

# Sufijo mes anterior
today = date.today()
prev_year  = today.year if today.month > 1 else today.year - 1
prev_month = today.month - 1 or 12
yyyymm = f"{prev_year}{prev_month:02d}"
fecha_hoy = today.strftime("%d/%m/%Y")

# Busca archivo
ruta: str = ruta_destino  # --> MA. Por Definir.
archivo = f"All_Huawei_4G_{yyyymm}.xlsx"
path = os.path.join(ruta, archivo)

if os.path.exists(path):
    # Fecha del día y de creación de archivo anterior
    wb = load_workbook(path, read_only=True)
    props = wb.properties
    fecha_creacion = props.created.strftime("%d/%m/%Y")

    # Info archivo anterior
    df_All_Huawei_4G_Anterior = pd.read_excel(path)
    df_All_Huawei_4G_Anterior = df_All_Huawei_4G_Anterior.drop(columns="Comentarios")

    # Agrega sufijo _ant a todas las columnas
    df_All_Huawei_4G_Anterior = df_All_Huawei_4G_Anterior.add_suffix("_ant")
    # Convierte valor a string
    df_All_Huawei_4G_Anterior["Local Cell ID_ant"] = df_All_Huawei_4G_Anterior["Local Cell ID_ant"].apply(str)

    # Info TAC y TAL
    df_All_Huawei_4G_Ant_TAC = pd.read_excel(
        path,
        usecols=["TAC", "TAL"]
    )

    # TAC -> TAC_ant
    df_All_Huawei_4G_Ant_TAC.rename(columns={"TAC": "TAC_ant"}, inplace=True)
    # Convierte valor a string
    df_All_Huawei_4G_Ant_TAC["TAC_ant"] = df_All_Huawei_4G_Ant_TAC["TAC_ant"].apply(str)

    # Trae solo registros únicos
    df_All_Huawei_4G_Ant_TAC = (
    df_All_Huawei_4G_Ant_TAC
    .drop_duplicates(subset=["TAC_ant"])   # elimina duplicados
    )

else:
    # No existe archivo, crea DataFrame vacío con columnas necesarias
    df_All_Huawei_4G_Anterior = pd.DataFrame(columns=["Sitio_ant", "Local Cell ID_ant"])
    df_All_Huawei_4G_Ant_TAC = pd.DataFrame(columns=["TAC_ant", "TAL"])

# Exporta a Excel
#salida = os.path.join(ruta_destino, f"All_Huawei_4G_Anterior_{fecha_ejecucion}.xlsx")
#df_All_Huawei_4G_Anterior.to_excel(salida, index=False, engine="openpyxl")

In [19]:
# MA. 20251002.
### Información del EPT ###

# Prefijo del archivo
prefijo_ept = "EPT_ATT_UMTS_LTE_"

# Busca archivo que empiece con el prefijo
archivo = glob.glob(os.path.join(ruta_ept, f"{prefijo_ept}*.xlsx"))

# Verifica si se encontró archivo
if archivo:
    archivo_encontrado = archivo[0]
    nombre_archivo = os.path.basename(archivo_encontrado)

    # Lista de hojas a leer
    hojas = [
        "EPT_3G_LTE_OUTDOOR",
        "PLAN_OUTDOOR",
        "EPT_3G_LTE_INDOOR",
        "PLAN_INDOOR",
        "Eventos_Especiales"
    ]

    # Lee todas las hojas y agrega el nombre de la hoja en columna
    dfs = [
        pd.read_excel(archivo_encontrado, sheet_name=hoja, engine="openpyxl")
        .assign(Hoja=hoja, Origen=nombre_archivo)
        for hoja in hojas
    ]

    # Concatena todo en un solo DataFrame
    df_EPT_inicial = pd.concat(dfs, ignore_index=True)


    # Renombramiento columna(s)
    nuevos_nombres = {"CellName" : "Cell Name", "Latitud" : "LAT", "Longitud" : "LON"}
    df_EPT_inicial.rename(columns=nuevos_nombres, inplace=True)

    # Convierte columnas totalmente numéricas (cuando aplica)
    df_EPT_ini = df_EPT_inicial.apply(
        lambda col: pd.to_numeric(col, errors="coerce")
        if not pd.to_numeric(col, errors="coerce").isna().any()
        else col
    )

    # EPT
    df_EPT_unificado = df_EPT_ini[["Cell Name", "LAT", "LON", "AT&T_Site_Name"]]

# Archivo unificado en Excel, para fines de validación.
#ruta_salida = os.path.join(ruta_destino, f"EPT_unificado_{fecha_ejecucion}.xlsx")
#df_EPT_unificado.to_excel(ruta_salida, index=False)

In [20]:
# MA. 20251001.
### Creación archivo final ###

## Extraemos solo las columnas requeridas de cada DataFrame.

## LST_CELL
df_LST_CELL = df_LST_CELL_inicial[["Gestor", "Seccion", "Sitio", "Local Cell ID", "Cell Name",
"Csg indicator", "Uplink cyclic prefix length", "Downlink cyclic prefix length",
"NB-IoT Cell Flag", "Coverage Level Type", "Frequency band", "Uplink EARFCN indication",
"Uplink EARFCN", "Downlink EARFCN", "Uplink bandwidth", "Downlink bandwidth",
"Cell ID", "Physical cell ID", "Additional spectrum emission", "Cell active state",
"Cell admin state", "Cell middle block timer(min)", "Cell FDD TDD indication",
"Subframe assignment", "Special subframe patterns", "SSP6 DwPTS Mode", "Cell Standby Mode",
"Cell specific offset(dB)", "Frequency offset(dB)", "Root sequence index", "High speed flag",
"Preamble format", "Cell radius(m)", "Customized bandwidth configure indicator",
"Customized uplink bandwidth(0.1MHz)", "Customized downlink bandwidth(0.1MHz)",
"Emergency Area Id indicator", "Emergency Area ID", "Ue max power allowed configure indicator",
"Max transmit power allowed(dBm)", "Flag of Multi-RRU Cell", "Mode of Multi-RRU Cell",
"CPRI Ethernet Compression Ratio", "CPRI Compression", "Physical Cell Number of SFN Cell",
"Air Cell Flag", "CRS Port Number", "Cell transmission and reception mode",
"CRS Antenna Port Mapping", "User label", "Work mode", "CN Operator Sharing Group ID",
"Intra Frequency RAN Sharing Indication", "IntraFreq ANR Indication", "ANR Frequency Priority",
"Cell Radius Start Location(m)", "Specified Cell Flag", "Downlink Punctured RB Number",
"SFN Master Cell Label", "Multi Cell Share Mode", "Standby Cell SFN Recovery Time(h)",
"Compact Bandwidth Control Interference Mode", "Uplink Punctured RB Number Offset",
"Ultra High-Speed Cell Root Sequence Index", "Comentarios"]]

## LST_CELLOP
df_LST_CELLOP = df_LST_CELLOP_inicial[["Sitio", "Local Cell ID", "Local tracking area ID"]]

# Obtiene valor máximo de "Local tracking area ID"
df_LST_CELLOP = (
    df_LST_CELLOP
    .loc[df_LST_CELLOP["Local Cell ID"].notna() & (df_LST_CELLOP["Local Cell ID"] != "")]
    .groupby(["Sitio", "Local Cell ID"], as_index=False)["Local tracking area ID"]
    .max()
)

# "Local tracking area ID" -> "Cellda con MOCN"
df_LST_CELLOP.rename(columns={"Local tracking area ID": "Cellda con MOCN"}, inplace=True)

## LST_CNOPERATOR
df_LST_CNOPERATOR = df_LST_CNOPERATOR_inicial[["Sitio", "CN Operator ID"]]

# Obtiene valor máximo de "CN Operator ID"
df_LST_CNOPERATOR = (
    df_LST_CNOPERATOR
    .loc[df_LST_CNOPERATOR["CN Operator ID"].notna() & (df_LST_CNOPERATOR["CN Operator ID"] != "")]
    .groupby(["Sitio"], as_index=False)["CN Operator ID"]
    .max()
)

# "CN Operator ID" -> "MOCN Configurado Sitio"
df_LST_CNOPERATOR.rename(columns={"CN Operator ID": "MOCN Configurado Sitio"}, inplace=True)

## DSP_S1INTARFACE
df_DSP_S1INTARFACE = df_DSP_S1INTARFACE_inicial[["Sitio", "S1 Interface ID", "CN Operator ID", "S1 Interface Fault Reason"]]

# Obtiene valor máximo de "CN Operator ID"
df_DSP_S1INTARFACE = (
    df_DSP_S1INTARFACE
    .loc[df_DSP_S1INTARFACE["CN Operator ID"].notna() & (df_DSP_S1INTARFACE["CN Operator ID"] != "")]
    .groupby(["Sitio"], as_index=False)[["CN Operator ID", "S1 Interface ID", "S1 Interface Fault Reason"]]
    .max()
)
# "CN Operator ID" -> "S1 MOCN"
df_DSP_S1INTARFACE.rename(columns={"CN Operator ID": "S1 MOCN"}, inplace=True)
## DSP_OPERATORTA
df_LST_CNOPERATORTA = df_LST_CNOPERATORTA_inicial[["llaveTAC", "TAC"]]
## DSP_CELL
df_DSP_CELL = df_DSP_CELL_inicial[["Sitio", "Local Cell ID", "Cell instance state"]]
## Se unen los DataFrames en uno solo por medio de left joins.
df_Huawei_4G_inicial = df_LST_CELL.merge(df_LST_CELLOP, on=["Sitio", "Local Cell ID"], how="left")
df_Huawei_4G_inicial = df_Huawei_4G_inicial.merge(df_LST_CNOPERATOR, on=["Sitio"], how="left")
df_Huawei_4G_inicial = df_Huawei_4G_inicial.merge(df_DSP_S1INTARFACE, on=["Sitio"], how="left")
df_Huawei_4G_inicial = df_Huawei_4G_inicial.merge(df_DSP_CELL, on=["Sitio", "Local Cell ID"], how="left")

# Crea campo llaveTAC (Sitio + Local tracking area ID) para unirse con LST_OPERATORTA
df_Huawei_4G_inicial["llaveTAC"] = (
    df_Huawei_4G_inicial["Sitio"].astype("string").str.strip()
      .str.cat(df_Huawei_4G_inicial["Cellda con MOCN"].astype("string").str.strip(),
               sep="", na_rep=None)
)

# Obtiene informacion de LST_CNOPERATORTA (TAC)
df_Huawei_4G_inicial = df_Huawei_4G_inicial.merge(df_LST_CNOPERATORTA, on=["llaveTAC"], how="left")

## Obtiene elementos del Archivo anterior
# Divide los registros en SUCCEEDED y FAILED
df_succeeded = df_Huawei_4G_inicial[df_Huawei_4G_inicial["Seccion"] == "SUCCEEDED"].copy()
df_failed    = df_Huawei_4G_inicial[df_Huawei_4G_inicial["Seccion"] == "FAILED"].copy()

# --- Caso SUCCEEDED ---
df_succeeded = df_succeeded.merge(
    df_All_Huawei_4G_Anterior,
    left_on=["Sitio", "Local Cell ID"],
    right_on=["Sitio_ant", "Local Cell ID_ant"],
    how="left"
)

# --- Caso FAILED ---
# Para asegurar que solo tome el primer "Local Cell ID" por "Sitio_ant"
df_all_failed = (
    df_All_Huawei_4G_Anterior
    .sort_values(by=["Sitio_ant", "Local Cell ID_ant"])
    .drop_duplicates(subset=["Sitio_ant"], keep="first")
)
df_failed = df_failed.merge(
    df_all_failed,
    left_on="Sitio",
    right_on="Sitio_ant",
    how="left"
)

# Une resultados
df_Huawei_4G_inicial = pd.concat([df_succeeded, df_failed], ignore_index=True)

# Reemplazo automático de columnas con sufijo "_ant" cuando Seccion == "FAILED"
mask_failed = df_Huawei_4G_inicial["Seccion"] == "FAILED"
for col_ant in df_Huawei_4G_inicial.columns:
    if col_ant.endswith("_ant"):
        col = col_ant[:-4]
        if col in df_Huawei_4G_inicial.columns:
            df_Huawei_4G_inicial.loc[mask_failed, col] = df_Huawei_4G_inicial.loc[mask_failed, col_ant]


# Se une con EPT
# ===================== FIX EPT MATCH (pegar ANTES del merge con EPT) =====================

# 1) Normaliza Cell Name en ambos DF (quita invisibles/espacios)
df_Huawei_4G_inicial["Cell Name"] = (
    df_Huawei_4G_inicial["Cell Name"]
    .astype(str)
    .str.replace("\u00a0", " ", regex=False)  # NBSP
    .str.replace("\r", "", regex=False)
    .str.replace("\n", "", regex=False)
    .str.strip()
)

df_EPT_unificado["Cell Name"] = (
    df_EPT_unificado["Cell Name"]
    .astype(str)
    .str.replace("\u00a0", " ", regex=False)
    .str.replace("\r", "", regex=False)
    .str.replace("\n", "", regex=False)
    .str.strip()
)

# 2) Renombra columnas EPT para que SIEMPRE queden *_EPT (evita colisiones)
df_EPT_unificado = df_EPT_unificado.rename(columns={
    "LAT": "LAT_EPT",
    "LON": "LON_EPT",
    "AT&T_Site_Name": "AT&T_Site_Name_EPT"
})

# =================== FIN FIX EPT MATCH (antes del merge con EPT) ===================

# Se une con EPT (ahora EPT ya trae nombres *_EPT)
df_Huawei_4G_inicial = df_Huawei_4G_inicial.merge(
    df_EPT_unificado,
    on="Cell Name",
    how="left"
)

# Asegura columna base AT&T_Site_Name
if "AT&T_Site_Name" not in df_Huawei_4G_inicial.columns:
    df_Huawei_4G_inicial["AT&T_Site_Name"] = np.nan

# Rellena AT&T_Site_Name: primero EPT, luego mes anterior (_ant)
df_Huawei_4G_inicial["AT&T_Site_Name"] = (
    df_Huawei_4G_inicial["AT&T_Site_Name"].replace("", np.nan)
    .combine_first(df_Huawei_4G_inicial.get("AT&T_Site_Name_EPT", pd.Series(index=df_Huawei_4G_inicial.index)).replace("", np.nan))
    .combine_first(df_Huawei_4G_inicial.get("AT&T_Site_Name_ant", pd.Series(index=df_Huawei_4G_inicial.index)).replace("", np.nan))
)

# Rellena LAT/LON: primero EPT, luego _ant
for base, ept, ant in [("LAT", "LAT_EPT", "LAT_ant"), ("LON", "LON_EPT", "LON_ant")]:
    if base not in df_Huawei_4G_inicial.columns:
        df_Huawei_4G_inicial[base] = np.nan

    if ept in df_Huawei_4G_inicial.columns:
        df_Huawei_4G_inicial[base] = (
            df_Huawei_4G_inicial[base].replace("", np.nan)
            .combine_first(df_Huawei_4G_inicial[ept].replace("", np.nan))
        )

    if ant in df_Huawei_4G_inicial.columns:
        df_Huawei_4G_inicial[base] = (
            df_Huawei_4G_inicial[base].replace("", np.nan)
            .combine_first(df_Huawei_4G_inicial[ant].replace("", np.nan))
        )

# ======================= Obtiene valor TAL del Archivo anterior =======================
def _norm_tac_series(s: pd.Series) -> pd.Series:
    """Normaliza TAC a string sin .0, sin espacios, solo dígitos."""
    s = s.astype(str).str.strip()
    s = s.str.replace(r"\.0$", "", regex=True)   # 30136.0 -> 30136
    s = s.str.replace(r"[^\d]", "", regex=True)  # quita signos no numéricos
    s = s.mask(s.eq(""), np.nan)
    return s

# Normaliza TAC en ambos lados
if "TAC" in df_Huawei_4G_inicial.columns:
    df_Huawei_4G_inicial["TAC"] = _norm_tac_series(df_Huawei_4G_inicial["TAC"])
if not df_All_Huawei_4G_Ant_TAC.empty:
    df_All_Huawei_4G_Ant_TAC["TAC_ant"] = _norm_tac_series(df_All_Huawei_4G_Ant_TAC["TAC_ant"])

# Deduplicación robusta de TAC_ant priorizando TAL no nulo
if not df_All_Huawei_4G_Ant_TAC.empty:
    _aux = df_All_Huawei_4G_Ant_TAC.copy()
    _aux["_tal_isna"] = _aux["TAL"].isna() | (_aux["TAL"].astype(str).str.strip().eq(""))
    # Ordena por TAC y pone primero los TAL válidos
    _aux = _aux.sort_values(by=["TAC_ant", "_tal_isna"])
    df_All_Huawei_4G_Ant_TAC_clean = _aux.drop_duplicates(subset=["TAC_ant"], keep="first").drop(columns="_tal_isna")
else:
    df_All_Huawei_4G_Ant_TAC_clean = df_All_Huawei_4G_Ant_TAC

# Merge principal TAC -> TAL
df_Huawei_4G_inicial = df_Huawei_4G_inicial.merge(
    df_All_Huawei_4G_Ant_TAC_clean[["TAC_ant", "TAL"]],
    left_on="TAC",
    right_on="TAC_ant",
    how="left"
)

# Fallback por Sitio si sigue faltando TAL y el archivo anterior trae TAL_ant
if "TAL" not in df_Huawei_4G_inicial.columns:
    df_Huawei_4G_inicial["TAL"] = np.nan

if "TAL_ant" in df_All_Huawei_4G_Anterior.columns:
    tal_por_sitio = (
        df_All_Huawei_4G_Anterior[["Sitio_ant", "TAL_ant"]]
        .dropna(subset=["Sitio_ant"])
        .drop_duplicates(subset=["Sitio_ant"], keep="first")
        .rename(columns={
            "Sitio_ant": "__sitio_prev__",
            "TAL_ant": "TAL_fallback_sitio"
        })
    )

    # Por si ya existiera accidentalmente la columna temporal
    if "__sitio_prev__" in df_Huawei_4G_inicial.columns:
        df_Huawei_4G_inicial.drop(columns=["__sitio_prev__"], inplace=True, errors="ignore")

    df_Huawei_4G_inicial = df_Huawei_4G_inicial.merge(
        tal_por_sitio,
        left_on="Sitio",
        right_on="__sitio_prev__",
        how="left",
        suffixes=("", "")
    )

    # Rellena TAL sólo donde está vacío
    _tal_vacio = df_Huawei_4G_inicial["TAL"].isna() | df_Huawei_4G_inicial["TAL"].astype(str).str.strip().eq("")
    if "TAL_fallback_sitio" in df_Huawei_4G_inicial.columns:
        df_Huawei_4G_inicial.loc[_tal_vacio, "TAL"] = df_Huawei_4G_inicial.loc[_tal_vacio, "TAL_fallback_sitio"]

    # Limpieza de columnas auxiliares
    df_Huawei_4G_inicial.drop(columns=["__sitio_prev__", "TAL_fallback_sitio"], inplace=True, errors="ignore")


# =============================================================================================

## Calculo valor "Almenos una celda encendida con MOCN"
df_Huawei_4G_inicial["Almenos una celda encendida con MOCN"] = np.where(
    pd.to_numeric(df_Huawei_4G_inicial["Cellda con MOCN"], errors="coerce").eq(2),
    "Si",
    "NO"
)

## Obtiene valor de MMEs
df_Huawei_4G_inicial = df_Huawei_4G_inicial.merge(
    df_MMEs, left_on=["Sitio", "S1 MOCN"], right_on=["Sitio", "CN Operator ID"], how="left"
)

## Reemplaza 'LST CELL_c' por 'MAE-' y conserva el número final
df_Huawei_4G_inicial["Gestor"] = df_Huawei_4G_inicial["Gestor"].str.replace(
    r"LST CELL_c(\d+)", r"MAE-\1", regex=True
)

## Reordenamiento Estructura Final
nuevo_orden = ["AT&T_Site_Name", "Gestor", "Sitio", "Local Cell ID", "Cell Name",
"Csg indicator", "Uplink cyclic prefix length", "Downlink cyclic prefix length",
"NB-IoT Cell Flag", "Coverage Level Type", "Frequency band", "Uplink EARFCN indication",
"Uplink EARFCN", "Downlink EARFCN", "Uplink bandwidth", "Downlink bandwidth",
"Cell ID", "Physical cell ID", "Additional spectrum emission", "Cell active state",
"Cell admin state", "Cell middle block timer(min)", "Cell FDD TDD indication",
"Subframe assignment", "Special subframe patterns", "SSP6 DwPTS Mode", "Cell Standby Mode",
"Cell specific offset(dB)", "Frequency offset(dB)", "Root sequence index", "High speed flag",
"Preamble format", "Cell radius(m)", "Customized bandwidth configure indicator",
"Customized uplink bandwidth(0.1MHz)", "Customized downlink bandwidth(0.1MHz)",
"Emergency Area Id indicator", "Emergency Area ID", "Ue max power allowed configure indicator",
"Max transmit power allowed(dBm)", "Flag of Multi-RRU Cell", "Mode of Multi-RRU Cell",
"CPRI Ethernet Compression Ratio", "CPRI Compression", "Physical Cell Number of SFN Cell",
"Air Cell Flag", "CRS Port Number", "Cell transmission and reception mode",
"CRS Antenna Port Mapping", "User label", "Work mode", "CN Operator Sharing Group ID",
"Intra Frequency RAN Sharing Indication", "IntraFreq ANR Indication", "ANR Frequency Priority",
"Cell Radius Start Location(m)", "Specified Cell Flag", "Downlink Punctured RB Number",
"SFN Master Cell Label", "Multi Cell Share Mode", "Standby Cell SFN Recovery Time(h)",
"Compact Bandwidth Control Interference Mode", "Uplink Punctured RB Number Offset",
"Ultra High-Speed Cell Root Sequence Index", "Cellda con MOCN", "MOCN Configurado Sitio",
"S1 MOCN", "S1 Interface Fault Reason", "Almenos una celda encendida con MOCN", "TAC", "TAL",
"Cell instance state", "LAT", "LON", "MMEs", "Comentarios"]

df_Huawei_4G_inicial = df_Huawei_4G_inicial[nuevo_orden]

# Remueve sufijo _ant
df_Huawei_4G_inicial.columns = [col.replace("_ant", "") for col in df_Huawei_4G_inicial.columns]
#print("\n[DEBUG] ANTES del filtro de AT&T_Site_Name:", df_Huawei_4G_inicial.shape)

#mask_site = df_Huawei_4G_inicial["Sitio"].eq("YUCMER0399")
#cols_att = [c for c in df_Huawei_4G_inicial.columns if "AT&T_Site" in c]

#print("[DEBUG] YUCMER0399 antes del filtro:", df_Huawei_4G_inicial.loc[mask_site].shape)
#print(df_Huawei_4G_inicial.loc[mask_site, ["Sitio","Local Cell ID","Cell Name"] + cols_att].head(20))

# Descarta valores sin AT&T_Site_Name
df_Huawei_4G_inicial = df_Huawei_4G_inicial.loc[
    df_Huawei_4G_inicial["AT&T_Site_Name"].notna() & (df_Huawei_4G_inicial["AT&T_Site_Name"].str.strip() != "")
]
#print("\n[DEBUG] DESPUÉS del filtro de AT&T_Site_Name:", df_Huawei_4G_inicial.shape)
#print("[DEBUG] YUCMER0399 después del filtro:", df_Huawei_4G_inicial.loc[df_Huawei_4G_inicial["Sitio"].eq("YUCMER0399")].shape)


# Convierte columnas totalmente numéricas (sin forzar TAL a numérico)
df_Huawei_4G = df_Huawei_4G_inicial.apply(
    lambda col: pd.to_numeric(col, errors="coerce")
    if not pd.to_numeric(col, errors="coerce").isna().any()
    else col
)

# Asegura TAL como string (evita perder ceros a la izquierda si aplica)
if "TAL" in df_Huawei_4G.columns:
    # después de construir df_Huawei_4G
    df_Huawei_4G["TAL"] = (
    df_Huawei_4G["TAL"]
        .astype(str).str.strip()
        .str.replace(r"\.0+$", "", regex=True)   # 30136.0 -> 30136
        .replace({"nan": "", "NaN": "", "<NA>": ""})
)

## Archivo Final formateado
ruta_salida = os.path.join(ruta_destino, f"All_Huawei_4G_{fecha_ejecucion}.xlsx")
df_Huawei_4G.to_excel(ruta_salida, index=False, engine="openpyxl")

# Abre archivo creado para rotar los encabezados
wb = load_workbook(ruta_salida)
ws = wb.active

# Rota los encabezados 90° (vertical)
for col_num, column_title in enumerate(df_Huawei_4G.columns, 1):
    cell = ws[f"{get_column_letter(col_num)}1"]
    cell.alignment = Alignment(textRotation=90, horizontal="center", vertical="bottom")

# Guarda cambios
wb.save(ruta_salida)


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_EPT_unificado["Cell Name"] = (
