# Ergonomic Assessment API Test

This notebook demonstrates how to use the Ergonomic Assessment API to analyze images and videos for ergonomic risks.

In [1]:
import requests
import json
import time
import os
import matplotlib.pyplot as plt
from IPython.display import Image, display, HTML
import ipywidgets as widgets
from urllib.parse import urljoin

## Configure API URL

If the server is running on a different machine, you can update the base URL here.

In [2]:
# Base URL of the API
# Replace with your actual server hostname
BASE_URL = "http://desktop-r8kamps:5050/" 

# Add a function to let users easily set the server address
def set_server_address(ip_or_hostname, port=5050):
    global BASE_URL
    BASE_URL = f"http://{ip_or_hostname}:{port}/"
    print(f"Server address set to: {BASE_URL}")
    return BASE_URL

# Set server address to your hostname
set_server_address("desktop-r8kamps", 5050)

# Test the connection
try:
    response = requests.get(urljoin(BASE_URL, "ws-client"))
    if response.status_code == 200:
        print(f"✅ Server is running at {BASE_URL}")
    else:
        print(f"❌ Server returned status code {response.status_code}")
except requests.exceptions.ConnectionError:
    print(f"❌ Failed to connect to server at {BASE_URL}")
    print("Make sure the server is running and the URL is correct.")
    print("If connecting from a remote machine, use set_server_address() to set the correct IP.")

Server address set to: http://desktop-r8kamps:5050/
❌ Server returned status code 500


In [3]:
# Test a simple API endpoint instead
try:
    # Just check if the server is running by pinging a simple endpoint
    response = requests.get(urljoin(BASE_URL, "predict/video/result?job_id=test"))
    print(f"Server response status: {response.status_code}")
    print(f"Response: {response.text}")
    print("\nServer is running even if templates might have issues.")
except requests.exceptions.ConnectionError:
    print(f"❌ Failed to connect to server at {BASE_URL}")

Server response status: 404
Response: {
  "error": "Job not found or expired"
}


Server is running even if templates might have issues.


In [6]:
# Use a specific image path - update this with your actual image path
test_image_path = "/root/model/TA/Deploy/test/2b.jpg"  

# If you don't know the exact name, list the images in the directory
import os
import glob

# List all images in the test directory
test_dir = "/root/model/TA/Deploy/test"
if os.path.exists(test_dir):
    image_files = glob.glob(f"{test_dir}/*.jpg") + glob.glob(f"{test_dir}/*.jpeg") + glob.glob(f"{test_dir}/*.png")
    
    if image_files:
        print(f"Found {len(image_files)} images in test directory:")
        for img in image_files:
            print(f"  - {os.path.basename(img)}")
        
        # Use the first image for testing
        test_image_path = image_files[0]
        print(f"\nUsing {os.path.basename(test_image_path)} for testing")
    else:
        print("No image files found in test directory")
else:
    print(f"Test directory not found: {test_dir}")

# Upload the image for analysis if found
if os.path.exists(test_image_path):
    print(f"Uploading image: {test_image_path}")
    
    with open(test_image_path, 'rb') as f:
        files = {'image': (os.path.basename(test_image_path), f, 'image/jpeg')}
        try:
            response = requests.post(urljoin(BASE_URL, "predict/image"), files=files)
            print(f"Image analysis response status: {response.status_code}")
            
            if response.status_code == 200:
                print("✅ Image analysis successful!")
                # Print result summary
                result = response.json().get('result', {})
                if 'reba_score' in result:
                    print(f"\nREBA Score: {result['reba_score']:.1f} ({result['risk_level']} Risk)")
                    
                    print("\nComponent Scores:")
                    for component, score in result.get('component_scores', {}).items():
                        print(f"  - {component.title()}: {score}")
                    
                    if 'feedback' in result:
                        print("\nFeedback:")
                        print(result['feedback'])
                        
                    if 'visualization_path' in result:
                        vis_url = urljoin(BASE_URL, f"output_images/{result['visualization_path']}")
                        print(f"\nVisualization available at: {vis_url}")
                        
                        # Display the visualization if in a notebook
                        from IPython.display import Image, display
                        try:
                            display(Image(url=vis_url))
                            print("Visualization displayed above")
                        except:
                            print("Unable to display visualization image directly")
                else:
                    print("\nNo REBA score found in the result")
                    print(f"Full result: {result}")
            else:
                print(f"❌ Error response: {response.text}")
        except Exception as e:
            print(f"❌ Error during image upload: {str(e)}")
