# Offspring Generation with ComfyUI - Version 6 (hopefully the last!)

This time, adding ImageBlend node in ComfyUI to blend the two input photos.


The scenarios include:
1. `rand_blend{ratio}`: blend_factor randomly between 0.2-0.8
2. `donor_blend80`: blend_factor = 0.8, 80% donor image
3. `image_blend50`: blend_factor = 0.5, 50% donor + 50% my image
4. `me_blend80`: blend_factor = 0.2, 20% donor image

---

## 1. Setup and Imports

In [1]:
# Basic imports
import sys
import os
from pathlib import Path
import json
import csv
from datetime import datetime
import random
import logging
import shutil
import time
from typing import Dict, List, Tuple, Optional

In [2]:
# Server-related imports
import subprocess
import webbrowser
from time import sleep

In [3]:
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

In [4]:
# Path configurations
COMFY_BASE = Path("/Users/cindylinsf/Documents/CCI/THESIS/Msc_Thesis_Project_Files/models/ComfyUI")

PATHS = {
    'output': COMFY_BASE / "output",
    'workflows': COMFY_BASE / "output/workflows",
    'logs': COMFY_BASE / "output/logs",
    'donors': Path("/Users/cindylinsf/Documents/CCI/THESIS/Msc_Thesis_Project_Files/data/input/comfyui_source_files"),
    'my_photo': Path("/Users/cindylinsf/Documents/CCI/THESIS/Msc_Thesis_Project_Files/data/input/cindy.jpg")
}

BASE_WORKFLOW_PATH = PATHS['workflows'] / "base_workflow_comfyui5.json"

## 2. Configuration and Constants

In [5]:
# Blend scenarios and naming
BLEND_SCENARIOS = {
    'random': (0.2, 0.8),  # Random blend ratio between 20-80% donor
    'donor_dominant': 0.8,  # 80% donor, 20% me
    'balanced': 0.5,       # 50% donor, 50% me
    'me_dominant': 0.2     # 20% donor, 80% me
}

OUTPUT_NAMING = {
    'random': 'rand_blend{ratio}',  # Will show actual blend ratio e.g., rand_blend65 means 65% donor
    'donor_dominant': 'donor_blend80',
    'balanced': 'image_blend50',
    'me_dominant': 'me_blend80'
}

# Model parameters (keeping some consistent settings)
MODEL_PARAMS = {
    'denoise': 0.7,       # Keeping this constant since we're using image blending
    'cfg_scale': 7.5,
    'steps': 30
}

In [6]:
# Prompt variations and model parameters
PROMPT_VARIATIONS = {
    'age_range': (10, 14),  # months
    'expressions': [
        'candid', 'curious', 'happy', 'neutral', 
        'gentle smile', 'attentive', 'peaceful'
    ],
    'lighting_conditions': [
        'natural daylight', 'soft studio lighting', 
        'gentle window light', 'professional portrait lighting'
    ]
}



In [7]:
# Model parameters
MODEL_PARAMS = {
    'denoise_strength': (0.4, 0.7),
    'cfg_scale': (4, 7),
    'controlnet_strength': (0.6, 0.8),
    'steps': (25, 40)  # Keep steps in reasonable range for SDXL Turbo
}

## 3. Utility Functions

In [8]:
# Utility Functions
def setup_directories() -> None:
    """Create necessary directories and verify paths."""
    try:
        for name, path in PATHS.items():
            if name != 'my_photo':
                path.mkdir(parents=True, exist_ok=True)
                logging.info(f"Created/verified directory: {path}")
        
        # Verify my_photo exists
        if not PATHS['my_photo'].exists():
            raise FileNotFoundError(f"My photo not found at: {PATHS['my_photo']}")
        
        # Load and verify base workflow exists
        if not BASE_WORKFLOW_PATH.exists():
            raise FileNotFoundError(f"Base workflow not found at: {BASE_WORKFLOW_PATH}")
            
    except Exception as e:
        logging.error(f"Error in directory setup: {str(e)}")
        raise

