In [42]:
from scqbf.scqbf_instance import *
from scqbf.scqbf_solution import *
from scqbf.scqbf_evaluator import *
from scqbf.scqbf_ga import *
import os
import glob
import time
from concurrent.futures import ProcessPoolExecutor, as_completed


In [43]:
def run_instance(file, log_dir, pop_size, mutation_rate_mult, ga_strategy, termination_options, debug_options):
    print(f"[DEBUG] run_instance called with file: {file}")
    in_file = os.path.basename(file).replace(".txt", "")
    log_file = os.path.join(log_dir, f"{in_file}.log")
    filename = os.path.splitext(os.path.basename(file))[0]

    if not os.path.exists(log_dir):
        raise ValueError(f"Log directory {log_dir} does not exist.")

    print(f"[{time.strftime('%H:%M')}] Running instance {filename}...", flush=True)
    
    start = time.time()
    try:
        with open(file, "r") as fin, open(log_file, "w") as fout:
            instance = read_max_sc_qbf_instance(file)
            ga = ScQbfGA(instance, 
            population_size=pop_size, 
            mutation_rate_multiplier=mutation_rate_mult, 
            ga_strategy=ga_strategy, 
            debug_options=debug_options, 
            termination_options=termination_options)
            
            solution = ga.solve()
            
            fout.write(f"Solution: {solution}\n")

        end = time.time()
        minutes, seconds = divmod(end - start, 60)
        with open(log_file, "a") as fout:
            fout.write(f"\n--- Finished in {end - start:.2f} seconds ---\n")

        print(f"[{time.strftime('%H:%M')}] Finished {filename} in {minutes:.0f}m{seconds:.2f}s\n", flush=True)
        return filename, True

    except Exception as e:
        with open(log_file, "a") as fout:
            fout.write(f"\n--- Error: {e} ---\n")
        print(f" Error running {filename}: {e}", flush=True)
        return filename, False

In [44]:
run_instance(file="instances/gen025_01.txt", 
             log_dir="logs/test_logs", 
             pop_size=100, 
             mutation_rate_mult=2, 
             ga_strategy=GAStrategy(population_init="latin_hypercube"),
             termination_options={'max_iter': 2},
             debug_options={'verbose': True, 'patience': 100, 'save_history': True}
             )

[DEBUG] run_instance called with file: instances/gen025_01.txt
[16:13] Running instance gen025_01...
Iteration 1: Best fitness = N/A
[16:13] Finished gen025_01 in 0m0.02s



('gen025_01', True)

In [45]:
def run_experiment(log_dir, pop_size, mutation_rate_mult, GA_strategy=GAStrategy(), debug_options=None, termination_options=None):
    # prepare in directory
    log_dir = "logs/" + log_dir
    os.makedirs(log_dir, exist_ok=True)
    for f in glob.glob(os.path.join(log_dir, "*")):
        try:
            os.remove(f)
        except Exception as e:
            print(f"[WARNING] Could not remove {f}: {e}")

    files = sorted(glob.glob(os.path.join("instances", "*.txt")))

    # number of workers = available threads or number of instances
    max_workers = min(len(files), os.cpu_count() or 4)

    print(f"Running {len(files)} instances in up to {max_workers} processes...\n")

    # runs sequentially #TODO change to parallel
    try:
        futures = []
        for file in files:
            try:
                filename, success = run_instance(file, log_dir, pop_size, mutation_rate_mult, GA_strategy, termination_options, debug_options )
                if not success:
                    print(f"Instance {filename} failed.")
            except Exception as e:
                print(f"Error processing {file}: {e}")
    except Exception as e:
        print(f"Error in ProcessPoolExecutor: {e}")
        return

    print("All files processed. Check the logs directory for outputs.")

In [46]:
run_experiment(log_dir="test_experiment", 
               pop_size=100,
               mutation_rate_mult=2,
               GA_strategy=GAStrategy(population_init="random", mutation_strategy="standard"),
               termination_options={"max_iter": 1},
               debug_options={"verbose": False}
               )

Running 16 instances in up to 12 processes...

[DEBUG] run_instance called with file: instances\gen025_01.txt
[16:13] Running instance gen025_01...
[16:13] Finished gen025_01 in 0m0.00s

[DEBUG] run_instance called with file: instances\gen025_02.txt
[16:13] Running instance gen025_02...
[16:13] Finished gen025_02 in 0m0.02s

[DEBUG] run_instance called with file: instances\gen025_03.txt
[16:13] Running instance gen025_03...
[16:13] Finished gen025_03 in 0m0.01s

[DEBUG] run_instance called with file: instances\gen025_04.txt
[16:13] Running instance gen025_04...
[16:13] Finished gen025_04 in 0m0.00s

[DEBUG] run_instance called with file: instances\gen050_01.txt
[16:13] Running instance gen050_01...
[16:13] Finished gen050_01 in 0m0.02s

