In [10]:
import matplotlib as mpl
import matplotlib.font_manager as fm
import matplotlib.pyplot as plt

# 在Windows系统中使用微软雅黑字体（如果有）
try:
    font_paths = fm.findSystemFonts(fontpaths=None, fontext='ttf')
    # 查找Windows中的微软雅黑字体
    msyh_fonts = [f for f in font_paths if 'msyh' in f.lower() or 'microsoftyahei' in f.lower()]
    if msyh_fonts:
        plt.rcParams['font.family'] = ['Microsoft YaHei']
    else:
        # 如果没有找到微软雅黑，尝试使用SimHei或其他中文字体
        plt.rcParams['font.sans-serif'] = ['SimHei', 'SimSun', 'NSimSun', 'FangSong', 'KaiTi']
except:
    # 如果上述方法失败，尝试直接设置
    plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']

# 解决负号显示问题
plt.rcParams['axes.unicode_minus'] = False

# 显示当前字体设置
print("当前使用的字体设置:", mpl.rcParams['font.family'])
print("当前支持的中文字体:", plt.rcParams.get('font.sans-serif'))

当前使用的字体设置: ['Microsoft YaHei']
当前支持的中文字体: ['DejaVu Sans', 'Bitstream Vera Sans', 'Computer Modern Sans Serif', 'Lucida Grande', 'Verdana', 'Geneva', 'Lucid', 'Arial', 'Helvetica', 'Avant Garde', 'sans-serif']


In [11]:
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display
import ipywidgets as widgets

class DiskScheduler:
    """磁盘调度算法的基类"""
    def __init__(self, name, total_tracks, initial_position, requests):
        self.name = name
        self.total_tracks = total_tracks  # 磁盘总道数
        self.initial_position = initial_position  # 磁头初始位置
        self.requests = requests.copy()  # 请求队列的副本
        self.sequence = [initial_position]  # 磁头访问序列，起始位置是初始位置
        self.total_seek_distance = 0  # 总寻道距离
    
    def schedule(self):
        """执行调度算法，由子类实现"""
        pass
    
    def calculate_total_seek_distance(self):
        """计算总寻道距离"""
        self.total_seek_distance = 0
        for i in range(1, len(self.sequence)):
            self.total_seek_distance += abs(self.sequence[i] - self.sequence[i-1])
        return self.total_seek_distance
    
    def visualize(self, ax=None):
        """可视化磁头的移动路径"""
        if ax is None:
            fig, ax = plt.subplots(figsize=(10, 6))
        
        # 绘制请求点
        requests_x = list(range(len(self.requests)))
        ax.scatter(requests_x, self.requests, color='blue', s=50, label='请求')
        
        # 绘制起始位置
        ax.scatter([-1], [self.initial_position], color='green', s=100, marker='*', label='起始位置')
        
        # 绘制访问序列
        x = [-1] + list(range(len(self.sequence) - 1))
        ax.plot(x, self.sequence, 'r-', label=f"{self.name}路径")
        ax.plot(x, self.sequence, 'ko')
        
        ax.set_xlim(-1.5, len(self.requests) - 0.5)
        ax.set_ylim(-10, self.total_tracks + 10)
        ax.set_xlabel('请求序列')
        ax.set_ylabel('磁道号')
        ax.set_title(f"{self.name} 算法 (总寻道距离: {self.total_seek_distance})")
        ax.legend()
        ax.grid(True, linestyle='--', alpha=0.7)
        
        return ax

In [12]:
class FCFSScheduler(DiskScheduler):
    """先来先服务调度算法"""
    
    def schedule(self):
        current_position = self.initial_position
        self.sequence = [current_position]
        
        for request in self.requests:
            self.sequence.append(request)
            current_position = request
        
        self.calculate_total_seek_distance()
        return self.sequence

