In [1]:
import datetime
import numpy as np
import socket
import pickle
import threading
import time
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
import GPUtil
from faster_whisper import WhisperModel
import queue
# from datetime import datetime



class Transcriber:
    """Streaming speech-to-text TCP server built on faster-whisper.

    Accepts a single TCP client, receives pickled audio packets, buffers them
    per user and per stream ("is_mic" / "no_mic"), cuts out phrases that end
    in silence, transcribes each phrase on the GPU and sends the resulting
    text back to the client as a pickled dict.
    """

    def __init__(self):
        # Per-user audio buffers and detection state, keyed by the client-supplied user_id.
        self.data = {}
        # Audio sample rate in Hz; used to convert seconds into sample counts.
        self.sample_rate = 16000

        # Guards self.data against concurrent access from worker threads.
        self.lock = threading.Lock()

        # Serializes GPU work: only one transcription runs at a time.
        self.transcryber_lock = threading.Lock()

        self.model_size = "large-v3"
        self.model = WhisperModel(self.model_size, device="cuda", compute_type="float16", device_index=7)

        self.conn = None
        self.addr = None
        self.stop_event = None
        # TODO: use a separate lock per user

    def send(self, data=None):
        """Pickle `data` and send it to the connected client.

        Does nothing (besides logging) once the stop event is set. A socket
        error marks the server as stopped instead of propagating into the
        calling worker thread.
        """
        if self.stop_event.is_set():
            print("cоnnection broken")
            return
        payload = pickle.dumps(data)
        try:
            self.conn.sendall(payload)
        except OSError as e:
            print("Error in sending messages:", e)
            self.stop_event.set()

    def transcribe_and_send(self, user_id, data, is_mic):
        """Transcribe one phrase of audio and send the text to the client.

        Args:
            user_id: id of the user the audio belongs to (echoed back to the client).
            data: float32 audio samples of one phrase, as cut out by detect_phrase().
            is_mic: which stream the audio came from (echoed back to the client).
        """
        # Keep the GPU lock while iterating the segments (faster-whisper documents
        # them as a lazy generator, so decoding happens during iteration) and while
        # sending, which also keeps writes to the shared socket serialized.
        with self.transcryber_lock:
            segments, info = self.model.transcribe(audio=data, beam_size=5, language="ru", vad_filter=True)
            # Drop segments containing known junk phrases (presumably Whisper
            # hallucinations such as subtitle credits) — TODO confirm the list.
            text = "".join(
                segment.text
                for segment in segments
                if "DimaTorzok" not in segment.text and "Продолжение следует" not in segment.text
            )

            message = {
                "is_mic": is_mic,
                "user_id": user_id,
                "data": text,
            }
            print(message)

            self.send(data=message)

    def calculate_chunck_loudnes(self, data):
        """Return the RMS (root-mean-square) level of an audio chunk."""
        return np.sqrt(np.mean(np.square(data)))

    def is_science(self, data, user_id, is_mic):
        """Return True if `data` is quieter than the user's adaptive silence
        threshold for the given stream (a "silence" check)."""
        threshold_key = 'is_mic_loudnes' if is_mic else 'no_mic_loudnes'
        return self.calculate_chunck_loudnes(data) < self.data[user_id][threshold_key]

    def detect_phrase(self, user_id, pointer, is_mic):
        """Cut a finished phrase out of one stream and transcribe it in the background.

        A phrase is finished when every chunk in the last `length_of_scilense`
        samples before `pointer` is silent, and the audio since the previous cut
        contains at least one non-silent chunk. Expected to be called with
        self.lock held (as detect_updates() does).

        Args:
            user_id: key into self.data.
            pointer: number of samples to consider in the stream.
            is_mic: selects the "is_mic" or "no_mic" stream and its silence threshold.
        """
        stream = "is_mic" if is_mic else "no_mic"
        user = self.data[user_id]
        audio = user[stream]
        start_pointer = user[stream + '_pointer']

        min_length_of_sound = int(1.0 * self.sample_rate)
        length_of_scilense = int(0.6 * self.sample_rate)
        chunk = int(0.1 * self.sample_rate)
        window = min_length_of_sound + length_of_scilense

        if pointer <= window:
            return  # not enough audio buffered yet
        if pointer - start_pointer < window:
            return  # too little new audio since the previous cut

        silence_start = pointer - length_of_scilense
        # The phrase must end in silence: every trailing chunk must be quiet.
        # Use this stream's own threshold (previously the mic threshold was used
        # for both streams).
        for i in range(silence_start, pointer, chunk):
            if not self.is_science(audio[i:i + chunk], user_id, is_mic):
                return

        # Skip pure silence: something before the trailing window must be loud.
        if all(self.is_science(audio[i:i + chunk], user_id, is_mic)
               for i in range(start_pointer, silence_start, chunk)):
            return

        phrase = np.copy(audio[start_pointer:pointer])
        threading.Thread(target=self.transcribe_and_send, args=(user_id, phrase, is_mic)).start()
        # Keep half of the trailing silence as lead-in for the next phrase.
        user[stream + '_pointer'] = int(pointer - length_of_scilense / 2)

    def detect_updates(self, user_id):
        """Run phrase detection on both streams of `user_id` if new audio arrived.

        Both streams are examined up to the same position: the length of the
        shorter of the two buffers.
        """
        with self.lock:
            user = self.data.get(user_id)
            if user is None:
                return
            pointer = min(len(user['is_mic']), len(user['no_mic']))
            if user['both_pointer'] == pointer:
                return  # nothing new since the last check
            user['both_pointer'] = pointer
            self.detect_phrase(user_id, pointer, True)
            self.detect_phrase(user_id, pointer, False)

    def update(self, user_id, data, is_mic):
        """Append a chunk of audio to one of a user's streams and schedule detection.

        When the chunk is quieter than the stream's current silence threshold,
        the threshold is smoothed towards twice the chunk's RMS level.

        Args:
            user_id: key into self.data; a new entry is created on first sight.
            data: float32 samples, as produced by _process_input_data().
            is_mic: True for the "is_mic" stream, False for "no_mic".
        """
        stream = 'is_mic' if is_mic else 'no_mic'
        threshold_key = stream + '_loudnes'
        k = 0.9  # smoothing factor of the adaptive silence threshold

        with self.lock:
            user = self.data.get(user_id)
            if user is None:
                user = self.data[user_id] = {
                    # float32 matches the samples produced by _process_input_data.
                    'is_mic': np.array([], dtype=np.float32),
                    'no_mic': np.array([], dtype=np.float32),
                    'timestamp': datetime.datetime.now(),
                    'is_mic_pointer': 0,  # start of not-yet-transcribed audio
                    'no_mic_pointer': 0,
                    'is_mic_loudnes': 0.3,  # initial RMS silence thresholds
                    'no_mic_loudnes': 0.3,
                    'both_pointer': 0,  # length examined by the last detect_updates()
                }

            user[stream] = np.append(user[stream], data)

            if len(data) > 0:  # RMS of an empty chunk is NaN (and warns)
                loudness = self.calculate_chunck_loudnes(data)
                if loudness < user[threshold_key]:
                    user[threshold_key] = user[threshold_key] * k + loudness * (1 - k) * 2

        # detect_updates() takes self.lock itself, so run it on its own thread.
        threading.Thread(target=self.detect_updates, args=(user_id,)).start()

    def _process_input_data(self, input_data):
        """Decode one received packet and feed its audio into update().

        The packet is expected to be a pickled dict with "user_id", "is_mic" and
        "data" (raw int16 PCM bytes in native byte order). Samples are scaled to
        float32 in [-1, 1). Malformed packets are logged and dropped.

        SECURITY NOTE(review): pickle.loads on bytes from the network allows
        arbitrary code execution by the peer, and server() binds 0.0.0.0 by
        default. Only use this with trusted clients, or switch to a safe format.
        """
        try:
            packet = pickle.loads(input_data)

            user_id = packet["user_id"]
            is_mic = packet["is_mic"]
            raw = packet["data"]

            print(is_mic, len(raw))

            # int16 full scale is 2**15.
            samples = np.frombuffer(raw, dtype='int16').astype(np.float32) / (2 ** (2 * 8 - 1))

            print("data unpucked")
        except Exception as e:
            print("error to unpack", e)
            return
        self.update(user_id, samples, is_mic)

    def receiver(self):
        """Read packets from the client until the server is stopped or the client disconnects.

        Framing heuristic (unchanged protocol): bytes are read in 1024-byte
        pieces and a packet ends at the first short read.
        NOTE(review): this is fragile. TCP does not preserve message boundaries,
        so a packet whose size is a multiple of 1024, or packets arriving
        back-to-back, will be mis-framed. A length prefix would be robust.
        Each packet is decoded on its own worker thread. The stop event is
        always set on exit.
        """
        try:
            while not self.stop_event.is_set():
                packet = bytearray()
                peer_closed = False
                while True:
                    piece = self.conn.recv(1024)
                    if not piece:
                        # b"" means the client closed the connection. Without this
                        # check, the loop would spin forever.
                        peer_closed = True
                        break
                    packet.extend(piece)
                    if len(piece) < 1024:
                        break  # short read: treat as end of packet

                if packet:
                    threading.Thread(target=self._process_input_data, args=(packet,)).start()

                if peer_closed:
                    print("Client disconnected")
                    break

        except OSError as e:
            print("Error in receiving messages:", e)
        finally:
            self.stop_event.set()

    def server(self, host='0.0.0.0', port=6005):
        """Accept one client on (host, port) and serve it until the receiver stops.

        Blocks the calling thread; start_server() runs it in the background.
        Only one connection is served; call again to accept a new client.
        """
        if self.stop_event is not None and not self.stop_event.is_set():
            print("Server is already running")
            return
        self.stop_event = threading.Event()

        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                # Allow re-binding the port right after a previous run (TIME_WAIT).
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                s.bind((host, port))
                s.listen()
                self.conn, self.addr = s.accept()
                with self.conn:
                    print(f"Connected by {self.addr}")
                    # Returns once the client disconnects or the server is stopped.
                    self.receiver()
        finally:
            # Mark the server as stopped even if bind/accept failed. Otherwise a
            # later server() call would be rejected as "already running".
            self.stop_event.set()

    def start_server(self):
        """Run server() on a background thread."""
        threading.Thread(target=self.server).start()

    def stop_server(self):
        """Ask the receiver loop to stop.

        NOTE(review): this only sets the event. A thread blocked in accept() or
        recv() keeps waiting until the next connection or packet arrives.
        """
        # The attribute may be missing if __init__ failed (e.g. model load error).
        stop_event = getattr(self, "stop_event", None)
        if stop_event is not None:
            stop_event.set()

    def __del__(self):
        self.stop_server()

  from .autonotebook import tqdm as notebook_tqdm
