# Constructive Heuristic Template
# Flow Shop Scheduling Problem (FSSP)
Prof. María Angélica Salazar Aguilar

Selected Topics of Optimization

---
This notebook is a template for documenting a combinatorial optimization problem and its constructive heuristic. Follow the instructions in each section and provide the required content.



## 1. Team: TeamID

Provide the names, student IDs, and contact information of all team members below.

| Student ID | Name                        | Email                        |
|------------|-----------------------------|------------------------------|
| 2013939    | Aldo Sebastian Lopez Rivas  | aldo.lopezrvs@uanl.edu.mx    |
| 2173850    | Josue Sebastian Cruz Cantu  | josue.cruzcn@uanl.edu.mx     |
| 2173891    | Iver Jair Salas Sanchez     | iver.salass@uanl.edu.mx      |
| 2014777    | Juan Carlos Sanchez Valencia| juan.sanchezvln@uanl.edu.mx  |

## 2. General Description of the Problem

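The Flow Shop Scheduling Problem (FSSP) schedules $n$ jobs on $m$ machines. Every job visits the machines in the same order, and in the permutation variant studied here the jobs are also processed in the same sequence on every machine. The goal is to minimize an objective such as the makespan ($C_{\max}$). Applications include manufacturing lines, semiconductor fabrication, and other staged processing systems in which all jobs follow the same routing. Makespan minimization is NP-hard for $m \ge 3$ (the two-machine case is solved exactly by Johnson's rule), which motivates heuristics that find high-quality solutions quickly.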

## 3. Mathematical Formulation

**Flow Shop Scheduling Problem (FSSP)**

**Sets and data**  
- $J=\{1,\dots,n\}$ jobs, $M=\{1,\dots,m\}$ machines.  
- $p_{ij}\ge 0$: processing time of job $j$ on machine $i$.  
- A permutation (sequence) of jobs is denoted by $\pi=(\pi_1,\ldots,\pi_n)$.

**State (completion-time) recursions**  
Let $C_{i,\pi_k}$ be the completion time of job $\pi_k$ on machine $i$. Then

\begin{align}
C_{1,\pi_1} &= p_{1,\pi_1}, \\
C_{1,\pi_k} &= C_{1,\pi_{k-1}} + p_{1,\pi_k} \qquad (k=2,\ldots,n), \\
C_{i,\pi_1} &= C_{i-1,\pi_1} + p_{i,\pi_1} \qquad (i=2,\ldots,m), \\
C_{i,\pi_k} &= \max\{\, C_{i-1,\pi_k},\; C_{i,\pi_{k-1}} \,\} + p_{i,\pi_k} \qquad (i=2,\ldots,m;\; k=2,\ldots,n).
\end{align}

The **makespan** is $C_{\max} = C_{m,\pi_n}$.
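Each step of the recursion uses only two values: the previous job on the same machine and the same job on the previous machine. That means it can be evaluated with one rolling array of length $m$ instead of a full $m \times n$ table. The sketch below assumes the formulation's indexing `p[i][j]` (machine `i`, job `j`, both 0-based). The function name `makespan` and the 3-machine, 3-job instance are made up for illustration.

```python
from typing import Sequence


def makespan(p: Sequence[Sequence[float]], pi: Sequence[int]) -> float:
    """Completion-time recursion with p[i][j] = time of job j on machine i (0-based)."""
    m = len(p)
    # C[i] holds the completion time of the most recently scheduled job on machine i
    C = [0.0] * m
    for job in pi:
        for i in range(m):
            # C[i - 1] is already this job's completion on the previous machine;
            # C[i] is still the previous job's completion on machine i.
            prev_machine = C[i - 1] if i > 0 else 0.0
            C[i] = max(C[i], prev_machine) + p[i][job]
    return C[-1]


p = [
    [3, 2, 4],  # machine 1
    [2, 5, 1],  # machine 2
    [4, 1, 3],  # machine 3
]
print(makespan(p, [0, 1, 2]))  # 14.0
```

Tracing the recursion by hand for the sequence `[0, 1, 2]` gives these completion times:

- machine 1 completes the jobs at 3, 5, 9;
- machine 2 completes them at 5, 10, 11;
- machine 3 completes them at 9, 11, 14.

The makespan is therefore 14.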

**Objective**  
$$
\min_{\pi \in \mathcal{S}_n} \; C_{\max}(\pi) \;=\; C_{m,\pi_n}.
$$

where $\mathcal{S}_n$ is the set of all permutations of the $n$ jobs; the same permutation is used on every machine (permutation flow shop).
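For very small $n$, the objective can be solved exactly by enumerating $\mathcal{S}_n$. This is useful for checking heuristic results on toy cases. Because $|\mathcal{S}_n| = n!$, it is only practical for a handful of jobs. The sketch assumes the same `p[i][j]` convention as the formulation; `makespan`, `brute_force` and the instance are hypothetical.

```python
from itertools import permutations
from typing import List, Sequence, Tuple


def makespan(p: Sequence[Sequence[float]], pi: Sequence[int]) -> float:
    """Completion-time recursion with p[i][j] = time of job j on machine i."""
    C = [0.0] * len(p)  # completion time of the last scheduled job on each machine
    for job in pi:
        for i in range(len(p)):
            C[i] = max(C[i], C[i - 1] if i > 0 else 0.0) + p[i][job]
    return C[-1]


def brute_force(p: Sequence[Sequence[float]]) -> Tuple[List[int], float]:
    """Return an optimal permutation by enumerating all n! job orders."""
    n = len(p[0])
    best = min(permutations(range(n)), key=lambda pi: makespan(p, pi))
    return list(best), makespan(p, best)


p = [[3, 2, 4], [2, 5, 1], [4, 1, 3]]  # rows = machines, columns = jobs
print(brute_force(p))  # ([0, 1, 2], 14.0)
```

On this instance, the six permutations give makespans between 14 and 17, and `[0, 1, 2]` is the unique optimum.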

> *Note.* A full MILP can be written with binary assignment variables $x_{jk}\in\{0,1\}$ indicating job $j$ at position $k$ and time variables, but here we emphasize a constructive permutation heuristic.

---

### Pendulum Heuristic (used to construct $\pi$)

**Idea.** Sort jobs by total processing time $T_j=\sum_{i=1}^m p_{ij}$ (ascending) and place them alternately at the left and right ends of the sequence so that small totals go to the extremes and large totals concentrate toward the center.

**Steps**
1. Compute $T_j=\sum_{i=1}^m p_{ij}$ for all $j\in J$.
2. Let $(j_1,\ldots,j_n)$ be the order of jobs such that $T_{j_1}\le \cdots \le T_{j_n}$.  
3. Initialize two pointers $l\leftarrow 1$, $r\leftarrow n$. For $k=1$ to $n$:
   - if $k$ is odd, set $\pi_l \leftarrow j_k$ and $l\leftarrow l+1$;
   - if $k$ is even, set $\pi_r \leftarrow j_k$ and $r\leftarrow r-1$.

This yields a permutation $\pi$ with lighter jobs at the ends and heavier jobs near the center (a “pendulum” layout).
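The three steps translate almost line by line into Python. This is a sketch that assumes the formulation's `p[i][j]` convention (machine `i`, job `j`). The notebook's `pendulum_heuristic` in Section 6 stores the transpose (one row per job) but applies the same rule. The 2-machine, 5-job instance is made up for illustration.

```python
from typing import List, Sequence


def pendulum(p: Sequence[Sequence[float]]) -> List[int]:
    """Pendulum construction with p[i][j] = time of job j on machine i (0-based)."""
    m = len(p)
    n = len(p[0]) if m else 0
    # Step 1: total processing time T_j of every job
    T = [sum(p[i][j] for i in range(m)) for j in range(n)]
    # Step 2: jobs in ascending order of T_j (sorted() is stable, so ties keep index order)
    order = sorted(range(n), key=lambda j: T[j])
    # Step 3: fill positions from both ends; 0-based k even <=> 1-based k odd
    pi = [-1] * n
    left, right = 0, n - 1
    for k, job in enumerate(order):
        if k % 2 == 0:
            pi[left] = job
            left += 1
        else:
            pi[right] = job
            right -= 1
    return pi


p = [
    [4, 1, 6, 3, 2],  # machine 1
    [3, 1, 5, 2, 6],  # machine 2
]
print(pendulum(p))  # [1, 0, 2, 4, 3]
```

The job totals are $T = (7, 2, 11, 5, 8)$. Read along the resulting sequence, they become 2, 7, 11, 8, 5. The lightest jobs sit at the ends, and the heaviest (job 2, $T = 11$) sits in the middle.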


## 4. Pseudocode of the Proposed Constructive Heuristic

```

Input:
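  processing_times  // m x n matrix, processing_times[i][j] ≥ 0 = time of job j on machine i
                    // (the Python implementation stores the transpose: one row per job)

Initialize solution
  if processing_times is empty:
      return []
  m ← number of machines (number of rows)
  n ← number of jobs (number of columns)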
  // compute job totals
  T[0..n-1] ← array
  for j from 0 to n-1:
      T[j] ← 0
      for i from 0 to m-1:
          T[j] ← T[j] + processing_times[i][j]
  // order jobs by ascending totals
  L ← list of job indices 0..n-1 sorted by T[j] ascending
  π[0..n-1] ← empty array
  left  ← 0
  right ← n - 1
  k ← 0

While [k < n]:
    Select next element according to heuristic
        job ← L[k]                       // kth smallest by total time
    Update solution
        if (k mod 2) == 0:
            π[left]  ← job
            left ← left + 1
        else:
            π[right] ← job
            right ← right - 1
        k ← k + 1

Output:
  π  // permutation with small-total jobs at the extremes and large totals near the center

```

## 5. Description and Definition of Main Functions

Below are the core functions used in our solver. The **Pendulum heuristic** is the primary constructor; other heuristics are included only for comparison.

---

### `read_csv_data(file_path) -> (processing_times, job_names)`  
*Module:* `io_utils`  
- **Purpose:** Load the instance from CSV with light auto-detection (headers, sizes) and validation of values.  
- **Inputs:** Path to CSV.  
- **Outputs:** `processing_times` (list of jobs × machines), `job_names` (list of strings).

---

### `validate_processing_times(processing_times) -> bool`  
*Module:* `io_utils`  
- **Purpose:** Sanity-check the matrix (dimensions consistent, non-negative numbers, correct types).  
- **Inputs:** Processing-time matrix.  
- **Outputs:** `True` or raises a descriptive `ValueError`.

---

### `pendulum_heuristic(processing_times) -> sequence` *(main constructive heuristic)*  
*Module:* `heuristics`  
- **Purpose:** Build a permutation by sorting jobs by total time (ascending) and placing them alternately at the left and right ends so small totals go to the extremes and large totals gravitate to the center.  
- **Inputs:** Processing-time matrix.  
- **Outputs:** `sequence` (list of job indices, 0-based).

---

### `calculate_makespan(processing_times, sequence) -> float`  
*Module:* `makespan`  
- **Purpose:** Evaluate a sequence using the standard flow-shop completion-time recursion; returns the completion time of the last job on the last machine.  
- **Inputs:** Processing-time matrix, permutation.  
- **Outputs:** `makespan` (float).

---

### `print_sequence_analysis(processing_times, sequence, job_names=None) -> None`  
*Module:* `makespan`  
- **Purpose:** Convenience reporter: prints makespan, per-machine idle times, and utilization/efficiency for a given sequence.  
- **Inputs:** Matrix, permutation, optional job names.  
- **Outputs:** Console report (no return).
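The notebook's metrics come from `evaluate_sequence_quality`, which is not shown, so its exact definitions are unknown. As an illustration only, the sketch below assumes two common definitions:

- the idle time of machine $i$ is $C_{\max} - \sum_j p_{ij}$, that is, the time within $[0, C_{\max}]$ during which the machine is not processing;
- utilization is total processing time divided by $m \cdot C_{\max}$.

The function name `idle_and_utilization` and the instance are hypothetical. The matrix uses the implementation's jobs × machines layout.

```python
from typing import Dict, List, Sequence


def idle_and_utilization(pt: Sequence[Sequence[float]],
                         seq: Sequence[int]) -> Dict[str, object]:
    """Hypothetical metrics helper; pt[j][i] = time of job j on machine i."""
    m = len(pt[0])
    # Makespan via the standard completion-time recursion (rolling array)
    C = [0.0] * m
    for job in seq:
        for i in range(m):
            C[i] = max(C[i], C[i - 1] if i > 0 else 0.0) + pt[job][i]
    cmax = C[-1]
    # Busy time of each machine = sum of its processing times over the scheduled jobs
    busy = [sum(pt[j][i] for j in seq) for i in range(m)]
    # Assumed definition: idle time = makespan minus busy time, per machine
    idle: List[float] = [cmax - b for b in busy]
    return {
        "makespan": cmax,
        "machine_idle_times": idle,
        "total_idle_time": sum(idle),
        "utilization": sum(busy) / (m * cmax) if cmax > 0 else 0.0,
    }


pt = [[3, 2, 4], [2, 5, 1], [4, 1, 3]]  # rows = jobs, columns = machines
metrics = idle_and_utilization(pt, [0, 1, 2])
print(metrics["makespan"], metrics["machine_idle_times"])  # 14.0 [5.0, 6.0, 6.0]
print(f"utilization: {metrics['utilization']:.2%}")          # utilization: 59.52%
```

Under this definition, idle time also includes the unavoidable wait of downstream machines before the first job arrives. Other definitions count only the gaps between consecutive jobs.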

---

### `create_sample_data(file_path, num_jobs=5, num_machines=3) -> None` *(utility for demos)*  
*Module:* `main`  
- **Purpose:** Generate a small synthetic instance for testing and demos.  
- **Inputs:** Output path, numbers of jobs/machines.  
- **Outputs:** CSV file written to disk.
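The body of `create_sample_data` is not included in this notebook. The following is a hypothetical stand-in under these assumptions:

- one CSV row per job and one column per machine (the orientation `read_csv_data` parses, since it reads each row as a job);
- no header row;
- integer processing times drawn uniformly from 1 to 99.

Whether `read_csv_data` accepts a header-less file depends on `detect_csv_format`, which is also not shown.

```python
import csv
import os
import random
import tempfile


def create_sample_data_sketch(file_path: str, num_jobs: int = 5,
                              num_machines: int = 3, seed: int = 0) -> None:
    """Hypothetical stand-in for create_sample_data: one CSV row per job,
    one column per machine, no header, integer times drawn from 1..99."""
    rng = random.Random(seed)  # seeded so the demo instance is reproducible
    with open(file_path, "w", newline="") as f:
        writer = csv.writer(f)
        for _ in range(num_jobs):
            writer.writerow([rng.randint(1, 99) for _ in range(num_machines)])


# Usage: write a 5 x 3 instance to a temporary directory and read it back
demo_path = os.path.join(tempfile.mkdtemp(), "sample_instance.csv")
create_sample_data_sketch(demo_path, num_jobs=5, num_machines=3)
with open(demo_path, newline="") as f:
    rows = [[int(x) for x in row] for row in csv.reader(f)]
print(f"{len(rows)} jobs x {len(rows[0])} machines")  # 5 jobs x 3 machines
```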

---

> **Note:** Additional constructive heuristics (NEH, SPT/LPT, Palmer, CDS/Johnson) are implemented to enable fair comparisons but are not the primary focus of this study.
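For reference, the best known of these comparison methods, NEH, works as follows:

1. Sort the jobs by decreasing total processing time.
2. Insert each job, in that order, at the position of the partial sequence that gives the smallest makespan.

The sketch below is a textbook version under the jobs × machines layout used by the implementation. It is not the project's own NEH code, which is not shown, so details such as tie-breaking may differ. All names and the toy instance are hypothetical.

```python
from typing import List, Sequence


def makespan(pt: Sequence[Sequence[float]], seq: Sequence[int]) -> float:
    """Completion-time recursion with pt[j][i] = time of job j on machine i."""
    C = [0.0] * len(pt[0])
    for job in seq:
        for i in range(len(C)):
            C[i] = max(C[i], C[i - 1] if i > 0 else 0.0) + pt[job][i]
    return C[-1]


def neh(pt: Sequence[Sequence[float]]) -> List[int]:
    """Textbook NEH: order jobs by decreasing total time, then insert each job
    at the position of the partial sequence that minimizes the makespan."""
    order = sorted(range(len(pt)), key=lambda j: -sum(pt[j]))  # stable for ties
    seq: List[int] = []
    for job in order:
        candidates = [seq[:pos] + [job] + seq[pos:] for pos in range(len(seq) + 1)]
        # min() keeps the first (leftmost) position when makespans tie
        seq = min(candidates, key=lambda s: makespan(pt, s))
    return seq


pt = [[3, 2, 4], [2, 5, 1], [4, 1, 3]]  # rows = jobs, columns = machines
best = neh(pt)
print(best, makespan(pt, best))  # [0, 1, 2] 14.0
```

Each insertion step re-evaluates the full makespan, which is simple but costs $O(n^3 m)$ overall.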


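# 6. Implementation of the Constructive Heuristic

import os
from typing import List, Tuple

import pandas as pd

# detect_csv_format and evaluate_sequence_quality are used below but are not
# defined in this cell; they are assumed to come from the project's other modules.
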
def read_csv_data(file_path: str) -> Tuple[List[List[float]], List[str]]:
    """
    Read CSV file and return processing times matrix and job names.
    
    Args:
        file_path (str): Path to the CSV file
        
    Returns:
        Tuple[List[List[float]], List[str]]: (processing_times, job_names)
        
    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If data format is invalid
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"CSV file not found: {file_path}")
        
    try:
        has_headers, num_jobs, num_machines = detect_csv_format(file_path)
        
        # Read the CSV file
        df = pd.read_csv(file_path, header=0 if has_headers else None)
        
        # Generate job names
        if has_headers:
            job_names = [f"Job_{i+1}" for i in range(num_jobs)]
        else:
            job_names = [f"Job_{i+1}" for i in range(len(df))]
            
        # Extract processing times
        processing_times = []
        for _, row in df.iterrows():
            # Convert row to list of floats, filtering out empty values
            times = []
            for value in row:
                if pd.notna(value) and str(value).strip():
                    try:
                        times.append(float(value))
                    except (ValueError, TypeError):
                        continue
            if times:  # Only add non-empty rows
                processing_times.append(times)
                
        # Validate data consistency
        if not processing_times:
            raise ValueError("No valid processing time data found in CSV")
            
        # Check that all jobs have the same number of machines
        machine_counts = [len(job) for job in processing_times]
        if len(set(machine_counts)) > 1:
            raise ValueError(f"Inconsistent number of machines per job: {set(machine_counts)}")
            
        # Ensure non-negative processing times
        for i, job_times in enumerate(processing_times):
            for j, time in enumerate(job_times):
                if time < 0:
                    raise ValueError(f"Negative processing time found at Job {i+1}, Machine {j+1}: {time}")
                    
        return processing_times, job_names[:len(processing_times)]
        
    except pd.errors.EmptyDataError as e:
        raise ValueError("CSV file is empty") from e
    except pd.errors.ParserError as e:
        raise ValueError(f"Error parsing CSV file: {e}") from e
    except ValueError:
        # Re-raise the validation errors raised above without wrapping them again
        raise
    except Exception as e:
        raise ValueError(f"Error reading CSV data: {e}") from e

def validate_processing_times(processing_times: List[List[float]]) -> bool:
    """
    Validate the processing times matrix for logical consistency.
    
    Args:
        processing_times (List[List[float]]): Matrix of processing times
        
    Returns:
        bool: True if valid, raises ValueError if invalid
        
    Raises:
        ValueError: If data is logically inconsistent
    """
    if not processing_times:
        raise ValueError("Processing times matrix is empty")
        
    if not all(isinstance(job, list) for job in processing_times):
        raise ValueError("Processing times must be a list of lists")
        
    # Check dimensions consistency
    num_machines = len(processing_times[0])
    for i, job_times in enumerate(processing_times):
        if len(job_times) != num_machines:
            raise ValueError(f"Job {i+1} has {len(job_times)} machines, expected {num_machines}")
            
    # Check for non-negative values
    for i, job_times in enumerate(processing_times):
        for j, time in enumerate(job_times):
            if not isinstance(time, (int, float)) or time < 0:
                raise ValueError(f"Invalid processing time at Job {i+1}, Machine {j+1}: {time}")
                
    return True

def pendulum_heuristic(processing_times: List[List[float]]) -> List[int]:
    """
    Pendulum heuristic: place jobs with smaller total times at extremes,
    larger total times in the center (like a pendulum weight distribution).
    
    Args:
        processing_times (List[List[float]]): Matrix of processing times
        
    Returns:
        List[int]: Sequence following pendulum pattern
    """
    if not processing_times:
        return []
    
    num_jobs = len(processing_times)
    
    # Calculate total processing time for each job (sum over machines) and sort ascending
    job_totals = [(i, sum(processing_times[i])) for i in range(num_jobs)]
    job_totals.sort(key=lambda x: x[1])  # Stable sort: ties keep the original job order
    sorted_jobs = [job[0] for job in job_totals]
    
    # Build pendulum sequence: small jobs at ends, big jobs in center
    sequence = [0] * num_jobs
    left = 0
    right = num_jobs - 1
    
    # Place jobs alternating from extremes toward center
    for i, job_idx in enumerate(sorted_jobs):
        if i % 2 == 0:  # Even indices: place at left extreme, move inward
            sequence[left] = job_idx
            left += 1
        else:  # Odd indices: place at right extreme, move inward
            sequence[right] = job_idx
            right -= 1
    
    return sequence

def calculate_makespan(processing_times: List[List[float]], sequence: List[int]) -> float:
    """
    Calculate the makespan for a given job sequence in a flow shop.
    
    The makespan is the total time required to complete all jobs, which is
    determined by the completion time of the last job on the last machine.
    
    Args:
        processing_times (List[List[float]]): Matrix where processing_times[i][j] 
                                            represents the processing time of job i on machine j
        sequence (List[int]): Sequence of job indices (0-based)
        
    Returns:
        float: The makespan (total completion time)
        
    Raises:
        ValueError: If sequence contains invalid job indices
    """
    if not processing_times or not sequence:
        return 0.0
        
    num_jobs = len(processing_times)
    num_machines = len(processing_times[0])
    
    # Validate sequence
    for job_idx in sequence:
        if job_idx < 0 or job_idx >= num_jobs:
            raise ValueError(f"Invalid job index {job_idx}. Must be between 0 and {num_jobs-1}")
    
    # Initialize completion time matrix (rows follow the sequence, not job indices):
    # completion_times[k][i] = completion time of the k-th scheduled job on machine i
    completion_times = [[0.0 for _ in range(num_machines)] for _ in range(len(sequence))]
    
    # Calculate completion times for each job in the sequence
    for seq_pos, job_idx in enumerate(sequence):
        for machine in range(num_machines):
            # Processing time for current job on current machine
            proc_time = processing_times[job_idx][machine]
            
            if seq_pos == 0 and machine == 0:
                # First job on first machine
                completion_times[seq_pos][machine] = proc_time
            elif seq_pos == 0:
                # First job on subsequent machines
                completion_times[seq_pos][machine] = (
                    completion_times[seq_pos][machine - 1] + proc_time
                )
            elif machine == 0:
                # Subsequent jobs on first machine
                completion_times[seq_pos][machine] = (
                    completion_times[seq_pos - 1][machine] + proc_time
                )
            else:
                # Subsequent jobs on subsequent machines
                completion_times[seq_pos][machine] = (
                    max(completion_times[seq_pos - 1][machine],
                        completion_times[seq_pos][machine - 1]) + proc_time
                )
    
    # Makespan is the completion time of the last job on the last machine
    return completion_times[-1][-1]

def print_sequence_analysis(processing_times: List[List[float]], 
                          sequence: List[int], 
                          job_names: List[str] = None) -> None:
    """
    Print detailed analysis of a job sequence.
    
    Args:
        processing_times (List[List[float]]): Matrix of processing times
        sequence (List[int]): Sequence of job indices
        job_names (List[str], optional): Names of jobs for display
    """
    if job_names is None:
        job_names = [f"Job_{i+1}" for i in range(len(processing_times))]
    
    metrics = evaluate_sequence_quality(processing_times, sequence)
    
    print("\n=== Sequence Analysis ===")
    print(f"Job sequence: {' -> '.join([job_names[i] for i in sequence])}")
    print(f"Makespan: {metrics['makespan']:.2f}")
    print(f"Total idle time: {metrics['total_idle_time']:.2f}")
    print(f"Machine utilization: {metrics['utilization']:.2%}")
    print(f"Overall efficiency: {metrics['efficiency']:.2%}")
    
    print("\nMachine idle times:")
    for i, idle_time in enumerate(metrics['machine_idle_times']):
        print(f"  Machine {i+1}: {idle_time:.2f}")
    print("=" * 25)
    



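## 7. Main Function to Run the Code
# print_data_summary and print_heuristic_comparison are used below but are not
# defined in this notebook; they are assumed to come from the project's other modules.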

def solve_flow_shop(csv_file_path: str, 
                   verbose: bool = True) -> dict:
    """
    Main function to solve Flow Shop Scheduling Problem.
    
    Args:
        csv_file_path (str): Path to CSV file containing processing times
        verbose (bool): Whether to print detailed output
        
    Returns:
        dict: Results containing best sequence, makespan, and other metrics,
        or None if an error occurs (the error message is printed)
    """
    try:
        # Step 1: Read and validate input data
        if verbose:
            print("=" * 60)
            print("FLOW SHOP SCHEDULING PROBLEM SOLVER")
            print("=" * 60)
            print(f"Reading data from: {csv_file_path}")
        
        processing_times, job_names = read_csv_data(csv_file_path)
        validate_processing_times(processing_times)
        
        if verbose:
            print_data_summary(processing_times, job_names)
        
        # Step 2: Apply Pendulum heuristic for main solution
        if verbose:
            print("\nApplying Pendulum heuristic for main solution...")
        
        pendulum_sequence = pendulum_heuristic(processing_times)
        pendulum_makespan = calculate_makespan(processing_times, pendulum_sequence)
        
        if verbose:
            print("Pendulum Heuristic Result:")
            print(f"  Sequence: {' -> '.join([job_names[i] for i in pendulum_sequence])}")
            print(f"  Makespan: {pendulum_makespan:.2f}")
        
        # Step 3: Compare different heuristics
        if verbose:
            print_heuristic_comparison(processing_times, job_names)
        
        # Step 4: Final results (Pendulum is the main sequence)
        best_sequence = pendulum_sequence
        best_makespan = pendulum_makespan
        
        if verbose:
            print("\n" + "=" * 60)
            print("FINAL RESULTS")
            print("=" * 60)
            print(f"Best sequence: {' -> '.join([job_names[i] for i in best_sequence])}")
            print(f"Best makespan: {best_makespan:.2f}")
            print("=" * 60)
            
            # Detailed analysis of best sequence
            print_sequence_analysis(processing_times, best_sequence, job_names)
        
        # Return results for programmatic use
        return {
            'best_sequence': best_sequence,
            'best_makespan': best_makespan,
            'job_names': job_names,
            'processing_times': processing_times,
            'pendulum_sequence': pendulum_sequence,
            'pendulum_makespan': pendulum_makespan
        }
        
    except FileNotFoundError as e:
        print(f"Error: {e}")
        return None
    except ValueError as e:
        print(f"Data validation error: {e}")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None


## 8. Computational Results and Discussion
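Benchmark instances: Taillard PFSP instances from the go-taillard repository (Chneau, n.d.), https://github.com/chneau/go-taillard/tree/master/pfsp/instances. The gap is computed as $\text{Gap} = 100 \times (C_{\max}^{\text{heur}} - C_{\max}^{\text{opt}}) / C_{\max}^{\text{opt}}$.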


| Instance | Objective value (heuristic) | Objective value (optimal) | Gap (%) |
|----------|-----------------------------|---------------------------|---------|
| 1        | 1941                        | 1582                      | 22.69   |
| 2        | 2020                        | 1659                      | 21.76   |
| 3        | 2026                        | 1496                      | 35.43   |
| 4        | 2590                        | 2297                      | 12.76   |
| 5        | 2647                        | 2100                      | 26.05   |
| 6        | 2803                        | 2326                      | 20.51   |
| 7        | 3785                        | 3025                      | 25.12   |
| 8        | 3474                        | 2892                      | 20.12   |
| 9        | 3953                        | 2864                      | 38.02   |
| 10       | 6744                        | 5770                      | 16.88   |
| 11       | 6434                        | 5349                      | 20.28   |
| 12       | 6700                        | 5677                      | 18.02   |
| 13       | 7862                        | 6286                      | 25.07   |
| 14       | 7759                        | 6241                      | 24.32   |
| 15       | 7827                        | 6329                      | 23.67   |
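The gap column and the summary figures quoted in the discussion can be re-derived directly from the table. This snippet only re-types the table values; it does not re-run the heuristic.

```python
heuristic = [1941, 2020, 2026, 2590, 2647, 2803, 3785, 3474, 3953,
             6744, 6434, 6700, 7862, 7759, 7827]
optimal = [1582, 1659, 1496, 2297, 2100, 2326, 3025, 2892, 2864,
           5770, 5349, 5677, 6286, 6241, 6329]

# Relative gap in percent for each instance
gaps = [100.0 * (h - o) / o for h, o in zip(heuristic, optimal)]
average = sum(gaps) / len(gaps)
best = min(range(len(gaps)), key=lambda k: gaps[k])
worst = max(range(len(gaps)), key=lambda k: gaps[k])
in_band = sum(20.0 <= g <= 26.1 for g in gaps)

print(f"average gap: {average:.2f}%")                        # average gap: 23.38%
print(f"best: instance {best + 1} ({gaps[best]:.2f}%)")     # best: instance 4 (12.76%)
print(f"worst: instance {worst + 1} ({gaps[worst]:.2f}%)")  # worst: instance 9 (38.02%)
print(f"{in_band} of {len(gaps)} gaps lie in [20%, 26.1%]")  # 10 of 15 gaps lie in [20%, 26.1%]
```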

**Discussion:**
- Regarding effectiveness, the heuristic attains an average gap of 23.38% over the 15 instances, with a best case of 12.76% (instance 4). This indicates reasonably consistent performance, although the solutions remain well above the optimal makespan.

- When comparing across instances, no clear trend of deterioration with problem size is observed, which suggests that solution quality holds up on larger instances. Instances 3 (35.43%) and 9 (38.02%), however, are outliers where the heuristic struggles. Computation times are not reported in the table; including them would help balance solution quality against efficiency.

- In terms of patterns, ten of the fifteen gaps lie between 20% and 26.1%, three are below 20% (instances 4, 10 and 12), and two exceed 35% (instances 3 and 9). This points to a stable baseline that could be improved by adding local refinement or multi-start strategies to reduce the worst gaps and bring the average closer to competitive levels.


## 9. General Conclusions

The heuristic developed in this work was intentionally kept simple and implemented relatively quickly during class. While it provides feasible solutions with an average gap of around 23%, its performance was slightly worse than initially expected. Part of this can be attributed to missing logic that was overlooked during implementation, which limited the quality of the results.

A key lesson learned is that even a basic heuristic can provide consistent approximations, but careful design choices and refinement are crucial to avoid systematic weaknesses. The analysis also highlighted the importance of testing across diverse instances, since a few outliers revealed specific scenarios where the method struggles.

For future work, several straightforward improvements could be explored, such as adding local search steps, introducing randomized restarts, or refining the construction logic. These additions would likely reduce the gap substantially and are natural next steps for turning this initial prototype into a more competitive approach.
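As a sketch of the first improvement listed above, the snippet applies a first-improvement local search over the insertion neighborhood to a starting permutation. In this neighborhood, one job is removed and reinserted at another position. The neighborhood choice and all names are ours, for illustration only; the notebook does not implement any local search. On the toy instance, `[1, 0, 2]` is the order the pendulum rule produces (job totals 9, 8, 8), with makespan 16.

```python
from typing import List, Sequence, Tuple


def makespan(pt: Sequence[Sequence[float]], seq: Sequence[int]) -> float:
    """Completion-time recursion with pt[j][i] = time of job j on machine i."""
    C = [0.0] * len(pt[0])
    for job in seq:
        for i in range(len(C)):
            C[i] = max(C[i], C[i - 1] if i > 0 else 0.0) + pt[job][i]
    return C[-1]


def insertion_local_search(pt: Sequence[Sequence[float]],
                           seq: Sequence[int]) -> Tuple[List[int], float]:
    """First-improvement search: move one job to another position while it helps."""
    best = list(seq)
    best_val = makespan(pt, best)
    improved = True
    while improved:
        improved = False
        for i in range(len(best)):
            rest = best[:i] + best[i + 1:]  # remove the job at position i
            for j in range(len(best)):
                if j == i:
                    continue  # reinserting at i reproduces the current sequence
                cand = rest[:j] + [best[i]] + rest[j:]
                val = makespan(pt, cand)
                if val < best_val:  # accept the first improving move and restart
                    best, best_val, improved = cand, val, True
                    break
            if improved:
                break
    return best, best_val


pt = [[3, 2, 4], [2, 5, 1], [4, 1, 3]]  # rows = jobs, columns = machines
print(insertion_local_search(pt, [1, 0, 2]))  # ([0, 1, 2], 14.0)
```

Randomized restarts would call the same routine from several shuffled starting sequences and keep the best result.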

## 10. Revised References

- Chneau, C. (n.d.). go-taillard: PFSP instances. GitHub. Retrieved October 2, 2025, from https://github.com/chneau/go-taillard/tree/master/pfsp/instances

