In [4]:
import importlib
import dataingestion.DataIngestion
import datastorage.DataStorage
import datavalidation.DataValidation
import datapreparation.DataPreparation
import datatransformationandstorage.DataTransformationAndStorage
import featurestore.FeatureStore
import dataversioning.DataVersioning
import modelbuild.ModelBuild

importlib.reload(dataingestion.DataIngestion)
importlib.reload(datastorage.DataStorage)
importlib.reload(datavalidation.DataValidation)
importlib.reload(datapreparation.DataPreparation)
importlib.reload(datatransformationandstorage.DataTransformationAndStorage)
importlib.reload(featurestore.FeatureStore)
importlib.reload(dataversioning.DataVersioning)
importlib.reload(modelbuild.ModelBuild)


from prefect import task, flow, get_run_logger
from prefect.tasks import Task
from dataingestion.DataIngestion import load_csv, load_api, load_db
from datastorage.DataStorage import save_csv_or_db, save_api
from datavalidation.DataValidation import validate_churn_data
from datapreparation.DataPreparation import preprocess_and_eda
from datatransformationandstorage.DataTransformationAndStorage import transform_and_store
from featurestore.FeatureStore  import create_feature_store, sample_feature_queries
from dataversioning.DataVersioning import save_and_version_both
from modelbuild.ModelBuild import run_training
import sqlite3  


def visualize_flow(flow_func):
    # Run once (small dummy run, doesn't execute heavy code since tasks are empty here)
    futures = flow_func()

    dot = Digraph(comment="Prefect Flow DAG", format="png")

    # Collect dependencies manually
    for fut in futures:
        if hasattr(fut, "name"):
            dot.node(fut.name, fut.name)

            # Prefect 2 futures keep upstream dependencies
            for upstream in fut.wait()._parent_task_run_ids or []:
                dot.edge(str(upstream), fut.name)

    return dot
@task
def ingest_data():
    logger = get_run_logger()
    csv_url = "https://synapseaisolutionsa.z13.web.core.windows.net/data/bankcustomerchurn/churn.csv"
    logger.info(f"📥 Ingesting data from {csv_url}")
    df_csv = load_csv(csv_url, csv_url)
    logger.info(f"✅ Data ingestion complete. Shape: {df_csv.shape}")
    return df_csv

@task
def store_data(df_csv):
    logger = get_run_logger()
    base_dir = "datastorage"
    save_csv_or_db(df_csv, base_dir, "csv")
    logger.info(f"✅ Data stored at {base_dir}")
    return base_dir

@task
def validate_data(df_csv):
    logger = get_run_logger()
    base_dir = "datavalidation/reports"
    issues, metadata = validate_churn_data(df_csv, base_dir, "pdf")
    logger.info(f"🔍 Validation complete. Issues: {len(issues)} Metadata: {metadata}")
    return issues, metadata

@task
def prepare_data(df_csv):
    logger = get_run_logger()
    base_dir = "datapreparation/prepared"
    df_processed = preprocess_and_eda(df_csv, base_dir)
    logger.info(f"✅ Data preparation complete. Shape: {df_processed.shape}")
    return df_processed

@task
def transform_data(df_processed):
    logger = get_run_logger()
    base_dir = "datatransformationandstorage/transformationandstorage"
    df_txfnstr = transform_and_store(df_processed, base_dir, "churn")
    logger.info(f"✅ Data transformation complete. Shape: {df_txfnstr.shape}")
    return df_txfnstr

@task
def build_feature_store(df_txfnstr):
    logger = get_run_logger()
    base_path = "featurestore/featurestore"
    df_feature, conn, db_path = create_feature_store(df_txfnstr, base_path)
    sample_feature_queries(conn, base_path)
    logger.info(f"✅ Feature store created at {base_path}, DB path: {db_path}")
    return df_feature, db_path

@task
def version_data(df_csv, df_feature):
    logger = get_run_logger()
    save_and_version_both(
        df_csv,
        df_feature,
        "dataversioning/raw/churn_raw.csv",
        "dataversioning/transformed/churn_transformed_v1.csv",
        "churn_raw.csv",
        "Changes_Commited"
    )
    logger.info("✅ Data versioning complete.")

@task
def train_model(db_path):
    logger = get_run_logger()
    run_training(db_path)
    logger.info("✅ Model training complete.")


@flow(name="Churn ML Pipeline Orchestration")
def churn_pipeline():
    df_csv = ingest_data()
    store_data(df_csv)
    validate_data(df_csv)
    df_processed = prepare_data(df_csv)
    df_txfnstr = transform_data(df_processed)
    df_feature, db_path = build_feature_store(df_txfnstr)
    version_data(df_csv, df_feature)
    train_model(db_path)

    print("✅ Pipeline complete!")

visualize_flow(churn_pipeline, "Churn ML Pipeline")


if __name__ == "__main__":
    churn_pipeline()

[main 173875d] Dataset update: churn_raw.csv (raw + transformed) - Changes_Commited
 14 files changed, 10829 insertions(+), 112 deletions(-)
 create mode 100644 datapreparation/prepared/cleaned_data_20250823_233451.csv
 create mode 100644 datapreparation/prepared/eda_report_20250823_233450.pdf
 create mode 100644 datatransformationandstorage/transformationandstorage/schema_design_20250823_233451.sql
 create mode 100644 datavalidation/reports/churn_data_issues_20250823_233450.csv
 create mode 100644 datavalidation/reports/churn_data_metadata_20250823_233450.csv
 create mode 100644 datavalidation/reports/churn_data_report_20250823_233450.pdf
[main 5691a28] Update version metadata for churn_raw.csv
 1 file changed, 10 insertions(+)


From https://github.com/dhairyas87/dmml-bank-churn-pipeline
 * branch            main       -> FETCH_HEAD


Current branch main is up to date.
✅ Raw + Transformed datasets for churn_raw.csv saved, versioned, and pushed under commit 173875dd9c59ca70a80c99e65205517b913e8521


To https://github.com/dhairyas87/dmml-bank-churn-pipeline.git
   4eca96b..5691a28  main -> main


✅ Training complete with engineered features.
📂 Deliverables: models/, data/model_results.txt, data/model_versions.json


✅ Pipeline complete!


AttributeError: 'State' object has no attribute 'tasks'