In [None]:
import pymysql
import random
from faker import Faker


db_config = {
    'host': 'localhost',
    'user': 'root',
    'password': 'password',
    'database': 'finance_hw'
}

def create_and_fill_db():
    conn = pymysql.connect(host=db_config['host'], user=db_config['user'], password=db_config['password'])
    cursor = conn.cursor()
    
    # 1. Создание БД и таблиц
    cursor.execute("CREATE DATABASE IF NOT EXISTS finance_hw")
    cursor.execute("USE finance_hw")
    
    # Таблица банкоматов (геоданные)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS atms (
            atm_id INT AUTO_INCREMENT PRIMARY KEY,
            address VARCHAR(255),
            latitude FLOAT,
            longitude FLOAT
        )
    """)
    
    # Таблица транзакций (без индексов специально, чтобы студент страдал и делал EXPLAIN)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS transactions (
            trans_id INT AUTO_INCREMENT PRIMARY KEY,
            atm_id INT,
            currency VARCHAR(3),
            amount DECIMAL(10, 2),
            trans_date DATETIME,
            FOREIGN KEY (atm_id) REFERENCES atms(atm_id)
        )
    """)
    
    # 2. Генерация данных
    fake = Faker('ru_RU')
    
    # Генерируем 20 банкоматов в центре Москвы (примерно)
    print("Генерация банкоматов...")
    atms = []
    for _ in range(20):
        # Координаты около центра Москвы с небольшим разбросом
        lat = 55.75 + random.uniform(-0.05, 0.05)
        lon = 37.61 + random.uniform(-0.05, 0.05)
        addr = fake.address()
        cursor.execute("INSERT INTO atms (address, latitude, longitude) VALUES (%s, %s, %s)", (addr, lat, lon))
        atms.append(cursor.lastrowid)
        
    conn.commit()
    
    # Генерируем 10 000 транзакций
    print("Генерация транзакций (подождите)...")
    currencies = ['USD', 'EUR', 'RUB']
    records = []
    for _ in range(10000):
        atm = random.choice(atms)
        curr = random.choices(currencies, weights=[20, 20, 60], k=1)[0] # RUB чаще
        amount = round(random.uniform(10, 1000), 2) # Сумма
        date = fake.date_time_between(start_date='-1M', end_date='now')
        records.append((atm, curr, amount, date))
    
    # Пакетная вставка
    cursor.executemany("INSERT INTO transactions (atm_id, currency, amount, trans_date) VALUES (%s, %s, %s, %s)", records)
    
    conn.commit()
    conn.close()
    print("База данных успешно создана и наполнена.")

if __name__ == '__main__':
    create_and_fill_db()


In [None]:
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor # Опционально, если сохраняем промежуточные файлы
import pymysql
import pandas as pd
import folium
from selenium import webdriver # Или requests, по выбору
from selenium.webdriver.common.by import By
import matplotlib.pyplot as plt
import seaborn as sns

# --- КОНСТАНТЫ И НАСТРОЙКИ ---
DB_CONFIG = {
    'host': 'mysql_host', # Замените на хост вашего MySQL контейнера/сервера
    'user': 'root',
    'password': 'password',
    'database': 'finance_hw'
}

# Пути для сохранения отчетов
MAP_OUTPUT_PATH = '/opt/airflow/dags/output/atm_map.html'
REPORT_OUTPUT_PATH = '/opt/airflow/dags/output/daily_report.png'

# --- ФУНКЦИИ (TASK CALLABLES) ---

def get_currency_rates(**kwargs):
    """
    Задание 1: Web Scraping.
    Нужно получить курс USD и EUR к RUB.
    Можно использовать requests (если есть API или простой HTML) или Selenium.
    Возвращает словарь: {'USD': 90.5, 'EUR': 98.2, 'RUB': 1.0}
    """
    rates = {'RUB': 1.0}
    
    print("Start searching for rates...")
    
    # TODO: 1. Инициализировать драйвер Selenium (или requests сессию)
    # TODO: 2. Зайти на сайт (например, cbr.ru, google finance или любой другой стабильный источник)
    # TODO: 3. Найти элементы с курсами валют и распарсить их значения
    # TODO: 4. Обработать исключения (если сайт не ответил)
    
    # Пример (заглушка - студенты должны заменить этот код):
    rates['USD'] = 92.50 
    rates['EUR'] = 99.10
    
    print(f"Rates found: {rates}")
    
    # Передаем данные в следующий таск через XCom
    kwargs['ti'].xcom_push(key='exchange_rates', value=rates)


