In [1]:
import pandas as pd
from preProccesPuzzle import PreProcess
from clue_classifier import ClueClassifier
from constraints import Constraint, IdentityConstrain, NextToConstrain, DistanceConstrain,RightConstrain, LeftConstrain, DirectRightConstrain, DirectLeftConstrain, PositionAbsoluteConstrain, PositionAbsoluteNegativeConstrain
from constraint_solver import ConstraintSolver
from concurrent.futures import ProcessPoolExecutor
import time


In [2]:
def constraint_factory(attrs, clues):
    """Build one constraint object per clue.

    Every clue is classified with ``ClueClassifier`` and wrapped in the
    constraint class registered for the reported clue type.

    Args:
        attrs: Puzzle attribute definitions; passed unchanged to every
            constraint constructor.
        clues: Iterable of clues to classify.

    Returns:
        list[Constraint]: The created constraints, in clue order.

    Raises:
        TypeError: If the classifier reports a clue as ``"UNKNOWN"``. The
            message names the offending clue so it can be located in a
            batch run.
    """
    # Clue type reported by the classifier -> constraint class to instantiate.
    constraint_types = {
        "IDENTITY": IdentityConstrain,
        "NEXT_TO": NextToConstrain,
        "LEFT": LeftConstrain,
        "RIGHT": RightConstrain,
        "DISTANCE": DistanceConstrain,
        "DIRECT_LEFT": DirectLeftConstrain,
        "DIRECT_RIGHT": DirectRightConstrain,
        "POSITION_ABSOLUTE": PositionAbsoluteConstrain,
        "POSITION_ABSOLUTE_NEGATIVE": PositionAbsoluteNegativeConstrain,
    }

    classifier = ClueClassifier()
    constrains: list[Constraint] = []
    for c in clues:
        clue, clue_type = classifier.classify(c)

        if clue_type == "UNKNOWN":
            raise TypeError(f"Cannot classify clue: {c!r}")

        # Types that are neither registered nor "UNKNOWN" are skipped,
        # exactly as the previous if-chain did.
        constraint_cls = constraint_types.get(clue_type)
        if constraint_cls is not None:
            constrains.append(constraint_cls(attrs, clue))

    return constrains

In [3]:
def convert_solver_solution_to_gridmode_format(solver_solution, attrs, gridmode_solution):
    """Render a solver solution as a gridmode table and compare it to the truth.

    The non-``House`` header columns of ``gridmode_solution`` are paired with
    the keys of ``attrs`` by position. Each solver position becomes one row,
    in ascending position order, with the position itself in the ``House``
    column. Cell values are compared case-insensitively.

    Args:
        solver_solution: Mapping ``position -> {attr_key: value}``.
        attrs: Mapping whose key order decides which attribute fills which
            non-house header column.
        gridmode_solution: Mapping with ``"header"`` (column names) and
            ``"rows"`` (ground-truth rows) entries; both default to empty.

    Returns:
        tuple: ``(converted_solution, is_correct, mismatches)`` where
        ``converted_solution`` is ``{"header": [...], "rows": [...]}``,
        ``is_correct`` is ``True`` only when no mismatch was found, and
        ``mismatches`` is a list of human-readable descriptions.
    """
    header = list(gridmode_solution.get("header", []))
    rows = gridmode_solution.get("rows", [])

    # Reverse lookup: header column -> attribute key (first pairing wins).
    attr_columns = [h for h in header if h.lower() != 'house']
    column_to_attr = {}
    for attr_key, column in zip(attrs.keys(), attr_columns):
        column_to_attr.setdefault(column, attr_key)

    # Convert solver solution to gridmode format
    converted_rows = []
    for pos in sorted(solver_solution.keys()):
        attrs_for_pos = solver_solution[pos]
        row = []
        for header_name in header:
            if header_name.lower() == "house":
                row.append(str(pos))
            elif header_name in column_to_attr:
                row.append(str(attrs_for_pos.get(column_to_attr[header_name], "")))
            else:
                # Column without a matching attribute stays empty.
                row.append("")
        converted_rows.append(row)

    converted_solution = {
        "header": header,
        "rows": converted_rows
    }

    # Compare with ground truth
    mismatches = []

    if len(converted_rows) != len(rows):
        mismatches.append(f"Row count mismatch: expected {len(rows)}, got {len(converted_rows)}")

    for i, (converted_row, truth_row) in enumerate(zip(converted_rows, rows)):
        # zip() below stops at the shorter row, so a differing width has to be
        # reported explicitly or a truncated truth row would pass unnoticed.
        if len(converted_row) != len(truth_row):
            mismatches.append(
                f"Row {i}: column count mismatch: expected {len(truth_row)}, got {len(converted_row)}"
            )
        for j, (converted_val, truth_val) in enumerate(zip(converted_row, truth_row)):
            # str() keeps non-string truth cells (e.g. integer house numbers) comparable.
            if converted_val.lower() != str(truth_val).lower():
                mismatches.append(
                    f"Row {i}, Column {header[j]}: expected '{truth_val}', got '{converted_val}'"
                )

    is_correct = not mismatches
    return converted_solution, is_correct, mismatches