else:
    print(f"❌ Test image not found: {test_image_path}")

Found 2 images in test directory:
  - 2B.jpg
  - 144714956.jpg

Using 2B.jpg for testing
Uploading image: /root/model/TA/Deploy/test/2B.jpg
Image analysis response status: 200
✅ Image analysis successful!

REBA Score: 2.5 (Low Risk)

Component Scores:
  - Leg_Score: 2
  - Lower_Arm_Score: 2
  - Neck_Score: 1
  - Reba_Score: 2.5340089797973633
  - Trunk_Score: 2
  - Upper_Arm_Score: 1

Feedback:
Overall REBA Score: 2.5 - Low Risk. Action may be needed.

Focus on improving: Lower arm position, Leg posture (score 2).

Recommended actions:
- Position work to allow 90-110° elbow angles
- Ensure even weight distribution between legs
- Avoid prolonged static standing in awkward positions


Visualization available at: http://desktop-r8kamps:5050/output_images/2025-05-19/image_20250519001325_2_001327.png


Visualization displayed above


In [10]:
# Test video analysis
import os

# Define the path to your test video
test_dir = "/root/model/TA/Deploy/test"
test_video_path = os.path.join(test_dir, "ergotest.mp4")  # Update this with your actual video filename

# Check if the video exists
if not os.path.exists(test_video_path):
    # If not, list available video files
    video_files = glob.glob(f"{test_dir}/*.mp4") + glob.glob(f"{test_dir}/*.avi") + glob.glob(f"{test_dir}/*.mov")
    
    if video_files:
        print(f"Found {len(video_files)} videos in test directory:")
        for vid in video_files:
            print(f"  - {os.path.basename(vid)}")
        
        # Use the first video for testing
        test_video_path = video_files[0]
        print(f"\nUsing {os.path.basename(test_video_path)} for testing")
    else:
        print("No video files found in test directory")
        test_video_path = None

# Submit the video for analysis if found
if test_video_path and os.path.exists(test_video_path):
    print(f"Uploading video: {test_video_path}")
    
    with open(test_video_path, 'rb') as f:
        files = {'video': (os.path.basename(test_video_path), f, 'video/mp4')}
        
        try:
            response = requests.post(urljoin(BASE_URL, "predict/video"), files=files)
            print(f"Video submission response status: {response.status_code}")
            
            if response.status_code == 200:
                job_id = response.json().get('job_id')
                print(f"✅ Video submitted successfully! Job ID: {job_id}")
                
                # Now poll for results
                print("\nChecking for results (this may take a while)...")
                max_attempts = 60  # 5 minutes with 5-second intervals
                for attempt in range(max_attempts):
                    check_response = requests.get(urljoin(BASE_URL, f"predict/video/result?job_id={job_id}"))
                    
                    if check_response.status_code == 200:
                        job_status = check_response.json()
                        
                        if job_status.get('status') == 'done':
                            print(f"✅ Processing complete!")
                            result = job_status.get('result')
                            
                            print(f"\nVideo Analysis Results:")
                            print(f"Average REBA Score: {result.get('avg_reba_score', 'N/A')}")
                            print(f"Risk Level: {result.get('risk_level', 'N/A')}")
                            
                            # Display component scores
                            if 'avg_component_scores' in result:
                                print("\nComponent Scores:")
                                for component, score in result['avg_component_scores'].items():
                                    print(f"  - {component.title()}: {score:.2f}")
                            
                            # Display recommendations
                            if 'recommendations' in result and result['recommendations']:
                                print("\nRecommendations:")
                                for i, rec in enumerate(result['recommendations'], 1):
                                    print(f"  {i}. {rec}")
                            
                            # Display full result as option
                            show_full_result = False  # Set to True to see full details
                            if show_full_result:
                                print("\nFull result:")
                                print(json.dumps(result, indent=2))
                            
                            break
                        else:
                            if attempt % 4 == 0:  # Show status every 20 seconds
                                print(f"Still processing... (attempt {attempt+1}/{max_attempts})")
                    else:
                        print(f"Error checking job status: {check_response.status_code}")
                        print(check_response.text)
                        break
                    
                    time.sleep(5)  # Wait 5 seconds between checks
                    
                else:  # This runs if the loop completes without breaking
                    print("❌ Timed out waiting for results")
            else:
                print(f"❌ Error submitting video: {response.status_code}")
                print(response.text)
        except Exception as e:
            print(f"❌ Error during video upload: {str(e)}")
