# Retail Price Optimization Pipeline Orchestration

This notebook lets you run the deployment pipeline (data ingestion, model training, and conditional deployment) and the inference pipeline interactively.

In [1]:
import os
import sys
# Make project-local modules importable: append the parent of the current
# working directory to sys.path. This has to run before the `pipelines` and
# `constants` imports below (presumably project modules — confirm), and it
# assumes the kernel's working directory is the folder holding this notebook.
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))

from pipelines.deployment_pipeline import deployment_pipeline
from pipelines.inference_pipeline import inference_pipeline
from constants import MODEL_NAME, PIPELINE_NAME, PIPELINE_STEP_NAME
from zenml.config import DockerSettings
from zenml.integrations.constants import MLFLOW
from zenml.integrations.mlflow.mlflow_utils import get_tracking_uri

[37mReloading configuration file C:\Users\subam\Desktop\Retail-Price-Optimization-MLOPS\.zen\config.yaml[0m
[33mDaemon functionality is currently not supported on Windows.[0m


## Configuration
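Ensure the `.env` file in the project root (one directory above this notebook) defines `DB_URL`. The value usually contains a password, so it is best not to commit it or leave it visible in saved cell output.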

In [2]:
from urllib.parse import urlsplit

from dotenv import load_dotenv

# Load environment variables from the .env file one directory above the
# current working directory.
load_dotenv(os.path.join('..', '.env'))


def _redact_db_url(url):
    """Return ``url`` with the password in its user-info part replaced by ``***``.

    Notebook outputs are saved with the file, so printing the raw connection
    string would persist the database password wherever the notebook goes.

    Args:
        url: The connection URL, or ``None``/empty when the variable is unset.

    Returns:
        ``url`` unchanged when it is falsy or carries no ``user:password@``
        section; otherwise the same URL with the password masked. If the URL
        cannot be parsed, a fixed placeholder is returned instead of the raw
        value so nothing sensitive is echoed.
    """
    if not url:
        return url
    try:
        parts = urlsplit(url)
    except ValueError:
        return "<unparseable DB_URL>"
    # rpartition: a raw '@' inside the password must not be mistaken for the
    # separator between user-info and host.
    user_info, at_sign, host_info = parts.netloc.rpartition('@')
    user, colon, _password = user_info.partition(':')
    if not (at_sign and colon):
        return url
    return parts._replace(netloc=f"{user}:***@{host_info}").geturl()


print(f"Using DB_URL: {_redact_db_url(os.getenv('DB_URL'))}")

Using DB_URL: postgresql+psycopg2://postgres:***@localhost:5432/cs002test


## Run Deployment Pipeline
This pipeline ingests data, trains the model, and deploys it if accuracy is sufficient.

In [4]:
# Run the deployment pipeline. If it raises, print the error message followed
# by the full traceback instead of letting the exception propagate, so the
# failing step can be inspected directly in the cell output.
import traceback

try:
    deployment_pipeline(min_accuracy=0.5, workers=3, timeout=60)
except Exception as exc:
    print(f"Full error: {exc}")
    traceback.print_exc()

[37mInitiating a new run for the pipeline: [0m[38;5;105mdeployment_pipeline[37m.[0m


[37mCaching is disabled by default for [0m[38;5;105mdeployment_pipeline[37m.[0m
[37mUsing user: [0m[38;5;105mdefault[37m[0m
[37mUsing stack: [0m[38;5;105mretail_stack_v2[37m[0m
[37m  artifact_store: [0m[38;5;105mdefault[37m[0m
[37m  experiment_tracker: [0m[38;5;105mmlflow_tracker[37m[0m
[37m  orchestrator: [0m[38;5;105mdefault[37m[0m
[37m  model_deployer: [0m[38;5;105mmlflow_deployer[37m[0m
[37mYou can visualize your pipeline runs in the [0m[38;5;105mZenML Dashboard[37m. In order to try it locally, please run [0m[38;5;105mzenml login --local[37m.[0m
[31mStep ingest_data failed.[0m
Traceback (most recent call last):
Traceback (most recent call last):
  File "c:\Users\subam\miniconda3\Lib\site-packages\sqlalchemy\engine\base.py", line 1815, in _execute_context
    except BaseException as e:
                  ^^^^^^^^^^^^
        self._handle_dbapi_exception(
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
            e, util.text_type(statement), parame

Traceback (most recent call last):
  File "C:\Users\subam\AppData\Local\Temp\ipykernel_21556\2514646077.py", line 3, in <module>
    deployment_pipeline(
    ~~~~~~~~~~~~~~~~~~~^
        min_accuracy=0.5,
        ^^^^^^^^^^^^^^^^^
        workers=3,
        ^^^^^^^^^^
        timeout=60
        ^^^^^^^^^^
    )
    ^
  File "c:\Users\subam\miniconda3\Lib\site-packages\zenml\pipelines\pipeline_definition.py", line 1627, in __call__
    return self._run()
           ~~~~~~~~~^^
  File "c:\Users\subam\miniconda3\Lib\site-packages\zenml\pipelines\pipeline_definition.py", line 1102, in _run
    submit_pipeline(
    ~~~~~~~~~~~~~~~^
        snapshot=snapshot, stack=stack, placeholder_run=run
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "c:\Users\subam\miniconda3\Lib\site-packages\zenml\execution\pipeline\utils.py", line 115, in submit_pipeline
    raise e
  File "c:\Users\subam\miniconda3\Lib\site-packages\zenml\execution\pipeline\utils.py", line 96, in subm

## Check MLflow
View the MLflow tracking URI.

In [5]:
# Resolve the tracking URI via ZenML's MLflow helper, then display it.
tracking_uri = get_tracking_uri()
print(f"MLflow tracking URI: {tracking_uri}")

MLflow tracking URI: file:C:\Users\subam\AppData\Roaming\zenml\local_stores\0e3eb9ff-2270-4b9c-8250-4bbb7d5279e2\mlruns


## Run Inference Pipeline
This pipeline loads the deployed model and runs inference on new data.

In [7]:
import logging
import traceback

from zenml.pipelines import pipeline

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# PIPELINE_NAME and PIPELINE_STEP_NAME come from `constants` (imported in the
# first cell). They are deliberately not reassigned here: overwriting them
# with template placeholders would make the inference pipeline run with
# placeholder names and would silently change the constants for every cell
# executed afterwards.

# Run the inference pipeline with detailed error reporting.
try:
    inference_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_step_name=PIPELINE_STEP_NAME,
    )
except RuntimeError as e:
    logger.error(f"Pipeline execution failed: {e}")
    logger.error("Failed steps detected. Debugging information:")

    # Show the full traceback of the failure.
    traceback.print_exc()

    # Common issues to check:
    print("\n=== DEBUGGING CHECKLIST ===")
    print("1. Check if ingest_data_for_inference step has correct file paths")
    print("2. Verify prediction_service_loader can find the model file")
    print("3. Ensure all required dependencies are installed")
    print("4. Check file permissions and disk space")
    print("5. Verify data format matches expected schema")

except Exception as e:
    # Any other failure type: log it and show the traceback.
    logger.error(f"Unexpected error: {e}")
    traceback.print_exc()


# ===== COMMON FIXES =====

# Fix 1: Add step-level error handling
@pipeline(name="inference_pipeline_v2")
def inference_pipeline_with_error_handling(
    pipeline_name: str,
    pipeline_step_name: str,
):
    """Compose the inference steps, logging any exception before re-raising it.

    Calls ``ingest_data_for_inference`` and ``prediction_service_loader``,
    each with ``pipeline_name``, and returns their results.

    Args:
        pipeline_name: Forwarded to both calls as ``pipeline_name``.
        pipeline_step_name: Accepted but not used in this body.

    Returns:
        A ``(model, data)`` tuple: the loader's result first, then the
        ingested data.

    Raises:
        Exception: Whatever either call raises, re-raised unchanged after an
            error-level log entry.

    NOTE(review): neither ``ingest_data_for_inference`` nor
    ``prediction_service_loader`` is imported in the visible notebook cells;
    confirm they are in scope before running this. Also confirm whether
    exceptions raised while the steps execute actually surface inside this
    function body, or only later when the pipeline run is submitted.
    """
    try:
        # Step 1: fetch the data to run inference on.
        inference_data = ingest_data_for_inference(pipeline_name=pipeline_name)

        # Step 2: obtain the prediction service.
        prediction_service = prediction_service_loader(pipeline_name=pipeline_name)
    except Exception as exc:
        # Missing files get their own log message; everything else is
        # reported as a generic step failure. Either way the error propagates.
        if isinstance(exc, FileNotFoundError):
            logger.error(f"File not found: {exc}")
        else:
            logger.error(f"Step failed: {exc}")
        raise

    return prediction_service, inference_data


# Fix 2: Add data validation
def validate_input_data(data):
    """Reject missing or empty input and hand valid input back unchanged.

    Args:
        data: Any object supporting ``len()``. If it has a ``shape``
            attribute, that is what gets logged; otherwise its length is.

    Returns:
        The same ``data`` object that was passed in.

    Raises:
        ValueError: If ``data`` is ``None`` or has length zero.
    """
    if data is None:
        raise ValueError("Input data is None")
    if not len(data):
        raise ValueError("Input data is empty")

    # Prefer the richer `shape` description when the object provides one.
    if hasattr(data, 'shape'):
        size_description = data.shape
    else:
        size_description = len(data)
    logger.info(f"Data validation passed. Shape: {size_description}")
    return data


# Fix 3: Add model validation
def validate_model(model):
    """Ensure a model object is present and return it unchanged.

    Args:
        model: The loaded model object, or ``None`` if loading produced nothing.

    Returns:
        The same ``model`` object that was passed in.

    Raises:
        ValueError: If ``model`` is ``None``.
    """
    model_missing = model is None
    if model_missing:
        raise ValueError("Model failed to load")

    logger.info("Model validation passed")
    return model

[37mInitiating a new run for the pipeline: [0m[38;5;105minference_pipeline[37m.[0m
[37mCaching is disabled by default for [0m[38;5;105minference_pipeline[37m.[0m
[37mUsing user: [0m[38;5;105mdefault[37m[0m
[37mUsing stack: [0m[38;5;105mretail_stack_v2[37m[0m
[37m  artifact_store: [0m[38;5;105mdefault[37m[0m
[37m  experiment_tracker: [0m[38;5;105mmlflow_tracker[37m[0m
[37m  orchestrator: [0m[38;5;105mdefault[37m[0m
[37m  model_deployer: [0m[38;5;105mmlflow_deployer[37m[0m
[37mYou can visualize your pipeline runs in the [0m[38;5;105mZenML Dashboard[37m. In order to try it locally, please run [0m[38;5;105mzenml login --local[37m.[0m
[31mStep ingest_data_for_inference failed.[0m
Traceback (most recent call last):
Traceback (most recent call last):
  File "c:\Users\subam\miniconda3\Lib\site-packages\sqlalchemy\engine\base.py", line 1815, in _execute_context
    except BaseException as e:
                  ^^^^^^^^^^^^
        self._handle_db

Traceback (most recent call last):
  File "C:\Users\subam\AppData\Local\Temp\ipykernel_21556\1602504654.py", line 13, in <module>
    inference_pipeline(
    ~~~~~~~~~~~~~~~~~~^
        pipeline_name=PIPELINE_NAME,
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        pipeline_step_name=PIPELINE_STEP_NAME,
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "c:\Users\subam\miniconda3\Lib\site-packages\zenml\pipelines\pipeline_definition.py", line 1627, in __call__
    return self._run()
           ~~~~~~~~~^^
  File "c:\Users\subam\miniconda3\Lib\site-packages\zenml\pipelines\pipeline_definition.py", line 1102, in _run
    submit_pipeline(
    ~~~~~~~~~~~~~~~^
        snapshot=snapshot, stack=stack, placeholder_run=run
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "c:\Users\subam\miniconda3\Lib\site-packages\zenml\execution\pipeline\utils.py", line 115, in submit_pipeline
    raise e
  File "c:\Users\subam\miniconda3\Lib\site-packages\zenml\executi