### Clear memory

In [1]:
%reset -f
import gc
gc.collect()

0

### Import 

In [2]:
import sys, joblib, shap, os, anthropic, uvicorn, logging, hashlib, nest_asyncio
import pandas as pd
import numpy as np
from xgboost import XGBClassifier
from dotenv import load_dotenv
from langchain_anthropic import ChatAnthropic
from langchain.schema.exceptions import LangChainException
from langchain.schema.output_parser import OutputParserException
from langchain.memory import ConversationBufferWindowMemory
from langchain.chains.conversation.base import ConversationChain
from langchain.prompts import PromptTemplate
from pydantic import ValidationError
from datetime import datetime
from fastapi import FastAPI, HTTPException, UploadFile, File, Form
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from io import StringIO
from contextlib import asynccontextmanager
from typing import List, Tuple, Dict, Optional, Any, Union
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [3]:
import warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)
warnings.filterwarnings('ignore', category=FutureWarning)

In [4]:
sys.path.append('..')
from src.scripts.data_utils import TEPDataLoader, filter_csv
from src.scripts.feature_engineering import create_lag_features, create_diff_features, create_rolling_features

# Data Loading
Load preprocessed Tennessee Eastman Process data from the previous analysis.

In [5]:
loader = TEPDataLoader(
    raw_data_path='../data/raw',
    processed_data_path='../data/processed',
)

# # keep this commented, if first notebook was run
# loader.convert_and_save_to_csv()


In [6]:
SELECTED_FAULTS = [0, 1, 13, 16]
MAX_SIMULATION = 50
files = ['TEP_faulty_testing']

# # keep this commented, if first notebook was run
# for f in files:
#     filter_csv(f, SELECTED_FAULTS, MAX_SIMULATION, data_path='../data/processed')
#     print(f'File {f} has been filtered and saved')

In [7]:
df_test = pd.read_csv('../data/processed/TEP_faulty_testing_filtered.csv')

# Data Preprocessing
Prepare datasets and create binary fault indicators for anomaly detection.

In [8]:
df_test['faultNumber'] = df_test['faultNumber'].astype(int)

In [9]:
df_test['faulty'] = (df_test['faultNumber'] > 0) & (df_test['sample'] > 160)

Select 3 samples from faulty data

In [10]:
sim_data = df_test[df_test['faulty'] == True]
sim_data1 = sim_data.iloc[1000:1003]
sim_data2 = sim_data.iloc[3000:3003]

# Production Pipeline Architecture Design
This section implements a complete end-to-end anomaly detection pipeline that integrates XGBoost predictions, SHAP explanations, and LLM-generated natural language responses for industrial operators.

## Basic Pipeline Implementation
The initial pipeline demonstrates core functionality with a clean, modular design that processes raw sensor data through feature engineering, anomaly detection, explainability analysis, and natural language generation.

### Pipeline Workflow
1. **Data Preprocessing**: Extract temporal sequences and apply feature engineering
2. **Anomaly Detection**: Generate predictions and confidence scores using trained XGBoost model
3. **Explainability**: Calculate SHAP values and identify most impactful process variables
4. **Natural Language Generation**: Create operator-friendly explanations via Anthropic Claude API

