# Итоговое домашнее задание: Python для инженерии данных
## Володин Фёдор Алексеевич

**Этап 1: Генерация данных**

Подключаемся к PGSql и создаем Базы

In [1]:
import psycopg2

conn_params = {
    "dbname": "final",
    "user": "username",
    "password": "password",
    "host": "localhost",
    "port": 5432
}

# Запросы для создания таблиц
create_tables_queries = [
    """
    CREATE TABLE IF NOT EXISTS Users (
        user_id SERIAL PRIMARY KEY,
        first_name VARCHAR(50) NOT NULL,
        last_name VARCHAR(50) NOT NULL,
        email VARCHAR(100) UNIQUE NOT NULL,
        phone VARCHAR(15) UNIQUE,
        registration_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        loyalty_status VARCHAR(20) CHECK (loyalty_status IN ('Gold', 'Silver', 'Bronze')) DEFAULT 'Bronze'
    );
    """,
    """
    CREATE TABLE IF NOT EXISTS Products (
        product_id SERIAL PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        description TEXT,
        category_id INT NOT NULL,
        price NUMERIC(10, 2) NOT NULL,
        stock_quantity INT DEFAULT 0,
        creation_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    """,
    """
    CREATE TABLE IF NOT EXISTS ProductCategories (
        category_id SERIAL PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        parent_category_id INT REFERENCES ProductCategories(category_id)
    );
    """,
    """
    CREATE TABLE IF NOT EXISTS Orders (
        order_id SERIAL PRIMARY KEY,
        user_id INT REFERENCES Users(user_id),
        order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        total_amount NUMERIC(10, 2) NOT NULL,
        status VARCHAR(20) CHECK (status IN ('Pending', 'Completed', 'Cancelled')) DEFAULT 'Pending',
        delivery_date TIMESTAMP
    );
    """,
    """
    CREATE TABLE IF NOT EXISTS OrderDetails (
        order_detail_id SERIAL PRIMARY KEY,
        order_id INT REFERENCES Orders(order_id),
        product_id INT REFERENCES Products(product_id),
        quantity INT NOT NULL,
        price_per_unit NUMERIC(10, 2) NOT NULL,
        total_price NUMERIC(10, 2) GENERATED ALWAYS AS (quantity * price_per_unit) STORED
    );
    """
]

# Создание таблиц
try:
    conn = psycopg2.connect(**conn_params)
    cursor = conn.cursor()
    
    for query in create_tables_queries:
        cursor.execute(query)
        print("Таблица успешно создана!")

    conn.commit()
    print("Все таблицы успешно созданы!")

except Exception as e:
    print("Ошибка при создании таблиц:", e)

finally:
    if conn:
        cursor.close()
        conn.close()
        print("Соединение с базой данных закрыто.")

Таблица успешно создана!
Таблица успешно создана!
Таблица успешно создана!
Таблица успешно создана!
Таблица успешно создана!
Все таблицы успешно созданы!
Соединение с базой данных закрыто.


Генерация данных для Users

In [3]:
from faker import Faker
from random import choice

fake = Faker()

def generate_users_data(num_records):
    data = []
    for _ in range(num_records):
        first_name = fake.first_name()
        last_name = fake.last_name()
        email = fake.unique.email()
        phone = fake.msisdn()[:15]
        registration_date = fake.date_time_this_year()
        loyalty_status = choice(['Gold', 'Silver', 'Bronze'])
        data.append((first_name, last_name, email, phone, registration_date, loyalty_status))
    return data


def insert_users_data(data):
    try:
        conn = psycopg2.connect(**conn_params)
        cursor = conn.cursor()
        
        insert_query = """
        INSERT INTO Users (first_name, last_name, email, phone, registration_date, loyalty_status)
        VALUES (%s, %s, %s, %s, %s, %s);
        """
        cursor.executemany(insert_query, data)
        conn.commit()
        print(f"{len(data)} записей успешно добавлено в таблицу Users!")
    
    except Exception as e:
        print("Ошибка при вставке данных:", e)
    
    finally:
        if conn:
            cursor.close()
            conn.close()
            print("Соединение с базой данных закрыто.")

users_data = generate_users_data(100)  # Создаем 100 записей
insert_users_data(users_data)


100 записей успешно добавлено в таблицу Users!
Соединение с базой данных закрыто.


Генерация данных для ProductCategories

In [4]:
# Список осмысленных корневых категорий
root_category_names = [
    "Electronics", "Clothing", "Books", "Home Appliances", "Furniture",
    "Toys", "Sports Equipment", "Beauty & Personal Care", "Automotive", "Grocery"
]

