In [None]:
from datetime import datetime, timedelta
from textwrap import dedent  # strips common indentation from doc/template strings

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

In [None]:
"""
something about this dag
it would be in __doc__
"""

# граф
with DAG(
    'dag_name',
    
    default_args={
        # остановка в случае провала?
        'depends_on_past': False,
        # куда писать о провале
        'email': ['mail@mail.com'],
        # сообщать ли о провале?
        'email_on_failure': False,
        # сообщать о повторных попытках?
        'email_on_retry': False,
        # сколько попыток делать
        'retries': 1,
        # время между запусками
        'retry_delay': timedelta(minutes=5),
    },
    
    # docs for DAG
    description='what DAG about',
    scheldule_interval=timedelta(days=1),
    start_date=datetime(2025, 1, 1),
    # наверстать упущенные запуски за прошедшее время
    catcup=False,
    tags=['smf']
) as dag:
    
    # t1, t2, t3 - tasks 
    t1 = Bash.Operator(
        task_id='print_date',
        bash_command='date',
    )
    
    t2 = Bash.Operator(
        task_id='sleep',
        'depends_on_past': False,
        bash_command='sleep 5',
        'retries': 3,
    )
    t1.doc_md = dedent(
    """
    doc for task
    """
    )
    dag.doc_md = dedent(
    """
    doc for dag
    """
    )
    dag.doc_md = dedent(__doc__) # забирает докстрин из начала файла
    
    # шаблоны Jinja
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """)

    
    t3 = Bash.Operator(
        task_id='templaed',
        'depends_on_past': False,
        bash_command=templated_command,
    )
    
    # последовательность
    t1 >> [t2, t3]
    #         -> t2
    #     t1 |
    #         -> t3
    
    #     t1 >> t2 >> [t3, t4]
    #     t3 >> t31
    #     t4 >> t41
    

    

## PythonOperator

In [None]:
def print_context(ds, **kwargs):
    """Print what a PythonOperator callable receives from the task context.

    Args:
        ds: the run's logical date string, passed by name from the context.
        **kwargs: every other context entry.

    Returns:
        A fixed string; the PythonOperator logs a callable's return value.
    """
    for item in (kwargs, ds):
        print(item)
    return 'something prints to log'


# PythonOperator calls `python_callable` with the task context. print_context
# declares `ds` and **kwargs, so it receives `ds` plus the rest of the context.
# NOTE: created outside a `with DAG(...)` block, so this task is not attached to
# any DAG; pass dag=... or create it inside a DAG context to schedule it.
t4 = PythonOperator(
    task_id='print_the_context',
    python_callable=print_context,
)

## XCom: передача данных между тасками (passing data between tasks)

In [None]:
def some_push_func(some_arg, ti):
    """Push a value to XCom under an explicit key.

    Args:
        some_arg: placeholder argument; not used.
        ti: the running TaskInstance (Airflow passes it to callables that
            declare a ``ti`` parameter).
    """
    some = 'something'
    # an explicit key lets a downstream task pull exactly this value
    ti.xcom_push(key='some', value=some)
    
    
def some_pull_func(some_arg, ti):
    """Pull the value pushed under key 'some' and return it.

    Args:
        some_arg: placeholder argument; not used.
        ti: the running TaskInstance.

    Returns:
        The pulled XCom value (``None`` if nothing was pushed).
    """
    some = ti.xcom_pull(
        key='some',
        task_ids='task.id',  # placeholder: task_id of the task that pushed the value
    )
    return some

## Examples

In [None]:
"""
#### Task 4
**Making 10 bash-tasks and 20 python-tasks with print**

