# In [4]:
import os
import sqlite3
from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator

# Define the ETL functions
def extract_data(
    input_path='/home/azizi/airflow_home/data/raw/sentiment-analysis.csv',
    output_path='/home/azizi/airflow_home/data/processed/extracted.csv',
):
    """Copy the raw feedback CSV into the processed staging area.

    The file is read with pandas and written back out without the DataFrame
    index. Formatting may be normalised by the pandas round-trip.

    Args:
        input_path: Raw CSV to read. Defaults to the original hard-coded path.
        output_path: Staging CSV to write. Its parent directory is created
            if it does not exist yet.

    Raises:
        FileNotFoundError: If ``input_path`` does not exist.
    """
    df = pd.read_csv(input_path)
    # to_csv does not create missing directories; on a fresh install the
    # processed/ directory may not exist yet.
    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    df.to_csv(output_path, index=False)

def _label_sentiment(feedback):
    """Return a 'Positive'/'Negative' label for each entry of ``feedback``.

    An entry is 'Positive' when its lowercased text contains ``"pos"``,
    otherwise 'Negative'. Missing values (NaN/None) are labelled 'Negative'
    instead of raising.
    """
    # The original compared the needle 'Pos' against lowercased text, so it
    # could never match; both sides must use the same case.
    text = feedback.fillna('').astype(str).str.lower()
    is_positive = text.str.contains('pos', regex=False)
    return is_positive.map({True: 'Positive', False: 'Negative'})


def transform_data(
    input_path='/home/azizi/airflow_home/data/processed/extracted.csv',
    output_path='/home/azizi/airflow_home/data/processed/transformed.csv',
):
    """Add a keyword-based ``Sentiment`` column to the extracted feedback CSV.

    This is a crude substring heuristic, not a real sentiment model.

    Args:
        input_path: Staging CSV to read. It must contain a ``Feedback`` column.
        output_path: CSV to write, with all input columns plus ``Sentiment``.

    Raises:
        KeyError: If the input has no ``Feedback`` column.
    """
    df = pd.read_csv(input_path)
    df['Sentiment'] = _label_sentiment(df['Feedback'])
    df.to_csv(output_path, index=False)

def load_data(
    input_path='/home/azizi/airflow_home/data/processed/transformed.csv',
    db_path='/home/azizi/airflow_home/data/processed/feedback.db',
):
    """Write the transformed feedback CSV into the SQLite ``feedback`` table.

    The table is replaced on every run (``if_exists='replace'``) rather than
    appended to.

    Args:
        input_path: Transformed CSV to load.
        db_path: SQLite database file. It is created by ``sqlite3.connect``
            if it does not exist.

    Raises:
        FileNotFoundError: If ``input_path`` does not exist. In that case no
            database file is created.
    """
    # Read first, so a missing or malformed CSV fails before a DB file is
    # created.
    df = pd.read_csv(input_path)
    conn = sqlite3.connect(db_path)
    try:
        df.to_sql('feedback', conn, if_exists='replace', index=False)
    finally:
        # Release the connection even if to_sql raises; the original leaked it.
        conn.close()

# DAG: run extract -> transform -> load once a day, without backfilling
# runs between start_date and now (catchup=False).
# NOTE(review): `schedule_interval` is deprecated in favour of `schedule`
# from Airflow 2.4 and removed in Airflow 3 — confirm the deployed version
# before switching.
# NOTE(review): the tasks hand data to each other through files on local
# disk, which presumably assumes every task runs on the same host.
default_args = {
    'start_date': datetime(2025, 1, 1),
}

with DAG(
    'feedback_etl',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
) as dag:
    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data,
    )
    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
    )
    load_task = PythonOperator(
        task_id='load_data',
        python_callable=load_data,
    )

    # Linear chain: extract feeds transform, transform feeds load.
    extract_task.set_downstream(transform_task)
    transform_task.set_downstream(load_task)
