# RUN this migration before executing the cells below

`alembic upgrade 76bf43d4437f`

In [1]:
from sqlalchemy import delete
import json
from tqdm import tqdm

In [2]:
from app.database import *

development




In [3]:
# Echo the engine URL as cell output: a sanity check of which database this
# notebook is connected to before any destructive cell is run.
# NOTE(review): `engine` is presumably exported by `app.database` — confirm.
engine.url

postgresql+asyncpg://AfroChat_Admin:***@34.165.255.238/AfroChatV2

In [4]:
from app.routers.api_v1.Auth.models import *
from app.routers.api_v1.Auth.models_v2 import *
from app.routers.api_v1.chat.models import *
from app.models import *

In [5]:
# Obtain one session by advancing the async iterator returned by get_db() once.
# NOTE(review): the iterator is never resumed or closed here, so any cleanup
# get_db() performs after yielding will not run — presumably acceptable for a
# one-off notebook; confirm.
db_session: AsyncSession = await get_db().__anext__()
# Second name for the same session object; the cells below pass `session`.
session = db_session

In [6]:
async def delete_mobile_users(session: AsyncSession):
    """Empty the user tables and commit.

    Issues an unfiltered DELETE against UserEmail, UserPhoneNumber,
    UserPassword, UserProfilePicture and finally User, in that order, then
    commits. If anything raises, the session is rolled back and the
    exception is re-raised.
    """
    try:
        # User is listed last: the other four models carry a user_id column.
        for model in (
            UserEmail,
            UserPhoneNumber,
            UserPassword,
            UserProfilePicture,
            User,
        ):
            await session.execute(delete(model))

        await session.commit()
    except Exception:
        await session.rollback()
        raise


async def delete_telegram_users(session: AsyncSession):
    """Delete every UserTelegram row and commit.

    If anything raises, the session is rolled back and the exception is
    re-raised.
    """
    try:
        wipe_telegram_links = delete(UserTelegram)
        await session.execute(wipe_telegram_links)
        await session.commit()
    except Exception:
        await session.rollback()
        raise


async def delete_all(session: AsyncSession):
    """Run both delete helpers on ``session``: UserTelegram first, then the user tables."""
    for wipe in (delete_telegram_users, delete_mobile_users):
        await wipe(session)

In [7]:
async def populate_mobile_users(session: AsyncSession):
    """Create User rows and their email/phone/password/picture rows for mobile users.

    Selects every User whose ``telegram_id`` is NULL. For each one whose
    username ``User.find_by_username`` does not find, adds a User row plus one
    row each in UserEmail, UserPhoneNumber, UserPassword and
    UserProfilePicture, copying the values from the selected record. All rows
    are committed together at the end.

    Raises:
        Exception: whatever the commit raises. The session is rolled back
            first so that it remains usable, as in the delete helpers.
        AssertionError: if the number of User rows after the commit differs
            from the number of selected users.
    """
    stmt = select(User).where(User.telegram_id.is_(None))
    result = await session.execute(stmt)
    prev_mobile_users: List[User] = result.scalars().fetchall()

    total_num = len(prev_mobile_users)

    for mobile_user in tqdm(prev_mobile_users):
        # Skip records whose username is already present.
        db_user = await User.find_by_username(
            db_session=session, username=mobile_user.username
        )
        if db_user:
            continue

        # create user
        # NOTE(review): updated_at is seeded from created_at on every row
        # below — presumably because the source has no separate value; confirm.
        new_user = User()
        new_user.id = mobile_user.id
        new_user.username = mobile_user.username
        new_user.is_activated = mobile_user.is_activated
        new_user.is_archived = mobile_user.is_archived
        new_user.role = mobile_user.role
        new_user.created_at = mobile_user.created_at
        new_user.updated_at = mobile_user.created_at

        session.add(new_user)

        # create user email
        new_user_email = UserEmail()
        new_user_email.user_id = new_user.id
        new_user_email.email = mobile_user.email
        new_user_email.is_activated = mobile_user.is_activated
        new_user_email.created_at = mobile_user.created_at
        new_user_email.updated_at = mobile_user.created_at

        session.add(new_user_email)

        # create user phone
        new_user_phone = UserPhoneNumber()
        new_user_phone.user_id = new_user.id
        new_user_phone.phone_number = mobile_user.phone_number
        new_user_phone.is_activated = mobile_user.is_activated
        new_user_phone.created_at = mobile_user.created_at
        new_user_phone.updated_at = mobile_user.created_at

        session.add(new_user_phone)

        # create user password
        new_user_password = UserPassword(
            user_id=new_user.id,
            hashed_password=mobile_user.hashed_password,
            created_at=mobile_user.created_at,
            updated_at=mobile_user.created_at,
        )

        session.add(new_user_password)

        # create profile picture for the user
        new_profile_picture = UserProfilePicture(
            user_id=new_user.id,
            profile_picture=mobile_user.profile_picture,
            created_at=mobile_user.created_at,
            updated_at=mobile_user.created_at,
        )

        session.add(new_profile_picture)

    # A failed commit leaves the session's transaction unusable until it is
    # rolled back; the session is shared by every later cell.
    try:
        await session.commit()
    except Exception:
        await session.rollback()
        raise

    assert total_num == len((await session.execute(select(User))).scalars().all())
    print("mobile users populated")