def validate_solution(solver_solution, attrs, gridmode_solution):
    """Check a solver solution against the provided gridmode solution.

    Args:
        solver_solution: Solution returned by the solver; a falsy value means
            the solver found nothing.
        attrs: Attribute mapping used to align solver keys with header columns.
        gridmode_solution: Ground truth with ``"header"`` and ``"rows"``.

    Returns:
        bool: ``True`` when the converted solution matches the ground truth.
        ``False`` when it does not or when there is no solution; the latter
        case is also announced on stdout.
    """
    if not solver_solution:
        print("no solution found!")
        # Return a real bool so every path has the same return type.
        return False

    _, is_correct, _ = convert_solver_solution_to_gridmode_format(
        solver_solution, attrs, gridmode_solution
    )
    return is_correct


In [4]:
# Load the puzzle datasets from parquet files addressed relative to the
# working directory. Only `gridmode` is referenced by the cells visible here.
gridmode = pd.read_parquet("Gridmode-00000-of-00001.parquet")
mc = pd.read_parquet("mc-00000-of-00001.parquet")
test = pd.read_parquet("Test_100_Puzzles.parquet")

# Number of leading gridmode rows to drop. solve_all_tests adds it back when
# it prints the "true index" of a failing puzzle.
skip = 0

gridmode = gridmode[skip:]

In [5]:
# Show the raw puzzle text at position 99 of the (already sliced) gridmode frame.
print(gridmode.puzzle.iloc[99])

There are 6 houses, numbered 1 to 6 from left to right, as seen from across the street. Each house is occupied by a different person. Each house has a unique attribute for each of the following characteristics:
 - Each person has a unique name: `Alice`, `Carol`, `Eric`, `Peter`, `Arnold`, `Bob`
 - Everyone has a unique favorite cigar: `pall mall`, `yellow monster`, `dunhill`, `blends`, `blue master`, `prince`
 - Everyone has a favorite smoothie: `watermelon`, `dragonfruit`, `lime`, `desert`, `blueberry`, `cherry`

## Clues:
1. The person who drinks Lime smoothies is in the third house.
2. The person partial to Pall Mall is the Watermelon smoothie lover.
3. The Desert smoothie lover is Arnold.
4. The Watermelon smoothie lover is somewhere to the left of Eric.
5. There is one house between Peter and Carol.
6. The person who drinks Blueberry smoothies is somewhere to the left of the person who drinks Lime smoothies.
7. The Watermelon smoothie lover is somewhere to the left of the person w

