In [21]:
!pip install -q streamlit ultralytics supervision opencv-python-headless pillow python-magic numpy pandas
!npm install localtunnel

[1G[0K⠙[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[1G[0K⠦[1G[0K
up to date, audited 23 packages in 960ms
[1G[0K⠦[1G[0K
[1G[0K⠦[1G[0K3 packages are looking for funding
[1G[0K⠦[1G[0K  run `npm fund` for details
[1G[0K⠦[1G[0K
2 [31m[1mhigh[22m[39m severity vulnerabilities

To address all issues (including breaking changes), run:
  npm audit fix --force

Run `npm audit` for details.
[1G[0K⠦[1G[0K

In [22]:
!mkdir -p src/utils
!mkdir -p model
!mkdir -p output
!mkdir -p sample_images
!mkdir -p sample_videos

In [23]:
!wget https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt -O model/yolov8n.pt

--2025-04-13 15:57:05--  https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt
Resolving github.com (github.com)... 140.82.113.4
Connecting to github.com (github.com)|140.82.113.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/521807533/732c503e-9fcb-4a82-be7f-106baafbda15?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20250413%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250413T155705Z&X-Amz-Expires=300&X-Amz-Signature=3371f64ab7b1e58cade034f9f34d87aaeb50d719fdaf36e0969532d585b282fe&X-Amz-SignedHeaders=host&response-content-disposition=attachment%3B%20filename%3Dyolov8n.pt&response-content-type=application%2Foctet-stream [following]
--2025-04-13 15:57:05--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/521807533/732c503e-9fcb-4a82-be7f-106baafbda15?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=re

In [24]:
%%writefile src/__init__.py
# Makes src a Python package
from .detect import ObjectDetector
from .image_test import process_image, process_directory
from .video_test import process_video

Overwriting src/__init__.py


In [25]:
%%writefile src/utils/__init__.py
# Makes utils a Python package
from .security import validate_file_upload

Overwriting src/utils/__init__.py


In [26]:
%%writefile src/detect.py
import cv2
import numpy as np
from ultralytics import YOLO

class ObjectDetector:
    def __init__(self, model_path='model/yolov8n.pt'):
        self.model = YOLO(model_path)
        self.class_names = self.model.names

    def detect(self, image):
        results = self.model(image)[0]
        boxes = results.boxes.xyxy.cpu().numpy()
        classes = results.boxes.cls.cpu().numpy()
        confidences = results.boxes.conf.cpu().numpy()
        return boxes, classes, confidences

    def annotate_image(self, image, boxes, classes, confidences, confidence_threshold=0.5):
        annotated_image = image.copy()
        for box, cls, conf in zip(boxes, classes, confidences):
            if conf < confidence_threshold:
                continue

            x1, y1, x2, y2 = map(int, box)
            label = f"{self.class_names[int(cls)]} {conf:.2f}"

            # Draw bounding box
            cv2.rectangle(annotated_image, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Draw label background
            (text_width, text_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
            cv2.rectangle(annotated_image, (x1, y1 - text_height - 10), (x1 + text_width, y1), (0, 255, 0), -1)

            # Draw text
            cv2.putText(annotated_image, label, (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)

        return annotated_image

Overwriting src/detect.py


In [27]:
%%writefile src/image_test.py
from src.detect import ObjectDetector
import cv2
import os

def process_image(image_path, output_dir='output', confidence_threshold=0.5):
    detector = ObjectDetector()
    image = cv2.imread(image_path)

    if image is None:
        raise ValueError("Could not read image")

    boxes, classes, confidences = detector.detect(image)
    annotated_image = detector.annotate_image(image, boxes, classes, confidences, confidence_threshold)

    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, os.path.basename(image_path))
    cv2.imwrite(output_path, annotated_image)
    return output_path, (boxes, classes, confidences)

def process_directory(directory_path, output_dir='output', confidence_threshold=0.5):
    image_paths = [os.path.join(directory_path, f) for f in os.listdir(directory_path)
                  if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
    results = []
    for img_path in image_paths:
        output_path, _ = process_image(img_path, output_dir, confidence_threshold)
        results.append(output_path)
    return results

Overwriting src/image_test.py


In [28]:
import cv2
import os
import time
import streamlit as st
from src.detect import ObjectDetector
from tqdm import tqdm

def process_video(
    video_path,
    output_path='output/output_video.mp4',
    confidence_threshold=0.5,
    frame_skip=1,
    show_progress=True
):
    """
    Process video with YOLO object detection

    Args:
        video_path: Input video path
        output_path: Output video path
        confidence_threshold: Detection confidence (0-1)
        frame_skip: Process every nth frame (for faster processing)
        show_progress: Show progress bar in Streamlit

    Returns:
        output_path: Path to processed video
    """
    detector = ObjectDetector()
    start_time = time.time()
    temp_files = []
    cap = None
    out = None

    try:
        # Open video
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video: {video_path}")

        # Get video properties
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Validate video
        if fps <= 0 or total_frames <= 0:
            raise ValueError("Invalid video properties (FPS or frame count)")

        # Prepare output
        os.makedirs(os.path.dirname(output_path), exist_ok=True)

        # Try multiple codecs in order of preference
        codecs = ['avc1', 'X264', 'mp4v']
        for codec in codecs:
            fourcc = cv2.VideoWriter_fourcc(*codec)
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            if out.isOpened():
                break

        if not out or not out.isOpened():
            raise ValueError("Failed to create output video with any codec")

        # Progress tracking
        progress_bar = st.progress(0) if show_progress else None
        status_text = st.empty() if show_progress else None

        # Process frames
        frame_count = 0
        processed_count = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Skip frames if requested
            if frame_count % frame_skip != 0:
                frame_count += 1
                continue

            # Detect objects
            boxes, classes, confidences = detector.detect(frame)
            annotated_frame = detector.annotate_image(
                frame, boxes, classes, confidences, confidence_threshold
            )
            out.write(annotated_frame)
            processed_count += 1

            # Update progress
            if show_progress:
                progress = min(1.0, frame_count / total_frames)
                progress_bar.progress(progress)
                status_text.text(
                    f"Processed {processed_count}/{total_frames} frames "
                    f"({frame_skip}x skip) | "
                    f"Elapsed: {time.time()-start_time:.1f}s"
                )

            frame_count += 1

        # Explicitly release the video writer
        out.release()

        # Verify the output file was created
        if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
            raise RuntimeError("Output video file was not created properly")

        return output_path

    except Exception as e:
        # Cleanup on error
        if os.path.exists(output_path):
            os.remove(output_path)
        raise RuntimeError(f"Video processing failed: {str(e)}")

    finally:
        # Release resources
        if cap is not None and cap.isOpened():
            cap.release()
        if out is not None and out.isOpened():
            out.release()

        # Clean temp files
        for f in temp_files:
            if os.path.exists(f):
                os.remove(f)

In [29]:
%%writefile src/utils/security.py
import magic
from io import BytesIO
from PIL import Image
import cv2
import os  # <-- THIS WAS MISSING

def validate_file_upload(file):
    # Allowed video MIME types
    VIDEO_TYPES = [
        'video/mp4',
        'video/x-msvideo',  # AVI
        'video/quicktime',  # MOV
        'video/x-matroska' # MKV
    ]

    # Validate image files
    if file.type.startswith('image/'):
        try:
            img = Image.open(BytesIO(file.getvalue()))
            img.verify()
            return True
        except Exception:
            raise ValueError("Invalid image file - corrupted or unsupported format")

    # Validate video files
    elif file.type in VIDEO_TYPES:
        try:
            # Save temporary file
            temp_path = f"temp_{file.name}"
            with open(temp_path, "wb") as f:
                f.write(file.getbuffer())

            # Check if OpenCV can read it
            cap = cv2.VideoCapture(temp_path)
            if not cap.isOpened():
                raise ValueError("Unsupported video codec")
            if cap.get(cv2.CAP_PROP_FRAME_COUNT) < 1:
                raise ValueError("Corrupted video (no frames detected)")
            cap.release()
            return True

        except Exception as e:
            raise ValueError(f"Video error: {str(e)}")
        finally:
            # Clean up temp file
            if os.path.exists(temp_path):
                os.remove(temp_path)

    raise ValueError(f"Unsupported file type: {file.type}")

Overwriting src/utils/security.py


In [30]:
%%writefile app.py
import streamlit as st
from PIL import Image
import os
import cv2
import numpy as np
import pandas as pd
import urllib.request
import warnings  # <-- For handling JS warnings

# ===== 1. FIRST configure page settings =====
st.set_page_config(
    page_title="Object Detection (Colab)",
    page_icon="🔍",
    layout="wide"
)

# ===== 2. THEN add warning filters =====
warnings.filterwarnings("ignore", message=".*Failed to fetch dynamically imported module.*")

# ===== 3. THEN import your custom modules =====
from src.image_test import process_image, process_directory
from src.video_test import process_video
from src.utils.security import validate_file_upload
from src.detect import ObjectDetector

st.markdown("""
    <style>
    .stButton>button { background-color: #4CAF50; color: white; }
    .stFileUploader>div>div>button { background-color: #4CAF50; color: white; }
    .css-1aumxhk { background-color: #f0f2f6; border-radius: 10px; padding: 20px; }
    </style>
    """, unsafe_allow_html=True)

st.title("Object Detection in Colab")
st.write("Upload files or use sample data")

# Create sample data directory
os.makedirs("sample_images", exist_ok=True)
os.makedirs("sample_videos", exist_ok=True)

# Mode selection
mode = st.sidebar.radio(
    "Select Mode",
    ["Single Image", "Image Directory", "Video File"],
    index=0
)

confidence_threshold = st.sidebar.slider(
    "Confidence Threshold",
    0.0, 1.0, 0.5, 0.05
)

def download_sample_file(url, save_path):
    """Download sample files using Python instead of !wget"""
    try:
        urllib.request.urlretrieve(url, save_path)
        return True
    except Exception as e:
        st.error(f"Failed to download sample file: {str(e)}")
        return False

if mode == "Single Image":
    st.header("Image Detection")
    col1, col2 = st.columns(2)

    with col1:
        uploaded_file = st.file_uploader("Upload image", type=["jpg", "jpeg", "png"])
        use_sample = st.checkbox("Use sample image")

    if use_sample or uploaded_file:
        try:
            if use_sample:
                sample_url = "https://ultralytics.com/images/zidane.jpg"
                sample_path = "sample_images/zidane.jpg"
                if download_sample_file(sample_url, sample_path):
                    image_path = sample_path
                    image = Image.open(image_path)
            else:
                validate_file_upload(uploaded_file)
                image = Image.open(uploaded_file)
                image_path = f"temp_{uploaded_file.name}"
                image.save(image_path)

            col2.image(image, caption="Original Image", use_column_width=True)

            if st.button("Detect Objects"):
                with st.spinner("Processing..."):
                    output_path, detections = process_image(
                        image_path,
                        confidence_threshold=confidence_threshold
                    )
                    result_image = Image.open(output_path)
                    st.image(result_image, caption="Detected Objects")

                    with st.expander("Detection Metrics"):
                        st.json({
                            "Total Detections": len(detections),
                            "Classes": list(set(detections.class_id)),
                            "Avg Confidence": float(np.mean(detections.confidence))
                        })

                        df = pd.DataFrame({
                            "Class": [detector.class_names[cls] for cls in detections.class_id],
                            "Confidence": detections.confidence,
                            "Area": detections.area
                        })
                        st.dataframe(df)

        except Exception as e:
            st.error(f"Error: {str(e)}")

elif mode == "Image Directory":
    st.header("Batch Image Processing")
    uploaded_files = st.file_uploader(
        "Upload images",
        type=["jpg", "jpeg", "png"],
        accept_multiple_files=True
    )
    use_sample = st.checkbox("Use sample images")

    if use_sample or uploaded_files:
        try:
            if use_sample:
                sample_urls = [
                    "https://ultralytics.com/images/bus.jpg",
                    "https://ultralytics.com/images/zidane.jpg"
                ]
                sample_paths = [
                    "sample_images/bus.jpg",
                    "sample_images/zidane.jpg"
                ]
                for url, path in zip(sample_urls, sample_paths):
                    if not download_sample_file(url, path):
                        raise Exception("Failed to download sample images")
                image_paths = sample_paths
            else:
                for file in uploaded_files:
                    validate_file_upload(file)
                image_paths = [f"temp_{file.name}" for file in uploaded_files]
                for file, path in zip(uploaded_files, image_paths):
                    with open(path, "wb") as f:
                        f.write(file.getbuffer())

            if st.button("Process Images"):
                with st.spinner("Processing..."):
                    results = []
                    for path in image_paths:
                        output_path, _ = process_image(
                            path,
                            confidence_threshold=confidence_threshold
                        )
                        results.append(output_path)

                    st.success(f"Processed {len(results)} images")
                    cols = st.columns(2)
                    for i, path in enumerate(results):
                        img = Image.open(path)
                        cols[i%2].image(img, use_column_width=True)

        except Exception as e:
            st.error(f"Error: {str(e)}")

elif mode == "Video File":
    st.header("Video Processing")
    uploaded_file = st.file_uploader("Upload video", type=["mp4", "avi", "mov"])
    use_sample = st.checkbox("Use sample video")

    if use_sample or uploaded_file:
        try:
            if use_sample:
                sample_url = "https://ultralytics.com/videos/people.mp4"
                sample_path = "sample_videos/people.mp4"
                if download_sample_file(sample_url, sample_path):
                    video_path = sample_path
            else:
                validate_file_upload(uploaded_file)
                video_path = f"temp_{uploaded_file.name}"
                with open(video_path, "wb") as f:
                    f.write(uploaded_file.getbuffer())

            # ===== Display Original Video (Reliable Method) =====
            with open(video_path, 'rb') as original_video:
                original_bytes = original_video.read()
            st.video(original_bytes)

            if st.button("Process Video"):
                with st.spinner("Processing..."):
                    output_path = process_video(
                        video_path,
                        confidence_threshold=confidence_threshold
                    )
                    st.success("Processing complete!")

                    # ===== Display Processed Video (Reliable Method) =====
                    try:
                        with open(output_path, 'rb') as result_video:
                            result_bytes = result_video.read()
                        st.video(result_bytes)

                        # Optional: Add a download button
                        st.download_button(
                            label="Download Processed Video",
                            data=result_bytes,
                            file_name="processed_video.mp4",
                            mime="video/mp4"
                        )
                    except Exception as e:
                        st.error(f"Failed to display video: {str(e)}")

        except Exception as e:
            st.error(f"Error: {str(e)}")

Overwriting app.py


In [31]:
!curl ifconfig.me

34.86.123.220

In [32]:
!rm -rf /tmp/*  # Clear temp files
import importlib
importlib.invalidate_caches()

rm: cannot remove '/tmp/colab_runtime.sock': Device or resource busy


In [None]:
!pkill -f streamlit
!streamlit run app.py & npx localtunnel --port 8501


Collecting usage statistics. To deactivate, set browser.gatherUsageStats to false.
[0m
[1G[0K⠙[0m
[34m[1m  You can now view your Streamlit app in your browser.[0m
[0m
[34m  Local URL: [0m[1mhttp://localhost:8501[0m
[34m  Network URL: [0m[1mhttp://172.28.0.12:8501[0m
[34m  External URL: [0m[1mhttp://34.86.123.220:8501[0m
[0m
[1G[0Kyour url is: https://slimy-taxis-remain.loca.lt
[31m──[0m[31m────────────────────────[0m[31m [0m[1;31mTraceback [0m[1;2;31m(most recent call last)[0m[31m [0m[31m─────────────────────────[0m[31m──[0m
[31m [0m [2;33m/usr/local/lib/python3.11/dist-packages/streamlit/runtime/scriptrunner/[0m[1;33mexec_code.py[0m: [31m [0m
[31m [0m [94m121[0m in [92mexec_func_with_error_handling[0m                                                 [31m [0m
[31m [0m                                                                                      [31m [0m
[31m [0m [2;33m/usr/local/lib/python3.11/dist-packages/streamlit/