# Summary Statistics Pipeline
## Generate summary_statistics_RTW files for Hist-RTSGA, RTSGA, and BCPSO

In [2]:
# ======================================================
# Cell 1: Imports and Setup
# ======================================================

import pandas as pd
import numpy as np
import random
import pickle
import os
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# Experiment Configuration
# Seed both global generators: run_ga_instance draws from the stdlib `random`
# module, run_bcpso draws from `np.random`.
RNG_SEED = 42
random.seed(RNG_SEED)
np.random.seed(RNG_SEED)

NUM_CYCLES = 20        # number of cycles executed per method
RUNS_PER_CYCLE = 30    # independent GA runs inside each cycle
# Only the 5% ratio is configured here; add 0.10 to this list to report a 10% ratio too.
# NOTE(review): the RTSGA experiment cell sets its own `rtw_ratio = 0.05` and
# does not read this list, so it is only echoed in the printout below.
RTW_RATIOS = [0.05]  # 5% only

# GA Parameters (read by run_ga_instance)
GA_PARAMS = {
    'max_generations': 100,   # iterations of the main GA loop
    'crossover_prob': 0.8,    # chance a parent pair is recombined instead of copied
    'mutation_prob': 0.05,    # chance a child gets one randomly chosen bit flipped
    'population_size': 10     # individuals kept after each generation
}

# Checkpoint directory
# Hard-coded Google Drive path; Drive must be mounted at /content/drive first.
CHECKPOINT_DIR = '/content/drive/MyDrive/RTS/History'

print("Setup complete. Ready for experiments.")
print(f"Configuration: {NUM_CYCLES} cycles, {RUNS_PER_CYCLE} runs per cycle")
print(f"RTW ratios: {[f'{r*100}%' for r in RTW_RATIOS]}")
print(f"Checkpoint directory: {CHECKPOINT_DIR}")

Setup complete. Ready for experiments.
Configuration: 20 cycles, 30 runs per cycle
RTW ratios: ['5.0%']
Checkpoint directory: /content/drive/MyDrive/RTS/History


In [3]:
# ======================================================
# Cell 2: Data Loading and Preparation
# ======================================================

# Load dataset
# The sheet is expected to provide (after lower-casing) the columns
# us_id, tc_id, us_businessvalue and tc_executiontime used below.
filename = '/content/drive/MyDrive/RTS/Dataset/mapped_dataset_1-half.xlsx'  # Update this path as needed
df = pd.read_excel(filename)

# Normalize column names
# (strip surrounding whitespace and lower-case so lookups below are spelling-insensitive)
df.columns = [col.strip().lower() for col in df.columns]

# Create data maps
data_maps = {
    # requirement id -> set of test ids that appear on a row with it
    'req_to_tests': df.groupby('us_id')['tc_id'].apply(set).to_dict(),
    # requirement id -> business value taken from the first row of that requirement
    # (presumably constant within a requirement; verify against the dataset)
    'req_to_bv': df.groupby('us_id')['us_businessvalue'].first().to_dict(),
    # test id -> execution time taken from the first row of that test
    'test_to_time': df.groupby('tc_id')['tc_executiontime'].first().to_dict(),
    # sorted unique ids; chromosome / particle bit i refers to element i of these lists
    'all_req_ids': sorted(df['us_id'].unique()),
    'tests': sorted(df['tc_id'].unique())
}

# For BCPSO: test to requirements mapping
data_maps['test_to_reqs'] = df.groupby('tc_id')['us_id'].apply(set).to_dict()

data_maps['n_reqs'] = len(data_maps['all_req_ids'])
data_maps['n_tests'] = len(data_maps['tests'])

# Calculate total execution time
# Each test is counted once (first row per tc_id), matching how test_to_time is built.
TOTAL_EXEC_TIME = df.drop_duplicates(subset=['tc_id'])['tc_executiontime'].sum()

print(f"Dataset loaded: {data_maps['n_tests']} tests, {data_maps['n_reqs']} requirements")
print(f"Total execution time: {TOTAL_EXEC_TIME:.2f}")
print(f"Requirements range: {min(data_maps['all_req_ids'])} to {max(data_maps['all_req_ids'])}")
print(f"Business values range: {min(data_maps['req_to_bv'].values())} to {max(data_maps['req_to_bv'].values())}")

