In [2]:
import numpy as np
import cv2
from cv2 import aruco

def detect_position(image, mtx, dist):
    """Locate ArUco marker 4 in the plane defined by ArUco markers 0-3.

    The centres of markers 0, 1, 2 and 3 (DICT_4X4_50) are matched, in that
    order, to the corners (-x/2, -x/2), (x/2, -x/2), (x/2, x/2), (-x/2, x/2)
    of a square with side ``x = 250``. A homography from that square to the
    image is estimated, and its inverse is used to map the image centre of
    marker 4 back into the square's coordinate system.

    Args:
        image: BGR image (it is converted with ``cv2.COLOR_BGR2GRAY``).
        mtx: Camera matrix. Currently unused; kept for interface compatibility.
        dist: Distortion coefficients. Currently unused; kept for interface
            compatibility.

    Returns:
        The result of ``cv2.perspectiveTransform`` for the single centre point
        of marker 4 (a float32 array indexed as ``result[0][0] -> (x, y)``),
        or ``None`` when any of the markers 0-4 is not detected or the
        homography cannot be computed or inverted.
    """
    x = 250  # Side length of the square
    half = x / 2
    pts_known = np.array(
        [[-half, -half], [half, -half], [half, half], [-half, half]],
        dtype='float32',
    )
    reference_ids = (0, 1, 2, 3)
    target_id = 4

    # Detect ArUco markers on the grayscale image.
    aruco_dict = aruco.getPredefinedDictionary(aruco.DICT_4X4_50)
    parameters = aruco.DetectorParameters()
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    detector = aruco.ArucoDetector(aruco_dict, parameters)
    corners, ids, _ = detector.detectMarkers(gray)

    if ids is None:
        return None

    # Centre of the first detection of every marker id.
    centers = {}
    for marker_corners, marker_id in zip(corners, ids.flatten()):
        key = int(marker_id)
        if key not in centers:
            centers[key] = np.mean(marker_corners[0], axis=0)

    # All four reference markers and the target are required. Checking the ids
    # explicitly (instead of only counting detections) avoids an IndexError
    # when five or more markers are seen but one of the required ids is absent.
    if any(marker_id not in centers for marker_id in (*reference_ids, target_id)):
        return None

    pts_detected = np.array(
        [centers[marker_id] for marker_id in reference_ids], dtype='float32'
    )

    # Homography from the known square to the image.
    H, _ = cv2.findHomography(pts_known, pts_detected)
    if H is None:
        return None

    try:
        H_inv = np.linalg.inv(H)
    except np.linalg.LinAlgError:
        # Degenerate marker configuration: no usable image -> plane mapping.
        return None

    # Map the target centre from image coordinates into the virtual plane.
    center = centers[target_id]
    pts_img = np.array([[center[0], center[1]]], dtype='float32')
    return cv2.perspectiveTransform(np.array([pts_img]), H_inv)

In [3]:
import os
import ipywidgets as widgets
from IPython.display import display, Video, HTML
import cv2
import csv
import numpy as np

# --- Global Variables ---
current_frame = 0
cap = None
video_path = None
total_frames = 0
fps = 30  # Default FPS
start_frame = 50
stop_frame = 100
mtx = None # Placeholder for camera matrix
dist = None # Placeholder for distortion coefficients

experiments_folder = 'experiments'

# --- Helper Functions ---

def get_frame(frame_num, cap):
    """Seek to ``frame_num`` and return that frame as JPEG-encoded bytes.

    On a successful read the module-level ``total_frames`` is refreshed from
    the capture. Returns ``None`` (after printing a message) when the frame
    cannot be read.
    """
    global total_frames
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
    ok, image = cap.read()
    if ok:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        jpeg_buffer = cv2.imencode('.jpg', image)[1]
        return jpeg_buffer.tobytes()

    print("Error reading frame or end of video.")
    return None

def update_video_display(frame_data, video_widget):
    """Show ``frame_data`` in ``video_widget`` and refresh the frame/time label.

    The label is rendered into the module-level ``current_frame_output``
    widget from the module-level ``current_frame`` and ``fps`` values.
    """
    video_widget.value = frame_data
    with current_frame_output:
        # wait=True defers clearing until the replacement output arrives.
        current_frame_output.clear_output(wait=True)
        frame_label = format_frame_info(current_frame, fps)
        display(frame_label)