In [6]:
def solve_puzzle(index, df, verbose=False, trace_dir="/content"):
    """Solve the puzzle at ``index`` of ``df`` and format it as a result line.

    The puzzle text is preprocessed into attributes and clues, both are
    lowercased, constraints are built and the solver is run. Once the solver
    has finished, its search trace is written to
    ``<trace_dir>/trace_<id>.csv``; a failure to write the trace is reported
    on stdout but does not abort the solve.

    Args:
        index: Positional row index into ``df``.
        df: DataFrame providing ``id`` and ``puzzle`` columns.
        verbose: When true, print the puzzle, the constraints, timing and
            solver counters, and the solution.
        trace_dir: Directory the trace CSV is written to. Defaults to
            ``/content``, the location that was previously hard-coded.

    Returns:
        ``"<id> | <grid solution as JSON> | <backtrack count>"`` as a string,
        or ``None`` when preprocessing yields no attributes or the solver
        returns no solution.
    """
    import json

    def _lower(value):
        # Lowercase strings; leave every other value untouched.
        return value.lower() if isinstance(value, str) else value

    ppp = PreProcess()
    puzzle_id = df.id.iloc[index]
    puzzle = df.puzzle.iloc[index]
    attrs, clues = ppp.proccess(puzzle)

    # Debug: Check if attrs is empty
    if not attrs:
        print(f"Puzzle {index}: attrs is empty")
        print(f"ID: {puzzle_id}")
        print(f"Puzzle: {puzzle}")
        return None

    # Convert all attribute keys and values, and all clues, to lowercase
    attrs_lower = {
        k.lower(): [_lower(val) for val in v] if isinstance(v, list) else _lower(v)
        for k, v in attrs.items()
    }
    clues_lower = [_lower(clue) for clue in clues]

    # Track constraint creation time
    constraint_start = time.time()
    constrains: list[Constraint] = constraint_factory(attrs_lower, clues_lower)
    constraint_time = time.time() - constraint_start

    # Track solver time (construction + solve only, no file I/O)
    solver_start = time.time()
    Cs = ConstraintSolver(attrs_lower, constrains)
    solution = Cs.solve()
    solver_time = time.time() - solver_start

    # Save the trace only AFTER solving: saving it before the solve, as this
    # function used to do, always produced an empty file.
    try:
        # Use the puzzle ID for the filename
        trace_filename = f"{trace_dir}/trace_{puzzle_id}.csv"
        Cs.save_trace_to_csv(trace_filename)
        if verbose:
            print(f"Trace saved to: {trace_filename}")
    except Exception as e:
        # Best effort: a trace that cannot be written must not lose the solution.
        print(f"Error saving trace for {puzzle_id}: {e}")

    if verbose:
        print(puzzle)
        print("================================")
        print("Build constraints")
        for c in constrains:
            print(c.get_info())
        print("================================")
        print("Timing Information")
        print(f"Constraint creation time: {constraint_time:.4f}s")
        print(f"Solver time: {solver_time:.4f}s")
        print(f"Backtrack count: {Cs.backtrack_count}")
        print(f"Propagation calls: {Cs.propagation_calls}")
        print("================================")
        print("Our Solution")
        Cs.print_solution(solution)

    if not solution:
        return None

    # Lookups from the lowercase names back to the original casing.
    original_keys = {k.lower(): k for k in attrs.keys()}
    original_values = {
        k.lower(): {val.lower(): val for val in v if isinstance(val, str)}
        for k, v in attrs.items()
        if isinstance(v, list)
    }

    # Convert to gridmode format with original attribute casing
    header = ["House"] + [original_keys.get(k, k) for k in attrs_lower.keys()]
    rows = []
    for pos in sorted(solution.keys()):
        row = [str(pos)]
        for attr_key in attrs_lower.keys():
            value = solution[pos].get(attr_key, "")
            # Map value back to original casing if possible
            if attr_key in original_values:
                value = original_values[attr_key].get(value, value)
            row.append(str(value))
        rows.append(row)

    grid_solution = {
        "header": header,
        "rows": rows
    }

    # Return in format: id | grid_solution_json | steps
    return f"{puzzle_id} | {json.dumps(grid_solution)} | {Cs.backtrack_count}"

def solve_all_puzzles(df):
    """Run ``solve_puzzle`` on every row of ``df`` and print a report.

    A puzzle counts as passed when ``solve_puzzle`` returns a result line and
    as failed when it returns ``None``. After the loop the pass count, the
    failed indices (if any), the total and average wall-clock time, and all
    result lines are printed. Nothing is returned.

    Args:
        df: Puzzle collection; its length gives the number of puzzles and it
            is handed to ``solve_puzzle`` together with each index.
    """
    total = len(df)
    results = []
    failed = []
    times = []

    start_total = time.time()
    for i in range(total):
        print(f"Solving {i}/{total}...", end="\r")
        began = time.time()
        outcome = solve_puzzle(i, df)
        if outcome is not None:
            results.append(outcome)
        else:
            failed.append(i)
            print(f"wrong at index: {i}")
        times.append(time.time() - began)
    total_time = time.time() - start_total

    # Every stored result line is one solved puzzle.
    passed = len(results)
    avg_time = sum(times) / len(times) if times else 0

    print(f"\nResults: {passed}/{total} passed")
    if failed:
        print(f"Failed indices: {failed}")
    print(f"Total time: {total_time:.2f}s | Average time per solution: {avg_time:.4f}s")

    # Print results in specified format
    banner = "=" * 80
    print("\n" + banner)
    print("Results (id | grid_solution | steps):")
    print(banner)
    for line in results:
        print(line)