# Список осмысленных дочерних категорий для каждой корневой
child_category_map = {
    "Electronics": ["Smartphones", "Laptops", "Cameras"],
    "Clothing": ["Men", "Women", "Kids"],
    "Books": ["Fiction", "Non-Fiction", "Comics"],
    "Home Appliances": ["Kitchen", "Laundry", "Air Conditioning"],
    "Furniture": ["Living Room", "Bedroom", "Office"],
    "Toys": ["Educational", "Outdoor", "Board Games"],
    "Sports Equipment": ["Fitness", "Outdoor Sports", "Team Sports"],
    "Beauty & Personal Care": ["Makeup", "Haircare", "Skincare"],
    "Automotive": ["Car Accessories", "Motorcycles", "Spare Parts"],
    "Grocery": ["Fruits", "Vegetables", "Beverages"]
}

def insert_root_categories_from_list(names):
    try:
        conn = psycopg2.connect(**conn_params)
        cursor = conn.cursor()
        
        root_ids = []
        for name in names:
            cursor.execute(
                "INSERT INTO ProductCategories (name, parent_category_id) VALUES (%s, NULL) RETURNING category_id;",
                (name,)
            )
            root_ids.append((name, cursor.fetchone()[0]))
        
        conn.commit()
        print(f"{len(root_ids)} корневых категорий успешно добавлено!")
        return root_ids
    
    except Exception as e:
        print("Ошибка при вставке корневых категорий:", e)
        return []
    
    finally:
        if conn:
            cursor.close()
            conn.close()


def insert_child_categories_from_map(root_ids, child_map):
    try:
        conn = psycopg2.connect(**conn_params)
        cursor = conn.cursor()
        
        for root_name, root_id in root_ids:
            if root_name in child_map:
                child_categories = child_map[root_name]
                for child_name in child_categories:
                    cursor.execute(
                        "INSERT INTO ProductCategories (name, parent_category_id) VALUES (%s, %s);",
                        (child_name, root_id)
                    )
        
        conn.commit()
        print("Дочерние категории успешно добавлены!")
    
    except Exception as e:
        print("Ошибка при вставке дочерних категорий:", e)
    
    finally:
        if conn:
            cursor.close()
            conn.close()


root_ids = insert_root_categories_from_list(root_category_names)
if root_ids:
    insert_child_categories_from_map(root_ids, child_category_map)

10 корневых категорий успешно добавлено!
Дочерние категории успешно добавлены!


Следующая таблица — Products

In [5]:
from random import randint, uniform, choice

def generate_products_data(category_ids):
    products = [
        {"category": "Electronics", "products": ["iPhone", "MacBook", "DSLR Camera"]},
        {"category": "Clothing", "products": ["T-Shirt", "Jeans", "Jacket"]},
        {"category": "Books", "products": ["The Great Gatsby", "1984", "Harry Potter"]},
        {"category": "Home Appliances", "products": ["Microwave", "Refrigerator", "Washing Machine"]},
        {"category": "Furniture", "products": ["Sofa", "Bed", "Office Chair"]},
    ]

    data = []
    for product_group in products:
        category_name = product_group["category"]
        category_id = category_ids.get(category_name)
        
        if category_id:
            for product_name in product_group["products"]:
                description = f"{product_name} from {category_name} category"
                price = round(uniform(10.0, 1000.0), 2)
                stock_quantity = randint(1, 100)
                creation_date = fake.date_this_year()
                data.append((product_name, description, category_id, price, stock_quantity, creation_date))
    
    return data

def insert_products_data(data):
    try:
        conn = psycopg2.connect(**conn_params)
        cursor = conn.cursor()
        
        insert_query = """
        INSERT INTO Products (name, description, category_id, price, stock_quantity, creation_date)
        VALUES (%s, %s, %s, %s, %s, %s);
        """
        cursor.executemany(insert_query, data)
        conn.commit()
        print(f"{len(data)} продуктов успешно добавлено в таблицу Products!")
    
    except Exception as e:
        print("Ошибка при вставке данных в таблицу Products:", e)
    
    finally:
        if conn:
            cursor.close()
            conn.close()