In [11]:
class AnomalyDetectionPipeline:

    FEATURE_DESCRIPTIONS = {
            'xmeas_1': 'A feed (stream 1)',
            'xmeas_2': 'D feed (stream 2)',
            'xmeas_3': 'E feed (stream 3)',
            'xmeas_4': 'A and C feed (stream 4)',
            'xmeas_5': 'Recycle flow (stream 8)',
            'xmeas_6': 'Reactor feed rate (stream 6)',
            'xmeas_7': 'Reactor pressure',
            'xmeas_8': 'Reactor level',
            'xmeas_9': 'Reactor temperature',
            'xmeas_10': 'Purge rate (stream 9)',
            'xmeas_11': 'Product separator temperature',
            'xmeas_12': 'Product separator level',
            'xmeas_13': 'Product separator pressure',
            'xmeas_14': 'Product separator underflow (stream 10)',
            'xmeas_15': 'Stripper level',
            'xmeas_16': 'Stripper pressure',
            'xmeas_17': 'Stripper underflow (stream 11)',
            'xmeas_18': 'Stripper temperature',
            'xmeas_19': 'Stripper steam flow',
            'xmeas_20': 'Compressor work',
            'xmeas_21': 'Reactor cooling water outlet temperature',
            'xmeas_22': 'Separator cooling water outlet temperature',
            'xmeas_23': 'A composition in reactor feed (stream 6)',
            'xmeas_24': 'B composition in reactor feed (stream 6)',
            'xmeas_25': 'C composition in reactor feed (stream 6)',
            'xmeas_26': 'D composition in reactor feed (stream 6)',
            'xmeas_27': 'E composition in reactor feed (stream 6)',
            'xmeas_28': 'F composition in reactor feed (stream 6)',
            'xmeas_29': 'A composition in purge gas (stream 9)',
            'xmeas_30': 'B composition in purge gas (stream 9)',
            'xmeas_31': 'C composition in purge gas (stream 9)',
            'xmeas_32': 'D composition in purge gas (stream 9)',
            'xmeas_33': 'E composition in purge gas (stream 9)',
            'xmeas_34': 'F composition in purge gas (stream 9)',
            'xmeas_35': 'G composition in purge gas (stream 9)',
            'xmeas_36': 'H composition in purge gas (stream 9)',
            'xmeas_37': 'D composition in product (stream 11)',
            'xmeas_38': 'E composition in product (stream 11)',
            'xmeas_39': 'F composition in product (stream 11)',
            'xmeas_40': 'G composition in product (stream 11)',
            'xmeas_41': 'H composition in product (stream 11)',
            'xmv_1': 'D feed flow valve (stream 2)',
            'xmv_2': 'E feed flow valve (stream 3)',
            'xmv_3': 'A feed flow valve (stream 1)',
            'xmv_4': 'A and C feed flow valve  (stream 4)',
            'xmv_5': 'Compressor recycle valve',
            'xmv_6': 'Purge valve (stream 9)',
            'xmv_7': 'Separator pot liquid flow valve (stream 10)',
            'xmv_8': 'Stripper liquid product flow valve (stream 11)',
            'xmv_9': 'Stripper steam valve',
            'xmv_10': 'Reactor cooling water flow',
            'xmv_11': 'Condenser cooling water flow',
            'xmv_12': 'Agitator speed',
        }

    def __init__(self):
        self.anomaly_detector = XGBClassifier(n_jobs=-1, random_state=42)
        self.anomaly_detector.load_model('../models/xgb_model.json')

        self.explainer = shap.TreeExplainer(self.anomaly_detector)

        self.scaler = joblib.load('../models/scaler.pkl')

        self.all_features = joblib.load('../models/all_features.pkl')
        self.selected_features = joblib.load('../models/selected_features.pkl')

        load_dotenv()
        self.llm_client = anthropic.Anthropic(api_key=os.getenv('ANTHROPIC_API_KEY'))

    def get_sequence_for_analysis(self, data:pd.DataFrame, simulation_run:int=None, target_sample:int=None) -> pd.DataFrame:
        if simulation_run is None:
            simulation_run = self.simulation_run
        if target_sample is None:
            target_sample = self.target_sample

        assert data.shape[0] >= 3, 'At least 3 samples must be provided'
        assert target_sample >= 2, 'Target sample has to have at least 2 previous time steps'
        sim_data = data[data['simulationRun'] == simulation_run]
        start_idx = target_sample - 2
        end_idx = target_sample
        mask = (sim_data['sample'] >= start_idx) & (sim_data['sample'] <= end_idx)
        return sim_data[mask]

    def feature_engineering(self, data:pd.DataFrame) -> pd.DataFrame:
        data = create_lag_features(data=data, lags=[1,2], columns=self.selected_features, group_by='simulationRun', dropna=False)
        data = create_rolling_features(data=data, window_sizes=[3], columns=self.selected_features, group_by='simulationRun', dropna=False)
        data = create_diff_features(data=data, columns=self.selected_features, group_by='simulationRun', dropna=True)
        return data

    def feature_scaling(self, data:pd.DataFrame) -> np.ndarray:
        X = self.scaler.transform(data[self.all_features])
        return X

    def predict_anomaly(self, X:np.ndarray) -> tuple[int, float]:
        confidence = self.anomaly_detector.predict_proba(X)[0, 1]
        prediction = self.anomaly_detector.predict(X)[0]
        return prediction, confidence

    def get_shap_importance(self, X:np.ndarray) -> np.ndarray:
        shap_values = self.explainer(X).values
        return shap_values

    def get_most_impactful_features(self, shap_values:np.ndarray, top_n:int=3) -> list:
        base_feature_impacts = {}

        for i, feature in enumerate(self.all_features):
            impact = shap_values[0,i]
            base_feature = '_'.join(feature.split('_')[:2])
            base_description = self.FEATURE_DESCRIPTIONS[base_feature]

            if base_feature not in base_feature_impacts:
                base_feature_impacts[base_feature] = {'total_impact': 0, 'description': base_description}

            base_feature_impacts[base_feature]['total_impact'] += impact

        impactful_features = sorted(base_feature_impacts.items(), key=lambda x: x[1]['total_impact'], reverse=True)
        return impactful_features[:top_n]

    def create_llm_prompt(self, confidence:float=None, impactful_features:list[tuple[str, dict]]=None) -> str:
        if confidence is None:
            confidence = self.confidence
        if impactful_features is None:
            impactful_features = self.impactful_features

        if confidence >= 0.75:
            context = f'ALERT: {confidence:.1%} confidence - Key factors: '
        elif confidence >= 0.5:
            context = f'CAUTION: {confidence:.1%} confidence - Key factors: '
        else:
            context = f'NORMAL: {confidence:.1%} anomaly probability'

        factors = []
        for _, data in impactful_features:
            description = data['description']
            if confidence >= 0.5:
                factors.append(description)

        context += ', '.join(factors)
        prompt = f"""You are a Tennessee Eastman Process control engineer analyzing plant anomalies. Your role is to provide actionable technical analysis for plant operators.

CONTEXT: Tennessee Eastman is a chemical process with reactor, separator, stripper, and recycle streams producing products G and H from reactants A, C, D, E.

ANALYSIS FORMAT:
- STATUS: [NORMAL (less than 50% confidence) / CAUTION (between 50% and 75% confidence) / ALERT (more than 75% confidence)]
- ISSUE: Brief technical description (one sentence)
- ROOT CAUSE: Most likely physical/chemical cause (one sentence)
- IMMEDIATE ACTION: Single most critical operator step
- MONITORING: One key parameter to track

EXAMPLES:

Input: NORMAL: 15.2% anomaly probability
Output: STATUS: NORMAL - Continue routine monitoring of all process variables.

Input: CAUTION: 65.8% confidence - Key factors: Reactor temperature, Cooling water outlet temperature
Output: STATUS: CAUTION
ISSUE: Reactor thermal management deviation detected
ROOT CAUSE: Cooling water system efficiency reduction or heat duty increase
IMMEDIATE ACTION: Verify cooling water flow rates and heat exchanger performance
MONITORING: Track reactor temperature trend

Input: ALERT: 94.3% confidence - Key factors: Product separator pressure, Product separator level, Product separator temperature
Output: STATUS: ALERT
ISSUE: Product separator control system failure detected
ROOT CAUSE: Multiple control loops failing simultaneously indicating instrumentation malfunction
IMMEDIATE ACTION: Switch separator to manual control and verify pressure relief systems
MONITORING: Product separator pressure

CURRENT ANALYSIS:
{context}
Output:"""
        return prompt

    def generate_explanation(self, prompt:str) -> str:
        response = self.llm_client.messages.create(
            model='claude-3-haiku-20240307',
            max_tokens=200,
            messages=[{'role': 'user', 'content': prompt}],
            temperature=0.3,
        )

        return response.content[0].text

    def analyze_sample(self, data:pd.DataFrame, simulation_run:int=None, target_sample:int=None) -> str:
        self.simulation_run = data.iloc[0]['simulationRun'] if simulation_run is None else simulation_run
        self.target_sample = data['sample'].max() if target_sample is None else target_sample
        data = self.get_sequence_for_analysis(data)
        data = self.feature_engineering(data)
        X = self.feature_scaling(data)
        self.prediction, self.confidence = self.predict_anomaly(X)
        shap_values = self.get_shap_importance(X)
        self.impactful_features = self.get_most_impactful_features(shap_values)
        prompt = self.create_llm_prompt()
        response = self.generate_explanation(prompt)
        return response


### Pipeline Validation Testing
Validate the basic pipeline functionality using real anomaly data to ensure all components integrate correctly and produce meaningful operator guidance.


In [12]:
p = AnomalyDetectionPipeline()
r = p.analyze_sample(sim_data)
print(r)

STATUS: ALERT
ISSUE: Stripper system performance degradation detected
ROOT CAUSE: Reduced separation efficiency in the stripper column, likely due to fouling or scaling
IMMEDIATE ACTION: Reduce A and C feed rates, increase stripper temperature setpoint to improve separation
MONITORING: Stripper temperature and pressure profiles


The explanation from LLM to provided piece of data is received - the pipeline works

## Improved Production Pipeline with LangChain Integration
The improved version addresses production requirements with enterprise-grade features including error handling, caching, conversation memory, and robust fallback mechanisms.

### Advanced Features Implementation

**Conversation Memory**: maintains context of recent analyses to provide historically-aware explanations, enabling operators to understand patterns and recurring issues.

**Response Caching**: implements intelligent caching based on confidence scores and feature combinations to reduce API costs and improve response times for similar scenarios.

**Error Handling & Fallbacks**: comprehensive exception handling with structured fallback responses ensures system reliability even during API failures or unexpected data conditions.

**Response Validation**: automatic validation of LLM outputs against expected format ensures consistent, actionable operator guidance.

**Structured Logging**: detailed logging throughout the pipeline enables monitoring, debugging, and performance optimization in production environments.

