In [9]:
%%writefile hand_distance_streamlit_nearest.py
# hand_distance_streamlit_nearest.py  (WITH live plot)
import streamlit as st
import cv2
import os
import tempfile
import time
import math
from pathlib import Path
from collections import deque
import numpy as np
import csv
import matplotlib.pyplot as plt

# Page chrome: wide layout, one shared title for the browser tab and the page
# heading, followed by the author credit line.
PAGE_TITLE = "Hand Distance — Nearest Hand Only (with plot)"
st.set_page_config(page_title=PAGE_TITLE, layout="wide")
st.title(PAGE_TITLE)
st.markdown("**Name:** Dhanush M  \n**College:** Sri Venkateswara College of Engineering, Tirupati")

# --- Sidebar settings ---
# Every tunable of the app is read from the sidebar here. Widgets are created
# in a fixed order because Streamlit renders them in call order.
sidebar = st.sidebar

sidebar.header("MediaPipe / Source")

# The camera index comes from the CAM_INDEX environment variable; anything
# that does not parse as an integer falls back to device 0.
try:
    CAM_INDEX = int(os.environ.get("CAM_INDEX", 0))
except (TypeError, ValueError):
    CAM_INDEX = 0

model_complexity = sidebar.selectbox("Model complexity", (0, 1), index=1)
min_detection_confidence = sidebar.slider("Min detection confidence", 0.1, 1.0, 0.5)
min_tracking_confidence = sidebar.slider("Min tracking confidence", 0.1, 1.0, 0.5)
max_num_hands = sidebar.slider("Max number of hands to detect (MP)", 1, 6, 4)

SOURCE_OPTIONS = (
    "Webcam (local)",
    "Auto (webcam/upload/sample/ip)",
    "Upload video file",
    "IP camera / HTTP stream",
    "Sample video",
)
source = sidebar.selectbox("Input source", SOURCE_OPTIONS)

SAMPLE_VIDEO_PATH = "sample_hand_video.mp4"
uploaded_file = None
ip_stream_url = ""

# The uploader and the URL box are only shown for the modes that can use them;
# the "Auto" mode shows both.
auto_mode = source == "Auto (webcam/upload/sample/ip)"
if auto_mode or source == "Upload video file":
    uploaded_file = sidebar.file_uploader(
        "Upload video (mp4/avi/mov/mkv)",
        type=["mp4", "avi", "mov", "mkv"],
    )
if auto_mode or source == "IP camera / HTTP stream":
    ip_stream_url = sidebar.text_input("IP/HTTP stream URL (e.g. http://192.168.1.10:8080/video)")

sidebar.header("Pinch / thresholds")
enable_pinch = sidebar.checkbox("Enable pinch display", True)
pinch_max_dist = sidebar.slider("Pinch max distance (px) -> 0% pinch", 50, 800, 200)
pinch_pixel_threshold = sidebar.slider("Pinch pixel threshold (px)", 2, 300, 40)
pinch_norm_threshold = sidebar.slider(
    "Pinch normalized threshold (landmark units)", 0.001, 0.10, 0.03, step=0.001
)
pinch_smoothing = sidebar.slider("Pinch smoothing (EMA α)", 0.01, 1.0, 0.25)

sidebar.header("Plot / logging")
LOG_TO_CSV = sidebar.checkbox("Enable CSV logging", False)
csv_filename = sidebar.text_input("CSV filename", value="hand_distance_nearest.csv")
MAX_PTS = sidebar.slider("Plot window points", 50, 1000, 200)
PLOT_EVERY_N_FRAMES = sidebar.slider("Plot update frequency (frames)", 1, 30, 3)

