In [1]:
import sys
# Put /app on the module search path before the project imports in the
# following cells run.
# NOTE(review): presumably /app is where `indexing`, `psql_helpers`,
# `psql_models` and `vexa` live — confirm for your environment.
sys.path.append('/app')

In [2]:
from indexing.meetings_monitor import MeetingsMonitor
import asyncio
import logging
from datetime import datetime, timezone,timedelta

from psql_helpers import get_session


In [3]:
# Monitor instance whose attributes and helpers (vexa_auth, redis,
# active_seconds, _parse_timestamp, _add_to_queue, ...) the cells below use.
monitor = MeetingsMonitor()

In [4]:
# Alias the monitor as `self` so the cells below can reach its methods and
# attributes through `self.` at notebook top level.
self = monitor

In [5]:
# Look-back window, in minutes, subtracted from the stored max timestamp
# before fetching. 60*40 = 2400 minutes, i.e. 40 hours.
overlap_minutes = 60*40

In [6]:
async with get_session() as session:
    cursor = await self._get_stored_max_timestamp(session)
    if cursor:
        cursor = cursor - timedelta(minutes=overlap_minutes)  # Small overlap to ensure no records are missed


In [7]:
meetings_data = await self.vexa_auth.get_speech_stats(
                    after_time=cursor,
                )

In [8]:
from uuid import UUID

In [9]:
# Narrow the fetched sessions down to the single user being investigated.
target_user = UUID('12262e9c-a9e8-49b3-b0ee-547240fac7a9')
meetings_data = [stats for stats in meetings_data if stats.user_id == target_user]



In [10]:
meetings_data

[SessionSpeakerStats(meeting_session_id=UUID('4efd2910-7bcf-4e2b-94ba-f8b8750ce1ad'), start_timestamp=datetime.datetime(2024, 12, 3, 11, 36, 51, tzinfo=TzInfo(UTC)), updated_timestamp=datetime.datetime(2024, 12, 3, 13, 16, 39, 605942, tzinfo=TzInfo(UTC)), user_id=UUID('12262e9c-a9e8-49b3-b0ee-547240fac7a9'), last_finish=datetime.datetime(2024, 12, 3, 12, 14, 18, tzinfo=TzInfo(UTC)), speakers=['Janise Tan', 'Max Weber', 'Monika Janota (xWF)', 'Sylvia Dieckmann', 'TBD'])]

In [11]:
from datetime import timedelta
from psql_models import User,Meeting
from sqlalchemy import select
from indexing.redis_keys import RedisKeys

In [12]:
meetings_data

[SessionSpeakerStats(meeting_session_id=UUID('4efd2910-7bcf-4e2b-94ba-f8b8750ce1ad'), start_timestamp=datetime.datetime(2024, 12, 3, 11, 36, 51, tzinfo=TzInfo(UTC)), updated_timestamp=datetime.datetime(2024, 12, 3, 13, 16, 39, 605942, tzinfo=TzInfo(UTC)), user_id=UUID('12262e9c-a9e8-49b3-b0ee-547240fac7a9'), last_finish=datetime.datetime(2024, 12, 3, 12, 14, 18, tzinfo=TzInfo(UTC)), speakers=['Janise Tan', 'Max Weber', 'Monika Janota (xWF)', 'Sylvia Dieckmann', 'TBD'])]

In [13]:
# String form of the user id under investigation; the upsert cell below uses
# it for the stored-token lookup and as the id of a newly created User.
user_id = '12262e9c-a9e8-49b3-b0ee-547240fac7a9'

In [14]:
from vexa import VexaAPI
from psql_models import UserToken

In [15]:
token_stmt = select(UserToken).where(UserToken.user_id == user_id)
token = await session.scalar(token_stmt) or await self.vexa_auth.get_user_token(user_id=user_id)

if token:
    vexa = VexaAPI(token=token)
    user_info = await vexa.get_user_info()
    # Check by email since it's our unique constraint
    user = await session.scalar(select(User).where(User.email == user_info.get('email')))
    
    if user:
        # Check if any data has changed
        data_changed = (
            user.username != user_info.get('username', user.username) or
            user.first_name != user_info.get('first_name', user.first_name) or
            user.last_name != user_info.get('last_name', user.last_name) or
            user.image != user_info.get('image', user.image)
        )
        
        if data_changed:
            # Update existing user with new data
            user.username = user_info.get('username', user.username)
            user.first_name = user_info.get('first_name', user.first_name)
            user.last_name = user_info.get('last_name', user.last_name)
            user.image = user_info.get('image', user.image)
            user.updated_timestamp = datetime.now(timezone.utc)
    else:
        # Create new user if not found
        user = User(
            id=user_id,
            email=user_info.get('email', ''),
            username=user_info.get('username', ''),
            first_name=user_info.get('first_name', ''),
            last_name=user_info.get('last_name', ''),
            image=user_info.get('image', ''),
            created_timestamp=datetime.now(timezone.utc),
            updated_timestamp=datetime.now(timezone.utc)
        )
        session.add(user)
        data_changed = True  # New user, so data is considered changed
    
    # Check if token already exists
    existing_token = await session.scalar(select(UserToken).where(UserToken.token == token))
    if not existing_token:
        session.add(UserToken(
            token=token,
            user_id=user_id,
            created_at=datetime.now(timezone.utc)
        ))
        data_changed = True  # Token added, so data is considered changed
    
    if data_changed:
        await session.commit()