### LangChain Integration Benefits
- **Prompt Management**: template-based prompt engineering with variable injection
- **Memory Management**: automatic conversation history tracking and context management
- **Chain Architecture**: modular LLM interaction patterns for maintainable code
- **Provider Abstraction**: easy switching between different LLM providers if needed



In [13]:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('anomaly_detector')

In [14]:
class ImprovedAnomalyDetectionPipeline:

    FEATURE_DESCRIPTIONS = {
            'xmeas_1': 'A feed (stream 1)',
            'xmeas_2': 'D feed (stream 2)',
            'xmeas_3': 'E feed (stream 3)',
            'xmeas_4': 'A and C feed (stream 4)',
            'xmeas_5': 'Recycle flow (stream 8)',
            'xmeas_6': 'Reactor feed rate (stream 6)',
            'xmeas_7': 'Reactor pressure',
            'xmeas_8': 'Reactor level',
            'xmeas_9': 'Reactor temperature',
            'xmeas_10': 'Purge rate (stream 9)',
            'xmeas_11': 'Product separator temperature',
            'xmeas_12': 'Product separator level',
            'xmeas_13': 'Product separator pressure',
            'xmeas_14': 'Product separator underflow (stream 10)',
            'xmeas_15': 'Stripper level',
            'xmeas_16': 'Stripper pressure',
            'xmeas_17': 'Stripper underflow (stream 11)',
            'xmeas_18': 'Stripper temperature',
            'xmeas_19': 'Stripper steam flow',
            'xmeas_20': 'Compressor work',
            'xmeas_21': 'Reactor cooling water outlet temperature',
            'xmeas_22': 'Separator cooling water outlet temperature',
            'xmeas_23': 'A composition in reactor feed (stream 6)',
            'xmeas_24': 'B composition in reactor feed (stream 6)',
            'xmeas_25': 'C composition in reactor feed (stream 6)',
            'xmeas_26': 'D composition in reactor feed (stream 6)',
            'xmeas_27': 'E composition in reactor feed (stream 6)',
            'xmeas_28': 'F composition in reactor feed (stream 6)',
            'xmeas_29': 'A composition in purge gas (stream 9)',
            'xmeas_30': 'B composition in purge gas (stream 9)',
            'xmeas_31': 'C composition in purge gas (stream 9)',
            'xmeas_32': 'D composition in purge gas (stream 9)',
            'xmeas_33': 'E composition in purge gas (stream 9)',
            'xmeas_34': 'F composition in purge gas (stream 9)',
            'xmeas_35': 'G composition in purge gas (stream 9)',
            'xmeas_36': 'H composition in purge gas (stream 9)',
            'xmeas_37': 'D composition in product (stream 11)',
            'xmeas_38': 'E composition in product (stream 11)',
            'xmeas_39': 'F composition in product (stream 11)',
            'xmeas_40': 'G composition in product (stream 11)',
            'xmeas_41': 'H composition in product (stream 11)',
            'xmv_1': 'D feed flow valve (stream 2)',
            'xmv_2': 'E feed flow valve (stream 3)',
            'xmv_3': 'A feed flow valve (stream 1)',
            'xmv_4': 'A and C feed flow valve  (stream 4)',
            'xmv_5': 'Compressor recycle valve',
            'xmv_6': 'Purge valve (stream 9)',
            'xmv_7': 'Separator pot liquid flow valve (stream 10)',
            'xmv_8': 'Stripper liquid product flow valve (stream 11)',
            'xmv_9': 'Stripper steam valve',
            'xmv_10': 'Reactor cooling water flow',
            'xmv_11': 'Condenser cooling water flow',
            'xmv_12': 'Agitator speed',
        }

    PROMPT_TEMPLATE = """You are a Tennessee Eastman Process control engineer analyzing plant anomalies. Your role is to provide actionable technical analysis for plant operators.

CONTEXT: Tennessee Eastman is a chemical process with reactor, separator, stripper, and recycle streams producing products G and H from reactants A, C, D, E.

HISTORICAL CONTEXT (last 5 relevant analyses):
{HISTORY}

ANALYSIS FORMAT:
- STATUS: [NORMAL (less than 50% confidence) / CAUTION (between 50% and 75% confidence) / ALERT (more than 75% confidence)]
- ISSUE: Brief technical description (one sentence)
- ROOT CAUSE: Most likely physical/chemical cause (one sentence)
- IMMEDIATE ACTION: Single most critical operator step
- MONITORING: One key parameter to track

EXAMPLES:

Input: NORMAL: 15.2% anomaly probability
Output: STATUS: NORMAL - Continue routine monitoring of all process variables.

Input: CAUTION: 65.8% confidence - Key factors: Reactor temperature, Cooling water outlet temperature
Output: STATUS: CAUTION
ISSUE: Reactor thermal management deviation detected
ROOT CAUSE: Cooling water system efficiency reduction or heat duty increase
IMMEDIATE ACTION: Verify cooling water flow rates and heat exchanger performance
MONITORING: Track reactor temperature trend

Input: ALERT: 94.3% confidence - Key factors: Product separator pressure, Product separator level, Product separator temperature
Output: STATUS: ALERT
ISSUE: Product separator control system failure detected
ROOT CAUSE: Multiple control loops failing simultaneously indicating instrumentation malfunction
IMMEDIATE ACTION: Switch separator to manual control and verify pressure relief systems
MONITORING: Product separator pressure

CURRENT ANALYSIS:
{CONTEXT}
Output:"""

    def __init__(self):
        self.anomaly_detector = XGBClassifier(n_jobs=-1, random_state=42)
        self.anomaly_detector.load_model('../models/xgb_model.json')

        self.explainer = shap.TreeExplainer(self.anomaly_detector)
        self.scaler = joblib.load('../models/scaler.pkl')
        self.all_features = joblib.load('../models/all_features.pkl')
        self.selected_features = joblib.load('../models/selected_features.pkl')

        self.response_cache = {}

        load_dotenv()

        try:
            self.llm = ChatAnthropic(
                model='claude-3-haiku-20240307',
                temperature=0.3,
                max_tokens=200,
                api_key=os.getenv('ANTHROPIC_API_KEY')
            )

            self.prompt_template = PromptTemplate(
                template=self.PROMPT_TEMPLATE,
                input_variables=['CONTEXT', 'HISTORY'],
            )

            self.memory = ConversationBufferWindowMemory(
                k=3,
                memory_key='HISTORY',
                input_key='CONTEXT',
                return_messages=False,
            )

            self.conversation_chain = ConversationChain(
                llm=self.llm,
                prompt=self.prompt_template,
                memory=self.memory,
                input_key='CONTEXT',
            )

            logger.info('ImprovedAnomalyDetectionPipeline initialized successfully')

        except Exception as e:
            logger.error(f'Failed to initialize LangChain components: {str(e)}')
            raise

    def get_sequence_for_analysis(self, data:pd.DataFrame, target_sample:int = None, simulation_run:int = None) -> pd.DataFrame:
        if target_sample is None:
            target_sample = self.target_sample
        if simulation_run is None:
            simulation_run = self.simulation_run

        assert data.shape[0] >= 3, 'At least 3 samples must be provided'
        assert target_sample >= 2, 'Target sample has to have at least 2 previous time steps'

        sim_data = data[data['simulationRun'] == simulation_run]
        start_idx = target_sample - 2
        end_idx = target_sample
        mask = (sim_data['sample'] >= start_idx) & (sim_data['sample'] <= end_idx)
        return sim_data[mask]

    def feature_engineering(self, data:pd.DataFrame) -> pd.DataFrame:
        data = create_lag_features(data=data, lags=[1,2], columns=self.selected_features, group_by='simulationRun', dropna=False)
        data = create_rolling_features(data=data, window_sizes=[3], columns=self.selected_features, group_by='simulationRun', dropna=False)
        data = create_diff_features(data=data, columns=self.selected_features, group_by='simulationRun', dropna=True)
        return data

    def feature_scaling(self, data:pd.DataFrame) -> np.ndarray:
        X = self.scaler.transform(data[self.all_features])
        return X

    def predict_anomaly(self, X:np.ndarray) -> tuple[int, float]:
        confidence = self.anomaly_detector.predict_proba(X)[0, 1]
        prediction = self.anomaly_detector.predict(X)[0]
        return prediction, confidence

    def get_shap_importance(self, X:np.ndarray) -> np.ndarray:
        shap_values = self.explainer(X).values
        return shap_values

    def get_most_impactful_features(self, shap_values:np.ndarray, top_n:int=3) -> list:
        base_feature_impacts = {}

        for i, feature in enumerate(self.all_features):
            impact = shap_values[0,i]
            base_feature = '_'.join(feature.split('_')[:2])
            base_description = self.FEATURE_DESCRIPTIONS[base_feature]

            if base_feature not in base_feature_impacts:
                base_feature_impacts[base_feature] = {'total_impact': 0, 'description': base_description}

            base_feature_impacts[base_feature]['total_impact'] += impact

        impactful_features = sorted(base_feature_impacts.items(), key=lambda x: x[1]['total_impact'], reverse=True)
        return impactful_features[:top_n]

    def prepare_context_data(self, confidence:float=None, impactful_features:list[tuple[str,dict]]=None) -> dict:
        if confidence is None:
            confidence = self.confidence
        if impactful_features is None:
            impactful_features = self.impactful_features

        if confidence >= 0.75:
            context = f'ALERT: {confidence:.1%} confidence - Key factors: '
        elif confidence >= 0.5:
            context = f'CAUTION: {confidence:.1%} confidence - Key factors: '
        else:
            context = f'NORMAL: {confidence:.1%} anomaly probability'

        factors = []
        for _, data in impactful_features:
            description = data['description']
            if confidence >= 0.5:
                factors.append(description)

        if factors:
            context += ', '.join(factors)

        logger.info(f'Context prepared: {context[:100]}...')
        return {'CONTEXT': context}

    def _validate_response(self, response_text: str) -> bool:
        required_fields = ['STATUS:', 'ISSUE:', 'ROOT CAUSE:', 'IMMEDIATE ACTION:', 'MONITORING:']
        is_valid = all(field in response_text for field in required_fields)
        return is_valid

    def _get_fallback_response(self, confidence:float=None) -> str:
        if confidence is None:
            confidence = self.confidence

        if confidence >= 0.75:
            responce = """STATUS: ALERT
ISSUE: Anomaly detected with high confidence
ROOT CAUSE: Analysis system temporarily unavailable - manual investigation required
IMMEDIATE ACTION: Investigate manually using plant monitoring systems
MONITORING: All critical process parameters"""
            return responce
        elif confidence >= 0.5:
            responce = """STATUS: CAUTION
ISSUE: Potential anomaly detected
ROOT CAUSE: Analysis system temporarily unavailable - verify conditions manually
IMMEDIATE ACTION: Check key process parameters for deviations
MONITORING: Process trend monitoring"""
            return responce
        else:
            responce = 'STATUS: NORMAL - Continue routine monitoring of all process variables.'
            return responce

    def _create_cache_key(self, confidence:float=None, impactful_features:list=None) -> str:
        if confidence is None:
            confidence = self.confidence
        if impactful_features is None:
            impactful_features = self.impactful_features

        features_str = str(sorted(f[1]['description'] for f in impactful_features))
        confidence_rounded = round(confidence, 2)

        cache_data = f'{confidence_rounded}_{features_str}'
        return hashlib.md5(cache_data.encode()).hexdigest()

    def generate_explanation(self, context_data: dict, confidence:float=None, impactful_features:list=None) -> str:
        if confidence is None:
            confidence = self.confidence
        if impactful_features is None:
            impactful_features = self.impactful_features

        cache_key = self._create_cache_key(confidence, impactful_features)

        if cache_key in self.response_cache:
            logger.info(f'Cache hit for key: {cache_key[:8]}...')
            return self.response_cache[cache_key]

        try:
            logger.info('Invoking conversation chain with memory')
            response = self.conversation_chain.predict(CONTEXT=context_data['CONTEXT'])
            logger.info(f'LLM response received: {len(response)} characters')
            if self._validate_response(response):
                self.response_cache[cache_key] = response.strip()
                logger.info(f'Cached response for key: {cache_key[:8]}...')
                return response.strip()
            else:
                logger.error('Response validation failed, using fallback')
                return self._get_fallback_response(confidence)

        except (LangChainException, OutputParserException, ValidationError) as e:
            logger.error(f'LangChain specific error: {str(e)}')
            return self._get_fallback_response(confidence)

        except Exception as e:
            logger.error(f'Unexpected error: {str(e)}')
            return self._get_fallback_response(confidence)

    def _extract_action_from_response(self, response: str) -> str:
        lines = response.split('\n')
        for line in lines:
            if line.startswith('IMMEDIATE ACTION:'):
                return line.replace('IMMEDIATE ACTION:', '').strip()
        return 'Monitor process parameters'

    def format_analysis_for_memory(self, response:str, confidence:float=None, impactful_features:list=None, timestamp: str = None) -> str:
        if confidence is None:
            confidence = self.confidence
        if impactful_features is None:
            impactful_features = self.impactful_features
        if timestamp is None:
            timestamp = datetime.now().strftime('%H:%M')

        features = [data['description'] for _, data in impactful_features[:3]]
        action = self._extract_action_from_response(response)
        memory = f"Time: {timestamp} | Confidence: {confidence:.1%} | Features: {', '.join(features)} | Action taken: {action}"
        return memory

    def analyze_sample(self, data: pd.DataFrame, simulation_run: int = None, target_sample: int = None) -> str:
        try:
            self.simulation_run = data.iloc[0]['simulationRun'] if simulation_run is None else simulation_run
            self.target_sample = data['sample'].max() if target_sample is None else target_sample

            logger.info(f'Starting analysis for simulation_run: {self.simulation_run}, target_sample: {self.target_sample}')

            data = self.get_sequence_for_analysis(data)
            data = self.feature_engineering(data)
            X = self.feature_scaling(data)

            self.prediction, self.confidence = self.predict_anomaly(X)
            shap_values = self.get_shap_importance(X)

            self.impactful_features = self.get_most_impactful_features(shap_values)

            context_data = self.prepare_context_data()
            response = self.generate_explanation(context_data)

            if self.confidence >= 0.5:
                memory_entry = self.format_analysis_for_memory(response)
                logger.info(f'Saving to memory: {memory_entry}')

            logger.info('Analysis completed successfully')
            return response

        except Exception as e:
            logger.error(f'Error in analyze_sample: {str(e)}')
            return 'STATUS: ERROR - Analysis system temporarily unavailable. Please use manual monitoring procedures.'

    def reset_state(self) -> None:
        self.prediction = None
        self.confidence = None
        self.impactful_features = None
        self.simulation_run = None
        self.target_sample = None