In [8]:
async def populate_telegram_mini_app_users(session: AsyncSession):
    """Create User and UserTelegram rows for users that have a telegram_id.

    Selects every User whose ``telegram_id`` is not NULL. For each one whose
    username ``User.find_by_username`` does not find, adds a User row and a
    UserTelegram row linking it to that telegram_id. All rows are committed
    together at the end.

    Raises:
        Exception: whatever the commit raises; the session is rolled back
            first.
        AssertionError: if the UserTelegram row count did not grow by exactly
            the number of rows added here.
    """
    stmt = select(User).where(User.telegram_id.is_not(None))
    result = await session.execute(stmt)
    prev_mini_users: List[User] = result.scalars().fetchall()
    prev_user_count = (
        await session.execute(select(func.count()).select_from(UserTelegram))
    ).scalar()

    added_count = 0
    for mini_user in tqdm(prev_mini_users):
        # Skip records whose username is already present.
        db_user = await User.find_by_username(
            db_session=session, username=mini_user.username
        )
        if db_user:
            continue

        # create user
        # NOTE(review): updated_at is seeded from created_at — confirm intended.
        new_user = User()
        new_user.id = mini_user.id
        new_user.username = mini_user.username
        new_user.is_activated = mini_user.is_activated
        new_user.is_archived = mini_user.is_archived
        new_user.role = mini_user.role
        new_user.created_at = mini_user.created_at
        new_user.updated_at = mini_user.created_at

        session.add(new_user)

        # create user telegram
        new_user_telegram = UserTelegram()
        new_user_telegram.user_id = new_user.id
        new_user_telegram.telegram_id = mini_user.telegram_id

        session.add(new_user_telegram)
        added_count += 1

    try:
        await session.commit()
    except Exception:
        await session.rollback()
        raise

    # Report skips explicitly: they are tolerated by the loop above, so the
    # check below must count only the rows that were actually added.
    skipped_count = len(prev_mini_users) - added_count
    if skipped_count:
        print(f"skipped {skipped_count} telegram mini app users (username already exists)")

    assert (
        prev_user_count + added_count
        == (
            await session.execute(select(func.count()).select_from(UserTelegram))
        ).scalar()
    )
    print("telegram mini app users have been added")

