<a href="https://colab.research.google.com/github/Mathusayini/AI-Powered-Smart-Glove-for-Safety-Monitoring-Control-in-Workspaces/blob/main/SmartGloveManual.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Enhanced Real-time Smart Glove Data Processing with Automatic ML

In [None]:
import firebase_admin
from firebase_admin import credentials, db
import pandas as pd
import numpy as np
import os
import json
import time
from datetime import datetime, timedelta
import threading
from threading import Lock
import logging
from dataclasses import dataclass, asdict
from typing import Optional, Dict, Any, Tuple
import fcntl
import contextlib
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, accuracy_score
import joblib
import warnings
from firebase_admin import messaging
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import ssl
warnings.filterwarnings('ignore')

In [None]:
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('smartglove.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

Configuration class for the Smart Glove system

In [None]:
@dataclass
class Config:
    csv_filename: str = "smartglove_unified_dataset.csv"
    model_filename: str = "smartglove_model.pkl"
    scaler_filename: str = "smartglove_scaler.pkl"
    config_filename: str = "smartglove_config.json"

    # Data retention settings
    max_records: int = 50000
    data_retention_days: int = 30

    # ML settings
    min_samples_for_training: int = 100
    min_samples_for_retraining: int = 50
    test_size: float = 0.2
    random_state: int = 42

    # Monitoring settings
    monitoring_interval: int = 30
    max_retries: int = 3
    retry_delay: int = 5

    # Firebase settings
    firebase_db_url: str = 'https://emi-smartglove-default-rtdb.asia-southeast1.firebasedatabase.app'
    firebase_paths: list = None

    # Email alert settings
    smtp_server: str = "smtp.gmail.com"
    smtp_port: int = 587
    sender_email: str = ""  # Your email
    sender_password: str = ""  # Your app password
    recipient_emails: list = None  # List of recipient emails

    # FCM settings
    fcm_server_key: str = ""  # Your FCM server key
    fcm_topic: str = "smartglove_alerts"

    # Alert thresholds
    alert_confidence_threshold: float = 0.8
    alert_predictions: list = None  # Predictions that trigger alerts


    def __post_init__(self):
        if self.firebase_paths is None:
            self.firebase_paths = ['smartglove/data', 'smartglove', 'data', 'smartglove/readings']
        if self.recipient_emails is None:
            self.recipient_emails = []
        if self.alert_predictions is None:
            self.alert_predictions = ["emergency", "help", "danger"]

    def save(self):
        """Save configuration to file"""
        with open(self.config_filename, 'w') as f:
            json.dump(asdict(self), f, indent=2)

    @classmethod
    def load(cls):
        """Load configuration from file"""
        try:
            with open(cls().config_filename, 'r') as f:
                data = json.load(f)
                return cls(**data)
        except FileNotFoundError:
            logger.info("Config file not found, using defaults")
            return cls()

class for handling alerts

In [None]:
class AlertManager:
    def __init__(self, config: Config):
        self.config = config
        self.last_alert_time = {}  # Track last alert time per prediction to avoid spam
        self.alert_cooldown = 300  # 5 minutes cooldown between same alerts

    def should_send_alert(self, prediction: str, confidence: float) -> bool:
        """Check if an alert should be sent based on prediction and confidence"""
        if prediction.lower() not in [p.lower() for p in self.config.alert_predictions]:
            return False

        if confidence < self.config.alert_confidence_threshold:
            return False

        # Check cooldown
        current_time = time.time()
        last_alert = self.last_alert_time.get(prediction, 0)
        if current_time - last_alert < self.alert_cooldown:
            return False

        return True

    def send_fcm_notification(self, prediction: str, confidence: float):
        """Send push notification via FCM"""
        try:
            message = messaging.Message(
                notification=messaging.Notification(
                    title='Smart Glove Alert',
                    body=f'Detection: {prediction.title()} (Confidence: {confidence:.2%})'
                ),
                topic=self.config.fcm_topic,
                data={
                    'prediction': prediction,
                    'confidence': str(confidence),
                    'timestamp': datetime.now().isoformat()
                }
            )

            response = messaging.send(message)
            logger.info(f"📱 FCM notification sent: {response}")
            return True

        except Exception as e:
            logger.error(f"❌ FCM notification failed: {e}")
            return False

    def send_email_alert(self, prediction: str, confidence: float):
        """Send email alert"""
        try:
            if not self.config.sender_email or not self.config.recipient_emails:
                logger.warning("Email configuration incomplete")
                return False

            # Create message
            msg = MIMEMultipart()
            msg['From'] = self.config.sender_email
            msg['To'] = ', '.join(self.config.recipient_emails)
            msg['Subject'] = f"Smart Glove Alert: {prediction.title()}"

            # Email body
            body = f"""
            Smart Glove Alert Notification

            Detection: {prediction.title()}
            Confidence: {confidence:.2%}
            Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

            This is an automated alert from your Smart Glove system.
            Please check the device status if this is unexpected.

            ---
            Smart Glove ML Pipeline
            """

            msg.attach(MIMEText(body, 'plain'))

            # Send email
            context = ssl.create_default_context()
            with smtplib.SMTP(self.config.smtp_server, self.config.smtp_port) as server:
                server.starttls(context=context)
                server.login(self.config.sender_email, self.config.sender_password)
                server.send_message(msg)

            logger.info(f"📧 Email alert sent to {len(self.config.recipient_emails)} recipients")
            return True

        except Exception as e:
            logger.error(f"❌ Email alert failed: {e}")
            return False

    def send_alert(self, prediction: str, confidence: float):
        """Send both FCM and email alerts"""
        if not self.should_send_alert(prediction, confidence):
            return

        logger.info(f"🚨 Sending alert for prediction: {prediction} (confidence: {confidence:.2%})")

        # Update last alert time
        self.last_alert_time[prediction] = time.time()

        # Send FCM notification
        fcm_success = self.send_fcm_notification(prediction, confidence)

        # Send email alert
        email_success = self.send_email_alert(prediction, confidence)

        # Log to Firebase for tracking
        try:
            alert_ref = db.reference('smartglove/alerts')
            alert_data = {
                'timestamp': datetime.now().isoformat(),
                'prediction': prediction,
                'confidence': float(confidence),
                'fcm_sent': fcm_success,
                'email_sent': email_success
            }
            alert_ref.push(alert_data)
            logger.info(f"📝 Alert logged to Firebase: {alert_data}")
        except Exception as e:
            logger.error(f"⚠️ Failed to log alert to Firebase: {e}")

  Validates sensor data quality and format

In [None]:
class DataValidator:
    @staticmethod
    def validate_sensor_data(df: pd.DataFrame) -> Tuple[bool, str]:
        """Validate sensor data format and ranges"""
        if df.empty:
            return False, "Empty dataset"

        # Check required columns
        required_cols = ['timestamp']
        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            return False, f"Missing required columns: {missing_cols}"

        # Check for numeric columns
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) < 2:  # At least timestamp and one sensor
            return False, "Insufficient numeric sensor data"

        # Check for reasonable sensor ranges (customize based on your sensors)
        for col in numeric_cols:
            if col == 'timestamp':
                continue
            if df[col].abs().max() > 10000:  # Adjust based on your sensor ranges
                logger.warning(f"Column {col} has values outside expected range")

        return True, "Data validation passed"

    @staticmethod
    def validate_labels(labels: pd.Series) -> bool:
        """Validate label format and distribution"""
        if labels.empty:
            return False

        # Check label distribution
        label_counts = labels.value_counts()
        if len(label_counts) < 2:
            logger.warning("Only one label class found")

        # Check for minimum samples per class
        min_samples = 10
        insufficient_classes = label_counts[label_counts < min_samples]
        if not insufficient_classes.empty:
            logger.warning(f"Classes with insufficient samples: {insufficient_classes.to_dict()}")

        return True

