<a href="https://colab.research.google.com/github/Morsalah/M.Sc-Research-HRI-using-DIGIT-tactile-sensor/blob/main/capture_image_manu.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import time
import logging
import csv
from digit_interface.digit import Digit

# Set up logging: configure the root logger at DEBUG level and create a
# module-level logger that the functions in this cell write to.
# NOTE(review): basicConfig() is documented to do nothing when the root logger
# already has handlers (e.g. when re-running the cell) - confirm the level
# actually takes effect in the notebook environment.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Create directories if they don't exist
def create_directory(path: str):
    """Ensure that the directory ``path`` exists.

    Missing directories (including intermediate ones) are created with
    ``os.makedirs``. Either outcome is reported through the module logger
    at INFO level.
    """
    # Guard clause: nothing to create when the path is already present.
    if os.path.exists(path):
        logger.info(f"Directory {path} already exists.")
        return

    os.makedirs(path)
    logger.info(f"Directory {path} created.")

# Capture and save images with labels
def capture_and_save_images(serial: str, save_path: str, num_images: int = 100, interval: float = 1.0):
    """Interactively capture labelled frames from a DIGIT sensor.

    For every frame the user is asked whether it shows a touch. The frame is
    saved under ``<save_path>/YES`` or ``<save_path>/NO`` accordingly, and a
    ``(filename, label)`` row is appended to ``<save_path>/labels.csv``. The
    stored label is ``"y"`` or ``"n"``.

    Args:
        serial: Serial number of the DIGIT device to connect to.
        save_path: Root directory for the YES/NO folders and ``labels.csv``.
        num_images: Maximum number of prompts. Capturing stops early once
            both categories hold at least ``num_images // 2`` saved frames.
            A balanced split is not enforced.
        interval: Seconds to wait after each saved frame.

    Errors raised while prompting, capturing or logging a single frame are
    logged and the loop moves on to the next frame. The device is always
    disconnected on exit, including when the session is interrupted.
    """
    yes_path = os.path.join(save_path, "YES")
    no_path = os.path.join(save_path, "NO")
    create_directory(yes_path)
    create_directory(no_path)

    csv_filename = os.path.join(save_path, "labels.csv")
    # A file that exists but is empty still needs its header row.
    needs_header = not os.path.exists(csv_filename) or os.path.getsize(csv_filename) == 0

    # The prompt asks for 'yes' / 'no', so accept the full words as well as
    # the initials. Both map onto the short label stored in the CSV.
    label_for_answer = {"y": "y", "yes": "y", "n": "n", "no": "n"}
    folder_for_label = {"y": yes_path, "n": no_path}
    saved_counts = {"y": 0, "n": 0}
    per_category_target = num_images // 2

    with open(csv_filename, mode="a", newline="") as file:
        writer = csv.writer(file)
        if needs_header:
            writer.writerow(["filename", "label"])

        # Connect to the DIGIT device
        digit = Digit(serial)
        digit.connect()

        try:
            for i in range(num_images):
                try:
                    # Ask user for label (touch or no touch)
                    answer = input("Is this a 'YES' (touch) or 'NO' (no touch) image? Type 'yes' or 'no': ").strip().lower()
                    while answer not in label_for_answer:
                        print("Invalid input! Please type 'yes' or 'no'.")
                        answer = input("Is this a 'YES' (touch) or 'NO' (no touch) image? ").strip().lower()
                    label = label_for_answer[answer]

                    # Generate a timestamped filename
                    timestamp = time.strftime("%Y%m%d_%H%M%S")
                    image_filename = f"{folder_for_label[label]}/digit_{serial}_{timestamp}_{i+1}.png"

                    # Capture and save image
                    digit.save_frame(image_filename)
                    logger.info(f"Image saved: {image_filename}")

                    # Log filename and label in CSV
                    writer.writerow([image_filename, label])

                    # Count the frame only once it is saved and recorded, so a
                    # failed capture cannot trigger the early stop below.
                    saved_counts[label] += 1

                    # Stop once both categories reached their share
                    if saved_counts["y"] >= per_category_target and saved_counts["n"] >= per_category_target:
                        print("Collected required number of images for both categories.")
                        break

                    # Wait for the specified interval
                    time.sleep(interval)

                except Exception as e:
                    logger.error(f"Error capturing image {i+1}: {e}")
        finally:
            # Release the device even on Ctrl-C or other unexpected exits.
            digit.disconnect()