def fetch_category_ids_for_products():
    try:
        conn = psycopg2.connect(**conn_params)
        cursor = conn.cursor()
        
        cursor.execute("SELECT category_id, name FROM ProductCategories WHERE parent_category_id IS NULL;")
        categories = {row[1]: row[0] for row in cursor.fetchall()}
        return categories
    
    except Exception as e:
        print("Ошибка при получении категорий:", e)
        return {}
    
    finally:
        if conn:
            cursor.close()
            conn.close()


category_ids = fetch_category_ids_for_products()
if category_ids:
    products_data = generate_products_data(category_ids)
    insert_products_data(products_data)

15 продуктов успешно добавлено в таблицу Products!


Функции для Orders

In [6]:
from random import choice, randint
from datetime import datetime, timedelta

def generate_orders_data(user_ids, num_orders):
    data = []
    statuses = ['Pending', 'Completed', 'Cancelled']
    
    for _ in range(num_orders):
        user_id = choice(user_ids)
        order_date = fake.date_time_this_year()
        total_amount = round(uniform(50.0, 500.0), 2)  # Случайная сумма заказа
        status = choice(statuses)
        delivery_date = order_date + timedelta(days=randint(1, 10)) if status == 'Completed' else None
        data.append((user_id, order_date, total_amount, status, delivery_date))
    
    return data


def insert_orders_data_individually(data):
    try:
        conn = psycopg2.connect(**conn_params)
        cursor = conn.cursor()
        
        order_ids = []
        insert_query = """
        INSERT INTO Orders (user_id, order_date, total_amount, status, delivery_date)
        VALUES (%s, %s, %s, %s, %s)
        RETURNING order_id;
        """
        
        for order in data:
            cursor.execute(insert_query, order)
            order_id = cursor.fetchone()[0]  # Получаем сгенерированный ID заказа
            order_ids.append(order_id)
        
        conn.commit()
        print(f"{len(order_ids)} заказов успешно добавлено в таблицу Orders!")
        return order_ids
    
    except Exception as e:
        print("Ошибка при вставке данных в таблицу Orders:", e)
        return []
    
    finally:
        if conn:
            cursor.close()
            conn.close()

Функции для OrderDetails

In [7]:
from random import sample

def generate_order_details_data(order_ids, product_ids, max_items_per_order):
    data = []
    
    for order_id in order_ids:
        num_items = randint(1, max_items_per_order)  # Количество товаров в заказе
        selected_products = sample(product_ids, num_items)  # Выбор случайных продуктов без замены
        
        for product_id in selected_products:
            quantity = randint(1, 5)  # Случайное количество
            price_per_unit = round(uniform(10.0, 100.0), 2)  # Цена за единицу
            data.append((order_id, product_id, quantity, price_per_unit))
    
    return data

def insert_order_details_data(data):
    try:
        conn = psycopg2.connect(**conn_params)
        cursor = conn.cursor()
        
        insert_query = """
        INSERT INTO OrderDetails (order_id, product_id, quantity, price_per_unit)
        VALUES (%s, %s, %s, %s);
        """
        cursor.executemany(insert_query, data)
        conn.commit()
        print(f"{len(data)} записей успешно добавлено в таблицу OrderDetails!")
    
    except Exception as e:
        print("Ошибка при вставке данных в таблицу OrderDetails:", e)
    
    finally:
        if conn:
            cursor.close()
            conn.close()

Генерация данных для OrderDetails и Orders

In [8]:
def fetch_user_ids():
    try:
        conn = psycopg2.connect(**conn_params)
        cursor = conn.cursor()
        
        cursor.execute("SELECT user_id FROM Users;")
        user_ids = [row[0] for row in cursor.fetchall()]
        return user_ids
    
    except Exception as e:
        print("Ошибка при получении ID пользователей:", e)
        return []
    
    finally:
        if conn:
            cursor.close()
            conn.close()

def fetch_product_ids():
    try:
        conn = psycopg2.connect(**conn_params)
        cursor = conn.cursor()
        
        cursor.execute("SELECT product_id FROM Products;")
        product_ids = [row[0] for row in cursor.fetchall()]
        return product_ids
    
    except Exception as e:
        print("Ошибка при получении ID продуктов:", e)
        return []
    
    finally:
        if conn:
            cursor.close()
            conn.close()


user_ids = fetch_user_ids()
product_ids = fetch_product_ids()

if user_ids and product_ids:
    # Генерация заказов
    orders_data = generate_orders_data(user_ids, 50)  # 50 заказов
    order_ids = insert_orders_data_individually(orders_data)

    # Генерация деталей заказов
    if order_ids:
        order_details_data = generate_order_details_data(order_ids, product_ids, 5)  # До 5 товаров на заказ
        insert_order_details_data(order_details_data)