### Improved Pipeline Validation Testing
Test the production-ready pipeline with multiple samples to validate advanced features including conversation memory, caching mechanisms, and historical context integration.

**Multi-Sample Testing Strategy:**
- Sample 1: establish baseline response and populate conversation memory
- Sample 2: test memory integration and contextual awareness in explanations
- Cache Validation: verify identical scenarios utilize cached responses for efficiency
- Memory Persistence: confirm conversation history maintains relevant context across analyses

In [15]:
p = ImprovedAnomalyDetectionPipeline()

result1 = p.analyze_sample(sim_data1)
print('First:', result1)

INFO:anomaly_detector:ImprovedAnomalyDetectionPipeline initialized successfully
INFO:anomaly_detector:Starting analysis for simulation_run: 1.0, target_sample: 363
INFO:anomaly_detector:Context prepared: ALERT: 100.0% confidence - Key factors: Stripper temperature, Stripper pressure, Stripper steam valv...
INFO:anomaly_detector:Invoking conversation chain with memory
INFO:httpx:HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 200 OK"
INFO:anomaly_detector:LLM response received: 306 characters
INFO:anomaly_detector:Cached response for key: 96fe58ad...
INFO:anomaly_detector:Saving to memory: Time: 01:17 | Confidence: 100.0% | Features: Stripper temperature, Stripper pressure, Stripper steam valve | Action taken: Manually control stripper steam valve to maintain temperature and pressure setpoints
INFO:anomaly_detector:Analysis completed successfully