def generate_random_prompt() -> str:
    """Generate a randomized prompt with variations."""
    age = random.randint(*PROMPT_VARIATIONS['age_range'])
    expression = random.choice(PROMPT_VARIATIONS['expressions'])
    lighting = random.choice(PROMPT_VARIATIONS['lighting_conditions'])
    
    return (
        f"portrait of a {age} month old baby with distinctive facial features, "
        f"{expression} expression, detailed facial features, {lighting}, "
        f"high quality photograph, photorealistic"
    )

def get_output_filename(donor_id: str, scenario: str, params: Dict) -> str:
    """Generate output filename based on scenario."""
    if scenario == 'random':
        strength_int = int(params['controlnet_strength'] * 100)
        suffix = f"rand{strength_int}"
    else:
        suffix_map = {
            'donor_dominant': 'donor80',
            'balanced': 'blend50',
            'me_dominant': 'me80'
        }
        suffix = suffix_map[scenario]
    
    return f"{donor_id}_{suffix}.png"

def get_random_parameters(scenario: str) -> Dict:
    """Generate random parameters for a specific scenario."""
    # Get blend factor based on scenario
    if scenario == 'random':
        blend_factor = round(random.uniform(*BLEND_SCENARIOS['random']), 2)
    else:
        blend_factor = BLEND_SCENARIOS[scenario]
    
    return {
        'blend_factor': blend_factor,  # This controls donor percentage
        'denoise': 0.7,  # Keep this constant now since we're focusing on blending
        'cfg': 7.5,
        'steps': 30,
        'seed': random.randint(1, 2**32 - 1),
        'prompt': generate_random_prompt()
    }
    
    prompt = f"portrait of a {age} month old baby with distinctive facial features, {expression} expression, detailed facial features, {lighting}, high quality photograph, photorealistic"
    
    return {
        'controlnet_strength': controlnet_strength,
        'denoise': round(random.uniform(0.4, 0.7), 2),
        'cfg': round(random.uniform(4, 7), 1),
        'steps': random.randint(25, 40),
        'seed': random.randint(1, 2**32 - 1),
        'prompt': prompt
    }

## 4. Logging Functions

