In [3]:
# Enable IPython's autoreload extension; mode 2 re-imports all modules before
# each cell execution, so edits to project code are picked up without a kernel restart.
%load_ext autoreload
%autoreload 2

Test notebook that selects data, validates it, and displays the results.

In [None]:
import argparse
import asyncio
import sys
import random
import datetime
from decimal import Decimal
from typing import List, Optional
from collections import defaultdict

from tabulate import tabulate

from loguru import logger

from src.services.db.database import DataBase
from src.services.db.entities.user import *
from server.src.services.db.schemas.schemas import *

from sqlalchemy import (
    cast,
    and_,
    select,
    func,
)

from sqlalchemy.orm import (
    selectinload,
    joinedload,
)

In [2]:
data_base = DataBase()
await data_base.init_alchemy_engine()

async with data_base.async_session() as session:
    query = (
        select (
            Purchase,
        )
        .options(
            selectinload(Purchase.delivery_group).options(
                joinedload(DeliveryGroup.target_user)
            ),
            selectinload(Purchase.buyer),
            selectinload(Purchase.seller),
            selectinload(Purchase.items).options(
                selectinload(PurchaseItem.product_type).options(
                    selectinload(ProductType.seller).options(
                        joinedload(Seller.user)
                    ),
                    selectinload(ProductType.author),
                )
            )
        )
        .filter(and_(
                Purchase.payment_method == PaymentMethodEnum.card,
                Purchase.buyer_id == '6bc9fde4-25ea-47f6-a860-4d2a9ceaac90',
            ))
        )
        
    
    result = await session.execute(query)
    row_items = result.scalars().all()
    items_dto = [PurchaseDTO.model_validate(row, from_attributes=True) for row in row_items]
    
    logger.info(f"Founded {len(items_dto)} results")

    headers = ["ID", "Date", "Payment", "Buyer", "Items Count", "Product Types", "Total"]
    table_data = []
    
    for purchase in items_dto:
        total = sum(item.unit_cost * item.quantity for item in purchase.items)

        type_counts = {}
        for item in purchase.items:
            type_name = item.product_type.name
            if type_name not in type_counts:
                type_counts[type_name] = 0
            type_counts[type_name] += item.quantity
        
        types_list = [f"{name}×{qty}" for name, qty in type_counts.items()]
        types_str = ", ".join(types_list)
        
        buyer_name = " ".join(filter(None, [
            purchase.buyer.first_name,
            purchase.buyer.middle_name,
            purchase.buyer.last_name
        ]))
        
        table_data.append([
            str(purchase.id)[:8],
            purchase.timestamp.strftime("%Y-%m-%d %H:%M"),
            purchase.payment_method.value,
            buyer_name,
            type_counts[type_name],
            types_str,
            f"₽{total:.2f}"
        ])
    
    table = tabulate(table_data, headers=headers, tablefmt="grid")
    logger.info(f"\n{table}")

[32m2025-08-01 20:14:24.536[0m | [1mINFO    [0m | [36msrc.services.db.database[0m:[36minit_alchemy_engine[0m:[36m32[0m - [1mStarting service..[0m
[32m2025-08-01 20:14:24.586[0m | [1mINFO    [0m | [36msrc.services.db.database[0m:[36minit_alchemy_engine[0m:[36m51[0m - [1m[32mConnection with data base has been established![0m
[32m2025-08-01 20:14:24.675[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m35[0m - [1mFounded 14 results[0m
[32m2025-08-01 20:14:24.680[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m70[0m - [1m
+----------+------------------+-----------+--------------------------------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [52]:
async def _get_random_string(length=10) -> str:
    return ''.join(random.choice('abcdefghijklmnopqrstuvwxyz') for i in range(length))


async with data_base.async_session() as session:
    try:
        result = await session.execute(
            select(
                ProductType
            )
        )
        product_types = result.scalars().all()

        for type in product_types:
            type.name = await _get_random_string(random.randint(15, 128))

        await session.commit()

        logger.info(f"Updated {len(product_types)} product types with random names")
         
    except Exception as ex:
        await session.rollback()
        logger.error(f"Error seeding data: {ex}")
        import traceback
        logger.debug(traceback.format_exc())

[32m2025-08-01 02:10:48.111[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m19[0m - [1mUpdated 989 product types with random names[0m
