In [None]:
import schedule
import time
import datetime
import logging
from pathlib import Path

# Настройка логирования
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('scheduler.log'),
        logging.StreamHandler()
    ]
)

class TaskScheduler:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        
    def backup_task(self):
        """Задача резервного копирования"""
        try:
            self.logger.info("Запуск задачи резервного копирования")
            # Ваш код для резервного копирования
            time.sleep(2)  # Имитация работы
            self.logger.info("Резервное копирование завершено")
        except Exception as e:
            self.logger.error(f"Ошибка в задаче резервного копирования: {e}")
    
    def cleanup_task(self):
        """Задача очистки временных файлов"""
        try:
            self.logger.info("Запуск задачи очистки")
            # Ваш код для очистки
            time.sleep(1)
            self.logger.info("Очистка завершена")
        except Exception as e:
            self.logger.error(f"Ошибка в задаче очистки: {e}")
    
    def report_task(self):
        """Задача генерации отчетов"""
        try:
            self.logger.info("Генерация ежедневного отчета")
            # Ваш код для генерации отчета
            current_date = datetime.datetime.now().strftime("%Y-%m-%d")
            report_content = f"Отчет за {current_date}\nДанные: ..."
            
            # Сохранение отчета в файл
            report_file = Path(f"report_{current_date}.txt")
            report_file.write_text(report_content, encoding='utf-8')
            
            self.logger.info(f"Отчет сохранен в {report_file}")
        except Exception as e:
            self.logger.error(f"Ошибка при генерации отчета: {e}")

def main():
    scheduler = TaskScheduler()
    
    # Настройка расписания
    schedule.every(3).seconds.do(scheduler.backup_task)
    schedule.every().hour.do(scheduler.cleanup_task)
    schedule.every().day.at("09:00").do(scheduler.report_task)
    schedule.every().day.at("18:00").do(scheduler.report_task)
    
    logging.info("Планировщик задач запущен")
    
    try:
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        logging.info("Планировщик остановлен пользователем")

if __name__ == "__main__":
    main()

In [None]:
import pymysql
import random
from faker import Faker

# Настройки подключения
DB_CONFIG = {
    'host': 'your_mysql_host',
    'user': 'airflow_user',
    'password': 'airflow_password',
    'database': 'airflow_db',
    'cursorclass': pymysql.cursors.DictCursor
}

fake = Faker()

