In [1]:

%load_ext autoreload
%autoreload 2

In [2]:
import logging
from datetime import datetime, timezone

from app.database_redis.connection import get_redis_client
from app.services.apis.streamqueue_service.client import StreamQueueServiceAPI
from app.services.audio.redis import Connection, Diarizer, Meeting, Transcriber
from app.settings import settings

logger = logging.getLogger(__name__)

# Polling interval in seconds. The original line was a bare annotation
# (`name: 5.0`), which evaluates 5.0 as an annotation and binds no variable;
# an assignment is what makes the value usable from later cells.
# NOTE(review): presumably mirrors settings.check_and_process_connections_interval_sec - confirm.
check_and_process_connections_interval_sec = 5.0


In [3]:
settings.check_and_process_connections_interval_sec

5.0

In [4]:


class Processor:
    """Pulls audio chunks per connection and queues diarizer/transcriber work.

    For every connection reported by the stream queue service the processor
    appends the fetched chunks to ``/audio/<connection_id>.webm``, records the
    segment timestamps on the ``Connection`` and ``Meeting`` Redis objects and
    adds the meeting to the diarizer / transcriber todo lists once enough time
    has elapsed since their last update.
    """

    def __init__(self):
        # Not referenced by any method visible in this class.
        self.__running_tasks = set()
        self.stream_queue_service_api = StreamQueueServiceAPI()

    @staticmethod
    def _parse_timestamp(raw):
        """Parse an ISO-8601 timestamp string into an aware UTC ``datetime``.

        A trailing ``Z`` is stripped before parsing and the resulting naive
        value is tagged as UTC. Calling ``astimezone`` on that naive value
        instead would interpret it as *local* time and shift it on any host
        whose timezone is not UTC. Values without a ``Z`` keep the original
        conversion through ``astimezone``.
        """
        parsed = datetime.fromisoformat(raw.rstrip("Z"))
        if raw.endswith("Z") and parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    @staticmethod
    def _is_due(now, last_updated, step_sec):
        """Return True when more than ``step_sec`` seconds separate the two instants.

        Uses ``total_seconds()``: ``timedelta.seconds`` drops whole days and
        wraps to ~86399 for negative deltas, which both mis-trigger the check.
        """
        return (now - last_updated).total_seconds() > step_sec

    async def process_connections(self):
        """Process every connection listed by the stream queue service, in order."""
        logger.info("Process connections...")
        connections = await self.stream_queue_service_api.get_connections()
        # The first field of each entry is used as the connection id.
        connection_ids = [c[0] for c in connections]

        for connection_id in connection_ids:
            await self._process_connection_task(connection_id)

    async def _process_connection_task(self, connection_id, diarizer_step=60, transcriber_step=5):
        """Persist new chunks of one connection and schedule follow-up work.

        Args:
            connection_id: Identifier of the connection to process.
            diarizer_step: Seconds that must elapse since the meeting's last
                diarizer update before it is queued again.
            transcriber_step: Same threshold for the transcriber.

        Returns:
            None. Returns early, without touching Redis objects, when no
            chunks were available for the connection.
        """
        redis_client = await get_redis_client(settings.redis_host, settings.redis_port, settings.redis_password)
        segment = await self.writestream2file(connection_id)
        if segment is None:
            # Nothing was fetched; unpacking None would raise TypeError.
            logger.debug("No chunks for connection %s", connection_id)
            return
        meeting_id, segment_start_timestamp, segment_end_timestamp, user_id = segment

        current_time = datetime.now(timezone.utc)

        connection = Connection(redis_client, connection_id, user_id)
        await connection.update_timestamps(segment_start_timestamp, segment_end_timestamp)

        meeting = Meeting(redis_client, meeting_id)
        await meeting.load_from_redis()
        await meeting.add_connection(connection.id)
        # When no update has been recorded yet, measure from the segment start.
        meeting.diarizer_last_updated_timestamp = meeting.diarizer_last_updated_timestamp or segment_start_timestamp
        meeting.transcriber_last_updated_timestamp = meeting.transcriber_last_updated_timestamp or segment_start_timestamp

        if self._is_due(current_time, meeting.diarizer_last_updated_timestamp, diarizer_step):
            diarizer = Diarizer(redis_client)
            await diarizer.add_todo(meeting.meeting_id)
            await meeting.update_diarizer_timestamp(
                segment_start_timestamp, diarizer_last_updated_timestamp=current_time
            )

        if self._is_due(current_time, meeting.transcriber_last_updated_timestamp, transcriber_step):
            transcriber = Transcriber(redis_client)
            await transcriber.add_todo(meeting.meeting_id)
            await meeting.update_transcriber_timestamp(
                segment_start_timestamp, transcriber_last_updated_timestamp=current_time
            )

    async def writestream2file(self, connection_id):
        """Fetch up to 100 chunks and append them to the connection's audio file.

        Args:
            connection_id: Identifier of the connection whose chunks are fetched.

        Returns:
            ``(meeting_id, first_timestamp, last_timestamp, user_id)`` where the
            timestamps are aware UTC datetimes of the first and last chunk and
            ``meeting_id`` / ``user_id`` come from the last chunk, or ``None``
            when the service returned nothing or an empty chunk list.
        """
        path = f"/audio/{connection_id}.webm"
        items = await self.stream_queue_service_api.fetch_chunks(connection_id, num_chunks=100)

        if not items:
            return None

        chunks = items["chunks"]
        if not chunks:
            # Previously this fell through to an UnboundLocalError on return.
            return None

        # if there is no meeting_id in META-data
        meeting_id = connection_id
        first_timestamp = None
        last_timestamp = None
        user_id = None

        # Open once in append mode instead of reopening the file per chunk.
        with open(path, "ab") as file:
            for item in chunks:
                chunk = bytes.fromhex(item["chunk"])
                last_timestamp = self._parse_timestamp(item["timestamp"])
                if first_timestamp is None:
                    first_timestamp = last_timestamp

                file.write(chunk)

                # Keep the connection id fallback when the chunk carries no meeting_id.
                meeting_id = item.get("meeting_id", meeting_id)
                user_id = item["user_id"]

        return meeting_id, first_timestamp, last_timestamp, user_id


