In [1]:
"""There are many variants of local search (LS) and you are free to select which one you
want to implement. Please implement 2 types/variants of local search. They can be in different families
of LS such as Simulated Annealing vs Hill Climbing, or they can be in the same general family but
should differ by the neighborhood they are using, or by the perturbation strategy, etc.

Hint: While local search does not guarantee the convergence of solutions, most of the time it should
converge relatively quickly given a good search space. How do you formulate the search space such that
the exploration is efficient and doesn’t get stuck? This can have a night-and-day difference in simulated
annealing or hill climbing algorithms."""

'There are many variants of local search (LS) and you are free to select which one you\nwant to implement. Please implement 2 types/variants of local search. They can be in different families\nof LS such as Simulated Annealing vs Hill Climbing, or they can be in the same general family but\nshould differ by the neighborhood they are using, or by the perturbation strategy, etc.\n\nHint: While local search does not guarantee the convergence of solutions, most of the time it should\nconverge relatively quickly given a good search space. How do you formulate the search space such that\nthe exploration is efficient and doesn’t get stuck? This can have a night-and-day difference in simulated\nannealing or hill climbing algorithms.'

In [2]:
# The input is a set of n elements and a collection of subsets of these elements.
# The goal is to find the smallest number of subsets such that their union covers all elements in the set.

In [4]:
# idea: Simulated Annealing
# - start with the greedy approximation's solution, which covers all elements
# - at each move, build candidate neighbors by adding, swapping, or removing a subset, then pick one of the valid candidates uniformly at random
# - the cost function minimizes the number of subsets (the cost is the number of subsets in the cover)
# - SA's temperature starts at 200 and is multiplied by a cooling rate of 0.995 after each batch of moves
# - after 100 consecutive batches without improvement, the algorithm returns the best set cover found

In [None]:
import argparse
import random
import time
import os
import math
import glob


#parsing the input file
def parse_instance(filepath):
    """Read a set-cover instance from a text file.

    The first non-blank line holds two integers ``n m``. Every following
    non-blank line describes one subset: its first integer is skipped
    (presumably the subset's size -- TODO confirm against the instance
    format) and the remaining integers are the subset's elements.

    Blank lines (for example a trailing newline at the end of the file) are
    ignored, so they neither create empty subsets nor shift subset numbers.

    Args:
        filepath: Path of the instance file.

    Returns:
        A tuple ``(U, S, raw_indices)`` where ``U`` is the universe
        ``{1, ..., n}``, ``S`` is the list of subsets in file order, and
        ``raw_indices[k]`` is the 1-based number of subset ``S[k]``.
    """
    with open(filepath, 'r') as f:
        lines = [line for line in f if line.strip()]
    # The second header field is parsed (so a malformed header still fails
    # loudly) but not otherwise used: every subset line present is read.
    n, _ = map(int, lines[0].split())
    S = [set(map(int, line.split()[1:])) for line in lines[1:]]
    raw_indices = list(range(1, len(S) + 1))
    U = set(range(1, n + 1))
    return U, S, raw_indices

#approx algo to initialize guess
def greedy_approx(U, S, raw_indices):
    """Build a cover greedily, one subset at a time.

    At every step the not-yet-chosen subset that covers the most still
    uncovered elements is added; ties go to the subset that appears first
    in ``S``.

    Args:
        U: Set of elements that must be covered.
        S: List of subsets (each a ``set``).
        raw_indices: ``raw_indices[k]`` is the external number of ``S[k]``.

    Returns:
        A tuple ``(cover_indices, original_indices)``: the chosen positions
        in ``S`` in selection order, and the matching ``raw_indices`` values.

    Raises:
        ValueError: If the subsets in ``S`` cannot cover every element of
            ``U``.
    """
    uncovered = set(U)
    # Positions in S that are still available, kept in file order so that
    # max() resolves ties in favour of the earliest subset.
    remaining = list(range(len(S)))
    cover_indices = []
    while uncovered:
        best = max(remaining, key=lambda j: len(uncovered & S[j]), default=None)
        # No subset left, or the best one adds nothing: U is not coverable.
        if best is None or not (uncovered & S[best]):
            raise ValueError("the given subsets cannot cover every element of U")
        cover_indices.append(best)
        uncovered -= S[best]
        remaining.remove(best)
    original_indices = [raw_indices[j] for j in cover_indices]
    return cover_indices, original_indices

