# 1. Создание контекстного менеджера с помощью @contextmanager

In [1]:
from contextlib import contextmanager
import sqlite3

@contextmanager
def file_open(filename, mode='r'):
    f = open(filename, mode)
    try:
        yield f
    finally:
        f.close()

with file_open('example.txt', 'w') as f:
    f.write("Hello, world!")

# Аналогично работает с файлами в других модулях:
with sqlite3.connect('database.db') as conn:
    cursor = conn.cursor()
    # Выполнение операций с базой данных


Этот пример демонстрирует, как создать контекстный менеджер для открытия и закрытия файлов или соединений с базами данных.

In [None]:
from contextlib import suppress

with suppress(FileNotFoundError, PermissionError):
    with open('nonexistent_file.txt', 'r') as file:
        content = file.read()
print(content)  # Игнорирует FileNotFoundError и PermissionError

# Аналогично можно игнорировать несколько типов исключений

In [None]:
from contextlib import nullcontext

def expensive_operation():
    print("Выполняется дорогостоящая операция")
    # Здесь могла бы быть сложная логика или ресурсоемкая операция
    print("Операция завершена")

# Функция для определения, когда пропускать выполнение
def should_skip_execution():
    return True  # В реальном сценарии это могло бы быть условие

# Использование nullcontext для пропуска выполнения
if should_skip_execution():
    with nullcontext() as _:
        expensive_operation()
else:
    print("Выполняется обычный код")

print("Программа продолжает работу")


# 2. Использование @closing для автоматического закрытия ресурсов

In [2]:
from contextlib import closing
import urllib

def read_url(url):
    with closing(urllib.request.urlopen(url)) as page:
        return page.read()

# Аналогично работает с другими объектами, поддерживающими контекстные менеджеры


Здесь @closing гарантирует закрытие объекта после выполнения блока кода.

# 3. Создание контекстного менеджера для многоканального ввода-вывода

In [None]:
import select
from contextlib import contextmanager

@contextmanager
def multiplex_io(inputs, outputs, timeout=1.0):
    readable, writable, errored = select.select(inputs, outputs, [], timeout)
    yield readable, writable, errored
    for fd in readable:
        fd.recv(1024)
    for fd in writable:
        fd.sendall(b"")
    for fd in errored:
        fd.close()

# Пример использования
inputs = [open('input.txt', 'rb'), socket.socket(socket.AF_INET, socket.SOCK_STREAM)]
outputs = [open('output.txt', 'wb'), socket.socket(socket.AF_INET, socket.SOCK_STREAM)]

with multiplex_io(inputs, outputs) as (readable, writable, errored):
    print("Waiting...")
    # Здесь можно обработать данные из файлов и сетевых сокетов


Этот пример демонстрирует использование контекстного менеджера для многоканального ввода-вывода.

# 4. Использование @redirect_stdout для изменения вывода

In [1]:
from contextlib import redirect_stdout

with open('output.txt', 'w') as f:
    with redirect_stdout(f):
        print("This will be written to output.txt")
print("This will be printed normally")

# Вывод в файл и обычный вывод могут быть объединены:
with open('combined_output.txt', 'w') as f:
    with redirect_stdout(f):
        print("First line")
    print("Second line")  # Это будет обычным выводом


This will be printed normally
Second line


Здесь @redirect_stdout позволяет изменить стандартный поток вывода на указанный файл или объект.

# 5. Создание контекстного менеджера с повторяющимся использованием

In [4]:
import contextlib

class ReusableContextManager:
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        return False  # Не игнорируем исключения

@contextlib.contextmanager
def repeat_context_manager():
    manager = ReusableContextManager()
    yield manager

with repeat_context_manager() as manager:
    print("Entering context")
    # Здесь можно использовать manager несколько раз
    print(manager)
    print(manager)


Entering context
<__main__.ReusableContextManager object at 0x000001C3BB79D460>
<__main__.ReusableContextManager object at 0x000001C3BB79D460>


Этот пример демонстрирует создание контекстного менеджера, который может быть повторно использован внутри блока with.

# 6. Использование @suppress для игнорирования определенных исключений

In [5]:
from contextlib import suppress

try:
    with open('nonexistent_file.txt', 'r') as f:
        content = f.read()
except FileNotFoundError:
    print("File not found")
else:
    print("File contents:", content)

# Аналогично работает с другими исключениями:
with suppress(FileNotFoundError, PermissionError):
    with open('protected_file.txt', 'r') as f:
        content = f.read()
    print(content)


File not found


Здесь @suppress позволяет игнорировать конкретные исключения при выполнении кода в блоке with.

# 7. ExitStack из модуля contextlib

## Базовый пример с несколькими файлами

In [7]:
from contextlib import ExitStack

def process_multiple_files(file_paths):
    with ExitStack() as stack:
        files = [stack.enter_context(open(path, 'r')) for path in file_paths]
        
        # Обработка файлов
        for i, file in enumerate(files):
            print(f"Processing file {i+1}:")
            content = file.read()
            print(f"First 20 characters of file {i+1}: {content[:20]}")
        
        # Все файлы будут автоматически закрыты при выходе из блока with

