# Image Labelling and Preprocessing

This notebook handles the **image labeling** and **preprocessing** steps for the Malnutrition Classification Project.
It assumes the dataset consists of images corresponding to different children in multiple poses (e.g., frontal, lateral, back, etc.).
The goal is to map each image to its label (malnourished / not malnourished) and prepare it for model training.


In [1]:
# Import the necessary packages
import os
import re
import argparse
from pathlib import Path
import random
import json

import pandas as pd
import numpy as np
from PIL import Image
from tqdm import tqdm

try:
    import cv2
except Exception:
    cv2 = None

try:
    import mediapipe as mp
except Exception:
    mp = None

try:
    import albumentations as A
    from albumentations.pytorch import ToTensorV2
except Exception:
    A = None

from sklearn.model_selection import train_test_split

  from .autonotebook import tqdm as notebook_tqdm


# Utilities : 

In [2]:
# Compiles a regular expression (regex) pattern into an object. Regex is used to match strings that follow a certain pattern.
# (?P<index>\d+) → Named group index. \d+ means one or more digits. For example, 0000 will be captured as index.
FILENAME_RE = re.compile(r"^(?P<index>\d+)[_](?P<child>\d+).*[_](?P<pose>[^.]+)\.(?:jpg|jpeg|png)$", re.IGNORECASE)


# These are standard mean and standard deviation values used when normalizing images for models trained on ImageNet.
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]


# Extract index, child_id and pose from filename. Returns (index, child_id, pose) or (None, None, None)
def parse_filename(fname: str):
    # Tries to match the filename with the regex we defined
    m = FILENAME_RE.match(fname)
    if m:
        return m.group('index'), m.group('child'), m.group('pose')
    # fallback: split(stem->file name without extension) and try
    parts = Path(fname).stem.split('_')
    if len(parts) >= 3:
        return parts[0], parts[1], parts[-1]
    return None, None, None


# images_dir: Path → The folder where all images are stored as argument to function
def find_images(images_dir: Path):
    # Stores image info
    imgs = []
    # Recursively search all files in images_dir and subfolders
    for p in images_dir.rglob('*'):
        if p.suffix.lower() in ('.jpg', '.jpeg', '.png') and p.is_file():
            idx, child, pose = parse_filename(p.name)
            # Skip or include with unknown child
            if child is None:
                continue
            imgs.append({'path': str(p), 'index': idx, 'child_id': str(int(child)), 'pose': pose.lower()})
    # Get a structured table of all images and their metadata
    return pd.DataFrame(imgs)


# Function returns a clean DataFrame mapping each child ID to its label, ready to merge with image metadata
def read_labels_csv(labels_csv: Path):

    df = pd.read_csv(labels_csv)

    # Use the correct columns based on your CSV headers
    child_col = 'tag'
    label_col = 'binary_label'

    # Select only relevant columns and rename for standardization
    labels = df[[child_col, label_col]].copy()
    labels.columns = ['child_id', 'label']

    # Ensure child IDs are strings and remove any '.0' if present
    labels['child_id'] = labels['child_id'].astype(str).str.replace(r'\.0$', '', regex=True)

    # Remove rows with missing labels
    labels = labels[labels['label'].notnull()]

    # Drop duplicates in case the same child appears multiple times
    return labels.drop_duplicates(subset=['child_id'])


# MediaPipe / Keypoints & BBoxes

In [3]:
# Keypoints (also called landmarks) are specific, meaningful points on a human body. Used for Body measurements
# For example: head, shoulders, elbows, wrists, hips, knees, ankles, etc.
# Each keypoint usually has x, y coordinates in the image, and sometimes a visibility score (how confident the model is that the point is visible).

# A bounding box is a rectangle that tightly encloses the object of interest (in our case, the child).

# The following code deals with extracting body keypoints and bounding boxes using MediaPipe, a library for pose estimation

