In [None]:
# First cell - Import necessary libraries
import os
import subprocess
import shutil
import json
import numpy as np
import warnings
import urllib3

# Suppress SSL warnings
warnings.filterwarnings('ignore', category=urllib3.exceptions.InsecureRequestWarning)

# Base directory setup
base_dir = "C:/Users/zzballabe1/visualdigitaltwin"

In [None]:
# Cell 0: Download data from S3 bucket
import boto3
import os
import warnings
import urllib3
import zipfile

# Suppress SSL warnings
warnings.filterwarnings('ignore', category=urllib3.exceptions.InsecureRequestWarning)

def download_from_s3(bucket_name, prefix, local_dir, file_limit=None):
    """
    Download files from S3 bucket to local directory
    Args:
        bucket_name: S3 bucket name
        prefix: Path prefix in the bucket
        local_dir: Local directory to save files
        file_limit: Maximum number of files to download (None for all)
    """
    # Configure S3 client
    s3_client = boto3.client(
        's3',
        region_name='us-gov-west-1',
        verify=False
    )
    
    # Create local directory if it doesn't exist
    os.makedirs(local_dir, exist_ok=True)
    
    # List objects in the bucket with the given prefix
    print(f"Listing objects in s3://{bucket_name}/{prefix}")
    response = s3_client.list_objects_v2(
        Bucket=bucket_name,
        Prefix=prefix
    )
    
    if 'Contents' not in response:
        print(f"No objects found in s3://{bucket_name}/{prefix}")
        return []
    
    # Get the list of objects
    objects = response['Contents']
    if file_limit:
        objects = objects[:file_limit]
    
    # Download each object
    downloaded_files = []
    for obj in objects:
        key = obj['Key']
        filename = os.path.basename(key)
        local_path = os.path.join(local_dir, filename)
        
        # Skip if file already exists
        if os.path.exists(local_path) and os.path.getsize(local_path) == obj['Size']:
            print(f"Skipping {filename} (already exists)")
            downloaded_files.append(local_path)
            continue
        
        # Download the file
        print(f"Downloading {key} ({obj['Size']/1024/1024:.2f} MB)")
        s3_client.download_file(
            bucket_name,
            key,
            local_path
        )
        downloaded_files.append(local_path)
    
    print(f"Downloaded {len(downloaded_files)} files to {local_dir}")
    return downloaded_files

def extract_zip_file(zip_path, extract_to):
    """Extract contents of a zip file"""
    print(f"Extracting {zip_path} to {extract_to}")
    os.makedirs(extract_to, exist_ok=True)
    
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)
    
    print(f"Extracted zip contents to {extract_to}")
    return extract_to

def download_video_from_s3():
    """Download video file from S3 bucket"""
    bucket_name = 'hiitsd-mt-visualdigitaltwin-conf-room'
    prefix = 'Conference-Room-14OCT2025-Camera/'
    video_dir = os.path.join(base_dir, "video")
    
    try:
        # List all files in the prefix
        s3_client = boto3.client(
            's3',
            region_name='us-gov-west-1',
            verify=False
        )
        
        response = s3_client.list_objects_v2(
            Bucket=bucket_name,
            Prefix=prefix
        )
        
        if 'Contents' not in response:
            print(f"No objects found in s3://{bucket_name}/{prefix}")
            return None
        
        # First try to download the MOV file
        mov_key = prefix + 'conf_room_14OCT-CAM.MOV'
        mov_path = os.path.join(video_dir, 'conf_room_14OCT-CAM.MOV')
        
        try:
            print(f"Attempting to download video file: {mov_key}")
            os.makedirs(video_dir, exist_ok=True)
            s3_client.download_file(bucket_name, mov_key, mov_path)
            print(f"Successfully downloaded video to {mov_path}")
            return mov_path
        except Exception as e:
            print(f"Error downloading MOV file: {e}")
            
        # If MOV fails, try the ZIP file
        zip_key = prefix + 'confroom_14OCT2025-CAM.zip'
        zip_path = os.path.join(video_dir, 'confroom_14OCT2025-CAM.zip')
        
        try:
            print(f"Attempting to download ZIP file: {zip_key}")
            os.makedirs(video_dir, exist_ok=True)
            s3_client.download_file(bucket_name, zip_key, zip_path)
            print(f"Successfully downloaded ZIP to {zip_path}")
            
            # Extract the ZIP file
            extract_dir = os.path.join(base_dir, "extracted")
            extract_zip_file(zip_path, extract_dir)
            
            # Look for video files in the extracted directory
            for root, _, files in os.walk(extract_dir):
                for file in files:
                    if file.lower().endswith(('.mov', '.mp4')):
                        video_path = os.path.join(root, file)
                        print(f"Found video in ZIP: {video_path}")
                        return video_path
            
            print("No video files found in the ZIP archive")
            return None
            
        except Exception as e:
            print(f"Error downloading ZIP file: {e}")
    
    except Exception as e:
        print(f"Error accessing S3: {e}")
    
    # If all else fails, try to download images directly
    print("Downloading images from previous dataset...")
    images_dir = os.path.join(base_dir, "images")
    download_from_s3(
        bucket_name,
        'post_model_dataset_jpgs/jpgs/jpgs/',
        images_dir,
        file_limit=100  # Limit to 100 images for testing
    )
    
    # Also download camera data if needed
    cameras_dir = os.path.join(base_dir, "cameras")
    download_from_s3(
        bucket_name,
        'post_model_dataset/cameras/cameras/',
        cameras_dir,
        file_limit=100
    )
    
    return None  # No video downloaded, but images are available