else:
    print(f"❌ No test video found. Please add a .mp4 video to {test_dir}")

Uploading video: /root/model/TA/Deploy/test/ergotest.mp4
Video submission response status: 200
✅ Video submitted successfully! Job ID: b13451dc-2a84-4d49-88ad-a20fe68e62de

Checking for results (this may take a while)...
Still processing... (attempt 1/60)
Still processing... (attempt 5/60)
Still processing... (attempt 9/60)
Still processing... (attempt 13/60)
Still processing... (attempt 17/60)
Still processing... (attempt 21/60)
Still processing... (attempt 25/60)
Still processing... (attempt 29/60)
Still processing... (attempt 33/60)
Still processing... (attempt 37/60)
Still processing... (attempt 41/60)
✅ Processing complete!

Video Analysis Results:
Average REBA Score: 3.348956447465079
Risk Level: Medium

Component Scores:
  - Leg: 2.02
  - Lower_Arm: 1.83
  - Neck: 1.28
  - Trunk: 2.37
  - Upper_Arm: 1.35

Recommendations:
  1. Ensure even weight distribution between both legs.
  2. Use an anti-fatigue mat if standing for long periods.
  3. Consider ergonomic adjustments to your 

## Analyze an Image

This section demonstrates how to upload an image and get ergonomic analysis results.

In [None]:
def analyze_image(image_path):
    """Upload an image and get ergonomic analysis results"""
    print(f"Analyzing image: {image_path}")
    
    # Check if file exists
    if not os.path.exists(image_path):
        print(f"❌ File not found: {image_path}")
        return None
        
    # Prepare file for upload
    with open(image_path, 'rb') as f:
        files = {'image': (os.path.basename(image_path), f, 'image/jpeg')}
        
        # Make the request
        try:
            response = requests.post(urljoin(BASE_URL, "predict/image"), files=files)
            
            if response.status_code == 200:
                result = response.json().get('result')
                print(f"✅ Analysis complete!")
                return result
            else:
                print(f"❌ Error: {response.status_code}")
                try:
                    print(response.json())
                except:
                    print(response.text)
                return None
        except Exception as e:
            print(f"❌ Exception: {str(e)}")
            return None

In [None]:
# Path to an image file for testing
# Replace with a path to your own image
test_image_path = "test_image.jpg"

# Analyze the image
image_result = analyze_image(test_image_path)

# Display the result
if image_result:
    print(f"REBA Score: {image_result['reba_score']:.1f} ({image_result['risk_level']} Risk)")
    
    # Display component scores
    print("\nComponent Scores:")
    for component, score in image_result['component_scores'].items():
        print(f"  - {component.title()}: {score}")
        
    # Display visualization if available
    if 'visualization_path' in image_result:
        visualization_url = urljoin(BASE_URL, "output_images/" + image_result['visualization_path'])
        print(f"\nVisualization: {visualization_url}")
        display(Image(url=visualization_url))
        
    # Display feedback
    if 'feedback' in image_result:
        print("\nFeedback:")
        print(image_result['feedback'])