Dataset loaded: 40 tests, 124 requirements
Total execution time: 349.65
Requirements range: 1 to 124
Business values range: 2 to 89


In [4]:
# ======================================================
# Cell 3: Algorithm Implementations
# ======================================================

# ----- CORE GA FUNCTION (Used by both Hist-RTSGA and RTSGA) -----
def run_ga_instance(max_exec_time, ga_params, data_maps):
    """Core GA logic used by both history-aware and standard RTSGA.

    A chromosome is a 0/1 list with one bit per entry of
    ``data_maps['all_req_ids']``. Selecting a requirement pulls in all of its
    tests; the cost of a chromosome is the summed execution time of the union
    of those tests (shared tests are paid once) and its value is the summed
    business value of the selected requirements.

    Args:
        max_exec_time: Execution-time budget a feasible chromosome must not exceed.
        ga_params: Dict with 'max_generations', 'crossover_prob',
            'mutation_prob' and 'population_size'.
        data_maps: Dict with 'req_to_tests' (req -> set of tests),
            'req_to_bv' (req -> business value), 'test_to_time'
            (test -> execution time) and 'all_req_ids' (ordered req ids).

    Returns:
        ``(selected_reqs, best_fitness)``: the requirement ids of the best
        individual (in ``all_req_ids`` order) and its total business value.

    Raises:
        ValueError: If ``population_size`` is smaller than 1.

    All randomness comes from the global ``random`` module, so seed it
    beforehand for repeatable results.
    """
    max_generations = ga_params['max_generations']
    crossover_prob = ga_params['crossover_prob']
    mutation_prob = ga_params['mutation_prob']
    population_size = ga_params['population_size']
    if population_size < 1:
        # An empty population previously surfaced as an IndexError at the very end.
        raise ValueError("population_size must be at least 1")

    req_to_tests = data_maps['req_to_tests']
    req_to_bv = data_maps['req_to_bv']
    test_to_time = data_maps['test_to_time']
    all_req_ids = data_maps['all_req_ids']
    n_requirements = len(all_req_ids)

    def evaluate_chrom(chrom):
        """Return (total_bv, total_time, feasible, violation) for a chromosome."""
        covered_tests = set()
        total_bv = 0
        for idx, val in enumerate(chrom):
            if val:
                req = all_req_ids[idx]
                covered_tests |= req_to_tests.get(req, set())
                total_bv += req_to_bv.get(req, 0)
        total_time = sum(test_to_time.get(t, 0) for t in covered_tests)
        feasible = (total_time <= max_exec_time)
        violation = max(0, total_time - max_exec_time)
        return total_bv, total_time, feasible, violation

    def feasible_chromosome():
        """Build a random feasible chromosome by adding requirements in shuffled order while they fit."""
        chrom = [0] * n_requirements
        indices = list(range(n_requirements))
        random.shuffle(indices)
        covered_tests = set()
        total_time = 0
        for idx in indices:
            req = all_req_ids[idx]
            tests = req_to_tests.get(req, set())
            new_tests = tests - covered_tests
            additional_time = sum(test_to_time.get(t, 0) for t in new_tests)
            if total_time + additional_time <= max_exec_time:
                chrom[idx] = 1
                covered_tests |= new_tests
                total_time += additional_time
        return chrom

    def greedy_seed_variant():
        """Build a chromosome greedily by best business value per unit of *additional* time.

        Uses no random numbers, so repeated calls return equal chromosomes.
        """
        chrom = [0] * n_requirements
        covered = set()
        used_time = 0
        remaining = set(range(n_requirements))
        while remaining:
            best_idx, best_score = None, float("-inf")
            for idx in remaining:
                req = all_req_ids[idx]
                tests = req_to_tests.get(req, set())
                new_tests = tests - covered
                add_time = sum(test_to_time.get(t, 0) for t in new_tests)
                if add_time < 0:
                    continue
                # Requirements that cost nothing extra are scored by raw business value.
                score = req_to_bv.get(req, 0) if add_time == 0 else (req_to_bv.get(req, 0) / (add_time + 1e-12))
                if used_time + add_time <= max_exec_time and score > best_score:
                    best_idx, best_score = idx, score
            if best_idx is None:
                # Nothing that is left still fits into the budget.
                break
            req = all_req_ids[best_idx]
            new_tests = req_to_tests.get(req, set()) - covered
            add_time = sum(test_to_time.get(t, 0) for t in new_tests)
            if used_time + add_time <= max_exec_time:
                chrom[best_idx] = 1
                covered |= new_tests
                used_time += add_time
            remaining.remove(best_idx)
        return chrom

    def repair_value_aware(chrom):
        """Return a repaired copy: drop low-value requirements until feasible, then fill the slack."""
        chrom = chrom[:]
        # DROP phase: remove the requirement with the lowest business value per
        # unit of time actually saved, until the budget holds.
        while True:
            _, _, feasible, _ = evaluate_chrom(chrom)
            if feasible:
                break
            selected = [i for i, v in enumerate(chrom) if v]
            if not selected:
                break
            worst_idx, worst_ratio = None, float("inf")
            for idx in selected:
                req = all_req_ids[idx]
                covered_other = set()
                for j, v in enumerate(chrom):
                    if v and j != idx:
                        covered_other |= req_to_tests.get(all_req_ids[j], set())
                # Only tests no other selected requirement needs are saved by dropping idx.
                unique_tests = req_to_tests.get(req, set()) - covered_other
                time_save = sum(test_to_time.get(t, 0) for t in unique_tests)
                bv_loss = req_to_bv.get(req, 0)
                ratio = bv_loss / (time_save + 1e-12) if time_save > 0 else float("inf")
                # Tiny jitter breaks ties randomly (and consumes one RNG draw per candidate).
                ratio += random.uniform(-1e-9, 1e-9)
                if ratio < worst_ratio:
                    worst_ratio, worst_idx = ratio, idx
            if worst_idx is None:
                # NOTE(review): reached when every selected requirement saves no
                # time on its own (all ratios are inf); the chromosome then stays
                # infeasible and is handled by the feasibility-first ranking.
                break
            chrom[worst_idx] = 0
        # ADD phase: greedily add the best value-per-added-time requirement that fits the slack.
        while True:
            _, tt, _, _ = evaluate_chrom(chrom)
            slack = max_exec_time - tt
            if slack <= 0:
                break
            candidates = [i for i, v in enumerate(chrom) if not v]
            best_idx, best_score = None, float("-inf")
            covered_now = set()
            for j, v in enumerate(chrom):
                if v:
                    covered_now |= req_to_tests.get(all_req_ids[j], set())
            for idx in candidates:
                req = all_req_ids[idx]
                new_tests = req_to_tests.get(req, set()) - covered_now
                add_time = sum(test_to_time.get(t, 0) for t in new_tests)
                if add_time < 0:
                    continue
                score = req_to_bv.get(req, 0) if add_time == 0 else (req_to_bv.get(req, 0) / (add_time + 1e-12))
                score += random.uniform(-1e-9, 1e-9)
                if add_time <= slack and score > best_score:
                    best_idx, best_score = idx, score
            if best_idx is None:
                break
            chrom[best_idx] = 1
        return chrom

    # Initialize population: random feasible individuals, with up to three slots
    # overwritten by the greedy seed. The seed is deterministic, so it is built
    # once and copied instead of being recomputed per slot.
    population = [feasible_chromosome() for _ in range(population_size)]
    num_seeds = min(3, population_size)
    if num_seeds:
        greedy_seed = greedy_seed_variant()
        for i in range(num_seeds):
            population[i] = greedy_seed[:]

    def deb_better(a_eval, b_eval):
        """Feasibility-first comparison: feasible beats infeasible, then higher BV, then lower violation."""
        _, _, a_feas, a_viol = a_eval
        _, _, b_feas, b_viol = b_eval
        if a_feas and not b_feas:
            return True
        if b_feas and not a_feas:
            return False
        if a_feas and b_feas:
            return a_eval[0] > b_eval[0]
        return a_viol < b_viol

    def deb_key(ch):
        """Sort key matching deb_better: feasible first, then smaller violation, then larger BV."""
        bv, _, feas, viol = evaluate_chrom(ch)
        return (0 if feas else 1, viol, -bv)

    def deb_tournament_selection(population, k=3):
        """Pick the best of k randomly sampled individuals (returns a reference, not a copy)."""
        contestants = random.sample(population, k=min(k, len(population)))
        best = contestants[0]
        best_eval = evaluate_chrom(best)
        for c in contestants[1:]:
            c_eval = evaluate_chrom(c)
            if deb_better(c_eval, best_eval):
                best, best_eval = c, c_eval
        return best

    def single_point_crossover(parent1, parent2):
        """Swap the tails of two parents at a random cut point; returns two new lists."""
        if n_requirements < 2:
            return parent1[:], parent2[:]
        point = random.randint(1, n_requirements - 1)
        child1 = parent1[:point] + parent2[point:]
        child2 = parent2[:point] + parent1[point:]
        return child1, child2

    def mutate(chromosome):
        """With probability mutation_prob flip one randomly chosen bit in place."""
        if random.random() < mutation_prob and n_requirements > 0:
            mutation_idx = random.randint(0, n_requirements - 1)
            chromosome[mutation_idx] = 1 - chromosome[mutation_idx]

    # Main GA loop
    for generation in range(max_generations):
        mating_pool = []
        for _ in range(population_size // 2):
            parent1 = deb_tournament_selection(population)
            parent2 = deb_tournament_selection(population)
            mating_pool.append((parent1, parent2))

        offspring = []
        for parent1, parent2 in mating_pool:
            if random.random() < crossover_prob:
                child1, child2 = single_point_crossover(parent1, parent2)
            else:
                # Copy so that mutation never alters individuals still in the population.
                child1, child2 = parent1[:], parent2[:]
            mutate(child1)
            mutate(child2)
            child1 = repair_value_aware(child1)
            child2 = repair_value_aware(child2)
            offspring.extend([child1, child2])

        # Elitist (mu + lambda) survival: keep the best population_size of parents and children.
        combined_population = population + offspring
        combined_population.sort(key=deb_key)
        population = combined_population[:population_size]

    # After at least one generation the population is sorted and this equals
    # population[0]; taking the minimum also returns the true best individual
    # when max_generations is 0 and the population was never sorted.
    best_solution = min(population, key=deb_key)
    best_fitness, _, _, _ = evaluate_chrom(best_solution)
    selected_reqs = [all_req_ids[i] for i, val in enumerate(best_solution) if val]

    return selected_reqs, best_fitness


# ----- BCPSO ALGORITHM -----
def run_bcpso(budget, data_maps, pop_size=20, c1=1.5, c2=1.5,
              max_evals=30000, w_max=0.9, w_min=0.4, penalty=1000):
    """Binary Constrained PSO for test case selection.

    Each particle is a 0/1 vector over ``data_maps['tests']``. Fitness is the
    percentage of requirements covered by the selected tests; selections whose
    total cost exceeds ``budget`` are penalised in proportion to the overshoot.

    Args:
        budget: Maximum total execution time allowed for the selected tests.
        data_maps: Dict with 'tests', 'n_tests', 'test_to_time',
            'test_to_reqs' and 'n_reqs'.
        pop_size: Number of particles (default 20).
        c1, c2: Cognitive and social acceleration coefficients (default 1.5 each).
        max_evals: Fitness-evaluation budget; the loop stops once it is
            reached, checked after each full pass over the swarm (default 30000).
        w_max, w_min: Inertia weight, decreased linearly from w_max towards
            w_min as evaluations are used up (defaults 0.9 / 0.4).
        penalty: Base penalty subtracted from infeasible solutions (default 1000).

    Returns:
        ``(covered_reqs, coverage_pct)`` for the global best particle: the list
        of requirement ids covered by its tests and that coverage in percent.
        The global best is chosen by penalised fitness only, so if no feasible
        particle was ever found the returned selection exceeds the budget.

    Raises:
        ValueError: If ``pop_size`` is smaller than 1.

    All randomness comes from the global ``np.random`` generator; with the
    default arguments the sequence of draws matches the earlier hard-coded version.
    """
    if pop_size < 1:
        raise ValueError("pop_size must be at least 1")

    tests = data_maps['tests']
    n_tests = data_maps['n_tests']
    costs = data_maps['test_to_time']
    test_to_reqs = data_maps['test_to_reqs']
    n_reqs = data_maps['n_reqs']

    def evaluate_pso(sol):
        """Return (coverage %, total cost, selected tests, covered requirement set) for a bit vector."""
        sel_tests = [tests[i] for i, bit in enumerate(sol) if bit]
        cov_reqs = set().union(*(test_to_reqs.get(t, set()) for t in sel_tests)) if sel_tests else set()
        cov_pct = (len(cov_reqs) / n_reqs) * 100 if n_reqs > 0 else 0
        cost_val = sum(costs.get(t, 0) for t in sel_tests)
        return cov_pct, cost_val, sel_tests, cov_reqs

    def fitness_fn(cov, cost, bud):
        """Coverage when within budget, otherwise coverage minus a scaled penalty."""
        if cost <= bud:
            return cov
        # Relative overshoot for a positive budget (unchanged behaviour); fall
        # back to the absolute overshoot so a zero budget no longer divides by zero.
        overshoot = (cost - bud) / bud if bud > 0 else (cost - bud)
        return cov - penalty * (1 + overshoot)

    # Initialize
    positions = np.random.randint(0, 2, (pop_size, n_tests))
    velocities = np.random.uniform(0, 1, (pop_size, n_tests))
    pbest_pos = positions.copy()
    pbest_fit = np.full(pop_size, -np.inf, dtype=np.float64)

    for i in range(pop_size):
        cov, cost, _, _ = evaluate_pso(positions[i])
        pbest_fit[i] = fitness_fn(cov, cost, budget)

    gbest_idx = np.argmax(pbest_fit)
    gbest_pos = pbest_pos[gbest_idx].copy()
    gbest_fit = pbest_fit[gbest_idx]

    evals = pop_size

    while evals < max_evals:
        # Inertia weight decays linearly with the share of the evaluation budget used.
        w = w_max - (w_max - w_min) * (evals / max_evals)

        for i in range(pop_size):
            r1, r2 = np.random.rand(n_tests), np.random.rand(n_tests)
            velocities[i] = (w * velocities[i] +
                             c1 * r1 * (pbest_pos[i] - positions[i]) +
                             c2 * r2 * (gbest_pos - positions[i]))

            # Binary PSO: the sigmoid of the velocity is the probability that a bit is set.
            sigmoid_v = 1 / (1 + np.exp(-velocities[i]))
            positions[i] = (np.random.rand(n_tests) < sigmoid_v).astype(int)

            cov, cost, _, _ = evaluate_pso(positions[i])
            fit = fitness_fn(cov, cost, budget)
            evals += 1

            if fit > pbest_fit[i]:
                pbest_fit[i] = fit
                pbest_pos[i] = positions[i].copy()

                if fit > gbest_fit:
                    gbest_fit = fit
                    gbest_pos = positions[i].copy()

    # Extract final results
    cov_pct, cost_val, sel_tests, cov_reqs = evaluate_pso(gbest_pos)

    return list(cov_reqs), cov_pct

# Printed once the cell has run to the end, i.e. run_ga_instance and run_bcpso are defined.
print("All algorithm implementations loaded successfully.")

All algorithm implementations loaded successfully.


In [5]:
# ======================================================
# Cell 4: Run RTSGA ONLY for 5% RTW (with Checkpointing)
# ======================================================

# Ensure checkpoint directory exists
# exist_ok=True keeps this cell re-runnable when the directory is already present.
os.makedirs(CHECKPOINT_DIR, exist_ok=True)
print(f"üìÅ Checkpoint directory: {CHECKPOINT_DIR}\n")

def save_checkpoint(checkpoint_data, checkpoint_file):
    """Save checkpoint to disk without ever leaving a half-written file behind.

    The data is pickled to ``<checkpoint_file>.tmp`` first and then moved over
    the real path with ``os.replace``. If the kernel is interrupted while
    pickling, the previous checkpoint is therefore still intact and resumable.

    Args:
        checkpoint_data: Any picklable object.
        checkpoint_file: Destination path of the checkpoint.
    """
    tmp_file = f"{checkpoint_file}.tmp"
    with open(tmp_file, 'wb') as f:
        pickle.dump(checkpoint_data, f)
    # Swap the finished file into place in one step.
    os.replace(tmp_file, checkpoint_file)
    print(f"    üíæ Checkpoint saved")

def load_checkpoint(checkpoint_file):
    """Return the unpickled content of ``checkpoint_file``, or None if the path does not exist.

    NOTE: unpickling can execute arbitrary code, so only load checkpoint
    files that were written by this notebook.
    """
    if not os.path.exists(checkpoint_file):
        return None
    with open(checkpoint_file, 'rb') as handle:
        restored = pickle.load(handle)
    return restored

# Main experiment loop - ONLY 5% RTW, ONLY RTSGA
#
# Results are checkpointed after every cycle so an interrupted session can be
# resumed. The checkpoint also stores the state of the `random` generator (the
# only RNG run_ga_instance uses). Without it, a resumed session would start
# from whatever state the fresh kernel has (the setup cell reseeds to a fixed
# value), replaying the random stream of the first cycles instead of
# continuing where the interrupted run stopped.
results_storage = {}

rtw_ratio = 0.05  # ONLY 5%
rtw_label = "5%"
checkpoint_file = os.path.join(CHECKPOINT_DIR, f'checkpoint_RTSGA_only_RTW_{rtw_label}.pkl')

print(f"\n{'='*60}")
print(f"Running RTSGA ONLY for RTW = {rtw_label}")
print(f"Max execution time: {rtw_ratio * TOTAL_EXEC_TIME:.2f}")
print(f"{'='*60}\n")

# Check for existing checkpoint
checkpoint = load_checkpoint(checkpoint_file)

if checkpoint is not None:
    print(f"üìÇ Found existing checkpoint!")
    print(f"   Resuming from previous progress...")
    rtsga_results = checkpoint.get('RTSGA', [])
    print(f"   RTSGA: {len(rtsga_results)}/{NUM_CYCLES} cycles completed\n")

    # Continue the random stream exactly where the checkpointed run left off.
    saved_rng_state = checkpoint.get('rng_state')
    if saved_rng_state is not None:
        random.setstate(saved_rng_state)
    elif rtsga_results:
        # Checkpoints written before the RNG state was stored cannot be continued faithfully.
        print("   WARNING: checkpoint has no RNG state; resumed cycles will not "
              "reproduce an uninterrupted run.\n")
else:
    print(f"üÜï No checkpoint found. Starting fresh...\n")
    rtsga_results = []

# ========== STANDARD RTSGA ==========
if len(rtsga_results) < NUM_CYCLES:
    print(f"Running RTSGA for {NUM_CYCLES - len(rtsga_results)} remaining cycles...")

    start_cycle = len(rtsga_results) + 1
    max_exec_time = rtw_ratio * TOTAL_EXEC_TIME

    for cycle_idx in range(start_cycle, NUM_CYCLES + 1):
        print(f"  Cycle {cycle_idx}/{NUM_CYCLES}...", end=' ', flush=True)

        # No BV modification - use original values (NO HISTORY)
        all_runs = []
        for run_idx in range(RUNS_PER_CYCLE):
            selected_reqs, best_fitness = run_ga_instance(
                max_exec_time=max_exec_time,
                ga_params=GA_PARAMS,
                data_maps=data_maps
            )
            all_runs.append({
                'selected_reqs': selected_reqs,
                'fitness': best_fitness
            })

        # Find best run (HIGHEST BV from 30 runs)
        best_run = max(all_runs, key=lambda r: r['fitness'])

        rtsga_results.append({
            'cycle_idx': cycle_idx,
            'best_selected_reqs': best_run['selected_reqs'],
            'best_bv': best_run['fitness'],
            'all_runs': all_runs
        })

        # Save checkpoint after each cycle, together with the RNG state reached
        # at the end of this cycle so the next cycle can be resumed faithfully.
        checkpoint_data = {'RTSGA': rtsga_results, 'rng_state': random.getstate()}
        save_checkpoint(checkpoint_data, checkpoint_file)
        print(f"‚úì (BV: {best_run['fitness']:.2f})")

    print(f"  ‚úì Completed all {len(rtsga_results)} RTSGA cycles")
else:
    print(f"‚úì RTSGA already completed ({len(rtsga_results)}/{NUM_CYCLES} cycles)")

# Store final results
results_storage[rtw_label] = {
    'RTSGA': rtsga_results
}

print(f"\n‚úÖ RTSGA experiments for RTW = {rtw_label} completed!")
print(f"   Total cycles: RTSGA={len(rtsga_results)}")
print(f"üìÇ Checkpoint saved: {checkpoint_file}")

üìÅ Checkpoint directory: /content/drive/MyDrive/RTS/History


Running RTSGA ONLY for RTW = 5%
Max execution time: 17.48

üÜï No checkpoint found. Starting fresh...

Running RTSGA for 20 remaining cycles...
  Cycle 1/20...     üíæ Checkpoint saved
‚úì (BV: 739.00)
  Cycle 2/20...     üíæ Checkpoint saved
‚úì (BV: 739.00)
  Cycle 3/20...     üíæ Checkpoint saved
‚úì (BV: 739.00)
  Cycle 4/20...     üíæ Checkpoint saved
‚úì (BV: 739.00)
  Cycle 5/20...     üíæ Checkpoint saved
‚úì (BV: 735.00)
  Cycle 6/20...     üíæ Checkpoint saved
‚úì (BV: 735.00)
  Cycle 7/20...     üíæ Checkpoint saved
‚úì (BV: 739.00)
  Cycle 8/20...     üíæ Checkpoint saved
‚úì (BV: 739.00)
  Cycle 9/20...     üíæ Checkpoint saved
‚úì (BV: 721.00)
  Cycle 10/20...     üíæ Checkpoint saved
‚úì (BV: 726.00)
  Cycle 11/20...     üíæ Checkpoint saved
‚úì (BV: 739.00)
  Cycle 12/20...     üíæ Checkpoint saved
‚úì (BV: 735.00)
  Cycle 13/20...     üíæ Checkpoint saved
‚úì (BV: 739.00)
  Cycle 14/20...    

In [6]:
# ======================================================
# Cell 5: Calculate Summary Statistics for RTSGA at 5% RTW
# ======================================================

def calculate_cycle_statistics(results_dict, data_maps, method_names):
    """Aggregate per-cycle statistics over the runs of each requested method.

    For every cycle of every method found in ``results_dict`` three samples are
    collected across the cycle's runs: the business value (``run['fitness']``),
    the requirement coverage in percent of ``data_maps['n_reqs']`` and the test
    coverage in percent of ``data_maps['n_tests']``. Each sample is reduced
    to mean, std (NumPy default, i.e. population std), median, max and min.

    Returns:
        Dict mapping method name -> DataFrame with one row per cycle and the
        columns ``cycle`` plus ``<metric>_<stat>`` for metric in
        (bv, req_cov, test_cov) and stat in (mean, std, median, max, min).
        Methods missing from ``results_dict`` are skipped.
    """
    # (column suffix, reducer) in the order the columns appear in the output.
    reducers = (
        ('mean', np.mean),
        ('std', np.std),
        ('median', np.median),
        ('max', np.max),
        ('min', np.min),
    )

    frames = {}
    for name in method_names:
        if name in results_dict:
            cycle_rows = []
            for entry in results_dict[name]:
                cycle_no = entry['cycle_idx']
                runs = entry['all_runs']

                # One list of per-run values for each reported metric.
                samples = {'bv': [], 'req_cov': [], 'test_cov': []}
                for run in runs:
                    chosen = run['selected_reqs']
                    samples['bv'].append(run['fitness'])
                    samples['req_cov'].append((len(chosen) / data_maps['n_reqs']) * 100)

                    # Tests reached through any of the selected requirements.
                    reached = set()
                    for req in chosen:
                        reached = reached | data_maps['req_to_tests'].get(req, set())
                    samples['test_cov'].append((len(reached) / data_maps['n_tests']) * 100)

                record = {'cycle': cycle_no}
                for metric, values in samples.items():
                    for suffix, reduce_fn in reducers:
                        record[f'{metric}_{suffix}'] = reduce_fn(values)
                cycle_rows.append(record)

            frames[name] = pd.DataFrame(cycle_rows)

    return frames


def create_summary_table(stats_dict, metric='bv', method_names=('RTSGA',)):
    """Create table with (max, min, std) for each cycle.

    Args:
        stats_dict: Dict mapping method name -> per-cycle statistics DataFrame
            containing ``cycle`` and ``<metric>_max`` / ``_min`` / ``_std`` columns.
        metric: Column prefix to summarise, e.g. 'bv', 'req_cov' or 'test_cov'.
        method_names: Methods to include, one output row each. A single name
            may be passed as a plain string. The default is an immutable tuple
            so it cannot be altered between calls.

    Returns:
        DataFrame with a ``Method`` column and one ``C<cycle>`` column per cycle
        holding the text ``"(max, min, std)"`` with two decimals. Methods that
        are missing from ``stats_dict`` are skipped.
    """
    if isinstance(method_names, str):
        # A bare string would otherwise be iterated character by character.
        method_names = (method_names,)

    rows = []

    for method_name in method_names:
        if method_name not in stats_dict:
            continue

        df_stats = stats_dict[method_name]
        row = {'Method': method_name}

        # Plain records keep the cycle index integral (iterrows upcasts the
        # whole row to float) and yield nothing for an empty frame.
        for record in df_stats.to_dict('records'):
            cycle_idx = int(record['cycle'])
            max_val = record[f'{metric}_max']
            min_val = record[f'{metric}_min']
            std_val = record[f'{metric}_std']

            row[f'C{cycle_idx}'] = f"({max_val:.2f}, {min_val:.2f}, {std_val:.2f})"

        rows.append(row)

    return pd.DataFrame(rows)


# Calculate and save statistics for RTSGA at 5% RTW only
all_statistics = {}

rtw_label = "5%"

print(f"\nCalculating statistics for RTSGA at RTW = {rtw_label}")
print("="*60)

# Only process RTSGA
stats_dict = calculate_cycle_statistics(
    results_storage[rtw_label],
    data_maps,
    method_names=['RTSGA']
)
all_statistics[rtw_label] = stats_dict

# Create summary file (written to the current working directory)
output_file = f'summary_statistics_RTSGA_only_RTW_{rtw_label}.xlsx'

with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
    # BV summary
    bv_table = create_summary_table(stats_dict, 'bv', method_names=['RTSGA'])
    bv_table.to_excel(writer, sheet_name='BV_Summary', index=False)

    # Req coverage summary
    req_cov_table = create_summary_table(stats_dict, 'req_cov', method_names=['RTSGA'])
    req_cov_table.to_excel(writer, sheet_name='Req_Cov_Summary', index=False)

    # Test coverage summary
    test_cov_table = create_summary_table(stats_dict, 'test_cov', method_names=['RTSGA'])
    test_cov_table.to_excel(writer, sheet_name='Test_Cov_Summary', index=False)

    # Full statistics for RTSGA
    if 'RTSGA' in stats_dict:
        stats_dict['RTSGA'].to_excel(writer, sheet_name='RTSGA_Full', index=False)

print(f"‚úÖ Saved: {output_file}")
print(f"   Sheets: BV_Summary, Req_Cov_Summary, Test_Cov_Summary, RTSGA_Full")

# Display BV summary preview.
# Only columns that actually exist are selected, so a partially completed
# experiment (fewer than NUM_CYCLES cycles) no longer raises a KeyError here.
cycle_cols = [f'C{i}' for i in range(1, NUM_CYCLES + 1) if f'C{i}' in bv_table.columns]
display_cols = [col for col in ['Method'] if col in bv_table.columns] + cycle_cols
if len(cycle_cols) == NUM_CYCLES:
    cycle_scope = f"All {NUM_CYCLES} cycles"
else:
    cycle_scope = f"{len(cycle_cols)} of {NUM_CYCLES} cycles"
print(f"\nüìä BV Summary (max, min, std) - {cycle_scope}:")
print(bv_table[display_cols].to_string(index=False))

print("\n" + "="*60)
print(f"‚úÖ Summary statistics for RTSGA at {rtw_label} RTW generated!")
print("="*60)
# Report the path that was really written instead of a hard-coded copy of it.
print(f"\nGenerated file: {output_file}")


Calculating statistics for RTSGA at RTW = 5%
‚úÖ Saved: summary_statistics_RTSGA_only_RTW_5%.xlsx
   Sheets: BV_Summary, Req_Cov_Summary, Test_Cov_Summary, RTSGA_Full

üìä BV Summary (max, min, std) - All 20 cycles:
Method                     C1                     C2                     C3                     C4                     C5                     C6                     C7                     C8                     C9                    C10                    C11                    C12                    C13                    C14                    C15                    C16                    C17                     C18                    C19                    C20
 RTSGA (739.00, 711.00, 6.56) (739.00, 711.00, 8.47) (739.00, 711.00, 6.50) (739.00, 711.00, 8.06) (735.00, 711.00, 5.97) (735.00, 711.00, 8.15) (739.00, 711.00, 5.69) (739.00, 711.00, 6.47) (721.00, 711.00, 1.80) (726.00, 711.00, 2.69) (739.00, 711.00, 5.69) (735.00, 711.00, 4.64) (739.00, 711.00, 7.42) (739.00,