In [9]:
# Logging Classes
class GenerationLogger:
    """Handles logging of generation parameters and results.
    Creates two types of logs:
    1. CSV log for quick overview
    2. Detailed JSON logs for each generation"""
    
    def __init__(self, log_dir: Path):
        self.log_dir = log_dir
        self.csv_path = log_dir / 'generation_log.csv'
        self.json_dir = log_dir / 'detailed_logs'
        self.json_dir.mkdir(parents=True, exist_ok=True)
        
        # Initialize CSV if it doesn't exist
        if not self.csv_path.exists():
            self._initialize_csv()
    
    def _initialize_csv(self):
        headers = [
            'timestamp', 'donor_id', 'scenario',
            'donor_strength', 'me_strength',
            'seed', 'denoise', 'cfg',
            'steps', 'controlnet_strength',
            'output_filename', 'prompt'
        ]
        with open(self.csv_path, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(headers)
    
    def log_generation(self, 
                      donor_id: str,
                      scenario: str,
                      parameters: Dict,
                      output_filename: str,
                      prompt: str):
        """Log a single generation to both CSV and JSON."""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        
        # CSV logging
        csv_row = [
            timestamp, donor_id, scenario,
            parameters['donor_strength'], parameters['me_strength'],
            parameters['seed'], parameters['denoise'],
            parameters['cfg'], parameters['steps'],
            parameters['controlnet_strength'],
            output_filename, prompt
        ]
        
        with open(self.csv_path, 'a', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(csv_row)
        
        # Detailed JSON logging
        json_log = {
            'timestamp': timestamp,
            'donor_id': donor_id,
            'scenario': scenario,
            'parameters': parameters,
            'output_filename': output_filename,
            'prompt': prompt
        }
        
        json_path = self.json_dir / f"{donor_id}_{timestamp}.json"
        with open(json_path, 'w') as f:
            json.dump(json_log, f, indent=2)

## 5. Workflow Management

In [10]:
# Workflow Tracking
class WorkflowTracker:
    """Tracks the progress of workflow processing."""
    
    def __init__(self, tracker_dir: Path):
        self.tracker_dir = tracker_dir
        self.tracker_file = tracker_dir / 'processed_workflows.csv'
        self._initialize_tracker()
        
    def _initialize_tracker(self):
        """Initialize tracker file if it doesn't exist."""
        if not self.tracker_file.exists():
            with open(self.tracker_file, 'w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow([
                    'workflow_file', 'donor_id', 'scenario', 
                    'timestamp', 'processed', 'notes'
                ])
    
    def add_workflow(self, workflow_file: str, donor_id: str, scenario: str):
        """Add new workflow to tracking."""
        with open(self.tracker_file, 'a', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([
                workflow_file,
                donor_id,
                scenario,
                datetime.now().strftime('%Y%m%d_%H%M%S'),
                'No',
                ''
            ])
    
    def mark_processed(self, workflow_file: str, notes: str = ''):
        """Mark workflow as processed and add any notes about the result."""
        # Read existing data
        with open(self.tracker_file, 'r', newline='') as f:
            reader = csv.reader(f)
            data = list(reader)
            header = data[0]
            rows = data[1:]
        
        # Update processed status
        for row in rows:
            if row[0] == workflow_file:
                row[4] = 'Yes'
                row[5] = notes
        
        # Write back
        with open(self.tracker_file, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(header)
            writer.writerows(rows)
    
    def get_unprocessed(self) -> List[str]:
        """Get list of unprocessed workflows."""
        unprocessed = []
        with open(self.tracker_file, 'r', newline='') as f:
            reader = csv.reader(f)
            next(reader)  # Skip header
            for row in reader:
                if row[4] == 'No':
                    unprocessed.append(row[0])
        return unprocessed

## 6. Main Processing

In [11]:
class WorkflowManager:
    """Handles workflow creation and modification."""
    def __init__(self, base_workflow_path: Path):
        self.base_workflow_path = base_workflow_path
        self.load_base_workflow()
    
    def load_base_workflow(self):
        """Load the base workflow JSON."""
        try:
            with open(self.base_workflow_path, 'r') as f:
                self.base_workflow = json.load(f)
            logging.info("Base workflow loaded successfully")
        except Exception as e:
            logging.error(f"Error loading base workflow: {str(e)}")
            raise
    
    def create_variation_workflow(self, donor_photo_path: Path, parameters: Dict, output_filename: str) -> Dict:
        """Create a workflow variation with specified parameters."""
        workflow = self.base_workflow.copy()
        
        # Strip the .png extension for the SaveImage node
        output_name = output_filename.replace('.png', '')
        
        # Update nodes
        for node in workflow['nodes']:
            if node['type'] == 'LoadImage':
                if node['id'] == 1:  # My photo
                    node['widgets_values'] = [str(PATHS['my_photo']), "image"]
                elif node['id'] == 2:  # Donor photo
                    node['widgets_values'] = [str(donor_photo_path), "image"]
            
            # Update ImageBlend blend_factor
            elif node['type'] == 'ImageBlend':
                node['widgets_values'] = [
                    parameters['blend_factor'],  # This controls donor percentage
                    "normal"
                ]
            
            # Update KSampler parameters
            elif node['type'] == 'KSampler':
                node['widgets_values'] = [
                    parameters['seed'],
                    "randomize",
                    parameters['steps'],
                    parameters['cfg'],
                    "euler",
                    "normal",
                    parameters['denoise']
                ]
            
            # Update SaveImage filename
            elif node['type'] == 'SaveImage':
                node['widgets_values'] = [output_name]
        
        return workflow



## 7. Server Management and Execution

In [12]:
# Server Management
def start_comfyui_server():
    """Start ComfyUI server and open browser interface.
    IMPORTANT: Keep this notebook running while using ComfyUI"""
    try:
        server_process = subprocess.Popen(
            ['python', str(COMFY_BASE / 'main.py'), '--listen', '0.0.0.0', '--port', '8188'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )

        # Wait for server to start
        sleep(5)

        # Open the UI in default browser
        webbrowser.open('http://localhost:8188')
        
        print("ComfyUI server started! The interface should open in your browser.")
        print("Keep this notebook running while running ComfyUI.")
        
        return server_process
    except Exception as e:
        logging.error(f"Failed to start ComfyUI server: {str(e)}")
        raise

In [13]:
def process_donor_batch():
    """Process a batch of donors with different scenarios."""
    try:
        # Initialize components
        setup_directories()
        workflow_manager = WorkflowManager(BASE_WORKFLOW_PATH)
        
        # Get donor files
        donor_files = sorted(list(PATHS['donors'].glob("*.jpeg")))
        logging.info(f"Found {len(donor_files)} donor files to process")
        
        # Process each donor
        for donor_idx, donor_path in enumerate(donor_files, 1):
            donor_id = donor_path.stem
            logging.info(f"\nProcessing donor {donor_idx}/{len(donor_files)}: {donor_id}")
            
            # Process each scenario
            for scenario in CONTROLNET_SCENARIOS.keys():
                try:
                    # Get parameters
                    parameters = get_random_parameters(scenario)
                    
                    # Generate output filename
                    if scenario == 'random':
                        strength_int = int(parameters['denoise'] * 100)
                        output_filename = f"{donor_id}_rand{strength_int}"
                    else:
                        scenario_suffix = {
                            'donor_dominant': 'donor80',
                            'balanced': 'blend50',
                            'me_dominant': 'me80'
                        }[scenario]
                        output_filename = f"{donor_id}_{scenario_suffix}"
                    
                    # Create and save workflow
                    workflow = workflow_manager.create_variation_workflow(
                        donor_path,
                        parameters,
                        output_filename
                    )
                    
                    # Save workflow
                    workflow_file = f"{output_filename}.json"
                    workflow_path = PATHS['workflows'] / workflow_file
                    
                    with open(workflow_path, 'w') as f:
                        json.dump(workflow, f, indent=2)
                    
                    logging.info(f"✓ Created workflow for {scenario} scenario: {output_filename}")
                    
                except Exception as e:
                    logging.error(f"Error processing {donor_id} - {scenario}: {str(e)}")
                    continue
            
            if donor_idx % 10 == 0:
                logging.info(f"Completed {donor_idx}/{len(donor_files)} donors")
            
            time.sleep(1)  # Brief pause between donors
    
    except Exception as e:
        logging.error(f"Batch processing error: {str(e)}")
        raise
    
    logging.info("Batch processing completed!")

### Execution Steps

In [14]:
# Start ComfyUI server
server = start_comfyui_server()


ComfyUI server started! The interface should open in your browser.
Keep this notebook running while running ComfyUI.


In [15]:
# # Process All Donors
# If this is commented out it's because sometimes ComfyUI gets stuck 
# and I had to restart the notebook to restart the server

# print("Starting batch processing...")
# proceed = input("Press Enter to continue or Ctrl+C to cancel...")
# process_donor_batch()

In [16]:
# # Check Progress
# # Run this cell to check remaining workflows
# unprocessed = check_progress()

In [17]:
# # Helper function to mark workflows as processed
# def mark_workflow_complete(workflow_file: str, notes: str = "Generated successfully"):
#     """Mark a workflow as completed after running it in ComfyUI.
#     Usage: mark_workflow_complete("donor_id_scenario.json")"""
#     tracker = WorkflowTracker(PATHS['logs'])
#     tracker.mark_processed(workflow_file, notes)
#     print(f"Marked {workflow_file} as processed")