def format_frame_info(frame_num, fps):
    """Return a label with the frame number and, when ``fps`` is positive, its timestamp."""
    has_usable_fps = fps > 0  # guards the division below
    if not has_usable_fps:
        return f"Frame: {frame_num} (FPS not available)"

    elapsed = frame_num / fps
    return f"Frame: {frame_num} ({elapsed:.2f}s)"

def _write_xy_rows(csv_path, rows, label):
    """Write ``(frame_num, loc)`` pairs to ``csv_path`` as ``Frame Number, X, Y`` rows.

    ``loc`` must be a NumPy array whose last axis has length 2; the first
    point it contains is written. Any other entry is reported on stdout and
    skipped. ``label`` only customises that message.
    """
    with open(csv_path, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['Frame Number', 'X', 'Y'])
        for frame_num, loc in rows:
            is_point = (
                isinstance(loc, np.ndarray)
                and loc.ndim >= 1
                and loc.shape[-1] == 2
                and loc.size >= 2
            )
            if is_point:
                # Works for (1, 1, 2) results from cv2.perspectiveTransform as
                # well as for plain (2,) or (N, 2) arrays.
                x, y = loc.reshape(-1, 2)[0]
                writer.writerow([frame_num, x, y])
            else:
                print(f"Invalid {label} data for frame {frame_num}: {loc}")


def save_location_data_with_missing_info(location_data, folder_path, missing_data_info, interpolated_data):
    """Save detected locations, missing-frame info and interpolated locations as CSV.

    Three files are (over)written inside ``folder_path``, which is created
    when it does not exist:

    * ``location_data.csv``     - ``Frame Number, X, Y`` for ``location_data``
    * ``missing_data_info.csv`` - ``Frame Number, Status, Fill Method``
    * ``interpolated_data.csv`` - ``Frame Number, X, Y`` for ``interpolated_data``

    Args:
        location_data: Iterable of ``(frame_num, loc)`` pairs, ``loc`` being a
            NumPy array whose last axis holds ``(x, y)``.
        folder_path: Destination directory.
        missing_data_info: Iterable of ``(frame_num, fill_method)`` pairs.
        interpolated_data: Same layout as ``location_data``.
    """
    # Make sure the destination folder exists.
    os.makedirs(folder_path, exist_ok=True)

    location_csv_path = os.path.join(folder_path, 'location_data.csv')
    missing_info_csv_path = os.path.join(folder_path, 'missing_data_info.csv')
    interpolated_csv_path = os.path.join(folder_path, 'interpolated_data.csv')

    # Detected locations.
    _write_xy_rows(location_csv_path, location_data, 'location')
    print(f"Location data saved to {location_csv_path}.")

    # Frames without a detection and how they were filled.
    with open(missing_info_csv_path, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['Frame Number', 'Status', 'Fill Method'])
        for frame_num, fill_method in missing_data_info:
            writer.writerow([frame_num, 'Missing', fill_method])
    print(f"Missing frame data saved to {missing_info_csv_path}.")

    # Locations after interpolation.
    _write_xy_rows(interpolated_csv_path, interpolated_data, 'interpolated')
    print(f"Interpolated location data saved to {interpolated_csv_path}.")

import numpy as np

def interpolate_missing_frames(location_data, missing_frames):
    """Fill missing frames by linear interpolation between neighbouring detections.

    Args:
        location_data: Sequence of ``(frame_num, loc)`` pairs for frames with a
            valid detection. It does not need to be sorted.
        missing_frames: Iterable of frame numbers without a detection.

    Returns:
        A new list of ``(frame_num, loc)`` pairs sorted by frame number. It
        holds every entry of ``location_data`` plus one entry per missing
        frame that could be filled:

        * detections on both sides: linear interpolation, stored as an array
          of shape ``(1, 1, 2)``;
        * a detection on one side only: a copy of that nearest detection.

        A missing frame is skipped when there is no detection at all, or when
        the neighbouring locations do not squeeze to 1-D arrays.
    """
    import bisect

    # Sort once so the nearest neighbours can be found by binary search and
    # the result does not depend on the order of the input.
    known = sorted(location_data, key=lambda item: item[0])
    known_frames = [item[0] for item in known]

    interpolated_data = list(known)  # start from the original detections

    for frame_num in missing_frames:
        # Nearest valid frames strictly before and strictly after frame_num.
        left = bisect.bisect_left(known_frames, frame_num) - 1
        right = bisect.bisect_right(known_frames, frame_num)
        prev_frame = known[left] if left >= 0 else None
        next_frame = known[right] if right < len(known) else None

        if prev_frame is not None and next_frame is not None:
            prev_num, prev_loc = prev_frame
            next_num, next_loc = next_frame

            # Unwrap the nested arrays.
            prev_loc = np.squeeze(np.array(prev_loc))
            next_loc = np.squeeze(np.array(next_loc))

            # Only interpolate plain 1-D points.
            if prev_loc.ndim == 1 and next_loc.ndim == 1:
                ratio = (frame_num - prev_num) / (next_num - prev_num)
                interpolated_loc = prev_loc + ratio * (next_loc - prev_loc)
                interpolated_data.append((frame_num, np.array([[interpolated_loc]])))
        elif prev_frame is not None:  # only an earlier detection exists
            interpolated_data.append((frame_num, np.array(prev_frame[1])))
        elif next_frame is not None:  # only a later detection exists
            interpolated_data.append((frame_num, np.array(next_frame[1])))

    # Order by frame number.
    interpolated_data = sorted(interpolated_data, key=lambda x: x[0])

    return interpolated_data


