In [None]:
!pip install ultralytics

Collecting ultralytics
  Downloading ultralytics-8.3.104-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.14-py3-none-any.whl.metadata (9.4 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=1.8.0->ultralytics)
  Downloading n

In [None]:
import cv2
from ultralytics import YOLO
import numpy as np
from time import time, strftime, localtime
import logging
from collections import defaultdict, Counter, deque
import json
from datetime import datetime

# Logging: INFO-and-above records go to a timestamped file log; `logger` is the root logger.
logging.basicConfig(
    filename="actions_detailed.log",
    level=logging.INFO,
    format="%(asctime)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger()

# Tunable settings for the pipeline (paths, detector thresholds, tracking limits).
CONFIG = dict(
    MODEL_PATH="best.pt",
    INPUT_VIDEO="input2.mp4",
    OUTPUT_VIDEO="output_monitor_detailed.mp4",
    CONF_THRESHOLD=0.5,
    IOU_THRESHOLD=0.3,
    MAX_DISTANCE=200,
    ACTION_BUFFER=5,  # number of recent frames used to smooth action transitions
)

# Per-action metadata, keyed by action label.
# Row layout: (label, rating, productivity, wellness, energy, tags).
# Labels not listed here are handled by the defaults at the lookup sites.
ACTION_RATINGS = {
    label: {
        "rating": rating,
        "productivity": productivity,
        "wellness": wellness,
        "energy": energy,
        "tags": tags,
    }
    for label, rating, productivity, wellness, energy, tags in (
        ("Working (Focused)", 8, 0.9, 0.6, -0.5, ["work", "focus"]),
        ("Working (Multi-Screen)", 9, 0.95, 0.5, -0.7, ["work", "multitasking"]),
        ("Taking Notes", 7, 0.8, 0.7, -0.3, ["work", "learning"]),
        ("Reviewing Documents", 7, 0.85, 0.6, -0.4, ["work", "analysis"]),
        ("Collaborating", 6, 0.7, 0.8, -0.2, ["teamwork"]),
        ("Team Lunch", 4, 0.2, 0.9, 0.8, ["break", "social"]),
        ("TV Break", 3, 0.2, 0.4, 0.5, ["break", "distraction"]),
        ("Lunch Break", 5, 0.3, 0.8, 0.7, ["break", "nutrition"]),
        ("Unhealthy Break", 3, 0.2, 0.4, 0.3, ["break", "unhealthy"]),
        ("Mental Reset", 6, 0.4, 0.9, 0.9, ["break", "wellness"]),
        ("Working While Eating", 5, 0.6, 0.5, -0.3, ["work", "multitasking"]),
        ("Casual Reading", 4, 0.3, 0.7, 0.4, ["break", "learning"]),
        ("Distracted Work", 4, 0.5, 0.4, -0.6, ["work", "distraction"]),
        ("Socializing", 5, 0.2, 0.85, 0.6, ["break", "social"]),
        ("Quick Snack", 5, 0.3, 0.7, 0.5, ["break", "nutrition"]),
    )
}

class WorkspaceMonitor:
    """Infer per-person activities from object detections and summarise them.

    For every frame, ``analyze_frame`` assigns an ID to each detected person,
    attaches nearby non-person detections to the closest person, maps the
    resulting object set to an action label through a fixed rule chain, and
    records the (smoothed) action. ``generate_report`` aggregates everything
    recorded so far into JSON and text reports.
    """

    # Scores used for any action label that has no entry in ACTION_RATINGS
    # (for example "Unknown Activity").
    _DEFAULT_ACTION_INFO = {"rating": 5, "productivity": 0.5, "wellness": 0.5, "energy": 0}

    def __init__(self):
        self.model = YOLO(CONFIG["MODEL_PATH"])
        # Person ID -> [(frame, action, rating, timestamp, frame_duration, objects)];
        # frame_duration is the time covered by that single frame, so summing it is safe.
        self.person_tracks = defaultdict(list)
        # (Person ID, action) -> length in seconds of that person's latest uninterrupted run
        self.action_durations = defaultdict(float)
        # Person ID -> most recent raw actions, used for majority-vote smoothing
        self.action_buffer = defaultdict(lambda: deque(maxlen=CONFIG["ACTION_BUFFER"]))
        # Person ID -> accumulated energy score
        self.energy_levels = defaultdict(float)
        logger.info(f"Initialized monitor with model: {CONFIG['MODEL_PATH']}")

    def get_distance(self, box1, box2):
        """Return the pixel distance between the centres of two (x1, y1, x2, y2) boxes."""
        cx1, cy1 = (box1[0] + box1[2]) / 2, (box1[1] + box1[3]) / 2
        cx2, cy2 = (box2[0] + box2[2]) / 2, (box2[1] + box2[3]) / 2
        return np.sqrt((cx2 - cx1) ** 2 + (cy2 - cy1) ** 2)

    def iou(self, box1, box2):
        """Return Intersection over Union of two (x1, y1, x2, y2) boxes (0 when the union is empty)."""
        x1, y1, x2, y2 = box1
        x1_p, y1_p, x2_p, y2_p = box2
        inter_w = max(0, min(x2, x2_p) - max(x1, x1_p))
        inter_h = max(0, min(y2, y2_p) - max(y1, y1_p))
        inter_area = inter_w * inter_h
        union_area = (x2 - x1) * (y2 - y1) + (x2_p - x1_p) * (y2_p - y1_p) - inter_area
        return inter_area / union_area if union_area > 0 else 0.0

    def _assign_person_ids(self, person_boxes, prev_people):
        """Give every person box a unique ID, reusing previous-frame IDs on sufficient overlap."""
        # Every (IoU, box index, previous ID) pairing that clears the match threshold.
        candidates = []
        for idx, box in enumerate(person_boxes):
            for prev_pid, prev_data in prev_people.items():
                score = self.iou(box, prev_data["box"])
                if score > CONFIG["IOU_THRESHOLD"]:
                    candidates.append((score, idx, prev_pid))

        # Strongest overlaps win; each box and each previous ID is used at most once,
        # so two boxes can never end up sharing (and overwriting) the same ID.
        candidates.sort(key=lambda c: c[0], reverse=True)
        matched = {}
        taken = set()
        for _, idx, prev_pid in candidates:
            if idx not in matched and prev_pid not in taken:
                matched[idx] = prev_pid
                taken.add(prev_pid)

        people = {}
        next_index = 0
        for idx, box in enumerate(person_boxes):
            pid = matched.get(idx)
            if pid is None:
                # Unmatched box: take the lowest "P<n>" not already used in this frame.
                while f"P{next_index}" in taken:
                    next_index += 1
                pid = f"P{next_index}"
                taken.add(pid)
            people[pid] = {
                "box": box,
                "objects": [],
                "prev_action": prev_people.get(pid, {}).get("action"),
            }
        return people

    def _attach_objects(self, people, detections):
        """Append each non-person detection to the closest person within MAX_DISTANCE."""
        if not people:
            return
        for class_name, box, conf in detections:
            if class_name == "person":
                continue
            closest_pid = None
            min_dist = float("inf")
            for pid, pdata in people.items():
                dist = self.get_distance(box, pdata["box"])
                if dist < min_dist:
                    min_dist = dist
                    closest_pid = pid
            if closest_pid is not None and min_dist < CONFIG["MAX_DISTANCE"]:
                people[closest_pid]["objects"].append((class_name, box, conf))

    @staticmethod
    def _classify_action(objects, num_people, prev_action):
        """Map one person's nearby objects to an action label.

        The rules are evaluated top to bottom and the first match wins, so the
        order is significant; several later rules are shadowed by earlier ones
        and are kept only to preserve the original rule set.
        """
        present = [o[0] for o in objects]
        laptop = [n for n in present if n == "laptop"]
        tv = [n for n in present if n == "tv"]
        pizza = [n for n in present if n == "pizza"]
        book = [n for n in present if n == "book"]
        chair = [n for n in present if n == "chair"]
        table = [n for n in present if n == "dining table"]
        cup = [n for n in present if n == "cup"]
        phone = [n for n in present if n == "cell phone"]

        # Work-related conditions
        if laptop and not pizza and not tv and not phone:
            action = "Working (Focused)"
        elif laptop and len(tv) >= 2 and not pizza:
            action = "Working (Multi-Screen)"
        elif laptop and book and not pizza:
            action = "Reviewing Documents"
        elif book and not laptop and not pizza:
            action = "Taking Notes"
        elif laptop and pizza and not tv:
            action = "Working While Eating"
        elif laptop and phone and not pizza:
            action = "Distracted Work"
        elif laptop and num_people >= 2:
            action = "Collaborating"
        elif laptop and cup and not pizza:
            action = "Working (Focused)"  # Assuming cup is incidental
        elif laptop and tv and not pizza:
            action = "Distracted Work"
        elif laptop and chair and not pizza:
            action = "Working (Focused)"

        # Break-related conditions
        elif pizza and chair and not laptop:
            action = "Lunch Break"
        elif pizza and table and num_people >= 2 and not laptop:
            action = "Team Lunch"
        elif pizza and tv and num_people >= 2:
            action = "TV Break"
        elif pizza and len(pizza) > 1 and not laptop:
            action = "Unhealthy Break"
        elif pizza and phone and not laptop:
            action = "Unhealthy Break"
        elif pizza and not laptop and not chair:
            action = "Quick Snack"
        elif cup and not laptop and not pizza:
            action = "Mental Reset"
        elif cup and chair and not laptop:
            action = "Mental Reset"
        elif tv and not laptop and not pizza and num_people == 1:
            action = "TV Break"  # Only if no other productive items
        elif phone and not laptop and not pizza:
            action = "Unhealthy Break"

        # Social and miscellaneous conditions
        elif num_people >= 2 and not laptop and not pizza and not tv:
            action = "Socializing"
        elif num_people >= 2 and chair and not laptop:
            action = "Socializing"
        elif book and chair and not laptop:
            action = "Casual Reading"
        elif table and num_people >= 2 and not pizza:
            action = "Socializing"
        elif chair and not laptop and not pizza and not tv:
            action = "Mental Reset"

        # Additional nuanced conditions
        elif laptop and book and phone:
            action = "Distracted Work"
        elif laptop and tv and pizza:
            action = "Working While Eating"
        elif phone and chair and not laptop:
            action = "Unhealthy Break"
        elif phone and tv and not laptop:
            action = "TV Break"
        elif cup and book and not laptop:
            action = "Casual Reading"
        elif laptop and table and num_people >= 2:
            action = "Collaborating"
        elif pizza and cup and not laptop:
            action = "Lunch Break"
        elif tv and book and not laptop:
            action = "Casual Reading"
        elif phone and pizza and chair:
            action = "Unhealthy Break"
        elif laptop and phone and tv:
            action = "Distracted Work"
        elif num_people >= 3 and not laptop and not pizza:
            action = "Socializing"
        elif chair and table and not laptop:
            action = "Mental Reset"
        elif cup and tv and not laptop:
            action = "TV Break"
        elif book and phone and not laptop:
            action = "Casual Reading"
        elif laptop and cup and book:
            action = "Reviewing Documents"

        # Default fallbacks
        elif not objects and not prev_action:
            action = "Mental Reset"
        elif laptop:  # Default to some form of work if laptop present
            action = "Working (Focused)"
        else:
            action = "Unknown Activity"  # Fallback for undefined states
        return action

    def analyze_frame(self, results, frame_count, prev_people, fps):
        """Analyze one frame's detections and record each person's action.

        Args:
            results: Detector output; ``results[0].boxes`` must expose ``xyxy``,
                ``cls`` and ``conf`` sequences of equal length.
            frame_count: Zero-based index of this frame.
            prev_people: The ``people`` dict returned for the previous frame
                (empty or None for the first frame).
            fps: Frames per second of the video; must be greater than zero.

        Returns:
            ``(actions, people)`` where ``actions`` is a list of
            ``(pid, action, rating, timestamp, duration, object_names)`` with
            ``duration`` being the length in seconds of the current uninterrupted
            run of ``action``, and ``people`` maps each person ID to a dict with
            keys ``box``, ``objects``, ``prev_action`` and ``action``.
        """
        prev_people = prev_people or {}
        boxes = results[0].boxes
        names = self.model.names

        # Convert each detection once. Confidence is stored as a plain float so
        # the per-frame history does not keep detector tensors alive.
        detections = [
            (names[int(cls)], box.cpu().numpy(), float(conf))
            for box, cls, conf in zip(boxes.xyxy, boxes.cls, boxes.conf)
        ]

        person_boxes = [box for class_name, box, _ in detections if class_name == "person"]
        people = self._assign_person_ids(person_boxes, prev_people)
        self._attach_objects(people, detections)

        timestamp = frame_count / fps
        frame_duration = 1 / fps
        num_people = len(people)
        actions = []

        for pid, pdata in people.items():
            objects = pdata["objects"]
            prev_action = pdata["prev_action"]

            # Majority vote over the recent raw labels smooths single-frame flicker.
            raw_action = self._classify_action(objects, num_people, prev_action)
            self.action_buffer[pid].append(raw_action)
            action = Counter(self.action_buffer[pid]).most_common(1)[0][0]

            info = ACTION_RATINGS.get(action, self._DEFAULT_ACTION_INFO)
            rating = info["rating"]

            # Length of the current run of this action (restarts when the action changes).
            if prev_action == action:
                run_duration = self.action_durations[(pid, action)] + frame_duration
            else:
                run_duration = frame_duration
            self.action_durations[(pid, action)] = run_duration

            # Energy and the stored track use the time of this frame only; using
            # the run length here would count every earlier frame again.
            self.energy_levels[pid] += info["energy"] * frame_duration

            # Remember the action so the next frame can see it as prev_action.
            pdata["action"] = action

            object_names = [o[0] for o in objects]
            actions.append((pid, action, rating, timestamp, run_duration, object_names))
            self.person_tracks[pid].append((frame_count, action, rating, timestamp, frame_duration, objects))
            logger.info(f"{pid} - {action} (Rating: {rating}, Duration: {run_duration:.2f}s, Objects: {object_names})")

        return actions, people

    def generate_report(self, total_duration, fps):
        """Aggregate the recorded tracks and write JSON and text reports.

        Writes ``report_detailed.json`` and ``report_detailed.txt`` to the
        current directory.

        Args:
            total_duration: Length of the processed video in seconds.
            fps: Frames per second used while processing.

        Returns:
            The report dictionary that was written to the JSON file.
        """
        default_info = self._DEFAULT_ACTION_INFO
        report = {
            "metadata": {
                "date": strftime("%Y-%m-%d %H:%M:%S"),
                "video": CONFIG["INPUT_VIDEO"],
                "duration": total_duration,
                # round() avoids losing a frame to floating-point error in duration * fps
                "frames": int(round(total_duration * fps)),
                "people_detected": len(self.person_tracks)
            },
            "individuals": {},
            "overall": {},
            "recommendations": []
        }

        # Individual analysis: a single pass over each person's per-frame records.
        for pid, activities in self.person_tracks.items():
            action_stats = {}
            eating_times = []
            total_time = 0.0
            total_rating = 0
            productive_time = 0.0
            wellness_time = 0.0
            for _frame, action, rating, timestamp, duration, _objects in activities:
                info = ACTION_RATINGS.get(action, default_info)
                stats = action_stats.setdefault(action, {"count": 0, "duration": 0.0})
                stats["count"] += 1
                stats["duration"] += duration
                total_time += duration
                total_rating += rating
                productive_time += info["productivity"] * duration
                wellness_time += info["wellness"] * duration
                if "Break" in action or "Lunch" in action or "Eating" in action:
                    eating_times.append(timestamp)

            report["individuals"][pid] = {
                "total_time": total_time,
                "actions": action_stats,
                "avg_rating": total_rating / len(activities) if activities else 0,
                "productivity": productive_time / total_time if total_time else 0.0,
                "wellness": wellness_time / total_time if total_time else 0.0,
                "energy": self.energy_levels[pid],
                "eating_times": eating_times
            }

        # Overall analysis. Averages are weighted by tracked person-time so they
        # stay within the 0..1 range of the per-action scores regardless of how
        # many people were tracked.
        individuals = report["individuals"]
        tracked_time = sum(d["total_time"] for d in individuals.values())
        if tracked_time:
            avg_productivity = sum(d["productivity"] * d["total_time"] for d in individuals.values()) / tracked_time
            avg_wellness = sum(d["wellness"] * d["total_time"] for d in individuals.values()) / tracked_time
        else:
            avg_productivity = 0.0
            avg_wellness = 0.0
        collab_time = sum(
            stats["duration"]
            for d in individuals.values()
            for action, stats in d["actions"].items()
            if "Collaborating" in action
        )
        report["overall"] = {
            "avg_productivity": avg_productivity,
            "avg_wellness": avg_wellness,
            "collab_time": collab_time,
            "issues": Counter(a[1] for acts in self.person_tracks.values() for a in acts if a[2] < 4)
        }

        # Recommendations (group-level ones only when somebody was actually tracked)
        if tracked_time:
            if avg_productivity < 0.6:
                report["recommendations"].append("Implement focus sessions to boost productivity")
            if avg_wellness < 0.5:
                report["recommendations"].append("Schedule mandatory breaks for wellness")
            if collab_time / tracked_time > 0.3:
                report["recommendations"].append("High collaboration detected - ensure balance with solo work")
        for pid, data in individuals.items():
            if data["energy"] < -100:
                report["recommendations"].append(f"{pid}: Consider energy-replenishing breaks")
            if "Distracted Work" in data["actions"] and data["actions"]["Distracted Work"]["duration"] > 300:
                report["recommendations"].append(f"{pid}: Reduce distractions during work")

        # Write to file
        with open("report_detailed.json", "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)

        # Human-readable text report
        now = time()
        with open("report_detailed.txt", "w", encoding="utf-8") as f:
            f.write(f"Corporate Workspace Monitoring Report\n{'='*40}\n")
            for key, value in report["metadata"].items():
                f.write(f"{key.capitalize()}: {value}\n")
            f.write("\nIndividual Reports\n" + "-"*50 + "\n")
            for pid, data in individuals.items():
                share = data["total_time"] / total_duration * 100 if total_duration else 0.0
                f.write(f"{pid}:\n")
                f.write(f"  Total Time: {data['total_time']:.1f}s ({share:.1f}%)\n")
                f.write(f"  Productivity: {data['productivity']:.2f}\n")
                f.write(f"  Wellness: {data['wellness']:.2f}\n")
                f.write(f"  Energy: {data['energy']:.1f}\n")
                f.write("  Actions:\n")
                for action, stats in data["actions"].items():
                    f.write(f"    - {action}: {stats['count']}x, {stats['duration']:.1f}s\n")
                if data["eating_times"]:
                    f.write("  Eating Times:\n")
                    for t in data["eating_times"]:
                        # Wall-clock estimate: treats "now" as the end of the video.
                        f.write(f"    - {int(t)}s ({strftime('%H:%M:%S', localtime(now - total_duration + t))})\n")
            f.write("\nOverall Analysis\n" + "-"*50 + "\n")
            for key, value in report["overall"].items():
                if key == "issues":
                    f.write("Issues:\n")
                    for issue, count in value.items():
                        f.write(f"  - {issue}: {count}x\n")
                elif isinstance(value, float):
                    f.write(f"{key.replace('_', ' ').capitalize()}: {value:.2f}\n")
                else:
                    f.write(f"{key.replace('_', ' ').capitalize()}: {value}\n")
            f.write("\nRecommendations\n" + "-"*50 + "\n")
            for rec in report["recommendations"]:
                f.write(f"- {rec}\n")

        return report

def process_video():
    """Run detection on every frame of the input video and write all outputs.

    Reads CONFIG["INPUT_VIDEO"], writes an annotated copy to
    CONFIG["OUTPUT_VIDEO"], and asks the monitor to generate its reports once
    the last frame has been processed.

    Raises:
        FileNotFoundError: If the input video cannot be opened.
    """
    fallback_fps = 30.0  # used only when the container does not report a usable frame rate

    monitor = WorkspaceMonitor()
    cap = cv2.VideoCapture(CONFIG["INPUT_VIDEO"])
    if not cap.isOpened():
        logger.error(f"Could not open {CONFIG['INPUT_VIDEO']}")
        raise FileNotFoundError(f"Could not open {CONFIG['INPUT_VIDEO']}")

    out = None
    frame_count = 0
    try:
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Keep the fractional rate (e.g. 29.97) so timestamps and playback speed
        # stay accurate, and never divide by a zero/NaN rate.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not (fps > 0):
            logger.warning(f"Invalid FPS reported ({fps}); falling back to {fallback_fps}")
            fps = fallback_fps

        out = cv2.VideoWriter(CONFIG["OUTPUT_VIDEO"], cv2.VideoWriter_fourcc(*"mp4v"), fps,
                              (frame_width, frame_height))
        prev_people = {}

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # verbose=False keeps the detector from printing a summary for every frame.
            results = monitor.model(frame, conf=CONFIG["CONF_THRESHOLD"], verbose=False)
            actions, current_people = monitor.analyze_frame(results, frame_count, prev_people, fps)

            # Annotate frame
            annotated_frame = results[0].plot()
            for pid, action, rating, timestamp, duration, objects in actions:
                text = f"{pid}: {action} ({rating}/10, {duration:.1f}s)"
                box = current_people[pid]["box"]
                # Clamp the baseline so labels of boxes at the top edge stay visible.
                origin = (max(0, int(box[0])), max(15, int(box[1]) - 10))
                cv2.putText(annotated_frame, text, origin,
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            out.write(annotated_frame)
            frame_count += 1
            prev_people = current_people
            if frame_count % 100 == 0:
                logger.info(f"Processed {frame_count} frames")
    finally:
        # Release handles even if detection or annotation fails part-way through.
        # No GUI window is opened here, so there is nothing to destroy.
        cap.release()
        if out is not None:
            out.release()

    total_duration = frame_count / fps
    monitor.generate_report(total_duration, fps)

    logger.info(f"Processing complete: {total_duration:.1f}s")
    print(f"Processing complete. See {CONFIG['OUTPUT_VIDEO']}, report_detailed.txt/json, and actions_detailed.log")

# Entry point: run the full pipeline when this code is executed directly
# (the guard keeps an import of this module from starting video processing).
if __name__ == "__main__":
    process_video()


0: 384x640 4 persons, 5 chairs, 1 tv, 2 dining tables, 1 cup, 4 pizzas, 64.8ms
Speed: 4.5ms preprocess, 64.8ms inference, 1.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 persons, 5 chairs, 1 tv, 2 dining tables, 1 cup, 4 pizzas, 52.1ms
Speed: 2.9ms preprocess, 52.1ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 persons, 5 chairs, 1 tv, 2 dining tables, 1 cup, 4 pizzas, 37.8ms
Speed: 3.3ms preprocess, 37.8ms inference, 2.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 persons, 5 chairs, 1 tv, 2 dining tables, 4 pizzas, 37.1ms
Speed: 2.9ms preprocess, 37.1ms inference, 1.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 persons, 5 chairs, 1 tv, 1 dining table, 4 pizzas, 32.6ms
Speed: 3.1ms preprocess, 32.6ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 persons, 5 chairs, 1 tv, 1 dining table, 4 pizzas, 32.4ms
Speed: 3.5ms preprocess, 32.4ms inference, 1.3ms postproce