<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Imports" data-toc-modified-id="Imports-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Imports</a></span></li><li><span><a href="#Function-definitions" data-toc-modified-id="Function-definitions-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Function definitions</a></span></li><li><span><a href="#Open-Serial-Port" data-toc-modified-id="Open-Serial-Port-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Open Serial Port</a></span><ul class="toc-item"><li><span><a href="#Loop-sending-bytes" data-toc-modified-id="Loop-sending-bytes-3.1"><span class="toc-item-num">3.1&nbsp;&nbsp;</span>Loop sending bytes</a></span></li><li><span><a href="#Send-bytes-in-HDLC-in-a-loop" data-toc-modified-id="Send-bytes-in-HDLC-in-a-loop-3.2"><span class="toc-item-num">3.2&nbsp;&nbsp;</span>Send bytes in HDLC in a loop</a></span></li><li><span><a href="#Send-sine-signal-in-a-Loop" data-toc-modified-id="Send-sine-signal-in-a-Loop-3.3"><span class="toc-item-num">3.3&nbsp;&nbsp;</span>Send sine signal in a Loop</a></span></li></ul></li><li><span><a href="#Open-wav-file" data-toc-modified-id="Open-wav-file-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Open wav file</a></span><ul class="toc-item"><li><span><a href="#Just-play-the-song-direcly-in-Python" data-toc-modified-id="Just-play-the-song-direcly-in-Python-4.1"><span class="toc-item-num">4.1&nbsp;&nbsp;</span>Just play the song direcly in Python</a></span></li><li><span><a href="#Send-Data-of-song-via-UART-to-PC-and-play-it-in-Python" data-toc-modified-id="Send-Data-of-song-via-UART-to-PC-and-play-it-in-Python-4.2"><span class="toc-item-num">4.2&nbsp;&nbsp;</span>Send Data of song via UART to PC and play it in Python</a></span></li><li><span><a href="#Send-bytes-of-wav-and-receive-aknowledge" data-toc-modified-id="Send-bytes-of-wav-and-receive-aknowledge-4.3"><span class="toc-item-num">4.3&nbsp;&nbsp;</span>Send bytes of wav and receive aknowledge</a></span></li></ul></li><li><span><a href="#Get-decoded-mp3-data" data-toc-modified-id="Get-decoded-mp3-data-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Get decoded mp3 data</a></span></li><li><span><a href="#Check-consistency-of-sent-data" data-toc-modified-id="Check-consistency-of-sent-data-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Check consistency of sent data</a></span></li><li><span><a href="#Debug" data-toc-modified-id="Debug-7"><span class="toc-item-num">7&nbsp;&nbsp;</span>Debug</a></span></li></ul></div>

# Imports

In [9]:
import serial
import time
from pathlib import Path
import wave
import pyaudio
import numpy as np
import pandas as pd
import math
import matplotlib.pyplot as pl

In [12]:
%matplotlib qt

# Function definitions

In [21]:
def create_hdlc_frame(data):
    """
    Creates a hdlc frame from bytearray (adds frame delimiters(0x7E) and escapes flags(0x7E, 0x7D) payload)
    
    Parameter:
    ----------
    
    data (bytearray):
        payload data
        
    Returns:
    -------
    
    frame (bytearray):
        payload embedded in hdlc frame
    """
    if type(data) != bytes:
        msg = "Unexpected data type. Received {}, but expected 'bytes'".format(type(data))
        raise TypeError(msg)

    if (len(data) % 2) != 0:
        raise ValueError("Array length is not even!")
    
    frame = b'\x7E' + \
            data.replace(b'\x7D', b'\x7D\x5D') \
                .replace(b'\x7E', b'\x7D\x5E') #\
            #+ b'\x7E'
            
    return frame

In [3]:
def get_transmission_chunk(wav, chunk_size):
    frame = create_hdlc_frame(wav.readframes(chunk_size))
    return frame