# Start/Stop
# Seed the per-session values once; later reruns keep whatever is stored.
for _state_key, _state_default in (
    ("running", False),
    ("pinch_ema", None),
    ("csv_writer", None),
    ("csv_file", None),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default

# Both buttons live in the narrow left column; Stop is evaluated after Start,
# so it wins if both report a click in the same run.
col1, col2 = st.columns([1,4])
with col1:
    if st.button("Start"):
        st.session_state["running"] = True
    if st.button("Stop"):
        st.session_state["running"] = False

# utilities
def check_camera(index=0, timeout=2.0):
    """Return True if camera `index` delivers a frame within `timeout` seconds.

    The device is opened with the DirectShow backend on Windows
    (``os.name == "nt"``) and with backend ``0`` elsewhere. If the device
    does not open, the function returns False right away. Otherwise it keeps
    trying ``read()`` every 50 ms until a frame arrives or the timeout
    elapses. The capture is always released before returning.

    Args:
        index: Camera device index passed to ``cv2.VideoCapture``.
        timeout: Maximum time in seconds to wait for the first frame.

    Returns:
        bool: True when at least one frame was read, False otherwise.
    """
    cap = cv2.VideoCapture(index, cv2.CAP_DSHOW if os.name == "nt" else 0)
    try:
        # The constructor already attempted to open the device; polling
        # isOpened() cannot turn a failed open into a success, so waiting
        # out the timeout would only stall every rerun of the script.
        if not cap.isOpened():
            return False

        # monotonic() is immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            ret, _ = cap.read()
            if ret:
                return True
            # Some cameras need a short warm-up before the first frame.
            time.sleep(0.05)
        return False
    finally:
        cap.release()

def open_capture(source_value):
    """Open an OpenCV capture for a camera index or for a path/URL.

    Integer values are opened as device indices (DirectShow backend when
    ``os.name == "nt"``, backend ``0`` otherwise). Any other value is
    converted with ``str()`` and handed to ``cv2.VideoCapture`` as-is.

    Returns:
        The opened capture, or None when ``isOpened()`` reports failure.
    """
    if not isinstance(source_value, int):
        capture = cv2.VideoCapture(str(source_value))
    else:
        backend = cv2.CAP_DSHOW if os.name == "nt" else 0
        capture = cv2.VideoCapture(source_value, backend)
    return capture if capture.isOpened() else None

# choose source
def _persist_upload(upload):
    """Copy an uploaded video into a temp file and return the file's path.

    The temp file keeps the upload's extension. It is created with
    ``delete=False`` so it survives the ``with`` block, and it is closed
    before the path is returned so that the video reader can open it and a
    later ``os.remove`` is not blocked by a handle this script still holds.
    """
    suffix = Path(upload.name).suffix
    upload.seek(0)
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        tmp.write(upload.read())
    return tmp.name


# `chosen_source` ends up as a camera index (int), a file path / URL (str),
# or None when nothing usable was selected. `temp_video_path` is set only when
# an upload was written to disk, so the cleanup step knows what to delete.
chosen_source = None
temp_video_path = None
if source == "Webcam (local)":
    if check_camera(CAM_INDEX):
        chosen_source = CAM_INDEX
    else:
        st.warning(f"Webcam (index={CAM_INDEX}) not available.")
elif source == "Upload video file":
    if uploaded_file is not None:
        temp_video_path = _persist_upload(uploaded_file)
        chosen_source = temp_video_path
    else:
        st.info("Upload a video file in the sidebar.")
elif source == "IP camera / HTTP stream":
    if ip_stream_url:
        chosen_source = ip_stream_url
    else:
        st.info("Enter an IP/HTTP stream URL.")
elif source == "Sample video":
    if Path(SAMPLE_VIDEO_PATH).exists():
        chosen_source = SAMPLE_VIDEO_PATH
    else:
        st.info("Sample video not found.")
elif source == "Auto (webcam/upload/sample/ip)":
    # Fallback chain: webcam, then upload, then IP stream, then sample video.
    if check_camera(CAM_INDEX):
        chosen_source = CAM_INDEX
    elif uploaded_file is not None:
        temp_video_path = _persist_upload(uploaded_file)
        chosen_source = temp_video_path
    elif ip_stream_url:
        chosen_source = ip_stream_url
    elif Path(SAMPLE_VIDEO_PATH).exists():
        chosen_source = SAMPLE_VIDEO_PATH
    else:
        st.warning("No source available. Upload or provide IP stream.")

# Reserve three page placeholders, top to bottom: video frame, live plot,
# and status line. They are overwritten in place while the stream runs.
frame_slot, plot_slot, status_slot = st.empty(), st.empty(), st.empty()

# setup mediapipe
# `_media_pipe_setup` is True only when a Hands tracker was actually built;
# `_mp_resource` carries the tracker plus the helper modules the run loop uses.
_media_pipe_setup = False
_mp_resource = {}
hands = None
try:
    # Imported on every run so that a missing/broken install is reported
    # before the user presses Start.
    import mediapipe as mp
    mp_hands = mp.solutions.hands
    mp_drawing = mp.solutions.drawing_utils
    _mp_resource['mp_hands'] = mp_hands
    _mp_resource['mp_drawing'] = mp_drawing

    # Build the tracker only when a stream is about to run. The run loop is
    # the only place that closes it, so building one on idle reruns (every
    # widget interaction) would create trackers that are never closed.
    if st.session_state.running:
        hands = mp_hands.Hands(
            static_image_mode=False,
            model_complexity=int(model_complexity),
            min_detection_confidence=float(min_detection_confidence),
            min_tracking_confidence=float(min_tracking_confidence),
            max_num_hands=int(max_num_hands),
        )
        _mp_resource['hands'] = hands
        _media_pipe_setup = True
except Exception as e:
    # Best-effort: the app still shows video without hand tracking.
    _media_pipe_setup = False
    st.error("Failed to import MediaPipe: " + str(e))

# CSV open if requested
# The log is opened in append mode at most once per session; the handle and
# its csv writer are parked in session state. The header row is written only
# when the file did not exist before this open.
if LOG_TO_CSV and st.session_state.csv_writer is None:
    try:
        needs_header = not os.path.exists(csv_filename)
        log_file = open(csv_filename, "a", newline="")
        log_writer = csv.writer(log_file)
        if needs_header:
            log_writer.writerow(["timestamp_iso", "elapsed_s", "norm_dist", "pixel_dist", "pinch"])
        st.session_state.csv_writer = log_writer
        st.session_state.csv_file = log_file
    except Exception as e:
        st.error(f"Failed to open CSV: {e}")

# --- plotting buffers & figure ---
# Fixed-size rolling windows; the oldest sample drops out once MAX_PTS is hit.
timestamps = deque(maxlen=MAX_PTS)
norm_dists = deque(maxlen=MAX_PTS)
pixel_dists = deque(maxlen=MAX_PTS)

plt.ioff()  # we'll manually draw and send to Streamlit

# Build the figure directly instead of via plt.subplots(): pyplot keeps every
# figure it creates in a global registry until plt.close() is called, and this
# script is re-executed on each interaction, so pyplot figures would pile up.
# A standalone Figure is garbage-collected like any other object.
from matplotlib.figure import Figure

fig = Figure(figsize=(8, 3))
ax = fig.subplots()
line_norm, = ax.plot([], [], label="norm_dist")
line_pix, = ax.plot([], [], label="pixel_dist (scaled)")
ax.set_xlabel("time (s)")
ax.set_ylabel("distance")
ax.set_title("Thumb Tip ↔ Index Tip Distance (live)")
ax.legend(loc="upper right")
ax.grid(True)

start_time = time.time()
frame_count = 0

# main loop: detect only nearest hand (by smallest normalized 3D distance between thumb-tip & index-tip)

# Stop looping a file source after this many consecutive failed reads, so an
# unreadable file cannot keep the loop rewinding forever.
_MAX_CONSECUTIVE_READ_FAILURES = 20


def _measure_hands(results, w_img, h_img):
    """Return one measurement dict per detected hand.

    For every entry of ``results.multi_hand_landmarks`` the thumb tip
    (landmark 4) and index tip (landmark 8) are compared: ``norm_dist`` is
    their 3D Euclidean distance in landmark coordinates, ``pixel_dist`` their
    2D distance after scaling x/y by the frame width/height. Hands whose
    landmarks cannot be read or converted are skipped.
    """
    measured = []
    for idx, hand_landmarks in enumerate(results.multi_hand_landmarks):
        try:
            lm4 = hand_landmarks.landmark[4]
            lm8 = hand_landmarks.landmark[8]
            dx_n = lm4.x - lm8.x
            dy_n = lm4.y - lm8.y
            dz_n = lm4.z - lm8.z
            norm_dist = math.sqrt(dx_n * dx_n + dy_n * dy_n + dz_n * dz_n)
            tx = int(round(lm4.x * w_img))
            ty = int(round(lm4.y * h_img))
            ix = int(round(lm8.x * w_img))
            iy = int(round(lm8.y * h_img))
            pixel_dist = float(np.hypot(tx - ix, ty - iy))
        except (IndexError, AttributeError, TypeError, ValueError, OverflowError):
            # Missing or non-finite landmark: skip this hand, keep the rest.
            continue

        try:
            handedness_label = results.multi_handedness[idx].classification[0].label
        except (IndexError, AttributeError, TypeError):
            handedness_label = "Unknown"

        measured.append({
            "idx": idx,
            "landmarks": hand_landmarks,
            "norm_dist": norm_dist,
            "pixel_dist": pixel_dist,
            "thumb_px": (tx, ty),
            "index_px": (ix, iy),
            "handedness": handedness_label,
        })
    return measured


def _redraw_plot():
    """Push the current rolling buffers into the figure and re-render it."""
    t_arr = np.array(timestamps)
    norm_arr = np.array(norm_dists)
    pix_arr = np.array(pixel_dists)

    # Scale pixel distances so their maximum lines up with the maximum of the
    # normalized distances, letting both curves share one y axis.
    if np.any(np.isfinite(pix_arr)) and np.any(np.isfinite(norm_arr)):
        max_pix = np.nanmax(pix_arr)
        max_norm = np.nanmax(norm_arr)
        factor = ((max_norm + 1e-6) / (max_pix + 1e-6)) if max_pix > 1e-9 else 1.0
    else:
        factor = 1.0
    scaled_pix = pix_arr * factor

    line_norm.set_data(t_arr, norm_arr)
    line_pix.set_data(t_arr, scaled_pix)
    if t_arr.size > 0:
        ax.set_xlim(t_arr[0], t_arr[-1] if t_arr[-1] > t_arr[0] else t_arr[0] + 1)

    # Gaps (NaN) count as 0 for the purpose of choosing the y range.
    y_vals = np.nan_to_num(np.concatenate([norm_arr, scaled_pix]), nan=0.0)
    if y_vals.size == 0:
        y_vals = np.array([0.0])
    y_min, y_max = np.min(y_vals), np.max(y_vals)
    padding = 0.5 if (y_max - y_min) < 1e-3 else (y_max - y_min) * 0.1
    # Explicit limits switch autoscaling off, so no relim()/autoscale_view()
    # pass is needed afterwards.
    ax.set_ylim(y_min - padding, y_max + padding)

    plot_slot.pyplot(fig)


if st.session_state.running:
    if chosen_source is None:
        status_slot.warning("No source chosen.")
    else:
        status_slot.info(f"Opening source: {chosen_source}")
        cap = open_capture(chosen_source)
        if cap is None:
            status_slot.error("Failed to open source.")
            st.session_state.running = False
        else:
            # One reusable sidebar placeholder for the live read-outs. Writing
            # through st.sidebar directly inside the loop would append a new
            # set of elements on every frame.
            metrics_slot = st.sidebar.empty()
            read_failed = False
            try:
                last_time = time.time()
                start_time = time.time()
                frame_count = 0      # frames since start; paces plot/flush
                fps_frames = 0       # frames since the last FPS report
                failed_reads = 0     # consecutive failed cap.read() calls
                while st.session_state.running:
                    ok, frame = cap.read()
                    if not ok or frame is None:
                        failed_reads += 1
                        # loop video if file
                        is_file = isinstance(chosen_source, str) and Path(str(chosen_source)).exists()
                        if is_file and failed_reads <= _MAX_CONSECUTIVE_READ_FAILURES:
                            cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                            time.sleep(0.05)
                            continue
                        status_slot.error("Frame read failed. Stopping.")
                        read_failed = True
                        st.session_state.running = False
                        break
                    failed_reads = 0

                    # mirror for webcam-like view
                    try:
                        frame = cv2.flip(frame, 1)
                    except cv2.error:
                        # Best-effort: fall back to the unmirrored frame.
                        pass

                    if frame.dtype != 'uint8':
                        frame = frame.astype('uint8')

                    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    annotated = frame_rgb.copy()

                    pinch_info = None  # (norm_dist, pixel_dist, is_pinched, pinch_percent, handedness, smooth_px)

                    if _media_pipe_setup:
                        try:
                            results = _mp_resource['hands'].process(frame_rgb)
                            if getattr(results, "multi_hand_landmarks", None):
                                h_img, w_img = frame.shape[:2]
                                candidate_hands = _measure_hands(results, w_img, h_img)

                                # pick the nearest hand by smallest norm_dist (closest)
                                if candidate_hands:
                                    nearest = min(candidate_hands, key=lambda x: x["norm_dist"])
                                    norm_dist = nearest["norm_dist"]
                                    pixel_dist = nearest["pixel_dist"]
                                    tx, ty = nearest["thumb_px"]
                                    ix, iy = nearest["index_px"]
                                    handedness_label = nearest["handedness"]

                                    # EMA smoothing (pixel-based); the state lives in
                                    # session_state, seeded with the first sample.
                                    if st.session_state.pinch_ema is None:
                                        st.session_state.pinch_ema = pixel_dist
                                    else:
                                        alpha = float(pinch_smoothing)
                                        st.session_state.pinch_ema = alpha * pixel_dist + (1 - alpha) * st.session_state.pinch_ema
                                    smooth_px = st.session_state.pinch_ema

                                    # 0 px -> 100 % pinch, pinch_max_dist px -> 0 %.
                                    pinch_percent = np.interp(smooth_px, [0.0, float(pinch_max_dist)], [100.0, 0.0])
                                    pinch_percent = float(np.clip(pinch_percent, 0.0, 100.0))

                                    # Either threshold being met counts as a pinch.
                                    is_pinched = (pixel_dist <= float(pinch_pixel_threshold)) or (norm_dist <= float(pinch_norm_threshold))

                                    pinch_info = (norm_dist, pixel_dist, is_pinched, pinch_percent, handedness_label, smooth_px)

                                    # DRAW ONLY THE NEAREST HAND
                                    _mp_resource['mp_drawing'].draw_landmarks(
                                        annotated, nearest["landmarks"], _mp_resource['mp_hands'].HAND_CONNECTIONS,
                                        _mp_resource['mp_drawing'].DrawingSpec(thickness=2, circle_radius=3),
                                        _mp_resource['mp_drawing'].DrawingSpec(thickness=2),
                                    )

                                    # highlight nearest with bigger markers & line
                                    cv2.circle(annotated, (tx, ty), 8, (0,0,255), -1)
                                    cv2.circle(annotated, (ix, iy), 8, (255,0,0), -1)
                                    cv2.line(annotated, (tx, ty), (ix, iy), (0,255,0), 2)
                            else:
                                # no hands -> decay EMA slightly
                                if st.session_state.pinch_ema is not None:
                                    st.session_state.pinch_ema *= 0.96
                        except Exception as exc:
                            # Keep streaming video if tracking fails on a frame,
                            # but surface the reason instead of hiding it.
                            status_slot.warning(f"Hand tracking error: {exc}")

                    # prepare display image and overlay texts
                    display_img = cv2.cvtColor(annotated, cv2.COLOR_RGB2BGR)
                    if pinch_info is not None:
                        n, p, pinched, pct, hand_label, _ = pinch_info
                        status_str = f"Nearest: {hand_label} — Pinch: {int(p)} px — {int(pct)}% — {'PINCHED' if pinched else 'open'}"
                        cv2.rectangle(display_img, (10,10), (min(1000, 20 + 14 * len(status_str)), 55), (0,0,0), -1)
                        cv2.putText(display_img, status_str, (12,32), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)

                        # show explicit pinch distances on the frame (pixel & normalized)
                        dist_text = f"Pinch Dist: {p:.1f}px  |  Norm: {n:.5f}"
                        cv2.putText(display_img, dist_text, (12,52), cv2.FONT_HERSHEY_SIMPLEX, 0.55, (200,200,200), 2)

                        # sidebar metrics: container() replaces the previous
                        # content of the placeholder rather than appending.
                        panel = metrics_slot.container()
                        panel.metric("Pinch (px)", f"{int(p)}", delta=None)
                        panel.metric("Pinch (%)", f"{int(pct)}", delta=None)
                        panel.write(f"Nearest hand: **{hand_label}**")
                        panel.write(f"Pinch dist (px): **{p:.1f}**")
                        panel.write(f"Norm dist: **{n:.5f}**")
                    else:
                        cv2.putText(display_img, "Show thumb and index to measure pinch (nearest hand only)", (10,30),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2)
                        metrics_slot.write("No nearest hand detected")

                    frame_slot.image(display_img, channels="BGR", use_container_width=True)

                    # CSV logging
                    elapsed = time.time() - start_time
                    csv_writer = st.session_state.csv_writer
                    if LOG_TO_CSV and csv_writer is not None:
                        iso_ts = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime())
                        if pinch_info is not None:
                            row = [iso_ts, f"{elapsed:.3f}", f"{pinch_info[0]:.6f}", f"{pinch_info[1]:.2f}", "1" if pinch_info[2] else "0"]
                        else:
                            row = [iso_ts, f"{elapsed:.3f}", "", "", "0"]
                        csv_writer.writerow(row)
                        if frame_count % 50 == 0 and st.session_state.csv_file is not None:
                            try:
                                st.session_state.csv_file.flush()
                            except (OSError, ValueError):
                                # A failed periodic flush must not stop the stream.
                                pass

                    # Update plotting buffers (NaN marks frames without a hand)
                    timestamps.append(elapsed)
                    if pinch_info is not None:
                        norm_dists.append(pinch_info[0])
                        pixel_dists.append(pinch_info[1])
                    else:
                        norm_dists.append(np.nan)
                        pixel_dists.append(np.nan)

                    # Plot update (periodically to reduce overhead)
                    if frame_count % int(PLOT_EVERY_N_FRAMES) == 0:
                        _redraw_plot()

                    # status fps: frames completed since the last report,
                    # reported roughly once per second
                    frame_count += 1
                    fps_frames += 1
                    now = time.time()
                    if now - last_time >= 1.0:
                        status_slot.info(f"Running — FPS: {fps_frames}")
                        fps_frames = 0
                        last_time = now

                    time.sleep(0.01)

            finally:
                # Cleanup is best-effort: one failing step must not prevent
                # the remaining resources from being released.
                try:
                    cap.release()
                except Exception:
                    pass
                if _media_pipe_setup:
                    try:
                        _mp_resource['hands'].close()
                    except Exception:
                        pass
                if st.session_state.csv_file is not None:
                    try:
                        # Push rows written since the last periodic flush.
                        st.session_state.csv_file.flush()
                    except (OSError, ValueError):
                        pass
                st.session_state.running = False
                # Keep the read-failure message visible instead of replacing it.
                if not read_failed:
                    status_slot.info("Stopped.")

# cleanup temp file & csv when not running
# Both steps are best-effort and only happen while the stream is stopped.
_stale_upload = locals().get('temp_video_path')
if _stale_upload and not st.session_state.running:
    try:
        os.remove(_stale_upload)
    except Exception:
        pass

_open_log = st.session_state.csv_file
if _open_log is not None and not st.session_state.running:
    try:
        _open_log.close()
    except Exception:
        pass
    # Clear both entries so the next logging request opens a fresh writer.
    st.session_state.csv_writer = None
    st.session_state.csv_file = None

# Footer: a horizontal rule followed by the usage instructions.
st.markdown("---")
for _footer_line in (
    "Instructions:",
    "1. Choose your input source on the sidebar.  2. Press Start to run.  3. Press Stop or click the Stop button to end the stream.",
):
    st.write(_footer_line)


Overwriting hand_distance_streamlit_nearest.py


In [None]:
!streamlit run hand_distance_streamlit_nearest.py