In [9]:
async def populate_telegram_users(session: AsyncSession):
    """Create User and UserTelegram rows for every TelegramUser.

    For each TelegramUser a username ``"{firstname}_{telegram_id}"`` is
    derived. If a User with that username, or one found by
    ``User.find_by_telegram_id``, already exists, that user is reused.
    Otherwise a new User (fresh UUID, activated, not archived, role
    ``Role.USER``) and a UserTelegram row are added. ``created_at`` and
    ``updated_at`` come from the user's earliest TelegramConversation; a user
    with no conversation inherits the date of the most recent earlier user
    that had one (initially the current UTC time).

    Returns:
        dict mapping ``TelegramUser.id`` to the string form of the User id.

    Raises:
        Exception: whatever the commit raises; the session is rolled back
            first.
        AssertionError: if not every TelegramUser ended up in the map.
    """
    telegram_id_UUID = {}
    stmt = select(TelegramUser)
    result = await session.execute(stmt)
    prev_telegram_users: List[TelegramUser] = result.scalars().fetchall()
    total_num = len(prev_telegram_users)
    prev_date = datetime.utcnow()
    for telegram_user in tqdm(prev_telegram_users):
        new_telegram_user_name = (
            f"{telegram_user.firstname}_{telegram_user.telegram_id}"
        )
        db_user = await User.find_by_username(
            db_session=session, username=new_telegram_user_name
        )
        if db_user:
            # The derived username embeds the telegram_id, so an existing
            # match is taken to be this same user. Record it: otherwise the
            # final assert fails and this user's conversations are skipped
            # by the later migration step.
            telegram_id_UUID[telegram_user.id] = str(db_user.id)
            continue

        # Use the session passed in, not the notebook-global `db_session`.
        existing_user = await User.find_by_telegram_id(
            session, telegram_user.telegram_id
        )
        if existing_user:
            telegram_id_UUID[telegram_user.id] = str(existing_user.id)
            continue

        new_user = User()
        new_user.id = uuid.uuid4()
        new_user.username = new_telegram_user_name
        new_user.is_activated = True
        new_user.is_archived = False
        new_user.role = Role.USER.value

        # fetch the user's first telegram conversation
        stmt = (
            select(TelegramConversation)
            .where(TelegramConversation.user_id == telegram_user.id)
            .order_by(TelegramConversation.created_date.asc())
            .limit(1)
        )
        result = await session.execute(stmt)
        conversation: TelegramConversation | None = result.scalars().first()

        # Carry the last known conversation date forward to users that have
        # no conversation of their own.
        if conversation is not None:
            prev_date = conversation.created_date

        new_user.created_at = prev_date
        new_user.updated_at = prev_date

        session.add(new_user)

        # create user telegram
        new_user_telegram = UserTelegram()
        new_user_telegram.user_id = new_user.id
        new_user_telegram.telegram_id = telegram_user.telegram_id
        session.add(new_user_telegram)

        telegram_id_UUID[telegram_user.id] = str(new_user.id)

    try:
        await session.commit()
    except Exception:
        await session.rollback()
        raise

    assert total_num == len(telegram_id_UUID)
    print("telegram users populated")
    return telegram_id_UUID

In [10]:
# Run the user migration in order: empty the target tables first, then
# repopulate mobile users, telegram mini app users and telegram users.
await delete_all(session)
await populate_mobile_users(session)
await populate_telegram_mini_app_users(session)
# Keep the returned map (TelegramUser.id -> user id as str); later cells
# alias it and dump it to JSON.
telegram_id_UUID = await populate_telegram_users(session)

100%|█████████████████████████████████████████████████████████████████████████████████████| 33/33 [00:05<00:00,  6.12it/s]


mobile users populated


100%|███████████████████████████████████████████████████████████████████████████████████| 101/101 [00:31<00:00,  3.20it/s]


telegram mini app users have been added


100%|███████████████████████████████████████████████████████████████████████████████████| 306/306 [02:42<00:00,  1.89it/s]


telegram users populated


In [11]:
# Name under which populate_telegram_conversations looks up the
# TelegramUser.id -> user id map; same dict object as telegram_id_UUID.
telegram_new_map_id = telegram_id_UUID

# RUN this migration before continuing with the cells below
`alembic upgrade heads`

In [12]:
# Save the TelegramUser.id -> user id map to disk.
# NOTE(review): JSON object keys are always strings, so a map reloaded from
# this file will not have the same key types if the ids are not strings —
# confirm before using the file in place of the in-memory dict.
with open("mapp.json", "w") as mapping_file:
    serialized_map = json.dumps(telegram_id_UUID)
    mapping_file.write(serialized_map)

In [13]:
# telegram_conversation.id --> chat_session.id
# telegram_conversation.id --> chat_session.id
# Filled by populate_telegram_conversations and read by
# populate_telegram_messages to attach each message to its new chat session.
mapping_dictionary_for_telegram_conversation = {}

