In [None]:
import asyncio
from bleak import BleakScanner

async def main():
    """Asynchronous function to scan for Bluetooth devices and print information."""

    print("Searching for devices...")
    try:
        devices = await BleakScanner.discover()  # Asynchronous discovery
        for d in devices:
            print(d)  # Print device details (may include address, name, RSSI)
    except BleakError as e:  # Handle potential errors gracefully
        print(f"Error occurred during scan: {e}")

await main()

In [None]:
"""import asyncio
from bleak import BleakClient

address = "24:71:89:cc:09:05"
MODEL_NBR_UUID = "2A24"

async def main():
    global
    async with BleakClient(address) as client:
        for d in devices:
            print(d)
            model_number = await client.read_gatt_char(MODEL_NBR_UUID)
            print("Model Number: {0}".format("".join(map(chr, model_number))))

asyncio.run(main(address))
"""

In [None]:
import asyncio
from bleak import BleakClient, BleakScanner, BLEDevice
from bleak.backends.characteristic import BleakGATTCharacteristic

async def explore_gatt(address):
    print("ok1")
    async with BleakClient(address) as client:
        
        print("ok2")
        # Lista todos los servicios disponibles
        services = await client.get_services()
        for service in services:
            print(f"Servicio: {service.uuid}")
            for characteristic in service.characteristics:
                print(f"  Característica: {characteristic.uuid}")
                # Opcional: intenta leer la característica si es posible
                if "read" in characteristic.properties:
                    for _ in range(3):  # Intentar hasta 3 veces
                        try:
                            value = await client.read_gatt_char(characteristic.uuid)
                            print(f"    Valor leído: {value}")
                            break
                        except Exception as e:
                            print(f"    Intento fallido de lectura en {characteristic.uuid}: {e}")
                            await asyncio.sleep(1)  # Espera antes del siguiente intento

# Dirección MAC del dispositivo
address = "EC:81:93:7A:D5:CD"
await explore_gatt(address)


In [None]:
import asyncio

async def explore_gatt(address):
    print("ok1")
    async with BleakClient(address) as client:
        
        print("ok2")
        # Lista todos los servicios disponibles
        services = await client.get_services()
        for service in services:
            print(f"Servicio: {service.uuid}")
            for characteristic in service.characteristics:
                print(f"  Característica: {characteristic.uuid}")
                # Opcional: intenta leer la característica si es posible
                if "read" in characteristic.properties:
                    for _ in range(3):  # Intentar hasta 3 veces
                        try:
                            value = await client.read_gatt_char(characteristic.uuid)
                            print(f"    Valor leído: {value}")
                            break
                        except Exception as e:
                            print(f"    Intento fallido de lectura en {characteristic.uuid}: {e}")
                            await asyncio.sleep(1)  # Espera antes del siguiente intento

# Dirección MAC del dispositivo
address = "A8:80:55:AA:88:C4"
await explore_gatt(address)
                    

In [None]:
import asyncio
from bleak import BleakClient

# Función para manejar las notificaciones de características
async def notification_handler(sender, data):
    print(f"Notificación de {sender}: {data}")

# Función principal para explorar servicios y características
async def explore_device(address):
    async with BleakClient(address) as client:
        # Verificar conexión
        if client.is_connected:
            print(f"Conectado a {address}")
            
            # Explorar servicios y características
            for service in client.services:
                print(f"Servicio: {service.uuid}")
                for characteristic in service.characteristics:
                    print(f"  Característica: {characteristic.uuid}")
                    print(f"    Propiedades: {characteristic.properties}")
                    
                    # Intentar leer el valor si es legible
                    if "read" in characteristic.properties:
                        for attempt in range(3):  # Intentar hasta 3 veces en caso de error
                            try:
                                value = await client.read_gatt_char(characteristic.uuid)
                                print(f"    Valor leído: {value}")
                                break  # Salir del loop si la lectura es exitosa
                            except Exception as e:
                                print(f"    Error al leer {characteristic.uuid}, intento {attempt+1}: {e}")
                                await asyncio.sleep(1)  # Espera antes del siguiente intento
                    else:
                        print("    Valor leído: No legible")

                    # Intentar suscribirse a notificaciones si está permitido
                    if "notify" in characteristic.properties:
                        try:
                            await client.start_notify(characteristic.uuid, notification_handler)
                            print(f"    Suscrito a notificaciones de {characteristic.uuid}")
                        except Exception as e:
                            print(f"    Error al suscribirse a notificaciones en {characteristic.uuid}: {e}")
        
        else:
            print(f"No se pudo conectar a {address}")