# Использование
file_paths = ['example.txt', 'output.txt', 'output3.txt']
process_multiple_files(file_paths)


Processing file 1:
First 20 characters of file 1: Hello, world!
Processing file 2:
First 20 characters of file 2: This will be written
Processing file 3:
First 20 characters of file 3: 
Python generation!


## Работа с несколькими базами данных 

In [None]:
import sqlite3
from contextlib import ExitStack

def process_multiple_databases(db_paths):
    with ExitStack() as stack:
        connections = [stack.enter_context(sqlite3.connect(path)) for path in db_paths]
        
        # Обработка баз данных
        for i, conn in enumerate(connections):
            print(f"\nProcessing database {i+1}: {conn.filename}")
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM some_table")
            row = cursor.fetchone()
            while row:
                print(row)
                row = cursor.fetchone()
        
        # Все соединения будут автоматически закрыты при выходе из блока with

# Использование
db_paths = ['database1.db', 'database2.db', 'database3.db']
process_multiple_databases(db_paths)

In [None]:
from contextlib import ExitStack
import sqlite3
import psycopg2
import mysql.connector

def connect_to_db(db_type, db_name):
    if db_type == "sqlite":
        conn = sqlite3.connect(db_name)
    elif db_type == "postgres":
        conn = psycopg2.connect(db_name) # расширится подключение
    elif db_type == "mysql":
        conn = mysql.connector.connect(db_name) # расширится подключение
    else:
        raise ValueError("Unsupported database type")
    
    return conn

def query_database(db_type, db_name, query):
    with connect_to_db(db_type, db_name) as conn:
        with ExitStack() as stack:
            cursor = conn.cursor()
            
            # Открываем соединения с другими базами данных
            other_conn1 = stack.enter_context(connect_to_db(db_type, f"other_{db_name}_1"))
            other_cursor1 = other_conn1.cursor()
            
            other_conn2 = stack.enter_context(connect_to_db(db_type, f"other_{db_name}_2"))
            other_cursor2 = other_conn2.cursor()
            
            # Выполняем запросы
            cursor.execute(query)
            other_cursor1.execute("SELECT * FROM other_table")
            other_cursor2.execute("SELECT * FROM another_table")
            
            # Получаем результаты
            result = cursor.fetchall()
            result1 = other_cursor1.fetchall()
            result2 = other_cursor2.fetchall()
        
        # Закрываем соединения
        conn.close()
        other_conn1.close()
        other_conn2.close()

# Использование
query_database("sqlite", "main.db", "SELECT * FROM main_table")


## Управление ресурсами с разными типами контекстных менеджеров

In [15]:
from contextlib import ExitStack
import urllib.request
import io

def process_resources():
    with ExitStack() as stack:
        file1 = stack.enter_context(open('file.txt', 'r'))
        file2 = stack.enter_context(open('example.txt', 'r'))
        url_content = stack.enter_context(urllib.request.urlopen('https://ru.wikipedia.org/wiki/Python'))
        buffer = io.StringIO()
        
        # Обработка ресурсов
        content1 = file1.read()
        content2 = file2.read()
        url_data = url_content.read()
        buffer.write("Buffered data")
        
        print(f"File1: {content1[:20]}")
        print(f"File2: {content2[:20]}")
        print(f"URL content: {url_data[:20]}")
        print(f"Buffer content: {buffer.getvalue()[:20]}")

# Использование
process_resources()


File1: Hello, world!
File2: Hello, world!
URL content: b'<!doctype html>\n<htm'
Buffer content: Buffered data


## Динамическое управление контекстными менеджерами

In [20]:
from contextlib import ExitStack

# Создаем таблицу some_table если она не существует
create_table_query = """
    CREATE TABLE IF NOT EXISTS some_table (
        id INTEGER PRIMARY KEY,
        column1 TEXT,
        column2 INTEGER,
        -- добавьте другие столбцы по необходимости
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
"""

def dynamic_resource_management(resource_types, resource_params):
    with ExitStack() as stack:
        resources = []
        for resource_type, params in resource_params.items():
            if resource_type == 'file':
                resources.append(stack.enter_context(open(params['path'], params.get('mode', 'r'))))
            elif resource_type == 'database':
                conn = sqlite3.connect(params['path'])
                cursor = conn.cursor()
                resources.append((conn, cursor))
            # Добавьте другие типы ресурсов по необходимости
        
        # Обработка ресурсов
        for i, resource in enumerate(resources):
            print(f"\nProcessing resource {i+1}")
            if isinstance(resource, tuple):  # База данных
                conn, cursor = resource
                cursor.execute(create_table_query)
                cursor.execute("SELECT * FROM some_table")
                row = cursor.fetchone()
                while row:
                    print(row)
                    row = cursor.fetchone()
            else:  # Файл
                content = resource.read()
                print(content[:50])
        
        # Автоматическое закрытие всех ресурсов

# Использование
resource_types = ['file', 'database']
resource_params = {
    'file': {'path': 'example.txt', 'mode': 'r'},
    'database': {'path': 'example.db'}
}
dynamic_resource_management(resource_types, resource_params)



Processing resource 1
Hello, world!

Processing resource 2
