In [23]:
# Example usage: pick the recording to process, then derive every input/output
# path from a single dataset root so the long prefix is written only once.
folder_path = "0 degrees_new"
object_nm = "0-felt_pad"
dataset_root = "/Users/ankushdhawan/Documents/Stanford/Pupper/tactile_gait/pupper_tactile_gait/classification/data_processing/dataset"
object_dir = f"{dataset_root}/{folder_path}/{object_nm}"

images_dir = f"{object_dir}/images"    # directory holding the img_<timestamp>.jpg frames
csv_path = f"{object_dir}/contact.csv"    # per-timestamp contact labels
npz_output_path = f"{object_dir}/npz_files/{object_nm}_img_data.npz"    # archive written by process_trials


In [24]:
import os
import csv
import re
import numpy as np
import cv2
from collections import defaultdict

def _read_contact_data(csv_path):
    """Read the contact CSV into a ``{timestamp string: contact int}`` dict.

    The timestamp and contact columns are located by header name: any header
    containing "time" / "contact" (case-insensitive). If several headers match,
    the last matching one is used. Blank rows are skipped.

    Raises:
        ValueError: If the file has no header row, or the header lacks a
            timestamp or contact column.
    """
    # newline='' lets the csv module do its own line-ending handling.
    with open(csv_path, 'r', newline='') as csv_file:
        reader = csv.reader(csv_file)
        headers = next(reader, None)
        if headers is None:
            # Empty file: report it the same way as a header without the columns.
            raise ValueError("CSV file must contain timestamp and contact columns")

        timestamp_col = None
        contact_col = None
        for i, header in enumerate(headers):
            lowered = header.lower()
            if 'time' in lowered:
                timestamp_col = i
            if 'contact' in lowered:
                contact_col = i

        if timestamp_col is None or contact_col is None:
            raise ValueError("CSV file must contain timestamp and contact columns")

        contact_data = {}
        for row in reader:
            # csv.reader yields [] for blank lines; also ignore whitespace-only rows.
            if not any(cell.strip() for cell in row):
                continue
            contact_data[row[timestamp_col]] = int(row[contact_col])
    return contact_data


def _index_image_timestamps(images_dir):
    """Map each ``.jpg`` file in images_dir to the timestamp string in its ``img_<timestamp>`` name.

    Files whose name does not contain ``img_<number>`` are ignored.
    """
    image_timestamps = {}
    # os.listdir order is arbitrary; sorting makes the order of images that
    # share a timestamp reproducible across runs and machines.
    for img_file in sorted(os.listdir(images_dir)):
        if not img_file.lower().endswith('.jpg'):
            continue
        match = re.search(r'img_(\d+(?:\.\d+)?)', img_file)
        if match:
            image_timestamps[img_file] = match.group(1)
    return image_timestamps


def _match_contacts(image_timestamps, contact_data):
    """Assign a contact value to every image.

    An exact (string) timestamp match is preferred; otherwise the CSV row whose
    timestamp is numerically closest is used (first such row on ties).

    Raises:
        ValueError: If an image needs a nearest-timestamp match but the CSV
            contained no data rows.
    """
    image_contact = {}
    numeric_times = None  # parsed lazily, only if some image has no exact match
    for img_file, timestamp in image_timestamps.items():
        if timestamp in contact_data:
            image_contact[img_file] = contact_data[timestamp]
            continue

        if not contact_data:
            raise ValueError("CSV file contains no contact rows to match images against")
        if numeric_times is None:
            # Parse the CSV timestamps once instead of once per unmatched image.
            numeric_times = [(float(ts), ts) for ts in contact_data]

        target = float(timestamp)
        _, closest_ts = min(numeric_times, key=lambda pair: abs(pair[0] - target))
        image_contact[img_file] = contact_data[closest_ts]
    return image_contact


def _group_into_trials(sorted_images, image_contact):
    """Split a time-ordered image list into complete trials.

    A trial opens at the first image with contact == 0 seen while no trial is
    open, collects every following image, and closes at (and includes) the
    first image with contact == 1. Images seen while no trial is open and whose
    contact is not 0 are skipped. A trial still open when the images run out
    never reached contact and is dropped; its number is still consumed.
    """
    trials = {}
    trial_num = 0
    current = None  # images of the currently open trial, or None
    for img in sorted_images:
        contact = image_contact[img]
        if current is None:
            if contact == 0:
                trial_num += 1
                current = [img]
            continue

        current.append(img)
        if contact == 1:
            trials[trial_num] = current
            current = None
    return trials