Thread-safe file operations with locking

In [None]:
class ThreadSafeFileHandler:
    def __init__(self, filename: str):
        self.filename = filename
        self.lock = Lock()

    @contextlib.contextmanager
    def file_lock(self, mode='r'):
        """Context manager for file locking"""
        with self.lock:
            try:
                f = open(self.filename, mode)
                fcntl.flock(f.fileno(), fcntl.LOCK_EX)
                yield f
            finally:
                fcntl.flock(f.fileno(), fcntl.LOCK_UN)
                f.close()

    def read_csv(self) -> Optional[pd.DataFrame]:
        """Thread-safe CSV reading"""
        if not os.path.exists(self.filename):
            return None

        try:
            with self.file_lock('r') as f:
                return pd.read_csv(f)
        except Exception as e:
            logger.error(f"Error reading CSV: {e}")
            return None

    def write_csv(self, df: pd.DataFrame) -> bool:
        """Thread-safe CSV writing"""
        try:
            with self.file_lock('w') as f:
                df.to_csv(f, index=False)
            return True
        except Exception as e:
            logger.error(f"Error writing CSV: {e}")
            return False




Upload Firebase configuration with better error handling

In [None]:
def upload_firebase_config():
    try:
        from google.colab import files
        colab_available = True
    except ImportError:
        colab_available = False

    # Check existing JSON files
    json_files = [f for f in os.listdir('.') if f.endswith('.json') and 'firebase' in f.lower()]

    if json_files:
        logger.info(f"Found Firebase config: {json_files[0]}")
        return json_files[0]

    if colab_available:
        logger.info("Please upload your Firebase Admin SDK JSON file:")
        uploaded = files.upload()
        return list(uploaded.keys())[0]
    else:
        raise RuntimeError("Place your Firebase Admin SDK JSON file in the working directory.")