# persona:X --> persona.id
# Keyed by the TelegramConversation.persona value; only the "id" entry is read
# by populate_telegram_conversations.
# NOTE(review): the ids presumably match rows that already exist in the target
# persona table — confirm.
mapping_dictionary_for_telegram_conversation_personas = {
    "persona:afro_chat": {
        "full_name": "AfroChat",
        "id": "48122ec5-5f65-4588-a012-0c7bfe15802f",
    },
    "persona:albert": {
        "full_name": "Albert Einstein",
        "id": "8d4d60ed-d4fa-4a88-a64c-9cabc9af7919",
    },
    "persona:kevin": {
        "full_name": "Kevin Hart",
        "id": "9921b1b8-7a2e-46be-9048-cfc0d91be315",
    },
    "persona:jordan": {
        "full_name": "Jordan Peterson",
        "id": "bb5135cd-2d9d-4e41-a15f-7cae0b6fa560",
    },
    "persona:mandela": {
        "full_name": "Nelson Mandela",
        "id": "0ccd4180-598d-4818-aa27-fb126e3c50fb",
    },
}

# tool:X --> tool.id
# Used for TelegramConversation.persona values that start with "tool:"; only
# the "sub_tool_id" entry is read by populate_telegram_conversations.
# NOTE(review): the ids presumably match existing sub-tool rows — confirm.
mapping_dictionary_for_telegram_conversation_tools = {
    "tool:brainstorm": {
        "sub_tool_name": "Brainstorming",
        "sub_tool_id": "14f448a5-5471-4f47-8f2a-f7eaf88769bb",
    },
    "tool:resume": {
        "sub_tool_name": "Targeted Resume",
        "sub_tool_id": "55e0bd3c-c25f-4357-b52c-0b6217884f9f",
    },
    "tool:paragrapher": {
        "sub_tool_name": "Paraphrasing Content",
        "sub_tool_id": "37fd6efc-6eb9-416b-8263-c55b12820832",
    },
    "tool:essay": {
        "sub_tool_name": "Expander and Summarize",
        "sub_tool_id": "630ca76a-9c8a-4808-a77f-ab38fe9a209f",
    },
}

In [14]:
async def populate_telegram_conversations(session: AsyncSession):
    """Create a ChatSession for every TelegramConversation whose user was migrated.

    Reads the module-level ``telegram_new_map_id`` (TelegramUser.id -> user
    id) and the persona/tool mapping dictionaries. Conversations whose
    ``user_id`` is not in ``telegram_new_map_id`` are skipped and reported.
    A ``persona`` value starting with ``"tool:"`` sets ``sub_tool_id``; any
    other value sets ``persona_id``. ``first_message`` and ``created_at`` come
    from the conversation's earliest TelegramMessage.

    After a successful commit, ``mapping_dictionary_for_telegram_conversation``
    is updated with TelegramConversation.id -> ChatSession.id for every
    session created. On failure it is left untouched and the session is
    rolled back.

    Raises:
        KeyError: if a persona value is missing from the mapping dictionaries.
        Exception: if a conversation has no messages, or whatever the commit
            raises.
        AssertionError: if the ChatSession row count did not grow by exactly
            the number of sessions added here.
    """
    stmt = select(TelegramConversation)
    result = await session.execute(stmt)
    prev_telegram_conversations: List[
        TelegramConversation
    ] = result.scalars().fetchall()
    new_chat_session_count = len(prev_telegram_conversations)
    prev_chat_session_count = (
        await session.execute(select(func.count()).select_from(ChatSession))
    ).scalar()
    print("prev cnt", prev_chat_session_count)
    print("new cnt", new_chat_session_count)

    # Collected locally and published only after the commit succeeds, so a
    # failed run does not leave ids of rolled-back sessions in the global map.
    new_session_ids = {}
    try:
        for telegram_conversation in tqdm(prev_telegram_conversations):
            if telegram_conversation.user_id not in telegram_new_map_id:
                continue

            # create new chat session
            new_conversation: ChatSession = ChatSession()
            new_conversation.id = uuid.uuid4()
            new_conversation.user_id = telegram_new_map_id[
                telegram_conversation.user_id
            ]

            if telegram_conversation.persona.startswith("tool:"):
                new_conversation.sub_tool_id = (
                    mapping_dictionary_for_telegram_conversation_tools[
                        telegram_conversation.persona
                    ]["sub_tool_id"]
                )
            else:
                new_conversation.persona_id = (
                    mapping_dictionary_for_telegram_conversation_personas[
                        telegram_conversation.persona
                    ]["id"]
                )

            # fetch the first message of the conversation by timestamp
            stmt = (
                select(TelegramMessage)
                .where(TelegramMessage.conversation_id == telegram_conversation.id)
                .order_by(TelegramMessage.timestamp.asc())
                .limit(1)
            )
            result = await session.execute(stmt)
            message: TelegramMessage | None = result.scalars().first()
            if not message:
                raise Exception("error found at", telegram_conversation)
            new_conversation.first_message = message.content
            new_conversation.created_at = message.timestamp
            session.add(new_conversation)

            new_session_ids[telegram_conversation.id] = new_conversation.id

        await session.commit()
    except Exception:
        # Also covers failures inside the loop, so half-built sessions are
        # not committed by a later cell using the same session.
        await session.rollback()
        raise

    mapping_dictionary_for_telegram_conversation.update(new_session_ids)

    added_count = len(new_session_ids)
    skipped_count = new_chat_session_count - added_count
    if skipped_count:
        print(f"skipped {skipped_count} conversations (user not in telegram_new_map_id)")

    # Compare against what was actually inserted; skipped conversations are
    # tolerated above and must not fail the check.
    assert (
        prev_chat_session_count + added_count
        == (
            await session.execute(select(func.count()).select_from(ChatSession))
        ).scalar()
    )
    print("chat sessions added succesfully")