First: STATUS: ALERT
ISSUE: Stripper system malfunction detected
ROOT CAUSE: Stripper steam valve failure leading to loss of temperature and pressure control
IMMEDIATE ACTION: Manually control stripper steam valve to maintain temperature and pressure setpoints
MONITORING: Stripper temperature and pressure trends


In [16]:
result2 = p.analyze_sample(sim_data2)
print('Second:', result2)

INFO:anomaly_detector:Starting analysis for simulation_run: 2.0, target_sample: 763
INFO:anomaly_detector:Context prepared: ALERT: 100.0% confidence - Key factors: Stripper temperature, A and C feed (stream 4), Stripper stea...
INFO:anomaly_detector:Invoking conversation chain with memory
INFO:httpx:HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 200 OK"
INFO:anomaly_detector:LLM response received: 395 characters
INFO:anomaly_detector:Cached response for key: 2ceb047f...
INFO:anomaly_detector:Saving to memory: Time: 01:17 | Confidence: 100.0% | Features: Stripper temperature, A and C feed (stream 4), Stripper steam valve | Action taken: Manually control the stripper steam valve to maintain the stripper temperature and pressure at their setpoints
INFO:anomaly_detector:Analysis completed successfully


Second: STATUS: ALERT
ISSUE: Stripper system malfunction detected
ROOT CAUSE: Stripper steam valve failure leading to loss of temperature and pressure control in the stripper
IMMEDIATE ACTION: Manually control the stripper steam valve to maintain the stripper temperature and pressure at their setpoints
MONITORING: Closely monitor the stripper temperature and pressure trends to ensure stable operation


In [17]:
print('History:', p.memory.buffer)

History: Human: ALERT: 100.0% confidence - Key factors: Stripper temperature, Stripper pressure, Stripper steam valve
AI: STATUS: ALERT
ISSUE: Stripper system malfunction detected
ROOT CAUSE: Stripper steam valve failure leading to loss of temperature and pressure control
IMMEDIATE ACTION: Manually control stripper steam valve to maintain temperature and pressure setpoints
MONITORING: Stripper temperature and pressure trends
Human: ALERT: 100.0% confidence - Key factors: Stripper temperature, A and C feed (stream 4), Stripper steam valve
AI: STATUS: ALERT
ISSUE: Stripper system malfunction detected
ROOT CAUSE: Stripper steam valve failure leading to loss of temperature and pressure control in the stripper
IMMEDIATE ACTION: Manually control the stripper steam valve to maintain the stripper temperature and pressure at their setpoints
MONITORING: Closely monitor the stripper temperature and pressure trends to ensure stable operation


History is being saved correctly!

Save one sample for the future tests

In [18]:
sim_data1.to_csv('../data/processed/TEP_API_test.csv', encoding='utf-8', sep=',', header=True, index=False)

# API Development
This section implements an API service that transforms the anomaly detection pipeline into a production-ready web service. The API accepts CSV files containing Tennessee Eastman Process data and returns comprehensive JSON responses including predictions, explanations, and metadata.

## API Architecture Overview
The service provides a complete interface for industrial anomaly detection with the following key capabilities:
- **File-based Input**: CSV upload handling for time series data
- **Comprehensive Validation**: Multi-layer data validation before processing
- **Structured Responses**: JSON outputs with predictions, confidence scores, and explanations
- **Production Features**: Health checks, monitoring, and error handling
- **Documentation**: Auto-generated API documentation via FastAPI

## Logging Configuration
Establish structured logging for API operations to enable monitoring, debugging, and audit trails in production deployments.

In [19]:
logging.basicConfig(level=logging.INFO)
api_logger = logging.getLogger('anomaly_api')

## API Data Models with Pydantic Validation
Pydantic models provide automatic data validation, serialization, and API documentation generation. Each model corresponds to a specific endpoint response, ensuring type safety and consistent data structures across the API.

### Model Information Endpoint (`/info`)
The `ModelInfo` model provides comprehensive metadata about the deployed ML system, enabling clients to understand model capabilities, requirements, and compatibility. This information is essential for integration teams and system administrators.

**Key Information Provided:**
- **Model Architecture**: Details about the XGBoost + SHAP + LLM pipeline
- **Feature Requirements**: Exact count and structure of expected input features
- **Data Format Specifications**: Precise CSV format requirements for successful processing
- **Training Context**: Background on model training data for confidence assessment

In [20]:
class ModelInfo(BaseModel):
    """Information model for describing ML model configuration and requirements."""
    model_type: str = Field(..., description='Type of ML model')
    features_count: int = Field(..., description='Number of input features')
    llm_model: str = Field(..., description='LLM model used for explanations')
    training_data: str = Field(..., description='Training dataset description')
    expected_csv_format: Dict[str, Any] = Field(..., description='Expected CSV format and requirements')

