In [1]:
# ===============================
# Cell 1: Setup and Configuration
# ===============================
%pip install openpyxl
import pandas as pd
import numpy as np
import random
import time
import os
import sys
import subprocess
import re
import ast
from google.colab import files
import matplotlib.pyplot as plt
import seaborn as sns
from openpyxl import Workbook
from openpyxl.drawing.image import Image as OpenpyxlImage
from io import BytesIO

# --- CONFIGURATION FOR THE EXPERIMENT ---
BUDGET_PERCENTAGES = [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95]
NUM_RUNS_PER_BUDGET = 30

# --- Define the different GA configurations to be tested ---
GA_CONFIGURATIONS = {
    "V1_Balanced": {
        "max_generations": 500, "crossover_prob": 0.8, "mutation_prob": 0.3
    },
    "V2_Aggressive": {
        "max_generations": 1000, "crossover_prob": 0.95, "mutation_prob": 0.3
    },
    "V3_Deep_Exploration": {
        "max_generations": 1000, "crossover_prob": 0.8, "mutation_prob": 0.3
    },
    "V4_Exhaustive": {
        "max_generations": 2000, "crossover_prob": 1.0, "mutation_prob": 0.3
    }
}



In [2]:
# ======================================================
# Cell 2: The GA Logic as a Reusable Function
# ======================================================