Initialize Firebase with better error handling

In [None]:
def initialize_firebase(json_filename: str, db_url: str):
    try:
        if not os.path.exists(json_filename):
            raise FileNotFoundError(f"Firebase config file not found: {json_filename}")

        cred = credentials.Certificate(json_filename)
        if not firebase_admin._apps:
            firebase_admin.initialize_app(cred, {'databaseURL': db_url})
            logger.info("Firebase initialized successfully")
        else:
            logger.info("Firebase app already initialized")
        return True
    except Exception as e:
        logger.error(f"Firebase initialization failed: {e}")
        return False

Enhanced data processor with thread safety and validation

In [None]:
class SmartGloveDataProcessor:
    def __init__(self, config: Config):
        self.config = config
        self.file_handler = ThreadSafeFileHandler(config.csv_filename)
        self.model = None
        self.scaler = None
        self.feature_columns = None
        self.last_training_time = None
        self.last_record_count = 0
        self.validator = DataValidator()

        # Load existing model if available
        self.load_model()

    def load_model(self):
        """Load existing model and scaler"""
        try:
            if os.path.exists(self.config.model_filename):
                self.model = joblib.load(self.config.model_filename)
                logger.info("Model loaded successfully")

            if os.path.exists(self.config.scaler_filename):
                self.scaler = joblib.load(self.config.scaler_filename)
                logger.info("Scaler loaded successfully")

        except Exception as e:
            logger.error(f"Error loading model: {e}")

    def save_model(self):
        """Save trained model and scaler"""
        try:
            if self.model is not None:
                joblib.dump(self.model, self.config.model_filename)
            if self.scaler is not None:
                joblib.dump(self.scaler, self.config.scaler_filename)
            logger.info("Model and scaler saved successfully")
        except Exception as e:
            logger.error(f"Error saving model: {e}")

    def find_data_path(self) -> Optional[str]:
        for path in self.config.firebase_paths:
            for attempt in range(self.config.max_retries):
                try:
                    ref = db.reference(path)
                    data = ref.get()
                    if data and isinstance(data, dict):
                        if any(isinstance(v, dict) and len(v) > 1 for v in data.values()):
                            logger.info(f"Found data at path: '{path}'")
                            return path
                except Exception as e:
                    logger.warning(f"Attempt {attempt + 1} failed for path '{path}': {e}")
                    if attempt < self.config.max_retries - 1:
                        time.sleep(self.config.retry_delay)

        logger.error("No valid Firebase path found")
        return None

    def fetch_and_process_data(self, data_path: str) -> Optional[pd.DataFrame]:
        try:
            ref = db.reference(data_path)
            data = ref.get()
            if not data:
                logger.warning("No data found in Firebase")
                return None

            ts_data = {}
            for dtype, entries in data.items():
                if not isinstance(entries, dict):
                    continue
                for ts, record in entries.items():
                    if not isinstance(record, dict):
                        continue
                    if ts not in ts_data:
                        ts_data[ts] = {}
                    for k, v in record.items():
                        if isinstance(v, dict):
                            for sub_k, sub_v in v.items():
                                ts_data[ts][f"{k}_{sub_k}"] = sub_v
                        elif k != 'timestamp':
                            ts_data[ts][k] = v

            if not ts_data:
                logger.warning("No timestamp data found")
                return None

            df = pd.DataFrame.from_dict(ts_data, orient='index').reset_index()
            df = df.rename(columns={'index': 'timestamp'})
            df.columns = [c.replace(' ', '_').lower() for c in df.columns]

            cols = ['timestamp'] + [c for c in df.columns if c not in ['timestamp', 'label']]
            if 'label' in df.columns:
                df['label'] = df['label'].astype(str).str.lower()
                cols.append('label')
            else:
                df['label'] = 'unlabeled'
                cols.append('label')

            df = df[cols].sort_values('timestamp').reset_index(drop=True)

            is_valid, message = self.validator.validate_sensor_data(df)
            if not is_valid:
                logger.error(f"Data validation failed: {message}")
                return None

            logger.info(f"Processed {len(df)} records from Firebase")
            return df

        except Exception as e:
            logger.error(f"Error processing data: {e}")
            return None

    def update_csv(self, df: pd.DataFrame) -> bool:
        if df is None or df.empty:
            return False
        try:
            existing_df = self.file_handler.read_csv()
            if existing_df is not None:
                combined = pd.concat([existing_df, df], ignore_index=True)
                combined = combined.drop_duplicates(subset=['timestamp'], keep='last')
                combined = combined.sort_values('timestamp').reset_index(drop=True)

                if len(combined) > self.config.max_records:
                    combined = combined.tail(self.config.max_records)
                    logger.info(f"Applied record limit, keeping {len(combined)} records")

                if 'timestamp' in combined.columns:
                    cutoff_time = datetime.now() - timedelta(days=self.config.data_retention_days)
                    cutoff_timestamp = cutoff_time.timestamp()
                    combined = combined[combined['timestamp'] >= cutoff_timestamp]
                    logger.info(f"Applied time retention, keeping {len(combined)} records")
            else:
                combined = df

            success = self.file_handler.write_csv(combined)
            if success:
                logger.info(f"Updated CSV: {len(combined)} total records")
            return success

        except Exception as e:
            logger.error(f"Error updating CSV: {e}")
            return False

    def prepare_ml_data(self) -> Tuple[Optional[pd.DataFrame], Optional[pd.Series]]:
        df = self.file_handler.read_csv()
        if df is None:
            return None, None

        if 'label' not in df.columns:
            logger.error("No label column found")
            return None, None

        X = df.drop(columns=['timestamp', 'label'], errors='ignore')
        X = X.select_dtypes(include=[np.number]).fillna(0)
        y = df['label']

        if not self.validator.validate_labels(y):
            logger.warning("Label validation issues detected")

        self.feature_columns = X.columns.tolist()
        logger.info(f"Prepared ML data: {len(X)} samples, {len(X.columns)} features")

        return X, y

    def should_retrain(self, current_sample_count: int) -> bool:
        if self.model is None:
            return current_sample_count >= self.config.min_samples_for_training

        new_samples = current_sample_count - self.last_record_count
        if new_samples < self.config.min_samples_for_retraining:
            return False

        if self.last_training_time is None:
            return True

        time_since_training = datetime.now() - self.last_training_time
        return time_since_training.total_seconds() > 3600

    def train_model(self, X: pd.DataFrame, y: pd.Series) -> bool:
        if X.empty or y.empty:
            logger.warning("Empty dataset for training")
            return False
        try:
            if len(X) < self.config.min_samples_for_training:
                logger.warning(f"Insufficient samples for training: {len(X)}")
                return False

            X_train, X_test, y_train, y_test = train_test_split(
                X, y,
                test_size=self.config.test_size,
                random_state=self.config.random_state,
                stratify=y
            )

            self.scaler = StandardScaler()
            X_train_scaled = self.scaler.fit_transform(X_train)
            X_test_scaled = self.scaler.transform(X_test)

            self.model = RandomForestClassifier(
                n_estimators=100,
                random_state=self.config.random_state,
                n_jobs=-1
            )
            self.model.fit(X_train_scaled, y_train)

            y_pred = self.model.predict(X_test_scaled)
            accuracy = accuracy_score(y_test, y_pred)

            logger.info(f"Model trained successfully | Accuracy: {accuracy:.4f}")
            logger.info(f"Classification Report:\n{classification_report(y_test, y_pred)}")

            self.save_model()

            self.last_training_time = datetime.now()
            self.last_record_count = len(X)

            return True

        except Exception as e:
            logger.error(f"Error training model: {e}")
            return False

    def predict_new_data(self, new_data) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
        if self.model is None or self.scaler is None:
            logger.warning("Model not available for prediction")
            return None, None

        try:
            if isinstance(new_data, dict):
                new_data = pd.DataFrame([new_data])

            if self.feature_columns:
                new_data = new_data.reindex(columns=self.feature_columns, fill_value=0)

            new_data = new_data.fillna(0)
            scaled = self.scaler.transform(new_data)
            pred = self.model.predict(scaled)
            proba = self.model.predict_proba(scaled)

            return pred, proba

        except Exception as e:
            logger.error(f"Error making prediction: {e}")
            return None, None


