In [29]:
from collections import OrderedDict
from enum import Enum
from io_trace_set import *
from multiprocessing.dummy import Pool as ThreadPool
import random
import pandas as pd


class OpType(Enum):
    READ = 0
    WRITE = 1


class ReadWriteType(Enum):
    READ_HOT_WRITE_HOT = 0
    READ_HOT_WRITE_COLD = 1
    READ_COLD_WRITE_HOT = 2
    READ_COLD_WRITE_COLD = 3


class Freq:

    def __init__(self, read: int, write: int):
        self.read = read
        self.write = write


class LeastRecentlyUsed(OrderedDict):

    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key) -> Freq:
        if key in self.cache:
            value = self.cache.get(key)
        else:
            value = Freq(0, 0)
        return value

    def set(self, key, type: OpType, size: int) -> None:
        if key in self.cache:
            value = self.cache.pop(key)
            if type == OpType.READ:
                value.read += size
            else:
                value.write += size
            self.cache[key] = value
        else:
            read = size if OpType == OpType.READ else 0
            write = size if OpType == OpType.WRITE else 0
            value = Freq(read, write)
            if len(self.cache) == self.capacity:
                zero_freq_found = False
                for k, v in self.cache.items():
                    if v.read == 0 and v.write == 0:
                        zero_freq_found = True
                        self.cache.pop(k)
                        self.cache[key] = value
                        break
                if not zero_freq_found and random.randint(0, 1) == 0:
                    # probability of 0.5 to evict the first item
                    # if there is no zero freq
                    self.cache.popitem(last=False)  # pop出第一个item
                    self.cache[key] = value
            else:
                self.cache[key] = value

    def decay(self) -> None:
        for _, v in self.cache.items():
            if v.read == 1:
                v.read = 0
            else:
                v.read /= 2
            if v.write == 1:
                v.write = 0
            else:
                v.write /= 2


class GroupLeastRecentyUsed:

    def __init__(self, k, capacity, period):
        self.k = k
        self.lru_group = [LeastRecentlyUsed(capacity) for _ in range(k)]
        self.period = period  # 每个周期的请求次数，其中每个周期结束时对lru_group执行一次decay
        self.count = 0  # 记录当前周期是第几次请求
        self.ratio = 10  # 读写频率比例
        self.penalty_ratio = 10  # 预测出错时的在线训练惩罚比率
        self.bonus_ratio = 1  # 预测正确时的在线训练奖励比率

    def fit(self, trace: IOTrace):
        LPN = trace.offset
        idx = LPN % self.k
        self.lru_group[idx].set(LPN, trace.type, trace.size)

        self.count = (self.count + 1) % self.period
        if self.count == 0:
            pool = ThreadPool()  # 用于并行执行decay操作的线程池
            pool.map(lambda x: x.decay(), self.lru_group)
            pool.close()
            pool.join()

    def predict(self, trace: IOTrace) -> ReadWriteType:
        LPN = trace.offset
        idx = LPN % self.k
        lru = self.lru_group[idx]

        freq = lru.get(LPN)
        read, write = freq.read, freq.write

        if read == 0 and write == 0:
            return ReadWriteType.READ_COLD_WRITE_COLD
        if (read + 1) / (write + 1) >= self.ratio:
            return ReadWriteType.READ_HOT_WRITE_COLD
        elif (write + 1) / (read + 1) >= self.ratio:
            return ReadWriteType.READ_COLD_WRITE_HOT
        else:
            return ReadWriteType.READ_HOT_WRITE_HOT


class GroupLeastRecentlyUsedModel:

    def __init__(self):
        self.glru = GroupLeastRecentyUsed(k=100000, capacity=10000, period=10000000)
        self.hit_cost = 18
        self.miss_cost = 103
        self.balance_cost = 22

    def trace_one(self, trace: IOTrace) -> list:
        rwType = self.glru.predict(trace)
        if rwType == ReadWriteType.READ_HOT_WRITE_HOT:
            cost = self.balance_cost
        elif rwType == ReadWriteType.READ_COLD_WRITE_COLD:
            # TODO: 为什么这里的cost是21？
            cost = self.balance_cost
        else:
            if rwType == ReadWriteType.READ_HOT_WRITE_COLD:
                cost = self.hit_cost if trace.type == 0 else self.miss_cost
            else:
                cost = self.hit_cost if trace.type == 1 else self.miss_cost

        # false prediction, add penalty
        if cost == self.miss_cost:
            trace.size *= self.glru.penalty_ratio
        elif cost == self.hit_cost:
            trace.size *= self.glru.bonus_ratio
        # train online
        self.glru.fit(trace)

        return cost


在GLRU基础上的优化点：
1. 增加惩罚系数，即当预测出错的时候，增加在线训练时该样本的权重，用以加速新的读写模式的识别
2. 记录每个Chunk的读写模式切换的周期和频率等指标，对于频繁切换的Chunk，应该让错误样本的在线训练的惩罚系数值更大，以识别出周期性的模式切换规律
3. 根据每个时刻工作集的大小动态调整GLRU列表的大小？（待进一步研究）
4. 。。。


In [30]:
datasets = [
    # 'hm', 'mds', 'prn', 
    # 'proj', 'prxy', 
    'rsrch',
    # 'src1', 'src2',
    # 'stg', 'ts', 
    # 'usr', 'wdev', 
    # 'web',
]
dataset_to_csv_file = lambda d: 'datasets/%s_processed.csv' % d

In [31]:
import pandas as pd

headers = ['dataset', 'original_cost_opt', 'original_cost', 'actual_cost', 'save_ratio']
results = []

for dataset in datasets:
    csv_file = dataset_to_csv_file(dataset)

    trace_set = IOTraceSet(csv_file, cost_opt_for_read)
    glru_model = GroupLeastRecentlyUsedModel()
    trace_set.replay(glru_model.trace_one)
    trace_set.show_result()
    result = [dataset, 'read']
    result.extend(trace_set.get_result())
    results.append(result)

    trace_set = IOTraceSet(csv_file, cost_opt_for_write)
    glru_model = GroupLeastRecentlyUsedModel()
    trace_set.replay(glru_model.trace_one)
    trace_set.show_result()
    result = [dataset, 'write']
    result.extend(trace_set.get_result())
    results.append(result)

    trace_set = IOTraceSet(csv_file, cost_balanced)
    glru_model = GroupLeastRecentlyUsedModel()
    trace_set.replay(glru_model.trace_one)
    trace_set.show_result()
    result = [dataset, 'balanced']
    result.extend(trace_set.get_result())
    results.append(result)

trace num: 1655022


Output()

total cost: 150554644
actual cost: 43162205
saved ratio: 0.713312
trace num: 1655022


Output()

total cost: 54668084
actual cost: 43162205
saved ratio: 0.210468
trace num: 1655022


Output()

total cost: 34755462
actual cost: 43162205
saved ratio: -0.241883


In [None]:
# df = pd.DataFrame(columns=headers,data=list)
# df.to_csv('results/glru.csv', index=False)