# Dirección MAC del dispositivo
address = "A8:80:55:AA:88:C4"
await explore_device(address)


In [1]:
import re
import struct 
import logging
import asyncio
from datetime import datetime
from bleak import BleakClient, BleakScanner, BLEDevice
from bleak.backends.characteristic import BleakGATTCharacteristic

# Configurar logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)



class BLEInteractor:
    def __init__(self):
        self.devices = []
        self.client = None
        self.max_retries = 3
        self.retry_delay = 2
        
        # UUIDs comunes
        self.BATTERY_SERVICE_UUID = "0000180f-0000-1000-8000-00805f9b34fb"
        self.BATTERY_LEVEL_UUID = "00002a19-0000-1000-8000-00805f9b34fb"
        self.MANUFACTURER_NAME_UUID = "00002a29-0000-1000-8000-00805f9b34fb"
        self.MODEL_NUMBER_UUID = "00002a24-0000-1000-8000-00805f9b34fb"
        self.CURRENT_TIME_UUID = "00002a2b-0000-1000-8000-00805f9b34fb"
        
        # Callbacks para notificaciones
        self.notification_callbacks = {}
        
    @staticmethod
    def format_mac_address(address):
        """
        Convierte cualquier formato de dirección MAC al formato correcto para BLE
        Ejemplo: '79-27-7B-7C-AC-91' -> '79:27:7B:7C:AC:91'
        """
        clean_address = address.replace("-",":").replace("_",":")
        # Agrupa en pares y une con ':'
        return clean_address

    async def scan_devices(self, timeout=5.0):
        """Escanea dispositivos BLE cercanos y devuelve la lista ordenada por intensidad de conexión (RSSI)."""

        try:
            logger.info("Buscando dispositivos BLE cercanos...")
            
            # Realiza el escaneo de dispositivos, obteniendo la lista de dispositivos
            devices = await BleakScanner.discover(timeout=timeout)

            # Crea una lista de tuplas (dispositivo, rssi) y filtra solo los que tienen RSSI disponible
            devices_with_rssi = [(device, device.rssi) for device in devices if device.rssi is not None]

            # Ordena la lista por la intensidad de conexión (RSSI) de mayor a menor
            sorted_devices = sorted(devices_with_rssi, key=lambda x: x[1], reverse=True)

            # Devuelve solo los dispositivos, ignorando el RSSI
            self.devices = [device[0] for device in sorted_devices]

            return self.devices
        except Exception as e:
            logger.error(f"Error durante el escaneo: {e}")
            return []

    def display_devices(self):
        """Muestra los dispositivos encontrados."""
        if not self.devices:
            logger.info("No se encontraron dispositivos.")
            return

        print("\nDispositivos encontrados:")
        for idx, device in enumerate(self.devices):
            display_str = f"\nDEVICE {idx}:\nName - {device.name}\nAddress - {device.address}\nDetails - {device.details}\nRSSI - {device._rssi}\nMetadata - {device._metadata}"
            print(display_str)

    async def connect_to_device(self, device):
        """Conecta al dispositivo BLE seleccionado con reintentos."""
        for attempt in range(self.max_retries):
            try:
                # Use the device address directly, don't format it
                logger.info(f"Intentando conectar a {device.address} (Intento {attempt + 1})")
                async with BleakClient(device.address) as client:
                    return client
            except Exception as e:
                logger.error(f"Intento {attempt + 1} fallido: {str(e)}")
                await asyncio.sleep(self.retry_delay)  # Esperar antes de reintentar

        raise ConnectionError(f"No se pudo conectar al dispositivo después de {self.max_retries} intentos")

    async def connect(self, device):
        """Conecta al dispositivo y guarda el cliente para futuras interacciones."""
        self.client = await self.connect_to_device(device)
        

    async def read_battery_level(self):
        """Lee el nivel de batería del dispositivo."""
        try:
            value = await self.client.read_gatt_char(self.BATTERY_LEVEL_UUID)
            return int(value[0])
        except Exception as e:
            logger.error(f"Error al leer nivel de batería: {e}")
            return None

    def battery_level_notification_handler(self, sender, data):
        """Manejador para notificaciones de cambio de batería."""
        battery_level = int(data[0])
        logger.info(f"Nivel de batería actualizado: {battery_level}%")

    async def subscribe_to_battery_notifications(self):
        """Suscribe a notificaciones de cambios en el nivel de batería."""
        try:
            await self.client.start_notify(
                self.BATTERY_LEVEL_UUID, 
                self.battery_level_notification_handler
            )
            logger.info("Suscrito a notificaciones de batería")
        except Exception as e:
            logger.error(f"Error al suscribirse a notificaciones de batería: {e}")

    async def read_device_info(self):
        """Lee información básica del dispositivo."""
        try:
            manufacturer = await self.client.read_gatt_char(self.MANUFACTURER_NAME_UUID)
            model = await self.client.read_gatt_char(self.MODEL_NUMBER_UUID)
            return {
                "manufacturer": manufacturer.decode(),
                "model": model.decode()
            }
        except Exception as e:
            logger.error(f"Error al leer información del dispositivo: {e}")
            return None

    async def read_current_time(self):
        """Lee el tiempo actual del dispositivo."""
        try:
            value = await self.client.read_gatt_char(self.CURRENT_TIME_UUID)
            # Decodifica el formato de tiempo según la especificación BLE
            year, month, day, hours, minutes, seconds = struct.unpack('<HBBBBB', value[:7])
            return datetime(year, month, day, hours, minutes, seconds)
        except Exception as e:
            logger.error(f"Error al leer tiempo actual: {e}")
            return None

    async def send_media_command(self, command):
        """Envía un comando multimedia al dispositivo."""
        MEDIA_REMOTE_COMMAND_UUID = "9b3c81d8-57b1-4a8a-b8df-0e56f7ca51c2"
        commands = {
            'play': b'\x01',
            'pause': b'\x02',
            'next': b'\x03',
            'previous': b'\x04',
            'volume_up': b'\x05',
            'volume_down': b'\x06'
        }
        
        if command not in commands:
            logger.error(f"Comando no válido: {command}")
            return False
            
        try:
            await self.client.write_gatt_char(
                MEDIA_REMOTE_COMMAND_UUID,
                commands[command],
                response=True
            )
            return True
        except Exception as e:
            logger.error(f"Error al enviar comando multimedia: {e}")
            return False

    async def get_all_characteristics(self):
        """Obtiene todas las características disponibles del dispositivo."""
        try:
            services = await self.client.get_services()
            all_characteristics = {}
            
            for service in services:
                service_chars = []
                for char in service.characteristics:
                    char_info = {
                        "uuid": str(char.uuid),
                        "description": char.description,
                        "properties": [str(prop) for prop in char.properties]
                    }
                    service_chars.append(char_info)
                
                all_characteristics[str(service.uuid)] = {
                    "description": service.description,
                    "characteristics": service_chars
                }
            
            return all_characteristics
        except Exception as e:
            logger.error(f"Error al obtener características: {e}")
            return None


