[<img src="../assets/images/Logo.png" width="64" align="center">](https://github.com/a-fuchs/jupyter-micropython-extended-kernel) &emsp;  [README](../README.md) / [NoteBooks](./Index.ipynb) | **BLE REPL**

---

# BLE REPL


If all is set up, a connection to the controller could be established with the apropriate port, for example:

```
%bleconnect --name='ESP32-fuh-000' --address='94:B9:7E:C0:A2:12'
```

---
## Information

The Nordic UART Service (NUS) Application is an example that emulates a serial port over BLEBase.

The application includes one service: the Nordic UART Service. The 128-bit vendor-specific UUID of the Nordic UART Service is 6E400001-B5A3-F393-E0A9-E50E24DCCA9E (16-bit offset: 0x0001).

This service exposes two characteristics: one for transmitting and one for receiving (as seen from the peer).

---
## Copy moduls to the controller
---
### class BLE, class REPLManager


In [32]:
%serialconnect

[34m
Closing BLE device: ESP32-fuh-000 @ 78:21:84:99:8E:D6
[0m[34mConnecting to --port=/dev/ttyUSB3 --baud=115200 [0m
[32m
 ** Connected to SerialPort: port=/dev/ttyUSB3 baudrate=115200 **

[0mService Discovery has not been performed yet
[34mReady.
[0m

In [33]:
%sendtofile "/replmanager.py"
#%%writefile replmanager.py

from micropython import const
from binascii import hexlify
import struct
import bluetooth
import machine
import time
import sys
import io
import os


class BLE:
    # Advertising abytePayloads are repeated packets of the following form:
    #   1 byte data length (N + 1)
    #   1 byte type (see constants below)
    #   N bytes type-specific data

    # @see Helper    
    ADV_TYPE_FLAGS            = const(0x01)
    ADV_TYPE_NAME             = const(0x09)
    ADV_TYPE_UUID16_COMPLETE  = const(0x03)
    ADV_TYPE_UUID32_COMPLETE  = const(0x05)
    ADV_TYPE_UUID128_COMPLETE = const(0x07)
    # ADV_TYPE_UUID16_MORE      = const(0x02) # @unused
    # ADV_TYPE_UUID32_MORE      = const(0x04) # @unused
    # ADV_TYPE_UUID128_MORE     = const(0x06) # @unused
    ADV_TYPE_APPEARANCE       = const(0x19)
    
    ADV_APPEARANCE_GENERIC_COMPUTER = const(128) # org.bluetooth.characteristic.gap.appearance.xml

    
    #==========================================================================

    # FLAG_WRITE  = const(0x0008) # @unused
    # FLAG_NOTIFY = const(0x0010) # @unused
    
    #==========================================================================

    # @see UARTServer and UARTClient
    
    UART_UUID =   bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")  # the Nordic UART Service. The 128-bit vendor-specific UUID of the Nordic UART Service is 6E400001-B5A3-F393-E0A9-E50E24DCCA9E (16-bit offset: 0x0001)
    UART_TX   = ( bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"), bluetooth.FLAG_READ  | bluetooth.FLAG_WRITE | bluetooth.FLAG_NOTIFY | bluetooth.FLAG_INDICATE )
    UART_RX   = ( bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"), bluetooth.FLAG_READ  | bluetooth.FLAG_WRITE | bluetooth.FLAG_NOTIFY | bluetooth.FLAG_INDICATE )
    
    UART_SERVICE = ( UART_UUID, ( UART_TX, UART_RX ) )
    

    #@fuh DEVICE_FILEIO_SERV_UUID = bluetooth.UUID("E0B9145A-D949-49E4-9DC1-54236E02206A")

    # https://infocenter.nordicsemi.com/index.jsp?topic=%2Fcom.nordic.infocenter.sdk5.v11.0.0%2Fbledfu_transport_bleservice.html&anchor=ota_spec_control_state
    #@fuh DEVICE_FILEIO_CONTROL_AND_PACKET_CHAR = ( bluetooth.UUID("55574ADA-552E-48FA-AFC0-D81CE0D6EFB0"), bluetooth.FLAG_WRITE | bluetooth.FLAG_NOTIFY )

    #@fuh DEVICE_FILEIO_SERVICE = ( DEVICE_FILEIO_SERV_UUID, ( FILEIO_CONTROL_AND_PACKET_CHAR, ), )
    
    #==========================================================================
    # Helper

    @staticmethod
    def appendToPayload( abytePayload, advType, value ):
        abytePayload += struct.pack( "BB", len(value) + 1, advType ) + value

    
    @staticmethod
    def advertisingPayload( limitedDisc=False, brEdr=False, name=None, aService=None, appearance=0 ):
        # Generate a abytePayload to be passed to gap_advertise(adv_data=...).
        
        abytePayload = bytearray()
    
        BLE.appendToPayload(
            abytePayload,
            BLE.ADV_TYPE_FLAGS,
            #struct.pack("B", (0x01 if limitedDisc else 0x02) + (0x18 if brEdr else 0x04))
            struct.pack("B", (0x01 if limitedDisc else 0x02) + (0x00 if brEdr else 0x04))
        )
    
        if name:
            BLE.appendToPayload( abytePayload, BLE.ADV_TYPE_NAME, name )
    
        if aService:
            for uuid in aService:
                abyteUUID = bytes(uuid)
                iLenUUID  = len( abyteUUID )
                
                if iLenUUID == 2:
                    BLE.appendToPayload( abytePayload, BLE.ADV_TYPE_UUID16_COMPLETE,  abyteUUID )
                elif iLenUUID == 4:
                    BLE.appendToPayload( abytePayload, BLE.ADV_TYPE_UUID32_COMPLETE,  abyteUUID )
                elif iLenUUID == 16:
                    BLE.appendToPayload( abytePayload, BLE.ADV_TYPE_UUID128_COMPLETE, abyteUUID )
    
        # See org.bluetooth.characteristic.gap.appearance.xml
        if appearance:
            BLE.appendToPayload( abytePayload, BLE.ADV_TYPE_APPEARANCE, struct.pack("<h", appearance) )
    
        return abytePayload
  
    class UART:
        IRQ_CENTRAL_CONNECT    = const(1)
        IRQ_CENTRAL_DISCONNECT = const(2)
        IRQ_GATTS_WRITE        = const(3)
        IRQ_GATTS_READ_REQUEST = const(4)
        
        IRQ_MTU_EXCHANGED      = const(21) # @added
        IRQ_ENCRYPTION_UPDATE  = const(28) # @added
        
        def __init__( self, ble, name='ESP32-UART', iRXTXBufferSize=512, uuid='' ):
            self._ble = ble
            self._ble.active(True)
            self.iRXTXBufferSize = iRXTXBufferSize

            self.strName = name
            self.strUuid = uuid
            
            gap_name = f"{sys.platform.upper()}@{uuid}"

            try:
                self._ble.config( gap_name=gap_name, mtu=iRXTXBufferSize+3, rxbuf=iRXTXBufferSize )
            except Exception:
                # MICROPY_PY_BLUETOOTH_USE_SYNC_EVENTS
                self._ble.config( gap_name=gap_name, mtu=iRXTXBufferSize+3 )
                
            self._ble.irq( self._bleIrqEventHandler )
            
            #@fuh ( (self._tx_handle, self._rx_handle,), (self._fileio_handle,) ) = self._ble.gatts_register_services((BLE:UART_SERVICE, BLE.DEVICE_FILEIO_SERVICE))
            ( (self._tx_handle, self._rx_handle,), ) = self._ble.gatts_register_services( ( BLE.UART_SERVICE, ))
            
            # Increase the size of the rx buffer and enable append mode.
            self._ble.gatts_set_buffer(self._rx_handle, iRXTXBufferSize, True)
            self._ble.gatts_set_buffer(self._tx_handle, iRXTXBufferSize)
            # @fuh self._ble.gatts_set_buffer(self._fileio_handle, iRXTXBufferSize)
            
            # @fuh self._filename = ''
            self._connections  = set()
            self._rx_buffer    = bytearray()
            self._tx_available = True
            self._gattsWriteIrqEventHandler = None
            self._gattsWriteIrqEventHandlerParameter = self
            
            # Optionally add services=[BLE.UART_UUID], but this is likely to make the payload too large.
            #self._payload = BLE.advertisingPayload( name=name, aService=[ BLE.UART_UUID ], appearance=BLE.ADV_APPEARANCE_GENERIC_COMPUTER )
            self._payload = BLE.advertisingPayload( name=name, appearance=BLE.ADV_APPEARANCE_GENERIC_COMPUTER )
            self._advertise( interval_us=30000 )

    
        def irq( self, handler, parameter = None ): # its a gattsWriteIrqEventHandler
            self._gattsWriteIrqEventHandler = handler

            if parameter is not None:
                self._gattsWriteIrqEventHandlerParameter = parameter
    
        def _bleIrqEventHandler( self, event, data ):
            
            # Track connections so we can send notifications.
            if event == BLE.UART.IRQ_CENTRAL_CONNECT:
                #print( "Event IRQ_CENTRAL_CONNECT", event, "data", data )
                conn_handle, _, _, = data
                self._connections.add(conn_handle)
                
            elif event == BLE.UART.IRQ_CENTRAL_DISCONNECT:
                #print( "Event IRQ_CENTRAL_DISCONNECT", event, "data", data )
                conn_handle, _, _, = data
                
                if conn_handle in self._connections:
                    self._connections.remove(conn_handle)
                    
                # Start advertising again to allow a new connection.
                self._advertise()
                
            elif event == BLE.UART.IRQ_GATTS_WRITE:
                #print( "Event IRQ_GATTS_WRITE", event, "data", data )
                conn_handle, value_handle, = data
                
                if conn_handle in self._connections and value_handle == self._rx_handle:
                    self._rx_buffer += self._ble.gatts_read(self._rx_handle)
                    
                    if self._gattsWriteIrqEventHandler:
                        self._gattsWriteIrqEventHandler( self._gattsWriteIrqEventHandlerParameter )
            else:
                #print( "Event", event, "data", data )
                pass
    
        def any(self):
            return len(self._rx_buffer)
    
        def read(self, nbytes=None):
            if not nbytes:
                nbytes= len(self._rx_buffer)
                
            abyteRead = self._rx_buffer[0:nbytes]
            self._rx_buffer = self._rx_buffer[nbytes:]
            
            return abyteRead

        
        def write(self, data):
            for conn_handle in self._connections:
                while True:
                    try:
                        #print( f"Try write: conn_handle {data}" )
                        # self._ble.gatts_notify(conn_handle, self._tx_handle, data)
                        self._ble.gatts_write(self._tx_handle, data)
                        self._ble.gatts_notify(conn_handle, self._tx_handle)
#                        self._ble.gatts_indicate(conn_handle, self._tx_handle)
                        self._tx_available = True
                        break
                    except Exception as e:
                        #print( f"Exception write: conn_handle {data}" )
                        # self._ble.gatts_read(self._tx_handle)
                        self._tx_available = False
                        time.sleep_ms(50)
                        
                # self._ble.gatts_write(self._tx_handle, data)
                # self._ble.gatts_indicate(conn_handle, self._tx_handle)
    
        def close(self):
            for conn_handle in self._connections:
                self._ble.gap_disconnect(conn_handle)
            self._connections.clear()
    
        def _advertise(self, interval_us=500000):
            self._ble.gap_advertise( interval_us, adv_data=self._payload )
    
        # @fuh def init_file(self, filename):
        # @fuh     self._filename = filename



    # Proof-of-concept of a REPL over BLE UART.
    #
    # Tested with the Adafruit Bluefruit app on Android.
    # Set the EoL characters to \r\n.

    #from sys import platform
    
    # Simple buffering stream to support the dupterm requirements.
    
    class REPL( io.IOBase ):
        MP_STREAM_POLL    = const(3)
        MP_STREAM_POLL_RD = const(0x0001)
        
        def __init__( self, uart ):
            self._uart      = uart
            self._tx_buffer = bytearray()
            try:
                self._timer = machine.Timer( -1 )
            except:
                self._timer = machine.Timer( 0 )
            
            # Needed for ESP32:
            if hasattr(os, 'dupterm_notify'):
                self._uart.irq( lambda _uart : os.dupterm_notify(_uart), self )
    
        def read(self, nbytes=None):
            return self._uart.read(nbytes)
    
        def readinto( self, abyteBuf ):
            abyteRead = self._uart.read( len( abyteBuf ) )
            
            if not abyteRead:
                return None
                
            for i in range(len(abyteRead)):
                abyteBuf[i] = abyteRead[i]
                
            return len(abyteRead)
    
        def ioctl(self, op, arg):
            if op == REPL.MP_STREAM_POLL:
                if self._uart.any():
                    return REPL.MP_STREAM_POLL_RD
            return 0
    
        def writeChunk( self, timer ):
            if self._uart._tx_available:
                data = self._tx_buffer[0:self._uart.iRXTXBufferSize]
                self._tx_buffer = self._tx_buffer[self._uart.iRXTXBufferSize:]
                self._uart.write(data)
                
                if self._tx_buffer:
                    self.sceduleInWriteChunk()
                    
        def sceduleInWriteChunk(self, delayMillis = 1 ):
            self._timer.init( period=delayMillis, mode=machine.Timer.ONE_SHOT, callback=self.writeChunk )
    
        def write( self, abyteBuf ):
            bWasEmpty = not self._tx_buffer
            
            self._tx_buffer += abyteBuf
    
            if bWasEmpty:
                self.sceduleInWriteChunk()
                
        @property
        def name( self ):
            return self._uart.strName

        @property
        def uuid( self ):
            return self._uart.strUuid
            
    @staticmethod
    def newREPL( name = '', uuid = '' ):
        if not name:
            strUniqeId = hexlify(machine.unique_id()).decode()
            name = (f"{sys.platform}-{strUniqeId[0]}{strUniqeId[-1]}")

        if not uuid:
            uuid = hexlify(machine.unique_id()).decode()

        return BLE.REPL( BLE.UART( bluetooth.BLE(), name = name, uuid = uuid ) ) #, iRXTXBufferSize=512 ) )

        
#import os

class REPLManager:
    bleREPL = None
    
    @staticmethod
    def setBLEREPL( name = '', uuid = '' ):
        
        if REPLManager.bleREPL is None:
            REPLManager.bleREPL = BLE.newREPL( name = name, uuid = uuid )

        os.dupterm( REPLManager.bleREPL )

    @staticmethod
    def setDefaultREPL():
        os.dupterm( None )


Sent 330 lines (12970 bytes) to /replmanager.py.


---
### main.py

The `main.py` is called every time the controller starts.  

It starts the `Web-REPL` if the RST-Button (more precise the button defined with `_BTN_STOP`) is not pressed within 2 seconds (`_WAIT_TIME_MS=2000`), otherwise it does nothing, so you can connect over USB.

In [35]:
%sendtofile "/main.py"
_BTN_STOP     = 0
_WAIT_TIME_MS = 2000

import time
import machine
import replmanager

btnStop = machine.Pin( _BTN_STOP, machine.Pin.IN, machine.Pin.PULL_UP )

bStartBLEREPL = True
lastTimeMs    = time.ticks_ms()

while time.ticks_diff( time.ticks_ms(), lastTimeMs ) < _WAIT_TIME_MS:
    if btnStop.value() == 0:
        bStartBLEREPL = False
        break
    time.sleep_ms(50)
    
if bStartBLEREPL:
    try:
        replmanager.REPLManager.setBLEREPL( name='ESP32-fuh-000' )
    except BaseException as e:
        print( e )

import gc
gc.collect()


Sent 26 lines (552 bytes) to /main.py.


<!-----
## Compile and write to controller
-->

In [6]:
#%%iPython
#!mpy-cross moduls/ble_repl.py

In [7]:
#%sendtofile --binary --source "./moduls/ble_repl.mpy" "/ble_repl.mpy"

In [8]:
#%ls /

---
## Connect to controller and test
If all is set up, a connection to the controller could be established over his BLE.

```
%bleconnect --name='ESP32-fuh-000'
```
or
```
%bleconnect --name='ESP32-fuh-000' --address='78:21:84:99:8E:D6'
```

In [36]:
%bleconnect --name='ESP32-fuh-000'
#%bleconnect --name='ESP32-fuh-000' --address='78:21:84:99:8E:D6'

serial exception on close write failed: [Errno 5] Input/output error
[34m
Closing SerialPort: port=/dev/ttyUSB3 baudrate=115200
[0m[34mConnecting to BLE device: ESP32-fuh-000 @ 
[0mScanning for BLE device with name 'ESP32-fuh-000'...
Found: --name='ESP32-fuh-000' --address='78:21:84:99:8E:D6'
Try: connect to name='ESP32-fuh-000' address='78:21:84:99:8E:D6' ...
   connected to name='ESP32-fuh-000' address='78:21:84:99:8E:D6'
[32m
 ** Connected to BLE device: ESP32-fuh-000 @ 78:21:84:99:8E:D6 **

[0mAdded notification handler
[34mReady.
[0m

In [37]:
print( "Hello world over BLE!" )

Hello world over BLE!