In [7]:
# Thresholds (in seconds) used by the manual scheduling checks in later cells.
diarizer_step, transcriber_step = 10, 5

In [8]:
self = Processor()

In [9]:
# Ask the stream queue service for its connections and keep the first field
# of each entry; later cells use these values as connection ids.
connections = await self.stream_queue_service_api.get_connections()
connection_ids = [c[0] for c in connections]

In [10]:
connection_id = connection_ids[0]

In [11]:
connection_id

'25b0a1b4-78e7-4bf3-9d1c-c64bf59b845d'

In [23]:
# Open a Redis client from the configured host/port/password.
redis_client = await get_redis_client(settings.redis_host, settings.redis_port, settings.redis_password)
# Fetch the next chunks for this connection, append them to its audio file and
# unpack (meeting_id, first chunk timestamp, last chunk timestamp, user_id).
# NOTE: writestream2file returns None when nothing is fetched, in which case
# this unpacking raises TypeError.
meeting_id, segment_start_timestamp, segment_end_timestamp, user_id = await self.writestream2file(connection_id)

In [36]:

# Reference instant (aware, UTC) for the elapsed-time checks in later cells.
current_time = datetime.now(timezone.utc)

# Record the fetched segment's start/end timestamps on the connection object.
connection = Connection(redis_client, connection_id, user_id)
await connection.update_timestamps(segment_start_timestamp, segment_end_timestamp)

# Build the meeting object and load its state from Redis.
meeting = Meeting(redis_client, meeting_id)
await meeting.load_from_redis()

In [37]:
await meeting.set_start_timestamp(segment_start_timestamp)

In [38]:

await meeting.add_connection(connection.id)

meeting.diarizer_last_updated_timestamp = meeting.diarizer_last_updated_timestamp or segment_start_timestamp
meeting.transcriber_last_updated_timestamp = meeting.transcriber_last_updated_timestamp or segment_start_timestamp

if (current_time - meeting.diarizer_last_updated_timestamp).seconds > diarizer_step:
    diarizer = Diarizer(redis_client)
    await diarizer.add_todo(meeting.meeting_id)
    await meeting.update_diarizer_timestamp(
        segment_start_timestamp, diarizer_last_updated_timestamp=current_time
    )
    print("diarizer added")

if (current_time - meeting.transcriber_last_updated_timestamp).seconds > transcriber_step:
    transcriber = Transcriber(redis_client)
    await transcriber.add_todo(meeting.meeting_id)
    await meeting.update_transcriber_timestamp(
        segment_start_timestamp, transcriber_last_updated_timestamp=current_time
    )
    print("transcriber added")

diarizer added
transcriber added


In [29]:
meeting.diarizer_last_updated_timestamp

datetime.datetime(2024, 5, 22, 15, 41, 22, 776157, tzinfo=tzutc())

In [30]:
current_time

datetime.datetime(2024, 5, 22, 15, 41, 22, 776157, tzinfo=datetime.timezone.utc)

In [21]:
connection_id = connection_ids[0]

In [35]:
connection_id

'87a3f4b1-5b50-45be-b2c7-dfd83ebfe022'

In [36]:
meeting_id, segment_start_timestamp, segment_end_timestamp, user_id = await self.writestream2file(connection_id)

