In [7]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta
import pandas as pd
import plotly.express as px
import sys, os

# Caminho para o projeto
sys.path.append(os.path.abspath("/Users/magnosouza/PycharmProjects/IntroducaoAnaliseDados/archive"))

# === Função principal de análise ===
def analise_exploratoria():
    df = pd.read_csv('/Users/magnosouza/PycharmProjects/IntroducaoAnaliseDados/archive/data_science_student_marks.csv')

    perfil = {
        18: 'Jovem',
        19: 'Jovem',
        20: 'Jovem',
        21: 'Adulto Jovem',
        22: 'Adulto Jovem',
        23: 'Adulto Jovem',
        24: 'Adulto Jovem',
        25: 'Adulto'
    }

    df['perfil'] = df['age'].map(perfil)
    df_perfil = (
        df.groupby(['location', 'perfil'])['age']
        .count()
        .reset_index()
        .rename(columns={'age': 'total'})
        .sort_values(by='total', ascending=False)
    )

    fig = px.bar(
        df_perfil,
        x='location',
        y='total',
        title='Distribuição de perfis por localidade',
        text_auto=True
    )

    # Salva gráfico HTML
    output_path = '/Users/magnosouza/PycharmProjects/IntroducaoAnaliseDados/archive/dados.html'
    fig.write_html(output_path)

    # Retorna o resumo como string para envio por e-mail
    return df_perfil.to_string(index=False)

# === Configurações do DAG ===
default_args = {
    'owner': 'magno',
    'depends_on_past': False,
    'email': ['magnoapt@gmail.com'],
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='analise_exploratoria_dag',
    default_args=default_args,
    description='Executa análise exploratória a cada 30 minutos e envia relatório',
    schedule='*/30 * * * *',  # ⏱ substitui schedule_interval
    start_date=datetime(2025, 11, 10),
    catchup=False,
    tags=['analise', 'dados', 'automacao']
) as dag:

    def _executar_analise(ti):
        resumo_str = analise_exploratoria()
        ti.xcom_push(key='resumo_dados', value=resumo_str)

    tarefa_analise = PythonOperator(
        task_id='executar_analise',
        python_callable=_executar_analise,
    )

    tarefa_email = EmailOperator(
        task_id='enviar_email',
        to='magnoapt@gmail.com',
        subject='[Airflow] Relatório de Análise Exploratoria',
        html_content="""
            <h3>Resumo da análise:</h3>
            <pre>{{ ti.xcom_pull(key='resumo_dados') }}</pre>
            <p>Relatório completo salvo em:</p>
            <code>/Users/magnosouza/PycharmProjects/IntroducaoAnaliseDados/archive/dados.html</code>
        """,
    )

    tarefa_analise >> tarefa_email

