### Task: Using this sample dataset, create or optimize a data preprocessing pipeline using OpenCV and PyTorch, include an image-mask technique in the pipeline, and save the output.

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Root directory of the image dataset.
# It has to be assigned here, before the inspection cells that follow, because
# they read `data_path` immediately; leaving it commented out makes a fresh
# "Restart Kernel -> Run All" fail with a NameError.
data_path = '/content/drive/MyDrive/AI_Vision_Extract_Nov25/data (1)/train2017'

In [None]:
import os

# List the contents of the data_path directory.
# The listing is guarded: os.listdir() raises FileNotFoundError when the folder
# is missing (for example when Drive is not mounted), which would abort the cell
# before the derived paths below are assigned.
print(f"Contents of {data_path}:")
if os.path.isdir(data_path):
    # os.listdir() returns entries in arbitrary order; sort for a stable listing.
    for item in sorted(os.listdir(data_path)):
        print(f"- {item}")
else:
    print(f"Error: The path '{data_path}' does not exist. Please double-check the path.")

# Candidate sub-directories following the usual COCO layout. These are only
# proposals; their existence is not checked here.
images_dir = os.path.join(data_path, 'val2017') # Common COCO image directory name
annotations_dir = os.path.join(data_path, 'annotations') # Common COCO annotations directory name


print(f"\nProposed image directory path: {images_dir}")
print(f"Proposed annotations directory path: {annotations_dir}")

Contents of /content/drive/MyDrive/AI_Vision_Extract_Nov25/data (1)/train2017:


FileNotFoundError: [Errno 2] No such file or directory: '/content/drive/MyDrive/AI_Vision_Extract_Nov25/data (1)/train2017'

In [None]:
import os

# Parent folder supplied by the user; listing it helps locate the dataset directory.
parent_data_path = '/content/drive/MyDrive/AI_Vision_Extract_Nov25'

print(f"Contents of {parent_data_path}:")
if not os.path.exists(parent_data_path):
    # Nothing to list: report the missing path instead of calling os.listdir().
    print(f"Error: The path '{parent_data_path}' does not exist. Please double-check the path.")
else:
    # One "- <name>" line per entry, in the order os.listdir() returns them.
    for item in os.listdir(parent_data_path):
        print(f"- {item}")

In [None]:
import os
import glob

# Correct images_dir to point directly to data_path as images are listed there.
images_dir = data_path
print(f"Updated image directory path: {images_dir}")

# Identify image files.
# glob.glob() returns matches in arbitrary, filesystem-dependent order, so the
# list is sorted: `image_files[0]` (the sample image) and the order in which the
# dataset later iterates the files are then the same on every run.
image_files = sorted(glob.glob(os.path.join(images_dir, '*.jpg'))) # Assuming JPG images based on previous output
print(f"Found {len(image_files)} image files.")

if image_files:
    sample_image_path = image_files[0]
    print(f"Sample image file: {sample_image_path}")
else:
    sample_image_path = None
    print("No image files found in the specified path.")

# Now, let's check for COCO annotation files.
# Common COCO annotation filenames are like 'instances_val2017.json', 'captions_val2017.json', etc.
# We'll look for any .json file in data_path or a potential 'annotations' subdirectory if it existed.

# Check in data_path directly
annotation_files_in_data_path = sorted(glob.glob(os.path.join(data_path, '*.json')))
print(f"Found {len(annotation_files_in_data_path)} JSON annotation files in {data_path}: {annotation_files_in_data_path}")

# Check in the conventional 'annotations' subdirectory as well, so the user is
# told explicitly whether anything was found there.
assumed_annotations_dir = os.path.join(data_path, 'annotations')
annotation_files_in_assumed_dir = sorted(glob.glob(os.path.join(assumed_annotations_dir, '*.json')))
print(f"Found {len(annotation_files_in_assumed_dir)} JSON annotation files in {assumed_annotations_dir}: {annotation_files_in_assumed_dir}")

# Files directly in data_path take priority over the 'annotations' subdirectory.
candidate_annotation_files = annotation_files_in_data_path or annotation_files_in_assumed_dir