In [4]:
def comp_send_receive(rec_data, sent_data):
    """
    Compare array rec_data with array sent_data.
    rec_data is expected to consist of n times sent_data, where the first and
    last occurence of sent_data might be incomplete.

    Parameter:
    ----------

    rec_data (np.array):
        Array which contains n-times sent_data,  where the first and
        last occurence of sent_data might be incomplete.
    
    sent_data (np.array):
        Array, which is the "element" of received_array

    Return:
    -------
        True:   rec_data consists exclusivly of several occurences of sent_data
        False:  -"- vice versa
    """
    offs = np.where(sent_data == rec_data[0])[0][0]
    frame_len = len(sent_data)
    len_start_frame = frame_len-offs

    num_recvd_frames, len_end_frame = divmod(len(rec_data)-len_start_frame, frame_len)

    equal = np.array_equal(rec_data[0:len_start_frame], sent_data[offs:])
    for i in range(0, num_recvd_frames):
        equal &= np.array_equal(rec_data[len_start_frame + i*frame_len: len_start_frame +i *frame_len + frame_len], sent_data)
    if len_end_frame != 0:
        equal &= np.array_equal(rec_data[len(rec_data)-len_end_frame:], sent_data[0:len_end_frame])

    return equal

In [5]:
def array_to_big_endian_bytes(array):
    return bytes(array.astype('>i2'))

In [6]:
def create_sine_samples(amplitude, frequency, samplerate):
    """
    Creates an array containing one sine period with defined frequency and amplitude strobed by the samplerate.
    Output is casted to int16
    
    Parameter:
    ----------
    
    amplitude (number):
        desired amplitude
        
    frequency (number):
        desired frequency in Hz
        
    samplerate (number):
        desired frequnecy in Hz
        
    Returns:
    -------
    
    samples (np.array of np.int16):
        mp.array containing sine samples (one period)
    """
    step_size = 2 * math.pi * frequency / samplerate
    samples = [np.int16(amplitude * math.sin(sample)) for sample in np.arange(0, 2*math.pi, step_size)]
    return samples

# Open Serial Port

In [115]:
ser = serial.Serial('COM5', 2e6, timeout=1, parity=serial.PARITY_NONE) # the baudrate of the uart dongle in verry unprecise. 1.5MHz is matched verry good.

In [114]:
ser.close()

## Loop sending bytes

In [15]:
while(True):
    for byte in range(0,0x100,1):
        #print(byte)
        ser.write(bytes([byte])) #, 0x01, 0x7F, 0x3F, 0x0F,0xFF]))
        time.sleep(1e-2)

KeyboardInterrupt: 

In [112]:
ser.write(bytes([0x7E, 0x77, 0xD7]))
ser.write(bytes([0x7E, 0x55, 0xD8]))
ser.write(bytes([0x7E, 0xAA, 0xD9]))

3

In [10]:
ser.read_all()

b'U\xaaU\xaa'

In [None]:
while(True):
    time.sleep(1)
    ser.write(bytes([0x00, 0x77, 0x55]))

## Send bytes in HDLC in a loop

In [34]:
data_little_endian = (np.arange(start=0x30, stop=0x40, step=1,dtype=np.int16()))

In [35]:
data_big_endian = array_to_big_endian_bytes(data_little_endian)
hdlc_frame = create_hdlc_frame(data=data_big_endian)

In [36]:
hdlc_frame

b'~\x000\x001\x002\x003\x004\x005\x006\x007\x008\x009\x00:\x00;\x00<\x00=\x00>\x00?~'

In [54]:
while(1):
    ser.write(hdlc_frame)
    ser.read_all()

KeyboardInterrupt: 

In [None]:
ser.write(hdlc_frame)
ser.read_all()

## Send sine signal in a Loop

In [118]:
sine = create_sine_samples(amplitude=2000, frequency=1e3, samplerate=48e3)
sine_len = len(sine)
# get free mem in fpga
response_size = 2
#ser.write(b'\x7E')

ser.reset_input_buffer()
ser.reset_output_buffer()

ser.write(bytes([0x7E]))
time.sleep(1)
rec = ser.read(response_size).hex()
#rec = ser.read(response_size).hex()
if rec == '':
    raise TimeoutError('Timout reached')
free_mem = int(rec, 16)
sine_pos = 0


