Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 104 additions & 8 deletions openevolve/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,23 +636,21 @@ def _calculate_feature_coords(self, program: Program) -> List[int]:
if dim == "complexity":
# Use code length as complexity measure
complexity = len(program.code)
bin_idx = min(int(complexity / 1000 * self.feature_bins), self.feature_bins - 1)
bin_idx = self._calculate_complexity_bin(complexity)
coords.append(bin_idx)
elif dim == "diversity":
# Use average edit distance to other programs
if len(self.programs) < 5:
# Use average fast code diversity to other programs
if len(self.programs) < 2:
bin_idx = 0
else:
sample_programs = random.sample(
list(self.programs.values()), min(5, len(self.programs))
)
avg_distance = sum(
calculate_edit_distance(program.code, other.code)
avg_diversity = sum(
self._fast_code_diversity(program.code, other.code)
for other in sample_programs
) / len(sample_programs)
bin_idx = min(
int(avg_distance / 1000 * self.feature_bins), self.feature_bins - 1
)
bin_idx = self._calculate_diversity_bin(avg_diversity)
coords.append(bin_idx)
elif dim == "score":
# Use average of numeric metrics
Expand All @@ -677,6 +675,104 @@ def _calculate_feature_coords(self, program: Program) -> List[int]:
)
return coords

def _calculate_complexity_bin(self, complexity: int) -> int:
    """
    Calculate the bin index for a given complexity value using adaptive binning.

    With fewer than two stored programs a fixed 0-10000 character range is
    used. Otherwise the range spans the shortest and longest code currently
    held in ``self.programs``. Values that fall outside the range are
    clamped into the first or last bin.

    Args:
        complexity: The complexity value (code length)

    Returns:
        Bin index in range [0, self.feature_bins - 1]
    """
    if len(self.programs) < 2:
        # Cold start: not enough data for an adaptive range, so assume a
        # reasonable fixed range of 0-10000 characters for code length.
        min_complexity, max_complexity = 0, 10000
    else:
        # Adaptive binning: use the actual range from existing programs.
        existing_complexities = [len(p.code) for p in self.programs.values()]
        min_complexity = min(existing_complexities)
        max_complexity = max(existing_complexities)

        # Every stored program has the same length: widen the range by one
        # so the division below never sees a zero denominator.
        if max_complexity == min_complexity:
            max_complexity = min_complexity + 1

    # The range is strictly positive on both paths above, so the value can
    # be normalized directly and then clamped to [0, 1].
    normalized = (complexity - min_complexity) / (max_complexity - min_complexity)
    normalized = max(0.0, min(1.0, normalized))

    # normalized == 1.0 maps to feature_bins, which is one past the last
    # valid index, hence the final clamp.
    bin_idx = int(normalized * self.feature_bins)
    return max(0, min(self.feature_bins - 1, bin_idx))

def _calculate_diversity_bin(self, diversity: float) -> int:
    """
    Calculate the bin index for a given diversity value using adaptive binning.

    With fewer than two stored programs a fixed 0-10000 range is used.
    Otherwise every stored program's average ``_fast_code_diversity``
    against a reference sample (at most 5 stored programs) is computed, and
    the minimum and maximum of those averages define the range. Values
    outside the range are clamped into the first or last bin.

    Args:
        diversity: The average fast code diversity to other programs

    Returns:
        Bin index in range [0, self.feature_bins - 1]
    """
    import random

    if len(self.programs) < 2:
        # Cold start: not enough data for an adaptive range, so assume a
        # reasonable fixed range of 0-10000 for fast diversity.
        min_diversity, max_diversity = 0, 10000
    else:
        # Reference sample used to score every stored program (limited to
        # 5 entries for performance). Random sampling only happens when
        # more than 5 programs are stored.
        sample_programs = list(self.programs.values())
        if len(sample_programs) > 5:
            sample_programs = random.sample(sample_programs, 5)

        # Adaptive binning: average diversity of each stored program
        # against the reference sample.
        existing_diversities = [
            sum(
                self._fast_code_diversity(program.code, other.code)
                for other in sample_programs
            )
            / len(sample_programs)
            for program in self.programs.values()
        ]
        min_diversity = min(existing_diversities)
        max_diversity = max(existing_diversities)

        # All programs are equally diverse: widen the range by one so the
        # division below never sees a zero denominator.
        if max_diversity == min_diversity:
            max_diversity = min_diversity + 1

    # The range is strictly positive on both paths above, so the value can
    # be normalized directly and then clamped to [0, 1].
    normalized = (diversity - min_diversity) / (max_diversity - min_diversity)
    normalized = max(0.0, min(1.0, normalized))

    # normalized == 1.0 maps to feature_bins, which is one past the last
    # valid index, hence the final clamp.
    bin_idx = int(normalized * self.feature_bins)
    return max(0, min(self.feature_bins - 1, bin_idx))

