In [13]:
%pip install python-dotenv

import numpy as np
import pandas as pd
import os
from dotenv import load_dotenv
load_dotenv()
# load_dotenv() copies variables from a local .env file (if present) into os.environ

# 1. Generate Synthetic Time Series Data
def generate_synthetic_pump_data(filename="synthetic_pump_data.csv", time_steps=100000, seed=42):
    """Generate hourly synthetic pump sensor data with injected anomalies and write it to CSV.

    Baseline readings are independent Gaussian noise around fixed operating points.
    On top of that the following effects are applied, in this order:

    1. severe point anomalies at fixed rows,
    2. 100-row linear degradation ramps,
    3. a 7-day (168-row) temperature bump at the start of every 30-day (720-row) block,
    4. random minor anomalies on ~10% of rows (capped at 5000),
    5. 72-row post-maintenance windows (scaled vibration/temperature, freshly drawn pressure),
    6. reduced flow rate and pressure in the first 8 rows of every 24-row cycle,
    7. up to 50 correlated multi-sensor anomalies.

    Args:
        filename: Destination CSV path (overwritten if it exists).
        time_steps: Number of hourly rows to generate.
        seed: Seed for a private ``np.random.RandomState``. The default reproduces the
            dataset previously produced via ``np.random.seed(42)``.

    Returns:
        ``filename``.
    """
    # Private generator: same draw sequence as np.random.seed(seed) + np.random.*,
    # but without reseeding NumPy's global RNG as a side effect.
    rng = np.random.RandomState(seed)

    # "h" = hourly; the upper-case "H" alias is deprecated since pandas 2.2.
    # Keep the column order: the normal draws consume rng in this order.
    df = pd.DataFrame({
        "timestamp": pd.date_range(start="2025-01-01", periods=time_steps, freq="h"),
        "vibration": rng.normal(loc=0.2, scale=0.05, size=time_steps),
        "temperature": rng.normal(loc=60, scale=1.0, size=time_steps),
        "pressure": rng.normal(loc=30, scale=2.0, size=time_steps),
        "flow_rate": rng.normal(loc=100, scale=10.0, size=time_steps),
    })

    # 1. Isolated severe anomalies (major equipment failures); skip rows beyond the data
    severe_rows = [i for i in (60, 70, 85, 1000, 5000, 15000, 25000, 40000, 60000, 85000) if i < time_steps]
    df.loc[severe_rows, "vibration"] += 0.5
    df.loc[severe_rows, "temperature"] += 5
    df.loc[severe_rows, "pressure"] -= 10
    df.loc[severe_rows, "flow_rate"] -= 20

    # 2. Gradual degradation: linear ramps over 100 rows, applied only if the whole ramp fits
    ramp = np.arange(100)
    for start in (2000, 30000, 70000):
        if start + len(ramp) <= time_steps:
            ramp_rows = start + ramp
            df.loc[ramp_rows, "vibration"] += 0.005 * ramp
            df.loc[ramp_rows, "temperature"] += 0.05 * ramp
            df.loc[ramp_rows, "pressure"] -= 0.1 * ramp

    # 3. Recurring warm spell: +2.5 for the first 7 days of every ~30-day block
    for block_start in range(0, time_steps, 24 * 30):
        if block_start + 24 * 7 <= time_steps:
            df.loc[range(block_start, block_start + 24 * 7), "temperature"] += 2.5

    # 4. Random minor anomalies (normal operational variation), scaled with dataset size
    num_minor = min(time_steps // 10, 5000)
    minor_rows = rng.choice(time_steps, size=num_minor, replace=False)
    df.loc[minor_rows, "vibration"] += rng.normal(0.1, 0.05, size=num_minor)
    df.loc[minor_rows, "temperature"] += rng.normal(1.0, 0.5, size=num_minor)
    df.loc[minor_rows, "pressure"] += rng.normal(0, 3.0, size=num_minor)

    # 5. Maintenance effects: 3 days (72 rows) of improved readings after each event
    for event in (3000, 10000, 20000, 45000, 75000):
        if event + 24 * 3 <= time_steps:
            recovery_rows = range(event, event + 24 * 3)
            df.loc[recovery_rows, "vibration"] = df.loc[recovery_rows, "vibration"] * 0.8
            df.loc[recovery_rows, "temperature"] = df.loc[recovery_rows, "temperature"] * 0.95
            df.loc[recovery_rows, "pressure"] = rng.normal(loc=30, scale=1.0, size=len(recovery_rows))

    # 6. Daily load cycle: the first 8 rows of each 24-row cycle run at lower load.
    # A trailing cycle shorter than 8 rows is left untouched.
    positions = np.arange(time_steps)
    hour_in_cycle = positions % 24
    night = (hour_in_cycle < 8) & (positions - hour_in_cycle + 8 <= time_steps)
    df.loc[night, "flow_rate"] *= 0.8
    df.loc[night, "pressure"] *= 0.9

    # 7. Correlated anomalies: all four sensors shift together, scaled by one severity per row.
    # Capped at time_steps so short datasets don't make choice(replace=False) raise.
    num_correlated = min(50, time_steps)
    correlated_rows = rng.choice(time_steps, size=num_correlated, replace=False)
    severity = rng.uniform(0.5, 1.5, size=num_correlated)
    df.loc[correlated_rows, "vibration"] += 0.2 * severity
    df.loc[correlated_rows, "temperature"] += 3 * severity
    df.loc[correlated_rows, "pressure"] -= 5 * severity
    df.loc[correlated_rows, "flow_rate"] -= 15 * severity

    df.to_csv(filename, index=False)
    return filename

# Start with 10,000 hourly rows (the generator's default is 100,000) to keep this cell fast.
csv_file = generate_synthetic_pump_data(time_steps=10_000)

Note: you may need to restart the kernel to use updated packages.


  "timestamp": pd.date_range(start="2025-01-01", periods=time_steps, freq="H"),


In [14]:
import numpy as np
import pandas as pd
import os
from dotenv import load_dotenv
load_dotenv()
# load_dotenv() copies variables from a local .env file (if present) into os.environ

# 2. Full MDP Pipeline
def mdp_pipeline(csv_path, gamma=0.95, max_iter=100, tol=1e-4):
    """Label pump readings with health states and solve a 3-state maintenance MDP.

    Steps:
      1. Read ``csv_path`` (needs ``timestamp``, ``vibration``, ``temperature`` and
         ``pressure`` columns) and label every row Healthy / Degraded / Faulty using
         fixed sensor thresholds.
      2. Estimate P(next_state | state, action) from consecutive rows.
      3. Run value iteration with the fixed reward table ``R``.

    Args:
        csv_path: Path to the sensor CSV.
        gamma: Discount factor.
        max_iter: Maximum number of value-iteration sweeps.
        tol: Stop once the largest per-state value change in a sweep is below this.

    Returns:
        ``(df, V, policy)``: the data with added ``state`` and ``best_action`` columns,
        the state -> value dict, and the state -> action dict.

    NOTE(review): every observed step is attributed to "Do Nothing", so transitions
    are only estimated for that action. The maintenance actions get all-zero
    next-state probabilities, which makes them behave like terminal actions with no
    future cost in the Bellman update. This is why maintenance can look cheap here.
    Confirm this is the intended model.
    """
    from collections import Counter

    df = pd.read_csv(csv_path, parse_dates=["timestamp"])

    # Step 1: classify each reading (vectorised). np.select picks the first matching
    # condition, so the Faulty thresholds win over the Degraded ones. Rows matching
    # neither (including NaN readings, whose comparisons are False) are Healthy.
    vibration, temperature, pressure = df["vibration"], df["temperature"], df["pressure"]
    is_faulty = (vibration > 0.6) | (temperature > 65) | (pressure < 25)
    is_degraded = (vibration > 0.3) | (temperature > 62) | (pressure < 28)
    df["state"] = np.select([is_faulty, is_degraded], ["Faulty", "Degraded"], default="Healthy")

    # Step 2: states, actions and immediate rewards R[state][action]
    states = ["Healthy", "Degraded", "Faulty"]
    actions = ["Do Nothing", "Preventive Maintenance", "Corrective Maintenance"]
    R = {
        "Healthy": {"Do Nothing": 0, "Preventive Maintenance": -10, "Corrective Maintenance": -30},
        "Degraded": {"Do Nothing": -5, "Preventive Maintenance": -10, "Corrective Maintenance": -30},
        "Faulty": {"Do Nothing": -100, "Preventive Maintenance": -20, "Corrective Maintenance": -50},
    }
    # The data has no action log, so every observed step is attributed to "Do Nothing".
    df["best_action"] = "Do Nothing"

    # Step 3: count (state, action, next_state) triples over consecutive rows
    transition_counts = {s: {a: {s1: 0 for s1 in states} for a in actions} for s in states}
    action_counts = {s: {a: 0 for a in actions} for s in states}
    state_seq = df["state"].to_numpy()
    action_seq = df["best_action"].to_numpy()
    triples = Counter(zip(state_seq[:-1], action_seq[:-1], state_seq[1:]))
    for (s, a, s1), n in triples.items():
        transition_counts[s][a][s1] += n
        action_counts[s][a] += n

    # Step 4: maximum-likelihood P(s1 | s, a); (s, a) pairs never observed get all-zero rows
    estimated_P = {
        s: {
            a: {
                s1: transition_counts[s][a][s1] / action_counts[s][a]
                if action_counts[s][a] > 0 else 0
                for s1 in states
            }
            for a in actions
        }
        for s in states
    }

    # Step 5: synchronous value iteration; ties go to the earlier action in `actions`
    V = {s: 0 for s in states}
    policy = {s: None for s in states}
    for _ in range(max_iter):
        new_V = {}
        delta = 0
        for s in states:
            q_values = [
                R[s][a] + gamma * sum(estimated_P[s][a][s1] * V[s1] for s1 in states)
                for a in actions
            ]
            best = int(np.argmax(q_values))
            new_V[s] = q_values[best]
            policy[s] = actions[best]
            delta = max(delta, abs(V[s] - new_V[s]))
        V = new_V
        if delta < tol:
            break

    df["best_action"] = df["state"].map(policy)
    return df, V, policy

# 3. Interpret Results and Provide Guidelines
def interpret_results(df, V, policy):
    """Render the MDP pipeline output as a plain-text maintenance summary.

    Args:
        df: Frame with ``state`` and ``best_action`` columns, rows in time order
            (as returned by ``mdp_pipeline``).
        V: State -> value mapping from value iteration.
        policy: State -> recommended action mapping.

    Returns:
        A multi-line summary string with the state/action distributions, the
        observed next-state frequencies and recommendations.
    """
    state_counts = df["state"].value_counts().to_dict()
    if not state_counts:
        # Nothing to summarise; max() below would raise on an empty frame.
        return "\n🧠 Predictive Maintenance Summary:\n\nNo readings available to summarise.\n"
    action_counts = df["best_action"].value_counts().to_dict()
    most_common_state = max(state_counts, key=state_counts.get)

    # Share of readings in each state, in percent
    total_states = sum(state_counts.values())
    state_percentages = {s: (count / total_states) * 100 for s, count in state_counts.items()}

    # Observed next-state frequencies per state, from consecutive rows. The pairing is
    # positional, so it does not depend on df having a 0..n-1 index; the last row has
    # no successor.
    state_seq = df["state"].to_numpy()
    pairs = pd.DataFrame({"current": state_seq[:-1], "next": state_seq[1:]})
    transitions = {}
    for state in ["Healthy", "Degraded", "Faulty"]:
        next_states = pairs.loc[pairs["current"] == state, "next"].value_counts().to_dict()
        if next_states:
            total = sum(next_states.values())
            transitions[state] = {
                next_state: f"{(count / total) * 100:.1f}%"
                for next_state, count in next_states.items()
            }
        else:
            transitions[state] = {"No transitions observed": "N/A"}

    # NOTE(review): the inspection interval is a row-count heuristic (one per ~500 rows),
    # not derived from the transition estimates. It is clamped to at least 1 so small
    # datasets don't yield "every 0 time units".
    inspection_interval = max(1, len(df) // 500)

    summary = f"""
🧠 Predictive Maintenance Summary:

1. Most Frequent State: '{most_common_state}' ({state_counts[most_common_state]} occurrences, {state_percentages[most_common_state]:.1f}%)
2. Recommended Action for that State: '{policy[most_common_state]}'
3. State Distribution: 
   - Healthy: {state_counts.get('Healthy', 0)} ({state_percentages.get('Healthy', 0):.1f}%)
   - Degraded: {state_counts.get('Degraded', 0)} ({state_percentages.get('Degraded', 0):.1f}%)
   - Faulty: {state_counts.get('Faulty', 0)} ({state_percentages.get('Faulty', 0):.1f}%)
4. State Values (Expected Long-term Reward): {V}
5. Optimal Policy: {policy}
6. Action Distribution: {action_counts}
7. State Transition Patterns: {transitions}

📋 Recommendations:
- Set up real-time classification for sensor input into: Healthy, Degraded, or Faulty.
- When pump is 'Degraded', perform {policy.get('Degraded', 'Preventive Maintenance')} to avoid high repair costs.
- Only use Corrective Maintenance for confirmed 'Faulty' state.
- Schedule regular inspections every {inspection_interval} time units based on transition patterns.
- Integrate this MDP policy into a monitoring system for automated decisions.
"""
    return summary

# Run the MDP pipeline end to end on the CSV that generate_synthetic_pump_data writes
# by default, then build the text summary.
csv_file = "synthetic_pump_data.csv"
final_df, value_map, optimal_policy = mdp_pipeline(csv_file)
summary_text = interpret_results(final_df, value_map, optimal_policy)

# Plain-text report: size, a short sample, then the state and action distributions.
print(f"Final Pump MDP Analysis - {len(final_df)} rows total")
print("\nDataframe Sample (first 5 rows):")
print(final_df.head())
for heading, column in (("State Distribution", "state"), ("Action Distribution", "best_action")):
    print(f"\n{heading}:")
    print(final_df[column].value_counts())

print(summary_text)

Final Pump MDP Analysis - 10000 rows total

Dataframe Sample (first 5 rows):
            timestamp  vibration  temperature   pressure  flow_rate     state  \
0 2025-01-01 00:00:00   0.224836    61.821505  27.626915  64.155424  Degraded   
1 2025-01-01 01:00:00   0.193087    62.194501  27.509982  71.560115  Degraded   
2 2025-01-01 02:00:00   0.232384    61.902619  25.314264  75.303773  Degraded   
3 2025-01-01 03:00:00   0.276151    62.610418  28.043252  81.197351  Degraded   
4 2025-01-01 04:00:00   0.188292    63.697179  24.317851  88.193299    Faulty   

              best_action  
0  Preventive Maintenance  
1  Preventive Maintenance  
2  Preventive Maintenance  
3  Preventive Maintenance  
4  Preventive Maintenance  

State Distribution:
state
Healthy     4805
Degraded    4437
Faulty       758
Name: count, dtype: int64

Action Distribution:
best_action
Preventive Maintenance    5195
Do Nothing                4805
Name: count, dtype: int64

🧠 Predictive Maintenance Summary:

1. Mos

In [16]:
import numpy as np
import pandas as pd
import os
import json
from datetime import datetime
import google.generativeai as genai
from dotenv import load_dotenv
load_dotenv()
# load_dotenv() copies variables from a local .env file (if present) into os.environ, e.g. GEMINI_API_KEY read by configure_gemini_api()

# 2. Full MDP Pipeline
def mdp_pipeline(csv_path, gamma=0.95, max_iter=100, tol=1e-4):
    """Label pump readings with health states and solve a 3-state maintenance MDP.

    Steps:
      1. Read ``csv_path`` (needs ``timestamp``, ``vibration``, ``temperature`` and
         ``pressure`` columns) and label every row Healthy / Degraded / Faulty using
         fixed sensor thresholds.
      2. Estimate P(next_state | state, action) from consecutive rows.
      3. Run value iteration with the fixed reward table ``R``.

    Args:
        csv_path: Path to the sensor CSV.
        gamma: Discount factor.
        max_iter: Maximum number of value-iteration sweeps.
        tol: Stop once the largest per-state value change in a sweep is below this.

    Returns:
        ``(df, V, policy)``: the data with added ``state`` and ``best_action`` columns,
        the state -> value dict, and the state -> action dict.

    NOTE(review): every observed step is attributed to "Do Nothing", so transitions
    are only estimated for that action. The maintenance actions get all-zero
    next-state probabilities, which makes them behave like terminal actions with no
    future cost in the Bellman update. This is why maintenance can look cheap here.
    Confirm this is the intended model.
    """
    from collections import Counter

    df = pd.read_csv(csv_path, parse_dates=["timestamp"])

    # Step 1: classify each reading (vectorised). np.select picks the first matching
    # condition, so the Faulty thresholds win over the Degraded ones. Rows matching
    # neither (including NaN readings, whose comparisons are False) are Healthy.
    vibration, temperature, pressure = df["vibration"], df["temperature"], df["pressure"]
    is_faulty = (vibration > 0.6) | (temperature > 65) | (pressure < 25)
    is_degraded = (vibration > 0.3) | (temperature > 62) | (pressure < 28)
    df["state"] = np.select([is_faulty, is_degraded], ["Faulty", "Degraded"], default="Healthy")

    # Step 2: states, actions and immediate rewards R[state][action]
    states = ["Healthy", "Degraded", "Faulty"]
    actions = ["Do Nothing", "Preventive Maintenance", "Corrective Maintenance"]
    R = {
        "Healthy": {"Do Nothing": 0, "Preventive Maintenance": -10, "Corrective Maintenance": -30},
        "Degraded": {"Do Nothing": -5, "Preventive Maintenance": -10, "Corrective Maintenance": -30},
        "Faulty": {"Do Nothing": -100, "Preventive Maintenance": -20, "Corrective Maintenance": -50},
    }
    # The data has no action log, so every observed step is attributed to "Do Nothing".
    df["best_action"] = "Do Nothing"

    # Step 3: count (state, action, next_state) triples over consecutive rows
    transition_counts = {s: {a: {s1: 0 for s1 in states} for a in actions} for s in states}
    action_counts = {s: {a: 0 for a in actions} for s in states}
    state_seq = df["state"].to_numpy()
    action_seq = df["best_action"].to_numpy()
    triples = Counter(zip(state_seq[:-1], action_seq[:-1], state_seq[1:]))
    for (s, a, s1), n in triples.items():
        transition_counts[s][a][s1] += n
        action_counts[s][a] += n

    # Step 4: maximum-likelihood P(s1 | s, a); (s, a) pairs never observed get all-zero rows
    estimated_P = {
        s: {
            a: {
                s1: transition_counts[s][a][s1] / action_counts[s][a]
                if action_counts[s][a] > 0 else 0
                for s1 in states
            }
            for a in actions
        }
        for s in states
    }

    # Step 5: synchronous value iteration; ties go to the earlier action in `actions`
    V = {s: 0 for s in states}
    policy = {s: None for s in states}
    for _ in range(max_iter):
        new_V = {}
        delta = 0
        for s in states:
            q_values = [
                R[s][a] + gamma * sum(estimated_P[s][a][s1] * V[s1] for s1 in states)
                for a in actions
            ]
            best = int(np.argmax(q_values))
            new_V[s] = q_values[best]
            policy[s] = actions[best]
            delta = max(delta, abs(V[s] - new_V[s]))
        V = new_V
        if delta < tol:
            break

    df["best_action"] = df["state"].map(policy)
    return df, V, policy

# Configure Gemini API with your key and use Gemini 1.5 Flash model
def configure_gemini_api():
    # Load API key from .env using dotenv
    api_key = os.environ.get("GEMINI_API_KEY")
    genai.configure(api_key=api_key)
    # Use Gemini 1.5 Flash model (the latest fast model as of 2024)
    return genai.GenerativeModel('models/gemini-2.0-flash')

# Analyze pump data with Gemini API
def gemini_analysis(df, V, policy, shift_start_time=None, shift_hours=8):
    """Summarise classified pump data and ask Gemini for a shift maintenance report.

    Args:
        df: Output of ``mdp_pipeline``; needs ``timestamp``, ``state`` and the
            ``vibration``/``temperature``/``pressure``/``flow_rate`` columns.
        V: State -> value dict from value iteration.
        policy: State -> action dict from value iteration.
        shift_start_time: Optional shift start as ``datetime`` or ISO-8601 string.
            When given, per-state counts for the shift window are included.
        shift_hours: Length of the shift window in hours (default 8). ``None`` keeps
            every reading from ``shift_start_time`` onward.

    Returns:
        The model's response text, or an error message if ``generate_content`` raises.
    """
    # Overall state distribution, in percent
    state_counts = df["state"].value_counts().to_dict()
    total_states = sum(state_counts.values())
    state_percentages = {s: (count / total_states) * 100 for s, count in state_counts.items()}

    # First five Faulty / Degraded timestamps, in row order
    faulty_timestamps = df.loc[df["state"] == "Faulty", "timestamp"].head(5).tolist()
    degraded_timestamps = df.loc[df["state"] == "Degraded", "timestamp"].head(5).tolist()

    # Abrupt failures: a Faulty reading directly after a Healthy one (first five only)
    previous_state = df["state"].shift(1)
    jump_mask = (previous_state == "Healthy") & (df["state"] == "Faulty")
    state_transitions = [
        {
            "from": "Healthy",
            "to": "Faulty",
            "timestamp": df.loc[i, "timestamp"],
            "metrics": {
                "vibration": df.loc[i, "vibration"],
                "temperature": df.loc[i, "temperature"],
                "pressure": df.loc[i, "pressure"],
                "flow_rate": df.loc[i, "flow_rate"],
            },
        }
        for i in df.index[jump_mask.to_numpy()][:5]
    ]

    # Per-state counts for the shift window [start, start + shift_hours)
    if shift_start_time:
        if isinstance(shift_start_time, str):
            shift_start_time = datetime.fromisoformat(shift_start_time)

        in_shift = df["timestamp"] >= shift_start_time
        shift_end_time = None
        if shift_hours is not None:
            shift_end_time = shift_start_time + pd.Timedelta(hours=shift_hours)
            in_shift = in_shift & (df["timestamp"] < shift_end_time)
        shift_states = df.loc[in_shift, "state"]
        shift_summary = {
            "start_time": shift_start_time,
            "end_time": shift_end_time,
            "total_readings": len(shift_states),
            # int() keeps the counts as JSON numbers (NumPy ints would be stringified)
            "healthy_count": int((shift_states == "Healthy").sum()),
            "degraded_count": int((shift_states == "Degraded").sum()),
            "faulty_count": int((shift_states == "Faulty").sum()),
        }
    else:
        shift_summary = None

    analysis_context = {
        "state_percentages": state_percentages,
        "state_transitions": state_transitions,
        "faulty_timestamps": faulty_timestamps,
        "degraded_timestamps": degraded_timestamps,
        "policy": policy,
        "value_function": V,
        "shift_summary": shift_summary
    }

    model = configure_gemini_api()

    prompt = f"""
    You are an experienced industrial maintenance engineer analyzing pump sensor data at the beginning of your shift.
    
    The data has been processed through a Markov Decision Process model and classified into three states:
    - Healthy: Normal operating conditions
    - Degraded: Early warning signs, pump still operational
    - Faulty: Critical issues requiring immediate attention
    
    Here's the current state of the pump system:
    {json.dumps(analysis_context, indent=2, default=str)}
    
    As the engineer starting your shift, please provide:
    1. A brief summary of the pump's current condition
    2. Specific times when anomalies were detected and what likely caused them
    3. Recommendations for immediate actions needed during this shift
    4. Potential maintenance schedule based on the detected patterns
    5. Cost-benefit analysis of performing maintenance now vs. waiting
    
    Format your response as a professional maintenance report that I can share with my team.
    """

    # Best effort: report API failures as text instead of aborting the run
    try:
        response = model.generate_content(prompt)
        return response.text
    except Exception as e:
        return f"Error generating analysis: {str(e)}\n\nPlease ensure your API key is valid and you have proper permissions."

# Execute full pipeline with Gemini analysis
def run_maintenance_analysis(csv_path, shift_start=None):
    """Run the MDP pipeline, print a short summary, then print and return a Gemini shift report.

    Args:
        csv_path: Path to the pump sensor CSV.
        shift_start: Optional shift start (ISO string or datetime), forwarded to
            ``gemini_analysis``.

    Returns:
        ``(df, gemini_report)``: the classified frame and the report text.
    """
    print("Loading and processing pump data...")
    df, value_map, optimal_policy = mdp_pipeline(csv_path)

    standard_section = (
        "\n===== Standard MDP Analysis =====",
        f"Data points: {len(df)} rows",
        "State Distribution:",
        df["state"].value_counts(),
        "\nOptimal Policy:",
        optimal_policy,
    )
    for item in standard_section:
        print(item)

    print("\n===== Gemini-Powered Shift Analysis =====")
    report = gemini_analysis(df, value_map, optimal_policy, shift_start)
    print(report)

    return df, report

# Example usage
if __name__ == "__main__":
    # Simulate an engineer starting their shift at this time
    shift_start_time = "2025-01-04T06:00:00"

    # Reuse the CSV written by generate_synthetic_pump_data; regenerate it only if missing.
    csv_file = "synthetic_pump_data.csv"
    if not os.path.exists(csv_file):
        # Prefer the generator already defined in this notebook; only fall back to an
        # external module when it is not in scope.
        generator = globals().get("generate_synthetic_pump_data")
        if generator is None:
            # NOTE(review): assumes an importable `predictive_maintenance` module exports the generator.
            from predictive_maintenance import generate_synthetic_pump_data as generator
        csv_file = generator(time_steps=10000)

    # Run the full analysis pipeline
    final_df, engineer_report = run_maintenance_analysis(csv_file, shift_start_time)

Loading and processing pump data...

===== Standard MDP Analysis =====
Data points: 10000 rows
State Distribution:
state
Healthy     4805
Degraded    4437
Faulty       758
Name: count, dtype: int64

Optimal Policy:
{'Healthy': 'Do Nothing', 'Degraded': 'Preventive Maintenance', 'Faulty': 'Preventive Maintenance'}

===== Gemini-Powered Shift Analysis =====
## Pump System Maintenance Report - Start of Shift (2025-01-09)

**Date:** 2025-01-09
**Prepared by:** Industrial Maintenance Engineer

**1. Executive Summary:**

The pump system is showing signs of significant degradation. While nearly half of the readings indicate a healthy state (48.05%), a substantial portion indicates a degraded state (44.37%). Alarmingly, 7.58% of the readings classify the system as faulty. Recent transitions from "Healthy" to "Faulty" provide insight into potential failure modes. Based on the Markov Decision Process (MDP) model, immediate action is required to prevent catastrophic failure and minimize downtime.