In [7]:
import pyaudio
import numpy as np
import webrtcvad
import collections
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras import models
import turtle

# Параметры записи аудио
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
FRAME_DURATION_MS = 30  # Длительность фрейма в миллисекундах
PADDING_DURATION_MS = 300  # Длительность дополнительной оконной рамки в миллисекундах
FRAME_SIZE = int(RATE * FRAME_DURATION_MS / 1000)  # Размер фрейма в сэмплах

commands = ['down', 'go', 'left', 'right', 'stop', 'up']
model = models.load_model('model.h5')

In [8]:
def plot_waveform(waveform):
    plt.figure(figsize=(10, 4))
    plt.plot(waveform)
    plt.title('Audio Waveform')
    plt.xlabel('Time (samples)')
    plt.ylabel('Amplitude')
    plt.grid(True)
    plt.show()

In [9]:
def preprocess_audiobuffer(waveform):
    waveform = waveform / 32768
    waveform = tf.convert_to_tensor(waveform, dtype=tf.float32)
    spectrogram = get_spectrogram(waveform)
    spectrogram=tf.expand_dims(spectrogram, 0)
    return spectrogram

In [10]:
def get_spectrogram(waveform):
    input_len = 16000
    waveform = waveform[:input_len]
    zero_padding = tf.zeros(
        [input_len] - tf.shape(waveform),
        dtype=tf.float32)
    waveform = tf.cast(waveform, dtype=tf.float32)
    equal_length = tf.concat([waveform, zero_padding], 0)
    spectrogram = tf.signal.stft(
        equal_length, frame_length=255, frame_step=128)
    spectrogram = tf.abs(spectrogram)
    spectrogram = spectrogram[..., tf.newaxis]
    return spectrogram

In [11]:
def record_audio():
    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    rate=RATE,
                    input=True,
                    frames_per_buffer=FRAME_SIZE)
    
    print("Listening...")

    try:
        while True:
            data = stream.read(FRAME_SIZE)
            yield np.frombuffer(data, dtype=np.int16)
    except KeyboardInterrupt:
        pass
    
    print("Recording stopped.")

    stream.stop_stream()
    stream.close()
    p.terminate()

def vad_collector(sample_rate, frame_duration_ms, padding_duration_ms, vad, frames):
    num_padding_frames = int(padding_duration_ms / frame_duration_ms)
    ring_buffer = collections.deque(maxlen=num_padding_frames)
    triggered = False

    voiced_frames = []
    for frame in frames:
        is_speech = vad.is_speech(frame.tobytes(), sample_rate)
        if not triggered:
            ring_buffer.append((frame, is_speech))
            num_voiced = len([f for f, speech in ring_buffer if speech])
            if num_voiced > 0.9 * ring_buffer.maxlen:
                triggered = True
                for f, s in ring_buffer:
                    voiced_frames.append(f)
                ring_buffer.clear()
        else:
            voiced_frames.append(frame)
            ring_buffer.append((frame, is_speech))
            num_unvoiced = len([f for f, speech in ring_buffer if not speech])
            if num_unvoiced > 0.9 * ring_buffer.maxlen:
                triggered = False
                yield b''.join([f.tobytes() for f in voiced_frames])
                ring_buffer.clear()
                voiced_frames = []
    if triggered:
        yield b''.join([f.tobytes() for f in voiced_frames])

In [15]:
s = turtle.getscreen()

t = turtle.Turtle() # starts at right:

size = t.turtlesize()
increase = (2 * num for num in size)
t.turtlesize(*increase)

t.pensize(5)
t.shapesize()
t.pencolor("blue")

def go_right():
    # target = 0
    current = t.heading()
    if current == 0:
        pass
    elif current == 90:
        t.right(90)
    elif current == 180:
        t.right(180)
    elif current == 270:
        t.left(90)
    else:
        raise ValueError('not a right angle!')

def go_up():
    # target = 90
    current = t.heading()
    if current == 0:
        t.left(90)
    elif current == 90:
        pass
    elif current == 180:
        t.right(90)
    elif current == 270:
        t.left(180)
    else:
        raise ValueError('not a right angle!')
    
def go_left():
    # target = 180
    current = t.heading()
    if current == 0:
        t.left(180)
    elif current == 90:
        t.left(90)
    elif current == 180:
        pass
    elif current == 270:
        t.right(90)
    else:
        raise ValueError('not a right angle!')
    
def go_down():
    # target = 270
    current = t.heading()
    if current == 0:
        t.right(90)
    elif current == 90:
        t.right(180)
    elif current == 180:
        t.left(90)
    elif current == 270:
        pass
    else:
        raise ValueError('not a right angle!')


def move_turtle(command):
    if command == 'up':
        go_up()
    elif command == 'down':
        go_down()
    elif command == 'left':
        go_left()
    elif command == 'right':
        go_right()
    elif command == 'go':
        t.forward(100)
    elif command == 'stop':
        s.bye()
        print('Stopping the turtle')


# Создание экземпляра VAD
vad = webrtcvad.Vad(3)  # Уровень агрессивности VAD (1-3)

# Получение отрезков с командами
audio_generator = record_audio()
for audio_segment in vad_collector(RATE, FRAME_DURATION_MS, PADDING_DURATION_MS, vad, audio_generator):
    print("Received audio segment with command:", len(audio_segment))
    waveform = np.frombuffer(audio_segment, dtype=np.int16)
    plot_waveform(waveform)
    spec = preprocess_audiobuffer(waveform)
    prediction = model(spec)
    print(prediction)
    confidence = np.max(tf.nn.softmax(prediction))
    print('Confidence: ', confidence)
    # if confidence < 0.7:
    #     print("Недостаточно уверенное предсказание. Пропускаем.")
    #     label_pred = np.argmax(prediction, axis=1)
    #     print(label_pred)
    #     command = commands[label_pred[0]]
    #     print('Predicted label: ', command)
    # else:
    label_pred = np.argmax(prediction, axis=1)
    print(label_pred)
    command = commands[label_pred[0]]
    print('Predicted label: ', command)
    move_turtle(command)
    if command == "stop":
        # break
        pass



Terminator: 