class PoseExtractor:

    def __init__(self, use_mediapipe=True):
        # instance variable that stores whether we should use MediaPipe(imported as mp) or not
        self.use_mediapipe = use_mediapipe and (mp is not None)
        if self.use_mediapipe:
            # gives access to the pre-trained pose detection model
            self.mp_pose = mp.solutions.pose
            # tells that we are using static images and sensitivity of pose detection(Lower = more sensitive, Higher = more precise)
            self.pose = self.mp_pose.Pose(static_image_mode=True, min_detection_confidence=0.4)


    # Argument to method : the image we want to process, represented as a NumPy array in BGR format (OpenCV standard)
    # Returns dict with 'keypoints' (landmark_name -> (x,y,visibility)) and 'bbox' (x_min,y_min,x_max,y_max) or None if not found
    def extract(self, image_bgr: np.ndarray):
        
        # store the width and height
        h, w = image_bgr.shape[:2]
        
        # Dictionary which stores keypoints and bounding box
        result = {'keypoints': {}, 'bbox': None}

        if not self.use_mediapipe:
            return result
        
        # convert to RGB for mediapipe(OpenCV uses BGR)
        img_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)

        # Run the pose detection model on the image. res containns the landmarks/keypoints
        res = self.pose.process(img_rgb)

        if not res.pose_landmarks:
            return result
        
        # lms is a list of landmarks (keypoints) detected. Each landmark has:
        # x, y, z: normalized coordinates (0–1)
        # visibility: likelihood the landmark is visible
        lms = res.pose_landmarks.landmark

        kp = {}     # Dict to store keypoints
        xs = []     # x coordinates for bounding box
        ys = []     # y coordinates for bounding box

        # Loop through each landmark
        for i, lm in enumerate(lms):
            # Multiply with image width to convert normalized to pixels coordinates
            x = lm.x * w
            y = lm.y * h
            v = lm.visibility
            xs.append(x)
            ys.append(y)
            kp[i] = (x, y, v)

        # Minimal set of keypoints of interest — map to indices if needed
        result['keypoints'] = kp

        # Compute bounding box
        x_min = max(0, int(min(xs)))    # leftmost x-coordinate of any keypoint
        x_max = min(w, int(max(xs)))
        y_min = max(0, int(min(ys)))    # topmost y-coordinate
        y_max = min(h, int(max(ys)))

        # expand bbox by 10% padding
        pad_x = int((x_max - x_min) * 0.1)
        pad_y = int((y_max - y_min) * 0.1)
        result['bbox'] = [max(0, x_min - pad_x), max(0, y_min - pad_y), min(w, x_max + pad_x), min(h, y_max + pad_y)]
        return result

# Data Augmentation

In [4]:
# Return an albumentations augmentation pipeline. Will be applied consistently per-child when needed.

def make_augmentations(output_size=(224, 224), training=True):

    # A is the Albumentations library imported earlier
    if A is None:
        raise RuntimeError('Albumentations is required for augmentations. pip install albumentations')

    # Normalization adjusts pixel values to a standard range, which helps neural networks train faster and more stably
    norm = A.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD)

    if training:
        aug = A.Compose([

            # Randomly crops a part of the image and resizes it to output_size
            A.RandomResizedCrop(
                size=output_size, 
                scale=(0.8, 1.0), 
                ratio=(0.9, 1.1), 
                p=1.0
            ),
            
            # Randomly flips the image left-right 25% of the time
            A.HorizontalFlip(p=0.25),

            # choose one of the listed augmentations 60% of the time
            A.OneOf([
                # Shift + zoom + rotate
                A.ShiftScaleRotate(shift_limit=0.0625, scale_limit=0.1, rotate_limit=20, p=0.6),
                # Skews the image along x or y axis by shear percent
                A.Affine(shear=10, p=0.4),
            ], p=0.6),

            # Randomly adjusts brightness, contrast, hue, saturation.
            A.OneOf([
                A.RandomBrightnessContrast(p=0.6),
                A.HueSaturationValue(p=0.6),
            ], p=0.5),

            # occlusion / coarse dropout (Randomly blacks out a small rectangle in the image)
            A.CoarseDropout(max_holes=1, max_height=int(0.15*output_size[0]), max_width=int(0.15*output_size[1]), min_holes=1, p=0.3),

            norm,

        ], keypoint_params=A.KeypointParams(format='xy', remove_invisible=False)
        # Last line tells that we may have landmarks/keypoints and keep then even if it goes outside image after transformation
    )

    else:
        # For validation/test images, we do not apply random augmentations, we only resize and normalize
        aug = A.Compose(
            [
                A.Resize(*output_size), 
                norm
            ], 
            keypoint_params=A.KeypointParams(format='xy', remove_invisible=False)
        )
    
    return aug