- using a *for* loop to create the tasks and `op_kwargs` to pass the loop variable to the Python callable
"""

from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def print_task_number(task_number):
    """Print the task number received through ``op_kwargs``."""
    message = "task number is: {}".format(task_number)
    print(message)


with DAG(
    'hw_i-nechetnaya_4',
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='DAG with for',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2025, 2, 12),
) as dag:
    # 10 bash tasks, each echoing its own index.
    bash_tasks = [
        BashOperator(
            task_id='bush_task_' + str(i),
            bash_command=f"echo {i}",
        )
        for i in range(10)
    ]

    # 20 python tasks; op_kwargs delivers the loop index to print_task_number.
    py_tasks = [
        PythonOperator(
            task_id='py_task_' + str(j),
            python_callable=print_task_number,
            op_kwargs={'task_number': j},
        )
        for j in range(20)
    ]

    # Task documentation on the last bash task (the task the loop variable
    # pointed at in the loop-based version).
    bash_tasks[-1].doc_md = dedent(
        """
    #### Task Documentation
    """
    )
    dag.doc_md = dedent(__doc__)

    # All bash tasks finish before any python task starts. `list >> list` is a
    # TypeError, so each bash task is fanned out to the whole python list
    # instead of linking only the last task of each loop.
    for bash_task in bash_tasks:
        bash_task >> py_tasks


In [None]:
"""
#### Task 5
bash-task using jinja-template
print ts and run_id
"""

from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator


def print_task_number(task_number):
    """Print the given task number (not used by the DAG in this cell)."""
    print("task number is: {}".format(task_number))


with DAG(
    'hw_i-nechetnaya_5',
    default_args=dict(
        depends_on_past=False,
        email=['airflow@example.com'],
        email_on_failure=False,
        email_on_retry=False,
        retries=1,
        retry_delay=timedelta(minutes=5),
    ),
    description='DAG with template',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2025, 2, 12),
) as dag:
    # DAG documentation comes from the docstring at the top of the file.
    dag.doc_md = dedent(__doc__)

    # Jinja template rendered by BashOperator: echoes the run's `ts` and
    # `run_id` five times.
    template_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ts }}"
        echo "{{ run_id }}"
    {% endfor %}
    """
    )
    task = BashOperator(task_id='template', bash_command=template_command)


In [None]:
"""
#### Task 6
**Making 10 bash-tasks** using a *for* loop to create the tasks and `env` to pass the loop variable to each task
"""

from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator


def print_task_number(task_number):
    """Print the given task number (not used by the DAG in this cell)."""
    text = "task number is: {}".format(task_number)
    print(text)


with DAG(
    'hw_i-nechetnaya_6',
    default_args=dict(
        depends_on_past=False,
        email=['airflow@example.com'],
        email_on_failure=False,
        email_on_retry=False,
        retries=1,
        retry_delay=timedelta(minutes=5),
    ),
    description='DAG with for',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2025, 2, 12),
) as dag:
    dag.doc_md = dedent(__doc__)

    # Ten independent bash tasks. The index reaches the shell through the
    # NUMBER environment variable rather than being formatted into the command.
    for number in range(10):
        bush_task = BashOperator(
            task_id=f'bush_task_{number}',
            bash_command="echo $NUMBER ",
            env={'NUMBER': f'{number}'},
        )


In [None]:
"""
#### Task 8
"""

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def give_xcom_test(ti):
    """Push the string "xcom test" to XCom under 'sample_xcom_key'.

    Args:
        ti: the running TaskInstance.
    """
    payload = {'key': 'sample_xcom_key', 'value': "xcom test"}
    ti.xcom_push(**payload)


def take_xcom_test(ti):
    """Pull the value pushed by task give_xcom_test under 'sample_xcom_key' and print it.

    Args:
        ti: the running TaskInstance.
    """
    pulled = ti.xcom_pull(task_ids="give_xcom_test", key='sample_xcom_key')
    print(pulled)


with DAG(
    'hw_i-nechetnaya_8',
    default_args=dict(
        depends_on_past=False,
        email=['airflow@example.com'],
        email_on_failure=False,
        email_on_retry=False,
        retries=1,
        retry_delay=timedelta(minutes=5),
    ),
    description='DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2025, 2, 12),
) as dag:
    # producer: pushes the XCom value
    t1 = PythonOperator(task_id='give_xcom_test', python_callable=give_xcom_test)
    # consumer: pulls it by key and by the producer's task_id
    t2 = PythonOperator(task_id='take_xcom_test', python_callable=take_xcom_test)

    # the consumer must run after the producer
    t1.set_downstream(t2)


In [None]:
"""
#### Task 9
"""

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def get_string():
    """Return a fixed string; the PythonOperator stores it as the 'return_value' XCom."""
    message = "Airflow tracks everything"
    return message