def process_trials(images_dir, csv_path, save_npz=True, npz_output_path=None):
    """
    Process images and contact data to identify trials.

    A trial starts on an out-of-contact image (contact == 0) and runs up to and
    including the first in-contact image (contact == 1). Trials that never
    reach contact are discarded.

    Args:
        images_dir (str): Path to directory containing ``img_<timestamp>.jpg`` images
        csv_path (str): Path to CSV file with a timestamp column and a contact column
        save_npz (bool): Whether to save trial data to NPZ file
        npz_output_path (str): Path where to save the NPZ file; passed through to
            save_trials_to_npz, which picks a default when this is None

    Returns:
        tuple: ``(trials, num_trials)`` where ``trials`` is a dict mapping trial
        numbers to time-ordered lists of image filenames and ``num_trials`` is
        ``len(trials)``.

    Raises:
        ValueError: If the CSV is empty, lacks a timestamp/contact column, or
            has no data rows while an image needs a nearest-timestamp match.
    """
    contact_data = _read_contact_data(csv_path)
    image_timestamps = _index_image_timestamps(images_dir)
    image_contact = _match_contacts(image_timestamps, contact_data)

    # Order images chronologically by the timestamp embedded in the filename.
    sorted_images = sorted(image_timestamps, key=lambda name: float(image_timestamps[name]))

    complete_trials = _group_into_trials(sorted_images, image_contact)

    # Save the data to NPZ file if requested
    if save_npz:
        save_trials_to_npz(complete_trials, image_contact, images_dir, npz_output_path)

    return complete_trials, len(complete_trials)

def save_trials_to_npz(trials, image_contact, images_dir, output_path=None):
    """
    Save trial data to a compressed NPZ file.

    For every trial with at least one readable image, three arrays are stored:
    ``trial_<id>_images`` (stacked images as returned by ``cv2.imread``),
    ``trial_<id>_contacts`` and ``trial_<id>_filenames`` (object array). The
    three arrays are index-aligned: images that cannot be read are left out of
    all of them. ``trial_ids`` lists exactly the trials that were stored.

    Args:
        trials (dict): Dictionary mapping trial IDs to lists of image filenames
        image_contact (dict): Dictionary mapping image filenames to contact values
        images_dir (str): Directory containing the images
        output_path (str): Path to save the NPZ file. If None, saves
            ``trial_data.npz`` in the images_dir.

    Returns:
        str: Path to the saved NPZ file (with the ``.npz`` suffix NumPy appends
        when the given path lacks it)
    """
    if output_path is None:
        # Default output path is in the same directory as images
        output_path = os.path.join(images_dir, "trial_data.npz")

    # np.savez_compressed appends ".npz" when the name lacks it; mirror that
    # here so the printed/returned path is the file that actually gets written.
    if not os.fspath(output_path).endswith('.npz'):
        output_path = os.fspath(output_path) + '.npz'

    # Create directory if it doesn't exist. A bare filename has an empty
    # dirname, and os.makedirs("") would raise, so only create when non-empty.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Dictionary to hold all data for NPZ file
    npz_data = {}
    saved_trial_ids = []
    skipped = 0

    for trial_id, image_files in trials.items():
        loaded_images = []
        loaded_contacts = []
        loaded_names = []

        for img_file in image_files:
            img = cv2.imread(os.path.join(images_dir, img_file))
            if img is None:
                # cv2.imread signals an unreadable/missing file with None.
                skipped += 1
                continue

            # Keep the three lists in lock-step so indices line up on load.
            loaded_images.append(img)
            loaded_contacts.append(image_contact[img_file])
            loaded_names.append(img_file)

        if not loaded_images:
            # Nothing readable for this trial: store no arrays and no ID.
            continue

        npz_data[f'trial_{trial_id}_images'] = np.array(loaded_images)
        npz_data[f'trial_{trial_id}_contacts'] = np.array(loaded_contacts)
        npz_data[f'trial_{trial_id}_filenames'] = np.array(loaded_names, dtype=object)
        saved_trial_ids.append(trial_id)

    # Add trial ID list for easier access (only trials that have arrays)
    npz_data['trial_ids'] = np.array(saved_trial_ids)

    if skipped:
        print(f"Warning: skipped {skipped} unreadable image(s)")

    # Save to NPZ file
    np.savez_compressed(output_path, **npz_data)
    print(f"Saved trial data to {output_path}")

    return output_path

