In [31]:
from dotenv import load_dotenv, find_dotenv
import os
import shutil
import requests
import json
from pathlib import Path
from uuid import uuid4
from datetime import datetime
from PIL import Image

# Load .env and override the existing vars
load_dotenv(find_dotenv(), override=True)

source_folder = 'Images_Input'
destination_folder = 'Images_Output'

azure_key = os.getenv('API_KEY')
azure_region = os.getenv('REGION')
azure_endpoint = os.getenv('URL')

#Debug to see if keys are ok
print('Using face endpoint:', azure_endpoint)
print('Face API key present:', bool(azure_key))

Using face endpoint: https://foundrygoncalo.cognitiveservices.azure.com/
Face API key present: True


In [32]:
def analyze_image(image_path, endpoint, key):
    """
    Sends an image to Azure Vision's analyze API and returns the JSON response.
    """
    # Remove trailing slash if present
    endpoint = endpoint.rstrip('/')
    
    # Construct the full URL
    analyze_url = f"{endpoint}/vision/v3.2/analyze"
    
    params = {
        'visualFeatures': 'Description,Objects,Tags,Categories,Faces,Adult,Brands,Color',
        'details': '',
        'language': 'en'
    }
    
    headers = {
        'Ocp-Apim-Subscription-Key': key,
        'Content-Type': 'application/octet-stream'
    }
    
    with open(image_path, 'rb') as image_data:
        response = requests.post(analyze_url, headers=headers, params=params, data=image_data)
    
    # Print response for debugging
    print(f"Status Code: {response.status_code}")
    if response.status_code != 200:
        print(f"Error Response: {response.text}")
    
    response.raise_for_status()
    return response.json()

In [33]:
def process_images(source_folder, destination_folder, endpoint, key, confidence_threshold=0.7):
    """
    Processes images with Azure AI Vision and creates clean, readable metadata.
    
    Args:
        source_folder (str): Path to folder with input images
        destination_folder (str): Path to output folder
        endpoint (str): Azure Vision endpoint URL
        key (str): Azure Vision API subscription key
        confidence_threshold (float): Minimum confidence (default: 0.7)
    """
    all_photos = []
    summary_stats = {
        'total_images': 0,
        'processed_successfully': 0,
        'failed': 0,
        'processing_date': datetime.now().isoformat(),
        'confidence_threshold': confidence_threshold
    }
    
    dest = Path(destination_folder)
    dest.mkdir(exist_ok=True)

    image_files = [f for f in os.listdir(source_folder) 
                   if f.lower().endswith(('jpg', 'jpeg', 'png', 'bmp'))]
    
    summary_stats['total_images'] = len(image_files)

    for img_file in image_files:
        img_path = os.path.join(source_folder, img_file)
        
        try:
            # Get Azure Vision analysis
            analysis = analyze_image(img_path, endpoint, key)
            
            # Extract caption/description
            captions = analysis.get('description', {}).get('captions', [])
            main_caption = captions[0]['text'] if captions else 'No description available'
            caption_confidence = captions[0].get('confidence', 0) if captions else 0

            # Get image dimensions
            try:
                with Image.open(img_path) as img:
                    width, height = img.size
            except Exception:
                width, height = None, None

            # High-confidence objects only
            objects = []
            for obj in analysis.get('objects', []):
                if obj.get('confidence', 0) >= confidence_threshold:
                    objects.append({
                        'name': obj['object'],
                        'confidence': round(obj['confidence'], 3),
                        'location': {
                            'x': obj['rectangle']['x'],
                            'y': obj['rectangle']['y'],
                            'width': obj['rectangle']['w'],
                            'height': obj['rectangle']['h']
                        }
                    })

            # High-confidence tags only
            tags = []
            for tag in analysis.get('tags', []):
                if tag.get('confidence', 0) >= confidence_threshold:
                    tags.append({
                        'name': tag['name'],
                        'confidence': round(tag['confidence'], 3)
                    })

            # Categories above threshold
            categories = []
            for cat in analysis.get('categories', []):
                if cat.get('score', 0) >= confidence_threshold:
                    categories.append({
                        'name': cat['name'],
                        'confidence': round(cat['score'], 3)
                    })

            # Face detection info
            faces = []
            for face in analysis.get('faces', []):
                face_info = {}
                if 'gender' in face:
                    face_info['gender'] = face['gender']
                if 'age' in face:
                    face_info['age'] = face['age']
                if 'faceRectangle' in face:
                    face_info['location'] = face['faceRectangle']
                if face_info:
                    faces.append(face_info)

            # Color analysis
            color_info = analysis.get('color', {})
            colors = {
                'dominant': color_info.get('dominantColors', []),
                'accent': color_info.get('accentColor', ''),
                'is_black_white': color_info.get('isBWImg', False)
            }
            # Remove empty values
            colors = {k: v for k, v in colors.items() if v}

            # Brand detection
            brands = []
            for brand in analysis.get('brands', []):
                if brand.get('confidence', 0) >= confidence_threshold:
                    brands.append({
                        'name': brand['name'],
                        'confidence': round(brand['confidence'], 3)
                    })

            # Content safety ratings
            adult_info = analysis.get('adult', {})
            content_flags = {
                'is_adult': adult_info.get('isAdultContent', False),
                'is_racy': adult_info.get('isRacyContent', False),
                'is_gory': adult_info.get('isGoryContent', False)
            }

            # OCR text if available
            ocr_text = analysis.get('text', '')

            # Collect all labels for organizing
            all_labels = set()
            all_labels.update([obj['name'] for obj in objects])
            all_labels.update([tag['name'] for tag in tags])
            all_labels.update([cat['name'].split('_')[-1] for cat in categories])
            
            if not all_labels:
                all_labels = {'Uncategorized'}

            # Copy to labeled folders
            for label in all_labels:
                safe_label = str(label).replace('/', '_').replace(' ', '_')
                label_dir = dest / safe_label
                label_dir.mkdir(exist_ok=True)
                shutil.copy2(img_path, label_dir / img_file)

            # Build clean photo record
            photo_record = {
                'filename': img_file,
                'id': str(uuid4()),
                'description': main_caption,
                'description_confidence': round(caption_confidence, 3),
                'dimensions': {
                    'width': width,
                    'height': height
                } if width and height else None,
                'file_size_kb': round(os.path.getsize(img_path) / 1024, 2),
                'detected_content': {
                    'objects': objects,
                    'tags': tags,
                    'categories': categories
                },
                'people': {
                    'faces_detected': len(faces),
                    'faces': faces
                } if faces else None,
                'colors': colors if colors else None,
                'brands': brands if brands else None,
                'text': ocr_text if ocr_text else None,
                'content_safety': content_flags,
                'organized_into_folders': sorted(list(all_labels))
            }

            # Remove None values for cleaner JSON
            photo_record = {k: v for k, v in photo_record.items() if v is not None}
            
            all_photos.append(photo_record)
            summary_stats['processed_successfully'] += 1
            
            print(f"✓ Processed: {img_file}")

        except Exception as e:
            print(f"✗ Error processing {img_file}: {str(e)}")
            summary_stats['failed'] += 1
            all_photos.append({
                'filename': img_file,
                'id': str(uuid4()),
                'error': str(e),
                'status': 'failed'
            })

    # Create final output with summary at top
    output = {
        'summary': summary_stats,
        'photos': all_photos
    }

    # Write clean, indented JSON
    metadata_path = dest / 'metadata.json'
    with open(metadata_path, 'w', encoding='utf-8') as f:
        json.dump(output, f, ensure_ascii=False, indent=2)
    
    print("-" * 50)
    print(f"Processed {summary_stats['processed_successfully']} of {summary_stats['total_images']} images.")
    if summary_stats['failed']:
        print(f"{summary_stats['failed']} image(s) failed. See metadata for details.")
    print(f"Metadata saved to: {metadata_path}")
    print("-" * 50)

    return output

