In [None]:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator

In [None]:
# Função para criar um arquivo
def criar_arquivo():
    with open('/tmp/arquivo_criado.txt', 'w') as f:  # Cria um arquivo na pasta temporária
        f.write('Este é um arquivo criado pelo Airflow.\n')  # Escreve conteúdo no arquivo
    print('Arquivo criado em /tmp/arquivo_criado.txt')  # Exibe mensagem de conclusão


In [None]:
# Função para validar o arquivo
def validar_arquivo():
    try:
        with open('/tmp/arquivo_criado.txt', 'r') as f:  # Abre o arquivo criado
            conteudo = f.read()  # Lê o conteúdo do arquivo
        if 'Airflow' in conteudo:  # Verifica se "Airflow" está no conteúdo
            print('Validação bem-sucedida: o arquivo contém "Airflow".')  # Mensagem de sucesso
        else:
            raise ValueError('Erro: "Airflow" não encontrado no arquivo.')  # Lança erro
    except FileNotFoundError:
        raise FileNotFoundError('Erro: Arquivo não encontrado.')  # Lança erro se o arquivo não existir

In [None]:
# Função para finalizar a execução
def finalizar_processo():
    print('Processo finalizado com sucesso!')  # Exibe mensagem ao concluir o processo

In [None]:
# Configuração da DAG
default_args = {
    'owner': 'admin',  # Proprietário da DAG
    'depends_on_past': False,  # Execuções não dependem de execuções anteriores
    'email_on_failure': False,  # Não envia email em caso de falha
    'email_on_retry': False,  # Não envia email em caso de reexecução
    'retries': 2,  # Tenta executar a tarefa até 2 vezes em caso de falha
    'retry_delay': timedelta(minutes=5),  # Intervalo de 5 minutos entre as tentativas
}

In [None]:
with DAG(
    'projeto_exemplo_dag',  # Nome único da DAG
    default_args=default_args,  # Argumentos padrão para as tarefas
    description='Exemplo de DAG para projeto básico no Airflow',  # Descrição da DAG
    schedule_interval=None,  # DAG executada manualmente
    start_date=datetime(2023, 12, 1),  # Data de início da DAG
    catchup=False,  # Não executa execuções pendentes
    tags=['projeto', 'exemplo'],  # Tags para categorizar a DAG
) as dag:

    inicio = DummyOperator(task_id='inicio')  # Tarefa de início sem lógica

    criar_arquivo_tarefa = PythonOperator(
        task_id='criar_arquivo',  # Nome único da tarefa
        python_callable=criar_arquivo,  # Função a ser executada
    )

    validar_arquivo_tarefa = PythonOperator(
        task_id='validar_arquivo',  # Nome único da tarefa
        python_callable=validar_arquivo,  # Função a ser executada
    )

    finalizar = PythonOperator(
        task_id='finalizar_processo',  # Nome único da tarefa
        python_callable=finalizar_processo,  # Função a ser executada
    )

    # Configuração da ordem das tarefas
    inicio >> criar_arquivo_tarefa >> validar_arquivo_tarefa >> finalizar  # Ordem de execução