def _feature_coords_to_key(self, coords: List[int]) -> str:
"""
Convert feature coordinates to a string key
Expand Down
150 changes: 150 additions & 0 deletions tests/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,156 @@ def test_population_limit_enforcement(self):
# Restore original limit
self.db.config.population_size = original_limit

def test_calculate_complexity_bin_adaptive(self):
    """Test adaptive complexity binning with multiple programs"""
    # Programs listed in order of strictly increasing code length
    programs = [
        Program(id="short", code="x=1", metrics={"score": 0.5}),
        Program(id="medium", code="def func():\n return x*2\n pass", metrics={"score": 0.5}),
        Program(id="long", code="def complex_function():\n result = []\n for i in range(100):\n result.append(i*2)\n return result", metrics={"score": 0.5}),
    ]

    for program in programs:
        self.db.add(program)

    # Bin each program by its own code length instead of repeating the literals
    short_bin, medium_bin, long_bin = (
        self.db._calculate_complexity_bin(len(program.code)) for program in programs
    )

    # The shortest and longest programs must land in different bins
    self.assertNotEqual(short_bin, long_bin)

    # Every bin, including the middle one, must be a valid index
    for bin_idx in (short_bin, medium_bin, long_bin):
        self.assertGreaterEqual(bin_idx, 0)
        self.assertLess(bin_idx, self.db.feature_bins)

    # Longer code must never be placed in a lower bin than shorter code
    self.assertLessEqual(short_bin, medium_bin)
    self.assertLessEqual(medium_bin, long_bin)

def test_calculate_complexity_bin_cold_start(self):
    """Complexity binning must yield a valid index while fewer than 2 programs are stored"""
    # No programs stored yet
    empty_db_bin = self.db._calculate_complexity_bin(500)
    self.assertGreaterEqual(empty_db_bin, 0)
    self.assertLess(empty_db_bin, self.db.feature_bins)

    # A single stored program is still below the two-program threshold
    self.db.add(Program(id="single", code="x=1", metrics={"score": 0.5}))

    single_program_bin = self.db._calculate_complexity_bin(500)
    self.assertGreaterEqual(single_program_bin, 0)
    self.assertLess(single_program_bin, self.db.feature_bins)

def test_calculate_diversity_bin_adaptive(self):
    """Diversity binning with several structurally different programs stored"""
    simple_prog = Program(id="simple", code="x = 1", metrics={"score": 0.5})
    function_prog = Program(id="function", code="def add(a, b):\n return a + b", metrics={"score": 0.5})
    loop_prog = Program(id="loop", code="for i in range(10):\n print(i)\n x += i", metrics={"score": 0.5})
    complex_prog = Program(id="complex", code="class MyClass:\n def __init__(self):\n self.data = []\n def process(self, items):\n return [x*2 for x in items]", metrics={"score": 0.5})

    # Populate the database in the same order the programs were declared
    for candidate in (simple_prog, function_prog, loop_prog, complex_prog):
        self.db.add(candidate)

    # Use the fast diversity between the simplest and the most complex
    # program as the value to be binned
    pair_diversity = self.db._fast_code_diversity(simple_prog.code, complex_prog.code)
    resulting_bin = self.db._calculate_diversity_bin(pair_diversity)

    # The bin must be a valid index
    self.assertGreaterEqual(resulting_bin, 0)
    self.assertLess(resulting_bin, self.db.feature_bins)

def test_calculate_diversity_bin_cold_start(self):
    """Diversity binning must yield a valid index while fewer than 2 programs are stored"""
    # No programs stored yet
    empty_db_bin = self.db._calculate_diversity_bin(500.0)
    self.assertGreaterEqual(empty_db_bin, 0)
    self.assertLess(empty_db_bin, self.db.feature_bins)

    # A single stored program is still below the two-program threshold
    self.db.add(Program(id="single", code="x=1", metrics={"score": 0.5}))

    single_program_bin = self.db._calculate_diversity_bin(500.0)
    self.assertGreaterEqual(single_program_bin, 0)
    self.assertLess(single_program_bin, self.db.feature_bins)

def test_calculate_diversity_bin_identical_programs(self):
    """Diversity binning when every stored program has the same code"""
    # Three programs that differ only in their id
    for i in range(3):
        self.db.add(Program(id=f"identical_{i}", code="x = 1", metrics={"score": 0.5}))

    # Binning must still return a valid index for this degenerate population
    degenerate_bin = self.db._calculate_diversity_bin(0.0)

    self.assertGreaterEqual(degenerate_bin, 0)
    self.assertLess(degenerate_bin, self.db.feature_bins)

def test_fast_code_diversity_function(self):
    """Check _fast_code_diversity on identical, different and length-shifted code"""
    # Identical code must have exactly zero diversity
    first_snippet = "def test(): pass"
    second_snippet = "def test(): pass"
    self.assertEqual(self.db._fast_code_diversity(first_snippet, second_snippet), 0.0)

    # Structurally different code must have positive diversity
    tiny_snippet = "x = 1"
    larger_snippet = "def complex_function():\n return [i*2 for i in range(100)]"
    self.assertGreater(self.db._fast_code_diversity(tiny_snippet, larger_snippet), 0.0)

    # Code sharing a prefix but differing in length must have positive diversity
    short_code = "x = 1"
    long_code = "x = 1" + "a" * 100
    self.assertGreater(self.db._fast_code_diversity(short_code, long_code), 0.0)

def test_diversity_feature_integration(self):
    """Feature coordinates computed with the diversity dimension configured"""
    # Seed the database with three structurally different programs
    seed_specs = (
        ("prog1", "x = 1"),
        ("prog2", "def func():\n return 2"),
        ("prog3", "for i in range(5):\n print(i)"),
    )
    for prog_id, source in seed_specs:
        self.db.add(Program(id=prog_id, code=source, metrics={"score": 0.5}))

    # Configure the feature dimensions on the database's own config object
    self.db.config.feature_dimensions = ["score", "complexity", "diversity"]

    candidate = Program(id="test", code="def test(): return 42", metrics={"score": 0.7})
    coords = self.db._calculate_feature_coords(candidate)

    # One coordinate each for score, complexity and diversity
    self.assertEqual(len(coords), 3)

    # Each coordinate must be a valid bin index
    for coord in coords:
        self.assertGreaterEqual(coord, 0)
        self.assertLess(coord, self.db.feature_bins)


if __name__ == "__main__":
unittest.main()
Loading