if __name__ == "__main__":
    # Session settings
    serial_number = "D21115"
    save_directory = "./captured_images"
    total_images = 100  # 50 YES and 50 NO images

    # Run the interactive capture-and-label session
    capture_and_save_images(
        serial=serial_number,
        save_path=save_directory,
        num_images=total_images,
    )


In [None]:
import os
import time
import logging
import boto3  # AWS SDK for Python (for cloud storage upload)
from digit_interface.digit import Digit
from digit_interface.digit_handler import DigitHandler
from botocore.exceptions import NoCredentialsError

# Set up logging: configure the root logger at DEBUG level and create a
# module-level logger that the functions in this cell write to.
# NOTE(review): a DEBUG root level presumably also surfaces debug output from
# imported libraries such as boto3 - confirm this verbosity is intended.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Function to create a directory for saving images if it doesn't exist
def create_directory(path: str):
    """Create the directory ``path`` unless it is already present.

    Uses ``os.makedirs``, so missing parent directories are created as well.
    The outcome is logged at INFO level through the module logger.
    """
    if os.path.exists(path):
        logger.info(f"Directory {path} already exists.")
    else:
        os.makedirs(path)
        logger.info(f"Directory {path} created.")

# Function to upload a file to S3

def upload_to_s3(file_path: str, bucket_name: str, object_name: str = None):
    """Upload a local file to an S3 bucket and log the outcome.

    Args:
        file_path: Path of the local file to upload.
        bucket_name: Name of the destination bucket.
        object_name: Key to store the file under. When ``None``, the base
            name of ``file_path`` is used.

    Errors raised by the upload itself are logged and not re-raised, with a
    dedicated message for missing AWS credentials. Creating the client
    happens outside that handling.
    """
    client = boto3.client("s3")
    key = os.path.basename(file_path) if object_name is None else object_name

    try:
        client.upload_file(file_path, bucket_name, key)
        logger.info(f"Uploaded {file_path} to S3 bucket {bucket_name} as {key}.")
    except NoCredentialsError:
        logger.error("AWS credentials not found.")
    except Exception as err:
        logger.error(f"Error uploading {file_path} to S3: {err}")

# Function to capture, save, and upload images
def capture_and_save_images(serial: str, save_path: str, bucket_name: str, num_images: int = 10, interval: float = 1.0, img_format: str = "png"):
    """Capture frames from a DIGIT sensor, save them locally and upload to S3.

    Args:
        serial: Serial number of the DIGIT device to connect to.
        save_path: Local directory for the frames, created if missing.
        bucket_name: S3 bucket each saved frame is uploaded to.
        num_images: Number of capture attempts.
        interval: Seconds to wait after each saved and uploaded frame.
        img_format: File extension used for the saved frames.

    An error while capturing or uploading a single frame is logged and the
    loop continues with the next frame. The device is always disconnected on
    exit, including when the run is interrupted.
    """
    create_directory(save_path)
    digit = Digit(serial)
    digit.connect()

    try:
        for i in range(num_images):
            try:
                timestamp = time.strftime("%Y%m%d_%H%M%S")
                image_filename = f"{save_path}/digit_{serial}_{timestamp}_{i+1}.{img_format}"
                digit.save_frame(image_filename)
                logger.info(f"Image saved: {image_filename}")

                # Upload to cloud storage
                upload_to_s3(image_filename, bucket_name)

                time.sleep(interval)

            except Exception as e:
                logger.error(f"Error capturing image {i+1}: {e}")
    finally:
        # Release the device even on Ctrl-C or other unexpected exits.
        digit.disconnect()

if __name__ == "__main__":
    # Session settings
    serial_number = "D21115"
    save_directory = "./captured_images"
    s3_bucket_name = "your-s3-bucket-name"  # Replace with your actual S3 bucket name

    # Capture with the default image count, interval and format
    capture_and_save_images(
        serial=serial_number,
        save_path=save_directory,
        bucket_name=s3_bucket_name,
    )