# Processing pipeline

In [5]:
# Load image, optionally crop to bbox, apply augmentation pipeline and save output image and keypoints if provided by aug

def process_and_save_image(img_path, out_path, bbox=None, aug=None):

    # loads an image as a NumPy array in BGR format
    img = cv2.imread(img_path)
    if img is None:
        raise RuntimeError(f'Could not read image: {img_path}')
    
    # Original dimensions
    orig_h, orig_w = img.shape[:2]
    
    # Crop according to bounding box
    if bbox is not None:
        x1, y1, x2, y2 = bbox
        # ensure ints
        x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])
        # clip - Ensures the bounding box doesn’t go outside the image
        x1 = max(0, min(x1, orig_w-1))
        x2 = max(0, min(x2, orig_w))
        y1 = max(0, min(y1, orig_h-1))
        y2 = max(0, min(y2, orig_h))
        # Check for invalid boxes
        if x2 <= x1 or y2 <= y1:
            cropped = img
        else:
            cropped = img[y1:y2, x1:x2]
    else:
        cropped = img

    # convert BGR->RGB for albumentations
    rgb = cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB)

    # Apply data augmenta
    if aug is not None:
        res = aug(image=rgb)
        rgb = res['image']

    # convert back to BGR for saving via cv2
    if rgb.dtype == np.float32 or rgb.dtype == np.float64:
        rgb_uint8 = np.clip(rgb * 255.0, 0, 255).astype(np.uint8)   # ensures pixel values are in the valid range and integers
    else:
        rgb_uint8 = rgb
    final_bgr = cv2.cvtColor(rgb_uint8, cv2.COLOR_RGB2BGR)

    # Creates the folder where the output image will be saved
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    cv2.imwrite(out_path, final_bgr)

# MAIN DRIVER

Define path and parameters

In [6]:
root = Path('.')  # Project root
images_dir = root / 'Anthrovision' / 'merged_dataset'
labels_csv = root / 'Anthrovision' / 'anthrovision_labels.csv'
out_dir = root / 'Anthrovision' 

# A manifest CSV will track all processed images, their labels, child IDs, split (train/val/test), and keypoints
manifest_path = out_dir / 'manifest.csv'

size = 224
train_frac = 0.7
val_frac = 0.15
test_frac = 0.15
seed = 42
use_mediapipe = True

Scan Images

In [7]:
# Get a structured table of all images and their metadata
images_df = find_images(images_dir)

# Reports total images and unique children
print(f"Found {len(images_df)} images belonging to {images_df.child_id.nunique()} unique children")

Found 16935 images belonging to 2139 unique children


Read Labels and Merge

In [8]:
# This function returns a clean DataFrame mapping each child ID to its label
labels_df = read_labels_csv(labels_csv)

# Merges image metadata with label data by child_id
merged = images_df.merge(labels_df, on='child_id', how='left')

# Drop images without labels
merged = merged[merged['label'].notnull()]

# Ensures labels are integers, which is required for classification
label_mapping = {
    'healthy': 0,
    'malnourished': 1
}

# Apply mapping
merged['label'] = merged['label'].map(label_mapping)

Compute Per Child Label

In [9]:
# Drop images with NaN labels first
merged = merged[merged['label'].notnull()]

# Ensure label is numeric
merged['label'] = merged['label'].astype(int)

# Some children have multiple images; here we compute one label per child:
# Use mode (most frequent label) across all images. If no mode, use the first label.
def child_label_func(x):
    x = x.dropna()  # remove NaNs
    if len(x) == 0:
        return None   # if no valid labels
    mode_vals = x.mode()
    if len(mode_vals) > 0:
        return int(mode_vals.iloc[0])
    else:
        return int(x.iloc[0])