In [7]:
# ==========================================
# TEST RUN FOR LOGGING
# ==========================================

# Select the first puzzle to test
test_index = 0

print(f"Starting test run for puzzle at index {test_index}...")

# Solve the puzzle verbosely; solve_puzzle also writes the search trace CSV.
result = solve_puzzle(test_index, gridmode, verbose=True)

print("\n" + "="*30)
if result:
    print("Solver finished.")

    # Announce the path solve_puzzle really writes to: /content/trace_<id>.csv
    puzzle_id = gridmode.id.iloc[test_index]
    expected_filename = f"/content/trace_{puzzle_id}.csv"

    print("CHECK YOUR FILES")
    print(f"Look for a file named: '{expected_filename}'")
else:
    print("Solver failed (no solution found).")

Starting test run for puzzle at index 0...
Hinweis: Puzzle wurde ohne Backtracking gel√∂st (Trace ist leer). Erstelle leere CSV in /content/trace_lgp-test-5x6-16.csv.
Trace successfully saved to /content/trace_lgp-test-5x6-16.csv (0 rows)
Trace saved to: /content/trace_lgp-test-5x6-16.csv
There are 5 houses, numbered 1 to 5 from left to right, as seen from across the street. Each house is occupied by a different person. Each house has a unique attribute for each of the following characteristics:
 - Each person has a unique name: `Peter`, `Alice`, `Bob`, `Eric`, `Arnold`
 - The people are of nationalities: `norwegian`, `german`, `dane`, `brit`, `swede`
 - People have unique favorite book genres: `fantasy`, `biography`, `romance`, `mystery`, `science fiction`
 - Everyone has something unique for lunch: `stir fry`, `grilled cheese`, `pizza`, `spaghetti`, `stew`
 - Each person has a favorite color: `red`, `green`, `blue`, `yellow`, `white`
 - The people keep unique animals: `bird`, `dog`, 

In [10]:
### (CELL SHOULD BE DELETED - JUST FOR TESTING THAT LOGGING WORKS FINE) We create a test class that skips AC3 and propagation
class DumbSolver(ConstraintSolver):
    """Test-only solver whose ``solve`` goes straight to ``_backtrack``."""

    def solve(self):
        """Announce test mode, then return ``self._backtrack({})``."""
        # We deliberately skip AC3 and propagation here!
        # We go DIRECTLY into backtracking.
        print("‚ö†Ô∏è TEST-MODUS: AC3 & Propagation deaktiviert. Backtracking erzwungen!")
        return self._backtrack({})

# Now run puzzle 0 through the DumbSolver to check that search steps get logged
print("Starte 'DumbSolver' Test...")

# We use the notebook variable 'gridmode'
if 'gridmode' in locals():
    ppp = PreProcess()

    # Access through 'gridmode' instead of 'df'
    puzzle_data = gridmode.puzzle.iloc[0] # take puzzle 0
    p_id = gridmode.id.iloc[0]

    attrs, clues = ppp.proccess(puzzle_data)
    # Lowercase keys AND values, the same way solve_puzzle does, so the
    # attribute values agree with the lowercased clues.
    attrs_lower = {
        k.lower(): [val.lower() if isinstance(val, str) else val for val in v] if isinstance(v, list) else v.lower() if isinstance(v, str) else v
        for k, v in attrs.items()
    }
    clues_lower = [c.lower() for c in clues]

    # Initialise the solver (the dumb one!)
    dumb_solver = DumbSolver(attrs_lower, constraint_factory(attrs_lower, clues_lower))

    # Solve
    dumb_solution = dumb_solver.solve()

    # Check whether any steps were logged
    log_count = len(dumb_solver.search_trace)
    print(f"\nüìä Anzahl der geloggten Schritte (ohne AC3): {log_count}")

    if log_count > 0:
        print("‚úÖ BEWEIS: Das Logging funktioniert! Der Solver musste raten.")
        # Save the trace so it can be inspected
        trace_name = f"trace_FORCE_BACKTRACK_{p_id}.csv"
        dumb_solver.save_trace_to_csv(trace_name)
        print(f"Datei '{trace_name}' gespeichert.")
    else:
        print("‚ùå FEHLER: Immer noch keine Logs. Da stimmt was im _backtrack nicht.")