if not candidate_annotation_files:
    coco_annotation_file = None
    print("\nWarning: No COCO-like JSON annotation files found in the provided data_path or a standard 'annotations' subdirectory.")
    print("Please ensure the COCO annotation file (e.g., instances_val2017.json) is present or specify its correct path.")
else:
    # When several JSON files exist, prefer an 'instances_*.json' file (the one
    # used for object detection/segmentation); otherwise fall back to the first
    # file in sorted order.
    instance_annotation_files = [
        path for path in candidate_annotation_files
        if os.path.basename(path).startswith('instances_')
    ]
    coco_annotation_file = (instance_annotation_files or candidate_annotation_files)[0]
    print(f"Proposed COCO annotation file: {coco_annotation_file}")

### Update `data_path` to your new image directory

Edit the `data_path` variable in the cell below to point to the new Google Drive folder where your desired images are located. After updating, please run this cell and then continue executing the notebook cells from the beginning of the 'Inspect and Load Dataset' subtask onwards to apply the new images to the pipeline.

In [None]:
# IMPORTANT: Replace the path below with the path to your image directory in Google Drive.
# Example: data_path = '/content/drive/MyDrive/MyNewImages'
data_path = '/content/drive/MyDrive/AI_Vision_Extract_Nov25/data (1)/train2017'

print(f"Data path updated to: {data_path}")

# After changing the path, re-run the notebook from the 'Inspect and Load Dataset'
# section so that the image list is rebuilt from the new location.

**Reasoning**:
The previous steps have confirmed the location of image files and the absence of COCO annotation files. Now, following the instructions, I will load and display a sample image using `cv2` and `matplotlib.pyplot`. I will also explicitly handle the case where no COCO annotation file was found.



In [None]:
import cv2
import matplotlib.pyplot as plt
import json # Although no JSON file was found, it's good to import as per instructions

print(f"Attempting to load sample image: {sample_image_path}")

if sample_image_path and os.path.exists(sample_image_path):
    # Load the image using OpenCV
    image = cv2.imread(sample_image_path)

    if image is None:
        # cv2.imread() reports an unreadable/corrupt file by returning None
        # instead of raising; without this check cv2.cvtColor() would fail.
        print(f"Error: Could not load image from: {sample_image_path}")
    else:
        # OpenCV loads images in BGR format, convert to RGB for matplotlib display
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Display the image
        plt.figure(figsize=(10, 8))
        plt.imshow(image_rgb)
        plt.title(f"Sample Image: {os.path.basename(sample_image_path)}")
        plt.axis('off') # Hide axes ticks
        plt.show()
        print("Sample image loaded and displayed successfully.")
else:
    print(f"Error: Sample image file not found at {sample_image_path}.")

# Handle COCO annotations
if coco_annotation_file:
    print(f"Attempting to load COCO annotations from: {coco_annotation_file}")
    try:
        # Explicit UTF-8: the default text encoding of open() depends on the locale.
        with open(coco_annotation_file, 'r', encoding='utf-8') as f:
            coco_data = json.load(f)
        print("COCO annotation file loaded successfully. Displaying first few keys:")
        # Display some high-level keys to understand structure
        for key, value in coco_data.items():
            print(f"- {key}: {type(value)}")
            if isinstance(value, list) and len(value) > 0:
                print(f"  First element type: {type(value[0])}")
                print(f"  Length: {len(value)}")
                if isinstance(value[0], dict):
                    print(f"  First element keys: {list(value[0].keys())}")
            elif isinstance(value, dict):
                print(f"  Keys: {list(value.keys())}")
    except Exception as e:
        # Best effort: report the problem and continue without annotations.
        print(f"Error loading COCO annotation file: {e}")
        coco_data = None
else:
    print("\nSkipping COCO annotation loading as no annotation file was found.")
    coco_data = None


## Implement Image and Mask Loading/Generation

### Subtask:
Develop a function to load an image using OpenCV and generate its corresponding binary segmentation mask from the COCO annotations using `pycocotools`.