## Analyze a Video

This section demonstrates how to upload a video for asynchronous processing and retrieve the results.

In [None]:
def submit_video(video_path):
    """Upload a video for asynchronous processing"""
    print(f"Uploading video: {video_path}")
    
    # Check if file exists
    if not os.path.exists(video_path):
        print(f"❌ File not found: {video_path}")
        return None
        
    # Prepare file for upload
    with open(video_path, 'rb') as f:
        files = {'video': (os.path.basename(video_path), f, 'video/mp4')}
        
        # Make the request
        try:
            response = requests.post(urljoin(BASE_URL, "predict/video"), files=files)
            
            if response.status_code == 200:
                job_id = response.json().get('job_id')
                print(f"✅ Video submitted successfully!")
                print(f"Job ID: {job_id}")
                return job_id
            else:
                print(f"❌ Error: {response.status_code}")
                try:
                    print(response.json())
                except:
                    print(response.text)
                return None
        except Exception as e:
            print(f"❌ Exception: {str(e)}")
            return None

def check_video_result(job_id):
    """Check the status of a video analysis job"""
    try:
        response = requests.get(urljoin(BASE_URL, f"predict/video/result?job_id={job_id}"))
        
        if response.status_code == 200:
            return response.json()
        else:
            print(f"❌ Error: {response.status_code}")
            try:
                print(response.json())
            except:
                print(response.text)
            return None
    except Exception as e:
        print(f"❌ Exception: {str(e)}")
        return None

def wait_for_video_result(job_id, max_wait_seconds=300, check_interval=5):
    """Wait for video analysis to complete"""
    print(f"Waiting for video analysis to complete (job ID: {job_id})...")
    
    start_time = time.time()
    iterations = 0
    
    while time.time() - start_time < max_wait_seconds:
        iterations += 1
        result = check_video_result(job_id)
        
        if result is None:
            print(f"❌ Error checking job status")
            return None
        
        status = result.get('status')
        
        if status == 'done':
            elapsed_time = time.time() - start_time
            print(f"✅ Processing complete! (Elapsed time: {elapsed_time:.1f} seconds)")
            return result
        elif status == 'processing':
            if iterations % 4 == 0:  # Show status message every 4 checks
                elapsed_time = time.time() - start_time
                print(f"⏳ Still processing... (Elapsed time: {elapsed_time:.1f} seconds)")
        else:
            print(f"❌ Unexpected status: {status}")
            return result
        
        time.sleep(check_interval)
    
    print(f"❌ Timed out after {max_wait_seconds} seconds")
    return None

In [None]:
# Path to a video file for testing
# Replace with a path to your own video
test_video_path = "test_video.mp4"

# Submit the video for analysis
job_id = submit_video(test_video_path)