else:
    print("‚ùå Fehler: Die Variable 'gridmode' wurde nicht gefunden. Bitte lade erst die Daten.")

Starte 'DumbSolver' Test...
‚ö†Ô∏è TEST-MODUS: AC3 & Propagation deaktiviert. Backtracking erzwungen!

üìä Anzahl der geloggten Schritte (ohne AC3): 5
‚úÖ BEWEIS: Das Logging funktioniert! Der Solver musste raten.
Trace successfully saved to trace_FORCE_BACKTRACK_lgp-test-5x6-16.csv (5 rows)
Datei 'trace_FORCE_BACKTRACK_lgp-test-5x6-16.csv' gespeichert.


In [8]:
def solve_test(index, verbose=False):
    """Solve one gridmode puzzle and validate it against the stored solution.

    The puzzle text is lowercased before preprocessing. A puzzle that defines
    both a ``pet`` and an ``animals`` attribute with shared values is reported
    as not solvable and skipped.

    Args:
        index: Positional row index into the global ``gridmode`` frame.
        verbose: When true, print the puzzle, the provided solution, the
            constraints, timing and solver counters, and the solver's solution.

    Returns:
        ``False`` for the pet/animals overlap case, otherwise whatever
        ``validate_solution`` returns for the solver's solution.
    """
    preprocessor = PreProcess()

    provided_solution = gridmode.solution.iloc[index]
    puzzle = gridmode.puzzle.iloc[index].lower()
    attrs, clues = preprocessor.proccess(puzzle)

    # Only a puzzle carrying BOTH attributes can have the ambiguity.
    if all(key in attrs.keys() for key in ("pet", "animals")):
        # Check if pet and animals have overlapping values
        overlap = set(attrs.get("pet", [])) & set(attrs.get("animals", []))
        if overlap:
            if verbose:
                print(puzzle)
                print("================================")
            print(f"Puzzle {index} not solvable because pet and animals have overlapping values: {overlap}")
            print(f"can not deside what value belongs to what attribute")
            return False

    # Track constraint creation time
    constraint_start = time.time()
    constrains: list[Constraint] = constraint_factory(attrs, clues)
    constraint_time = time.time() - constraint_start

    # Track solver time
    solver_start = time.time()
    Cs = ConstraintSolver(attrs, constrains)
    solution = Cs.solve()
    solver_time = time.time() - solver_start

    if verbose:
        separator = "================================"
        print(puzzle)
        print(separator)
        print("Provided solution")
        print(provided_solution)
        print(separator)
        print("Build constraints")
        for constraint in constrains:
            print(constraint.get_info())
        print(separator)
        print("Timing Information")
        print(f"Constraint creation time: {constraint_time:.4f}s")
        print(f"Solver time: {solver_time:.4f}s")
        print(f"Backtrack count: {Cs.backtrack_count}")
        print(f"Propagation calls: {Cs.propagation_calls}")
        print(separator)
        print("Our Solution")
        Cs.print_solution(solution)

    return validate_solution(solution, attrs, provided_solution)

def solve_all_tests():
    """Run ``solve_test`` on every row of the global ``gridmode`` frame.

    Prints a progress line per puzzle, a message for every failing index
    (together with its index before the ``skip`` slice), and finally the
    pass count, the failed indices and the total / average wall-clock time.
    Nothing is returned.
    """
    total = len(gridmode)
    failed = []
    times = []

    start_total = time.time()

    for i in range(total):
        print(f"Testing {i}/{total}...", end="\r")
        test_start = time.time()
        if not solve_test(i):
            failed.append(i)
            print(f"wrong at index: {i} true index: {i+skip}")
        times.append(time.time() - test_start)

    total_time = time.time() - start_total
    avg_time = sum(times) / len(times) if times else 0

    # Count only puzzles that did not fail; the counter used to be bumped
    # for every puzzle, so the summary always claimed total/total passed.
    passed = total - len(failed)

    print(f"\nResults: {passed}/{total} passed")
    if failed:
        print(f"Failed indices: {failed}")
    print(f"Total time: {total_time:.2f}s | Average time per solution: {avg_time:.4f}s")

In [9]:
#solve_test(327,verbose=True)

In [10]:
#solve_all_tests()

In [11]:
#print(solve_puzzle(922,gridmode,verbose=True))

# Solve every puzzle in `gridmode`; prints a summary and one result line per solved puzzle.
solve_all_puzzles(gridmode)



KeyboardInterrupt: 