# In [None]:  (Jupyter cell prompt left over from the notebook export)
import os
from io import StringIO

import boto3
import pandas as pd
import psycopg2
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

from helpers.transform import clean_data  # custom cleaning logic

# Arguments handed to the DAG as ``default_args``: owner, a start date
# computed with days_ago(1), and one retry.
default_args = dict(
    owner='romy',
    start_date=days_ago(1),
    retries=1,
)

# Daily DAG definition; catchup is disabled.
dag = DAG(
    dag_id='etl_s3_to_postgres',
    description='Extract from S3, clean, load to PostgreSQL with logging and standardization',
    schedule_interval='@daily',
    catchup=False,
    default_args=default_args,
)

# Location of the raw input object read by extract_from_s3.
S3_BUCKET = 'your-bucket-name'
S3_KEY = 'data/claims_raw.csv'

# Keyword arguments passed to psycopg2.connect() in load_to_postgres.
# Each value can be overridden with an environment variable so that real
# credentials do not have to be committed to the DAG file; the literals
# below are only fallbacks used when the variable is unset.
PG_CONN_DETAILS = {
    'dbname': os.environ.get('ETL_PG_DBNAME', 'health_db'),
    'user': os.environ.get('ETL_PG_USER', 'postgres'),
    'password': os.environ.get('ETL_PG_PASSWORD', 'yourpassword'),
    'host': os.environ.get('ETL_PG_HOST', 'localhost'),
}

# Destination table for the COPY performed in load_to_postgres.
TABLE_NAME = 'claims_data_standard'

def extract_from_s3(**kwargs):
    """Read the CSV object at S3_BUCKET/S3_KEY and push it to XCom as JSON.

    Args:
        **kwargs: Task context; ``kwargs['ti']`` is used to push the XCom
            value under the key ``raw_df``.
    """
    client = boto3.client('s3')
    response = client.get_object(Bucket=S3_BUCKET, Key=S3_KEY)
    # The response body is handed to pandas directly as a file-like object.
    raw_frame = pd.read_csv(response['Body'])
    payload = raw_frame.to_json()
    kwargs['ti'].xcom_push(key='raw_df', value=payload)

def transform(**kwargs):
    """Clean the raw DataFrame from XCom and push the cleaned result back.

    Pulls the JSON stored under ``raw_df`` by the ``extract`` task, runs it
    through ``clean_data`` and pushes the result as JSON under ``clean_df``.

    Args:
        **kwargs: Task context; ``kwargs['ti']`` is used for the XCom
            pull and push.
    """
    ti = kwargs['ti']
    # Name the producing task explicitly rather than relying on an
    # unfiltered key lookup across the whole DAG run.
    raw_json = ti.xcom_pull(task_ids='extract', key='raw_df')
    # Wrap the payload in StringIO: passing a literal JSON string to
    # read_json is deprecated in recent pandas releases.
    df = pd.read_json(StringIO(raw_json))
    df_cleaned = clean_data(df)
    ti.xcom_push(key='clean_df', value=df_cleaned.to_json())

def load_to_postgres(**kwargs):
    """Bulk-load the cleaned DataFrame from XCom into PostgreSQL.

    Pulls the JSON stored under ``clean_df`` by the ``transform`` task,
    serializes the rows as tab-separated text and streams them into
    ``TABLE_NAME`` with ``cursor.copy_from``.

    Args:
        **kwargs: Task context; ``kwargs['ti']`` is used for the XCom pull.

    Raises:
        psycopg2.Error: If connecting or the COPY fails. The transaction is
            rolled back and the connection is closed before the error
            propagates.
    """
    df_json = kwargs['ti'].xcom_pull(task_ids='transform', key='clean_df')
    # Wrap the payload in StringIO: passing a literal JSON string to
    # read_json is deprecated in recent pandas releases.
    df = pd.read_json(StringIO(df_json))

    # Example schema standardization.
    # NOTE(review): the header is not written below and copy_from is called
    # without a column list, so rows are matched to the table by position
    # and these names do not affect the load — confirm the column order.
    df.columns = [col.lower().replace(' ', '_') for col in df.columns]

    # Prepare the payload before opening the connection so a serialization
    # failure cannot leave a connection open.
    buffer = StringIO()
    df.to_csv(buffer, sep='\t', header=False, index=False)
    buffer.seek(0)

    conn = psycopg2.connect(**PG_CONN_DETAILS)
    try:
        # `with conn` commits on success and rolls back on error; the cursor
        # context manager closes the cursor. Neither closes the connection,
        # hence the explicit close in `finally`.
        with conn, conn.cursor() as cursor:
            cursor.copy_from(buffer, TABLE_NAME, null='')
    finally:
        conn.close()

    # Log success
    print(f"Loaded {len(df)} rows to {TABLE_NAME} at {pd.Timestamp.now()}")

# Task definitions. `provide_context` is omitted: it is an Airflow 1.x
# argument that Airflow 2 deprecates, since the context is passed
# automatically to callables that accept **kwargs.
extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract_from_s3,
    dag=dag,
)
transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)
load_task = PythonOperator(
    task_id='load',
    python_callable=load_to_postgres,
    dag=dag,
)

# Linear pipeline: extract -> transform -> load.
extract_task >> transform_task >> load_task
