In [2]:
from pydantic import BaseModel, model_serializer

In [3]:
from centraal_client_flow.models.schemas import EntradaEsquemaUnificado, IDModel
from typing import List

class Venta(BaseModel):
    identificador: str
    producto: str


class Ventas(BaseModel):
    ventas: List[Venta]

class ClienteHacebID(IDModel):
    clase_doc : int
    num_doc: int

    @model_serializer
    def serialize_as_str(self) -> str:
        """Metodo para serializar el id."""
        return f"{self.clase_doc}{self.separator}{self.num_doc}"

class Maestra(BaseModel):
    cdc: str
    ciuidad: str

class ClienteHaceb(EntradaEsquemaUnificado):

    id: ClienteHacebID
    venta: Ventas
    maestra: Maestra
 



In [4]:
import json
from typing import Type, List, Tuple, Optional, Any
from abc import ABC, abstractmethod

from azure.functions import Blueprint, ServiceBusMessage
from azure.servicebus import ServiceBusMessage as SBMessage
from pydantic import BaseModel, ValidationError

from centraal_client_flow.connections.cosmosdb import CosmosDBSingleton
from centraal_client_flow.connections.service_bus import IServiceBusClient
from centraal_client_flow.models.schemas import (
    EntradaEsquemaUnificado,
    IDModel,
    EventoBase,
    AuditoriaEntry,
)

In [29]:
topics = {"root", "venta", "maestr"}

In [31]:
s  =set( ClienteHaceb.model_fields)
for t in topics:
    if t == "root":
        pass
    elif t not in s:
        raise ValueError(f"El topic {t} debe corresponder a un subesquema {s}")

ValueError: El topic maestr debe corresponder a un subesquema {'venta', 'maestra', 'id'}

In [18]:
topics.issubset(s)

False

In [26]:
cliente = ClienteHaceb.model_validate(
    {
        "id": {"clase_doc": 23, "num_doc": 1032446156},
        "venta": {"ventas": [{"identificador": "venta-1", "producto": "prod123"}]},
        "maestra" : {"cdc": "si", "ciuidad":"rionegro"}
    }
)

In [27]:
new_cliente = ClienteHaceb.model_validate(
    {
        "id": {"clase_doc": 23, "num_doc": 1032446156},
        "venta": {"ventas": [{"identificador": "venta-1", "producto": "prod123"}, {"identificador": "venta-2", "producto": "prod124"}]},
        "maestra" : {"cdc": "no", "ciuidad":"rionegro"}
    }
)

In [29]:
current_data = None
updated_data = cliente

In [32]:
[c.model_dump(mode="json") for c in detect_changes(None, cliente)]

[{'subesquema': 'venta',
  'campo': 'ventas',
  'new_value': [{'identificador': 'venta-1', 'producto': 'prod123'}],
  'old_value': None,
  'fecha_evento': '2024-08-16T03:04:43.884759Z'},
 {'subesquema': 'id',
  'campo': 'id',
  'new_value': '23-1032446156',
  'old_value': None,
  'fecha_evento': '2024-08-16T03:04:43.884776Z'},
 {'subesquema': 'maestra',
  'campo': 'ciuidad',
  'new_value': 'rionegro',
  'old_value': None,
  'fecha_evento': '2024-08-16T03:04:43.884781Z'},
 {'subesquema': 'maestra',
  'campo': 'cdc',
  'new_value': 'si',
  'old_value': None,
  'fecha_evento': '2024-08-16T03:04:43.884784Z'}]

In [33]:
[c.model_dump(mode="json") for c in detect_changes(cliente, new_cliente)]

[{'subesquema': 'venta',
  'campo': 'ventas',
  'new_value': [{'identificador': 'venta-1', 'producto': 'prod123'},
   {'identificador': 'venta-2', 'producto': 'prod124'}],
  'old_value': [{'identificador': 'venta-1', 'producto': 'prod123'}],
  'fecha_evento': '2024-08-16T03:04:57.684827Z'},
 {'subesquema': 'maestra',
  'campo': 'cdc',
  'new_value': 'no',
  'old_value': 'si',
  'fecha_evento': '2024-08-16T03:04:57.684848Z'}]

Creacion de eventos

In [None]:
from centraal_client_flow.events.timer import TimerBase

def custom_extract_data():
    # Lógica específica para extracción de datos
    return {"key": "value"}

def custom_determine_session_id(event_data):
    # Lógica específica para determinar el session ID
    return "custom_session_id"

In [None]:
schedule = "0 */5 * * * *"  # Cada 5 minutos
queue_name = "example_queue"
connection_str = "your_connection_string"

timer = TimerBase(
    schedule=schedule, queue_name=queue_name, connection_str=connection_str
)
bp = timer.create_blueprint(
    extract_data=custom_extract_data, determine_session_id=custom_determine_session_id
)

In [None]:
from centraal_client_flow.events.receiver import ReceiverBase, HttpRequest, HttpResponse


def custom_process_event(event_data):
    # Lógica específica para este evento
    event_data["processed"] = True


def custom_validate_event(req: HttpRequest):
    # Validación específica para este evento
    if not req.get_json():
        return HttpResponse("Invalid event data", status_code=400)
    return None


def custom_log_event(req: HttpRequest):
    # Logging específico para este evento
    print(f"Custom log: {req.get_json()}")


# Crear la instancia de ReceiverBase
event_source = "example_source"
queue_name = "example_queue"
connection_str = "your_connection_string"

receiver = ReceiverBase(
    event_source=event_source, queue_name=queue_name, connection_str=connection_str
)
bp = receiver.create_blueprint(
    process_event=custom_process_event,
    validate_event=custom_validate_event,
    log_event=custom_log_event,
)

# Al finalizar, cerrar la conexión del Singleton
receiver.close()

Definiciones de reglas de procesamiento

In [None]:
from centraal_client_flow.rules.update.processor import ProcessorBase

def custom_process_message(data):
    # Lógica específica para el procesamiento del mensaje
    processed_data = {"id": "1", "name": data}
    return processed_data

def custom_prepare_output(processed_data):
    # Lógica específica para preparar la salida (por ejemplo, agregar metadata)
    processed_data["metadata"] = "additional information"
    return processed_data

def custom_post_process(processed_data):
    # Lógica específica después del procesamiento
    print(f"Post-process: {processed_data}")

queue_name = "example_queue"
connection_str = "your_connection_string"
topic_names = ["example_topic_1", "example_topic_2"]
cosmos_container_name = "your_container_name"

processor = ProcessorBase(queue_name=queue_name, connection_str=connection_str, topic_names=topic_names, cosmos_container_name=cosmos_container_name)
bp = processor.create_blueprint(process_message=custom_process_message, prepare_output=custom_prepare_output, post_process=custom_post_process)

# Al finalizar, cerrar la conexión del Singleton
processor.close()


Reglas de integración

In [None]:
from centraal_client_flow.rules.integration.processor import IntegrationBase
from centraal_client_flow.rules.integration.strategy import RESTIntegration


def custom_post_integration(message_body):
    # Lógica específica después de la integración
    print(f"Post-integration: {message_body}")


integration = IntegrationBase(
    topic_name="example_topic",
    connection_str="your_connection_string",
    subscription_name="example_subscription",
)
bp = integration.create_blueprint(
    integration_strategy=RESTIntegration(), post_integration=custom_post_integration
)