In [None]:
from time import sleep, time

from periphery import I2C
from pynq import Overlay, MMIO

# Load FPGA overlay
def load_overlay(bitfile="design_1_wrapper.bit"):
    """Create and return a pynq ``Overlay`` for the given bitstream file.

    Args:
        bitfile: Path or name of the bitstream handed to ``Overlay``.

    Returns:
        The ``Overlay`` object constructed from ``bitfile``.
    """
    overlay = Overlay(bitfile)
    return overlay

# Build the overlay with the default bitstream as soon as this cell/module runs;
# the handle is kept in the module-level name `ol`.
ol = load_overlay()

# GPIO Initialization
class GPIO:
    """Write-only wrapper around a single memory-mapped GPIO register."""

    # Base address and size of the MMIO window, and the offset of the
    # register that write() targets inside that window.
    BASE_ADDR = 0x41200000
    RANGE = 0x10000
    OFFSET = 0x0

    def __init__(self):
        """Map the GPIO register window."""
        base, span = self.BASE_ADDR, self.RANGE
        self.mmio = MMIO(base, span)

    def write(self, value):
        """Write ``value`` to the register at ``OFFSET``."""
        target = self.OFFSET
        self.mmio.write(target, value)


class UART_Custom:
    """Memory-mapped driver for a custom UART core.

    Every access goes through a pynq ``MMIO`` window of ``RANGE`` bytes at
    ``BASE_ADDR``; the register offsets are collected in ``UARTRegisters``.
    """

    BASE_ADDR = 0x40000000
    RANGE = 0x1000
    # Default time budget, in seconds, for one transmit()/receive() call.
    # __init__ overrides it per instance with its ``timeout`` argument.
    EXCHANGE_TIMEOUT = 20

    class UARTRegisters:
        # Register offsets for the UART (passed as-is to MMIO read/write).
        TX_BYTE = 0x00
        TX_START = 0x04
        RX_FIFO_EN = 0x08
        RX_DONE = 0x0C
        RX_DATA_OUT = 0x10
        RX_FIFO_RST = 0x1C
        # NOTE(review): TX_ACTIVE and TX_DONE share offset 0x14 and nothing is
        # defined at 0x18 - confirm against the core's register map.
        TX_ACTIVE = 0x14
        TX_DONE = 0x14
        SANITY_CHECK = 0x20
        RX_DATA_OUT_DBG = 0x24
        RX_FIFO_EMPTY_DBG = 0x28
        RX_FIFO_FULL_DBG = 0x2C
        RX_FIFO_WCOUNTER = 0x30
        RX_FIFO_RCOUNTER = 0x34
        # NOTE(review): distinct from RX_FIFO_EN (0x08); uart_read_start/stop
        # use RX_FIFO_EN while uart_fifo_enable uses this one.
        RX_FIFO_ENABLE = 0x44
        CLOCKS_PER = 0x38
        UART_BREAK_LENGTH = 0x3C
        UART_GEN_BREAK = 0x40

    def __init__(self, baudDivider=868, timeout=20):
        """Map the register window and bring the core into a known state.

        Args:
            baudDivider: Value written to ``CLOCKS_PER``. The default of 868
                gives about 115 207 baud from a 100 MHz FPGA clock.
            timeout: Seconds allowed for one ``transmit``/``receive`` call.
        """
        break_length = 11
        self.mmio = MMIO(self.BASE_ADDR, self.RANGE)
        self.uart_baud_cfg(baudDivider)
        self.uart_break_length(break_length)
        self.uart_write_stop()
        self.uart_read_stop()
        self.uart_fifo_enable(1)
        self.uart_reset_Fifo()
        self.uart_fifo_enable(1)
        self.EXCHANGE_TIMEOUT = timeout
        sleep(1)

    # --- REGISTER ACCESS ---
    def read(self, reg):
        """Read from a specified UART register."""
        return self.mmio.read(reg)

    def write(self, reg, value):
        """Write to a specified UART register."""
        self.mmio.write(reg, value)

    # --- LOW-LEVEL UART METHODS ---
    def uart_reset_Fifo(self):
        """Pulse ``RX_FIFO_RST`` 0 -> 1 -> 0, holding each level for one second."""
        self.uart_fifo_rst_n()
        sleep(1)
        self.uart_fifo_rst_p()
        sleep(1)
        self.uart_fifo_rst_n()

    def uart_tx_done_val(self):
        """Return the value of the ``TX_DONE`` register."""
        return self.read(self.UARTRegisters.TX_DONE)

    def uart_tx_active_val(self):
        """Return the value of the ``TX_ACTIVE`` register."""
        return self.read(self.UARTRegisters.TX_ACTIVE)

    def uart_write_value(self, x):
        """Load ``x`` into the ``TX_BYTE`` register."""
        self.write(self.UARTRegisters.TX_BYTE, x)

    def uart_write_start(self):
        """Write 1 to ``TX_START``."""
        self.write(self.UARTRegisters.TX_START, 1)

    def uart_write_stop(self):
        """Write 0 to ``TX_START``."""
        self.write(self.UARTRegisters.TX_START, 0)

    def uart_read_start(self):
        """Write 1 to ``RX_FIFO_EN``."""
        self.write(self.UARTRegisters.RX_FIFO_EN, 1)

    def uart_read_stop(self):
        """Write 0 to ``RX_FIFO_EN``."""
        self.write(self.UARTRegisters.RX_FIFO_EN, 0)

    def uart_read_data(self):
        """Return the value of the ``RX_DATA_OUT`` register."""
        return self.read(self.UARTRegisters.RX_DATA_OUT)

    def uart_read_data_dbg(self):
        """Return the value of the ``RX_DATA_OUT_DBG`` register."""
        return self.read(self.UARTRegisters.RX_DATA_OUT_DBG)

    def uart_fifo_empty_dbg(self):
        """Return the value of the ``RX_FIFO_EMPTY_DBG`` register."""
        return self.read(self.UARTRegisters.RX_FIFO_EMPTY_DBG)

    def uart_fifo_full_dbg(self):
        """Return the value of the ``RX_FIFO_FULL_DBG`` register."""
        return self.read(self.UARTRegisters.RX_FIFO_FULL_DBG)

    def uart_read_done(self):
        """Return the value of the ``RX_DONE`` register."""
        return self.read(self.UARTRegisters.RX_DONE)

    def uart_fifo_enable(self, en):
        """Write ``en`` to ``RX_FIFO_ENABLE``."""
        self.write(self.UARTRegisters.RX_FIFO_ENABLE, en)

    def uart_fifo_rst_p(self):
        """Write 1 to ``RX_FIFO_RST``."""
        self.write(self.UARTRegisters.RX_FIFO_RST, 1)

    def uart_fifo_rst_n(self):
        """Write 0 to ``RX_FIFO_RST``."""
        self.write(self.UARTRegisters.RX_FIFO_RST, 0)

    def uart_xcheck(self):
        """Return the value of the ``SANITY_CHECK`` register."""
        return self.read(self.UARTRegisters.SANITY_CHECK)

    def uart_wcounter_val(self):
        """Return the value of the ``RX_FIFO_WCOUNTER`` register."""
        return self.read(self.UARTRegisters.RX_FIFO_WCOUNTER)

    def uart_rcounter_val(self):
        """Return the value of the ``RX_FIFO_RCOUNTER`` register."""
        return self.read(self.UARTRegisters.RX_FIFO_RCOUNTER)

    def uart_baud_cfg(self, data):
        """Write the clock divider ``data`` to ``CLOCKS_PER``."""
        return self.write(self.UARTRegisters.CLOCKS_PER, data)

    def uart_break_length(self, data):
        """Write ``data`` to ``UART_BREAK_LENGTH``."""
        return self.write(self.UARTRegisters.UART_BREAK_LENGTH, data)

    def uart_break_start(self):
        """Write 1 to ``UART_GEN_BREAK``."""
        return self.write(self.UARTRegisters.UART_GEN_BREAK, 1)

    def uart_break_stop(self):
        """Write 0 to ``UART_GEN_BREAK``."""
        return self.write(self.UARTRegisters.UART_GEN_BREAK, 0)

    # --- HIGH-LEVEL UART METHODS ---
    def reset_fifo(self):
        """Reset the RX FIFO buffer."""
        # Same 0 -> 1 -> 0 pulse as uart_reset_Fifo, with 0.1 s holds.
        self.write(self.UARTRegisters.RX_FIFO_RST, 0)
        sleep(0.1)
        self.write(self.UARTRegisters.RX_FIFO_RST, 1)
        sleep(0.1)
        self.write(self.UARTRegisters.RX_FIFO_RST, 0)

    def transmit(self, data):
        """Send a string over UART, one character at a time.

        Each character is loaded into ``TX_BYTE`` and started via
        ``TX_START``; the method then polls ``TX_ACTIVE`` until it is no
        longer 1. A single deadline of ``EXCHANGE_TIMEOUT`` seconds covers
        the whole string. When it expires, "TimeOut !" is printed and the
        remaining characters are not sent.

        Args:
            data: Text to send; ``ord()`` of each character is written.
        """
        stop_time = time() + self.EXCHANGE_TIMEOUT

        for char in data:
            self.uart_write_value(ord(char))
            self.uart_write_start()
            timed_out = False
            while self.uart_tx_active_val() == 1:
                if time() > stop_time:
                    print("TimeOut !")
                    timed_out = True
                    break
            self.uart_write_stop()
            if timed_out:
                # The transmitter never went idle: pushing further bytes
                # would only overwrite the one still pending.
                return

    def receive(self, size=20):
        """Read data from UART, limited by the specified size.

        Args:
            size: Maximum number of reads of ``RX_DATA_OUT`` to perform.

        Returns:
            List of the raw register values read. It holds fewer than
            ``size`` entries when ``EXCHANGE_TIMEOUT`` seconds elapse first
            ("TimeOut !" is printed in that case).
        """
        data = []
        self.uart_fifo_enable(1)
        self.uart_fifo_rst_p()
        sleep(1)
        # Value unused; the register read is kept from the original sequence
        # in case the core reacts to it - TODO confirm and drop if not.
        self.uart_wcounter_val()
        stop_time = time() + self.EXCHANGE_TIMEOUT

        for _ in range(size):
            self.uart_read_start()
            data.append(self.uart_read_data())
            self.uart_read_stop()
            if time() > stop_time:
                print("TimeOut !")
                break
        self.uart_fifo_enable(0)
        self.uart_fifo_rst_n()
        return data