async def interactive_session():
    """Sesión interactiva para probar diferentes funcionalidades."""
    ble = BLEInteractor()
    
    # Escanear y conectar
    devices = await ble.scan_devices()
    if not devices:
        return
    
    ble.display_devices()
    print("ole")
    
    try:
        idx = int(input("\nSelecciona el índice del dispositivo: "))
        # idx = 1
        device = ble.devices[idx]
        await ble.connect(device)
    except Exception as e:
        logger.error(f"Error al conectar: {e}")
        return

    # Menú interactivo
    while True:
        print("\nOpciones disponibles:")
        print("1. Leer nivel de batería")
        print("2. Suscribirse a notificaciones de batería")
        print("3. Leer información del dispositivo")
        print("4. Leer tiempo actual")
        print("5. Enviar comando multimedia")
        print("6. Ver todas las características")
        print("7. Salir")
        
        choice = input("\nSelecciona una opción: ")
        
        if choice == "1":
            level = await ble.read_battery_level()
            if level is not None:
                print(f"Nivel de batería: {level}%")
        
        elif choice == "2":
            await ble.subscribe_to_battery_notifications()
            print("Suscrito a notificaciones de batería. Las actualizaciones aparecerán en el log.")
        
        elif choice == "3":
            info = await ble.read_device_info()
            if info:
                print(f"Fabricante: {info['manufacturer']}")
                print(f"Modelo: {info['model']}")
        
        elif choice == "4":
            time = await ble.read_current_time()
            if time:
                print(f"Tiempo del dispositivo: {time}")
        
        elif choice == "5":
            print("\nComandos disponibles:")
            print("play, pause, next, previous, volume_up, volume_down")
            cmd = input("Introduce el comando: ")
            success = await ble.send_media_command(cmd)
            if success:
                print("Comando enviado exitosamente")
        
        elif choice == "6":
            characteristics = await ble.get_all_characteristics()
            if characteristics:
                print("\nCaracterísticas disponibles:")
                for service_uuid, service_info in characteristics.items():
                    print(f"\nServicio: {service_uuid}")
                    print(f"Descripción: {service_info['description']}")
                    print("Características:")
                    for char in service_info['characteristics']:
                        print(f"  UUID: {char['uuid']}")
                        print(f"  Descripción: {char['description']}")
                        print(f"  Propiedades: {', '.join(char['properties'])}")
        
        elif choice == "7":
            if ble.client and ble.client.is_connected:
                await ble.client.disconnect()
            break
        