In [34]:
#Call the function to delete the output folder in case you want to start fresh
def delete_output_folder(destination_folder):
    if os.path.exists(destination_folder):
        shutil.rmtree(destination_folder)

In [35]:
delete_output_folder(destination_folder)
process_images(source_folder, destination_folder, azure_endpoint, azure_key, confidence_threshold=0.9)

Status Code: 200
✓ Processed: Buddy.png
Status Code: 200
✓ Processed: Captura de ecrã 2025-03-27 170649.png
Status Code: 200
✓ Processed: Captura de ecrã 2025-04-24 174656.png
--------------------------------------------------
Processed 3 of 3 images.
Metadata saved to: Images_Output\metadata.json
--------------------------------------------------


{'summary': {'total_images': 3,
  'processed_successfully': 3,
  'failed': 0,
  'processing_date': '2025-11-21T11:06:46.967323',
  'confidence_threshold': 0.9},
 'photos': [{'filename': 'Buddy.png',
   'id': '6f520b62-2a36-42ef-b60b-09a1be1e88b4',
   'description': 'a white and blue toy',
   'description_confidence': 0.314,
   'dimensions': {'width': 1024, 'height': 1024},
   'file_size_kb': 1686.3,
   'detected_content': {'objects': [],
    'tags': [{'name': 'toy', 'confidence': 0.928},
     {'name': 'animal figure', 'confidence': 0.928}],
    'categories': []},
   'colors': {'dominant': ['White', 'Pink'], 'accent': '071F38'},
   'content_safety': {'is_adult': False, 'is_racy': False, 'is_gory': False},
   'organized_into_folders': ['animal figure', 'toy']},
  {'filename': 'Captura de ecrã 2025-03-27 170649.png',
   'id': 'd69ec1c4-fd62-4a54-8d87-1cdc976846b8',
   'description': 'logo',
   'description_confidence': 0.48,
   'dimensions': {'width': 853, 'height': 527},
   'file_size_kb