# --- UI Elements ---
# Experiment subfolders, sorted so the dropdown order is deterministic
# (os.scandir yields entries in arbitrary order). The context manager closes
# the directory iterator promptly.
with os.scandir(experiments_folder) as _entries:
    subfolders = sorted(entry.name for entry in _entries if entry.is_dir())

# Dropdown for choosing the experiment subfolder
dropdown = widgets.Dropdown(
    options=subfolders,
    description='Subfolders:',
    disabled=False,
)

# Image widget that shows the current video frame as JPEG
video_widget = widgets.Image(format='jpeg', width=640, height=480)

# Output area for the current frame number / timestamp
current_frame_output = widgets.Output()

# Navigation and action buttons
forward_button = widgets.Button(description=">>")
backward_button = widgets.Button(description="<<")
forward_1s_button = widgets.Button(description="> 1s")
backward_1s_button = widgets.Button(description="< 1s")
forward_60s_button = widgets.Button(description="> 60s")
backward_60s_button = widgets.Button(description="< 60s")
load_button = widgets.Button(description="Load Video")
start_frame_button = widgets.Button(description="Set Start")
stop_frame_button = widgets.Button(description="Set Stop")
transform_frame_button = widgets.Button(description="Transform Image")
extract_location_button = widgets.Button(description="Extract Location")

# Output areas for the selected start/stop frame information
start_frame_output = widgets.Output()
stop_frame_output = widgets.Output()

# --- Event Handlers ---

def on_load_clicked(b):
    """Load the first ``.mp4`` of the selected experiment folder and show its first frame.

    Resets the navigation state (``current_frame``, ``start_frame``,
    ``stop_frame``), releases any previously opened capture, and clears the
    camera parameters so values from another experiment are never reused.
    ``mtx``/``dist`` are read from ``c922_params.npz`` in the same folder when
    that file exists; otherwise they stay ``None``.

    Args:
        b: The clicked button (unused).
    """
    global cap, video_path, current_frame, fps, total_frames, start_frame, stop_frame, mtx, dist
    current_frame = 0
    start_frame = 50
    stop_frame = 1500

    # Drop state that belongs to a previously loaded video.
    if cap is not None:
        cap.release()
        cap = None
    mtx, dist = None, None

    experiment_dir = os.path.join(experiments_folder, dropdown.value)
    with os.scandir(experiment_dir) as entries:
        files = sorted(entry.name for entry in entries if entry.is_file())
    video_file = next((f for f in files if f.endswith('.mp4')), None)

    if video_file:
        video_path = os.path.join(experiment_dir, video_file)
        cap = cv2.VideoCapture(video_path)

        if not cap.isOpened():
            print("Error opening video.")
            # Do not keep an unusable capture around for the other handlers.
            cap.release()
            cap = None
            return

        # Load camera parameters if available
        param_file = os.path.join(experiment_dir, 'c922_params.npz')
        if os.path.exists(param_file):
            with np.load(param_file) as data:
                mtx, dist = data['mtx'], data['dist']
        else:
            print("Camera parameters not found. Using default values.")

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        initial_frame = get_frame(current_frame, cap)
        if initial_frame:
            update_video_display(initial_frame, video_widget)

        # Reset start/stop frame info
        with start_frame_output:
            start_frame_output.clear_output()
            print(format_frame_info(start_frame, fps))
        with stop_frame_output:
            stop_frame_output.clear_output()
            print(format_frame_info(stop_frame, fps))

    else:
        video_path = None
        print('No video file found.')