child_label = merged.groupby('child_id')['label'].agg(child_label_func).reset_index()

# Remove children with no label at all (if any)
child_label = child_label[child_label['label'].notnull()]

children = child_label['child_id'].tolist()
labels = child_label['label'].tolist()

# Split
train_ids, temp_ids, train_lbls, temp_lbls = train_test_split(
    children, labels, test_size=(1-train_frac), random_state=seed, stratify=labels
)

rel = val_frac / (val_frac + test_frac)
val_ids, test_ids, _, _ = train_test_split(
    temp_ids, temp_lbls, test_size=(1-rel), random_state=seed, stratify=temp_lbls
)

# Assign split per image
def assign_split(row):
    cid = row['child_id']
    if cid in train_ids: return 'train'
    if cid in val_ids: return 'val'
    if cid in test_ids: return 'test'
    return 'none'

merged['split'] = merged.apply(assign_split, axis=1)
merged = merged[merged['split'] != 'none']

Initialize extractor and augmentations

In [10]:
# PoseExtractor will detect keypoints/landmarks and bounding boxes for each child image
extractor = PoseExtractor(use_mediapipe=use_mediapipe)

# Sets up image augmentations
train_aug = make_augmentations(output_size=(size, size), training=True)
val_aug = make_augmentations(output_size=(size, size), training=False)

  original_init(self, **validated_kwargs)
  A.CoarseDropout(max_holes=1, max_height=int(0.15*output_size[0]), max_width=int(0.15*output_size[1]), min_holes=1, p=0.3),


Process images

In [11]:
# A dictionary for info about processed images
records = []

for split in ['train', 'val', 'test']:
    
    # sub is the subset of images that belong to the current split.
    sub = merged[merged['split'] == split]

    # Loop over children within the split. tqdm is a progress bar for processing
    for child_id, group in tqdm(sub.groupby('child_id'), desc=f'Processing {split}'):

        # Ensures deterministic augmentations. Same child → same random transforms. Different children → different transforms.
        seed_child = int(child_id) if child_id.isdigit() else abs(hash(child_id)) % (2**31)
        random.seed(seed_child)
        np.random.seed(seed_child)
        
        # Choose the augmentation pipeline
        aug = train_aug if split=='train' else val_aug

        # Loop over each image of the child
        for _, row in group.iterrows():
            # loads the image in BGR format
            img_path = row['path']
            img_bgr = cv2.imread(img_path)
            
            # Keypoints and bounding box
            kp_data = None
            bbox = None
            
            # Extract keypoints and bounding box
            if img_bgr is not None and extractor.use_mediapipe:
                res = extractor.extract(img_bgr)
                kp_data = res.get('keypoints', {})
                bbox = res.get('bbox', None)

            # Build output path
            rel_out = Path(row['split']) / f"child_{row['child_id']}" / Path(img_path).name
            out_path = out_dir / rel_out

            process_and_save_image(img_path, str(out_path), bbox=bbox, aug=aug)

            # Create record for manifest
            record = {
                'orig_path': img_path,
                'proc_path': str(out_path),
                'child_id': row['child_id'],
                'pose': row['pose'],
                'label': int(row['label']),
                'split': row['split']
            }

            # Save the keypoints data
            if kp_data:
                kp_file = str(out_path) + '.kps.json'
                with open(kp_file, 'w') as fh:
                    json.dump(kp_data, fh)
                record['keypoints_file'] = kp_file

            records.append(record)

Processing train:   0%|          | 0/1497 [00:00<?, ?it/s]

Processing train: 100%|██████████| 1497/1497 [24:42<00:00,  1.01it/s]
Processing val: 100%|██████████| 321/321 [05:10<00:00,  1.03it/s]
Processing test: 100%|██████████| 321/321 [05:19<00:00,  1.01it/s]


Save

In [14]:
manifest = pd.DataFrame(records)
manifest.to_csv(manifest_path, index=False)
print('Processed dataset saved to', out_dir)
print('Manifest saved to', manifest_path)

Processed dataset saved to Anthrovision
Manifest saved to Anthrovision\manifest.csv