class SSTFScheduler(DiskScheduler):
    """最短寻道时间优先调度算法"""
    
    def schedule(self):
        current_position = self.initial_position
        self.sequence = [current_position]
        remaining_requests = self.requests.copy()
        
        while remaining_requests:
            # 找到距离当前位置最近的请求
            distances = [abs(current_position - req) for req in remaining_requests]
            min_idx = distances.index(min(distances))
            
            next_request = remaining_requests[min_idx]
            self.sequence.append(next_request)
            current_position = next_request
            
            # 从剩余请求中移除该请求
            remaining_requests.pop(min_idx)
        
        self.calculate_total_seek_distance()
        return self.sequence

class SCANScheduler(DiskScheduler):
    """扫描算法（电梯算法）"""
    
    def schedule(self, direction='up'):
        current_position = self.initial_position
        self.sequence = [current_position]
        
        # 对请求排序
        sorted_requests = sorted(self.requests)
        
        # 确定当前位置对应的索引
        idx = 0
        while idx < len(sorted_requests) and sorted_requests[idx] < current_position:
            idx += 1
        
        # 根据方向处理请求
        if direction == 'up':
            # 先处理大于当前位置的请求（向上移动）
            for req in sorted_requests[idx:]:
                self.sequence.append(req)
            
            # 然后处理小于当前位置的请求（向下移动）
            for req in sorted_requests[idx-1::-1]:
                self.sequence.append(req)
        else:  # direction == 'down'
            # 先处理小于当前位置的请求（向下移动）
            for req in sorted_requests[idx-1::-1]:
                self.sequence.append(req)
            
            # 然后处理大于当前位置的请求（向上移动）
            for req in sorted_requests[idx:]:
                self.sequence.append(req)
        
        self.calculate_total_seek_distance()
        return self.sequence

class CSCANScheduler(DiskScheduler):
    """循环扫描算法"""
    
    def schedule(self, direction='up'):
        current_position = self.initial_position
        self.sequence = [current_position]
        
        # 对请求排序
        sorted_requests = sorted(self.requests)
        
        # 确定当前位置对应的索引
        idx = 0
        while idx < len(sorted_requests) and sorted_requests[idx] < current_position:
            idx += 1
        
        if direction == 'up':
            # 先处理大于当前位置的请求（向上移动）
            for req in sorted_requests[idx:]:
                self.sequence.append(req)
            
            # 回到最小位置并处理小于当前位置的请求
            if idx > 0:
                # 添加一个中间点表示从最高磁道快速移动到最低磁道
                if sorted_requests[0] != sorted_requests[-1]:
                    self.sequence.append(sorted_requests[0])
                
                # 处理剩余的小于当前位置的请求
                for req in sorted_requests[1:idx]:
                    self.sequence.append(req)
        else:  # direction == 'down'
            # 先处理小于当前位置的请求（向下移动）
            for req in sorted_requests[idx-1::-1]:
                self.sequence.append(req)
            
            # 回到最大位置并处理大于当前位置的请求
            if idx < len(sorted_requests):
                # 添加一个中间点表示从最低磁道快速移动到最高磁道
                if sorted_requests[0] != sorted_requests[-1]:
                    self.sequence.append(sorted_requests[-1])
                
                # 处理剩余的大于当前位置的请求
                for req in sorted_requests[len(sorted_requests)-2:idx-1:-1]:
                    self.sequence.append(req)
        
        self.calculate_total_seek_distance()
        return self.sequence