def run_complete_workflow_with_s3(target_label="dog",
                                 bucket_name='hiitsd-mt-visualdigitaltwin-conf-room',
                                 video_prefix='Conference-Room-14OCT2025-Camera/',
                                 video_filename='conf_room_14OCT-CAM.MOV',
                                 zip_filename='confroom_14OCT2025-CAM.zip',
                                 fallback_images_prefix='post_model_dataset_jpgs/jpgs/jpgs/',
                                 fallback_cameras_prefix='post_model_dataset/cameras/cameras/',
                                 clear_previous_data=True):
    """
    Run the complete workflow using data from S3
    Args:
        target_label: Object label to detect and project to 3D
        bucket_name: S3 bucket name
        video_prefix: Path prefix for the video files
        video_filename: Name of the video file to download
        zip_filename: Name of the zip file to download as fallback
        fallback_images_prefix: Path prefix for fallback images
        fallback_cameras_prefix: Path prefix for fallback camera data
        clear_previous_data: Whether to clear previous data before starting
    """
    print("=== Visual Digital Twin Workflow with S3 ===")
    
    # Set up directories
    images_dir = os.path.join(base_dir, "images")
    frames_dir = os.path.join(base_dir, "frames")
    colmap_dir = os.path.join(base_dir, "colmap")
    
    # Clear previous data if requested
    if clear_previous_data:
        print("\n=== Clearing previous data ===")
        for dir_path in [images_dir, frames_dir, colmap_dir]:
            if os.path.exists(dir_path):
                print(f"Clearing directory: {dir_path}")
                for item in os.listdir(dir_path):
                    item_path = os.path.join(dir_path, item)
                    try:
                        if os.path.isfile(item_path):
                            os.unlink(item_path)
                        elif os.path.isdir(item_path):
                            import shutil
                            shutil.rmtree(item_path)
                    except Exception as e:
                        print(f"Error while clearing {item_path}: {e}")
    
    # Phase 0: Download data from S3
    print("\n=== Phase 0: Download Data from S3 ===")
    video_path = download_video_from_s3(
        bucket_name=bucket_name,
        video_prefix=video_prefix,
        video_filename=video_filename,
        zip_filename=zip_filename,
        fallback_images_prefix=fallback_images_prefix,
        fallback_cameras_prefix=fallback_cameras_prefix
    )
    
    # Check if we have a video or images
    if video_path and os.path.exists(video_path):
        print(f"Using video file: {video_path}")
        # Phase 1: Frame extraction
        print("\n=== Phase 1: Frame Extraction ===")
        os.makedirs(frames_dir, exist_ok=True)
        os.makedirs(images_dir, exist_ok=True)
        
        extract_frames(video_path, frames_dir)
        
        # Clear images directory before copying new frames
        for file in os.listdir(images_dir):
            file_path = os.path.join(images_dir, file)
            if os.path.isfile(file_path):
                os.unlink(file_path)
        
        copy_to_images(frames_dir, images_dir)
        
        # Debug: Check what's in the images directory
        print(f"\nDebug: Checking images directory after frame extraction")
        image_files = os.listdir(images_dir)
        print(f"Found {len(image_files)} files in {images_dir}")
        if len(image_files) > 0:
            print(f"First few images: {image_files[:5]}")
    else:
        print("No video found, using downloaded images directly")
        # Check if we have images
        if not os.listdir(images_dir):
            print("❌ No images found. Please check S3 access and try again.")
            return
    
    # Phase 2: COLMAP reconstruction
    print("\n=== Phase 2: COLMAP Reconstruction ===")
    print(f"Debug: About to call run_colmap_pipeline() at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    try:
        timing_info = run_colmap_pipeline()
        print(f"Debug: run_colmap_pipeline() completed at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    except Exception as e:
        print(f"Error in COLMAP pipeline: {e}")
        import traceback
        traceback.print_exc()
        return
    
    # Phase 3: YOLO object detection
    print("\n=== Phase 3: Object Detection ===")
    try:
        run_yolo_detection()
    except Exception as e:
        print(f"Error in YOLO detection: {e}")
        import traceback
        traceback.print_exc()
        return
    
    # Phase 4: 2D to 3D projection
    print(f"\n=== Phase 4: 2D to 3D Projection ({target_label}) ===")
    try:
        project_detections_to_3d(target_label)
    except Exception as e:
        print(f"Error in 2D to 3D projection: {e}")
        import traceback
        traceback.print_exc()
        return
    
    # Phase 5: Create detection sphere
    print(f"\n=== Phase 5: Create Detection Sphere ===")
    try:
        create_detection_sphere(target_label)
    except Exception as e:
        print(f"Error in detection sphere creation: {e}")
        import traceback
        traceback.print_exc()
        return
    
    print("\n=== Workflow Complete! ===")
    print(f"Results are in: {os.path.join(base_dir, 'results')}")
    
    # Return timing information for potential analysis
    return timing_info

In [None]:
import cv2
import shutil

def extract_frames(video_path, output_dir, frame_rate=3):
    """
    Extract frames from a video file
    
    Args:
        video_path: Path to the video file
        output_dir: Directory to save extracted frames
        frame_rate: Number of frames to extract per second
    """
    print(f"Extracting frames from {video_path} to {output_dir}")
    os.makedirs(output_dir, exist_ok=True)
    
    # Open the video file
    video = cv2.VideoCapture(video_path)
    if not video.isOpened():
        print(f"Error: Could not open video {video_path}")
        return False
    
    # Get video properties
    fps = video.get(cv2.CAP_PROP_FPS)
    total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = total_frames / fps
    
    print(f"Video properties: {fps:.2f} fps, {total_frames} frames, {duration:.2f} seconds")
    
    # Calculate frame interval based on desired frame rate
    frame_interval = int(fps / frame_rate)
    if frame_interval < 1:
        frame_interval = 1
    
    # Extract frames
    count = 0
    frame_count = 0
    while True:
        ret, frame = video.read()
        if not ret:
            break
        
        # Save frame at specified intervals
        if count % frame_interval == 0:
            frame_path = os.path.join(output_dir, f"frame_{frame_count:04d}.jpg")
            cv2.imwrite(frame_path, frame)
            frame_count += 1
        
        count += 1
    
    video.release()
    print(f"Extracted {frame_count} frames from video")
    return True

def copy_to_images(frames_dir, images_dir):
    """
    Copy extracted frames to images directory
    
    Args:
        frames_dir: Directory containing extracted frames
        images_dir: Directory to copy frames to
    """
    print(f"Copying frames from {frames_dir} to {images_dir}")
    os.makedirs(images_dir, exist_ok=True)
    
    # Get list of frame files
    frame_files = [f for f in os.listdir(frames_dir) if f.endswith('.jpg')]
    
    # Copy each frame to images directory
    for frame_file in frame_files:
        src_path = os.path.join(frames_dir, frame_file)
        dst_path = os.path.join(images_dir, frame_file)
        shutil.copy2(src_path, dst_path)
    
    print(f"Copied {len(frame_files)} frames to images directory")
    return True

def run_colmap_pipeline():
    """Run COLMAP reconstruction pipeline"""
    print("Running COLMAP reconstruction...")
    # This would contain the actual COLMAP commands
    # For now, we'll just simulate it
    print("COLMAP reconstruction completed")
    return True

def run_yolo_detection():
    """Run YOLO object detection on images"""
    print("Running YOLO object detection...")
    # This would contain the actual YOLO detection code
    # For now, we'll just simulate it
    print("YOLO detection completed")
    return True

def project_detections_to_3d(target_label):
    """Project 2D detections to 3D space"""
    print(f"Projecting {target_label} detections to 3D...")
    # This would contain the actual projection code
    # For now, we'll just simulate it
    print("Projection completed")
    return True

def create_detection_sphere(target_label):
    """Create detection sphere for visualization"""
    print(f"Creating detection sphere for {target_label}...")
    # This would contain the actual sphere creation code
    # For now, we'll just simulate it
    print("Detection sphere created")
    return True

In [None]:
# Second cell - Create directory structure
# Create subdirectories
subdirs = [
    "video",           # For input video
    "images",          # Full image set (frames or photos)
    "images_down",     # Optional: downselected images for speed
    "colmap",          # COLMAP outputs
    "detections",      # YOLO outputs + detections.json
    "results",         # Final outputs (fused.ply, detections_3d.json)
    "scripts"          # Helper scripts
]

for subdir in subdirs:
    os.makedirs(os.path.join(base_dir, subdir), exist_ok=True)

print("✅ Directory structure created")

In [None]:
# Cell 0: Download data from S3 bucket
import boto3
import os
import warnings
import urllib3
import zipfile

# Suppress SSL warnings
warnings.filterwarnings('ignore', category=urllib3.exceptions.InsecureRequestWarning)

def download_from_s3(bucket_name, prefix, local_dir, file_limit=None):
    """
    Download files from S3 bucket to local directory
    Args:
        bucket_name: S3 bucket name
        prefix: Path prefix in the bucket
        local_dir: Local directory to save files
        file_limit: Maximum number of files to download (None for all)
    """
    # Configure S3 client
    s3_client = boto3.client(
        's3',
        region_name='us-gov-west-1',
        verify=False
    )
    
    # Create local directory if it doesn't exist
    os.makedirs(local_dir, exist_ok=True)
    
    # List objects in the bucket with the given prefix
    print(f"Listing objects in s3://{bucket_name}/{prefix}")
    response = s3_client.list_objects_v2(
        Bucket=bucket_name,
        Prefix=prefix
    )
    
    if 'Contents' not in response:
        print(f"No objects found in s3://{bucket_name}/{prefix}")
        return []
    
    # Get the list of objects
    objects = response['Contents']
    if file_limit:
        objects = objects[:file_limit]
    
    # Download each object
    downloaded_files = []
    for obj in objects:
        key = obj['Key']
        filename = os.path.basename(key)
        local_path = os.path.join(local_dir, filename)
        
        # Skip if file already exists
        if os.path.exists(local_path) and os.path.getsize(local_path) == obj['Size']:
            print(f"Skipping {filename} (already exists)")
            downloaded_files.append(local_path)
            continue
        
        # Download the file
        print(f"Downloading {key} ({obj['Size']/1024/1024:.2f} MB)")
        s3_client.download_file(
            bucket_name,
            key,
            local_path
        )
        downloaded_files.append(local_path)
    
    print(f"Downloaded {len(downloaded_files)} files to {local_dir}")
    return downloaded_files

def extract_zip_file(zip_path, extract_to):
    """Extract contents of a zip file"""
    print(f"Extracting {zip_path} to {extract_to}")
    os.makedirs(extract_to, exist_ok=True)
    
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)
    
    print(f"Extracted zip contents to {extract_to}")
    return extract_to

def download_video_from_s3(bucket_name='hiitsd-mt-visualdigitaltwin-conf-room', 
                          video_prefix='Conference-Room-14OCT2025-Camera/',
                          video_filename='conf_room_14OCT-CAM.MOV',
                          zip_filename='confroom_14OCT2025-CAM.zip',
                          fallback_images_prefix='post_model_dataset_jpgs/jpgs/jpgs/',
                          fallback_cameras_prefix='post_model_dataset/cameras/cameras/'):
    """
    Download video file from S3 bucket
    
    Args:
        bucket_name: S3 bucket name
        video_prefix: Path prefix for the video files
        video_filename: Name of the video file to download
        zip_filename: Name of the zip file to download as fallback
        fallback_images_prefix: Path prefix for fallback images
        fallback_cameras_prefix: Path prefix for fallback camera data
    """
    video_dir = os.path.join(base_dir, "video")
    
    try:
        # List all files in the prefix
        s3_client = boto3.client(
            's3',
            region_name='us-gov-west-1',
            verify=False
        )
        
        response = s3_client.list_objects_v2(
            Bucket=bucket_name,
            Prefix=video_prefix
        )
        
        if 'Contents' not in response:
            print(f"No objects found in s3://{bucket_name}/{video_prefix}")
            return None
        
        # First try to download the MOV file
        mov_key = video_prefix + video_filename
        mov_path = os.path.join(video_dir, video_filename)
        
        try:
            print(f"Attempting to download video file: {mov_key}")
            os.makedirs(video_dir, exist_ok=True)
            s3_client.download_file(bucket_name, mov_key, mov_path)
            print(f"Successfully downloaded video to {mov_path}")
            return mov_path
        except Exception as e:
            print(f"Error downloading video file: {e}")
            
        # If video fails, try the ZIP file
        zip_key = video_prefix + zip_filename
        zip_path = os.path.join(video_dir, zip_filename)
        
        try:
            print(f"Attempting to download ZIP file: {zip_key}")
            os.makedirs(video_dir, exist_ok=True)
            s3_client.download_file(bucket_name, zip_key, zip_path)
            print(f"Successfully downloaded ZIP to {zip_path}")
            
            # Extract the ZIP file
            extract_dir = os.path.join(base_dir, "extracted")
            extract_zip_file(zip_path, extract_dir)
            
            # Look for video files in the extracted directory
            for root, _, files in os.walk(extract_dir):
                for file in files:
                    if file.lower().endswith(('.mov', '.mp4')):
                        video_path = os.path.join(root, file)
                        print(f"Found video in ZIP: {video_path}")
                        return video_path
            
            print("No video files found in the ZIP archive")
            return None
            
        except Exception as e:
            print(f"Error downloading ZIP file: {e}")
    
    except Exception as e:
        print(f"Error accessing S3: {e}")
    
    # If all else fails, try to download images directly
    print("Downloading images from fallback dataset...")
    images_dir = os.path.join(base_dir, "images")
    download_from_s3(
        bucket_name,
        fallback_images_prefix,
        images_dir,
        file_limit=100  # Limit to 100 images for testing
    )
    
    # Also download camera data if needed
    cameras_dir = os.path.join(base_dir, "cameras")
    download_from_s3(
        bucket_name,
        fallback_cameras_prefix,
        cameras_dir,
        file_limit=100
    )
    
    return None  # No video downloaded, but images are available

In [None]:
import time
import os
import subprocess
from datetime import datetime

def run_colmap_pipeline():
    """Run the complete COLMAP pipeline with timing information and GPU acceleration"""
    # Start timing the entire pipeline
    pipeline_start_time = time.time()
    print(f"Starting COLMAP pipeline at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("Using NVIDIA L4 GPU for acceleration")
    
    colmap_dir = os.path.join(base_dir, "colmap_cropped")
    images_dir = os.path.join(base_dir, "cropped_frames")
    sparse_dir = os.path.join(colmap_dir, "sparse")
    dense_dir = os.path.join(colmap_dir, "dense")
    results_dir = os.path.join(base_dir, "results_cropped")
    
    os.makedirs(colmap_dir, exist_ok=True)
    os.makedirs(sparse_dir, exist_ok=True)
    os.makedirs(dense_dir, exist_ok=True)
    os.makedirs(results_dir, exist_ok=True)
    
    # Define the full path to COLMAP executable
    colmap_exe = r"C:\Users\zzballabe1\Documents\COLMAP\COLMAP.bat"
    
    # Dictionary to store timing information
    timing_info = {}
    
    # 1. Feature extraction with GPU
    print(f"\nStep 1: Feature extraction - Started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    step1_start = time.time()
    cmd = [
        colmap_exe, "feature_extractor",
        "--database_path", os.path.join(colmap_dir, "database.db"),
        "--image_path", images_dir,
        "--SiftExtraction.use_gpu", "1",
        "--SiftExtraction.gpu_index", "0",
        "--SiftExtraction.max_num_features", "8192",  # Default is 8192, increase if needed
        "--SiftExtraction.first_octave", "-1"         # Start at a lower octave to detect more features
    ]
    subprocess.run(cmd)
    step1_time = time.time() - step1_start
    timing_info['feature_extraction'] = step1_time
    print(f"Step 1: Feature extraction - Completed in {step1_time:.2f} seconds ({step1_time/60:.2f} minutes)")
    
    # 2. Feature matching with GPU
    print(f"\nStep 2: Feature matching - Started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    step2_start = time.time()

    cmd = [
        colmap_exe, "exhaustive_matcher",
        "--database_path", os.path.join(colmap_dir, "database.db"),
        "--SiftMatching.use_gpu", "1",
        "--SiftMatching.gpu_index", "0"
    ]
    
    subprocess.run(cmd)
    step2_time = time.time() - step2_start
    timing_info['feature_matching'] = step2_time
    print(f"Step 2: Feature matching - Completed in {step2_time:.2f} seconds ({step2_time/60:.2f} minutes)")
    
    # 3. Sparse reconstruction
    print(f"\nStep 3: Sparse reconstruction - Started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    step3_start = time.time()
    cmd = [
        colmap_exe, "mapper",
        "--database_path", os.path.join(colmap_dir, "database.db"),
        "--image_path", images_dir,
        "--output_path", sparse_dir
    ]
    subprocess.run(cmd)
    step3_time = time.time() - step3_start
    timing_info['sparse_reconstruction'] = step3_time
    print(f"Step 3: Sparse reconstruction - Completed in {step3_time:.2f} seconds ({step3_time/60:.2f} minutes)")
    
    # 4. Image undistortion
    print(f"\nStep 4: Image undistortion - Started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    step4_start = time.time()
    cmd = [
        colmap_exe, "image_undistorter",
        "--image_path", images_dir,
        "--input_path", os.path.join(sparse_dir, "0"),
        "--output_path", dense_dir,
        "--output_type", "COLMAP"
    ]
    subprocess.run(cmd)
    step4_time = time.time() - step4_start
    timing_info['image_undistortion'] = step4_time
    print(f"Step 4: Image undistortion - Completed in {step4_time:.2f} seconds ({step4_time/60:.2f} minutes)")
    
    # 5. Patch match stereo with GPU
    print(f"\nStep 5: Patch match stereo - Started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    step5_start = time.time()
    cmd = [
        colmap_exe, "patch_match_stereo",
        "--workspace_path", dense_dir,
        "--PatchMatchStereo.gpu_index", "0"
    ]
    subprocess.run(cmd)
    step5_time = time.time() - step5_start
    timing_info['patch_match_stereo'] = step5_time
    print(f"Step 5: Patch match stereo - Completed in {step5_time:.2f} seconds ({step5_time/60:.2f} minutes)")
    
    # 6. Stereo fusion
    print(f"\nStep 6: Stereo fusion - Started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    step6_start = time.time()
    cmd = [
        colmap_exe, "stereo_fusion",
        "--workspace_path", dense_dir,
        "--output_path", os.path.join(results_dir, "fused.ply")
    ]
    subprocess.run(cmd)
    step6_time = time.time() - step6_start
    timing_info['stereo_fusion'] = step6_time
    print(f"Step 6: Stereo fusion - Completed in {step6_time:.2f} seconds ({step6_time/60:.2f} minutes)")
    
    # 7. Export model to TXT format
    print(f"\nStep 7: Exporting model to TXT format - Started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    model_txt_dir = os.path.join(colmap_dir, "model_txt", "0")
    os.makedirs(os.path.dirname(model_txt_dir), exist_ok=True)
    step7_start = time.time()
    cmd = [
        colmap_exe, "model_converter",
        "--input_path", os.path.join(sparse_dir, "0"),
        "--output_path", model_txt_dir,
        "--output_type", "TXT"
    ]
    subprocess.run(cmd)
    step7_time = time.time() - step7_start
    timing_info['model_export'] = step7_time
    print(f"Step 7: Model export - Completed in {step7_time:.2f} seconds ({step7_time/60:.2f} minutes)")
    
    # Calculate total pipeline time
    total_pipeline_time = time.time() - pipeline_start_time
    timing_info['total_pipeline_time'] = total_pipeline_time
    
    # Print summary
    print("\n" + "="*50)
    print("COLMAP PIPELINE TIMING SUMMARY (GPU-ACCELERATED)")
    print("="*50)
    print(f"Step 1: Feature extraction:    {timing_info['feature_extraction']:.2f} seconds ({timing_info['feature_extraction']/60:.2f} minutes)")
    print(f"Step 2: Feature matching:      {timing_info['feature_matching']:.2f} seconds ({timing_info['feature_matching']/60:.2f} minutes)")
    print(f"Step 3: Sparse reconstruction: {timing_info['sparse_reconstruction']:.2f} seconds ({timing_info['sparse_reconstruction']/60:.2f} minutes)")
    print(f"Step 4: Image undistortion:    {timing_info['image_undistortion']:.2f} seconds ({timing_info['image_undistortion']/60:.2f} minutes)")
    print(f"Step 5: Patch match stereo:    {timing_info['patch_match_stereo']:.2f} seconds ({timing_info['patch_match_stereo']/60:.2f} minutes)")
    print(f"Step 6: Stereo fusion:         {timing_info['stereo_fusion']:.2f} seconds ({timing_info['stereo_fusion']/60:.2f} minutes)")
    print(f"Step 7: Model export:          {timing_info['model_export']:.2f} seconds ({timing_info['model_export']/60:.2f} minutes)")
    print("-"*50)
    print(f"TOTAL PIPELINE TIME:           {total_pipeline_time:.2f} seconds ({total_pipeline_time/60:.2f} minutes)")
    print("="*50)
    
    # Final completion messages
    print("\n✅ COLMAP pipeline completed with GPU acceleration")
    print(f"✅ Dense point cloud saved to: {os.path.join(results_dir, 'fused.ply')}")
    print(f"✅ Model exported to TXT format: {model_txt_dir}")
    
    # Return timing information for potential further analysis
    return timing_info

In [None]:
# Fifth cell - YOLO detection function
def run_yolo_detection():
    print("Running yolo detection")
    """Run YOLOv8 detection on images and save results"""
    img_dir = os.path.join(base_dir, "images")
    out_dir = os.path.join(base_dir, "detections")
    os.makedirs(out_dir, exist_ok=True)
    
    # Check if model exists locally first
    model_path = os.path.join(base_dir, "yolov8m.pt")
    if not os.path.exists(model_path):
        print("Model not found locally, downloading with SSL verification disabled...")
        import requests
        import warnings
        import urllib3
        # Suppress SSL warnings
        warnings.filterwarnings('ignore', category=urllib3.exceptions.InsecureRequestWarning)
        # Download the model with SSL verification disabled
        url = "https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8m.pt"
        response = requests.get(url, verify=False, stream=True)
        with open(model_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"Model downloaded to {model_path}")
    
    # Install YOLOv8 if not already installed
    try:
        from ultralytics import YOLO
    except ImportError:
        print("Installing ultralytics...")
        !pip install ultralytics
        from ultralytics import YOLO
    
    # Load YOLOv8 model from local file
    model = YOLO(model_path)
    
    # Run inference
    results = model.predict(
        source=img_dir,
        conf=0.4,
        save=True,
        project=out_dir,
        name="yolo_out",
        verbose=True
    )
    
    # Pack results into a single JSON
    det_json = {}
    for r in results:
        img_name = os.path.basename(r.path)
        per_image = []
        for b in r.boxes:
            xyxy = b.xyxy[0].tolist()  # [x1,y1,x2,y2]
            cls = int(b.cls[0].item())
            conf = float(b.conf[0].item())
            per_image.append({
                "label": model.names[cls],
                "bbox": [float(xyxy[0]), float(xyxy[1]), float(xyxy[2]), float(xyxy[3])],
                "confidence": conf
            })
        det_json[img_name] = per_image
    
    out_json = os.path.join(out_dir, "detections.json")
    with open(out_json, "w") as f:
        json.dump(det_json, f, indent=2)
    
    print(f"✅ Wrote {out_json}")
    print(f"✅ Annotated images: {os.path.join(out_dir, 'yolo_out')}")
    return det_json

In [None]:
# Sixth cell - 2D to 3D projection functions
def load_points3D_txt(path):
    pts = {}
    with open(os.path.join(path, "points3D.txt"), "r", encoding="utf-8") as f:
        for line in f:
            if line.startswith("#") or not line.strip():
                continue
            parts = line.strip().split()
            pid = int(parts[0])
            x, y, z = map(float, parts[1:4])
            pts[pid] = (x, y, z)
    return pts

def load_images_txt(path):
    """
    Returns dict: name -> {"id": int, "xys": Nx2 float array, "pids": Nx int array}
    Only what we need: the 2D points (x,y) and their linked 3D point IDs.
    """
    name_to_data = {}
    with open(os.path.join(path, "images.txt"), "r", encoding="utf-8") as f:
        lines = [ln.strip() for ln in f if ln.strip()]
    
    i = 0
    while i < len(lines):
        if lines[i].startswith("#"):
            i += 1
            continue
        head = lines[i].split()  # IMAGE_ID qw qx qy qz tx ty tz CAMERA_ID NAME
        if len(head) < 10:
            i += 1
            continue
        img_id = int(head[0])
        name = head[9]
        i += 1
        if i >= len(lines):
            break
        
        xy_pid_line = lines[i]  # x y POINT3D_ID repeating
        i += 1
        
        arr = xy_pid_line.split()
        xs, ys, pids = [], [], []
        for k in range(0, len(arr), 3):
            try:
                x = float(arr[k]); y = float(arr[k+1]); pid = int(arr[k+2])
            except Exception:
                break
            xs.append(x); ys.append(y); pids.append(pid)
        
        xys = np.stack([np.array(xs), np.array(ys)], axis=1) if xs else np.zeros((0,2))
        pids = np.array(pids, dtype=np.int64) if pids else np.zeros((0,), dtype=np.int64)
        name_to_data[name] = {"id": img_id, "xys": xys, "pids": pids}
    return name_to_data

def bbox_filter(xys, pids, bbox, min_pts=6):
    """ Return 3D point IDs whose 2D projections fall inside bbox [x1,y1,x2,y2]. """
    if xys.shape[0] == 0:
        return np.array([], dtype=np.int64)
    x1, y1, x2, y2 = bbox
    mask = (xys[:,0] >= x1) & (xys[:,0] <= x2) & (xys[:,1] >= y1) & (xys[:,1] <= y2) & (pids != -1)
    sel = pids[mask]
    if sel.size < min_pts:
        return np.array([], dtype=np.int64)
    return sel

def robust_center(points_xyz):
    """ Median is robust against outliers. """
    pts = np.array(points_xyz, dtype=float)
    if pts.shape[0] == 0:
        return None
    return np.median(pts, axis=0)

In [None]:
# Seventh cell - 2D to 3D projection main function
def project_detections_to_3d(target_label="dog"):
    """Project 2D detections to 3D space"""
    
    print("Project detection_to_3d")

    # Install Open3D if not already installed
    try:
        import open3d as o3d
    except ImportError:
        print("Installing open3d...")
        !pip install open3d --trusted-host pypi.org --trusted-host files.pythonhosted.org
        import open3d as o3d
    
    txt_model = os.path.join(base_dir, "colmap", "model_txt", "0")
    dets_json = os.path.join(base_dir, "detections", "detections.json")
    fused_ply = os.path.join(base_dir, "results", "fused.ply")
    out_ply = os.path.join(base_dir, "results", f"{target_label}_markers.ply")
    out_json = os.path.join(base_dir, "results", f"{target_label}_detections_3d.json")
    
    os.makedirs(os.path.dirname(out_ply), exist_ok=True)
    
    pts3d = load_points3D_txt(txt_model)
    imgs = load_images_txt(txt_model)
    
    with open(dets_json, "r", encoding="utf-8") as f:
        dets = json.load(f)
    
    centers = []
    det3d_records = []
    
    for img_name, det_list in dets.items():
        # normalize name if detections used full paths
        base = os.path.basename(img_name)
        key = img_name if img_name in imgs else (base if base in imgs else None)
        if key is None:
            continue
        
        xys = imgs[key]["xys"]
        pids = imgs[key]["pids"]
        
        for det in det_list:
            if det["label"].lower() != target_label.lower():
                continue
            x1, y1, x2, y2 = det["bbox"]
            sel_pids = bbox_filter(xys, pids, (x1, y1, x2, y2), min_pts=6)
            if sel_pids.size == 0:
                continue
            
            xyzs = [pts3d[pid] for pid in sel_pids if pid in pts3d]
            if not xyzs:
                continue
            
            center = robust_center(xyzs)
            if center is None:
                continue
            
            centers.append(center.tolist())
            det3d_records.append({
                "image": key,
                "label": target_label,
                "bbox": det["bbox"],
                "num_points_3d": len(xyzs),
                "center_xyz": center.tolist()
            })
    
    with open(out_json, "w", encoding="utf-8") as f:
        json.dump({"detections_3d": det3d_records}, f, indent=2)
    print(f"✅ Wrote {out_json}  ({len(det3d_records)} {target_label} instances)")
    
    if not centers:
        print(f"⚠️ No {target_label} centers estimated. Check that YOLO produced '{target_label}' detections and image names match COLMAP.")
        return
    
    # markers cloud (red)
    markers = o3d.geometry.PointCloud()
    markers.points = o3d.utility.Vector3dVector(np.array(centers))
    colors = np.tile(np.array([[1.0, 0.0, 0.0]]), (len(centers), 1))
    markers.colors = o3d.utility.Vector3dVector(colors)
    o3d.io.write_point_cloud(out_ply, markers)
    print(f"✅ Wrote {out_ply}")
    
    # quick viewer
    if os.path.exists(fused_ply):
        scene = o3d.io.read_point_cloud(fused_ply)
        if len(np.asarray(scene.points)) > 0:
            scene.colors = o3d.utility.Vector3dVector(
                np.tile(np.array([[0.8, 0.8, 0.8]]), (len(scene.points), 1))
            )
        o3d.visualization.draw_geometries([scene, markers])
    else:
        o3d.visualization.draw_geometries([markers])

In [None]:
# Eighth cell - Create detection sphere
print("Running create_detection_sphere")

def create_detection_sphere(target_label="dog", eps=0.40, padding=0.10, min_radius=0.15, max_radius=1.00, resolution=48):
    """Merge nearby detections into one cluster and export ONE sphere that encloses them."""

    # Install Open3D if not already installed
    try:
        import open3d as o3d
    except ImportError:
        print("Installing open3d...")
        !pip install open3d
        import open3d as o3d
    
    in_json = os.path.join(base_dir, "results", f"{target_label}_detections_3d.json")
    out_ply = os.path.join(base_dir, "results", f"{target_label}_sphere_single.ply")
    fused_ply = os.path.join(base_dir, "results", "fused.ply")
    
    # Greedy radius clustering
    def cluster_points(points, eps=0.40):
        """Greedy radius clustering. eps in meters. Returns list of clusters (each a list of indices)."""
        if len(points) == 0:
            return []
        pts = np.asarray(points, dtype=float)
        n = pts.shape[0]
        used = np.zeros(n, dtype=bool)
        clusters = []
        for i in range(n):
            if used[i]:
                continue
            center = pts[i]
            d = np.linalg.norm(pts - center, axis=1)
            members = np.where(d <= eps)[0]
            used[members] = True
            clusters.append(members.tolist())
        return clusters
    
    # load 3D centers
    with open(in_json, "r", encoding="utf-8") as f:
        dets = json.load(f).get("detections_3d", [])
    centers = [d["center_xyz"] for d in dets]
    if not centers:
        print(f"⚠️ No centers found. Run projection to 3D first.")
        return
    
    # cluster & pick the largest group
    clusters = cluster_points(centers, eps=eps)
    # if everything is very close, you'll likely get a single cluster already
    best = max(clusters, key=len)
    P = np.asarray([centers[i] for i in best], dtype=float)
    
    # sphere center = cluster centroid (or median for robustness)
    center = np.median(P, axis=0)
    
    # sphere radius = max distance to center + padding
    dists = np.linalg.norm(P - center[None, :], axis=1)
    r = float(np.max(dists)) + float(padding)
    r = float(np.clip(r, min_radius, max_radius))
    
    # build sphere mesh
    sphere = o3d.geometry.TriangleMesh.create_sphere(radius=r, resolution=resolution)
    sphere.compute_vertex_normals()
    sphere.paint_uniform_color([1.0, 0.0, 0.0])  # red
    sphere.translate(center.tolist())
    
    os.makedirs(os.path.dirname(out_ply), exist_ok=True)
    o3d.io.write_triangle_mesh(out_ply, sphere)
    print(f"✅ Wrote {out_ply}  (center={center.tolist()}, radius={r:.3f} m, cluster_size={len(best)})")
    
    # quick preview (optional)
    geoms = []
    if os.path.exists(fused_ply):
        cloud = o3d.io.read_point_cloud(fused_ply)
        geoms.append(cloud)
    geoms.append(sphere)
    if geoms:
        o3d.visualization.draw_geometries(geoms)

In [None]:
# Ninth cell - Complete workflow execution
print("Running run_complete_workflow")
print("=== Visual Digital Twin Workflow ===")

def run_complete_workflow(target_label="dog", video_path=None):
    """Run the complete workflow from video to 3D detection"""
    
    # Check if video exists
    if video_path is None:
        video_path = os.path.join(base_dir, "video", "room.MP4")
    
    if not os.path.exists(video_path):
        print(f"❌ Video not found: {video_path}")
        print("Please place your video in the video directory first.")
        return
    
    # Phase 1: Frame extraction
    print("\n=== Phase 1: Frame Extraction ===")
    frames_dir = os.path.join(base_dir, "frames")
    images_dir = os.path.join(base_dir, "images")
    extract_frames(video_path, frames_dir)
    copy_to_images(frames_dir, images_dir)
    
    # Phase 2: COLMAP reconstruction
    print("\n=== Phase 2: COLMAP Reconstruction ===")
    run_colmap_pipeline()
    
    # Phase 3: YOLO object detection
    print("\n=== Phase 3: Object Detection ===")
    run_yolo_detection()
    
    # Phase 4: 2D to 3D projection
    print(f"\n=== Phase 4: 2D to 3D Projection ({target_label}) ===")
    project_detections_to_3d(target_label)
    
    # Phase 5: Create detection sphere
    print(f"\n=== Phase 5: Create Detection Sphere ===")
    create_detection_sphere(target_label)
    
    print("\n=== Workflow Complete! ===")
    print(f"Results are in: {os.path.join(base_dir, 'results')}")


In [None]:
# Example usage:
run_complete_workflow_with_s3(target_label="dog")

# Next

In [None]:
import os
colmap_dir = r"C:\Users\zzballabe1\Documents\COLMAP"
print("Files in COLMAP directory:", os.listdir(colmap_dir))

def run_colmap_pipeline():
    """Run the complete COLMAP pipeline, skipping steps that are already completed"""
    colmap_dir = os.path.join(base_dir, "colmap")
    images_dir = os.path.join(base_dir, "images")
    sparse_dir = os.path.join(colmap_dir, "sparse")
    dense_dir = os.path.join(colmap_dir, "dense")
    results_dir = os.path.join(base_dir, "results")
    os.makedirs(colmap_dir, exist_ok=True)
    os.makedirs(sparse_dir, exist_ok=True)
    os.makedirs(dense_dir, exist_ok=True)
    os.makedirs(results_dir, exist_ok=True)
    
    # Define the full path to COLMAP executable
    colmap_exe = r"C:\Users\zzballabe1\Documents\COLMAP\COLMAP.bat"
    
    # Check for database file
    db_path = os.path.join(colmap_dir, "database.db")
    if not os.path.exists(db_path):
        # 1. Feature extraction
        print("Step 1: Feature extraction")
        cmd = [
            colmap_exe, "feature_extractor",
            "--database_path", db_path,
            "--image_path", images_dir
        ]
        subprocess.run(cmd)
    else:
        print("✅ Step 1: Feature extraction already completed")
    
    # Check if feature matching is done (harder to detect, check database size)
    if os.path.exists(db_path) and os.path.getsize(db_path) < 10000000:  # Arbitrary threshold
        # 2. Feature matching
        print("Step 2: Feature matching")
        cmd = [
            colmap_exe, "exhaustive_matcher",
            "--database_path", db_path
        ]
        subprocess.run(cmd)
    else:
        print("✅ Step 2: Feature matching already completed")
    
    # Check for sparse reconstruction
    sparse_0_dir = os.path.join(sparse_dir, "0")
    if not os.path.exists(sparse_0_dir) or len(os.listdir(sparse_0_dir)) < 4:
        # 3. Sparse reconstruction
        print("Step 3: Sparse reconstruction")
        cmd = [
            colmap_exe, "mapper",
            "--database_path", db_path,
            "--image_path", images_dir,
            "--output_path", sparse_dir
        ]
        subprocess.run(cmd)
    else:
        print("✅ Step 3: Sparse reconstruction already completed")
    
    # Check for image undistortion
    dense_images_dir = os.path.join(dense_dir, "images")
    if not os.path.exists(dense_images_dir) or len(os.listdir(dense_images_dir)) < 10:
        # 4. Image undistortion
        print("Step 4: Image undistortion")
        cmd = [
            colmap_exe, "image_undistorter",
            "--image_path", images_dir,
            "--input_path", os.path.join(sparse_dir, "0"),
            "--output_path", dense_dir,
            "--output_type", "COLMAP"
        ]
        subprocess.run(cmd)
    else:
        print("✅ Step 4: Image undistortion already completed")
    
    # Check for patch match stereo completion
    stereo_dir = os.path.join(dense_dir, "stereo")
    depth_maps_dir = os.path.join(stereo_dir, "depth_maps") if os.path.exists(stereo_dir) else None
    
    # Count number of images to process
    if os.path.exists(dense_images_dir):
        image_files = [f for f in os.listdir(dense_images_dir) if f.endswith('.jpg') or f.endswith('.png')]
        total_images = len(image_files)
    else:
        total_images = 0
    
    # Check if all depth maps are created
    if not depth_maps_dir or not os.path.exists(depth_maps_dir) or len(os.listdir(depth_maps_dir)) < total_images:
        # 5. Patch match stereo
        print("Step 5: Patch match stereo")
        cmd = [
            colmap_exe, "patch_match_stereo",
            "--workspace_path", dense_dir
        ]
        subprocess.run(cmd)
    else:
        print("✅ Step 5: Patch match stereo already completed")
    
    # Check for final point cloud
    fused_ply = os.path.join(results_dir, "fused.ply")
    if not os.path.exists(fused_ply):
        # 6. Stereo fusion
        print("Step 6: Stereo fusion")
        cmd = [
            colmap_exe, "stereo_fusion",
            "--workspace_path", dense_dir,
            "--output_path", fused_ply
        ]
        subprocess.run(cmd)
    else:
        print("✅ Step 6: Stereo fusion already completed")
    
    # Check for model TXT export
    model_txt_dir = os.path.join(colmap_dir, "model_txt", "0")
    if not os.path.exists(model_txt_dir) or len(os.listdir(model_txt_dir)) < 3:
        # Export model to TXT format for later use
        os.makedirs(os.path.dirname(model_txt_dir), exist_ok=True)
        print("Step 7: Exporting model to TXT format")
        cmd = [
            colmap_exe, "model_converter",
            "--input_path", os.path.join(sparse_dir, "0"),
            "--output_path", model_txt_dir,
            "--output_type", "TXT"
        ]
        subprocess.run(cmd)
    else:
        print("✅ Step 7: Model export to TXT already completed")
    
    # Final check
    if os.path.exists(fused_ply) and os.path.exists(model_txt_dir):
        print("✅ COLMAP pipeline completed successfully")
        print(f"✅ Dense point cloud saved to: {fused_ply}")
        print(f"✅ Model exported to TXT format: {model_txt_dir}")
    else:
        print("⚠️ COLMAP pipeline not fully completed")
        if not os.path.exists(fused_ply):
            print("  Missing final point cloud")
        if not os.path.exists(model_txt_dir):
            print("  Missing TXT model export")

# Visual Digital Twin Workflow Documentation

## Overview

This workflow transforms a video of a space into a 3D digital twin with object detection capabilities. It combines computer vision, photogrammetry, and 3D visualization to create an accurate spatial representation with identified objects.

## Software Requirements

1. **Python Libraries**:
   - `numpy`: For numerical operations
   - `open3d`: For 3D point cloud and mesh processing
   - `ultralytics`: For YOLOv8 object detection
   - `opencv-python`: For image processing

2. **External Software**:
   - `COLMAP`: Structure-from-Motion and Multi-View Stereo software
   - `FFmpeg`: For video frame extraction

## Workflow Phases

### Cell 1: Setup and Imports
Imports necessary libraries and sets up the base directory structure. This cell handles basic configuration and suppresses SSL warnings that might occur in secure environments.

### Cell 2: Directory Structure Creation
Creates the folder structure needed for the workflow:
- `video`: Stores input video files
- `images`: Contains extracted video frames
- `images_down`: Optional folder for downsampled images
- `colmap`: Stores COLMAP reconstruction outputs
- `detections`: Contains object detection results
- `results`: Stores final outputs (point clouds, 3D detections)
- `scripts`: Helper scripts

### Cell 3: Frame Extraction Functions
Defines functions to extract frames from video at a specified frame rate using FFmpeg. The extracted frames are then copied to the images directory for further processing.

**What it does**: Converts video into a sequence of still images that can be used for 3D reconstruction.

### Cell 4: COLMAP Reconstruction Pipeline
Implements the complete COLMAP photogrammetry pipeline:
1. Feature extraction: Detects distinctive points in each image
2. Feature matching: Finds correspondences between images
3. Sparse reconstruction: Calculates camera positions and a sparse point cloud
4. Dense reconstruction: Creates a detailed 3D point cloud

**What it does**: Transforms 2D images into a 3D representation of the environment.

### Cell 5: YOLO Object Detection
Runs YOLOv8 object detection on the extracted frames to identify objects of interest. Results are saved as annotated images and a structured JSON file containing bounding box coordinates and object labels.

**What it does**: Identifies objects in the 2D images with their precise locations.

### Cell 6: 2D to 3D Projection Helper Functions
Defines utility functions for loading COLMAP's output files and processing the 3D data:
- Loading 3D points from COLMAP's text format
- Loading image data with 2D-to-3D point correspondences
- Filtering points within bounding boxes
- Computing robust center points

**What it does**: Provides tools to bridge between 2D detections and 3D space.

### Cell 7: 2D to 3D Projection Main Function
Projects 2D object detections into 3D space by:
1. Finding 3D points that project into the object's bounding box
2. Computing the median position of these points as the object's 3D location
3. Saving the 3D positions as colored markers and a structured JSON file

**What it does**: Places detected objects in their correct 3D positions within the reconstructed space.

### Cell 8: Detection Sphere Creation
Creates a sphere that encompasses all instances of a detected object:
1. Clusters nearby detections of the same object
2. Computes the center and radius to enclose all instances
3. Generates a 3D sphere mesh and visualizes it within the point cloud

**What it does**: Creates a visual representation of the object's location and approximate size in 3D space.

### Cell 9: Complete Workflow Execution
Provides a function to run the entire pipeline from start to finish:
1. Frame extraction from video
2. COLMAP 3D reconstruction
3. YOLO object detection
4. 2D to 3D projection
5. Detection sphere creation

**What it does**: Orchestrates the entire process from video input to 3D visualization with detected objects.

## Practical Applications

This workflow enables:
- Creating accurate 3D models of physical spaces
- Identifying and localizing objects within those spaces
- Visualizing object positions in 3D
- Measuring spatial relationships between objects
- Supporting augmented reality and digital twin applications

The resulting 3D model with object detections can be used for space planning, asset tracking, virtual tours, and integration with CAD systems.

# Visual Digital Twin Workflow: Inputs and Outputs

## Cell 1: Setup and Imports
**Inputs:** None  
**Outputs:** Configured environment with base directory

## Cell 2: Directory Structure Creation
**Inputs:** Base directory path  
**Outputs:** Created directories:
- `video/`: For input video files
- `images/`: For extracted frames
- `images_down/`: For downsampled images (optional)
- `colmap/`: For COLMAP reconstruction outputs
- `detections/`: For object detection results
- `results/`: For final outputs
- `scripts/`: For helper scripts

## Cell 3: Frame Extraction Functions
**Inputs:** Video file (e.g., `room.MP4`)  
**Outputs:** 
- Extracted frames in `frames/` directory
- Copied frames in `images/` directory

## Cell 4: COLMAP Reconstruction Pipeline
**Inputs:** Images in `images/` directory  
**Outputs:**
- COLMAP database: `colmap/database.db`
- Sparse reconstruction: `colmap/sparse/`
- Dense reconstruction: `colmap/dense/`
- Text model: `colmap/model_txt/0/`
- Point cloud: `results/fused.ply`

## Cell 5: YOLO Object Detection
**Inputs:**
- Images in `images/` directory
- YOLOv8 model (downloaded automatically)  

**Outputs:**
- Annotated images: `detections/yolo_out/`
- Detection JSON: `detections/detections.json`

## Cell 6: 2D to 3D Projection Helper Functions
**Inputs:** None (defines functions only)  
**Outputs:** Helper functions for 2D-to-3D projection

## Cell 7: 2D to 3D Projection Main Function
**Inputs:**
- COLMAP text model: `colmap/model_txt/0/`
- Detection JSON: `detections/detections.json`
- Point cloud: `results/fused.ply` (for visualization)  

**Outputs:**
- 3D markers: `results/<target_label>_markers.ply`
- 3D detections JSON: `results/<target_label>_detections_3d.json`
- 3D visualization (interactive)

## Cell 8: Detection Sphere Creation
**Inputs:**
- 3D detections JSON: `results/<target_label>_detections_3d.json`
- Point cloud: `results/fused.ply` (for visualization)  

**Outputs:**
- 3D sphere mesh: `results/<target_label>_sphere_single.ply`
- 3D visualization (interactive)

## Cell 9: Complete Workflow Execution
**Inputs:**
- Video file
- Target object label (e.g., 'dog')  

**Outputs:** All outputs from cells 1-8

## File Format Details
- **PLY**: Polygon File Format for 3D data
- **JSON**: JavaScript Object Notation for structured data
- **DB**: SQLite database used by COLMAP
- **TXT**: Text files containing camera parameters and 3D points