50 заказов успешно добавлено в таблицу Orders!
152 записей успешно добавлено в таблицу OrderDetails!


**Этап 2: Репликация данных**

DAG-и для выполнения репликации данных и генерации аналитических витрин предоставлены в данном ноутбуке в качестве примера. Однако их корректный запуск выполняется в Airflow, который развёрнут с использованием Docker Compose.

Подробная инструкция по развёртыванию Airflow в Docker Compose и настройке пайплайнов описана в пункте 4 и в файле `README.md`.

Для успешного выполнения данных этапов необходимо:
1. Убедиться, что Airflow настроен и запущен через Docker Compose.
2. Загрузить предоставленные DAG-и в папку `dags/` в Airflow.
3. Запустить пайплайны через веб-интерфейс Airflow.

Примерный порядок настройки и запуска DAG-ов описан в README.

In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime
import pandas as pd


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

with DAG(
        'replicate_postgres_to_mysql',
        default_args=default_args,
        description='Replicate data from PostgreSQL to MySQL',
        schedule_interval=None,
        start_date=datetime(2025, 1, 1),
        catchup=False,
) as dag:

    def create_mysql_table(mysql_conn_id, table_name):
        #Создание таблицы в MySQL, если она не существует
        mysql_hook = MySqlHook(mysql_conn_id)


        table_schemas = {
            'Users': """
                CREATE TABLE IF NOT EXISTS Users (
                    user_id INT PRIMARY KEY,
                    first_name VARCHAR(50) NOT NULL,
                    last_name VARCHAR(50) NOT NULL,
                    email VARCHAR(100) UNIQUE NOT NULL,
                    phone VARCHAR(15) UNIQUE,
                    registration_date TIMESTAMP,
                    loyalty_status VARCHAR(20)
                );
            """,
            'Products': """
                CREATE TABLE IF NOT EXISTS Products (
                    product_id INT PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    description TEXT,
                    category_id INT NOT NULL,
                    price DECIMAL(10, 2) NOT NULL,
                    stock_quantity INT,
                    creation_date TIMESTAMP
                );
            """,
            'ProductCategories': """
                CREATE TABLE IF NOT EXISTS ProductCategories (
                    category_id INT PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    parent_category_id INT
                );
            """,
            'Orders': """
                CREATE TABLE IF NOT EXISTS Orders (
                    order_id INT PRIMARY KEY,
                    user_id INT,
                    order_date TIMESTAMP,
                    total_amount DECIMAL(10, 2) NOT NULL,
                    status VARCHAR(20),
                    delivery_date TIMESTAMP
                );
            """,
            'OrderDetails': """
                CREATE TABLE IF NOT EXISTS OrderDetails (
                    order_detail_id INT PRIMARY KEY,
                    order_id INT,
                    product_id INT,
                    quantity INT NOT NULL,
                    price_per_unit DECIMAL(10, 2) NOT NULL,
                    total_price DECIMAL(10, 2)
                );
            """,
        }

        create_table_sql = table_schemas.get(table_name)
        if create_table_sql:
            mysql_hook.run(create_table_sql)
            print(f"Таблица {table_name} создана (если её не было).")


    def replicate_table(postgres_conn_id, mysql_conn_id, table_name):
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        from airflow.providers.mysql.hooks.mysql import MySqlHook
        import pandas as pd

        postgres_hook = PostgresHook(postgres_conn_id)
        mysql_hook = MySqlHook(mysql_conn_id)

        create_mysql_table(mysql_conn_id, table_name)

        sql = f"SELECT * FROM {table_name};"
        records = postgres_hook.get_pandas_df(sql)
        print(f"Извлеченные данные из таблицы {table_name}:")
        print(records.head())

        for col in records.columns:
            # Для строковых полей
            if records[col].dtype.kind in {'O', 'U', 'S'}:
                # Пусть пропуски станут пустыми строками
                records[col] = records[col].fillna('')

            # Для числовых (включая int, float, unsigned)
            elif records[col].dtype.kind in {'i', 'u', 'f'}:
                # Превращаем колонку в object, чтобы pandas мог хранить None
                records[col] = records[col].astype(object)
                # Заменяем NaN/NaT на None (это будет NULL в MySQL)
                records[col] = records[col].where(records[col].notnull(), None)

            # Для дат (dtype == 'datetime64[ns]')
            elif records[col].dtype.kind == 'M':
                # Переводим в object, чтобы пустые значения могли быть None
                records[col] = records[col].astype(object).where(records[col].notnull(), None)

        # Очищаем таблицу в MySQL перед вставкой
        mysql_hook.run(f"TRUNCATE TABLE {table_name};")

        # Пытаемся вставить данные
        try:
            mysql_hook.insert_rows(
                table=table_name,
                rows=records.values.tolist(),
                target_fields=records.columns.tolist()
            )
            print(f"Таблица {table_name} успешно реплицирована!")
        except Exception as e:
            print(f"Ошибка при репликации таблицы {table_name}: {e}")


    # Список таблиц для репликации
    tables = ['Users', 'Products', 'ProductCategories', 'Orders', 'OrderDetails']

    # Создаём задачи на репликацию для каждой таблицы
    for table in tables:
        replicate_task = PythonOperator(
            task_id=f'replicate_{table.lower()}',
            python_callable=replicate_table,
            op_kwargs={
                'postgres_conn_id': 'postgres_connection', 
                'mysql_conn_id': 'mysql_connection',
                'table_name': table,
            },
        )

