In [18]:
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import classification_report
import joblib

# Load the data
x = np.load('C:/Users/ISSLAB/Downloads/MediaPipe_MainOut/Data/pose_features.npy')
y = np.load('C:/Users/ISSLAB/Downloads/MediaPipe_MainOut/Data/pose_labels.npy')

# Split the data
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)

# Function to evaluate and train model
def evaluate_and_train(model, name, save_path):
    # Cross-validation
    cv_scores = cross_val_score(model, x_train, y_train, cv=5)
    print(f"\n{name} Cross-Validation Results:")
    print(f"Mean Accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
    
    # Train on training set and evaluate on test set
    model.fit(x_train, y_train)
    y_pred = model.predict(x_test)
    print(f"\n{name} Test Set Results:")
    print(classification_report(y_test, y_pred, target_names=['Boxing', 'Normal', 'Squat', 'Raise']))
    
    # Train final model on full dataset and save
    final_model = model.__class__(**model.get_params())
    final_model.fit(x, y)
    joblib.dump(final_model, save_path)
    print(f"Model saved to {save_path}")

# List of models to evaluate with their save paths
models = [
    (RandomForestClassifier(n_estimators=100, random_state=42),
     "Random Forest",
     'C:/Users/ISSLAB/Downloads/MediaPipe_MainOut/Data/random_forest_model.joblib'),
    (SVC(kernel='rbf', random_state=42),
     "Support Vector Machine",
     'C:/Users/ISSLAB/Downloads/MediaPipe_MainOut/Data/support_vector_machine_model.joblib'),
    (MLPClassifier(hidden_layer_sizes=(100, 50), max_iter=500, random_state=42),
     "Neural Network",
     'C:/Users/ISSLAB/Downloads/MediaPipe_MainOut/Data/neural_network_model.joblib')
]

# Evaluate and train each model
for model, name, save_path in models:
    evaluate_and_train(model, name, save_path)

print("\nAll models evaluated, trained on full dataset, and saved to specified paths.")


Random Forest Cross-Validation Results:
Mean Accuracy: 0.9449 (+/- 0.0155)

Random Forest Test Set Results:
              precision    recall  f1-score   support

      Boxing       0.96      0.92      0.94       206
      Normal       0.93      0.97      0.95       197
       Squat       0.95      0.95      0.95       177
       Raise       1.00      1.00      1.00       201

    accuracy                           0.96       781
   macro avg       0.96      0.96      0.96       781
weighted avg       0.96      0.96      0.96       781

Model saved to C:/Users/ISSLAB/Downloads/MediaPipe_MainOut/Data/random_forest_model.joblib

Support Vector Machine Cross-Validation Results:
Mean Accuracy: 0.8889 (+/- 0.0121)

Support Vector Machine Test Set Results:
              precision    recall  f1-score   support

      Boxing       0.86      0.89      0.87       206
      Normal       0.86      0.88      0.87       197
       Squat       0.94      0.86      0.90       177
       Raise       0.

In [None]:
#nas版本----------
import cv2
import mediapipe as mp
import numpy as np
import joblib
from collections import deque
import os
import time

mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
mp_pose = mp.solutions.pose

# Model selection
MODEL_CHOICES = {
    'rf': 'C:/Users/ISSLAB/Downloads/MediaPipe_MainOut/Data/random_forest_model.joblib',
    'svm': 'C:/Users/ISSLAB/Downloads/MediaPipe_MainOut/Data/support_vector_machine_model.joblib',
    'nn': 'C:/Users/ISSLAB/Downloads/MediaPipe_MainOut/Data/neural_network_model.joblib'
}

# Choose your model here
SELECTED_MODEL = 'rf'  #'rf', 'svm', 'nn'

# NAS configuration
NAS_ALARM_PATH = '/path/to/your/nas/alarm/folder/'  # Update this path

def load_model(model_key):
    if model_key not in MODEL_CHOICES:
        raise ValueError(f"Invalid model key. Choose from {', '.join(MODEL_CHOICES.keys())}")
    model_path = MODEL_CHOICES[model_key]
    return joblib.load(model_path)

def get_pose_class(prediction):
    pose_classes = ['Boxing', 'Normal', 'Squat', 'Raise']
    return pose_classes[prediction]

def send_alarm_to_nas(timestamp):
    alarm_file = os.path.join(NAS_ALARM_PATH, f'boxing_alarm_{timestamp}.txt')
    with open(alarm_file, 'w') as f:
        f.write(f"Boxing detected at {timestamp}")
    print(f"Alarm sent to NAS: {alarm_file}")

def main():
    model = load_model(SELECTED_MODEL)
    recent_predictions = deque(maxlen=10)
    
    # Ask user for input source
    input_source = input("Enter 'c' for webcam or provide a video file path: ")
    
    if input_source.lower() == 'c':
        cap = cv2.VideoCapture(0)
    else:
        cap = cv2.VideoCapture(input_source)
    
    if not cap.isOpened():
        print("Error opening video stream or file")
        return

    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
        while cap.isOpened():
            ret, img = cap.read()
            if not ret:
                print("Can't receive frame (stream end?). Exiting ...")
                break
            
            img = cv2.resize(img, (640, 480))
            img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            results = pose.process(img_rgb)
            
            if results.pose_landmarks:
                landmarks = results.pose_landmarks.landmark
                pose_data = np.array([[lm.x, lm.y, lm.z, lm.visibility] for lm in landmarks]).flatten()
                prediction = model.predict([pose_data])[0]
                recent_predictions.append(prediction)
                
                most_common = max(set(recent_predictions), key=recent_predictions.count)
                pose_class = get_pose_class(most_common)
                confidence = recent_predictions.count(most_common) / len(recent_predictions)
                
                mp_drawing.draw_landmarks(
                    img,
                    results.pose_landmarks,
                    mp_pose.POSE_CONNECTIONS,
                    landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style())
                
                cv2.putText(img, f"Pose: {pose_class}", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                cv2.putText(img, f"Confidence: {confidence:.2f}", (10, 70),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                
                # Check if pose is 'Boxing' and send alarm
                if pose_class == 'Boxing' and confidence > 0.8:
                    timestamp = time.strftime("%Y%m%d-%H%M%S")
                    send_alarm_to_nas(timestamp)
            
            cv2.imshow('Pose Classification', img)
            if cv2.waitKey(5) == ord('q'):
                break
    
    cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()


In [20]:
#ftp版本----------
import cv2
import mediapipe as mp
import numpy as np
import joblib
from collections import deque
import os
import time
from ftplib import FTP

mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
mp_pose = mp.solutions.pose

# Model selection
MODEL_CHOICES = {
    'rf': 'C:/Users/ISSLAB/Downloads/MediaPipe_MainOut/Data/random_forest_model.joblib',
    'svm': 'C:/Users/ISSLAB/Downloads/MediaPipe_MainOut/Data/support_vector_machine_model.joblib',
    'nn': 'C:/Users/ISSLAB/Downloads/MediaPipe_MainOut/Data/neural_network_model.joblib'
}

# Choose your model here
SELECTED_MODEL = 'rf'  #'rf', 'svm', 'nn'

# FTP configuration
FTP_SERVER = '140.128.102.142:21'  # FTP 伺服器 IP
FTP_USERNAME = 'isslabView'  # FTP 使用者名稱
FTP_PASSWORD = 'isslab411view114'  # FTP 密碼
FTP_ALARM_PATH = 'C:/Users/ISSLAB/Downloads/alarm'  # 更新為你想上傳到的 FTP 路徑


def load_model(model_key):
    if model_key not in MODEL_CHOICES:
        raise ValueError(f"Invalid model key. Choose from {', '.join(MODEL_CHOICES.keys())}")
    model_path = MODEL_CHOICES[model_key]
    return joblib.load(model_path)

def get_pose_class(prediction):
    pose_classes = ['Boxing', 'Normal', 'Squat', 'Raise']
    return pose_classes[prediction]

def send_alarm_to_ftp(timestamp):
    alarm_file_name = f'boxing_alarm_{timestamp}.txt'
    alarm_content = f"Boxing detected at {timestamp}"

    # Write the alarm content to a temporary file
    with open(alarm_file_name, 'w') as f:
        f.write(alarm_content)

    # Connect to FTP and upload the alarm file
    with FTP(FTP_SERVER) as ftp:
        ftp.login(FTP_USERNAME, FTP_PASSWORD)
        ftp.cwd(FTP_ALARM_PATH)
        with open(alarm_file_name, 'rb') as file:
            ftp.storbinary(f'STOR {alarm_file_name}', file)

    print(f"Alarm sent to FTP: {FTP_ALARM_PATH}/{alarm_file_name}")
    
    # Remove local temporary file after uploading
    os.remove(alarm_file_name)

def main():
    model = load_model(SELECTED_MODEL)
    recent_predictions = deque(maxlen=10)
    
    # Ask user for input source
    input_source = input("Enter 'c' for webcam or provide a video file path: ")
    
    if input_source.lower() == 'c':
        cap = cv2.VideoCapture(0)
    else:
        cap = cv2.VideoCapture(input_source)
    
    if not cap.isOpened():
        print("Error opening video stream or file")
        return

    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
        while cap.isOpened():
            ret, img = cap.read()
            if not ret:
                print("Can't receive frame (stream end?). Exiting ...")
                break
            
            img = cv2.resize(img, (640, 480))
            img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            results = pose.process(img_rgb)
            
            if results.pose_landmarks:
                landmarks = results.pose_landmarks.landmark
                pose_data = np.array([[lm.x, lm.y, lm.z, lm.visibility] for lm in landmarks]).flatten()
                prediction = model.predict([pose_data])[0]
                recent_predictions.append(prediction)
                
                most_common = max(set(recent_predictions), key=recent_predictions.count)
                pose_class = get_pose_class(most_common)
                confidence = recent_predictions.count(most_common) / len(recent_predictions)
                
                mp_drawing.draw_landmarks(
                    img,
                    results.pose_landmarks,
                    mp_pose.POSE_CONNECTIONS,
                    landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style())
                
                cv2.putText(img, f"Pose: {pose_class}", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                cv2.putText(img, f"Confidence: {confidence:.2f}", (10, 70),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                
                # Check if pose is 'Boxing' and send alarm via FTP
                if pose_class == 'Boxing' and confidence > 0.8:
                    timestamp = time.strftime("%Y%m%d-%H%M%S")
                    send_alarm_to_ftp(timestamp)
            
            cv2.imshow('Pose Classification', img)
            if cv2.waitKey(5) == ord('q'):
                break
    
    cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()


Error opening video stream or file