def _seek_to_frame(target):
    """Clamp ``target`` to the valid frame range, make it current and display it."""
    global current_frame
    if not cap:
        return
    # max(..., 0) keeps the index non-negative when the frame count is 0.
    last_frame = max(total_frames - 1, 0)
    current_frame = min(max(int(target), 0), last_frame)
    frame_data = get_frame(current_frame, cap)
    if frame_data:
        update_video_display(frame_data, video_widget)


def on_forward_clicked(b):
    """Moves forward by one frame (no-op on the last frame)."""
    if cap and current_frame < total_frames - 1:
        _seek_to_frame(current_frame + 1)


def on_backward_clicked(b):
    """Moves backward by one frame (no-op on the first frame)."""
    if cap and current_frame > 0:
        _seek_to_frame(current_frame - 1)


def on_forward_1s_clicked(b):
    """Moves forward by 1 second, stopping at the last frame."""
    if cap:
        _seek_to_frame(current_frame + int(fps))


def on_backward_1s_clicked(b):
    """Moves backward by 1 second, stopping at the first frame."""
    if cap:
        _seek_to_frame(current_frame - int(fps))


def on_forward_60s_clicked(b):
    """Moves forward by 60 seconds, stopping at the last frame."""
    if cap:
        _seek_to_frame(current_frame + int(fps * 60))


def on_backward_60s_clicked(b):
    """Moves backward by 60 seconds, stopping at the first frame."""
    if cap:
        _seek_to_frame(current_frame - int(fps * 60))

def on_start_frame_clicked(b):
    """Use the currently displayed frame as the start frame and show it."""
    global start_frame
    start_frame = current_frame
    with start_frame_output:
        start_frame_output.clear_output()
        start_label = format_frame_info(start_frame, fps)
        print(start_label)

def on_stop_frame_clicked(b):
    """Use the currently displayed frame as the stop frame and show it."""
    global stop_frame
    stop_frame = current_frame
    with stop_frame_output:
        stop_frame_output.clear_output()
        stop_label = format_frame_info(stop_frame, fps)
        print(stop_label)

def on_transform_image_clicked(b):
    """Run marker detection on the current frame and redisplay it on success.

    Nothing happens when no video is loaded, the frame cannot be read, or
    ``detect_position`` returns ``None``.
    """
    if not cap:
        return

    jpeg_bytes = get_frame(current_frame, cap)
    if not jpeg_bytes:
        return

    raw = np.frombuffer(jpeg_bytes, np.uint8)
    frame = cv2.imdecode(raw, cv2.IMREAD_COLOR)
    if detect_position(frame, mtx, dist) is None:
        return

    encoded_frame = cv2.imencode('.jpg', frame)[1]
    update_video_display(encoded_frame.tobytes(), video_widget)

def on_extract_location_with_missing_info_clicked(b):
    """Extract the sound-source location per frame and fill undetected frames.

    Every frame from ``start_frame`` to ``stop_frame`` (inclusive) is passed to
    ``detect_position``. Frames without a detection are recorded as missing
    and filled by ``interpolate_missing_frames``. The raw detections, the
    missing-frame list and the filled track are written to the selected
    experiment folder by ``save_location_data_with_missing_info``.

    Frames that cannot be read from the video are skipped and are not treated
    as missing.

    Args:
        b: The clicked button (unused).
    """
    global current_frame, start_frame, stop_frame

    # Without a loaded video there is nothing to extract; returning here also
    # prevents existing CSV files from being overwritten with empty ones.
    if not cap:
        print("No video loaded; nothing to extract.")
        return

    folder_path = os.path.join(experiments_folder, dropdown.value)
    # Ensure the folder exists (create if it doesn't)
    os.makedirs(folder_path, exist_ok=True)

    location_data = []  # (frame_num, location) for frames with a detection
    missing_data_info = []  # (frame_num, fill method) for frames without one
    missing_frames = []  # frame numbers without a detection

    for frame_num in range(start_frame, stop_frame + 1):
        current_frame = frame_num
        frame_data = get_frame(current_frame, cap)
        if not frame_data:
            continue

        frame = cv2.imdecode(np.frombuffer(frame_data, np.uint8), cv2.IMREAD_COLOR)
        center_location = detect_position(frame, mtx, dist)

        if center_location is not None:
            _, encoded_frame = cv2.imencode('.jpg', frame)
            update_video_display(encoded_frame.tobytes(), video_widget)
            location_data.append((frame_num, center_location))
        else:
            print(f"No ArUco marker detected in frame {frame_num}.")
            missing_frames.append(frame_num)
            missing_data_info.append((frame_num, 'Linear Interpolation'))  # record the fill method

    # Always build the filled track: with no missing frames it is simply the
    # detections sorted by frame, so interpolated_data.csv is never left empty.
    interpolated_data = interpolate_missing_frames(location_data, missing_frames)

    save_location_data_with_missing_info(location_data, folder_path, missing_data_info, interpolated_data)

