# OPC UA a SQLite (Nuevo Esquema - 12 Variables)

Este notebook se conecta al PLC mediante `asyncua`, busca las 12 variables principales del MES, y las inserta periódicamente en la base de datos `datos.db`.

In [8]:
import asyncio
import sqlite3
import logging
import os
import time
from datetime import datetime
from asyncua import Client

# Configuraciones OPC UA
ENDPOINT = "opc.tcp://192.168.0.20:4840"
DB_PATH = "../database/datos.db"
DB_NAME = "MES"

# Nombres de las 12 variables a buscar (Pascal_Case)
VAR_NAMES = [
    "Machine_State", "Heartbeat", "Target_Speed", "Total_Parts_Produced",
    "Parts_OK", "Parts_NOK", "Last_Cycle_Time", "Availability",
    "Performance", "Quality", "Initial_Timestamp", "Final_Timestamp"
]

logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('opcua_sqlite')

### 1. Inicializar Base de Datos
Aseguramos que existe el directorio y creamos la tabla con los 12 campos nuevos.

In [9]:
def init_db():
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    
    # Creamos la tabla adaptada al nuevo esquema (usando los nombres oficiales)
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS mes_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            db_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
            Machine_State INTEGER,
            Heartbeat INTEGER,
            Target_Speed REAL,
            Total_Parts_Produced INTEGER,
            Parts_OK INTEGER,
            Parts_NOK INTEGER,
            Last_Cycle_Time DECIMAL(5,3),
            Availability DECIMAL(5,3),
            Performance DECIMAL(5,3),
            Quality DECIMAL(5,3),
            Initial_Timestamp char(50),
            Final_Timestamp char(50)
        )
    ''')
    conn.commit()
    return conn

conn = init_db()
_logger.info(f"Base de datos conectada en {DB_PATH}")

INFO:opcua_sqlite:Base de datos conectada en ../database/datos.db


## Código para eliminar la tabla o truncarla

In [10]:
# os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
# conn = sqlite3.connect(DB_PATH)
# cursor = conn.cursor()

# Creamos la tabla adaptada al nuevo esquema (usando los nombres oficiales)
# cursor.execute('''
#     DROP TABLE mes_data
# ''')

# cursor.execute('''
#     TRUNCATE mes_data
# ''')

### 2. Helper para buscar nodos
Función recursiva para encontrar nodos dentro de la carpeta Objects por su BrowseName.

In [11]:
async def find_node_by_name(client, db_name, var_name, max_depth=5):
    objects = client.nodes.objects
    
    async def walk(node, depth):
        if depth > max_depth: return None
        try: children = await node.get_children()
        except: return None
        
        for child in children:
            try: bname = (await child.read_browse_name()).Name
            except: continue
            
            if bname == db_name:
                db_children = await child.get_children()
                for var in db_children:
                    if (await var.read_browse_name()).Name == var_name:
                        return var
            
            found = await walk(child, depth + 1)
            if found: return found
        return None
    return await walk(objects, 0)

### 3. Bucle Principal Asíncrono OPC UA
Conectamos, localizamos los 12 nodos e insertamos continuamente.

In [None]:
async def main_loop():
    # _logger.info(f"Conectando a {ENDPOINT}...")
    async with Client(url=ENDPOINT) as client:
        # _logger.info("Conectado a OPC UA.")
        
        # 1. Resolver los 12 nodos OPC UA
        # _logger.info("Resolviendo Nodos...")
        nodes = {}
        for vname in VAR_NAMES:
            node = await find_node_by_name(client, DB_NAME, vname)
            if getattr(node, 'nodeid', None):
                nodes[vname] = node
                _logger.info(f"Encontrado {vname}: {node.nodeid}")
            else:
                _logger.error(f"No se encontró la variable: {vname}")
        
        if len(nodes) < len(VAR_NAMES):
            _logger.error("Faltan nodos. Deteniendo el script para revisar la nomenclatura.")
            return
            
        _logger.info("Todos los nodos encontrados. --- INICIANDO LECTURA E INSERCIÓN ---")
        
        cursor = conn.cursor()
        tiempo_inicio = 0
        try:
            while True:
                # 1. Leer todos los valores desde OPC UA
                values = {name: await node.read_value() for name, node in nodes.items()}
                
                # 2. Asignación manual y truncado/redondeo a 3 decimales
                machine_state = values['Heartbeat']
                heartbeat = values['Machine_State']
                target_speed = values['Target_Speed']
                total_parts_produced = values['Total_Parts_Produced']
                parts_ok = values['Parts_OK'] 
                parts_nok = values['Parts_NOK']
                last_cycle_time = round(values['Last_Cycle_Time'], 3)
                availability = round(values['Availability'], 3)
                performance = round(values['Performance'], 3)
                quality =round(values['Quality'], 3)
                initial_timestamp = values['Initial_Timestamp']
                final_timestamp = values['Final_Timestamp']

                # 2. Parsear el string indicando su estructura actual
                objeto_fecha_initial_timestamp = datetime.strptime(initial_timestamp, "%H:%M:%S - %Y-%m-%d")
                objeto_fecha_final_timestamp = datetime.strptime(final_timestamp, "%H:%M:%S - %Y-%m-%d")
                # 3. Reestructurar al formato compatible con SQLite
                fecha_sqlite_initial_timestamp = objeto_fecha_initial_timestamp.strftime("%Y-%m-%d %H:%M:%S")
                fecha_sqlite_final_timestamp = objeto_fecha_final_timestamp.strftime("%Y-%m-%d %H:%M:%S")
                                
                # 4. Empaquetar en una tupla en el MISMO ORDEN que VAR_NAMES
                data_tuple = (
                    machine_state, 
                    heartbeat, 
                    target_speed, 
                    total_parts_produced, 
                    parts_ok,
                    parts_nok,
                    last_cycle_time,
                    availability,
                    performance,
                    quality,
                    fecha_sqlite_initial_timestamp,
                    fecha_sqlite_final_timestamp
                )
                
                # 4. Preparar sentencia SQL dinámica
                placeholders = ", ".join(["?"] * len(VAR_NAMES))
                columns = ", ".join(VAR_NAMES)
                query = f"INSERT INTO mes_data ({columns}) VALUES ({placeholders})"
                
                # 5. Ejecutar la inserción
                cursor.execute(query, data_tuple)
                conn.commit()
                
                # Print amigable usando las variables que ya hemos extraído
                print(f"[SQL Insert] HB:{heartbeat} | ST:{machine_state} | Spd:{target_speed:.1f} | Prod:{total_parts_produced} | OEE:{performance*100:.1f}%")
                
                await asyncio.sleep(1.0) # Ciclo de captura de 1s
                
        except asyncio.CancelledError:
            _logger.info("Lectura detenida por el usuario.")
            