In [13]:
def run_disk_scheduling_simulation():
    """执行磁盘调度模拟并显示结果"""
    
    # UI控件定义
    total_tracks_slider = widgets.IntSlider(
        value=200, min=100, max=500, step=10, description='磁道总数:',
        style={'description_width': 'initial'}
    )
    
    initial_pos_slider = widgets.IntSlider(
        value=50, min=0, max=199, step=1, description='初始磁头位置:',
        style={'description_width': 'initial'}
    )
    
    request_count_slider = widgets.IntSlider(
        value=10, min=5, max=30, step=1, description='请求数量:',
        style={'description_width': 'initial'}
    )
    
    request_input = widgets.Text(
        value='', placeholder='例如: 98,183,37,122,14,124,65...',
        description='自定义请求序列:',
        style={'description_width': 'initial'}
    )
    
    algorithm_select = widgets.SelectMultiple(
        options=[('FCFS', 'FCFS'), ('SSTF', 'SSTF'), ('SCAN', 'SCAN'), ('CSCAN', 'CSCAN')],
        value=['FCFS', 'SSTF', 'SCAN', 'CSCAN'],
        description='选择算法:',
        style={'description_width': 'initial'}
    )
    
    random_seed = widgets.IntText(
        value=42, description='随机种子:', style={'description_width': 'initial'}
    )
    
    run_button = widgets.Button(
        description='运行模拟', button_style='success', tooltip='运行选定的磁盘调度算法'
    )
    
    output = widgets.Output()
    
    def on_run_button_click(b):
        with output:
            output.clear_output()
            
            total_tracks = total_tracks_slider.value
            initial_position = initial_pos_slider.value
            
            # 获取请求序列
            if request_input.value.strip():
                try:
                    # 使用用户输入的请求序列
                    requests = [int(x.strip()) for x in request_input.value.split(',')]
                    # 确保所有请求在有效范围内
                    requests = [min(max(0, r), total_tracks-1) for r in requests]
                except:
                    print("请求序列格式错误，使用随机生成的请求")
                    np.random.seed(random_seed.value)
                    requests = np.random.randint(0, total_tracks, request_count_slider.value).tolist()
            else:
                # 随机生成请求
                np.random.seed(random_seed.value)
                requests = np.random.randint(0, total_tracks, request_count_slider.value).tolist()
            
            print(f"磁道总数: {total_tracks}")
            print(f"初始位置: {initial_position}")
            print(f"请求序列: {requests}")
            
            # 创建并运行选定的调度器
            selected_algorithms = algorithm_select.value
            
            if not selected_algorithms:
                print("请至少选择一种算法")
                return
            
            schedulers = []
            if 'FCFS' in selected_algorithms:
                schedulers.append(FCFSScheduler("FCFS (先来先服务)", total_tracks, initial_position, requests))
            if 'SSTF' in selected_algorithms:
                schedulers.append(SSTFScheduler("SSTF (最短寻道时间优先)", total_tracks, initial_position, requests))
            if 'SCAN' in selected_algorithms:
                schedulers.append(SCANScheduler("SCAN (电梯算法)", total_tracks, initial_position, requests))
            if 'CSCAN' in selected_algorithms:
                schedulers.append(CSCANScheduler("CSCAN (循环扫描)", total_tracks, initial_position, requests))
            
            # 绘制图表
            n_algorithms = len(schedulers)
            if n_algorithms == 1:
                fig, ax = plt.subplots(figsize=(10, 6))
                axes = [ax]
            else:
                rows = (n_algorithms + 1) // 2
                cols = min(n_algorithms, 2)
                fig, axes = plt.subplots(rows, cols, figsize=(15, 6*rows))
                if n_algorithms > 1:
                    axes = axes.flatten()
            
            results = {}
            for i, scheduler in enumerate(schedulers):
                scheduler.schedule()
                scheduler.visualize(axes[i])
                results[scheduler.name] = scheduler.total_seek_distance
            
            # 处理多余的子图(如果有)
            if n_algorithms < len(axes):
                for i in range(n_algorithms, len(axes)):
                    fig.delaxes(axes[i])
            
            plt.tight_layout()
            plt.show()
            
            # 打印总寻道距离比较
            print("\n总寻道距离比较:")
            for name, distance in sorted(results.items(), key=lambda x: x[1]):
                print(f"{name}: {distance}")
    
    # 将按钮的点击事件与回调函数关联
    run_button.on_click(on_run_button_click)
    
    # 创建UI布局
    controls = widgets.VBox([
        total_tracks_slider,
        initial_pos_slider,
        request_count_slider,
        random_seed,
        request_input,
        algorithm_select,
        run_button
    ])
    
    # 显示UI
    display(widgets.VBox([controls, output]))

In [14]:
# 运行模拟
run_disk_scheduling_simulation()

VBox(children=(VBox(children=(IntSlider(value=200, description='磁道总数:', max=500, min=100, step=10, style=Slide…