class ESP32_BT:
    """AT-command helper for an ESP32 Bluetooth module driven through a UART object.

    The UART object must provide ``transmit(str)`` and ``receive(size)``,
    where ``receive`` returns a list of integer values.
    """

    def __init__(self, uart):
        """Initialize the ESP32 Bluetooth module with a UART object."""
        # Bluetooth itself is not initialised here; call one of the
        # BTInit* methods explicitly.
        self.uart = uart  # Store the UART instance

    @staticmethod
    def _decode(raw):
        """Convert a list of received values to text without raising.

        Values are masked to 8 bits (assumes the data byte sits in the low
        byte of each value - TODO confirm against the UART core). Bytes that
        are not ASCII become U+FFFD, so line noise cannot abort a command
        exchange.
        """
        return bytes(value & 0xFF for value in raw).decode('ascii', errors='replace')

    def SendCMD(self, Command):
        """Sends an AT command via UART and returns the response.

        Args:
            Command: AT command text; "\\r\\n" is appended before sending.

        Returns:
            Up to 20 received values decoded as text and stripped of
            surrounding whitespace.
        """
        self.uart.transmit(Command + "\r\n")  # Send command
        response = self.uart.receive(20)  # Receive response
        return self._decode(response).strip()  # Strip unwanted characters

    def BTInit_ESP32(self):
        """Initializes Bluetooth on ESP32.

        Sends AT+RST, then tries AT+BTINIT=1 up to 10 times. After an
        attempt whose response contains "ERROR", AT+BTINIT=0 is sent before
        retrying. Prints every response and a final success or failure line.
        """
        self.SendCMD("AT+RST")  # Reset Bluetooth module
        for _ in range(10):
            data_r = self.SendCMD("AT+BTINIT=1")
            sleep(0.2)
            print(data_r)
            if "ERROR" not in data_r:
                print("Bluetooth initiated!")
                return
            data_r = self.SendCMD("AT+BTINIT=0")
            print(data_r)
            sleep(0.2)
        # Every attempt answered ERROR: do not claim success.
        print("Bluetooth initialization failed!")

    def BTInitSPP_ESP32(self):
        """Initializes ESP32 Bluetooth in SPP (Serial Port Profile) mode."""
        self.SendCMD("AT+BTINIT?")
        self.SendCMD("AT+BTSPPINIT=2")  # Set SPP mode
        self.SendCMD("AT+BTSCANMODE=2")  # Set visibility
        self.SendCMD("AT+BTNAME=\"esp_demo\"")  # Set Bluetooth name
        self.SendCMD("AT+BTSPPSTART")  # Start SPP service
        self.SendCMD("AT+BTNAME?")  # Confirm name

    def BTInitA2DP_ESP32(self):
        """Initializes ESP32 Bluetooth in A2DP (Audio Streaming) mode."""
        self.BTInit_ESP32()
        self.SendCMD("AT+BTSCANMODE=2")
        print(self.SendCMD("AT+BTNAME=\"esp_demo\""))
        print(self.SendCMD("AT+BTINIT?"))
        print(self.SendCMD("AT+BTA2DPINIT=2"))

    def BTReceive_ESP32(self):
        """Continuously receives and prints Bluetooth data.

        Never returns: reads 16 values at a time and prints each chunk that
        is not purely whitespace.
        """
        while True:
            data_r = self._decode(self.uart.receive(16))
            if data_r.strip():
                print(data_r)