def print_string(ti):
    """Print the value returned by task_get_string, read from its 'return_value' XCom.

    Args:
        ti: the running TaskInstance.
    """
    print(ti.xcom_pull(task_ids='task_get_string', key='return_value'))


with DAG(
    'hw_i-nechetnaya_9',
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2025, 2, 12),
) as dag:
    # get_string's return value is stored as the 'return_value' XCom of this task
    t1 = PythonOperator(
        task_id='task_get_string',
        python_callable=get_string,
    )
    # In Airflow 2 the context (including `ti`) is passed automatically to
    # callables that declare it; `provide_context=True` is deprecated and ignored.
    t2 = PythonOperator(
        task_id='task_print_string',
        python_callable=print_string,
    )

    t1 >> t2


In [None]:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


def get_liker():
    """Return the user with the most 'like' actions in ``feed_action``.

    Returns:
        The first row fetched by the cursor, ``(user_id, like_count)``, or
        ``None`` if the query returns no rows.
    """
    from contextlib import closing

    postgres = PostgresHook(postgres_conn_id="startml_feed")
    # A psycopg2 connection used as `with conn:` only commits/rolls back the
    # transaction and leaves the connection open; closing() releases it.
    with closing(postgres.get_conn()) as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT user_id, COUNT(action)
                FROM feed_action
                WHERE action = 'like'
                GROUP BY user_id
                ORDER BY COUNT(action) DESC
                LIMIT 1
                """
            )
            result = cursor.fetchone()
    return result


with DAG(
    'hw_i-nechetnaya_11',
    default_args=dict(
        depends_on_past=False,
        email=['airflow@example.com'],
        email_on_failure=False,
        email_on_retry=False,
        retries=1,
        retry_delay=timedelta(minutes=5),
    ),
    description='DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2025, 2, 12),
) as dag:
    # Single task: run the query in get_liker; its return value is pushed to XCom.
    t1 = PythonOperator(task_id='task_get_liker', python_callable=get_liker)


In [None]:
"""
#### Task 12
"""

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable


def print_var():
    """Print the value of the ``is_startml`` Airflow Variable."""
    print(Variable.get("is_startml"))

# dag_id must be unique across the DAG folder: 'hw_i-nechetnaya_11' is already
# used by the get_liker DAG, so this Task 12 DAG gets its own id.
with DAG(
    'hw_i-nechetnaya_12',
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2025, 2, 12),
) as dag:
    # Single task: print the is_startml Variable.
    t1 = PythonOperator(
        task_id='task_print_var',
        python_callable=print_var,
    )


In [None]:
"""
#### Task 13
"""

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.models import Variable


def choose_branch():
    """Pick the branch to follow from the ``is_startml`` Airflow Variable.

    Returns:
        The task_id to run next: "startml_desc" when the Variable's value is
        exactly the string 'True', otherwise "not_startml_desc".
    """
    if Variable.get("is_startml") == 'True':
        return "startml_desc"
    return "not_startml_desc"


def print_startml():
    """Print the description used on the StartML branch."""
    text = "StartML is a starter course for ambitious people"
    print(text)


def print_not_startml():
    """Print the description used on the non-StartML branch."""
    text = "Not a startML course, sorry"
    print(text)


with DAG(
    'hw_i-nechetnaya_13',
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2025, 2, 12),
) as dag:
    # choose_branch returns the task_id of the branch to run; the other is skipped
    branch_task = BranchPythonOperator(
        task_id="branch_task",
        python_callable=choose_branch,
    )
    t1 = PythonOperator(
        task_id='startml_desc',
        python_callable=print_startml,
    )
    t2 = PythonOperator(
        task_id='not_startml_desc',
        python_callable=print_not_startml,
    )
    join_start = EmptyOperator(task_id="join_task_start")
    # One upstream branch is always skipped. With the default 'all_success' rule
    # the join would be skipped too, so it runs when nothing failed and at
    # least one branch succeeded.
    join_end = EmptyOperator(
        task_id="join_task_end",
        trigger_rule="none_failed_min_one_success",
    )

    join_start >> branch_task >> [t1, t2] >> join_end