**Reasoning**:
The previous steps confirmed the absence of COCO annotation files. To proceed with the subtask's objective of demonstrating the image mask technique, I will now implement a function to load an image and generate a placeholder binary mask, as instructed. This function will load the image, convert it to RGB, and create an all-white mask of the same dimensions. Finally, I will test this function and display both the image and the placeholder mask.



In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import os

def load_image_and_placeholder_mask(image_path):
    """
    Read an image from disk with OpenCV and pair it with a stand-in mask.

    The mask is not derived from annotations: it is a single-channel array in
    which every pixel is set to 255 (fully white).

    Args:
        image_path (str): The full path to the image file.

    Returns:
        tuple: A tuple containing:
            - image_rgb (np.ndarray): The decoded image, converted from BGR to RGB.
            - placeholder_mask (np.ndarray): uint8 array with the image's height
              and width, filled with 255.

    Raises:
        FileNotFoundError: If nothing exists at ``image_path``.
        ValueError: If OpenCV cannot decode the file (``cv2.imread`` returned None).
    """
    if not os.path.exists(image_path):
        raise FileNotFoundError(f"Image file not found at: {image_path}")

    bgr_pixels = cv2.imread(image_path)
    if bgr_pixels is None:
        raise ValueError(f"Could not load image from: {image_path}")

    # OpenCV decodes to BGR channel order; swap to RGB for downstream use.
    image_rgb = cv2.cvtColor(bgr_pixels, cv2.COLOR_BGR2RGB)

    # Spatial size only; the channel axis is not needed for the mask.
    height, width = image_rgb.shape[:2]

    # All-white placeholder: every pixel is 255, stored as uint8.
    placeholder_mask = np.full((height, width), 255, dtype=np.uint8)

    return image_rgb, placeholder_mask

# Test the function with a few images from image_files
print(f"Testing load_image_and_placeholder_mask with a few sample images:")

# Fail early with a clear message: with an empty list the final
# `image_files[0]` lookup below would only surface as a bare IndexError.
if not image_files:
    raise RuntimeError("No image files available to test; check the image directory path.")

num_test_images = min(3, len(image_files)) # Test with up to 3 different images

plt.figure(figsize=(12, 6 * num_test_images)) # Adjust figure size for multiple rows

# Count failures so the summary line below reflects what actually happened.
num_failed = 0

for i in range(num_test_images):
    current_image_path = image_files[i]
    print(f"\n--- Testing with: {current_image_path} ---")
    try:
        loaded_image_rgb_test, generated_mask_test = load_image_and_placeholder_mask(current_image_path)

        # Display the loaded image and the generated placeholder mask
        plt.subplot(num_test_images, 2, 2 * i + 1)
        plt.imshow(loaded_image_rgb_test)
        plt.title(f"Loaded Image: {os.path.basename(current_image_path)}")
        plt.axis('off')

        plt.subplot(num_test_images, 2, 2 * i + 2)
        plt.imshow(generated_mask_test, cmap='gray')
        plt.title("Placeholder Mask")
        plt.axis('off')

    except (FileNotFoundError, ValueError) as e:
        num_failed += 1
        print(f"Error during function test for {current_image_path}: {e}")

plt.tight_layout()
plt.show()
if num_failed == 0:
    print("Function tested successfully with multiple images.")
else:
    print(f"Function test finished with errors: {num_failed} of {num_test_images} images could not be loaded.")

# Re-run for the first image to retain original `loaded_image_rgb` and `generated_mask`
# variables for consistency with subsequent cells in the notebook.
loaded_image_rgb, generated_mask = load_image_and_placeholder_mask(image_files[0])

## Define Preprocessing Transforms (OpenCV & PyTorch)

### Subtask:
Define a set of preprocessing transformations including resizing images and masks, normalization for images, and conversion to PyTorch tensors. Ensure that mask resizing uses appropriate interpolation (e.g., nearest-neighbor) to preserve boundaries.


**Reasoning**:
I need to define a custom preprocessing transform class that resizes images and masks, converts them to PyTorch tensors, and normalizes the image. I will follow the instructions to implement the `ImageMaskTransforms` class, instantiate it, and then test it with the previously loaded image and generated mask, printing their shapes and data types to verify the transformations.