def generate_db_structure():
    conn = pymysql.connect(**DB_CONFIG)
    cur = conn.cursor()

    # --- 1. SETUP INMON (Source, 3NF) ---
    cur.execute("DROP TABLE IF EXISTS source_orders")
    cur.execute("DROP TABLE IF EXISTS source_products")
    cur.execute("DROP TABLE IF EXISTS source_users")

    cur.execute("""
        CREATE TABLE source_users (
            id INT AUTO_INCREMENT PRIMARY KEY,
            email VARCHAR(100),
            full_name VARCHAR(100),
            registration_date DATE,
            country VARCHAR(50)
        )
    """)
    cur.execute("""
        CREATE TABLE source_products (
            id INT AUTO_INCREMENT PRIMARY KEY,
            product_name VARCHAR(100),
            category VARCHAR(50),
            price DECIMAL(10, 2)
        )
    """)
    cur.execute("""
        CREATE TABLE source_orders (
            id INT AUTO_INCREMENT PRIMARY KEY,
            user_id INT,
            product_id INT,
            quantity INT,
            order_date DATE,
            FOREIGN KEY (user_id) REFERENCES source_users(id),
            FOREIGN KEY (product_id) REFERENCES source_products(id)
        )
    """)

    # --- 2. SETUP KIMBALL (Target, Star Schema) ---
    cur.execute("DROP TABLE IF EXISTS fact_sales")
    cur.execute("DROP TABLE IF EXISTS dim_users")
    cur.execute("DROP TABLE IF EXISTS dim_products")

    cur.execute("""
        CREATE TABLE dim_users (
            user_key INT PRIMARY KEY, -- Skips surrogate key generation for simplicity
            user_name VARCHAR(100),
            user_country VARCHAR(50)
        )
    """)
    cur.execute("""
        CREATE TABLE dim_products (
            product_key INT PRIMARY KEY,
            product_name VARCHAR(100),
            product_category VARCHAR(50),
            current_price DECIMAL(10, 2)
        )
    """)
    cur.execute("""
        CREATE TABLE fact_sales (
            id INT AUTO_INCREMENT PRIMARY KEY,
            user_key INT,
            product_key INT,
            total_amount DECIMAL(10, 2),
            sale_date DATE,
            data_load_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)

    # --- 3. SEED DATA ---
    print("Generating Users...")
    users = [(fake.email(), fake.name(), fake.date_between(start_date='-2y', end_date='today'), fake.country()) for _ in range(100)]
    cur.executemany("INSERT INTO source_users (email, full_name, registration_date, country) VALUES (%s, %s, %s, %s)", users)
    
    print("Generating Products...")
    products = [(fake.word(), fake.word(), random.uniform(10, 500)) for _ in range(50)]
    cur.executemany("INSERT INTO source_products (product_name, category, price) VALUES (%s, %s, %s)", products)
    
    conn.commit()

    print("Generating Orders...")
    # Get IDs
    cur.execute("SELECT id FROM source_users")
    u_ids = [row['id'] for row in cur.fetchall()]
    cur.execute("SELECT id FROM source_products")
    p_ids = [row['id'] for row in cur.fetchall()]

    orders = []
    for _ in range(1000):
        orders.append((
            random.choice(u_ids),
            random.choice(p_ids),
            random.randint(1, 5),
            fake.date_between(start_date='-1y', end_date='today')
        ))
    cur.executemany("INSERT INTO source_orders (user_id, product_id, quantity, order_date) VALUES (%s, %s, %s, %s)", orders)
    
    conn.commit()
    print("Database Initialized!")
    conn.close()

if __name__ == "__main__":
    generate_db_structure()


In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime

# Стандартный аргументы
default_args = {
    'owner': 'student',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

def etl_users_process():
    # Hook - это абстракция Airflow над коннекшном.
    # Он берет креды из Admin -> Connections (conn_id='mysql_default')
    hook = MySqlHook(mysql_conn_id='mysql_default')
    conn = hook.get_conn()
    cursor = conn.cursor()

    # 1. Extract (из Inmon)
    cursor.execute("SELECT id, full_name, country FROM source_users")
    users = cursor.fetchall()

    # 2. Transform (Примитивная)
    transformed_users = []
    for u in users:
        # Пример: приведение к верхнему регистру
        transformed_users.append((u[0], u[1].upper(), u[2]))

    # 3. Load (в Kimball)
    # ВАЖНО: Идемпотентность! Сначала чистим, потом пишем, или используем UPSERT.
    # Для простоты примера делаем Full Reload
    cursor.execute("TRUNCATE TABLE dim_users") 
    cursor.executemany(
        "INSERT INTO dim_users (user_key, user_name, user_country) VALUES (%s, %s, %s)", 
        transformed_users
    )
    conn.commit()
    conn.close()

def etl_fact_sales_process(**context):
    # Доступ к дате исполнения DAG-а
    execution_date = context['ds'] 
    
    hook = MySqlHook(mysql_conn_id='mysql_default')
    conn = hook.get_conn()
    cursor = conn.cursor()

    # Собираем данные: Join в источнике (ELT подход часто лучше чистого ETL)
    sql_query = f"""
        SELECT 
            o.user_id, 
            o.product_id, 
            (o.quantity * p.price) as total,
            o.order_date
        FROM source_orders o
        JOIN source_products p ON o.product_id = p.id
        WHERE o.order_date = '{execution_date}' 
    """
    cursor.execute(sql_query)
    sales = cursor.fetchall()

    if sales:
        # Удаляем данные за эту дату, чтобы избежать дублей при перезапуске
        cursor.execute(f"DELETE FROM fact_sales WHERE sale_date = '{execution_date}'")
        
        cursor.executemany(
            "INSERT INTO fact_sales (user_key, product_key, total_amount, sale_date) VALUES (%s, %s, %s, %s)",
            sales
        )
        conn.commit()
        print(f"Loaded {len(sales)} rows for {execution_date}")
    else:
        print(f"No sales for {execution_date}")

    conn.close()

with DAG('01_classic_etl', default_args=default_args, schedule_interval='@daily', catchup=False) as dag:
    
    t1_dim_users = PythonOperator(
        task_id='load_dim_users',
        python_callable=etl_users_process
    )

    t2_fact_sales = PythonOperator(
        task_id='load_fact_sales',
        python_callable=etl_fact_sales_process,
        provide_context=True # Нужно, чтобы передать context (дату) в функцию
    )

    t1_dim_users >> t2_fact_sales


In [None]:
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime

@dag(schedule_interval='@daily', start_date=datetime(2023, 1, 1), catchup=False, tags=['seminar'])
def modern_etl_dag():

    @task()
    def extract_high_value_sales(ds=None):
        """Извлекаем заказы дороже 1000 у.е."""
        hook = MySqlHook(mysql_conn_id='mysql_default')
        # Используем pandas для удобства (частый кейс в Airflow)
        df = hook.get_pandas_df(sql=f"""
            SELECT o.id, (o.quantity * p.price) as total 
            FROM source_orders o
            JOIN source_products p ON o.product_id = p.id
            WHERE o.order_date = '{ds}'
        """)
        # Возвращаем словарь или список словарей (автоматически сериализуется в XCom)
        return df.to_dict('records')

    @task()
    def check_revenue(sales_data):
        """Принимаем данные из предыдущей задачи через аргумент"""
        total_revenue = sum([s['total'] for s in sales_data])
        print(f"Total revenue today: {total_revenue}")
        
        # Логика ветвления
        if total_revenue > 5000:
            return 'notify_manager'
        return 'skip_notification'

    # Старый добрый оператор для ветвления (он пока не имеет декоратора во всех версиях)
    branching = BranchPythonOperator(
        task_id='branching',
        python_callable=lambda ti: ti.xcom_pull(task_ids='check_revenue')
    )

    @task(task_id='notify_manager')
    def send_alert():
        print("ALERT: Huge revenue today! Champagne time!")

    @task(task_id='skip_notification')
    def relax():
        print("Just a normal day.")

    # Строим зависимости
    sales = extract_high_value_sales()
    decision = check_revenue(sales)
    
    decision >> branching >> [send_alert(), relax()]

dag = modern_etl_dag()


In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime

# Конфиг: список таблиц для репликации
TABLES_TO_SYNC = ['users', 'products', 'orders']

def create_dag(table_name):
    """Фабрика дагов"""
    dag_id = f'sync_table_{table_name}'
    
    default_args = {'owner': 'dynamic', 'start_date': datetime(2023, 1, 1)}
    
    dag = DAG(dag_id, default_args=default_args, schedule_interval='@daily', catchup=False)
    
    with dag:
        start = EmptyOperator(task_id='start')
        
        process = PythonOperator(
            task_id=f'process_{table_name}',
            python_callable=lambda: print(f"Mirroring table: {table_name} from Source to DWH...")
        )
        
        end = EmptyOperator(task_id='end')
        
        start >> process >> end
        
    return dag

# Генерируем объекты DAG в глобальной области видимости
for table in TABLES_TO_SYNC:
    dag_object = create_dag(table)
    globals()[dag_object.dag_id] = dag_object