try:
    task = asyncio.create_task(main_loop())
    await task
except KeyboardInterrupt:
    task.cancel()

INFO:asyncua.client.client:connect
INFO:asyncua.client.ua_client.UaClient:opening connection
INFO:asyncua.uaprotocol:updating client limits to: TransportLimits(max_recv_buffer=65535, max_send_buffer=65535, max_chunk_count=0, max_message_size=0)
INFO:asyncua.client.ua_client.UASocketProtocol:open_secure_channel
INFO:asyncua.client.ua_client.UaClient:create_session
INFO:asyncua.client.client:find_endpoint [EndpointDescription(EndpointUrl='opc.tcp://192.168.0.20:4840', Server=ApplicationDescription(ApplicationUri='urn:SIMATIC:PLC_MES', ProductUri='https://www.siemens.com/s7-1500', ApplicationName=LocalizedText(Locale=None, Text='PLC_MES'), ApplicationType_=<ApplicationType.Server: 0>, GatewayServerUri=None, DiscoveryProfileUri=None, DiscoveryUrls=['opc.tcp://192.168.0.20:4840']), ServerCertificate=b'0\x82\x04w0\x82\x02\xdf\xa0\x03\x02\x01\x02\x02\x08\x01\x91*\xf4[\xfe\x85\xb60\r\x06\t*\x86H\x86\xf7\r\x01\x01\x0b\x05\x000\x1a1\x180\x16\x06\x03U\x04\x03\x13\x0fPLC-1/OPCUA-1-20\x1e\x17\r2602

[SQL Insert] HB:0 | ST:308 | Spd:0.0 | Prod:40 | OEE:235.6%
[SQL Insert] HB:0 | ST:309 | Spd:0.0 | Prod:40 | OEE:235.6%
[SQL Insert] HB:0 | ST:310 | Spd:0.0 | Prod:40 | OEE:235.6%
[SQL Insert] HB:0 | ST:311 | Spd:0.0 | Prod:40 | OEE:235.6%
[SQL Insert] HB:0 | ST:312 | Spd:0.0 | Prod:40 | OEE:235.6%
[SQL Insert] HB:0 | ST:313 | Spd:0.0 | Prod:40 | OEE:235.6%
[SQL Insert] HB:0 | ST:314 | Spd:0.0 | Prod:40 | OEE:235.6%
[SQL Insert] HB:0 | ST:315 | Spd:0.0 | Prod:40 | OEE:235.6%
[SQL Insert] HB:0 | ST:316 | Spd:0.0 | Prod:40 | OEE:235.6%
[SQL Insert] HB:0 | ST:317 | Spd:0.0 | Prod:40 | OEE:235.6%
[SQL Insert] HB:0 | ST:318 | Spd:0.0 | Prod:40 | OEE:235.6%
[SQL Insert] HB:0 | ST:319 | Spd:0.0 | Prod:40 | OEE:235.6%
[SQL Insert] HB:0 | ST:320 | Spd:0.0 | Prod:40 | OEE:235.6%
[SQL Insert] HB:0 | ST:321 | Spd:0.0 | Prod:40 | OEE:235.6%
[SQL Insert] HB:0 | ST:322 | Spd:0.0 | Prod:40 | OEE:235.6%
[SQL Insert] HB:0 | ST:323 | Spd:0.0 | Prod:40 | OEE:235.6%
[SQL Insert] HB:0 | ST:324 | Spd:0.0 | P

### 4. Consultar resultados con Pandas

In [None]:
# import pandas as pd

# # Leemos la tabla entera de SQLite y la ponemos en un DataFrame
# df = pd.read_sql_query("SELECT * FROM mes_data ORDER BY id DESC LIMIT 10", conn)

# print("Últimos 10 registros en SQLite:")
# display(df)