In [1]:
import sys
import os

# Put the parent of the current working directory on the import path;
# presumably this is what lets the `src.*` imports in the next cell resolve.
parent_dir = os.path.join(os.getcwd(), os.pardir)
src_path = os.path.abspath(parent_dir)
if src_path not in sys.path:
    # Guarded so that re-running the cell does not add a duplicate entry.
    sys.path.append(src_path)

In [2]:
from src.core.llm_generators.airflow import AirflowDagGenerator
from src.core.llm_generators.specification import AnalyticsSpecGenerator
from src.config.prompts import prompts
from src.config.settings import settings

In [3]:
# Load the analytics specification from `analytics_spec.yml` in the artifacts
# directory; the displayed value is an AnalyticsSpec instance, and `result` is
# what the AirflowDagGenerator cell below consumes.
# NOTE(review): `_from_yml_to_analytics_spec` is underscore-prefixed (private by
# convention) — confirm whether a public loader should be called instead.
spec_gen = AnalyticsSpecGenerator()
filepath = settings.ARTIFACTS_DIRECTORY / "analytics_spec.yml"
result = spec_gen._from_yml_to_analytics_spec(filepath)
result

AnalyticsSpec(business_process=BusinessProcess(name='Анализ продаж интернет-магазина', description='Анализировать продажи и поведение покупателей для повышения выручки и оптимизации маркетинговых кампаний', schedule='0 3 * * *', roles=[{'role': 'Менеджеры по продажам'}, {'role': 'Маркетологи'}, {'role': 'Продуктовый аналитик'}], goals=['Повышение выручки', 'Оптимизация маркетинговых кампаний'], limitations='Ограничения по GDPR'), data_sources=[DataSource(name='orders', description='Таблица заказов', type='database', data_schema={'order_id': 'int', 'product_id': 'int', 'timestamp': 'timestamp', 'customer_id': 'int', 'amount': 'float'}, database='PostgreSQL', access_method='SQL-запросы', data_volume='20000 заказов в день', limitations=None, recommendations=[], connection_params={}), DataSource(name='customers', description='Таблица клиентов', type='database', data_schema={'customer_id': 'int', 'name': 'varchar', 'region_id': 'int', 'age': 'int'}, database='PostgreSQL', access_method='SQL

In [4]:
# Build the DAG generator from the specification object loaded in the previous cell.
airflow_gen = AirflowDagGenerator(analytics_specification=result)

In [9]:
# Read the DAG template file as UTF-8 text; the raw template (with its
# `{{ ... }}` placeholders) is displayed as the cell output.
template_path = settings.TEMPLATE_DAG_PATH
with open(template_path, mode="r", encoding="utf-8") as template_file:
    pipeline_template = template_file.read()
pipeline_template

'from datetime import datetime, timedelta\n\nfrom airflow.sdk import DAG\nfrom airflow.operators.bash import BashOperator\nfrom airflow.operators.python import PythonOperator\n\n\nPROJECT_DIR = "/opt/airflow/dbt"\nDATA_PATH = f"{PROJECT_DIR}/sample"\n\n\nDEFAULT_ARGS = {\n    "owner": "airflow",\n    "depends_on_past": False,\n    "email_on_failure": False,\n    "retries": 1,\n    "retry_delay": timedelta(minutes=5)\n}\n\n\n{{ moving_data_from_source_to_dwh }}\n\n\nwith DAG(\n    dag_id="{{ dag_name }}", \n    start_date={{ start_date }},\n    schedule_interval="{{ schedule }}",\n    max_active_runs=1,\n    catchup=True\n) as dag:\n    \n    moving_data_from_source_to_dwh = PythonOperator(\n        task_id="moving_data",\n        python_callable=moving_data_from_source_to_dwh\n    )\n\n    build_staging_models = BashOperator(\n        task_id="build_staging_models",\n        bash_command=f"dbt run --profiles-dir {PROJECT_DIR} " \\\n                             f"--project-dir {PROJECT_

In [10]:
# Sample source text for a `moving_data_from_source_to_dwh` task function.
# It is never executed in this notebook: it is only passed to `_render_dag`
# below as the value for the `moving_data_from_source_to_dwh` placeholder.
# NOTE(review): the snippet imports pandas but never uses `pd`.
# NOTE(review): `airflow.hooks.postgres_hook` looks like the legacy import path,
# while the template imports `DAG` from `airflow.sdk` — confirm it resolves on
# the targeted Airflow version.
code = """def moving_data_from_source_to_dwh(**context) -> None:

    import pandas as pd
    from airflow.hooks.postgres_hook import PostgresHook
    from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook

    # Подключение к источнику данных PostgreSQL
    source = PostgresHook(postgres_conn_id='postgres_source')

    # Подключение к аналитическому хранилищу ClickHouse
    clickhouse_dwh = ClickHouseHook(clickhouse_conn_id='clickhouse_dwh')

    # Извлечение данных из таблицы 'orders'
    orders_query = "SELECT * FROM orders"
    orders_records = source.get_records(orders_query)

    # Извлечение данных из таблицы 'customers'
    customers_query = "SELECT * FROM customers"
    customers_records = source.get_records(customers_query)

    # Загрузка данных в ClickHouse
    clickhouse_dwh.execute("CREATE TABLE IF NOT EXISTS orders (order_id Int32, product_id Int32, timestamp DateTime, customer_id Int32, amount Float64) ENGINE = MergeTree() ORDER BY order_id")
    clickhouse_dwh.execute("CREATE TABLE IF NOT EXISTS customers (customer_id Int32, name String, region_id Int32, age Int32) ENGINE = MergeTree() ORDER BY customer_id")

    clickhouse_dwh.execute('INSERT INTO orders VALUES', orders_records)
    clickhouse_dwh.execute('INSERT INTO customers VALUES', customers_records)"""

In [11]:

# Fill the template placeholders (DAG name, start date, schedule and the
# data-moving function source) and display the rendered DAG module text.
# `start_date` is passed as a string of Python source because the template
# substitutes it verbatim after `start_date=`.
airflow_gen._render_dag(
    pipeline_template=pipeline_template,
    dag_name="example_dag",
    start_date="datetime(2025, 12, 14)",
    schedule="0 5 * * *",
    moving_data_from_source_to_dwh=code,
)

'from datetime import datetime, timedelta\n\nfrom airflow.sdk import DAG\nfrom airflow.operators.bash import BashOperator\nfrom airflow.operators.python import PythonOperator\n\n\nPROJECT_DIR = "/opt/airflow/dbt"\nDATA_PATH = f"{PROJECT_DIR}/sample"\n\n\nDEFAULT_ARGS = {\n    "owner": "airflow",\n    "depends_on_past": False,\n    "email_on_failure": False,\n    "retries": 1,\n    "retry_delay": timedelta(minutes=5)\n}\n\n\ndef moving_data_from_source_to_dwh(**context) -> None:\n\n    import pandas as pd\n    from airflow.hooks.postgres_hook import PostgresHook\n    from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook\n\n    # Подключение к источнику данных PostgreSQL\n    source = PostgresHook(postgres_conn_id=\'postgres_source\')\n\n    # Подключение к аналитическому хранилищу ClickHouse\n    clickhouse_dwh = ClickHouseHook(clickhouse_conn_id=\'clickhouse_dwh\')\n\n    # Извлечение данных из таблицы \'orders\'\n    orders_query = "SELECT * FROM orders"\n    orders_re