In [None]:
import torch
import torchvision.transforms as T
import cv2
import numpy as np

# 2. Define a target size for resizing images and masks.
# ImageMaskTransforms hands this tuple unchanged to cv2.resize(), which (per the
# comment in that class) expects (width, height); both values are equal here.
target_size = (256, 256)

# 3. Create a custom transform class (e.g., ImageMaskTransforms)
class ImageMaskTransforms:
    """Joint preprocessing for an image and its segmentation mask.

    Calling an instance resizes both inputs to ``target_size`` with OpenCV,
    binarizes the mask, converts both to PyTorch tensors and normalizes the
    image with the configured per-channel ``mean`` and ``std``.

    Args:
        target_size: Size passed straight to ``cv2.resize`` (OpenCV reads it as
            ``(width, height)``).
        mean: Per-channel mean handed to ``T.Normalize``.
        std: Per-channel standard deviation handed to ``T.Normalize``.
    """

    def __init__(self, target_size, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
        self.target_size = target_size
        self.normalize = T.Normalize(mean=mean, std=std)
        # ToTensor turns a PIL Image / (H x W x C) ndarray into a (C x H x W)
        # float tensor and scales pixel values to [0.0, 1.0].
        self.to_tensor = T.ToTensor()

    def __call__(self, image, mask):
        """Return ``(normalized_image_tensor, mask_tensor)`` for one sample.

        The mask tensor is float, holds only 0.0/1.0, and gets a leading
        channel axis when the resized mask is 2-D.
        """
        size = self.target_size

        # Bilinear interpolation for the image; nearest-neighbour for the mask
        # so that no intermediate mask values are introduced.
        image_resized = cv2.resize(image, size, interpolation=cv2.INTER_LINEAR)
        mask_resized = cv2.resize(mask, size, interpolation=cv2.INTER_NEAREST)

        # Any non-zero mask pixel (e.g. 255) becomes 1, everything else 0.
        mask_binary = (mask_resized > 0).astype(np.uint8)

        mask_tensor = torch.from_numpy(mask_binary).float()
        if mask_tensor.dim() == 2:
            # (H, W) -> (1, H, W)
            mask_tensor = mask_tensor.unsqueeze(0)

        # Image: ndarray -> [0, 1] tensor -> channel-wise normalization.
        normalized_image_tensor = self.normalize(self.to_tensor(image_resized))

        return normalized_image_tensor, mask_tensor

# 5. Instantiate your custom ImageMaskTransforms class with the configured target size
transforms = ImageMaskTransforms(target_size=target_size)

# 6. Test the transformations with loaded_image_rgb and generated_mask.
# Print the NumPy inputs first so they can be compared with the tensors produced below.
print(f"Original image shape: {loaded_image_rgb.shape}, dtype: {loaded_image_rgb.dtype}")
print(f"Original mask shape: {generated_mask.shape}, dtype: {generated_mask.dtype}")

transformed_image, transformed_mask = transforms(loaded_image_rgb, generated_mask)

# Report shape/dtype and the value range of each output tensor as a sanity check
# on the resize, tensor conversion and normalization steps.
print(f"\nTransformed image tensor shape: {transformed_image.shape}, dtype: {transformed_image.dtype}")
print(f"Transformed mask tensor shape: {transformed_mask.shape}, dtype: {transformed_mask.dtype}")
print(f"Transformed image tensor min/max: {transformed_image.min():.4f}/{transformed_image.max():.4f}")
print(f"Transformed mask tensor min/max: {transformed_mask.min():.4f}/{transformed_mask.max():.4f}")

## Build PyTorch Dataset and DataLoader

### Subtask:
Create a custom PyTorch `Dataset` class that integrates the image/mask loading and preprocessing steps. Then, instantiate a `DataLoader` to efficiently handle batching and loading of the processed data.


**Reasoning**:
I need to implement the `ImageMaskDataset` class, its `__len__` and `__getitem__` methods, instantiate it with the existing `image_files` and `transforms`, and then create a `DataLoader`. Finally, I will verify the output by fetching a batch and printing the shapes of the image and mask tensors.



In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

# 1. Define a custom PyTorch Dataset class named ImageMaskDataset
class ImageMaskDataset(Dataset):
    """Dataset yielding ``(image_tensor, mask_tensor)`` pairs for a list of image paths.

    Each item is loaded with ``load_image_and_placeholder_mask`` and then either
    passed through ``transform`` or converted to tensors by a built-in fallback.

    Args:
        image_paths: Sequence of image file paths; its length is the dataset length.
        transform: Optional callable ``transform(image, mask)`` returning the
            transformed ``(image, mask)`` pair. When ``None`` the fallback
            conversion in ``__getitem__`` is used.
    """

    def __init__(self, image_paths, transform=None):
        self.image_paths = image_paths
        self.transform = transform

    def __len__(self):
        """Return the number of image paths in the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load, preprocess and return the ``(image, mask)`` pair at ``idx``."""
        image_path = self.image_paths[idx]

        # Loads the image in RGB and returns an all-white placeholder mask.
        image_rgb, placeholder_mask = load_image_and_placeholder_mask(image_path)

        if self.transform is not None:
            transformed_image, transformed_mask = self.transform(image_rgb, placeholder_mask)
        else:
            # Fallback without normalization, written with plain torch so this
            # branch does not depend on `torchvision.transforms` being imported
            # in some other cell.
            # Assumes the loader returns an (H, W, 3) uint8 array, as
            # load_image_and_placeholder_mask does -- TODO confirm if the loader changes.
            # (H, W, C) uint8 -> (C, H, W) float32 scaled to [0, 1].
            transformed_image = (
                torch.from_numpy(image_rgb).permute(2, 0, 1).contiguous().float().div(255)
            )
            # Binarize to 0/1 (the loader's mask uses 255 for "on") so the mask
            # has the same value range as the one ImageMaskTransforms produces,
            # then add the channel dimension: (H, W) -> (1, H, W).
            transformed_mask = torch.from_numpy(placeholder_mask > 0).float().unsqueeze(0)

        return transformed_image, transformed_mask

# 4. Create an instance of your ImageMaskDataset
# `image_files` and `transforms` were defined in previous steps.
# Ensure `load_image_and_placeholder_mask` is accessible (it was defined globally earlier).
image_mask_dataset = ImageMaskDataset(image_paths=image_files, transform=transforms)

print(f"Number of items in the dataset: {len(image_mask_dataset)}")

# 5. Instantiate a torch.utils.data.DataLoader
batch_size = 4 # Or 8, as suggested

# 0 workers loads data in the main process, which avoids multiprocessing issues
# on Colab/Jupyter; raise it (e.g. os.cpu_count() // 2) for faster loading once
# the environment is known to be stable.
num_workers = 0

# Seed a dedicated generator for shuffling so the batch order -- and therefore
# the samples that are later visualized and saved -- is the same on every
# "Restart Kernel -> Run All".
shuffle_seed = 42
shuffle_generator = torch.Generator().manual_seed(shuffle_seed)

data_loader = DataLoader(
    image_mask_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=num_workers,
    generator=shuffle_generator,
)

print(f"DataLoader created with batch_size={batch_size} and num_workers={num_workers}")

# 6. Iterate once through the DataLoader to fetch a single batch and print shapes
try:
    for images_batch, masks_batch in data_loader:
        print(f"\nShape of image batch: {images_batch.shape}")
        print(f"Shape of mask batch: {masks_batch.shape}")
        break # Only take one batch for verification
    print("DataLoader batch successfully fetched and shapes verified.")
except Exception as e:
    # Best effort: report the failure here; later cells that need a batch will
    # still fail loudly if the loader is unusable.
    print(f"Error fetching batch from DataLoader: {e}")

## Test Pipeline and Save Output

### Subtask:
Test the constructed `DataLoader` by fetching a batch of processed images and masks. Visualize a few samples to verify the correctness of the preprocessing and masking. Save a few processed image and mask pairs to a specified output directory to demonstrate the pipeline's output.


**Reasoning**:
I need to implement the instructions to test the pipeline by fetching a batch from the `DataLoader`, visualize a few samples, and save them to a new output directory. This involves denormalization, tensor-to-numpy conversions, and file operations using `cv2` and `matplotlib`.



In [None]:
import matplotlib.pyplot as plt
import numpy as np
import os
import cv2
import torch

# 1. Fetch a batch of processed images and masks from the data_loader
# Iterate once to get the first batch
images_batch, masks_batch = next(iter(data_loader))

print(f"Fetched batch - Images shape: {images_batch.shape}, Masks shape: {masks_batch.shape}")

# 2. Create an output directory named 'processed_output'
output_dir = 'processed_output'
if not os.path.exists(output_dir):
    os.makedirs(output_dir)
    print(f"Created output directory: {output_dir}")
else:
    print(f"Output directory '{output_dir}' already exists.")

# Get mean and std used for normalization from the transforms object
# The mean and std are stored in the normalize attribute of the transforms object
mean = torch.tensor(transforms.normalize.mean).view(3, 1, 1) # Reshape for broadcasting (C, 1, 1)
std = torch.tensor(transforms.normalize.std).view(3, 1, 1)   # Reshape for broadcasting (C, 1, 1)

# 3. Iterate through a few samples (e.g., 2-3) in the fetched batch
num_samples_to_show = min(3, images_batch.shape[0]) # Show up to 3 samples or less if batch is smaller

plt.figure(figsize=(15, 5 * num_samples_to_show))

# Files that cv2.imwrite() reported it could not write.
failed_writes = []

for i in range(num_samples_to_show):
    # Get individual image and mask from batch
    image_tensor = images_batch[i]
    mask_tensor = masks_batch[i]

    # a. Denormalize the image tensor
    # (C, H, W) -> (C, H, W)
    denormalized_image = image_tensor * std + mean
    # Clamp values to [0, 1] in case of slight over/underflow due to floating point operations
    denormalized_image = torch.clamp(denormalized_image, 0, 1)

    # b. Convert denormalized image tensor to NumPy array (H, W, C) and scale to [0, 255]
    # (C, H, W) -> (H, W, C). Round before casting: astype(np.uint8) truncates,
    # so a value such as 127.99999 left by the normalize/denormalize round trip
    # would otherwise be stored as 127 instead of 128.
    image_np = (denormalized_image.permute(1, 2, 0).numpy() * 255).round().astype(np.uint8)

    # c. Convert mask tensor to NumPy array, scale to [0, 255] and change dtype to uint8
    # Mask tensor is (1, H, W), remove channel dimension, scale to [0, 255] if values are 0/1
    mask_np = (mask_tensor.squeeze(0).numpy() * 255).astype(np.uint8)

    # d. Display the original (denormalized) image and its corresponding mask side-by-side
    plt.subplot(num_samples_to_show, 2, 2 * i + 1)
    plt.imshow(image_np)
    plt.title(f"Processed Image {i+1}")
    plt.axis('off')

    plt.subplot(num_samples_to_show, 2, 2 * i + 2)
    plt.imshow(mask_np, cmap='gray') # Use gray colormap for binary masks
    plt.title(f"Processed Mask {i+1}")
    plt.axis('off')

    # e. Save the processed image and mask to the 'processed_output' directory
    image_filename = os.path.join(output_dir, f"processed_image_{i+1}.jpg")
    mask_filename = os.path.join(output_dir, f"processed_mask_{i+1}.png")

    # For saving with cv2, convert RGB (image_np) to BGR.
    # cv2.imwrite() returns False on failure instead of raising, so check the
    # result rather than reporting success unconditionally.
    image_saved = cv2.imwrite(image_filename, cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR))
    mask_saved = cv2.imwrite(mask_filename, mask_np) # Masks are typically grayscale, no BGR conversion needed

    if image_saved and mask_saved:
        print(f"Saved {image_filename} and {mask_filename}")
    else:
        if not image_saved:
            failed_writes.append(image_filename)
        if not mask_saved:
            failed_writes.append(mask_filename)
        print(f"Error: could not write output for sample {i+1} ({image_filename} / {mask_filename})")