### Health Check Endpoint (`/health`)
The `HealthResponse` model enables monitoring systems to verify service availability and component status. Critical for production deployment where external monitoring systems need to assess API health.

**Health Indicators:**
- **Overall Status**: Aggregated service health (healthy/degraded/unhealthy)
- **Component Status**: Individual status of ML model and LLM service
- **Timestamp**: Enables tracking of health check frequency and response times

In [21]:
class HealthResponse(BaseModel):
    """Response model for API health check endpoints."""
    status: str = Field(..., description='Service status')
    timestamp: str = Field(..., description='Health check timestamp')
    model_loaded: bool = Field(..., description='Whether ML model is loaded')
    llm_available: bool = Field(..., description='Whether LLM service is available')

### CSV Validation Endpoint (`/validate-csv`)
The `CSVValidationResponse` model provides comprehensive pre-analysis validation, preventing processing failures and providing clear feedback on data issues before costly analysis operations.

**Validation Features:**
- **Format Compliance**: Ensures CSV structure matches Tennessee Eastman Process requirements
- **Data Quality Checks**: Identifies missing values, incorrect data types, and structural issues
- **Metadata Extraction**: Provides insights into data characteristics and detected parameters
- **Warning System**: Non-blocking warnings for suboptimal but processable data conditions


In [22]:
class CSVValidationResponse(BaseModel):
    """Response model for CSV file validation results."""
    is_valid: bool = Field(..., description='Whether CSV format is valid')
    errors: List[str] = Field(..., description='List of validation errors if any')
    warnings: List[str] = Field(..., description='List of warnings')
    data_shape: Optional[tuple] = Field(default=None, description='Shape of the data (rows, columns)')
    detected_simulation_runs: Optional[List[int]] = Field(default=None, description='Detected simulation runs')
    sample_range: Optional[Dict[str, int]] = Field(default=None, description='Min and max sample values')

### Analysis Endpoint (`/analyze`)
The `AnalysisResponse` model delivers comprehensive anomaly detection results with complete traceability and metadata. This is the primary deliverable for industrial monitoring systems.

**Response Components:**
- **Core Predictions**: Binary classification and confidence scores for decision making
- **Explainability Data**: SHAP-derived feature importance for technical understanding
- **Natural Language**: LLM-generated explanations for operator guidance  
- **Processing Metadata**: Performance metrics and traceability information
- **Input Context**: Reference information linking results back to source data

In [23]:
class AnalysisResponse(BaseModel):
    """Response model for anomaly detection analysis API endpoints."""
    prediction: int = Field(..., ge=0, le=1, description='Binary prediction: 0=normal, 1=anomaly')
    confidence: float = Field(..., ge=0.0, le=1.0, description='Prediction confidence score')
    important_features: List[Dict[str, Any]] = Field(..., description='Most impactful process variables')
    explanation: str = Field(..., description='Human-readable explanation from LLM')
    timestamp: str = Field(..., description='Analysis timestamp')
    model_version: str = Field(default='1.0', description='Model version used')
    processing_time_ms: Optional[float] = Field(default=None, description='Processing time in milliseconds')
    input_rows_count: int = Field(..., description='Number of input data rows processed')
    simulation_run: int = Field(..., description='Simulation run identifier from data')
    target_sample: int = Field(..., description='Target time sample analyzed')


## Utility Functions for Data Processing
Essential helper functions that transform pipeline outputs into API-compliant formats and validate incoming data before processing.

### SHAP Feature Formatting
Converts internal SHAP analysis results into standardized JSON format suitable for API responses and client consumption.

In [24]:
def format_shap_features(impactful_features: List[tuple]) -> List[Dict[str, Any]]:
    """Converts SHAP feature importance data into a standardized API response format."""
    formatted_features = []
    for feature_name, feature_data in impactful_features:
        formatted_features.append({
            'variable_name': feature_name,
            'description': feature_data['description'],
            'importance_score': round(float(feature_data['total_impact']), 4)
        })
    return formatted_features


### CSV Data Validation  
Comprehensive validation logic that ensures uploaded files meet Tennessee Eastman Process requirements before expensive ML processing begins.

In [25]:
def validate_csv_data(df: pd.DataFrame) -> Tuple[bool, List[str], List[str]]:
    """Validates CSV data format and content for Tennessee Eastman Process anomaly detection."""
    errors = []
    warns = []

    if len(df) < 3:
        errors.append('CSV must contain at least 3 rows for temporal analysis')
        return False, errors, warns

    required_columns = ['sample', 'simulationRun']
    missing_required = [col for col in required_columns if col not in df.columns]
    if missing_required:
        errors.append(f'Missing required columns: {missing_required}')

    required_features = ['xmeas_' + str(i) for i in range(1,42,1)] + ['xmv_' + str(i) for i in range(1,12,1)]
    missing_required = [col for col in required_features if col not in df.columns]
    if missing_required:
        errors.append(f'Missing required features: {missing_required}')

    non_numeric_cols = []
    for col in required_features:
        if not pd.api.types.is_numeric_dtype(df[col]):
            non_numeric_cols.append(col)

    if non_numeric_cols:
        errors.append(f'Non-numeric columns found: {non_numeric_cols[:3]}...')

    critical_missing = []
    for col in ['sample', 'simulationRun']:
        if col in df.columns and df[col].isnull().any():
            critical_missing.append(col)

    if critical_missing:
        errors.append(f'Missing values in critical columns: {critical_missing}')

    if 'sample' in df.columns:
        if not df['sample'].is_monotonic_increasing:
            warns.append('Sample values are not in ascending order')

    if 'simulationRun' in df.columns:
        unique_runs = df['simulationRun'].nunique()
        if unique_runs > 1:
            warns.append(f'Multiple simulation runs detected ({unique_runs}). Using the first one.')

    is_valid = len(errors) == 0
    return is_valid, errors, warns


## FastAPI Application Configuration
Configure the FastAPI application with proper lifecycle management, documentation, and middleware for production deployment.

### Application Lifecycle Management
Implement proper startup and shutdown procedures to ensure the ML pipeline is initialized once during server startup and cleaned up gracefully during shutdown. This prevents repeated model loading and ensures resource management.


In [26]:
@asynccontextmanager
async def lifespan(_app: FastAPI):
    global pipeline
    try:
        api_logger.info('Initializing Anomaly Detection Pipeline...')
        pipeline = ImprovedAnomalyDetectionPipeline()
        api_logger.info('Pipeline initialized successfully')
    except Exception as e:
        api_logger.error(f'Failed to initialize pipeline: {str(e)}')
        raise e

    yield
    api_logger.info('Shutting down pipeline...')
    pipeline = None


### Global Pipeline State
Maintain a global reference to the initialized pipeline for access across all endpoint handlers. This singleton pattern ensures consistent model state across requests.

In [27]:
pipeline = None

### API Metadata and Documentation
Configure comprehensive API metadata including title, description, and version for automatic documentation generation. FastAPI automatically creates interactive documentation at `/docs` and `/redoc` endpoints.