#Return the set of elements covered by the given solution indices
def get_coverage(solution_indices, S):
    """Return the union of the subsets selected by ``solution_indices``.

    Args:
        solution_indices: Positions in ``S`` of the chosen subsets.
        S: List of subsets.

    Returns:
        A new set holding every element of every chosen subset (empty when
        no subset is chosen).
    """
    return set().union(*(S[idx] for idx in solution_indices))

#checks if the solution currently is valid
def is_valid_solution(solution_indices, S, U):
    """Tell whether the chosen subsets cover exactly the universe ``U``.

    Args:
        solution_indices: Positions in ``S`` of the chosen subsets.
        S: List of subsets.
        U: Set of elements that must be covered.

    Returns:
        True when the union of the chosen subsets equals ``U``.
    """
    covered = get_coverage(solution_indices, S)
    return covered == U

#implementing pruning to get rid of redundancies in the set
def prune_solution(solution_indices, S, U):
    """Remove redundant subsets from a solution.

    Positions are tried from left to right and an entry is dropped whenever
    the remaining entries still pass ``is_valid_solution``. The input list is
    left untouched.

    The result is the same as rescanning from the first position after every
    removal, but needs far fewer validity checks: once the working list is a
    valid cover, the union of any sub-selection stays inside ``U``, so an
    entry that could not be dropped earlier can never become droppable later
    and does not need to be re-tested.

    Args:
        solution_indices: Positions in ``S`` of the chosen subsets.
        S: List of subsets.
        U: Set of elements that must be covered.

    Returns:
        A new list from which no single entry can be removed while keeping
        the solution valid. If no removal ever yields a valid solution, a
        copy of the input is returned.
    """
    res = solution_indices.copy()
    # Whether `res` is currently known to be a valid cover; see the docstring
    # for why this makes restarting unnecessary.
    known_cover = is_valid_solution(res, S, U)
    i = 0
    while i < len(res):
        candidate = res[:i] + res[i+1:]
        if is_valid_solution(candidate, S, U):
            res = candidate
            if not known_cover:
                # Earlier entries were tested against a list that was not yet
                # a valid cover; re-test them once now that it is.
                known_cover = True
                i = 0
            # Otherwise stay at position i, which now holds the next entry.
        else:
            i += 1
    return res