if job_id:
    # Wait for the analysis to complete
    result = wait_for_video_result(job_id)
    
    if result and result.get('status') == 'done':
        # Extract the actual results
        video_result = result.get('result')
        
        # Display basic results
        print(f"\nVideo Analysis Results:")
        print(f"Average REBA Score: {video_result['avg_reba_score']:.1f} ({video_result['risk_level']} Risk)")
        print(f"Action Level: {video_result['action_level']} - {video_result['action_text']}")
        
        # Display component scores
        print("\nAverage Component Scores:")
        for component, score in video_result['avg_component_scores'].items():
            print(f"  - {component.title()}: {score:.1f}")
        
        # Display high risk periods
        if video_result['high_risk_periods_count'] > 0:
            print(f"\nHigh Risk Periods Detected: {video_result['high_risk_periods_count']}")
            for i, period in enumerate(video_result['high_risk_periods']):
                print(f"  - Period {i+1}: Frames {period['start_frame']}-{period['end_frame']} (Avg REBA: {period['avg_score']:.1f})")
        else:
            print("\nNo high risk periods detected.")
        
        # Display recommendations
        if 'recommendations' in video_result and video_result['recommendations']:
            print("\nRecommendations:")
            for i, rec in enumerate(video_result['recommendations']):
                print(f"  {i+1}. {rec}")
                
        # Show more detailed stats using a collapsible widget
        stats_html = f"""
        <h4>Detailed Statistics:</h4>
        <table>
            <tr>
                <th>Statistic</th>
                <th>Value</th>
            </tr>
            <tr>
                <td>REBA Score Range</td>
                <td>{video_result['reba_statistics']['min']:.1f} - {video_result['reba_statistics']['max']:.1f}</td>
            </tr>
            <tr>
                <td>REBA Score Median</td>
                <td>{video_result['reba_statistics']['median']:.1f}</td>
            </tr>
            <tr>
                <td>REBA Score Std Dev</td>
                <td>{video_result['reba_statistics']['std']:.2f}</td>
            </tr>
        </table>
        """
        
        # Create collapsible widget
        stats_button = widgets.Button(description="Show Detailed Statistics")
        stats_output = widgets.Output()
        
        def toggle_stats(_):
            if stats_output.outputs:
                stats_output.clear_output()
                stats_button.description = "Show Detailed Statistics"
            else:
                with stats_output:
                    display(HTML(stats_html))
                stats_button.description = "Hide Detailed Statistics"
                
        stats_button.on_click(toggle_stats)
        display(stats_button)
        display(stats_output)

## Remote Server Configuration

If you're running this notebook on a different machine than your server (e.g., running Jupyter on a remote server with `jupyter notebook --ip=0.0.0.0 --port=8888 --allow-root`), update the server address below:

In [1]:
# Example: If your server's IP is 192.168.1.100 and running on port 5050
# set_server_address("192.168.1.100", 5050)

# If you're running everything locally, you can use:
# set_server_address("localhost", 5050)

# Uncomment and modify the line below with your server's actual IP or hostname:
server_ip = set_server_address("desktop-r8kamps", 5050)

NameError: name 'set_server_address' is not defined

## Test WebSocket Client

To test the WebSocket functionality, open the client page in your browser:

In [None]:
ws_client_url = urljoin(BASE_URL, "ws-client")
print(f"WebSocket client URL: {ws_client_url}")
print("Open this URL in your browser to test real-time ergonomic analysis.")

# Create a clickable link
display(HTML(f'<a href="{ws_client_url}" target="_blank">Open WebSocket Client</a>'))

## Additional Testing Functions

The following helper functions can be useful for testing specific components:

In [None]:
def test_server_connection():
    """Test if the server is running and accessible"""
    try:
        response = requests.get(urljoin(BASE_URL, "ws-client"), timeout=5)
        if response.status_code == 200:
            print(f"✅ Server is running at {BASE_URL}")
            return True
        else:
            print(f"❌ Server returned status code {response.status_code}")
            return False
    except requests.exceptions.ConnectionError:
        print(f"❌ Failed to connect to server at {BASE_URL}")
        print("Make sure the server is running and the URL is correct.")
        return False
    except requests.exceptions.Timeout:
        print(f"❌ Connection timed out to server at {BASE_URL}")
        return False

def check_job_status(job_id):
    """Check the status of a job without waiting for completion"""
    return check_video_result(job_id)

def list_previous_job(job_id):
    """Retrieve and display the result of a previously completed job"""
    result = check_video_result(job_id)
    if result and result.get('status') == 'done':
        print(f"Found completed job: {job_id}")
        video_result = result.get('result')
        print(f"Average REBA Score: {video_result['avg_reba_score']:.1f} ({video_result['risk_level']} Risk)")
        return video_result
    else:
        print(f"Job {job_id} not found or not completed")
        return None

## Conclusion

This notebook demonstrated how to:

1. Connect to the Ergonomic Assessment API
2. Upload and analyze images
3. Submit videos for asynchronous analysis
4. Access the WebSocket client for real-time analysis
5. Configure for remote server access

You can modify these examples to integrate the API into your own applications.