Enhanced real-time monitoring with better error handling
    

In [None]:
class RealTimeMonitor:
    def __init__(self, processor: SmartGloveDataProcessor, path: str, config: Config):
        self.processor = processor
        self.path = path
        self.config = config
        self.running = False
        self.thread = None
        self.error_count = 0
        self.max_errors = 10
        self.alert_manager = AlertManager(config)

    def start(self):
        """Start monitoring with proper thread management"""
        if self.running:
            logger.warning("Monitor already running")
            return

        self.running = True
        self.thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.thread.start()
        logger.info(f"Real-time monitoring started (interval: {self.config.monitoring_interval}s)")

    def stop(self):
        """Stop monitoring gracefully"""
        self.running = False
        if self.thread and self.thread.is_alive():
            self.thread.join(timeout=5)
        logger.info("Monitoring stopped")



Main monitoring loop with error recovery

In [None]:
def _monitor_loop(self):
    while self.running:
        try:
            # Fetch new data
            df = self.processor.fetch_and_process_data(self.path)

            if df is not None and self.processor.update_csv(df):
                # Prepare ML data
                X, y = self.processor.prepare_ml_data()

                # Check if retraining is needed
                if X is not None and self.processor.should_retrain(len(X)):
                    logger.info("Starting model retraining...")
                    if self.processor.train_model(X, y):
                        logger.info("Model retraining completed")
                    else:
                        logger.error("Model retraining failed")

                # Make prediction on latest data
                if X is not None and not X.empty:
                    latest_data = X.iloc[-1:].drop(columns=['label'], errors='ignore')
                    pred, proba = self.processor.predict_new_data(latest_data)

                    if pred is not None and proba is not None:
                        confidence = max(proba[0])
                        prediction = pred[0]

                        logger.info(f"Latest prediction: {prediction} (confidence: {confidence:.3f})")

                        # 🔥 Push prediction and confidence to Firebase
                        try:
                            prediction_ref = db.reference('smartglove/predictions')
                            prediction_data = {
                                'timestamp': datetime.now().isoformat(),
                                'prediction': str(prediction),
                                'confidence': float(confidence)
                            }
                            prediction_ref.push(prediction_data)
                            logger.info(f"📤 Prediction pushed to Firebase: {prediction_data}")
                        except Exception as firebase_error:
                            logger.error(f"⚠️ Failed to push prediction to Firebase: {firebase_error}")

                        # 🚨 NEW: Check if alert should be sent
                        self.alert_manager.send_alert(prediction, confidence)

            # Reset error count on success
            self.error_count = 0

        except Exception as e:
            self.error_count += 1
            logger.error(f"Monitoring error ({self.error_count}/{self.max_errors}): {e}")

            if self.error_count >= self.max_errors:
                logger.error("Too many errors, stopping monitor")
                self.running = False
                break

        # Sleep with early exit check
        for _ in range(self.config.monitoring_interval):
            if not self.running:
                break
            time.sleep(1)