In [15]:
# Create the chat sessions; this also fills
# mapping_dictionary_for_telegram_conversation, which the message step reads.
await populate_telegram_conversations(session)

prev cnt 860
new cnt 951


100%|███████████████████████████████████████████████████████████████████████████████████| 951/951 [02:32<00:00,  6.25it/s]


chat sessions added succesfully


In [16]:
async def populate_telegram_messages(session: AsyncSession):
    """Create a ChatMessage for every TelegramMessage of a migrated conversation.

    Reads the module-level ``mapping_dictionary_for_telegram_conversation``
    (TelegramConversation.id -> ChatSession.id). Messages whose
    ``conversation_id`` is not in that mapping are skipped and reported. The
    rest are copied (content, timestamp, token usage, role) into new
    ChatMessage rows and committed together.

    Raises:
        Exception: whatever the commit raises; the session is rolled back
            first.
        AssertionError: if the ChatMessage row count did not grow by exactly
            the number of messages added here.
    """
    stmt = select(TelegramMessage)
    result = await session.execute(stmt)
    prev_telegram_messages: List[TelegramMessage] = result.scalars().fetchall()
    new_chat_message_count = len(prev_telegram_messages)
    prev_chat_message_count = (
        await session.execute(select(func.count()).select_from(ChatMessage))
    ).scalar()
    print(new_chat_message_count)

    added_count = 0
    for telegram_message in tqdm(prev_telegram_messages):
        if (
            telegram_message.conversation_id
            not in mapping_dictionary_for_telegram_conversation
        ):
            continue

        # create new chat message
        new_message: ChatMessage = ChatMessage()
        new_message.id = uuid.uuid4()
        new_message.chat_session_id = mapping_dictionary_for_telegram_conversation[
            telegram_message.conversation_id
        ]
        new_message.message = telegram_message.content
        new_message.created_at = telegram_message.timestamp
        new_message.token_usage = telegram_message.token_usage
        new_message.role = telegram_message.role

        session.add(new_message)
        added_count += 1

    try:
        await session.commit()
    except Exception:
        await session.rollback()
        raise

    skipped_count = new_chat_message_count - added_count
    if skipped_count:
        print(f"skipped {skipped_count} messages (conversation was not migrated)")

    # Compare against what was actually inserted; skipped messages are
    # tolerated above and must not fail the check.
    assert (
        prev_chat_message_count + added_count
        == (
            await session.execute(select(func.count()).select_from(ChatMessage))
        ).scalar()
    )
    print("chat messages added succesfully")

In [17]:
# Copy the messages; relies on mapping_dictionary_for_telegram_conversation
# having been filled by populate_telegram_conversations.
await populate_telegram_messages(session)

8112


100%|██████████████████████████████████████████████████████████████████████████████| 8112/8112 [00:00<00:00, 37558.11it/s]


chat messages added succesfully