# Run the pipeline on the configured recording and write the NPZ archive.
trials, num_trials = process_trials(
    images_dir,
    csv_path,
    save_npz=True,
    npz_output_path=npz_output_path,
)

print(f"Total number of trials: {num_trials}")

# Summarise up to three trials: how many images each has and the first few names.
for trial_id in list(trials)[:3]:
    trial_images = trials[trial_id]
    print(f"\nTrial {trial_id}:")
    print(f"  Number of images: {len(trial_images)}")
    print(f"  First few images: {trial_images[:3]}")

# Example of how to load and use the NPZ file
def load_trial_data(npz_path):
    """Open a trial NPZ archive, print a short summary, and return it.

    The summary lists every array name in the archive, the stored trial IDs
    and, when at least one trial exists, the array shapes of the first trial.

    Args:
        npz_path (str): Path to the NPZ file to open.

    Returns:
        The object returned by ``np.load`` for the archive; arrays are looked
        up by name, e.g. ``data['trial_1_images']``.
    """
    # allow_pickle=True is needed to read object-dtype arrays (the filename
    # arrays are stored that way); unpickling can execute code, so only open
    # archives you trust.
    archive = np.load(npz_path, allow_pickle=True)

    # Show which arrays the archive holds
    print(f"Available data in the NPZ file: {list(archive.keys())}")

    ids = archive['trial_ids']
    print(f"Trial IDs: {ids}")

    # Nothing more to report when no trials were stored
    if len(ids) == 0:
        return archive

    # Report the first trial as an example of how to address the arrays
    head = ids[0]
    print(f"\nFirst trial (ID: {head}):")
    images = archive[f'trial_{head}_images']
    print(f"  Images shape: {images.shape}")
    contacts = archive[f'trial_{head}_contacts']
    print(f"  Contact values shape: {contacts.shape}")
    names = archive[f'trial_{head}_filenames']
    print(f"  Number of filenames: {len(names)}")

    return archive

# load_trial_data is exercised in the next cell.


Saved trial data to /Users/ankushdhawan/Documents/Stanford/Pupper/tactile_gait/pupper_tactile_gait/classification/data_processing/dataset/0 degrees_new/0-felt_pad/npz_files/0-felt_pad_img_data.npz
Total number of trials: 10

Trial 1:
  Number of images: 7
  First few images: ['img_1741463013.787980.jpg', 'img_1741463014.120948.jpg', 'img_1741463014.453839.jpg']

Trial 2:
  Number of images: 10
  First few images: ['img_1741463017.787622.jpg', 'img_1741463018.121514.jpg', 'img_1741463018.454335.jpg']

Trial 3:
  Number of images: 9
  First few images: ['img_1741463022.787478.jpg', 'img_1741463023.121774.jpg', 'img_1741463023.453768.jpg']


In [25]:
# Reopen the archive saved to npz_output_path; load_trial_data prints a summary
# of its contents and returns the loaded NPZ data for further use.
trial_data = load_trial_data(npz_output_path)

Available data in the NPZ file: ['trial_1_images', 'trial_1_contacts', 'trial_1_filenames', 'trial_2_images', 'trial_2_contacts', 'trial_2_filenames', 'trial_3_images', 'trial_3_contacts', 'trial_3_filenames', 'trial_4_images', 'trial_4_contacts', 'trial_4_filenames', 'trial_5_images', 'trial_5_contacts', 'trial_5_filenames', 'trial_6_images', 'trial_6_contacts', 'trial_6_filenames', 'trial_7_images', 'trial_7_contacts', 'trial_7_filenames', 'trial_8_images', 'trial_8_contacts', 'trial_8_filenames', 'trial_9_images', 'trial_9_contacts', 'trial_9_filenames', 'trial_10_images', 'trial_10_contacts', 'trial_10_filenames', 'trial_ids']
Trial IDs: [ 1  2  3  4  5  6  7  8  9 10]

First trial (ID: 1):
  Images shape: (7, 240, 320, 3)
  Contact values shape: (7,)
  Number of filenames: 7
