Перед запуском убедитесь, что в корне проекта есть файл .env и в нём заполнены выданные вам учётные данные для подключения к базам данных и хранилищу.

In [None]:
# Enable IPython's autoreload extension; mode 2 reloads all imported modules
# before each cell is executed, so edits to project code are picked up.
%load_ext autoreload
%autoreload 2

In [None]:
import os
import pandas as pd
from dotenv import load_dotenv, find_dotenv
from sqlalchemy import create_engine

In [None]:
# Load the variables defined in the .env file into the process environment.
load_dotenv()

In [None]:
# Read all credentials from the environment; a variable that is not set
# yields None.

# Source database.
src_host = os.getenv('DB_SOURCE_HOST')
src_port = os.getenv('DB_SOURCE_PORT')
src_username = os.getenv('DB_SOURCE_USER')
src_password = os.getenv('DB_SOURCE_PASSWORD')
src_db = os.getenv('DB_SOURCE_NAME')

# Destination database.
dst_host = os.getenv('DB_DESTINATION_HOST')
dst_port = os.getenv('DB_DESTINATION_PORT')
dst_username = os.getenv('DB_DESTINATION_USER')
dst_password = os.getenv('DB_DESTINATION_PASSWORD')
dst_db = os.getenv('DB_DESTINATION_NAME')

# S3 object storage.
s3_bucket = os.getenv('S3_BUCKET_NAME')
s3_access_key = os.getenv('AWS_ACCESS_KEY_ID')
s3_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')

In [None]:
# Create the database engines.
# The user name and password are percent-encoded before being placed in the
# URL: raw characters such as '@', ':', '/' or '%' in a credential would
# otherwise be parsed as URL delimiters and break the connection string.
from urllib.parse import quote


def _pg_url(username, password, host, port, db):
    """Return a ``postgresql://`` URL with the credentials URL-escaped.

    Values are passed through ``str()`` first, so an unset (None) credential
    is rendered as the text ``None``, exactly as plain f-string
    interpolation would render it.
    """
    user_part = quote(str(username), safe='')
    password_part = quote(str(password), safe='')
    return f'postgresql://{user_part}:{password_part}@{host}:{port}/{db}'


src_conn = create_engine(_pg_url(src_username, src_password, src_host, src_port, src_db))
dst_conn = create_engine(_pg_url(dst_username, dst_password, dst_host, dst_port, dst_db))

In [None]:
# dags/churn.py


import pendulum
from airflow.decorators import dag, task
import pandas as pd
import numpy as np
from airflow.providers.postgres.hooks.postgres import PostgresHook
from sqlalchemy import (
MetaData, Table, Column,
String, Integer, DateTime, Float,
UniqueConstraint, inspect
)



@dag(
    schedule='@once',
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["ETL"]
)
def prepare_churn_dataset():
    """Build the ``users_churn`` table in the destination database.

    Task flow: ``extract`` -> ``transform`` -> ``load``; ``create_table`` runs
    independently of extract/transform but must finish before ``load`` starts.
    """
    hook_dest = PostgresHook('destination_db')
    hook_source = PostgresHook('source_db')

    @task()
    def create_table(hook: PostgresHook) -> None:
        """Create the ``users_churn`` table through ``hook`` if it is missing.

        Args:
            hook: Hook for the database in which the table must exist.
        """
        metadata = MetaData()
        users_churn_table = Table(
            'users_churn',
            metadata,
            Column('id', Integer, primary_key=True, autoincrement=True),
            Column('customer_id', String),
            Column('begin_date', DateTime),
            Column('end_date', DateTime),
            Column('type', String),
            Column('paperless_billing', String),
            Column('payment_method', String),
            Column('monthly_charges', Float),
            Column('total_charges', Float),
            Column('internet_service', String),
            Column('online_security', String),
            Column('online_backup', String),
            Column('device_protection', String),
            Column('tech_support', String),
            Column('streaming_tv', String),
            Column('streaming_movies', String),
            Column('gender', String),
            Column('senior_citizen', Integer),
            Column('partner', String),
            Column('dependents', String),
            Column('multiple_lines', String),
            Column('target', Integer),
            # `load` upserts on customer_id, which requires this constraint.
            UniqueConstraint('customer_id', name='unique_customer_id_constraint')
        )
        con = hook.get_sqlalchemy_engine()
        if not inspect(con).has_table(users_churn_table.name):
            metadata.create_all(con)

    @task()
    def extract(hook: PostgresHook) -> pd.DataFrame:
        """Read contracts joined with internet, personal and phone data.

        Args:
            hook: Hook for the database holding the source tables.

        Returns:
            One row per ``contracts`` row; columns coming from the
            LEFT JOINed tables are empty where no matching row exists.
        """
        conn = hook.get_sqlalchemy_engine()

        sql = """
        SELECT
            c.customer_id, c.begin_date, c.end_date, c.type, c.paperless_billing, c.payment_method, c.monthly_charges, c.total_charges,
            i.internet_service, i.online_security, i.online_backup, i.device_protection, i.tech_support, i.streaming_tv, i.streaming_movies,
            p.gender, p.senior_citizen, p.partner, p.dependents,
            ph.multiple_lines
        FROM contracts AS c
        LEFT JOIN internet AS i ON i.customer_id = c.customer_id
        LEFT JOIN personal AS p ON p.customer_id = c.customer_id
        LEFT JOIN phone AS ph ON ph.customer_id = c.customer_id
        """
        data = pd.read_sql(sql, conn)
        return data

    @task()
    def transform(data: pd.DataFrame) -> pd.DataFrame:
        """Add the ``target`` column and blank out the ``'No'`` end dates.

        Args:
            data: Frame produced by ``extract``; it is modified in place.

        Returns:
            The same frame with ``target`` set to 1 where ``end_date`` is
            not ``'No'`` (0 otherwise) and ``'No'`` end dates set to None.
        """
        data['target'] = (data['end_date'] != 'No').astype(int)
        # Assign the result back instead of calling replace(inplace=True) on
        # the column selection: the chained in-place form is deprecated and
        # does not update the frame when pandas copy-on-write is enabled.
        data['end_date'] = data['end_date'].replace({'No': None})
        return data

    @task()
    def load(data: pd.DataFrame, hook: PostgresHook) -> None:
        """Upsert the transformed rows into ``users_churn``.

        Args:
            data: Frame whose columns match the target table's columns.
            hook: Hook for the database that contains ``users_churn``.
        """
        # Use the hook supplied by the caller rather than re-creating a
        # hard-coded one, which would silently ignore the argument.
        hook.insert_rows(
            table="users_churn",
            replace=True,
            target_fields=data.columns.tolist(),
            replace_index=['customer_id'],
            rows=data.values.tolist()
        )

    table_ready = create_table(hook_dest)
    data = extract(hook_source)
    transformed_data = transform(data)
    loaded = load(transformed_data, hook_dest)

    # Without an explicit edge the scheduler may start `load` before
    # `create_table` has finished, because no data flows between them.
    table_ready >> loaded

prepare_churn_dataset()