2024-01-26 01:39:54.794283: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Build the server object; this loads the large-v3 Whisper model onto GPU 7.
transcriber = Transcriber()

In [3]:
# Start listening (0.0.0.0:6005 by default) on a background thread.
transcriber.start_server()

Connected by ('172.17.0.1', 58566)
True 2732
data unpucked
False 2732
data unpucked
True 2732
data unpucked
True 4096
data unpucked
False 4096
data unpucked
True 2732
data unpucked
True 2732
data unpucked
False 2732
data unpucked
True 4096
data unpucked
False 4096
data unpucked
True 2732
data unpucked
False 2732
data unpucked
True 2732
data unpucked
False 2732
data unpucked
True 4096
data unpucked
False 4096
data unpucked
True 2732
data unpucked
False 2732
data unpucked
True 4096
data unpucked
False 4096
data unpucked
True 2732
data unpucked
True 2732
data unpucked
False 2732
data unpucked
True 4096
data unpucked
False 4096
data unpucked
TrueFalse 2732
data unpucked
 2732
data unpucked
True 2732
data unpucked
False 2732
data unpucked
True 4096
data unpucked
False 4096
data unpucked
True 2732
data unpucked
False 2732
data unpucked
True 2732
data unpucked
False 2732
data unpucked
True 2732
data unpucked
False 2732
data unpucked
True 4096
data unpucked
False 4096
data unpucked
True 2732
d

