# Работа с базой данных

- Сессии и фабрики сессий: как управлять сессиями для взаимодействия с базой данных и как применять их через декораторы.

- Добавление данных в таблицы: разберём безопасные методы добавления записей с использованием ORM. Также обсудим метод flush и разницу между ним и commit.

- Извлечение данных из таблиц: большой блок, в котором научимся извлекать данные через select, используя фильтры (например, where, filter, filter_by). Также обсудим работу с «грязными» данными и преобразование объектов SQLAlchemy в удобные словари Python с помощью Pydantic. В этом блоке разберём и методы SQLAlchemy, такие как scalar, scalars, scalar_one_or_none, all и другие.

# Что такое сессия?
Сессия в SQLAlchemy — это основной инструмент для взаимодействия с базой данных. Представьте её как рабочую область, где происходят все операции: добавление, удаление, извлечение, обновление данных. Все запросы к базе данных выполняются через сессию, без неё никакие операции невозможны.

Сессия управляет транзакциями и следит за состоянием объектов, с которыми вы работаете. Она не устанавливает прямого соединения с базой, а абстрагирует этот процесс. Все изменения отправляются в базу данных через метод commit(). В случае ошибки их можно отменить с помощью rollback().

# Фабрика сессий
Фабрика сессий — это специальная функция для создания новых сессий по мере необходимости. В SQLAlchemy это реализуется с помощью sessionmaker(). Этот объект создаёт сессии, которые можно использовать для работы с базой данных.

async

In [None]:
from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase
from settings.database import DATABASE_URL



DATABASE_URL = settings.get_async_db_url() # тут выбрать свой способ связки с БД

engine = create_async_engine(url = DATABASE_URL)
session_maker = async_sessionmaker(engine, expire_on_commit=False)

class Base(AsyncAttrs, DeclarativeBase):
    __abstract__ = True # Чтобы не создавалась отдельная таблица для этого класса

sync

In [2]:
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker
from settings.database import DATABASE_URL


engine = create_engine(url = DATABASE_URL)
session_maker = sessionmaker(engine, expire_on_commit=False)

class Base(DeclarativeBase):
    __abstract__ = True # Чтобы не создавалась отдельная таблица для этого класса

ModuleNotFoundError: No module named 'settings'

# Создание декоратора
для работы с сессией по мере необходимости

То есть, для того чтоб начать взаимодействовать с базой данных (например для получения или добавления туда информации) нам всегда необходимо быть в рамках сессии. Вопрос только в том какое количество операций вы будете выполнять в рамках одной такой сессии, до ее закрытия.

Основные подходы к управлению сессиями:
Открытие сессии на каждое действие: для каждого действия с базой данных создаётся новая сессия. Этот подход эффективен для небольших проектов, но на крупных проектах он может привести к дополнительным накладным расходам.

Открытие сессии на весь блок операций: сессия создаётся один раз перед серией операций и закрывается по завершению всех действий. Это позволяет объединить несколько запросов в одну сессию, что экономит ресурсы и повышает производительность.

In [None]:
# sync
session_maker = sessionmaker(bind=engine)

def connection(method):
    def wrapper(*args, **kwargs):
        with session_maker() as session:
            try:
                return method(*args, session = session, **kwargs)
            except Exception as e:
                session.rollback()
                raise e
            finally:
                session.close()
    return wrapper
    


In [None]:
# async
session_maker = async_sessionmaker(engine)

# ...

def connection(method):
    async def wrapper(*args, **kwargs):
        async with session_maker() as session:
            try:
                return await method(*args, session = session, **kwargs)
            except Exception as e:
                await session.rollback()
                raise e
            finally:
                await session.close()
    return wrapper


Как работает этот декоратор:

connection принимает исходную функцию для обёртки.

wrapper — это функция-обёртка, которая принимает все аргументы исходной функции.

async with async_session_maker() автоматически создаёт и закрывает сессию в асинхронном режиме, освобождая вас от необходимости управлять сессией вручную.

Сессия передаётся в исходную функцию через аргумент session.

В случае ошибки выполняется откат транзакции через rollback(), а затем сессия закрывается.

Исравим функцию добавления пользователя в базу, так чтобы при его создании у нас создавался сразу профиль этого пользователя.

In [None]:
class User(Base):
    
    ...
    
    @classmethod
    @connection
    def create_user(cls, 
                    username: str, 
                    email: str, 
                    password: str,
                    gender: GenderEnum,
                    name: str = None,
                    surname: str = None,
                    age: int = None,
                    profession: ProfessionEnum = ProfessionEnum.UNEMPLOYED,
                    interests: list[str] = [],
                    contacts: dict = {},
                    session: Session = None) -> dict[str, int]:
        """Этот метод создает нового пользователя в базе данных.

        Args:
            username (str): имя пользователя
            email (str): электронная почта пользователя
            password (str): пароль пользователя
            session (Session, optional): db session. Defaults to None.

        Returns:
            User: новый созданный пользователь

        """
        
        hash_pw = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
        new_user = User(username=username, email=email, password=str(hash_pw)[2:-1])
        session.add(new_user)
        session.commit()
        # Продолжится работа но со смежными таблицами Profile
        
        profile = Profile(
            user_id = new_user.id,
            name=name if name else username,
            surname=surname,
            age=age,
            gender=gender,
            profession=profession,
            interests=interests,
            contacts=contacts,
        )
        session.add(profile)
        session.commit()
        print(f'Создан пользователь с ID {new_user.id}и ему присвоен профиль с ID {profile.id}')

        return {'user_id': new_user.id, 'profile_id': profile.id}


        

