# **Airflow DAG**

In [None]:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# Paths used by spark-submit: the PostgreSQL JDBC driver jar shipped to the
# job, and the PySpark job script that performs the ETL.
POSTGRES_JDBC_JAR = '/usr/local/airflow/jars/postgresql-42.6.0.jar'
ETL_JOB_SCRIPT = '/usr/local/airflow/jobs/process_monthly_flights.py'

# Defaults applied to every task in this DAG: one retry, five minutes later.
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Linear three-task pipeline: start -> run_etl -> end.
# NOTE(review): the dag_id says "monthly" but the schedule is '@daily' —
# confirm this cadence is intended. `schedule_interval` is deprecated since
# Airflow 2.4 in favour of `schedule`; it is kept here for older 2.x installs.
with DAG(
    dag_id='monthly_flights_etl',
    default_args=default_args,
    start_date=datetime(2025, 6, 15),
    schedule_interval='@daily',
    catchup=False,  # do not backfill runs missed between start_date and now
    tags=['batch', 'pyspark'],
) as dag:

    # Marker task: logs "Start ETL for Monthly Flights".
    start = BashOperator(
        task_id='start',
        bash_command='echo "Mulai ETL untuk Monthly Flights"',
    )

    # Submits the PySpark job. --jars ships the PostgreSQL driver because the
    # job writes its result to PostgreSQL over JDBC.
    run_etl = BashOperator(
        task_id='run_etl',
        bash_command=f'spark-submit --jars {POSTGRES_JDBC_JAR} {ETL_JOB_SCRIPT}',
    )

    # Marker task: logs "ETL finished and data has been processed".
    end = BashOperator(
        task_id='end',
        bash_command='echo "ETL selesai dan data telah diproses"',
    )

    start >> run_etl >> end


# **ETL dengan PySpark**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

import os

from pyspark.sql import functions as F

# === CONFIGURATION ===
# Input/output locations and the JDBC target. Connection settings may be
# overridden through environment variables; the defaults are the values this
# job has always used, so behaviour is unchanged when nothing is set.
# NOTE(review): a default password is still committed in source — prefer
# supplying FLIGHTDB_PASSWORD from a secret store or an Airflow connection.
INPUT_CSV = "/usr/local/airflow/data/monthly_flights.csv"
OUTPUT_DIR = "/usr/local/airflow/output/monthly_flights_summary"
JDBC_URL = os.environ.get(
    "FLIGHTDB_JDBC_URL", "jdbc:postgresql://localhost:5432/flightdb"
)
JDBC_TABLE = "public.monthly_flights_summary"
JDBC_USER = os.environ.get("FLIGHTDB_USER", "airflowuser")
JDBC_PASSWORD = os.environ.get("FLIGHTDB_PASSWORD", "airflowpass")

# === SPARK SESSION ===
# spark.jars puts the PostgreSQL JDBC driver on the classpath for step 3.
spark = (
    SparkSession.builder
    .appName("MonthlyFlightsETL")
    .config("spark.jars", "/usr/local/airflow/jars/postgresql-42.6.0.jar")
    .getOrCreate()
)

try:
    # === EXTRACT ===
    df = spark.read.csv(INPUT_CSV, header=True, inferSchema=True)

    # === TRANSFORM ===
    # Sum monthly_total_flights per distinct date. alias() names the result
    # column explicitly rather than renaming Spark's auto-generated
    # "sum(monthly_total_flights)" name. cache() because the result feeds
    # three separate actions below and would otherwise be recomputed each time.
    agg_df = (
        df.groupBy("date")
        .agg(F.sum("monthly_total_flights").alias("total_flights"))
        .cache()
    )

    # === 1. SHOW OUTPUT IN THE CONSOLE ===
    print("=== Hasil Ringkasan Monthly Flights ===")
    agg_df.show(truncate=False)

    # === 2. SAVE TO A CSV FILE ===
    # coalesce(1) so the output directory contains a single part file.
    agg_df.coalesce(1).write.csv(OUTPUT_DIR, header=True, mode="overwrite")

    # === 3. SAVE TO POSTGRESQL ===
    agg_df.write.format("jdbc").options(
        url=JDBC_URL,
        driver="org.postgresql.Driver",
        dbtable=JDBC_TABLE,
        user=JDBC_USER,
        password=JDBC_PASSWORD,
    ).mode("overwrite").save()
finally:
    # === STOP SPARK ===
    # In `finally` so the session is released even if a read or write fails.
    spark.stop()