detect_underflow = False
#read data
while 1:
    print("Free mem:", free_mem)
    print("In buff:", ser.in_waiting)
    if (free_mem < 16000) and (detect_underflow == False):
        detect_underflow = True
    if (free_mem > 16000) and (detect_underflow == True):
        msg = "Potential Underflow!"
        raise serial.SerialException(f'{msg}')
    if free_mem > sine_len:
        # build payload
        start_fragment = sine_len - sine_pos
        remaining_len = free_mem - start_fragment
        frame_cnt, end_fragment = divmod(remaining_len, sine_len)
        payload = sine[sine_pos:] + frame_cnt * sine + sine[0:end_fragment]
        sine_pos = end_fragment

        # serialize frame
        frame = create_hdlc_frame(array_to_big_endian_bytes(np.array(payload)) )       
        
        ser.write(frame)
        time.sleep(1e-3)
        rec = ser.read(response_size).hex()
        if rec == '':
            raise TimeoutError('Timout reached')
        free_mem = int(rec, 16)
    else:
        print('FPGA Buffer full!!')
        time.sleep(5e-3)
        ser.write(b'\x7E')
        time.sleep(1e-3)
        rec = ser.read(response_size).hex()
        if rec == '':
            raise TimeoutError('Timout reached')
        free_mem = int(rec, 16)



Free mem: 16383
In buff: 0
Free mem: 16383
In buff: 0
Free mem: 8030
In buff: 0
Free mem: 16047
In buff: 0


SerialException: Potential Underflow!

In [85]:
fig = pl.figure()
pl.plot(payload, figure = fig)

[<matplotlib.lines.Line2D at 0x1f61d3d05f8>]

In [17]:
ser.in_waiting

0

In [56]:
5 * sine

[0,
 31650,
 16383,
 -23169,
 -28377,
 0,
 31650,
 16383,
 -23169,
 -28377,
 0,
 31650,
 16383,
 -23169,
 -28377,
 0,
 31650,
 16383,
 -23169,
 -28377,
 0,
 31650,
 16383,
 -23169,
 -28377]

# Open wav file

In [7]:
wav_path = Path(r'..\test\Finjark_Master_003.wav')

In [17]:
wav = wave.open(str(wav_path), mode='rb')

In [18]:
wav.close()

## Just play the song direcly in Python

In [13]:
p = pyaudio.PyAudio()  
#open stream  
stream = p.open(format = p.get_format_from_width(wav.getsampwidth()),  
                channels = wav.getnchannels(),  
                rate = wav.getframerate(),  
                output = True)  
#read data  
data = wav.readframes(chunk_size)  

#play stream  
#while data:  
#    stream.write(data)  
#    data = wav.readframes(chunk_size) 

KeyboardInterrupt: 

## Send Data of song via UART to PC and play it in Python

In [85]:
ser.close()
ser = serial.Serial(port='COM5', baudrate=256e3, timeout=5, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE)

In [114]:
chunk_size = 20
data = None
wav = wave.open(f=str(wav_path / 'Finjark_Master_003.wav'), mode='rb')
p = pyaudio.PyAudio()  
#open stream  
stream = p.open(format = p.get_format_from_width(wav.getsampwidth()),  
                channels = wav.getnchannels(),  
                rate = wav.getframerate(),  
                output = True) 

ser.write(wav.readframes(chunk_size))
#data = ser.read_all()

#read data
#while data:
#    stream.write(data)
#    ser.write(wav.readframes(chunk_size))
#    data = ser.read_all()
    


#play stream  
#while data:  
#    stream.write(data)  
#    data = wav.readframes(chunk_size) 

40

## Send bytes of wav and receive aknowledge

In [46]:
wav_path = Path(r'D:\Sicherungen\Musik_Erwin\Knaat')

In [49]:
free_mem = 0
response_size = 2

wav = wave.open(f=str(wav_path / 'Finjark_Master_003.wav'), mode='rb')
p = pyaudio.PyAudio()  
#open stream  
stream = p.open(format = p.get_format_from_width(wav.getsampwidth()),  
                channels = wav.getnchannels(),  
                rate = wav.getframerate(),  
                output = True) 


ser.write(b'\x7E')
time.sleep(1e-3)
rec = ser.read(response_size).hex()
if rec == '':
    raise TimeoutError('Timout reached')