In [None]:
def setup_email_config():
    """Interactive setup for email configuration"""
    print("\n=== Email Alert Configuration ===")
    sender_email = input("Enter sender email (Gmail): ")
    sender_password = input("Enter app password (not regular password): ")

    recipients = []
    while True:
        email = input("Enter recipient email (or press Enter to finish): ")
        if not email:
            break
        recipients.append(email)

    return sender_email, sender_password, recipients

In [None]:
def setup_fcm_topic():
    """Setup FCM topic for notifications"""
    try:
        # This would typically be done on the client side (mobile app)
        # For now, we'll just log the topic name
        logger.info("📱 FCM Topic: smartglove_alerts")
        logger.info("Subscribe your mobile app to this topic to receive notifications")
        return True
    except Exception as e:
        logger.error(f"FCM setup failed: {e}")
        return False

In [None]:
def upload_firebase_config():
    """Upload Firebase Admin SDK JSON file in Google Colab"""
    from google.colab import files
    logger.info("Please upload your Firebase Admin SDK JSON file now:")
    uploaded = files.upload()
    for filename in uploaded.keys():
        if filename.endswith('.json'):
            logger.info(f"Uploaded file: {filename}")
            return filename
    raise RuntimeError("No JSON file uploaded. Please upload a valid Firebase Admin SDK JSON file.")