plt.tight_layout()
plt.show()

print(f"Visualization and saving complete for {num_samples_to_show} samples.")
if failed_writes:
    print(f"Warning: {len(failed_writes)} file(s) could not be written: {failed_writes}")

## Final Task

### Subtask:
Summarize the created data preprocessing pipeline, highlighting the use of OpenCV for image manipulation, PyTorch for tensor conversion, and the image mask technique, and discuss how the output is saved.


## Summary:

### Q&A
The created data preprocessing pipeline effectively handles image manipulation using OpenCV, converts data to PyTorch tensors, incorporates an image mask technique (though a placeholder in this instance), and saves the processed output as follows:

1.  **OpenCV for Image Manipulation**: OpenCV (`cv2`) was used to load images in BGR format, which were then converted to RGB. It was also utilized for resizing both images and masks to a target size of `(256, 256)`. For images, `cv2.INTER_LINEAR` interpolation was applied, and for masks, `cv2.INTER_NEAREST` was used to preserve sharp boundaries.
2.  **PyTorch for Tensor Conversion**: Images and masks were converted into PyTorch tensors using `torchvision.transforms.ToTensor()` for images (which also scales pixel values to \[0.0, 1.0]) and `torch.from_numpy()` for masks. Image tensors were further normalized using predefined mean and standard deviation values. Masks were given an additional channel dimension, resulting in shapes like `torch.Size([3, 256, 256])` for images and `torch.Size([1, 256, 256])` for masks.
3.  **Image Mask Technique**: Due to the absence of COCO annotation files in the provided dataset, a placeholder binary mask (an all-white mask with values of 255, later converted to 1.0 in tensor form) was generated for each image. This demonstrated the pipeline's capability to handle masks, even if they were not derived from actual annotations in this specific execution.
4.  **Output Saving**: After processing and visualization, the pipeline saved a few samples of the processed images and their corresponding masks. Images were denormalized, converted back to NumPy arrays, and saved as JPEG files (e.g., `processed_image_1.jpg`); because the arrays are in RGB channel order, they were converted to BGR before being passed to `cv2.imwrite`. Masks were converted to NumPy arrays and saved as grayscale PNG files (e.g., `processed_mask_1.png`). Both were written to a newly created `processed_output` directory.