free_mem = int(rec, 16)
frame = get_transmission_chunk(wav=wav, chunk_size=free_mem // wav.getsampwidth())

#read data
while frame:
    if free_mem > 0:
        ser.write(frame)
        time.sleep(1e-3)
        rec = ser.read(response_size).hex()
        if rec == '':
            raise TimeoutError('Timout reached')
        free_mem = int(rec, 16)
        frame = get_transmission_chunk(wav=wav, chunk_size=free_mem // wav.getsampwidth())
    else:
        print('FPGA Buffer full!!')
        time.sleep(5e-3)
        ser.write(b'\x7E')
        time.sleep(1e-3)
        rec = ser.read(response_size).hex()
        if rec == '':
            raise TimeoutError('Timout reached')
        free_mem = int(rec, 16)

KeyboardInterrupt: 

In [40]:
for byte in range(0,0x100,1):
    ser.write(bytes([byte]))

In [82]:
wav.getnframes()

9879552

# Get decoded mp3 data

# Check consistency of sent data

In [55]:
data = []
with open(r'..\test\debug_data.csv', 'r') as IN:
    for line in IN.readlines():
        data.append(line.replace('\n', '').split(';'))

In [56]:
df = pd.DataFrame(data).T \
                  .rename(columns={0:'channel_0', 1:'channel_1', 2:'channel_2', 3:'channel_3'})

In [61]:
df.head()

Unnamed: 0,channel_0,channel_1,channel_2,channel_3,4,5
0,60,,51,,,
1,61,,52,,,
2,62,,53,,,
3,63,,54,,,
4,48,,55,,,


In [75]:
comp_send_receive(df.channel_0.dropna().astype(int).values, np.arange(start=0x30, stop=0x40, step=1))

True

In [77]:
qstr = f"channel_0 == '0'"
df.query(qstr)

Unnamed: 0,channel_0,channel_1,channel_2,channel_3,4,5


In [73]:
%debug

> [1;32m<ipython-input-53-8d31feaa4df9>[0m(22)[0;36mcomp_send_receive[1;34m()[0m
[1;32m     20 [1;33m        [1;32mFalse[0m[1;33m:[0m  [1;33m-[0m[0;31m"[0m[1;33m-[0m [0mvice[0m [0mversa[0m[1;33m[0m[0m
[0m[1;32m     21 [1;33m    """
[0m[1;32m---> 22 [1;33m    [0moffs[0m [1;33m=[0m [0mnp[0m[1;33m.[0m[0mwhere[0m[1;33m([0m[0msent_data[0m [1;33m==[0m [0mrec_data[0m[1;33m[[0m[1;36m0[0m[1;33m][0m[1;33m)[0m[1;33m[[0m[1;36m0[0m[1;33m][0m[1;33m[[0m[1;36m0[0m[1;33m][0m[1;33m[0m[0m
[0m[1;32m     23 [1;33m    [0mframe_len[0m [1;33m=[0m [0mlen[0m[1;33m([0m[0msent_data[0m[1;33m)[0m[1;33m[0m[0m
[0m[1;32m     24 [1;33m    [0mlen_start_frame[0m [1;33m=[0m [0mframe_len[0m[1;33m-[0m[0moffs[0m[1;33m[0m[0m
[0m
ipdb> rec_data
array([51, 52, 53, ..., 59, 60, 61])
ipdb> rec_data[0]
51
ipdb> np.where(sent_data == rec_data[0])
(array([], dtype=int64),)
ipdb> sent_data 
array([12288, 12544, 12800, 13056, 1331

In [20]:
hex(16128)

'0x3f00'

# Debug

In [95]:
a = b'\x7E\x7D'

In [99]:
int(a.hex(), 16)

32381

In [62]:
a.replace(b'\x7D', b'\x7D\x5D').replace(b'\x7E', b'\x7D\x5E').hex()

'7d5e7d5d'

In [55]:
type(wav.readframes(chunk_size))

bytes

In [83]:
ser.timeout

0

In [73]:
(b'\x7E' + wav.readframes(chunk_size).replace(b'\x7D', b'\x7D\x5D').replace(b'\x7E', b'\x7D\x5E') + b'\x7E').hex()

'7e0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000

In [18]:
ser.close()

In [50]:
wav.

<wave.Wave_read at 0x293b5529048>

In [96]:
a = np.array([1, 2, 0, 1, 2, 0, 1, 2, 0, 1])
b = np.array([0, 1, 2])
c = np.array([1, 2, 0, 1, 2, 0, 1, 2, 0, 2])

In [99]:
comp_send_receive(rec_data=a, sent_data=b)

True

In [22]:
np.where(c == 1)

(array([0, 3, 6], dtype=int64),)