Чуть оптимизируем его

In [None]:

    @classmethod
    @connection
    def create_user(cls, 
                    username: str, 
                    email: str, 
                    password: str,
                    gender: GenderEnum,
                    name: str = None,
                    surname: str = None,
                    age: int = None,
                    profession: ProfessionEnum = ProfessionEnum.UNEMPLOYED,
                    interests: list[str] = [],
                    contacts: dict = {},
                    session: Session = None) -> dict[str, int]:
        """Этот метод создает нового пользователя в базе данных.

        Args:
            username (str): имя пользователя
            email (str): электронная почта пользователя
            password (str): пароль пользователя
            session (Session, optional): db session. Defaults to None.

        Returns:
            User: новый созданный пользователь

        """
        
        hash_pw = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
        new_user = User(username=username, email=email, password=str(hash_pw)[2:-1])
    
        session.add(new_user)
        session.flush() # Промежуточный шаг для получения user.id без коммита
    
        profile = Profile(
            user_id = new_user.id,
            name=name if name else username,
            surname=surname,
            age=age,
            gender=gender,
            profession=profession,
            interests=interests,
            contacts=contacts,
        )
    
        session.add(profile)
        session.commit()
        print(f'Создан пользователь с ID {new_user.id}и ему присвоен профиль с ID {profile.id}')

        return {'user_id': new_user.id, 'profile_id': profile.id}


        

- flush в SQLAlchemy отправляет изменения в базу данных без их окончательной фиксации, то есть без выполнения коммита. Это полезно, когда нужно сгенерировать данные, такие как идентификаторы (например, user.id), чтобы использовать их до фактического сохранения данных в базе. При этом сама транзакция остаётся открытой, и окончательное сохранение происходит позже, при вызове commit.

Почему это более оптимально:
- Работа с промежуточными данными: flush позволяет работать с данными, которые ещё не записаны в базу окончательно, но уже доступны для использования. Например, после создания пользователя вы можете получить его user.id и использовать его для добавления профиля, не выполняя коммит между этими операциями.

- Сокращение количества транзакций: Используя flush, мы избегаем нескольких коммитов. Это снижает нагрузку на базу данных, так как все изменения будут зафиксированы одним коммитом в конце транзакции. Таким образом, база данных фиксирует изменения только один раз, что ускоряет выполнение операций.

Кодом ниже можно сделать довавление сразу нескольких новых пользователей

In [None]:
users_list = [
        User(
            username=user_data['username'],
            email=user_data['email'],
            password=user_data['password']
        )
    for user_data in users_data
]
session.add_all(users_list)
session.commit()

Расширим основной класс Base и пропишем туда методы add и add_many как шаблонных запрос для любой модели

In [None]:
from typing import Any, Dict, List, Self
from sqlalchemy.orm import Session


class Base(DeclarativeBase):
        
    ... #(прошлый код)
    
    @classmethod
    @connection
    def add(cls,
            session: Session = None,
            **values) -> Self:
        new_instance = cls(**values)
        session.add(new_instance)
        try:
            session.commit()
        except Exception as e:
            session.rollback()
            raise e
        return new_instance

    
    @classmethod
    @connection
    def add_many(cls,
                 instances: List[Dict[str, Any]], 
                 session: Session = None) -> List[Self]:
        new_instances = [cls(**values) for values in instances]
        session.add_all(new_instances)
        try:
            session.commit()
        except Exception as e:
            session.rollback()
            raise e
        return new_instances      

Основные моменты:



1. Метод add: Этот метод позволяет добавить одну запись (например, одного пользователя) в базу данных.

   - Он принимает сессию базы данных и значения для полей записи в виде именованных аргументов (**values).

   - Создаётся новый экземпляр модели с переданными данными, затем он добавляется в сессию.

   - После этого вызывается commit, чтобы зафиксировать изменения в базе данных.

   - В случае ошибки, происходит откат (rollback), и ошибка выбрасывается.

2. Метод add_many: Этот метод используется для добавления сразу нескольких записей в базу данных за один раз.

    - Он принимает список словарей, где каждый словарь содержит данные для одной записи.

    - Из этих словарей создаются экземпляры модели и добавляются в сессию с помощью add_all.

    - После добавления всех экземпляров вызывается коммит для сохранения изменений.

    - Если возникает ошибка, как и в первом методе, вызывается откат транзакции и ошибка поднимается дальше.