# Script para monitorear

In [33]:
from pprint import pprint
import json
import requests
from requests.auth import HTTPBasicAuth
import base64
from typing import List, Dict, Optional
from datetime import datetime
import logging
from dataclasses import dataclass
import warnings
import urllib3
import os

# Suprimir las advertencias de SSL
# urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

### Datos generales

In [34]:
## datos generales
# Ruta al archivo JSON
#archivo_json = '../EndPointsAPIS/EndPoints/endpointsS1_uno.json'
archivo_json = '../EndPointsAPIS/EndPoints/endpointsS1.json'
directorio_salida_tokens ='/tokens_obtenidos/'

### Funciones de python

#### Obtener el token

In [35]:
# Configurar logging para Jupyter
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

@dataclass
class OAuthConfig:
    supplier_id: str
    token_url: str
    username: str
    password: str
    client_id: str
    client_secret: str
    scope: str

In [36]:
def read_json_file(file_path: str) -> List[Dict]:
    """Lee y retorna el contenido de un archivo JSON"""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        logger.error(f"Error leyendo archivo JSON: {str(e)}")
        raise

def filter_active_configs(configs: List[Dict]) -> List[Dict]:
    """Filtra las configuraciones activas"""
    return [config for config in configs if config['status'] == 'ACTIVE']

def create_oauth_config(config_dict: Dict) -> OAuthConfig:
    """Crea un objeto OAuthConfig a partir de un diccionario"""
    return OAuthConfig(
        supplier_id=config_dict['supplier_id'],
        token_url=config_dict['token_url'],
        username=config_dict['username'],
        password=config_dict['password'],
        client_id=config_dict['client_id'],
        client_secret=config_dict['client_secret'],
        scope=config_dict['scope']
    )

def get_oauth_token(config: OAuthConfig) -> Dict:
    """
    Obtiene un token OAuth usando dos métodos específicos:
    1. Autenticación Basic en header
    2. Credenciales en el body
    
    Args:
        config (OAuthConfig): Configuración OAuth
    
    Returns:
        Dict: Token y metadatos relacionados
    """
    try:
        # Datos base que son comunes para ambos métodos
        base_data = {
            'grant_type': 'password',
            'username': config.username,
            'password': config.password,
            'scope': config.scope
        }
        
        # Método 1: Basic Auth en header
        def try_basic_auth():
            # Crear el token Basic Auth
            credentials = f"{config.client_id}:{config.client_secret}"
            basic_token = base64.b64encode(credentials.encode()).decode()
            
            headers = {
                'Authorization': f'Basic {basic_token}',
                'Content-Type': 'application/x-www-form-urlencoded'
            }
            
            response = requests.post(
                config.token_url,
                data=base_data,
                headers=headers,
                verify=False
            )
            return response
            
        # Método 2: Todo en el body
        def try_body_auth():
            headers = {
                'Content-Type': 'application/x-www-form-urlencoded'
            }
            
            # Agregar client_id y client_secret al body
            body_data = {
                **base_data,
                'client_id': config.client_id,
                'client_secret': config.client_secret
            }
            
            response = requests.post(
                config.token_url,
                data=body_data,
                headers=headers,
                verify=False
            )
            return response
        
        # Intentar primer método (Basic Auth en header)
        response = try_basic_auth()
        if response.status_code == 200:
            token_data = response.json()
            token_data['timestamp'] = datetime.now().isoformat()
            logger.info(f"Token obtenido exitosamente para {config.supplier_id} usando Basic Auth")
            return token_data
            
        # Si el primer método falla, intentar el segundo (credenciales en body)
        response = try_body_auth()
        if response.status_code == 200:
            token_data = response.json()
            token_data['timestamp'] = datetime.now().isoformat()
            logger.info(f"Token obtenido exitosamente para {config.supplier_id} usando Body Auth")
            return token_data
        
        # Si ambos métodos fallan, lanzar error con detalles
        error_msg = f"Falló la autenticación. Último código de estado: {response.status_code}"
        try:
            error_detail = response.json()
            error_msg += f". Detalle: {error_detail}"
        except:
            error_msg += f". Respuesta: {response.text}"
            
        raise Exception(error_msg)
            
    except Exception as e:
        logger.error(f"Error obteniendo token para {config.supplier_id}: {str(e)}")
        raise