await interactive_session()

2024-11-03 22:59:49,624 - INFO - Buscando dispositivos BLE cercanos...
  devices_with_rssi = [(device, device.rssi) for device in devices if device.rssi is not None]



Dispositivos encontrados:

DEVICE 0:
Name - D4-18-5F-BB-A6-5E
Address - D4:18:5F:BB:A6:5E
Details - {'path': '/org/bluez/hci0/dev_D4_18_5F_BB_A6_5E', 'props': {'Address': 'D4:18:5F:BB:A6:5E', 'AddressType': 'random', 'Alias': 'D4-18-5F-BB-A6-5E', 'Paired': False, 'Bonded': False, 'Trusted': False, 'Blocked': False, 'LegacyPairing': False, 'RSSI': -74, 'Connected': False, 'UUIDs': [], 'Adapter': '/org/bluez/hci0', 'ManufacturerData': {76: bytearray(b'\x12\x02\x00\x03')}, 'ServicesResolved': False}}
RSSI - -74
Metadata - {'uuids': [], 'manufacturer_data': {76: b'\x12\x02\x00\x03'}}

DEVICE 1:
Name - 7C-02-BB-36-74-AE
Address - 7C:02:BB:36:74:AE
Details - {'path': '/org/bluez/hci0/dev_7C_02_BB_36_74_AE', 'props': {'Address': '7C:02:BB:36:74:AE', 'AddressType': 'random', 'Alias': '7C-02-BB-36-74-AE', 'Paired': False, 'Bonded': False, 'Trusted': False, 'Blocked': False, 'LegacyPairing': False, 'RSSI': -82, 'Connected': False, 'UUIDs': [], 'Adapter': '/org/bluez/hci0', 'ManufacturerData': {

2024-11-03 23:00:09,651 - INFO - Intentando conectar a 43:A2:49:54:C5:14 (Intento 1)
2024-11-03 23:00:19,763 - ERROR - Intento 1 fallido: Device with address 43:A2:49:54:C5:14 was not found.
2024-11-03 23:00:21,765 - INFO - Intentando conectar a 43:A2:49:54:C5:14 (Intento 2)
2024-11-03 23:00:36,816 - ERROR - Intento 2 fallido: 
2024-11-03 23:00:38,819 - INFO - Intentando conectar a 43:A2:49:54:C5:14 (Intento 3)


CancelledError: 