**Этап 3: Построение аналитических витрин**

DAG-и для выполнения репликации данных и генерации аналитических витрин предоставлены в данном ноутбуке в качестве примера. Однако их корректный запуск выполняется в Airflow, который развёрнут с использованием Docker Compose.

Подробная инструкция по развёртыванию Airflow в Docker Compose и настройке пайплайнов описана в пункте 4 и в файле `README.md`.

Для успешного выполнения данных этапов необходимо:
1. Убедиться, что Airflow настроен и запущен через Docker Compose.
2. Загрузить предоставленные DAG-и в папку `dags/` в Airflow.
3. Запустить пайплайны через веб-интерфейс Airflow.

Примерный порядок настройки и запуска DAG-ов описан в README.

## Аналитические витрины

### Витрина активности пользователей

- **Описание**: Витрина содержит данные о поведении пользователей для анализа их активности.
- **Поля**:
  - `user_id` (int): Идентификатор пользователя.
  - `first_name` (text): Имя пользователя.
  - `last_name` (text): Фамилия пользователя.
  - `email` (text): Электронная почта.
  - `phone` (text): Номер телефона.
  - `registration_date` (timestamp): Дата регистрации.
  - `loyalty_status` (text): Статус лояльности (например, Gold, Silver).
  - `total_orders` (bigint): Общее количество заказов пользователя.
  - `total_amount` (decimal(20,2)): Общая сумма покупок пользователя.
- **Источник данных**: Таблицы `Users` и `Orders` из базы данных MySQL.
- **Применение**: Используется для анализа клиентской активности и сегментации пользователей.

### Витрина топ-продуктов

- **Описание**: Витрина содержит данные о продажах продуктов, включая популярность и выручку.
- **Поля**:
  - `product_id` (int): Идентификатор продукта.
  - `name` (text): Название продукта.
  - `description` (text): Описание продукта.
  - `category_id` (int): Идентификатор категории продукта.
  - `price` (decimal(10,2)): Цена продукта.
  - `stock_quantity` (int): Количество товара на складе.
  - `creation_date` (timestamp): Дата добавления продукта.
  - `total_quantity` (bigint): Общее количество проданных единиц продукта.
  - `total_revenue` (decimal(20,2)): Общая выручка от продаж продукта.
- **Источник данных**: Таблицы `Products` и `OrderDetails` из базы данных MySQL.
- **Применение**: Используется для анализа популярности товаров и планирования ассортимента.

In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from pyspark.sql import SparkSession
from datetime import datetime

def create_spark_session():
    return SparkSession.builder \
        .appName("MySQL_to_Vittrines") \
        .config("spark.jars", "/path/to/mysql-connector-java.jar") \
        .getOrCreate()

def generate_user_activity_vitrine(mysql_conn_id):
    """Генерация витрины активности пользователей"""
    mysql_hook = MySqlHook(mysql_conn_id)

    spark = create_spark_session()

    users_df = spark.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/final",
        driver="com.mysql.cj.jdbc.Driver",
        dbtable="Users",
        user="username",
        password="password"
    ).load()

    orders_df = spark.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/final",
        driver="com.mysql.cj.jdbc.Driver",
        dbtable="Orders",
        user="username",
        password="password"
    ).load()

    # Подсчет активности пользователей
    user_activity_df = orders_df.groupBy("user_id").agg(
        count("order_id").alias("total_orders"),
        sum("total_amount").alias("total_amount")
    )

    user_activity = users_df.join(user_activity_df, "user_id", "left")

    # Запись витрины в MySQL
    user_activity.write.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/final",
        driver="com.mysql.cj.jdbc.Driver",
        dbtable="UserActivityVitrine",
        user="username",
        password="password"
    ).mode("overwrite").save()

