In [1]:
# Airflow DAG definition for data pipeline

import os
from datetime import datetime, timedelta

from airflow import DAG
# NOTE: `airflow.operators.python_operator` is a deprecated alias in Airflow 2;
# the current module is `airflow.operators.python`.
from airflow.operators.python_operator import PythonOperator

In [None]:
# Defaults handed to the DAG below via `default_args`; Airflow uses them as the
# default constructor arguments for every task attached to that DAG.
#   - tasks do not depend on the previous run's outcome,
#   - scheduling starts from 2023-01-01,
#   - no e-mail on failure or retry,
#   - one retry after a 5-minute delay.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2023, 1, 1),
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# The DAG every task below attaches itself to (each operator passes `dag=dag`).
# It is scheduled once per day; the start date comes from default_args.
# NOTE(review): `catchup` is not set, so it falls back to the scheduler's
# `catchup_by_default` setting (True unless configured otherwise). With a
# 2023-01-01 start date that can queue one run per missed day. Confirm
# whether catchup=False is intended.
# NOTE(review): `schedule_interval` is deprecated in Airflow 2.4+ in favour of
# `schedule`. It is kept here so older Airflow versions still accept the DAG.
dag = DAG(
    dag_id='data_pipeline',
    description='A simple data pipeline',
    schedule_interval=timedelta(days=1),
    default_args=default_args,
)

# Input files and the intermediate Parquet/Avro files derived from them.
# NOTE(review): these are relative paths, presumably resolved against the
# worker process's current working directory when a task runs. Confirm that
# is where the files live, or make them absolute.
csv_file = 'example.csv'
json_file = 'example.json'
parquet_file_csv = 'example_csv.parquet'
parquet_file_json = 'example_json.parquet'
avro_file_csv = 'example_csv.avro'
avro_file_json = 'example_json.avro'

# Database connection string. It is taken from the DATA_PIPELINE_DB_URI
# environment variable so real credentials never have to be committed with
# the DAG. The fallback is the original local placeholder, so behaviour is
# unchanged when the variable is unset.
db_uri = os.environ.get(
    'DATA_PIPELINE_DB_URI',
    'postgresql://user:password@localhost:5432/mydatabase',
)

# Destination table names. Both CSV-derived save tasks receive table_name_csv,
# and both JSON-derived save tasks receive table_name_json.
table_name_csv = 'csv_table'
table_name_json = 'json_table'

def _python_task(task_id, callable_, **op_kwargs):
    """Create a PythonOperator on ``dag`` that runs ``callable_(**op_kwargs)``.

    Args:
        task_id: Airflow task id; must be unique within ``dag``.
        callable_: Function the task executes.
        **op_kwargs: Keyword arguments forwarded to ``callable_`` when the
            task runs (passed through as the operator's ``op_kwargs``).

    Returns:
        The constructed PythonOperator.
    """
    return PythonOperator(
        task_id=task_id,
        python_callable=callable_,
        op_kwargs=op_kwargs,
        dag=dag,
    )


# NOTE(review): csv_to_parquet, json_to_parquet, csv_to_avro, json_to_avro,
# save_parquet_to_db, save_avro_to_db and `schema` are not defined in any cell
# shown here. They must be defined or imported before this point, otherwise
# this cell raises NameError on a fresh kernel (and when Airflow parses the DAG).

# Stage 1: convert each raw input file to Parquet and to Avro.
convert_csv_to_parquet = _python_task(
    'convert_csv_to_parquet', csv_to_parquet,
    csv_file=csv_file, parquet_file=parquet_file_csv,
)
convert_json_to_parquet = _python_task(
    'convert_json_to_parquet', json_to_parquet,
    json_file=json_file, parquet_file=parquet_file_json,
)
convert_csv_to_avro = _python_task(
    'convert_csv_to_avro', csv_to_avro,
    csv_file=csv_file, avro_file=avro_file_csv, schema=schema,
)
convert_json_to_avro = _python_task(
    'convert_json_to_avro', json_to_avro,
    json_file=json_file, avro_file=avro_file_json, schema=schema,
)

# Stage 2: load each converted file into the database.
# NOTE(review): the parquet and avro save tasks for the same source share a
# table name (table_name_csv / table_name_json), and no dependency orders them.
# Depending on how save_parquet_to_db / save_avro_to_db write (replace vs.
# append), one load may overwrite or duplicate the other. Confirm this is intended.
save_parquet_csv_to_db = _python_task(
    'save_parquet_csv_to_db', save_parquet_to_db,
    parquet_file=parquet_file_csv, db_uri=db_uri, table_name=table_name_csv,
)
save_parquet_json_to_db = _python_task(
    'save_parquet_json_to_db', save_parquet_to_db,
    parquet_file=parquet_file_json, db_uri=db_uri, table_name=table_name_json,
)
save_avro_csv_to_db = _python_task(
    'save_avro_csv_to_db', save_avro_to_db,
    avro_file=avro_file_csv, db_uri=db_uri, table_name=table_name_csv, schema=schema,
)
save_avro_json_to_db = _python_task(
    'save_avro_json_to_db', save_avro_to_db,
    avro_file=avro_file_json, db_uri=db_uri, table_name=table_name_json, schema=schema,
)

# Each save task waits only on the conversion that produces its input file.
# The four conversion -> save chains have no dependencies on one another.
convert_csv_to_parquet.set_downstream(save_parquet_csv_to_db)
convert_json_to_parquet.set_downstream(save_parquet_json_to_db)
convert_csv_to_avro.set_downstream(save_avro_csv_to_db)
convert_json_to_avro.set_downstream(save_avro_json_to_db)