def on_extract_location_clicked(b):
    """Extract the sound-source location for every frame in the selected range.

    Detections are collected for frames ``start_frame`` to ``stop_frame``
    (inclusive), written to ``location_data.csv`` in the selected experiment
    folder when at least one was found, and finally printed.
    """
    global current_frame, start_frame, stop_frame

    # Folder of the experiment chosen in the dropdown; create it if needed.
    folder_path = os.path.join(experiments_folder, dropdown.value)
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)

    location_data = []

    if cap:
        for frame_num in range(start_frame, stop_frame + 1):
            current_frame = frame_num
            frame_data = get_frame(frame_num, cap)
            if not frame_data:
                continue

            pixels = np.frombuffer(frame_data, np.uint8)
            frame = cv2.imdecode(pixels, cv2.IMREAD_COLOR)
            center_location = detect_position(frame, mtx, dist)
            if center_location is None:
                print(f"No ArUco marker detected in frame {frame_num}.")
                continue

            encoded_frame = cv2.imencode('.jpg', frame)[1]
            update_video_display(encoded_frame.tobytes(), video_widget)
            location_data.append((frame_num, center_location))

    # Persist the detections next to the video.
    if location_data:
        location_csv_path = os.path.join(folder_path, 'location_data.csv')
        with open(location_csv_path, mode='w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(['Frame Number', 'X', 'Y'])
            for frame_num, loc in location_data:
                point = loc[0][0]  # the single transformed point
                writer.writerow([frame_num, point[0], point[1]])
        print(f"Location data saved to {location_csv_path}.")

    print("Location data:", location_data)
 
# --- Attach Event Handlers ---

# Each button is wired to exactly one click handler.
_click_bindings = (
    (load_button, on_load_clicked),
    (forward_button, on_forward_clicked),
    (backward_button, on_backward_clicked),
    (forward_1s_button, on_forward_1s_clicked),
    (backward_1s_button, on_backward_1s_clicked),
    (forward_60s_button, on_forward_60s_clicked),
    (backward_60s_button, on_backward_60s_clicked),
    (start_frame_button, on_start_frame_clicked),
    (stop_frame_button, on_stop_frame_clicked),
    (transform_frame_button, on_transform_image_clicked),
    (extract_location_button, on_extract_location_with_missing_info_clicked),
)
for _button, _handler in _click_bindings:
    _button.on_click(_handler)

# --- Layout ---

# Navigation rows, coarsest step first.
buttons_60s = widgets.HBox(children=[backward_60s_button, forward_60s_button])
buttons_1s = widgets.HBox(children=[backward_1s_button, forward_1s_button])
buttons_frames = widgets.HBox(children=[backward_button, forward_button])
buttons_start_stop = widgets.HBox(children=[start_frame_button, stop_frame_button])

# Start/stop selection shown side by side, each under its own caption.
_start_info = widgets.VBox(children=[widgets.Label("Start Frame:"), start_frame_output])
_stop_info = widgets.VBox(children=[widgets.Label("Stop Frame:"), stop_frame_output])
info_layout = widgets.HBox(children=[_start_info, _stop_info])

# Full UI, stacked top to bottom.
_ui_children = [
    dropdown,
    load_button,
    video_widget,
    current_frame_output,
    buttons_60s,
    buttons_1s,
    buttons_frames,
    buttons_start_stop,
    info_layout,
    transform_frame_button,
    extract_location_button,
]
ui_layout = widgets.VBox(children=_ui_children)

display(ui_layout)

VBox(children=(Dropdown(description='Subfolders:', options=('exp_1', 'exp_10', 'exp_11', 'exp_12', 'exp_13', '…

Camera parameters not found. Using default values.


No ArUco marker detected in frame 219.
No ArUco marker detected in frame 220.


No ArUco marker detected in frame 222.


No ArUco marker detected in frame 294.


No ArUco marker detected in frame 753.


No ArUco marker detected in frame 1142.


No ArUco marker detected in frame 1216.


No ArUco marker detected in frame 1314.


No ArUco marker detected in frame 1374.


No ArUco marker detected in frame 1379.


Location data saved to experiments\exp_1\location_data.csv.
Missing frame data saved to experiments\exp_1\missing_data_info.csv.
Interpolated location data saved to experiments\exp_1\interpolated_data.csv.


In [1]:
import os
import subprocess

# Machine-specific locations; adjust both for your environment.
experiments_folder = 'D:/MOOD-SENSE/sound_localization-no_multicast_to_esp32/experiments'  # Adjust the path as needed
FFMPEG_EXE = 'D:\\MOOD-SENSE\\ffmpeg-master-latest-win64-gpl-shared\\bin\\ffmpeg.exe'

# Get all subfolders in the "experiments" folder (sorted for a stable order).
with os.scandir(experiments_folder) as _entries:
    subfolders = sorted(entry.name for entry in _entries if entry.is_dir())

avi_files = {}  # subfolder -> source .avi path (successful conversions only)
mp4_files = {}  # subfolder -> generated .mp4 path

# Process each subfolder
for subfolder in subfolders:
    subfolder_path = os.path.join(experiments_folder, subfolder)
    with os.scandir(subfolder_path) as _entries:
        files = sorted(entry.name for entry in _entries if entry.is_file())

    # Find the first .avi file in the subfolder
    avi_file = next((f for f in files if f.endswith('.avi')), None)
    if avi_file is None:
        print(f"No .avi file found in {subfolder_path}")
        continue

    avi_file_path = os.path.join(subfolder_path, avi_file)
    output_file_path = os.path.splitext(avi_file_path)[0] + '.mp4'

    # Run ffmpeg command
    try:
        subprocess.run([
            FFMPEG_EXE,
            # Never wait for keyboard input and overwrite an existing output,
            # so re-running the cell cannot hang on ffmpeg's overwrite prompt.
            '-nostdin',
            '-y',
            '-i', avi_file_path,
            '-ac', '2',
            '-b:v', '2000k',
            '-c:a', 'aac',
            '-c:v', 'libx264',
            '-b:a', '160k',
            '-vprofile', 'high',
            '-f', 'mp4',
            output_file_path
        ], check=True)
        avi_files[subfolder] = avi_file_path
        mp4_files[subfolder] = output_file_path
        print(f"Successfully converted {avi_file_path} to {output_file_path}")
    except subprocess.CalledProcessError as e:
        print(f"Error converting {avi_file_path}: {e}")
    except Exception as e:
        print(f"Unexpected error in {subfolder}: {e}")

# Log results
print("Processed .avi files:")
print(avi_files)
print("Generated .mp4 files:")
print(mp4_files)


Successfully converted D:/MOOD-SENSE/sound_localization-no_multicast_to_esp32/experiments\exp_1\recorded_20241219_185715_177641.avi to D:/MOOD-SENSE/sound_localization-no_multicast_to_esp32/experiments\exp_1\recorded_20241219_185715_177641.mp4
Successfully converted D:/MOOD-SENSE/sound_localization-no_multicast_to_esp32/experiments\exp_10\recorded_20241220_135643_142996.avi to D:/MOOD-SENSE/sound_localization-no_multicast_to_esp32/experiments\exp_10\recorded_20241220_135643_142996.mp4
Successfully converted D:/MOOD-SENSE/sound_localization-no_multicast_to_esp32/experiments\exp_11\recorded_20241220_140020_852143.avi to D:/MOOD-SENSE/sound_localization-no_multicast_to_esp32/experiments\exp_11\recorded_20241220_140020_852143.mp4
Successfully converted D:/MOOD-SENSE/sound_localization-no_multicast_to_esp32/experiments\exp_12\recorded_20241220_140730_724828.avi to D:/MOOD-SENSE/sound_localization-no_multicast_to_esp32/experiments\exp_12\recorded_20241220_140730_724828.mp4
Successfully conve