def generate_top_products_vitrine(mysql_conn_id):
    """Генерация витрины топ-продуктов"""
    mysql_hook = MySqlHook(mysql_conn_id)

    # Инициализация Spark
    spark = create_spark_session()

    # Загрузка данных из MySQL
    products_df = spark.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/final",
        driver="com.mysql.cj.jdbc.Driver",
        dbtable="Products",
        user="username",
        password="password"
    ).load()

    order_details_df = spark.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/final",
        driver="com.mysql.cj.jdbc.Driver",
        dbtable="OrderDetails",
        user="username",
        password="password"
    ).load()

    # Подсчет продаж по продуктам
    product_sales_df = order_details_df.groupBy("product_id").agg(
        sum("quantity").alias("total_quantity"),
        sum("total_price").alias("total_revenue")
    )

    top_products = products_df.join(product_sales_df, "product_id", "left")

    # Запись витрины в MySQL
    top_products.write.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/final",
        driver="com.mysql.cj.jdbc.Driver",
        dbtable="TopProductsVitrine",
        user="username",
        password="password"
    ).mode("overwrite").save()

def generate_vitrines(mysql_conn_id):
    generate_user_activity_vitrine(mysql_conn_id)
    generate_top_products_vitrine(mysql_conn_id)

# Инициализация DAG
with DAG(
        'generate_vitrines',
        default_args={
            'owner': 'airflow',
            'depends_on_past': False,
            'email_on_failure': False,
            'email_on_retry': False,
            'retries': 0,
        },
        description='Generate analytic vitrines from MySQL',
        schedule_interval=None,
        start_date=datetime(2025, 1, 1),
        catchup=False,
) as dag:

    generate_vitrines_task = PythonOperator(
        task_id='generate_vitrines',
        python_callable=generate_vitrines,
        op_kwargs={'mysql_conn_id': 'mysql_connection'}
    )

**Этап 4: Использование Airflow**
1. **Настройка окружения для работы с Airflow и базами данных:**
   - Установить Docker и Docker Compose.
   - Создать `Dockerfile` для настройки среды Airflow. (Файл в репозитории)
   - Создать файл `docker-compose.yml` для подъёма сервисов, включающих: (Файл в репозитории)
     - Airflow Scheduler.
     - Airflow Webserver.
     - PostgreSQL для источника данных.
     - MySQL для целевой базы данных.

2. **Поднять сервисы через Docker Compose:**
   - Выполняем команду:
     ```bash
     docker-compose up -d
     ```
     
3. **Настроить подключения в Airflow:**
   - Перейти в веб-интерфейс Airflow. (по умолчанию localhost:8080)
   - Создаем подключения для баз данных:
     - **PostgreSQL:**
       - `Conn Id`: `postgres_connection`.
       - `Conn Type`: `Postgres`.
       - Ввести данные подключения:
         - `Host`: адрес сервера PostgreSQL.
         - `Schema`: имя базы данных.
         - `Login`: имя пользователя.
         - `Password`: пароль пользователя.
         - `Port`: `5432`.
     - **MySQL:**
       - `Conn Id`: `mysql_connection`.
       - `Conn Type`: `MySQL`.
       - Ввести данные подключения:
         - `Host`: адрес сервера MySQL.
         - `Schema`: имя базы данных.
         - `Login`: имя пользователя.
         - `Password`: пароль пользователя.
         - `Port`: `3306`.

4. **Подготовить DAG для репликации данных:**
   - Создаем DAG для репликации данных из PostgreSQL в MySQL.
   - Основные задачи DAG:
     - Извлечение данных из PostgreSQL с использованием SQL-запросов.
     - Загрузка данных в MySQL.

5. **Настроить DAG для аналитических витрин:**
   - Создаем DAG для формирования аналитических витрин на основе данных из MySQL.
   - Основные задачи DAG:
     - Выполнение запросов для агрегации данных.
     - Сохранение результатов в новые таблицы витрин.

6. **Проверка работы DAG'ов:**
   - Запустить DAG'и в веб-интерфейсе Airflow.
   - Проверить логи выполнения задач на успешность.
   - Убедиться, что данные корректно реплицированы и аналитические витрины созданы.