In [28]:
app = FastAPI(
    title='Tennessee Eastman Process Anomaly Detection API',
    description='ML-powered system for industrial process anomaly detection and LLM explanations',
    version='1.0.0',
    docs_url='/docs',
    redoc_url='/redoc',
    lifespan=lifespan,
)


### CORS Configuration for Development
Enable Cross-Origin Resource Sharing for development and testing purposes. This allows web applications from different domains to interact with the API during development phases.

In [29]:
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],  # in production this must be specified
    allow_credentials=True,
    allow_methods=['GET', 'POST'],
    allow_headers=['*'],
)

## API Endpoint Implementation
Complete set of RESTful endpoints providing comprehensive anomaly detection services with proper error handling, validation, and monitoring capabilities.

### Model Information Endpoint (`GET /info`)
Provides essential metadata about the deployed model system, enabling clients to understand API capabilities and data requirements before integration. Returns detailed specifications for CSV format, feature requirements, and model architecture information.

In [30]:
@app.get('/info', response_model=ModelInfo)
async def model_info():
    """Provides comprehensive information about the ML model configuration and requirements."""
    try:
        if pipeline is None:
            raise HTTPException(status_code=503, detail='Pipeline not initialized')

        expected_format = {
            'file_format': 'CSV',
            'encoding': 'UTF-8',
            'minimum_rows': 3,
            'required_columns': ['sample', 'simulationRun'],
            'feature_variables': {
                'measured': 'xmeas_1 to xmeas_41 (41 variables)',
                'manipulated': 'xmv_1 to xmv_11 (11 variables)'
            },
            'data_types': 'All process variables must be numeric',
            'temporal_requirement': 'Data must represent 3 consecutive time points'
        }

        return ModelInfo(
            model_type='XGBoost Classifier with SHAP explanations',
            features_count=len(pipeline.all_features),
            llm_model=pipeline.llm.model,
            training_data='Tennessee Eastman Process Dataset (50 simulations for each fault type, 3 fault types in total)',
            expected_csv_format=expected_format,
        )

    except Exception as e:
        api_logger.error(f'Error getting model info: {str(e)}')
        raise HTTPException(status_code=500, detail=f'Failed to get model info: {str(e)}')


### Health Monitoring Endpoint (`GET /health`)
Critical for production deployment monitoring. Returns structured health status enabling load balancers and monitoring systems to assess service availability. Implements graceful degradation status when components are partially available.

In [31]:
@app.get('/health', response_model=HealthResponse)
async def health_check():
    """Provides comprehensive health status information for the anomaly detection service."""
    try:
        model_loaded = pipeline is not None

        llm_available = model_loaded and hasattr(pipeline, 'llm') and pipeline.llm is not None

        if model_loaded and llm_available:
            overall_status = 'healthy'
        else:
            overall_status = 'degraded'

        return HealthResponse(
            status=overall_status,
            timestamp=datetime.now().isoformat(),
            model_loaded=model_loaded,
            llm_available=llm_available,
        )

    except Exception as e:
        api_logger.error(f'Health check failed: {str(e)}')
        return HealthResponse(
            status='unhealthy',
            timestamp=datetime.now().isoformat(),
            model_loaded=False,
            llm_available=False,
        )


### Data Validation Endpoint (`POST /validate-csv`)
Pre-analysis validation service that checks CSV compatibility without performing expensive ML operations. Provides detailed feedback on data issues, enabling clients to fix problems before submitting for analysis. Returns comprehensive metadata about detected data characteristics.

In [32]:
@app.post('/validate-csv', response_model=CSVValidationResponse)
async def validate_csv_file(file: UploadFile = File(...)):
    """Validates uploaded CSV files for Tennessee Eastman Process anomaly detection compatibility."""
    try:
        if not file.filename.lower().endswith('.csv'):
            return CSVValidationResponse(
                is_valid=False,
                errors=['File must be a CSV file'],
                warnings=[],
            )

        contents = await file.read()
        csv_string = contents.decode('utf-8')
        df = pd.read_csv(StringIO(csv_string))

        is_valid, errors, warns = validate_csv_data(df)

        if 'simulationRun' in df.columns:
            simulation_runs = df['simulationRun'].unique().tolist()
        else:
            simulation_runs = []

        if 'sample' in df.columns:
            sample_range = {'min': int(df['sample'].min()), 'max': int(df['sample'].max())}
        else:
            sample_range = None

        return CSVValidationResponse(
            is_valid=is_valid,
            errors=errors,
            warnings=warns,
            data_shape=(len(df), len(df.columns)),
            detected_simulation_runs=simulation_runs,
            sample_range=sample_range
        )

    except Exception as e:
        return CSVValidationResponse(
            is_valid=False,
            errors=[f'Failed to process file: {str(e)}'],
            warnings=[],
        )


### Primary Analysis Endpoint (`POST /analyze`)
The core service endpoint that executes the complete anomaly detection pipeline. Accepts CSV files with optional parameters for simulation run and target sample selection. Returns comprehensive analysis including predictions, explanations, feature importance, and processing metadata.

**Key Features:**
- **Flexible Input Handling**: supports optional parameters for multi-simulation datasets
- **Comprehensive Validation**: multi-layer validation before expensive processing
- **Performance Monitoring**: built-in timing and logging for production monitoring
- **Detailed Output**: complete analysis results with traceability information

In [33]:
@app.post('/analyze', response_model=AnalysisResponse)
async def analyze_csv_data(
    file: UploadFile = File(..., description='CSV file with 3 consecutive time points'),
    simulation_run: Optional[Union[int, str, None]] = Form(default=None, description='Simulation run to analyze (if multiple in file)'),
    target_sample: Optional[Union[int, str, None]] = Form(default=None, description='Target sample to analyze'),
):
    """Performs comprehensive anomaly detection analysis on uploaded Tennessee Eastman Process data."""
    start_time = datetime.now()

    if simulation_run == '':
        simulation_run = None
    if target_sample == '':
        target_sample = None

    try:
        if pipeline is None:
            raise HTTPException(status_code=503, detail='Analysis pipeline not available. Please try again later.')

        if not file.filename.lower().endswith('.csv'):
            raise HTTPException(status_code=400, detail='File must be a CSV file')

        api_logger.info(f'Received CSV file: {file.filename} ({file.size} bytes)')

        try:
            contents = await file.read()
            csv_string = contents.decode('utf-8')
            df = pd.read_csv(StringIO(csv_string))

            api_logger.info(f'CSV loaded successfully: {df.shape} shape')

        except UnicodeDecodeError:
            raise HTTPException(status_code=400, detail='Invalid CSV encoding. Please use UTF-8 encoding.')

        except pd.errors.EmptyDataError:
            raise HTTPException(status_code=400, detail='CSV file is empty')

        except Exception as e:
            raise HTTPException(status_code=400, detail=f'Failed to parse CSV: {str(e)}')

        is_valid, errors, warns = validate_csv_data(df)

        if not is_valid:
            raise HTTPException(status_code=400, detail=f"CSV validation failed: {'; '.join(errors)}")

        for warning in warns:
            api_logger.warning(warning)

        api_logger.info(f'Analysis parameters: simulation_run={simulation_run}, target_sample={target_sample}')

        explanation_text = pipeline.analyze_sample(
            data=df,
            simulation_run=simulation_run,
            target_sample=target_sample,
        )

        simulation_run = getattr(pipeline, 'simulation_run', 0)
        target_sample = getattr(pipeline, 'target_sample', 0)
        impactful_features = getattr(pipeline, 'impactful_features', [])
        prediction = getattr(pipeline, 'prediction', 0)
        confidence = getattr(pipeline, 'confidence', 0.0)

        formatted_features = format_shap_features(impactful_features)

        processing_time = (datetime.now() - start_time).total_seconds() * 1000

        response = AnalysisResponse(
            prediction=int(prediction),
            confidence=round(float(confidence), 4),
            important_features=formatted_features,
            explanation=explanation_text,
            timestamp=datetime.now().isoformat(),
            processing_time_ms=round(processing_time, 2),
            input_rows_count=len(df),
            simulation_run=simulation_run,
            target_sample=target_sample,
        )

        api_logger.info(f'Analysis completed successfully in {processing_time:.2f}ms')
        return response

    except HTTPException:
        raise

    except Exception as e:
        api_logger.error(f'Unexpected error in analysis: {str(e)}')
        raise HTTPException(status_code=500, detail=f'Analysis failed: {str(e)}')