In [6]:
# Updated server code with improved exception handling and socket cleanup
# (standalone socket test, separate from Transcriber).

def send_messages(conn, stop_event):
    """Send up to 1000 numbered text messages over `conn`, about one every 10 ms.

    Stops early once `stop_event` is set. On exit (normal or after a socket
    error) it sets `stop_event` so the receiving thread stops too.
    """
    try:
        for index in range(1000):
            if stop_event.is_set():
                break
            payload = f"Server Message {index}".encode()
            conn.sendall(payload)
            time.sleep(0.01)  # pace: roughly one message per 10 ms
    except OSError as err:
        print("Error in sending messages:", err)
    finally:
        stop_event.set()

def receive_messages(conn, stop_event):
    """Print text received from the client until it disconnects or `stop_event` is set.

    Bytes are decoded incrementally as UTF-8, so a multi-byte character split
    across two recv() calls is printed intact instead of raising
    UnicodeDecodeError. Invalid bytes are replaced rather than fatal.
    Always sets `stop_event` on exit so the sender thread stops as well.
    """
    import codecs  # local import keeps this notebook cell self-contained

    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    try:
        while not stop_event.is_set():
            data = conn.recv(1024)
            if not data:
                break  # client closed the connection
            text = decoder.decode(data)
            if text:  # empty when only a partial character has arrived so far
                print("Received from client:", text)
    except OSError as e:
        print("Error in receiving messages:", e)
    finally:
        stop_event.set()