In [None]:
class ADAU1761:
    """I2C configuration driver for the ADAU1761 audio codec.

    Constructing an instance opens the I2C bus, programs the PLL and runs
    the full register initialisation in ``initialize``.
    """

    I2C_ADDRESS = 0x3B  # ADAU1761 I2C Address

    # Audio Registers (same as before)
    # Each value is the second sub-address byte; 0x40 is always sent first.
    class Registers:
        CLOCK_CONTROL = 0x00
        PLL_CONTROL = 0x02
        RECORD_MIXER_LEFT_CONTROL_0 = 0x0A
        RECORD_MIXER_LEFT_CONTROL_1 = 0x0B
        RECORD_MIXER_RIGHT_CONTROL_0 = 0x0C
        RECORD_MIXER_RIGHT_CONTROL_1 = 0x0D
        ADC_CONTROL = 0x19
        PLAYBACK_MIXER_LEFT_CONTROL_0 = 0x1C
        PLAYBACK_MIXER_RIGHT_CONTROL_0 = 0x1E
        PLAYBACK_LR_MIXER_LEFT_LINE_OUTPUT_CONTROL = 0x20
        PLAYBACK_LR_MIXER_RIGHT_LINE_OUTPUT_CONTROL = 0x21
        PLAYBACK_HEADPHONE_LEFT_VOLUME_CONTROL = 0x23
        PLAYBACK_HEADPHONE_RIGHT_VOLUME_CONTROL = 0x24
        PLAYBACK_LINE_OUTPUT_LEFT_VOLUME_CONTROL = 0x25
        PLAYBACK_LINE_OUTPUT_RIGHT_VOLUME_CONTROL = 0x26
        PLAYBACK_POWER_MANAGEMENT = 0x29
        DAC_CONTROL_0 = 0x2A
        SERIAL_INPUT_ROUTE_CONTROL = 0xF2
        SERIAL_OUTPUT_ROUTE_CONTROL = 0xF3
        CONVERTER_CONTROL_0 = 0x17
        SERIAL_PORT_SAMPLING_RATE = 0xF8
        CLOCK_ENABLE_0 = 0xF9
        CLOCK_ENABLE_1 = 0xFA

    def __init__(self, i2c_bus="/dev/i2c-0"):
        """Open ``i2c_bus`` and configure the codec.

        Args:
            i2c_bus: Device path of the I2C bus the codec is attached to.

        Raises:
            Whatever the I2C layer raises; the bus handle is closed first.
        """
        # Initialize I2C
        self.i2c = I2C(i2c_bus)
        try:
            self.write_register(self.Registers.CLOCK_CONTROL, 0xE)

            # Program the 6-byte PLL control register. This goes through the
            # handle opened above so that a non-default ``i2c_bus`` is
            # honoured and no second file descriptor is needed.
            pll_msgs = [
                I2C.Message([0x40, self.Registers.PLL_CONTROL]),
                I2C.Message([0x02, 0x71, 0x02, 0x3C, 0x21, 0x01], read=False),
            ]
            self.i2c.transfer(self.I2C_ADDRESS, pll_msgs)

            # Now, initialize the ADAU1761 with all the register settings
            self.initialize()
        except Exception:
            # Do not leak the bus handle when setup fails part-way.
            self.i2c.close()
            raise

    def write_register(self, register, value):
        """Writes a single byte to the specified register.

        Sends the sub-address ``[0x40, register]`` followed by ``value`` in
        one transfer, then waits 0.1 s.
        """
        msg = [I2C.Message([0x40, register]), I2C.Message([value], read=False)]
        self.i2c.transfer(self.I2C_ADDRESS, msg)
        sleep(0.1)

    def read_register(self, register, length):
        """Reads 'length' bytes from the specified register.

        Returns:
            The data of the read message after the transfer.
        """
        read_data = [0] * length
        msg = [I2C.Message([0x40, register]), I2C.Message(read_data, read=True)]
        self.i2c.transfer(self.I2C_ADDRESS, msg)
        sleep(0.1)
        return msg[1].data

    def initialize(self):
        """Initializes ADAU1761 with basic configuration.

        Polls the PLL control register until bit 1 of its sixth byte is set,
        giving up after 21 reads, then writes the register sequence below.
        The sequence is written even when the PLL did not report lock.
        """
        print("Initializing ADAU1761...")

        # Wait for PLL to lock: up to 21 reads, 0.5 s apart.
        for attempt in range(21):
            if attempt:
                sleep(0.5)
            data = self.read_register(self.Registers.PLL_CONTROL, 6)
            if data[5] & 0x02:
                print("PLL locked successfully!")
                break
        else:
            print("Failed to lock PLL!")

        reg = self.Registers
        # (register, value) pairs, written strictly in this order.
        sequence = [
            (reg.CLOCK_CONTROL, 0xF),
            # Set up input and output mixers
            (reg.CLOCK_ENABLE_0, 0x7F),  # Enable clocks
            (reg.CLOCK_ENABLE_1, 0x03),  # Enable rest of clocks
            (reg.CONVERTER_CONTROL_0, 0x05),  # Set converter control for 48 KHz
            (reg.SERIAL_PORT_SAMPLING_RATE, 0x05),  # Set serial port sampling rate for 48 KHz
            (reg.ADC_CONTROL, 0x13),  # Enable ADCs
            (reg.DAC_CONTROL_0, 0x03),  # Enable both DACs
            (reg.PLAYBACK_POWER_MANAGEMENT, 0x03),  # Enable left and right channel playback
            (reg.SERIAL_INPUT_ROUTE_CONTROL, 0x00),  # Serial input routing; rewritten to 0x01 as the last step
            (reg.SERIAL_OUTPUT_ROUTE_CONTROL, 0x03),  # Connect I2S serial port input to ADCs
            (reg.RECORD_MIXER_LEFT_CONTROL_0, 0x01),  # Enable mixer 1
            (reg.RECORD_MIXER_LEFT_CONTROL_1, 0x05),  # Set Left channel gain to 0 dB
            (reg.RECORD_MIXER_RIGHT_CONTROL_0, 0x01),  # Enable mixer 2
            (reg.RECORD_MIXER_RIGHT_CONTROL_1, 0x05),  # Set Right channel gain to 0 dB
            (reg.PLAYBACK_MIXER_LEFT_CONTROL_0, 0x21),  # Unmute Left DAC into Mxr 3; Enable mxr 3
            (reg.PLAYBACK_MIXER_RIGHT_CONTROL_0, 0x41),  # Unmute Right DAC into Mxr 4; Enable mxr 4
            (reg.PLAYBACK_LR_MIXER_LEFT_LINE_OUTPUT_CONTROL, 0x03),  # Set gain to 0 dB for Mxr 5
            (reg.PLAYBACK_LR_MIXER_RIGHT_LINE_OUTPUT_CONTROL, 0x09),  # Set gain to 0 dB for Mxr 6
            (reg.PLAYBACK_HEADPHONE_LEFT_VOLUME_CONTROL, 0xE7),  # Set volume to 0 dB for LHP
            (reg.PLAYBACK_HEADPHONE_RIGHT_VOLUME_CONTROL, 0xE7),  # Set volume to 0 dB for RHP
            (reg.PLAYBACK_LINE_OUTPUT_LEFT_VOLUME_CONTROL, 0xE6),  # Set volume to 0 dB for LOUT
            (reg.PLAYBACK_LINE_OUTPUT_RIGHT_VOLUME_CONTROL, 0xE6),  # Set volume to 0 dB for ROUT
            (reg.SERIAL_INPUT_ROUTE_CONTROL, 0x01),  # Connect I2S serial port output to DACs
        ]
        for register, value in sequence:
            self.write_register(register, value)

    def close(self):
        """Closes the I2C connection."""
        self.i2c.close()


# Example usage:
# Bring-up script: GPIO pin setup, then UART, then the ESP32 in A2DP mode,
# and finally the audio codec. Runs only when executed as a script/cell.
if __name__ == "__main__":
    gpio = GPIO()    # Create GPIO Instance.
    gpio.write(0x5)  # LED PIN ON, (LD3 PZ2) | UART PINS SEL (otherwise SPI pins) | EN PIN (PmodESP32) DRIVE HIGH (otherwise reset) 101 (0x5)
    
    uart = UART_Custom(868, 20) # Create UART instance: baud divider of 868 at 100 MHz = 115200 baud, 20 s timeout
    esp32 = ESP32_BT(uart)   # Pass the UART object to the ESP32 helper class

    esp32.BTInitA2DP_ESP32() # Initialise A2DP mode
    adau = ADAU1761()        # Initialize the audio codec


AT+BTINIT=1

ERROR
AT+BTINIT=0

ERROR
AT+BTINIT=1
       
Bluetooth initiated!
AT+BTNAME="esp_demo"
AT+BTINIT?
+BTINIT:
AT+BTA2DPINIT=2

O
Initializing ADAU1761...
PLL locked successfully!