#Simulated annealing main code
def simulated_annealing(U, S, raw_indices, cutoff_time, seed=1, threshold=100, initial_solution=None):
    """Search for a small set cover with simulated annealing.

    The search starts from ``initial_solution`` (or, when it is None, from the
    greedy approximation) after reducing it with ``prune_solution``. Every
    move proposes neighbors of the current cover -- add a random unused
    subset, swap a random chosen subset for a random unused one, or remove a
    random chosen subset -- keeps those that are valid covers, picks one
    uniformly at random, and accepts it if it is smaller, or otherwise with
    probability ``exp(-delta / temp)``.

    The temperature starts at 200 and is multiplied by 0.995 after each batch
    of moves. A batch has ``max(10, len(S) // 5)`` moves, doubled once more
    than 50 consecutive batches brought no new best solution. The search
    stops when the temperature drops to 2, when ``cutoff_time`` seconds have
    elapsed, or after ``threshold`` consecutive batches without a new best.

    Command-line usage:
    "python3 simulatedannealing.py -inst ../data -alg LS1 -time 600 -seed 45"

    Side effects: seeds the global ``random`` module with ``seed`` and prints
    a progress line every 100 moves.

    Args:
        U: Set of elements that must be covered.
        S: List of subsets.
        raw_indices: ``raw_indices[k]`` is the external number of ``S[k]``.
        cutoff_time: Time limit in seconds.
        seed: Seed for the random number generator.
        threshold: Number of consecutive non-improving batches tolerated.
        initial_solution: Optional starting list of positions in ``S``.

    Returns:
        A tuple ``(best_solution, original_indices, trace, elapsed)``: the
        best cover as positions in ``S``, the same cover as ``raw_indices``
        values, a list of ``(seconds, size)`` entries recorded at the start
        and at every new best, and the total running time in seconds.
    """
    random.seed(seed)
    start_time = time.time()

    if initial_solution is None:
        solution_indices, _ = greedy_approx(U, S, raw_indices)
    else:
        solution_indices = initial_solution.copy()
    solution_indices = prune_solution(solution_indices, S, U)
    trace = [(0.0, len(solution_indices))]
    temp = 200
    final_temp = 2
    alpha = 0.995
    base_iterations = max(10, len(S) // 5)
    best_solution = solution_indices.copy()
    best_quality = len(best_solution)
    current_solution = solution_indices.copy()
    current_quality = len(current_solution)
    all_indices = set(range(len(S)))  # loop-invariant, built once
    s = 0  # consecutive batches without a new best solution
    c = 0  # total number of moves attempted
    while temp > final_temp and (time.time() - start_time < cutoff_time) and (s < threshold):
        improved = False
        # Try and do more work per batch when approaching the stagnation limit.
        iters = base_iterations * 2 if s > 50 else base_iterations
        for _ in range(iters):
            if time.time() - start_time >= cutoff_time:
                break
            c += 1
            if c % 100 == 0:
                print(f"Iteration {c}: Temp={temp:.2f}, Current Quality={current_quality}, "
                      f"Best Quality={best_quality}, stagnation={s}")
            neighbors = []
            # The current solution is always an output of prune_solution,
            # which only returns once no single entry can be dropped. Trying
            # every single-entry removal here would therefore never find a
            # valid cover, so that scan is not performed.
            not_in_solution = list(all_indices - set(current_solution))
            #add
            if not_in_solution:
                candidate = current_solution.copy()
                candidate.append(random.choice(not_in_solution))
                candidate = prune_solution(candidate, S, U)
                if is_valid_solution(candidate, S, U):
                    neighbors.append(candidate)
            #swap
            if current_solution and not_in_solution:
                candidate = current_solution.copy()
                iswap = random.randint(0, len(candidate) - 1)
                candidate[iswap] = random.choice(not_in_solution)
                candidate = prune_solution(candidate, S, U)
                if is_valid_solution(candidate, S, U):
                    neighbors.append(candidate)
            #remove
            # For the same reason as above this move cannot yield a valid
            # cover; it is kept so the sequence of random draws -- and with it
            # the result obtained for a given seed -- stays the same.
            if len(current_solution) > 1:
                candidate = current_solution.copy()
                removal_index = random.choice(range(len(current_solution)))
                candidate.pop(removal_index)
                if is_valid_solution(candidate, S, U):
                    candidate = prune_solution(candidate, S, U)
                    neighbors.append(candidate)
            if not neighbors:
                continue
            new_solution = random.choice(neighbors)
            new_quality = len(new_solution)
            delta = new_quality - current_quality
            # Always take improvements; take other moves with a probability
            # that shrinks as the temperature drops.
            if delta < 0 or random.random() < math.exp(-delta / temp):
                current_solution = new_solution.copy()
                current_quality = new_quality
                if current_quality < best_quality:
                    best_solution = current_solution.copy()
                    best_quality = current_quality
                    trace.append((time.time() - start_time, best_quality))
                    improved = True
        if not improved:
            s += 1
        else:
            s = 0
        temp *= alpha
    original_indices = [raw_indices[i] for i in best_solution]
    elapsed = time.time() - start_time
    return best_solution, original_indices, trace, elapsed

#writing .sol and .trace files
def write_output(instance_path, method, cutoff, original_indices, seed=None, trace=None, output_dir="../output"):
    """Write the solution file and, for "LS1", the trace file of one run.

    Files are named ``<instance>_<method>_<cutoff>_<seed>`` for the seeded
    method "LS1" and ``<instance>_<method>_<cutoff>`` for any other method
    (previously any other method failed because no name was defined --
    NOTE(review): confirm the unseeded naming matches the expected output
    convention).

    The ``.sol`` file holds the number of chosen subsets on the first line
    and their indices, space separated, on the second. The ``.trace`` file
    holds one ``<seconds> <size>`` line per trace entry and is only written
    for "LS1" when ``trace`` is non-empty.

    Args:
        instance_path: Path of the instance file; its base name without
            extension becomes the prefix of the output names.
        method: Algorithm label used in the file name.
        cutoff: Time limit used in the file name.
        original_indices: Indices of the chosen subsets.
        seed: Random seed used in the file name for "LS1".
        trace: Optional list of ``(seconds, size)`` pairs.
        output_dir: Directory receiving the files; created if missing.
    """
    instance_name = os.path.splitext(os.path.basename(instance_path))[0]
    os.makedirs(output_dir, exist_ok=True)
    uses_seed = method in ["LS1"]
    if uses_seed:
        base_name = f"{instance_name}_{method}_{cutoff}_{seed}"
    else:
        base_name = f"{instance_name}_{method}_{cutoff}"
    solution_path = os.path.join(output_dir, f"{base_name}.sol")
    with open(solution_path, 'w') as f:
        f.write(f"{len(original_indices)}\n")
        f.write(" ".join(map(str, original_indices)) + "\n")
    if trace and uses_seed:
        trace_path = os.path.join(output_dir, f"{base_name}.trace")
        with open(trace_path, 'w') as f:
            for timestamp, quality in trace:
                f.write(f"{timestamp:.2f} {quality}\n")

#main code to run the simulated annealing helper function
def process_file(file_path, algorithm, cutoff_time, seed):
    """Solve one instance file and write its output files.

    The instance is always parsed. For the "LS1" algorithm a pruned greedy
    cover is used as the starting point of simulated annealing and the best
    cover found is written together with its trace; any other algorithm
    name does nothing further.

    Args:
        file_path: Path of the instance file.
        algorithm: Algorithm label.
        cutoff_time: Time limit in seconds for the search.
        seed: Random seed for the search.
    """
    U, S, raw_indices = parse_instance(file_path)
    if algorithm != "LS1":
        return
    greedy_cover = greedy_approx(U, S, raw_indices)[0]
    start_cover = prune_solution(greedy_cover, S, U)
    sa_result = simulated_annealing(
        U, S, raw_indices, cutoff_time, seed=seed, initial_solution=start_cover)
    original_indices, trace = sa_result[1], sa_result[2]
    write_output(file_path, algorithm, cutoff_time, original_indices, seed, trace)

#main function to establish terminal arguments and combining .in files
def main():
    """Parse the command line and solve one instance or a directory of them.

    Options: ``-inst`` (an instance file, or a directory whose ``*.in`` files
    are processed in sorted order), ``-alg`` (only "LS1"), ``-time`` (cutoff
    in seconds) and ``-seed`` (default 42).

    A path that is neither an existing file nor an existing directory is
    reported as a usage error instead of silently doing nothing.
    """
    parser = argparse.ArgumentParser(description="Minimum Set Cover Solver")
    parser.add_argument('-inst', required=True)
    parser.add_argument('-alg', choices=['LS1'], required=True)
    parser.add_argument('-time', type=int, required=True)
    parser.add_argument('-seed', type=int, default=42)
    args = parser.parse_args()
    if os.path.isfile(args.inst):
        process_file(args.inst, args.alg, args.time, args.seed)
    elif os.path.isdir(args.inst):
        in_files = sorted(glob.glob(os.path.join(args.inst, "*.in")))
        for file_path in in_files:
            process_file(file_path, args.alg, args.time, args.seed)
    else:
        # parser.error prints the usage text plus this message and exits
        # with status 2.
        parser.error(f"instance path does not exist: {args.inst}")

# Run the command-line entry point only when this code is executed as the
# main program, not when it is imported.
# NOTE(review): inside a notebook kernel __name__ is also "__main__", so
# running this cell calls main() and argparse reads the kernel's own argv --
# confirm this cell is meant to be exported and run as a script.
if __name__ == "__main__":
    main()

In [None]:
import os
import pandas as pd

def read_sol(filename):
    """Return the solution size stored on the first line of a .sol file."""
    with open(filename, 'r') as handle:
        first_line = handle.readlines()[0]
    return int(first_line.strip())

def read_trace(filename):
    """Return the timestamp on the last non-blank line of a trace file.

    The timestamp is the first whitespace-separated field of that line,
    converted to ``float``. ``None`` is returned when the file has no
    non-blank line.
    """
    with open(filename, 'r') as handle:
        entries = [row.strip() for row in handle if row.strip()]
    if not entries:
        return None
    return float(entries[-1].split()[0])

def read_output(filename):
    """Return the integer stored on the first line of a reference .out file."""
    with open(filename, 'r') as handle:
        contents = handle.readlines()
    header = contents[0]
    return int(header.strip())

def rel_error(approx_val, opt_val):
    """Return the relative error ``|approx_val - opt_val| / opt_val``."""
    gap = abs(approx_val - opt_val)
    return gap / opt_val

def process_dataset(dataset_prefix, index_range, sol_dir, data_dir, method, cutoff, seed):
    """Tabulate the results of one family of instances.

    For every index in ``index_range`` the solution size is read from
    ``<sol_dir>/<prefix><i>_<method>_<cutoff>_<seed>.sol`` and the reference
    value from ``<data_dir>/<prefix><i>.out``. When the matching ``.trace``
    file exists, the time of its last entry is reported; otherwise the time
    is ``None``.

    Returns:
        A DataFrame with the columns ``Dataset``, ``Time (s)``, ``size`` and
        ``RelErr`` (relative error rounded to two decimals), one row per
        index.
    """
    rows = []
    for number in index_range:
        dataset_name = f"{dataset_prefix}{number}"
        run_stem = os.path.join(sol_dir, f"{dataset_name}_{method}_{cutoff}_{seed}")
        trace_filename = f"{run_stem}.trace"

        alg_size = read_sol(f"{run_stem}.sol")
        opt_size = read_output(os.path.join(data_dir, f"{dataset_name}.out"))
        error = rel_error(alg_size, opt_size)
        time_val = read_trace(trace_filename) if os.path.exists(trace_filename) else None
        rows.append((dataset_name, time_val, alg_size, round(error, 2)))

    return pd.DataFrame(rows, columns=['Dataset', 'Time (s)', 'size', 'RelErr'])


In [None]:
# Locations of the result files and the run settings that appear in their
# names; these values are passed to process_dataset.
sol_dir = "../output"
data_dir = "../data"
method = "LS1"
cutoff = 600
seed = 45

# Collect the results for the instances small1 .. small18.
small_df = process_dataset("small", range(1, 19), sol_dir, data_dir, method, cutoff, seed)
print("Small Datasets DataFrame:")
small_df

Small Datasets DataFrame:


Unnamed: 0,Dataset,Time (s),size,RelErr
0,small1,0.0,5,0.0
1,small2,0.0,3,0.0
2,small3,0.0,5,0.0
3,small4,0.0,4,0.0
4,small5,0.0,5,0.0
5,small6,0.0,3,0.0
6,small7,0.0,3,0.0
7,small8,0.0,2,0.0
8,small9,0.0,3,0.0
9,small10,0.0,2,0.0


In [None]:
# Collect the results for the instances large1 .. large12, using the
# sol_dir/data_dir/method/cutoff/seed values set in an earlier cell.
large_df = process_dataset("large", range(1, 13), sol_dir, data_dir, method, cutoff, seed)
print("Large Datasets DataFrame:")
large_df


Large Datasets DataFrame:


Unnamed: 0,Dataset,Time (s),size,RelErr
0,large1,0.0,50,0.0
1,large2,0.0,20,0.05
2,large3,0.16,16,0.07
3,large4,172.85,147,0.62
4,large5,0.0,7,0.17
5,large6,0.0,7,0.17
6,large7,258.97,168,0.77
7,large8,0.09,5,0.0
8,large9,2.95,15,0.07
9,large10,580.77,304,0.38


In [None]:
# Collect the results for the instances test1 .. test5, using the
# sol_dir/data_dir/method/cutoff/seed values set in an earlier cell.
test_df = process_dataset("test", range(1, 6), sol_dir, data_dir, method, cutoff, seed)
print("Test Datasets DataFrame:")
test_df


Test Datasets DataFrame:


Unnamed: 0,Dataset,Time (s),size,RelErr
0,test1,0.0,2,0.0
1,test2,0.0,2,0.0
2,test3,0.0,6,0.0
3,test4,0.0,4,0.0
4,test5,0.0,4,0.0