Main function with enhanced error handling

In [None]:
def main():
    logger.info("=== ENHANCED SMART GLOVE REAL-TIME ML PIPELINE ===")
    try:
        config = Config.load()
        config.save()

        # Upload Firebase Admin SDK JSON file interactively in Colab
        json_file = upload_firebase_config()

        if not config.sender_email or not config.recipient_emails:
            sender_email, sender_password, recipients = setup_email_config()
            config.sender_email = sender_email
            config.sender_password = sender_password
            config.recipient_emails = recipients
            config.save()

        # Setup FCM
        setup_fcm_topic()

        if not initialize_firebase(json_file, config.firebase_db_url):
            logger.error("❌ Failed to initialize Firebase")
            return

        processor = SmartGloveDataProcessor(config)
        path = processor.find_data_path()
        if not path:
            logger.error("❌ No valid Firebase path found")
            return

        logger.info("📦 Fetching initial data...")
        df = processor.fetch_and_process_data(path)
        if df is not None:
            processor.update_csv(df)
            X, y = processor.prepare_ml_data()
            if X is not None and len(X) >= config.min_samples_for_training:
                logger.info("🔁 Training initial model...")
                processor.train_model(X, y)

        monitor = RealTimeMonitor(processor, path, config)
        monitor.start()

        try:
            while True:
                time.sleep(10)
        except KeyboardInterrupt:
            logger.info("🛑 Shutdown requested by user.")
        finally:
            monitor.stop()
            logger.info("✅ Smart Glove ML pipeline shutdown complete.")

    except Exception as e:
        logger.exception(f"❌ Fatal error in main: {e}")


In [None]:
if __name__ == "__main__":
    main()

Saving emi-smartglove-firebase-adminsdk-fbsvc-7ce1c7583b.json to emi-smartglove-firebase-adminsdk-fbsvc-7ce1c7583b (2).json


ERROR:__main__:No valid Firebase path found
ERROR:__main__:❌ No valid Firebase path found