def process_oauth_configs(configs: List[Dict]) -> Dict[str, Dict]:
    """Procesa una lista de configuraciones OAuth y obtiene tokens para cada una"""
    results = {}
    
    for config_dict in configs:
        try:
            oauth_config = create_oauth_config(config_dict)
            token_data = get_oauth_token(oauth_config)
            results[oauth_config.supplier_id] = token_data
            logger.info(f"Token obtenido exitosamente para {oauth_config.supplier_id}")
        except Exception as e:
            logger.error(f"Error procesando {config_dict.get('supplier_id', 'unknown')}: {str(e)}")
            results[config_dict.get('supplier_id', 'unknown')] = {
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }
    
    return results

def save_tokens_to_file(tokens: Dict[str, Dict], prefix: str = "tokens") -> str:
    """Guarda los tokens en un archivo JSON"""
    directorio_salida = "tokens_obtenidos"
    os.makedirs(directorio_salida, exist_ok=True)
    
    output_file = os.path.join(directorio_salida, f'{prefix}.json')
    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(tokens, f, indent=2)
        logger.info(f"Tokens guardados en {output_file}")
        return output_file
    except Exception as e:
        logger.error(f"Error guardando tokens: {str(e)}")
        raise

#### Consultar el endpoint del api

In [37]:
def call_api_with_token(config: Dict, token_data: Dict) -> Dict:
    """
    Llama a la API usando el token obtenido
    """
    try:
        headers = {
            'Authorization': f"{token_data.get('token_type', 'Bearer')} {token_data.get('access_token')}",
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }
        
        response = requests.get(
            config['url'],
            headers=headers,
            verify=False
        )
        response.raise_for_status()
        return response.json()
    except Exception as e:
        logger.error(f"Error llamando API {config['url']}: {str(e)}")
        return {"error": str(e)}

def process_all_apis(configs: List[Dict], tokens: Dict[str, Dict]) -> Dict[str, Dict]:
    """
    Procesa todas las APIs con sus respectivos tokens
    """
    results = {}
    
    for config in configs:
        supplier_id = config['supplier_id']
        try:
            if supplier_id in tokens and 'token_data' in tokens[supplier_id]:
                api_response = call_api_with_token(config, tokens[supplier_id]['token_data'])
                results[supplier_id] = {
                    'data': api_response,
                    'status': 'success',
                    'timestamp': datetime.now().isoformat()
                }
            else:
                logger.warning(f"No se encontró token para {supplier_id}")
                results[supplier_id] = {
                    'error': 'Token no encontrado',
                    'status': 'error',
                    'timestamp': datetime.now().isoformat()
                }
        except Exception as e:
            results[supplier_id] = {
                'error': str(e),
                'status': 'error',
                'timestamp': datetime.now().isoformat()
            }
    
    # Guardar resultados
    output_file = os.path.join('api_responses', f'responses_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json')
    os.makedirs('api_responses', exist_ok=True)
    
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2)
    
    return results

### Main del proceso

##### Obteniendo el token

In [38]:
def process_configs(configs: List[Dict]) -> Dict[str, Dict]:
    """Procesa configuraciones y obtiene tokens"""
    active_configs = filter_active_configs(configs)
    logger.info(f"Configuraciones activas encontradas: {len(active_configs)}")
    
    tokens = process_oauth_configs(active_configs)
    return tokens

In [39]:
# Procesar configuraciones y obtener tokens
configs = read_json_file('../EndPointsAPIS/EndPoints/endpointsS1_uno.json')
tokens = process_configs(configs)


# Guardar tokens (opcional)
output_file = save_tokens_to_file(tokens)
print(f"Tokens guardados en: {output_file}")
pprint(tokens)