In [37]:
segment_start_timestamp,segment_end_timestamp

(datetime.datetime(2024, 5, 22, 14, 27, 59, 846448, tzinfo=datetime.timezone.utc),
 datetime.datetime(2024, 5, 22, 14, 28, 9, 48882, tzinfo=datetime.timezone.utc))

In [38]:
meeting_id

'https://meet.google.com/jmt-yfgt-rzp'

In [39]:
redis_client = await get_redis_client(settings.redis_host, settings.redis_port, settings.redis_password)

In [40]:
#await redis_client.flushdb()

In [41]:
# Reference instant (aware, UTC) for the elapsed-time checks in later cells.
current_time = datetime.now(timezone.utc)

# Record the fetched segment's start/end timestamps on the connection object.
connection = Connection(redis_client, connection_id, user_id)
await connection.update_timestamps(segment_start_timestamp, segment_end_timestamp)

In [42]:
#transcriber_step

In [43]:
current_time

datetime.datetime(2024, 5, 22, 14, 28, 11, 963135, tzinfo=datetime.timezone.utc)

In [52]:


meeting = Meeting(redis_client, meeting_id)

# NOTE(review): update_redis() runs on a freshly constructed Meeting before
# load_from_redis(); presumably this writes the object's initial state and may
# overwrite stored values - confirm against the Meeting implementation.
await meeting.update_redis()
await meeting.load_from_redis()
await meeting.add_connection(connection.id)
# When no update has been recorded yet, fall back to the segment start.
meeting.diarizer_last_updated_timestamp = meeting.diarizer_last_updated_timestamp or segment_start_timestamp
meeting.transcriber_last_updated_timestamp = meeting.transcriber_last_updated_timestamp or segment_start_timestamp

In [53]:
meeting.diarizer_last_updated_timestamp

datetime.datetime(2024, 5, 22, 14, 27, 59, 846448, tzinfo=tzutc())

In [54]:
diarizer_step

60

In [55]:
meeting.diarizer_last_updated_timestamp

datetime.datetime(2024, 5, 22, 14, 27, 59, 846448, tzinfo=tzutc())

In [64]:

if (current_time - meeting.diarizer_last_updated_timestamp).seconds > diarizer_step:
    print("diarizer added")
    diarizer = Diarizer(redis_client)
    await diarizer.add_todo(meeting.meeting_id)
    await meeting.update_diarizer_timestamp(
        segment_start_timestamp, diarizer_last_updated_timestamp=current_time
    )

if (current_time - meeting.transcriber_last_updated_timestamp).seconds > transcriber_step:
    print("transcriber added")
    transcriber = Transcriber(redis_client)
    await transcriber.add_todo(meeting.meeting_id)
    await meeting.update_transcriber_timestamp(
        segment_start_timestamp, transcriber_last_updated_timestamp=current_time
            )

In [65]:
settings

Settings(service_version='0.0.1_example', service_name='Audio API', service_api_host='0.0.0.0', service_api_port=8009, service_token='service_token', check_and_process_connections_interval_sec=5.0, stream_queue_service_list_connections='http://host.docker.internal:8000/api/v1/connections/list', stream_queue_service_flush_cache='http://host.docker.internal:8000/api/v1/tools/flush-cache', stream_queue_service_get_next_chunks='http://host.docker.internal:8000/api/v1/tools/get-next-chunks', stream_queue_service_health='http://host.docker.internal:8000/api/v1/health', stream_queue_service_health_check='http://host.docker.internal:8000/api/v1/hc', stream_queue_service_request_timeout=5, stream_queue_service_auth_token='LKJBn98wefgh', redis_host='redis', redis_port=6379, redis_password='', volume_data_path='/home/dima/ssd/0', redis_image_port='6382')

In [66]:
(current_time - meeting.transcriber_last_updated_timestamp).seconds

0

In [67]:
meeting.transcriber_last_updated_timestamp

datetime.datetime(2024, 5, 22, 14, 28, 11, 963135, tzinfo=tzutc())

In [68]:
meeting.transcriber_last_updated_timestamp

datetime.datetime(2024, 5, 22, 14, 28, 11, 963135, tzinfo=tzutc())

In [69]:
segment_start_timestamp

datetime.datetime(2024, 5, 22, 14, 27, 59, 846448, tzinfo=datetime.timezone.utc)

In [70]:
connection.start_timestamp

datetime.datetime(2024, 5, 22, 14, 27, 59, 846448, tzinfo=datetime.timezone.utc)

In [71]:
meeting.transcriber_last_updated_timestamp

datetime.datetime(2024, 5, 22, 14, 28, 11, 963135, tzinfo=tzutc())