# Project 1: Integration of Mergesort & Insertion Sort

In Mergesort, when the sizes of subarrays are small, the overhead of many recursive
calls makes the algorithm inefficient. Therefore, in real use, we often combine
Mergesort with Insertion Sort to come up with a hybrid sorting algorithm for better
efficiency. The idea is to set a small integer S as a threshold for the size of subarrays.
Once the size of a subarray in a recursive call of Mergesort is less than or equal to S,
the algorithm will switch to Insertion Sort, which is efficient for small-sized input.

In [1]:
import concurrent.futures
import math
import pandas as pd
from pydantic import BaseModel, Field, field_validator
import random
import time
from typing import List, TypedDict

(a) Algorithm implementation: Implement the above hybrid algorithm.

In [2]:
class Solution:
    def __init__(self, merge_sort_threshold: int):
        self.merge_sort_threshold: int = merge_sort_threshold

    def sortArray(self, nums: List[int]) -> List[int]:
        self.arr = nums
        self.mergeSort(0, len(nums) - 1)
        return self.arr

    def insertionSort(self, start_idx: int, end_idx: int):
        if end_idx - start_idx < 1:
            return

        for current_idx in range(start_idx, end_idx+1):
            current_value = self.arr[current_idx]
            previous_idx = current_idx - 1
            while previous_idx >= 0 and current_value < self.arr[previous_idx]:
                self.arr[previous_idx+1] = self.arr[previous_idx]
                previous_idx -= 1
            self.arr[previous_idx+1] = current_value

        return

    def mergeSort(self, start_idx: int, end_idx: int):
        if end_idx - start_idx < self.merge_sort_threshold:
            self.insertionSort(start_idx, end_idx)
            return

        mid_idx = (start_idx + end_idx) // 2
        self.mergeSort(start_idx, mid_idx)
        self.mergeSort(mid_idx + 1, end_idx)
        self.mergeArray(start_idx, mid_idx, end_idx)

    def mergeArray(self, start_idx: int, mid_idx: int, end_idx: int):
        left_part = self.arr[start_idx:mid_idx + 1]
        right_part = self.arr[mid_idx + 1:end_idx + 1]

        left_idx = 0
        right_idx = 0
        sorted_idx = start_idx

        while left_idx < len(left_part) and right_idx < len(right_part):
            if left_part[left_idx] <= right_part[right_idx]:
                self.arr[sorted_idx] = left_part[left_idx]
                left_idx += 1
            else:
                self.arr[sorted_idx] = right_part[right_idx]
                right_idx += 1
            sorted_idx += 1

        while left_idx < len(left_part):
            self.arr[sorted_idx] = left_part[left_idx]
            left_idx += 1
            sorted_idx += 1

        while right_idx < len(right_part):
            self.arr[sorted_idx] = right_part[right_idx]
            right_idx += 1
            sorted_idx += 1

        return

(b) Generate input data: Generate arrays of increasing sizes, in a range from
1,000 to 10 million. For each of the sizes, generate a random dataset of integers
in the range of [1, …, x], where x is the largest number you allow for your
datasets.

(c) Analyze time complexity: Run your program of the hybrid algorithm on the
datasets generated in Step (b). Record the number of key comparisons
performed in each case.

In [3]:
# Insufficient memory to store all datasets
# DATASETS_PER_SIZE: int = 3
# DATASET_MIN_SIZE = 1000
# DATASET_MAX_SIZE = int(1e7+1)

# class TestDataset(BaseModel):
#     size: int
#     datasets: List[List[int]] = Field(..., min_items=DATASETS_PER_SIZE, max_items=DATASETS_PER_SIZE)
        
# #     @field_validator("datasets")
# #     @classmethod
# #     def check_num_datasets(cls, datasets, values):
# #         if len(datasets) != DATASETS_PER_SIZE:
# #             raise ValueError(f"Required {DATASETS_PER_SIZE} datasets, got {len(datasets)}")
# #         return datasets
    
#     @field_validator("datasets")
#     @classmethod
#     def check_dataset_size(cls, datasets, values):
#         size = values.data.get("size")
#         if size:
#             for dataset in datasets:
#                 if len(dataset) != size:
#                     raise ValueError(f"Each dataset must have {size} elements, got {len(dataset)}")
#         return datasets

# DATASETS: List[TestDataset] = []

# for dataset_size in range(DATASET_MIN_SIZE, DATASET_MAX_SIZE):
#     DATASETS.append(
#         TestDataset(
#             size=dataset_size,
#             datasets=[
#                 [random.randint(1, DATASET_MAX_SIZE) for _ in range(dataset_size)]
#                 for _ in range(DATASETS_PER_SIZE)
#             ]
#         )
#     )

In [4]:
class PerformanceData(BaseModel):
    dataset_size: int
    insert_sort_threshold: int = Field(..., ge=1)
    time: float

In [5]:
# DATASET_MIN_SIZE = 1000
# DATASET_MAX_SIZE = 1001 # int(1e7+1)
# RUNS_PER_SIZE: int = 1

# performance_data: List[PerformanceData] = []
    
# for dataset_size in range(DATASET_MIN_SIZE, DATASET_MAX_SIZE):
#     for test_run in range(RUNS_PER_SIZE):
#         dataset = [random.randint(1, DATASET_MAX_SIZE) for _ in range(dataset_size)]
#         for insert_sort_threshold in range(1, 1001):
#             solution = Solution(insert_sort_threshold)
#             start_time = time.perf_counter()
#             solution.sortArray(dataset)
#             elapsed_time = time.perf_counter() - start_time
            
#             performance_data.append(
#                 PerformanceData(
#                     dataset_size=dataset_size,
#                     insert_sort_threshold=insert_sort_threshold,
#                     time=elapsed_time
#                 )
#             )

In [6]:
DATASET_MIN_SIZE = 1000
DATASET_MAX_SIZE = int(1e7+1)
RUNS_PER_SIZE: int = 1

In [7]:
def test_dataset(dataset_size: int):
    performance_data: List[PerformanceData] = []
    
    for test_run in range(RUNS_PER_SIZE):
        dataset = [random.randint(1, DATASET_MAX_SIZE) for _ in range(dataset_size)]
        for insert_sort_threshold in range(1, 1001):
            solution = Solution(insert_sort_threshold)
            start_time = time.perf_counter()
            solution.sortArray(dataset)
            elapsed_time = time.perf_counter() - start_time
            
            performance_data.append(
                PerformanceData(
                    dataset_size=dataset_size,
                    insert_sort_threshold=insert_sort_threshold,
                    time=elapsed_time
                )
            )
    
    return performance_data

In [None]:
performance_data: List[PerformanceData] = []

with concurrent.futures.ThreadPoolExecutor() as executor:
    performance_data.extend(
        executor.map(
            test_dataset,
            range(DATASET_MIN_SIZE, DATASET_MAX_SIZE)
        )
    )

In [None]:
df = pd.DataFrame([data.model_dump() for data in performance_data[0]])
df

i. With the value of S fixed, plot the number of key comparisons over
different sizes of the input list n. Compare your empirical results with
your theoretical analysis of the time complexity.


ii. With the input size n fixed, plot the number of key comparisons over
different values of S. Compare your empirical results with your
theoretical analysis of the time complexity.

iii. Using different sizes of input datasets, study how to determine an
optimal value of S for the best performance of this hybrid algorithm.