INFO:__main__:Configuraciones activas encontradas: 4
INFO:__main__:Token obtenido exitosamente para SESEA_AGUASCALIENTES usando Basic Auth
INFO:__main__:Token obtenido exitosamente para SESEA_AGUASCALIENTES
INFO:__main__:Token obtenido exitosamente para SESEA_JALISCO usando Basic Auth
INFO:__main__:Token obtenido exitosamente para SESEA_JALISCO
INFO:__main__:Token obtenido exitosamente para GUANAJUATO usando Basic Auth
INFO:__main__:Token obtenido exitosamente para GUANAJUATO
INFO:__main__:Token obtenido exitosamente para EDOMEX usando Basic Auth
INFO:__main__:Token obtenido exitosamente para EDOMEX
INFO:__main__:Tokens guardados en tokens_obtenidos/tokens.json


Tokens guardados en: tokens_obtenidos/tokens.json
{'EDOMEX': {'access_token': '111fbbf3-ccfb-4135-998e-17a42877db53',
            'expires_in': 360,
            'refresh_token': '60cf8ea7-1a9c-4b22-b800-25cebc60ac80',
            'refresh_token_expires_in': 720,
            'scope': 'read',
            'timestamp': '2025-02-05T17:59:26.357673',
            'token_type': 'bearer'},
 'GUANAJUATO': {'access_token': 'eyJhbGciOiJSUzI1NiIsImtpZCI6IkxiQkhHVGJNLUpLdlRwRV80YVlGaEEiLCJ0eXAiOiJhdCtqd3QifQ.eyJuYmYiOjE3Mzg3OTk5NjUsImV4cCI6MTczODgwMDU2NSwiaXNzIjoiaHR0cHM6Ly9wbGF0YWZvcm1hZGlnaXRhbC5ndWFuYWp1YXRvLmdvYi5teC9pbmRlbnRpZnlzZXJ2ZXIiLCJhdWQiOiJteXJlc291cmNlYXBpIiwiY2xpZW50X2lkIjoic2VjcmV0X2NsaWVudF9pZCIsInN1YiI6IjEiLCJhdXRoX3RpbWUiOjE3Mzg3OTk5NjUsImlkcCI6ImxvY2FsIiwicm9sZVR5cGUiOiJDYW5SZWFkZGF0YSIsInNjb3BlIjpbImFwaWRlY2xhcmFjaW9uIl0sImFtciI6WyJwd2QiXX0.has8m8zmca38xUfnYkXwgLOt2kMeDjiVoOQmSpO0UYitk8uxku3KDuUz_DWKxYLJltfIlpgriamN7zkrlf9wlTgWqfcGCmo61Um7OLzgfAO4ykFXT27ggHDSBuByDtFf49CtFCOi1Sha

##### Obteniendo los datos del endpoint

In [40]:
# Primero obtén los tokens
tokens = process_configs(configs)

# Luego llama a las APIs
api_results = process_all_apis(configs, tokens)

# Ver resultados
pprint(api_results)

INFO:__main__:Configuraciones activas encontradas: 4
INFO:__main__:Token obtenido exitosamente para SESEA_AGUASCALIENTES usando Basic Auth
INFO:__main__:Token obtenido exitosamente para SESEA_AGUASCALIENTES
INFO:__main__:Token obtenido exitosamente para SESEA_JALISCO usando Basic Auth
INFO:__main__:Token obtenido exitosamente para SESEA_JALISCO
INFO:__main__:Token obtenido exitosamente para GUANAJUATO usando Basic Auth
INFO:__main__:Token obtenido exitosamente para GUANAJUATO
INFO:__main__:Token obtenido exitosamente para EDOMEX usando Basic Auth
INFO:__main__:Token obtenido exitosamente para EDOMEX


{'EDOMEX': {'error': 'Token no encontrado',
            'status': 'error',
            'timestamp': '2025-02-05T17:59:28.370600'},
 'GUANAJUATO': {'error': 'Token no encontrado',
                'status': 'error',
                'timestamp': '2025-02-05T17:59:28.370185'},
 'SESEA_AGUASCALIENTES': {'error': 'Token no encontrado',
                          'status': 'error',
                          'timestamp': '2025-02-05T17:59:28.368802'},
 'SESEA_JALISCO': {'error': 'Token no encontrado',
                   'status': 'error',
                   'timestamp': '2025-02-05T17:59:28.369495'}}