Vexa token: 9ce3ca4b29e349beac6e9b2892fe4f2b
User information retrieved successfully.


In [18]:
user.id

UUID('0cb2fc8f-1684-4ab9-a35e-57a879b08ecd')

In [16]:
user.username

'Daniel'

In [18]:
user.id

UUID('0cb2fc8f-1684-4ab9-a35e-57a879b08ecd')

In [19]:
async with get_session() as session:
    cutoff = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(seconds=self.active_seconds)
    
    for meeting in meetings_data:
        meeting_id = meeting.meeting_session_id
        user_id = meeting.user_id
        
        # Ensure user exists and is committed before proceeding
        await self._ensure_user_exists(str(user_id), session)
        # Verify user was created
        user = await session.scalar(select(User).where(User.id == user_id))
        if not user:

            continue
        
        meeting_time = self._parse_timestamp(meeting.start_timestamp).replace(tzinfo=None)
        last_update = self._parse_timestamp(meeting.last_finish).replace(tzinfo=None)
        meeting_name = f"{meeting_time.strftime('%H:%M')}"
        cutoff = cutoff.replace(tzinfo=None)
        
        # Check if meeting exists
        stmt = select(Meeting).where(Meeting.meeting_id == meeting_id)
        existing_meeting = await session.scalar(stmt)
        
        if last_update > cutoff:
            # Active meeting - just upsert, no queuing
            if not existing_meeting:
                existing_meeting = Meeting(
                    meeting_id=meeting_id,
                    timestamp=meeting_time,
                    meeting_name=meeting_name,
                    last_update=last_update
                )
                session.add(existing_meeting)
            else:
                existing_meeting.timestamp = meeting_time
                existing_meeting.meeting_name = meeting_name
                existing_meeting.last_update = last_update
        else:
            # Inactive meeting - changed logic to check last_update
            if not existing_meeting:
                existing_meeting = Meeting(
                    meeting_id=meeting_id,
                    timestamp=meeting_time,
                    meeting_name=meeting_name,
                    last_update=last_update
                )
                session.add(existing_meeting)
                self._add_to_queue(str(meeting_id))
            else:
                # Check if it was previously active
                active_score = self.redis.zscore(RedisKeys.ACTIVE_MEETINGS, str(meeting_id))
                if active_score is not None:
                    existing_meeting.last_update = last_update
                    self._add_to_queue(str(meeting_id))  # Queue for indexing as it's now inactive
        

Vexa token: 9ce3ca4b29e349beac6e9b2892fe4f2b
User information retrieved successfully.


The garbage collector is trying to clean up non-checked-in connection <AdaptedConnection <asyncpg.connection.Connection object at 0x7f89ad5995d0>>, which will be terminated.  Please ensure that SQLAlchemy pooled connections are returned to the pool explicitly, either by calling ``close()`` or by using appropriate context managers to manage their lifecycle.
  return compile(source, filename, mode, flags,


IntegrityError: (raised as a result of Query-invoked autoflush; consider using a session.no_autoflush block if this flush is occurring prematurely)
(sqlalchemy.dialects.postgresql.asyncpg.IntegrityError) <class 'asyncpg.exceptions.ForeignKeyViolationError'>: insert or update on table "user_tokens" violates foreign key constraint "user_tokens_user_id_fkey"
DETAIL:  Key (user_id)=(12262e9c-a9e8-49b3-b0ee-547240fac7a9) is not present in table "users".
[SQL: UPDATE user_tokens SET user_id=$1::UUID, last_used_at=$2::TIMESTAMP WITH TIME ZONE WHERE user_tokens.token = $3::VARCHAR]
[parameters: ('12262e9c-a9e8-49b3-b0ee-547240fac7a9', datetime.datetime(2024, 12, 4, 11, 40, 9, 283637, tzinfo=datetime.timezone.utc), '9ce3ca4b29e349beac6e9b2892fe4f2b')]
(Background on this error at: https://sqlalche.me/e/20/gkpj)

In [43]:
user

In [42]:
existing_meeting

NameError: name 'existing_meeting' is not defined

In [33]:
from psql_models import engine


In [34]:
import pandas as pd

In [38]:
from psql_helpers import get_session

In [39]:
import asyncio
import pandas as pd
from sqlalchemy import text

async def get_meetings_df(email: str):
    """Return the `meetings` rows linked to the user with the given email.

    Joins meetings -> user_meetings -> users, filters on `users.email` via the
    bound `:email` parameter, and builds a pandas DataFrame from the result
    mappings.
    """
    bind_params = {"email": email}
    async with get_session() as db:
        outcome = await db.execute(
            text("""
            SELECT m.*
            FROM meetings m
            JOIN user_meetings um ON m.meeting_id = um.meeting_id
            JOIN users u ON um.user_id = u.id
            WHERE u.email = :email
        """),
            bind_params,
        )
        return pd.DataFrame(outcome.mappings().all())

# Called with a bare top-level `await`, as the other cells in this notebook do.
# The result is the DataFrame returned by get_meetings_df for this email.
meetings_df = await get_meetings_df('danicuki@gmail.com')

In [40]:
meetings_df

In [36]:
r = pd.read_sql_query(q, engine)

  r = pd.read_sql_query(q, engine)


AttributeError: 'AsyncEngine' object has no attribute 'cursor'

In [15]:
user

In [16]:
existing_meeting

NameError: name 'existing_meeting' is not defined