[DEBUG] run_instance called with file: instances\gen050_02.txt
[16:13] Running instance gen050_02...
[16:13] Finished gen050_02 in 0m0.03s

[DEBUG] run_instance called with file: instances\gen100_01.txt
[16:13] Running instance gen100_01...
[16:13] Fini

In [None]:
instance = read_max_sc_qbf_instance("instances/gen1/instance2.txt")
ga = ScQbfGA(instance, population_size=100, mutation_rate_multiplier=2, ga_strategy=GAStrategy(mutation_strategy="adaptive"), debug_options={
    'verbose': True,
    'save_history': True,
    'save_mrate_history': True
}, termination_options={
    'max_iter': 1000,
}
)
ga.solve()

In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 6))
plt.plot(ga.history[1:])
plt.title('GA Algorithm Progress')
plt.xlabel('Generation')
plt.ylabel('Fitness Value')
plt.grid(True)
plt.show()

In [None]:
import matplotlib.pyplot as plt

fig, ax1 = plt.subplots(figsize=(12, 6))

# Plot mutation rate on the left y-axis
color = 'tab:red'
ax1.set_xlabel('Generation')
ax1.set_ylabel('Mutation Rate', color=color)
ax1.plot(ga.mutation_rate_history, color=color, label='Mutation Rate')
ax1.tick_params(axis='y', labelcolor=color)
ax1.grid(True, alpha=0.3)

# Create a second y-axis for diversity
ax2 = ax1.twinx()
color = 'tab:blue'
ax2.set_ylabel('Diversity', color=color)
ax2.plot(ga.diversity_history[1:], color=color, label='Diversity')
ax2.tick_params(axis='y', labelcolor=color)

plt.title('GA Mutation Rate and Diversity Progress')
fig.tight_layout()
plt.show()

In [None]:
import random

def test_latin_hypercube_initialization(population_size, n):
    """
    Standalone function to test Latin Hypercube Sampling initialization.
    
    Args:
        population_size: Size of the population (must be even for binary alleles)
        n: Number of genes (chromosome length)
    
    Returns:
        population: List of chromosomes
    """
    if population_size % 2 != 0:
        raise ValueError("Population size must be a multiple of the allele count (here, 2) for Latin Hypercube Sampling.")
    
    population = []
    
    # Initialize empty chromosomes
    for i in range(population_size):
        chromosome = [0] * n
        population.append(chromosome)
    
    # For each gene position (column), create a random permutation
    for gene_pos in range(n):
        # Create permutation of population indices [0, 1, 2, ..., population_size-1]
        permutation = list(range(population_size))
        random.shuffle(permutation)
        
        # Assign alleles based on permutation index modulo 2
        for pop_idx in range(population_size):
            allele = permutation[pop_idx] % 2
            population[pop_idx][gene_pos] = allele
    
    return population

# Test the function
def analyze_population(population):
    """Analyze the population to verify Latin Hypercube properties."""
    population_size = len(population)
    n = len(population[0]) if population else 0
    
    print(f"Population size: {population_size}")
    print(f"Chromosome length: {n}")
    print("\nFirst 5 chromosomes:")
    for i, chrom in enumerate(population[:5]):
        print(f"Individual {i}: {chrom}")
    
    print(f"\nAllele distribution per gene position:")
    for gene_pos in range(min(10, n)):  # Show first 10 genes
        zeros = sum(1 for chrom in population if chrom[gene_pos] == 0)
        ones = sum(1 for chrom in population if chrom[gene_pos] == 1)
        print(f"Gene {gene_pos}: 0s={zeros}, 1s={ones}")
    
    # Check if all gene positions have exactly population_size/2 of each allele
    balanced = True
    for gene_pos in range(n):
        zeros = sum(1 for chrom in population if chrom[gene_pos] == 0)
        ones = sum(1 for chrom in population if chrom[gene_pos] == 1)
        if zeros != population_size // 2 or ones != population_size // 2:
            balanced = False
            break
    
    print(f"\nPerfectly balanced alleles across all genes: {balanced}")
    
    return population

# Test with different parameters
print("=== Test 1: Small population ===")
pop1 = test_latin_hypercube_initialization(6, 8)
analyze_population(pop1)

print("\n=== Test 2: Larger population ===")
pop2 = test_latin_hypercube_initialization(20, 10)
analyze_population(pop2)

# Compare with random initialization
def random_initialization(population_size, n):
    """Random initialization for comparison."""
    population = []
    for _ in range(population_size):
        chromosome = [random.randint(0, 1) for _ in range(n)]
        population.append(chromosome)
    return population

print("\n=== Comparison: Random vs Latin Hypercube ===")
print("Random initialization:")
random_pop = random_initialization(20, 10)
analyze_population(random_pop)

print("\nLatin Hypercube initialization:")
lh_pop = test_latin_hypercube_initialization(20, 10)
analyze_population(lh_pop)