In [18]:
import csv
data_ = []
data_0 = []
with open('RBK/dags/test_file.csv', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile)
    next(reader)
    for row in reader:
        data_.append(tuple(row))
with open('RBK/dags/data.csv', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile)
    next(reader)
    for row in reader:
        data_0.append(tuple(row))
data_

[('Ларек', '0'),
 ('Минимаркет', '50'),
 ('Магазин у дома', '250'),
 ('Супермаркет', '1000'),
 ('Гипермаркет', '3000')]

In [20]:
import pandas as pd
import sqlite3
import logging
from datetime import datetime, timedelta
from typing import Dict, Any
import os

# Настройка логирования
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class AirflowTaskSimulator:
    """Класс для имитации выполнения задач Airflow"""
    
    def __init__(self, db_path: str = "test_database.db"):
        self.db_path = db_path
        self.connection = None
    
    def connect_to_db(self):
        """Подключение к базе данных"""
        try:
            self.connection = sqlite3.connect(self.db_path)
            logger.info("Успешное подключение к базе данных")
            return True
        except Exception as e:
            logger.error(f"Ошибка подключения к БД: {e}")
            return False
    
    def close_connection(self):
        """Закрытие соединения с БД"""
        if self.connection:
            self.connection.close()
            logger.info("Соединение с БД закрыто")

