In [None]:
import pandas as pd
import sqlite3

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator

# Module-level SQLite connection to 'example.db'; it is the default `conn`
# argument of load_data below.
# NOTE(review): the connection is opened every time this module is imported,
# not when a task runs — confirm that is intended.
CON = sqlite3.connect('example.db')

# extract from site
def extract_data(url, tmp_file, **context) -> None:
    """Read a CSV from ``url`` and store a local copy at ``tmp_file``.

    Args:
        url: Location of the source CSV; passed straight to ``pd.read_csv``.
        tmp_file: Path the data is written to, in CSV format.
        **context: Extra keyword arguments; accepted and ignored.

    Returns:
        None. The data is handed to the next step through ``tmp_file``.
    """
    # index=False keeps the DataFrame's row index out of the file; otherwise
    # the copy gains an extra leading column that shows up as "Unnamed: 0"
    # when the file is read back.
    pd.read_csv(url).to_csv(tmp_file, index=False)

# group data
def transform_data(group, agreg, tmp_file, tmp_file_agg, **context) -> None:
    """Aggregate the CSV at ``tmp_file`` and write the result to ``tmp_file_agg``.

    Args:
        group: Column name(s) to group by; passed to ``DataFrame.groupby``.
        agreg: Aggregation spec passed to ``.agg`` on the grouped data
            (for example ``{"D": "sum"}``).
        tmp_file: Path of the input CSV.
        tmp_file_agg: Path the aggregated CSV is written to.
        **context: Extra keyword arguments; accepted and ignored.

    Returns:
        None. The result is handed to the next step through ``tmp_file_agg``.
    """
    data = pd.read_csv(tmp_file)
    # reset_index() turns the group keys back into ordinary columns.
    aggregated = data.groupby(group).agg(agreg).reset_index()
    # index=False: the fresh 0..n-1 index carries no information, and writing
    # it would add a junk "Unnamed: 0" column for every consumer of the file.
    aggregated.to_csv(tmp_file_agg, index=False)

# load to db
def load_data(tmp_file_agg, table_name, conn=CON, **context) -> None:
    """Write the aggregated CSV into a database table.

    The CSV at ``tmp_file_agg`` is read, an ``insert_time`` column holding
    ``pd.to_datetime("now")`` is added, and the frame is stored as
    ``table_name`` through ``conn``. An existing table of that name is
    replaced; the DataFrame index is not written.
    """
    stamped = pd.read_csv(tmp_file_agg).assign(
        insert_time=pd.to_datetime("now"),
    )
    stamped.to_sql(
        name=table_name,
        con=conn,
        if_exists='replace',
        index=False,
    )

# DAG
# Daily pipeline: download the CSV, aggregate it, then load the result into
# the database and e-mail it.
dag = DAG(dag_id='dag',
          default_args={'owner':'airflow'},
          schedule_interval='@daily',
          start_date=days_ago(1))

# NOTE: each PythonOperator assignment below rebinds a module-level function
# name to the operator that wraps it. The `python_callable=...` argument is
# evaluated before the rebinding happens, so the operator still receives the
# function.
extract_data = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
    op_kwargs={
        'url': 'https://raw.githubusercontent.com/dm-novikov/stepik_airflow_course/main/data/data.csv',
        'tmp_file': '/tmp/file.csv'
    }
)

transform_data = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag,
    op_kwargs={
        # Must match the 'tmp_file' written by the extract_data task.
        'tmp_file': '/tmp/file.csv',
        'tmp_file_agg': '/tmp/file_agg.csv',
        'group': ['A', 'B', 'C'],
        # The string 'sum' selects pandas' own groupby sum. Passing the
        # builtin `sum` is deprecated in recent pandas and emits a
        # FutureWarning.
        'agreg': {'D': 'sum'}}
)

load_data = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    dag=dag,
    op_kwargs={
        # Must match the 'tmp_file_agg' written by the transform_data task.
        'tmp_file_agg': '/tmp/file_agg.csv',
        'table_name': 'table'
    }
)

# Sends the aggregated CSV as an attachment.
email_op = EmailOperator(
    task_id='send_mail',
    to="develtomas@gmail.com",
    subject='test mail',
    html_content=""" mail tyt """,
    files=['/tmp/file_agg.csv'],
    dag=dag
)

# extract -> transform, then the DB load and the e-mail are both downstream
# of transform and independent of each other.
extract_data >> transform_data >> [load_data, email_op]