def main(host='0.0.0.0', port=6005):
    """Run the standalone test server: accept one client, then send and
    receive messages on two threads until both finish.

    Args:
        host: interface to bind (default: all interfaces).
        port: TCP port. Transcriber.server() also defaults to 6005, so running
            both in one kernel fails with "Address already in use" unless a
            different port is passed here.

    NOTE(review): after the sender finishes, the receiver stays blocked in
    recv() until the client sends something or disconnects.
    """
    stop_event = threading.Event()

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # Allow re-binding right after a previous run left the port in TIME_WAIT.
        # This does not help when another socket is still listening on the port.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((host, port))
        s.listen()
        print("Server is listening...")
        conn, addr = s.accept()
        with conn:
            print(f"Connected by {addr}")

            sender_thread = threading.Thread(target=send_messages, args=(conn, stop_event))
            receiver_thread = threading.Thread(target=receive_messages, args=(conn, stop_event))

            sender_thread.start()
            receiver_thread.start()

            sender_thread.join()
            receiver_thread.join()

            print("Server has finished communication.")


# NOTE: blocks until a client connects. Port 6005 is most likely already held by
# transcriber.start_server() in this kernel (see the Errno 98 below); pass a
# different port, e.g. main(port=...), to run both.
main()


OSError: [Errno 98] Address already in use

In [None]:
# Buffered mic-stream duration in seconds (16000 presumably = Transcriber.sample_rate).
# NOTE(review): `data` is not defined by any visible cell — presumably one user's
# buffer dict from transcriber.data; this raises NameError on a fresh kernel. TODO confirm.
len(data['is_mic'])/16000

In [None]:
# Buffered no_mic-stream duration in seconds (16000 presumably = Transcriber.sample_rate).
# NOTE(review): relies on an undefined `data` (hidden kernel state) — TODO confirm its source.
len(data['no_mic'])/16000

In [None]:
# Ratio of buffered mic to no_mic samples; presumably ~1.0 when both streams keep pace.
# NOTE(review): relies on an undefined `data` (hidden kernel state); ZeroDivisionError if no_mic is empty.
len(data['is_mic'])/len(data['no_mic'])

In [None]:
# !pip3 install tqdm==4.40.0
# !pip3 install --upgrade ipywidgets
# !pip install -U huggingface_hub

In [None]:
# Standalone Whisper model for the ad-hoc experiments below.
# NOTE(review): Transcriber() already loads the same large-v3 model on GPU 7,
# so this is a second copy on that GPU — consider `model = transcriber.model`.
from faster_whisper import WhisperModel

model_size = "large-v3"
model = WhisperModel(
    model_size,
    device="cuda",           # run on GPU ...
    device_index=7,          # ... number 7
    compute_type="float16",  # FP16 inference
)

In [None]:
%%time
segments, info = model.transcribe(audio=data['is_mic'], beam_size=5, language="ru", vad_filter = False)
text = ""
for segment in segments:
    text+=(segment.text)
print(text)

In [None]:
# Re-display the transcript produced by the timed transcription cell.
print(text)

In [None]:
"ss" in "fssc"