### API Discovery Endpoint (`GET /`)
Root endpoint providing API overview and navigation assistance for developers and integration teams. Returns structured information about available endpoints and their purposes.

In [34]:
@app.get('/')
async def root():
    """Root endpoint providing API overview and navigation information."""
    r = {
        'message': 'Tennessee Eastman Process Anomaly Detection API (CSV-based)',
        'version': '1.0.0',
        'documentation': '/docs',
        'health': '/health',
        'input_format': 'CSV files with 3+ consecutive time points',
        'endpoints': {
            'analyze': 'POST /analyze - Main anomaly analysis endpoint (CSV upload)',
            'validate-csv': 'POST /validate-csv - Validate CSV format before analysis',
            'health': 'GET /health - Service health check',
            'info': 'GET /info - Model and format information'
        }
    }
    return r


## Server Deployment and Testing
Configure and launch the FastAPI server for local development and testing with appropriate server parameters and monitoring capabilities.

### Server Configuration Function
Utility function that encapsulates server startup with configurable parameters for different deployment environments. Provides clear console output for development teams to access API endpoints and documentation.

In [35]:
def run_api_server(app, host: str = '127.0.0.1', port: int = 8000, reload: bool = False):
    print(f'Starting CSV-based Anomaly Detection API server...')
    print(f'Server will be available at: http://{host}:{port}')
    print(f'API documentation: http://{host}:{port}/docs')
    print(f'Upload CSV files at: http://{host}:{port}/analyze')

    uvicorn.run(
        app=app,
        host=host,
        port=port,
        reload=reload,
        log_level='info',
    )

### Development Server Launch
Initialize the server using uvicorn with nest_asyncio compatibility for Jupyter notebook environments. The server becomes immediately available for testing with interactive documentation.

In [36]:
nest_asyncio.apply()
run_api_server(app)

INFO:     Started server process [13284]
INFO:     Waiting for application startup.
INFO:anomaly_api:Initializing Anomaly Detection Pipeline...


Starting CSV-based Anomaly Detection API server...
Server will be available at: http://127.0.0.1:8000
API documentation: http://127.0.0.1:8000/docs
Upload CSV files at: http://127.0.0.1:8000/analyze


INFO:anomaly_detector:ImprovedAnomalyDetectionPipeline initialized successfully
INFO:anomaly_api:Pipeline initialized successfully
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:65422 - "GET / HTTP/1.1" 200 OK
INFO:     127.0.0.1:65422 - "GET /favicon.ico HTTP/1.1" 404 Not Found
INFO:     127.0.0.1:65423 - "GET /docs HTTP/1.1" 200 OK
INFO:     127.0.0.1:65423 - "GET /openapi.json HTTP/1.1" 200 OK
INFO:     127.0.0.1:65423 - "GET /info HTTP/1.1" 200 OK
INFO:     127.0.0.1:65437 - "GET /health HTTP/1.1" 200 OK
INFO:     127.0.0.1:65486 - "POST /validate-csv HTTP/1.1" 200 OK


INFO:anomaly_api:Received CSV file: TEP_API_test.csv (1619 bytes)
INFO:anomaly_api:CSV loaded successfully: (3, 56) shape
INFO:anomaly_api:Analysis parameters: simulation_run=None, target_sample=None
INFO:anomaly_detector:Starting analysis for simulation_run: 1.0, target_sample: 363
INFO:anomaly_detector:Context prepared: ALERT: 100.0% confidence - Key factors: Stripper temperature, Stripper pressure, Stripper steam valv...
INFO:anomaly_detector:Invoking conversation chain with memory
INFO:httpx:HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 200 OK"
INFO:anomaly_detector:LLM response received: 322 characters
INFO:anomaly_detector:Cached response for key: 96fe58ad...
INFO:anomaly_detector:Saving to memory: Time: 01:18 | Confidence: 100.0% | Features: Stripper temperature, Stripper pressure, Stripper steam valve | Action taken: Isolate the stripper system, shut down the affected process train, and initiate emergency procedures
INFO:anomaly_detector:Analysis completed 

INFO:     127.0.0.1:65512 - "POST /analyze HTTP/1.1" 200 OK
INFO:     127.0.0.1:65533 - "GET / HTTP/1.1" 200 OK


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:anomaly_api:Shutting down pipeline...
INFO:     Application shutdown complete.
INFO:     Finished server process [13284]


Everything works as expected

# Summary

## Complete Production Pipeline Achievement
This notebook successfully demonstrates the transformation of research-grade anomaly detection models into a production-ready API service. The implementation bridges the gap between ML experimentation and industrial deployment through comprehensive API design with FastAPI.

## Technical Integration Success
The project achieves seamless integration of multiple complex components:
- **XGBoost Model**: high-performance anomaly detection with 97.4% F1-score
- **SHAP Analysis**: real-time feature importance calculation for explainability
- **LLM Integration**: natural language explanations via Anthropic Claude API
- **Data Validation**: multi-layer CSV validation ensuring data quality
- **Error Handling**: robust error management with graceful degradation

## Memory and Caching Implementation

This project implements LangChain ConversationBufferWindowMemory to provide contextual anomaly analysis. The system remembers the last 5 analyses and can reference previous incidents when providing recommendations.

**Current scope:** Memory persists within a single Python session, making it ideal for:
- Batch processing of historical data
- Interactive analysis sessions
- Development and testing

**Production considerations:** For stateless API deployment, memory and caching would require external storage:
- Redis/database for persistent conversation history
- Distributed caching for ML inference results
- Session management for user-specific contexts


The resulting system provides a complete solution for industrial process monitoring with operator-friendly explanations, demonstrating the practical application of advanced ML techniques in real-world scenarios.