# Задача 1: Развертывание тестовой базы данных
def task_setup_database(**context):
    """Создание и настройка тестовой базы данных"""
    logger.info("=== TASK 1: Развертывание тестовой базы данных ===")
    
    simulator = AirflowTaskSimulator()
    if not simulator.connect_to_db():
        raise Exception("Не удалось подключиться к базе данных")
    
    # Создаем таблицы
    cursor = simulator.connection.cursor()
    
    # Таблица для персональных данных
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS employees (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            first_name TEXT NOT NULL,
            last_name TEXT NOT NULL,
            middle_name TEXT,
            birth_date DATE,
            degree_name TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    
    # Таблица для магазинов
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS stores (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            store_name TEXT NOT NULL,
            min_square REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    
    with open('RBK/dags/test_file.csv', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile)
        next(reader)
        for row in reader:
            data_.append(tuple(row))
    cursor.executemany('''
        INSERT OR IGNORE INTO employees (first_name, last_name, middle_name, birth_date, degree_name)
        VALUES (?, ?, ?, ?, ?)
    ''', data_)
    
    # Создаем образцы данных для stores (имитируем ваш data файл)
    with open('RBK/dags/data.csv', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile)
        next(reader)
        for row in reader:
            data_0.append(tuple(row))

    cursor.executemany('''
        INSERT OR IGNORE INTO stores (store_name, min_square)
        VALUES (?, ?)
    ''', data_0)
    
    simulator.connection.commit()
    simulator.close_connection()
    
    logger.info("База данных успешно развернута")
    return "database_setup_complete"

# Задача 2: Создание дополнительных данных
def task_create_additional_data(**context):
    """Создание CSV файла с дополнительными данными"""
    logger.info("=== TASK 2: Создание дополнительных данных ===")
    
    # Создаем дополнительные данные для сотрудников
    additional_employees = pd.DataFrame([
        {'first_name': 'Анна', 'last_name': 'Морозова', 'middle_name': 'Викторовна', 'birth_date': '1991-07-12', 'degree_name': 'Магистр'},
        {'first_name': 'Сергей', 'last_name': 'Белов', 'middle_name': 'Андреевич', 'birth_date': '1989-11-30', 'degree_name': 'Бакалавр'},
        {'first_name': 'Татьяна', 'last_name': 'Орлова', 'middle_name': 'Дмитриевна', 'birth_date': '1986-04-25', 'degree_name': 'Доктор наук'},
        {'first_name': 'Максим', 'last_name': 'Зайцев', 'middle_name': 'Сергеевич', 'birth_date': '1993-01-08', 'degree_name': 'Специалист'},
        {'first_name': 'Ольга', 'last_name': 'Федорова', 'middle_name': 'Павловна', 'birth_date': '1984-10-14', 'degree_name': 'Магистр'},
        {'first_name': 'Николай', 'last_name': 'Кузнецов', 'middle_name': 'Владимирович', 'birth_date': '1990-06-20', 'degree_name': 'Кандидат наук'},
        {'first_name': 'Екатерина', 'last_name': 'Новикова', 'middle_name': 'Игоревна', 'birth_date': '1992-12-03', 'degree_name': 'Бакалавр'},
        {'first_name': 'Андрей', 'last_name': 'Лебедев', 'middle_name': 'Александрович', 'birth_date': '1988-08-17', 'degree_name': 'Магистр'},
        {'first_name': 'Валентина', 'last_name': 'Григорьева', 'middle_name': 'Николаевна', 'birth_date': '1985-02-28', 'degree_name': 'Специалист'},
        {'first_name': 'Роман', 'last_name': 'Соколов', 'middle_name': 'Евгеньевич', 'birth_date': '1991-09-11', 'degree_name': 'Магистр'},
        {'first_name': 'Юлия', 'last_name': 'Васильева', 'middle_name': 'Сергеевна', 'birth_date': '1987-05-06', 'degree_name': 'Кандидат наук'},
        {'first_name': 'Игорь', 'last_name': 'Макаров', 'middle_name': 'Дмитриевич', 'birth_date': '1989-03-22', 'degree_name': 'Бакалавр'}
    ])
    
    # Сохраняем в CSV
    additional_employees.to_csv('additional_employees.csv', index=False, encoding='utf-8')
    
    logger.info(f"Создан файл additional_employees.csv с {len(additional_employees)} записями")
    return "additional_data_created"

# Задача 3: Обновление данных из CSV
def task_update_data_from_csv(**context):
    """Загрузка дополнительных данных в базу"""
    logger.info("=== TASK 3: Обновление данных из CSV ===")
    
    # Читаем CSV файл
    df = pd.read_csv('additional_employees.csv')
    
    simulator = AirflowTaskSimulator()
    if not simulator.connect_to_db():
        raise Exception("Не удалось подключиться к базе данных")
    
    # Загружаем данные в базу
    df.to_sql('employees', simulator.connection, if_exists='append', index=False)
    
    simulator.close_connection()
    
    logger.info(f"Загружено {len(df)} новых записей в таблицу employees")
    return "data_updated"

# Задача 4: Создание SQL функции и витрины данных
def task_create_data_mart(**context):
    """Создание витрины данных с расчетными показателями"""
    logger.info("=== TASK 4: Создание витрины данных ===")
    
    simulator = AirflowTaskSimulator()
    if not simulator.connect_to_db():
        raise Exception("Не удалось подключиться к базе данных")
    
    cursor = simulator.connection.cursor()
    
    # Создаем витрину данных с аналитическими показателями
    mart_query = '''
    CREATE TABLE IF NOT EXISTS employee_analytics_mart AS
    WITH employee_stats AS (
        SELECT 
            e.id,
            e.first_name,
            e.last_name,
            e.degree_name,
            e.birth_date,
            -- Вычисляем возраст
            CASE 
                WHEN strftime('%m-%d', 'now') >= strftime('%m-%d', e.birth_date)
                THEN strftime('%Y', 'now') - strftime('%Y', e.birth_date)
                ELSE strftime('%Y', 'now') - strftime('%Y', e.birth_date) - 1
            END as age,
            
            -- Категория по возрасту
            CASE 
                WHEN (strftime('%Y', 'now') - strftime('%Y', e.birth_date)) <= 25 THEN 'Молодые специалисты'
                WHEN (strftime('%Y', 'now') - strftime('%Y', e.birth_date)) BETWEEN 26 AND 35 THEN 'Средний возраст'
                WHEN (strftime('%Y', 'now') - strftime('%Y', e.birth_date)) BETWEEN 36 AND 50 THEN 'Опытные специалисты'
                ELSE 'Старшие специалисты'
            END as age_category,
            
            -- Ранг по степени образования
            CASE 
                WHEN e.degree_name = 'Доктор наук' THEN 5
                WHEN e.degree_name = 'Кандидат наук' THEN 4
                WHEN e.degree_name = 'Магистр' THEN 3
                WHEN e.degree_name = 'Специалист' THEN 2
                WHEN e.degree_name = 'Бакалавр' THEN 1
                ELSE 0
            END as education_rank,
            
            current_timestamp as created_at
        FROM employees e
    ),
    store_stats AS (
        SELECT 
            s.id,
            s.store_name,
            s.min_square,
            -- Категория магазина по площади
            CASE 
                WHEN s.min_square <= 100 THEN 'Мини-магазин'
                WHEN s.min_square BETWEEN 101 AND 300 THEN 'Средний магазин'
                WHEN s.min_square BETWEEN 301 AND 600 THEN 'Крупный магазин'
                ELSE 'Гипермаркет'
            END as store_category,
            
            -- Рейтинг площади (процентиль)
            PERCENT_RANK() OVER (ORDER BY s.min_square) * 100 as square_percentile
        FROM stores s
    )
    SELECT 
        'employee_' || es.id as record_id,
        'employee' as record_type,
        es.first_name || ' ' || es.last_name as full_name,
        es.age,
        es.age_category,
        es.degree_name,
        es.education_rank,
        NULL as store_info,
        es.created_at
    FROM employee_stats es
    
    UNION ALL
    
    SELECT 
        'store_' || ss.id as record_id,
        'store' as record_type,
        ss.store_name as full_name,
        NULL as age,
        ss.store_category as age_category,
        CAST(ss.min_square AS TEXT) || ' кв.м' as degree_name,
        CAST(ss.square_percentile AS INTEGER) as education_rank,
        ss.store_category || ' (' || CAST(ss.min_square AS TEXT) || ' кв.м)' as store_info,
        current_timestamp as created_at
    FROM store_stats ss
    '''
    
    # Удаляем старую витрину и создаем новую
    cursor.execute('DROP TABLE IF EXISTS employee_analytics_mart')
    cursor.execute(mart_query)
    
    # Создаем представление с агрегированной статистикой
    view_query = '''
    CREATE VIEW IF NOT EXISTS summary_statistics AS
    SELECT 
        record_type,
        age_category,
        COUNT(*) as count,
        AVG(CASE WHEN education_rank > 0 THEN education_rank END) as avg_education_rank,
        MIN(CASE WHEN education_rank > 0 THEN education_rank END) as min_education_rank,
        MAX(CASE WHEN education_rank > 0 THEN education_rank END) as max_education_rank
    FROM employee_analytics_mart
    GROUP BY record_type, age_category
    ORDER BY record_type, count DESC
    '''
    
    cursor.execute(view_query)
    simulator.connection.commit()
    
    # Получаем результаты для логирования
    cursor.execute('SELECT * FROM summary_statistics')
    results = cursor.fetchall()
    
    logger.info("Витрина данных создана успешно")
    for row in results:
        logger.info(f"Статистика: {row}")
    
    simulator.close_connection()
    return "data_mart_created"

# Имитация DAG Airflow
class SimpleDAG:
    """Простая имитация DAG Airflow"""
    
    def __init__(self, dag_id: str, description: str):
        self.dag_id = dag_id
        self.description = description
        self.tasks = []
        self.task_results = {}
    
    def add_task(self, task_id: str, task_function, depends_on=None):
        """Добавление задачи в DAG"""
        self.tasks.append({
            'task_id': task_id,
            'function': task_function,
            'depends_on': depends_on
        })
    
    def run(self):
        """Запуск DAG"""
        logger.info(f"=== Запуск DAG: {self.dag_id} ===")
        logger.info(f"Описание: {self.description}")
        
        executed_tasks = set()
        
        while len(executed_tasks) < len(self.tasks):
            for task in self.tasks:
                if task['task_id'] in executed_tasks:
                    continue
                
                # Проверяем зависимости
                if task['depends_on'] and task['depends_on'] not in executed_tasks:
                    continue
                
                # Выполняем задачу
                try:
                    logger.info(f"Выполнение задачи: {task['task_id']}")
                    result = task['function']()
                    self.task_results[task['task_id']] = result
                    executed_tasks.add(task['task_id'])
                    logger.info(f"Задача {task['task_id']} выполнена успешно")
                except Exception as e:
                    logger.error(f"Ошибка в задаче {task['task_id']}: {e}")
                    raise e
        
        logger.info(f"=== DAG {self.dag_id} выполнен успешно ===")
        return self.task_results

# Функция для запуска всего процесса
def run_data_processing_pipeline():
    """Основная функция для запуска пайплайна обработки данных"""
    
    # Создаем DAG
    dag = SimpleDAG(
        dag_id='data_processing_pipeline',
        description='Пайплайн обработки данных: развертывание БД, загрузка данных, создание витрины'
    )
    
    # Добавляем задачи в DAG
    dag.add_task('setup_database', task_setup_database)
    dag.add_task('create_additional_data', task_create_additional_data, depends_on='setup_database')
    dag.add_task('update_data_from_csv', task_update_data_from_csv, depends_on='create_additional_data')
    dag.add_task('create_data_mart', task_create_data_mart, depends_on='update_data_from_csv')
    
    # Запускаем DAG
    results = dag.run()
    
    return results

# Функция для просмотра результатов
def view_results():
    """Просмотр результатов работы витрины"""
    simulator = AirflowTaskSimulator()
    if not simulator.connect_to_db():
        print("Ошибка подключения к БД")
        return
    
    # Показываем данные из витрины
    df_mart = pd.read_sql_query("SELECT * FROM employee_analytics_mart LIMIT 10", simulator.connection)
    print("\n=== ДАННЫЕ ИЗ ВИТРИНЫ (первые 10 записей) ===")
    print(df_mart.to_string())
    
    # Показываем агрегированную статистику
    df_stats = pd.read_sql_query("SELECT * FROM summary_statistics", simulator.connection)
    print("\n=== АГРЕГИРОВАННАЯ СТАТИСТИКА ===")
    print(df_stats.to_string())
    
    simulator.close_connection()

# Запуск всего процесса
if __name__ == "__main__":
    print("Имитация работы Airflow DAG в Jupyter Notebook")
    print("=" * 60)
    
    # Запускаем пайплайн
    pipeline_results = run_data_processing_pipeline()
    
    print("\n" + "=" * 60)
    print("РЕЗУЛЬТАТЫ ВЫПОЛНЕНИЯ ПАЙПЛАЙНА:")
    for task_id, result in pipeline_results.items():
        print(f"✅ {task_id}: {result}")
    
    print("\n" + "=" * 60)
    # Показываем результаты
    view_results()
    # https://claude.ai/chat/3112f0c2-54c4-48a7-9336-aff32ae9c768
    

2025-09-06 13:23:05,559 - INFO - === Запуск DAG: data_processing_pipeline ===
2025-09-06 13:23:05,561 - INFO - Описание: Пайплайн обработки данных: развертывание БД, загрузка данных, создание витрины
2025-09-06 13:23:05,561 - INFO - Выполнение задачи: setup_database
2025-09-06 13:23:05,561 - INFO - === TASK 1: Развертывание тестовой базы данных ===
2025-09-06 13:23:05,575 - INFO - Успешное подключение к базе данных
2025-09-06 13:23:05,583 - INFO - Соединение с БД закрыто
2025-09-06 13:23:05,583 - INFO - База данных успешно развернута
2025-09-06 13:23:05,584 - INFO - Задача setup_database выполнена успешно
2025-09-06 13:23:05,584 - INFO - Выполнение задачи: create_additional_data
2025-09-06 13:23:05,584 - INFO - === TASK 2: Создание дополнительных данных ===
2025-09-06 13:23:05,629 - INFO - Создан файл additional_employees.csv с 12 записями
2025-09-06 13:23:05,630 - INFO - Задача create_additional_data выполнена успешно
2025-09-06 13:23:05,632 - INFO - Выполнение задачи: update_data_fro

Имитация работы Airflow DAG в Jupyter Notebook

РЕЗУЛЬТАТЫ ВЫПОЛНЕНИЯ ПАЙПЛАЙНА:
✅ setup_database: database_setup_complete
✅ create_additional_data: additional_data_created
✅ update_data_from_csv: data_updated
✅ create_data_mart: data_mart_created


=== ДАННЫЕ ИЗ ВИТРИНЫ (первые 10 записей) ===
     record_id record_type        full_name  age         age_category    degree_name  education_rank store_info           created_at
0   employee_1    employee      Иван Иванов   40  Опытные специалисты        Магистр               3       None  2025-09-06 08:23:05
1   employee_2    employee    Мария Петрова   35      Средний возраст       Бакалавр               1       None  2025-09-06 08:23:05
2   employee_3    employee  Алексей Сидоров   36  Опытные специалисты  Кандидат наук               4       None  2025-09-06 08:23:05
3   employee_4    employee    Елена Козлова   33      Средний возраст        Магистр               3       None  2025-09-06 08:23:05
4   employee_5    employee   Дмитрий Во