def run_ga_instance(max_exec_time, ga_params, data_maps):
    # Unpack GA parameters and data maps
    max_generations = ga_params['max_generations']
    crossover_prob = ga_params['crossover_prob']
    mutation_prob = ga_params['mutation_prob']

    req_to_tests = data_maps['req_to_tests']
    req_to_bv = data_maps['req_to_bv']
    test_to_time = data_maps['test_to_time']
    all_req_ids = data_maps['all_req_ids']
    bval_to_reqs_map = data_maps['bval_to_reqs_map']
    n_requirements = len(all_req_ids)

    # ----------------------------------------------------------------------
    # Helper: Evaluation Decomposition (Deb, 2000)
    # Source: Deb (2000), "An Efficient Constraint Handling Method for GAs"
    # ----------------------------------------------------------------------
    def evaluate_chrom(chrom):
        covered_tests = set(); total_bv = 0
        for idx, val in enumerate(chrom):
            if val:
                req = all_req_ids[idx]
                covered_tests |= req_to_tests.get(req, set())
                total_bv += req_to_bv.get(req, 0)
        total_time = sum(test_to_time.get(t, 0) for t in covered_tests)
        feasible = (total_time <= max_exec_time)
        violation = max(0, total_time - max_exec_time)
        return total_bv, total_time, feasible, violation

    # ----------------------------------------------------------------------
    # Feasible Initialization (Chu & Beasley, 1998)
    # ----------------------------------------------------------------------
    def feasible_chromosome():
        chrom = [0] * n_requirements; indices = list(range(n_requirements)); random.shuffle(indices)
        covered_tests = set(); total_time = 0
        for idx in indices:
            req = all_req_ids[idx]; tests = req_to_tests.get(req, set())
            new_tests = tests - covered_tests
            additional_time = sum(test_to_time.get(t, 0) for t in new_tests)
            if total_time + additional_time <= max_exec_time:
                chrom[idx] = 1; covered_tests |= new_tests; total_time += additional_time
        return chrom

    population_size = 10
    population = [feasible_chromosome() for _ in range(population_size)]

    # ----------------------------------------------------------------------
    # Greedy Seeding for Budgeted BV Coverage (Khuller, Moss, Naor, 1999)
    # Source: Khuller–Moss–Naor (1999), Budgeted Maximum Coverage (1−1/e)
    # ----------------------------------------------------------------------
    def greedy_seed_variant():
        chrom = [0] * n_requirements; covered = set(); used_time = 0
        remaining = set(range(n_requirements))
        while remaining:
            best_idx, best_score = None, float("-inf")
            for idx in remaining:
                req = all_req_ids[idx]; tests = req_to_tests.get(req, set()); new_tests = tests - covered
                add_time = sum(test_to_time.get(t, 0) for t in new_tests)
                if add_time < 0: continue
                score = req_to_bv.get(req, 0) if add_time == 0 else (req_to_bv.get(req, 0) / (add_time + 1e-12))
                if used_time + add_time <= max_exec_time and score > best_score:
                    best_idx, best_score = idx, score
            if best_idx is None: break
            req = all_req_ids[best_idx]; new_tests = req_to_tests.get(req, set()) - covered
            add_time = sum(test_to_time.get(t, 0) for t in new_tests)
            if used_time + add_time <= max_exec_time:
                chrom[best_idx] = 1; covered |= new_tests; used_time += add_time
            remaining.remove(best_idx)
        return chrom

    # Inject a few seeds to improve the starting population
    num_seeds = min(3, population_size)
    for i in range(num_seeds):
        population[i] = greedy_seed_variant()

    # ----------------------------------------------------------------------
    # Value-Aware Repair / Improve Operator
    # Source: Chu & Beasley (1998); randomized/efficiency-based repair literature
    # ----------------------------------------------------------------------
    def repair_value_aware(chrom):
        chrom = chrom[:]
        # DROP phase: remove the least efficient requirement until feasible
        while True:
            _, _, feasible, _ = evaluate_chrom(chrom)
            if feasible: break
            selected = [i for i, v in enumerate(chrom) if v]
            if not selected: break
            worst_idx, worst_ratio = None, float("inf")
            for idx in selected:
                req = all_req_ids[idx]; covered_other = set()
                for j, v in enumerate(chrom):
                    if v and j != idx: covered_other |= req_to_tests.get(all_req_ids[j], set())
                unique_tests = req_to_tests.get(req, set()) - covered_other
                time_save = sum(test_to_time.get(t, 0) for t in unique_tests)
                bv_loss = req_to_bv.get(req, 0)
                ratio = bv_loss / (time_save + 1e-12) if time_save > 0 else float("inf")
                ratio += random.uniform(-1e-9, 1e-9)
                if ratio < worst_ratio: worst_ratio, worst_idx = ratio, idx
            if worst_idx is None: break
            chrom[worst_idx] = 0
        # ADD phase: greedily add the most efficient requirement that fits
        while True:
            _, tt, _, _ = evaluate_chrom(chrom); slack = max_exec_time - tt
            if slack <= 0: break
            candidates = [i for i, v in enumerate(chrom) if not v]; best_idx, best_score = None, float("-inf")
            covered_now = set()
            for j, v in enumerate(chrom):
                if v: covered_now |= req_to_tests.get(all_req_ids[j], set())
            for idx in candidates:
                req = all_req_ids[idx]; new_tests = req_to_tests.get(req, set()) - covered_now
                add_time = sum(test_to_time.get(t, 0) for t in new_tests)
                if add_time < 0: continue
                score = req_to_bv.get(req, 0) if add_time == 0 else (req_to_bv.get(req, 0) / (add_time + 1e-12))
                score += random.uniform(-1e-9, 1e-9)
                if add_time <= slack and score > best_score: best_idx, best_score = idx, score
            if best_idx is None: break
            chrom[best_idx] = 1
        return chrom

    # ----------------------------------------------------------------------
    # Fitness Function (Yoo & Harman, 2012, Sec. 5.1)
    # Note: Fitness now returns total BV regardless of feasibility.
    # Feasibility is handled by the Deb selection operator.
    # ----------------------------------------------------------------------
    def fitness(chrom):
        total_bv, _, _, _ = evaluate_chrom(chrom)
        return total_bv

    # ----------------------------------------------------------------------
    # Deb-Style Tournament Selection (Deb, 2000)
    # ----------------------------------------------------------------------
    def deb_better(a_eval, b_eval):
        _, _, a_feas, a_viol = a_eval; _, _, b_feas, b_viol = b_eval
        if a_feas and not b_feas: return True
        if b_feas and not a_feas: return False
        if a_feas and b_feas: return a_eval[0] > b_eval[0] # Compare by BV
        return a_viol < b_viol # Compare by violation
    def deb_tournament_selection(population, k=3):
        contestants = random.sample(population, k=min(k, len(population))); best = contestants[0]
        best_eval = evaluate_chrom(best)
        for c in contestants[1:]:
            c_eval = evaluate_chrom(c)
            if deb_better(c_eval, best_eval): best, best_eval = c, c_eval
        return best

    # ----------------------------------------------------------------------
    # Crossover and Mutation Operators (Mitchell, 1998) - Unchanged
    # ----------------------------------------------------------------------
    def single_point_crossover(parent1, parent2):
        if n_requirements < 2: return parent1[:], parent2[:]
        point = random.randint(1, n_requirements - 1); child1 = parent1[:point] + parent2[point:]; child2 = parent2[:point] + parent1[point:]
        return child1, child2
    def mutate(chromosome, mutation_prob=mutation_prob):
        if random.random() < mutation_prob and n_requirements > 0:
            mutation_idx = random.randint(0, n_requirements - 1); chromosome[mutation_idx] = 1 - chromosome[mutation_idx]

    # ----------------------------------------------------------------------
    # Main GA Loop (modified to use new operators)
    # ----------------------------------------------------------------------
    for generation in range(max_generations):
        # --- Selection ---
        mating_pool = []
        for _ in range(population_size // 2):
            parent1 = deb_tournament_selection(population)
            parent2 = deb_tournament_selection(population)
            mating_pool.append((parent1, parent2))

        # --- Variation & Repair ---
        offspring = []
        for parent1, parent2 in mating_pool:
            if random.random() < crossover_prob: child1, child2 = single_point_crossover(parent1, parent2)
            else: child1, child2 = parent1[:], parent2[:]
            mutate(child1); mutate(child2)
            child1 = repair_value_aware(child1)
            child2 = repair_value_aware(child2)
            offspring.extend([child1, child2])

        # --- Replacement (using Deb's rules) ---
        combined_population = population + offspring
        def deb_key(ch):
            bv, _, feas, viol = evaluate_chrom(ch); return (0 if feas else 1, viol, -bv)
        combined_population.sort(key=deb_key)
        population = combined_population[:population_size]

    # Final best solution is the top of the population after the last sort
    best_solution = population[0]
    best_fitness, _, _, _ = evaluate_chrom(best_solution)
    selected_reqs = [all_req_ids[i] for i, val in enumerate(best_solution) if val]

    return selected_reqs, best_fitness

print("New GA logic has been refactored into a reusable function.")

New GA logic has been refactored into a reusable function.


In [None]:
 # ===========================================
# Cell 3: Experiment Wrapper & Execution
# ===========================================

def run_all_experiments():
    print("\n--- Starting Full Experiment Suite: Please upload your mapped_dataset file ---")
    uploaded = files.upload()
    if not uploaded: print("No file selected. Aborting."); return None, None
    input_filename = next(iter(uploaded))

    # Load data ONCE
    print(f"Loading data from '{input_filename}'...")
    original_df = pd.read_excel(input_filename)
    TOTAL_EXEC_TIME = original_df.drop_duplicates(subset=['tc_id'])['tc_executiontime'].sum()
    print(f"Total possible execution time from file: {TOTAL_EXEC_TIME:.2f}")

    # Create data maps ONCE
    data_maps = {
        'req_to_tests': original_df.groupby('us_id')['tc_id'].apply(set).to_dict(),
        'req_to_bv': original_df.groupby('us_id')['us_businessvalue'].first().to_dict(),
        'test_to_time': original_df.groupby('tc_id')['tc_executiontime'].first().to_dict(),
        'bval_to_reqs_map': original_df.groupby('us_businessvalue')['us_id'].apply(set).to_dict()
    }
    data_maps['all_req_ids'] = sorted(list(data_maps['req_to_tests'].keys()))

    all_version_results = {}
    for version_name, config in GA_CONFIGURATIONS.items():
        print("\n" + "="*80 + f"\n--- Running Experiment for Version: {version_name} ---")
        print(f"Parameters: {config}" + "\n" + "="*80)

        all_run_data = []
        for budget_pct in BUDGET_PERCENTAGES:
            budget_value = TOTAL_EXEC_TIME * (budget_pct / 100.0)
            print(f"\n--- Running for Budget: {budget_pct}% (Max Time: {budget_value:.2f}) ---")
            for run in range(NUM_RUNS_PER_BUDGET):
                print(f"  Run {run + 1}/{NUM_RUNS_PER_BUDGET}...", end='\r')

                # Call the GA function directly - much faster
                selected_reqs, final_bv = run_ga_instance(budget_value, config, data_maps)

                all_run_data.append({
                    'budget_pct': budget_pct, 'run': run + 1,
                    'total_business_value': final_bv,
                    'num_reqs_covered': len(selected_reqs),
                    'selected_reqs': selected_reqs # Add selected_reqs to the results
                })
            print(f"\n  -> Completed {NUM_RUNS_PER_BUDGET} runs.")

        all_version_results[version_name] = pd.DataFrame(all_run_data)

    return all_version_results, original_df

all_ga_results, original_df = run_all_experiments()


--- Starting Full Experiment Suite: Please upload your mapped_dataset file ---


Saving mapped_dataset_1.xlsx to mapped_dataset_1.xlsx
Loading data from 'mapped_dataset_1.xlsx'...
Total possible execution time from file: 1398.62

--- Running Experiment for Version: V1_Balanced ---
Parameters: {'max_generations': 500, 'crossover_prob': 0.8, 'mutation_prob': 0.3}

--- Running for Budget: 5% (Max Time: 69.93) ---

  -> Completed 30 runs.

--- Running for Budget: 10% (Max Time: 139.86) ---

  -> Completed 30 runs.

--- Running for Budget: 15% (Max Time: 209.79) ---

  -> Completed 30 runs.

--- Running for Budget: 20% (Max Time: 279.72) ---

  -> Completed 30 runs.

--- Running for Budget: 25% (Max Time: 349.65) ---

  -> Completed 30 runs.

--- Running for Budget: 30% (Max Time: 419.59) ---

  -> Completed 30 runs.

--- Running for Budget: 35% (Max Time: 489.52) ---

  -> Completed 30 runs.

--- Running for Budget: 40% (Max Time: 559.45) ---

  -> Completed 30 runs.

--- Running for Budget: 45% (Max Time: 629.38) ---

  -> Completed 30 runs.

--- Running for Budget: 5

In [3]:
# ========================================================
# Cell 4: Statistical Analysis and Final Report
# ========================================================

if 'all_ga_results' in locals() and all_ga_results:
    print("\n\n" + "="*60 + "\n--- Starting Final Analysis and Reporting ---")
    print("Please upload the 'BCPSO_Statistical_Analysis_Report.xlsx' file for comparison.")
    uploaded_bcpso = files.upload()

    if uploaded_bcpso:
        bcpso_filename = next(iter(uploaded_bcpso))
        bcpso_summary_df = pd.read_excel(bcpso_filename, sheet_name='BCPSO Summary Table')
        print("BCPSO summary data loaded successfully.")

        for version_name, results_df in all_ga_results.items():
            print("\n" + "-"*60 + f"\nProcessing: {version_name}\n" + "-"*60)

            # --- The summary table is now simpler, as the GA only has one BV type ---
            summary_stats_ga = results_df.groupby('budget_pct').agg(
                mean_reqs=('num_reqs_covered', 'mean'), median_reqs=('num_reqs_covered', 'median'),
                mean_bv=('total_business_value', 'mean'), median_bv=('total_business_value', 'median')
            ).reset_index()

            num_total_reqs = original_df['us_id'].nunique()
            summary_stats_ga['Req Coverage %'] = (summary_stats_ga['mean_reqs'] / num_total_reqs) * 100

            print(f"\n--- {version_name} Final Summary Table ---"); print(summary_stats_ga.to_string())

            # --- Individual GA Plots---
            fig_indiv, axes_indiv = plt.subplots(2, 2, figsize=(20, 15))
            fig_indiv.suptitle(f'RTS-GA Individual Analysis: {version_name}', fontsize=18)
            sns.boxplot(ax=axes_indiv[0, 0], x='budget_pct', y='total_business_value', data=results_df); axes_indiv[0, 0].set_title('Distribution of Total BV (30 Runs)'); axes_indiv[0, 0].tick_params(axis='x', rotation=45)
            axes_indiv[0, 1].plot(summary_stats_ga['budget_pct'], summary_stats_ga['Req Coverage %'], marker='o'); axes_indiv[0, 1].set_title('Mean Req Coverage % vs. Budget')
            sns.barplot(ax=axes_indiv[1, 0], x='budget_pct', y='median_bv', data=summary_stats_ga, color='purple', alpha=0.8); axes_indiv[1, 0].set_title('Median Business Value vs. Budget'); axes_indiv[1, 0].tick_params(axis='x', rotation=45)
            sns.barplot(ax=axes_indiv[1, 1], x='budget_pct', y='median_reqs', data=summary_stats_ga, color='orange', alpha=0.8); axes_indiv[1, 1].set_title('Median # of Requirements Covered vs. Budget')
            for ax in axes_indiv.flat: ax.set_xlabel('Time Budget (%)'); ax.grid(True, linestyle='--', alpha=0.7)
            plt.tight_layout(rect=[0, 0, 1, 0.96]); plt.show()

            # --- Comparative Plots ---
            fig_comp, axes_comp = plt.subplots(2, 2, figsize=(20, 15))
            fig_comp.suptitle(f'Comparative Analysis: {version_name} vs. BCPSO', fontsize=18)

            axes_comp[0, 0].plot(summary_stats_ga['budget_pct'], summary_stats_ga['Req Coverage %'], marker='o', label=f'{version_name}'); axes_comp[0, 0].plot(bcpso_summary_df['Time Budget %'], bcpso_summary_df['mean_atomic_req_cov'], marker='x', label='BCPSO (Atomic)'); axes_comp[0, 0].set_title('GA Coverage vs. BCPSO Atomic ("Touch") Coverage'); axes_comp[0, 0].legend(); axes_comp[0, 0].set_ylabel('Coverage (%)')
            axes_comp[0, 1].plot(summary_stats_ga['budget_pct'], summary_stats_ga['Req Coverage %'], marker='o', label=f'{version_name}'); axes_comp[0, 1].plot(bcpso_summary_df['Time Budget %'], bcpso_summary_df['mean_fully_covered_req_cov'], marker='x', label='BCPSO (Fully Covered)'); axes_comp[0, 1].set_title('GA Coverage vs. BCPSO Fully Covered Coverage'); axes_comp[0, 1].legend(); axes_comp[0, 1].set_ylabel('Coverage (%)')
            axes_comp[1, 0].plot(summary_stats_ga['budget_pct'], summary_stats_ga['mean_bv'], marker='o', label=f'{version_name}'); axes_comp[1, 0].plot(bcpso_summary_df['Time Budget %'], bcpso_summary_df['mean_atomic_bv'], marker='x', label='BCPSO (Atomic)'); axes_comp[1, 0].set_title('GA BV vs. BCPSO Atomic BV'); axes_comp[1, 0].legend(); axes_comp[1, 0].set_ylabel('Business Value')
            axes_comp[1, 1].plot(summary_stats_ga['budget_pct'], summary_stats_ga['mean_bv'], marker='o', label=f'{version_name}'); axes_comp[1, 1].plot(bcpso_summary_df['Time Budget %'], bcpso_summary_df['mean_fully_covered_bv'], marker='x', label='BCPSO (Fully Covered)'); axes_comp[1, 1].set_title('GA BV vs. BCPSO Fully Covered BV'); axes_comp[1, 1].legend(); axes_comp[1, 1].set_ylabel('Business Value')

            for ax in axes_comp.flat: ax.set_xlabel('Time Budget (%)'); ax.grid(True, linestyle='--', alpha=0.7); ax.tick_params(axis='x', rotation=45)
            plt.tight_layout(rect=[0, 0, 1, 0.95]); plt.show()

            output_excel_filename = f"Comprehensive_Report_{version_name}.xlsx"
            print(f"\nGenerating report: '{output_excel_filename}'")
            img_buffer_indiv = BytesIO(); fig_indiv.savefig(img_buffer_indiv, format='png'); img_buffer_indiv.seek(0)
            img_buffer_comp = BytesIO(); fig_comp.savefig(img_buffer_comp, format='png'); img_buffer_comp.seek(0)
            with pd.ExcelWriter(output_excel_filename, engine='openpyxl') as writer:
                results_df.to_excel(writer, sheet_name='RTS-GA Raw Runs', index=False); summary_stats_ga.to_excel(writer, sheet_name='RTS-GA Summary', index=False)
                bcpso_summary_df.to_excel(writer, sheet_name='BCPSO Summary', index=False); ws = writer.book.create_sheet(title="Analysis Plots")
                ws.add_image(OpenpyxlImage(img_buffer_indiv), 'A1'); ws.add_image(OpenpyxlImage(img_buffer_comp), 'A80')
            print(f"Report for {version_name} saved.")
    else: print("\nBCPSO file not uploaded.")
else: print("\nNo RTS-GA data collected.")


No RTS-GA data collected.