### Data Analysis Key Findings
*   The `COCO2017_SAMPLE` dataset contained image files (e.g., `image_0.jpg`) directly in the root directory `/content/drive/MyDrive/AI_Vision_Extract_Nov25/data (1)/COCO2017_SAMPLE`, not in typical `val2017` subdirectories.
*   No COCO-like JSON annotation files were found in the dataset, leading to the use of a placeholder all-white binary mask for demonstration.
*   A custom `ImageMaskTransforms` class successfully implemented resizing (images via `cv2.INTER_LINEAR`, masks via `cv2.INTER_NEAREST` to a target size of `(256, 256)`), conversion to PyTorch tensors (`torch.Size([3, 256, 256])` for images, `torch.Size([1, 256, 256])` for masks), and image normalization.
*   A custom `ImageMaskDataset` and `DataLoader` were successfully created and verified, efficiently handling batching of processed images and placeholder masks, with batch shapes `torch.Size([4, 3, 256, 256])` for images and `torch.Size([4, 1, 256, 256])` for masks.
*   The pipeline successfully saved processed images as `.jpg` and masks as `.png` to a `processed_output` directory, demonstrating its ability to persist the transformed data.

### Insights or Next Steps
*   To make the pipeline fully functional for segmentation tasks, the primary next step is to obtain or generate actual COCO-formatted annotation files for the `COCO2017_SAMPLE` dataset. This would allow the `load_image_and_placeholder_mask` function to be extended to load real segmentation masks using `pycocotools`.
*   Consider adding more advanced data augmentation techniques (e.g., random flips, rotations, color jitter) within the `ImageMaskTransforms` class to improve model robustness, ensuring masks are transformed consistently with images.
