# Code to  crate 3D digital twin form live realtime feed of came

In [None]:
# Install necessary dependencies
!pip install torch torchvision opencv-python matplotlib open3d numpy ultralytics transformers timm pillow psutil pycolmap

# Import required libraries
import os
import cv2
import torch
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import open3d as o3d
import time
from transformers import DPTImageProcessor, DPTForDepthEstimation
import psutil
import pycolmap

# Set up device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Load advanced depth estimation model
image_processor = DPTImageProcessor.from_pretrained("Intel/dpt-large")
depth_model = DPTForDepthEstimation.from_pretrained("Intel/dpt-large").to(device)

# Function to run PyCOLMAP for camera pose and intrinsic estimation
def run_pycolmap(image_folder, workspace_folder):
    # Feature extraction
    pycolmap.extract_features(database_path=f"{workspace_folder}/database.db", image_path=image_folder)
    
    # Exhaustive matching
    pycolmap.match_exhaustive(database_path=f"{workspace_folder}/database.db")
    
    # Sparse reconstruction
    sparse_folder = f"{workspace_folder}/sparse"
    os.makedirs(sparse_folder, exist_ok=True)
    pycolmap.mapper(database_path=f"{workspace_folder}/database.db", image_path=image_folder, output_path=sparse_folder)
    
    # Dense reconstruction
    dense_folder = f"{workspace_folder}/dense"
    os.makedirs(dense_folder, exist_ok=True)
    pycolmap.undistort_images(image_path=image_folder, input_path=f"{sparse_folder}/0", output_path=dense_folder)
    pycolmap.patch_match_stereo(workspace_path=dense_folder)
    pycolmap.stereo_fusion(workspace_path=dense_folder, output_path=f"{dense_folder}/fused.ply")
    
    # Export model
    pycolmap.convert_model(input_path=f"{sparse_folder}/0", output_path=workspace_folder, output_type="TXT")

# Video to Point Cloud Converter
class VideoToTwinConverter:
    def __init__(self, batch_size=32):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.depth_model = depth_model
        self.image_processor = image_processor
        self.batch_size = batch_size

    def _generate_pointcloud(self, rgb, depth):
        h, w = depth.shape
        fx, fy = 0.8*w, 0.8*h  # Simplified focal length

        yy, xx = np.meshgrid(np.arange(h), np.arange(w), indexing='ij')
        z = depth * 100  # Scale factor
        x = (xx - w/2) * z / fx
        y = (yy - h/2) * z / fy

        points = np.vstack((x.ravel(), y.ravel(), z.ravel())).T
        colors = rgb.reshape(-1, 3)/255.0

        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(points)
        pcd.colors = o3d.utility.Vector3dVector(colors)
        return pcd

    def process_frames(self, frames):
        # Preprocess frames
        inputs = self.image_processor(images=frames, return_tensors="pt").to(self.device)

        # Depth prediction
        with torch.no_grad():
            outputs = self.depth_model(**inputs)
            depths = outputs.predicted_depth.squeeze().cpu().numpy()

        # Generate point clouds
        pointclouds = [self._generate_pointcloud(frames[i], depths[i]) for i in range(len(frames))]
        return pointclouds

# Main Execution
if __name__ == "__main__":
    # Initialize components
    converter = VideoToTwinConverter(batch_size=32)

    # Open the video file
    video_path = '/path/to/your/video.mp4'  # Update this path to your video file
    cap = cv2.VideoCapture(video_path)

    frames = []
    image_folder = './datafiles/custom/JPEGImages/640p/custom'
    workspace_folder = './datafiles/custom/triangulation'
    os.makedirs(image_folder, exist_ok=True)
    os.makedirs(workspace_folder, exist_ok=True)

    start_time = time.time()
    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frames.append(frame)
        frame_path = os.path.join(image_folder, f"frame_{frame_count:04d}.jpg")
        cv2.imwrite(frame_path, frame)
        frame_count += 1

        # Process frames in batches
        if len(frames) == converter.batch_size:
            pointclouds = converter.process_frames(frames)
            frames = []

            # Save point clouds to files
            for i, pcd in enumerate(pointclouds):
                o3d.io.write_point_cloud(f"pointcloud_{frame_count - len(pointclouds) + i:04d}.ply", pcd)

            # Monitor system and GPU memory usage
            system_memory = psutil.virtual_memory()
            gpu_memory = torch.cuda.memory_allocated()
            print(f"System Memory Usage: {system_memory.percent}%")
            print(f"GPU Memory Usage: {gpu_memory / (1024 ** 2)} MB")

    # Process remaining frames
    if frames:
        pointclouds = converter.process_frames(frames)
        for i, pcd in enumerate(pointclouds):
            o3d.io.write_point_cloud(f"pointcloud_{frame_count - len(pointclouds) + i:04d}.ply", pcd)

    cap.release()

    # Run PyCOLMAP to estimate camera pose and intrinsic parameters
    run_pycolmap(image_folder, workspace_folder)

    end_time = time.time()
    print(f"Total processing time: {end_time - start_time} seconds")

    # Visualize saved point clouds
    for i in range(frame_count):
        pcd = o3d.io.read_point_cloud(f"pointcloud_{i:04d}.ply")
        o3d.visualization.draw_geometries([pcd])