def extract_and_transform_data(**kwargs):
    """
    Задание 2: MySQL Extract + Python Transform.
    1. Получить курсы из XCom.
    2. Сделать выборку транзакций из БД (обязательно используя JOIN с таблицей atms).
    3. Сконвертировать суммы в рубли.
    4. Сгруппировать данные по банкоматам.
    """
    ti = kwargs['ti']
    rates = ti.xcom_pull(key='exchange_rates', task_ids='scrape_rates_task')
    
    conn = pymysql.connect(**DB_CONFIG)
    
    # TODO: Напишите SQL запрос.
    # ЗАДАНИЕ СО ЗВЕЗДОЧКОЙ: Перед тем как вставить этот запрос сюда,
    # выполните EXPLAIN в консоли MySQL для этого запроса.
    # Добавьте необходимый индекс в БД (через консоль), чтобы избавиться от Full Table Scan или Filesort, если они есть.
    # В комментарии к коду укажите команду создания индекса.
    
    sql_query = """
    -- TODO: SELECT t.atm_id, a.latitude, a.longitude, t.amount, t.currency 
    --       FROM transactions t ... JOIN atms a ...
    --       WHERE ... (например, за последний месяц)
    """
    
    # Для примера берем все (студенту нужно раскомментировать и дописать реальный SQL)
    sql_query = """
        SELECT t.atm_id, a.address, a.latitude, a.longitude, t.amount, t.currency, t.trans_date
        FROM transactions t
        JOIN atms a ON t.atm_id = a.atm_id
    """
    
    df = pd.read_sql(sql_query, conn)
    conn.close()
    
    # Transformation
    # TODO: Используя Pandas и словарь rates, создать колонку 'amount_rub'
    def convert(row):
        return row['amount'] * rates.get(row['currency'], 1)

    df['amount_rub'] = df.apply(convert, axis=1)
    
    # Агрегация: считаем общую сумму оборота по каждому ATM
    agg_df = df.groupby(['atm_id', 'address', 'latitude', 'longitude'])['amount_rub'].sum().reset_index()
    
    # Сохраняем промежуточные данные (или передаем через XCom, если данных мало)
    # Для учебных целей сохраним JSON в XCom
    ti.xcom_push(key='atm_stats', value=agg_df.to_dict('records'))
    
    # Передаем сырой df для графика (упрощенно - список словарей)
    ti.xcom_push(key='raw_transactions', value=df[['trans_date', 'amount_rub']].to_dict('records'))


def generate_geo_report(**kwargs):
    """
    Задание 3: Визуализация (Folium).
    Нарисовать карту, где метки банкоматов имеют разный цвет или размер
    в зависимости от оборота.
    """
    ti = kwargs['ti']
    atm_stats = ti.xcom_pull(key='atm_stats', task_ids='process_data_task')
    
    # Центр карты (Москва)
    m = folium.Map(location=[55.75, 37.61], zoom_start=10)
    
    for atm in atm_stats:
        # TODO: Реализовать логику цвета.
        # Например: если оборот > 100 000 - зеленый, иначе красный.
        color = 'green' # Студент пишет логику
        
        # TODO: Добавить Marker или CircleMarker на карту
        folium.CircleMarker(
            location=[atm['latitude'], atm['longitude']],
            radius=10,
            popup=f"{atm['address']}: {atm['amount_rub']:.2f} RUB",
            color=color,
            fill=True
        ).add_to(m)
        
    m.save(MAP_OUTPUT_PATH)
    print(f"Map saved to {MAP_OUTPUT_PATH}")


def generate_graph_report(**kwargs):
    """
    Задание 4: Визуализация (Matplotlib/Seaborn).
    Построить график распределения сумм транзакций по часам дня.
    """
    ti = kwargs['ti']
    raw_data = ti.xcom_pull(key='raw_transactions', task_ids='process_data_task')
    
    df = pd.DataFrame(raw_data)
    
    # Преобразование в datetime
    df['trans_date'] = pd.to_datetime(df['trans_date'])
    
    # TODO: Выделить час из даты
    
    # TODO: Сгруппировать по часу и посчитать сумму или количество транзакций
    
    plt.figure(figsize=(10, 6))
    # TODO: Построить barplot или lineplot
    
    plt.title('Активность транзакций по часам')
    plt.savefig(REPORT_OUTPUT_PATH)
    print(f"Report saved to {REPORT_OUTPUT_PATH}")


# --- DAG DEFINITION ---

default_args = {
    'owner': 'student',
    'start_date': pendulum.datetime(2023, 1, 1, tz="UTC"),
    'catchup': False,
}

with DAG(
    dag_id='atm_efficiency_analytics',
    default_args=default_args,
    schedule_interval='@daily',
    tags=['finance', 'hw_mysql_airflow'],
) as dag:

    # 1. Получаем курсы валют
    scrape_task = PythonOperator(
        task_id='scrape_rates_task',
        python_callable=get_currency_rates
    )

    # 2. Тянем данные из БД, объединяем с курсами, считаем KPI
    process_task = PythonOperator(
        task_id='process_data_task',
        python_callable=extract_and_transform_data
    )

    # 3. Рисуем карту
    map_task = PythonOperator(
        task_id='create_map_task',
        python_callable=generate_geo_report
    )

    # 4. Рисуем график
    report_task = PythonOperator(
        task_id='create_graph_task',
        python_callable=generate_graph_report
    )

    # Порядок выполнения
    scrape_task >> process_task
    process_task >> [map_task, report_task]
