In [None]:
from functools import partial  # 导入partial，用于创建带有部分默认参数的函数
import numpy as np  # 导入NumPy库，用于数学运算
from dataclasses import dataclass  # 导入dataclass，用于创建包含数据的类

def vec_num2repr(val, base, prec, max_val):  # 定义函数，将数值向量转换为其在给定基数和精度下的表示形式
    base = float(base)  # 将基数转换为浮点数
    bs = val.shape[0]  # 获取val向量的长度
    sign = 1 * (val >= 0) - 1 * (val < 0)  # 计算val中每个元素的符号
    val = np.abs(val)  # 取val中每个元素的绝对值
    max_bit_pos = int(np.ceil(np.log(max_val) / np.log(base)).item())  # 计算最大值在给定基数下的位数

    before_decimals = []  # 创建一个空列表，用于存储小数点前的数字
    for i in range(max_bit_pos):  # 遍历每一个可能的位置
        digit = (val / base**(max_bit_pos - i - 1)).astype(int)  # 计算当前位置的数字
        before_decimals.append(digit)  # 将计算得到的数字添加到列表中
        val -= digit * base**(max_bit_pos - i - 1)  # 更新val，减去已经表示的部分

    before_decimals = np.stack(before_decimals, axis=-1)  # 将列表中的数字堆叠成一个NumPy数组

    if prec > 0:  # 如果指定了精度
        after_decimals = []  # 创建一个空列表，用于存储小数点后的数字
        for i in range(prec):  # 遍历每一个小数位
            digit = (val / base**(-i - 1)).astype(int)  # 计算当前小数位的数字
            after_decimals.append(digit)  # 将计算得到的数字添加到列表中
            val -= digit * base**(-i - 1)  # 更新val，减去已经表示的部分

        after_decimals = np.stack(after_decimals, axis=-1)  # 将列表中的数字堆叠成一个NumPy数组
        digits = np.concatenate([before_decimals, after_decimals], axis=-1)  # 将小数点前后的数字连接起来
    else:
        digits = before_decimals  # 如果没有指定精度，则所有的数字都在小数点前
    return sign, digits  # 返回符号和数字的数组

def vec_repr2num(sign, digits, base, prec, half_bin_correction=True):  # 定义函数，将数字的表示形式转换回数值
    base = float(base)  # 将基数转换为浮点数
    bs, D = digits.shape  # 获取digits数组的形状
    digits_flipped = np.flip(digits, axis=-1)  # 将数字数组翻转，因为计算时从最低位开始
    powers = -np.arange(-prec, -prec + D)  # 计算每一位数字对应的幂
    val = np.sum(digits_flipped/base**powers, axis=-1)  # 计算数值

    if half_bin_correction:  # 如果启用了半位修正
        val += 0.5/base**prec  # 在数值中加上半位的修正值

    return sign * val  # 返回计算得到的数值，包括符号

@dataclass  # 使用dataclass装饰器，简化类的定义
class SerializerSettings:  # 定义一个类，用于存储序列化和反序列化的设置
    base: int = 10  # 数字表示的基数，默认为10
    prec: int = 3  # 小数点后的精度，默认为3
    signed: bool = True  # 是否允许负数，默认为True
    fixed_length: bool = False  # 是否固定长度，默认为False
    max_val: float = 1e7  # 可序列化的最大绝对值，默认为1e7
    time_sep: str = ' ,'  # 不同时间步之间的分隔符，默认为' ,'
    bit_sep: str = ' '  # 数字之间的分隔符，默认为' '
    plus_sign: str = ''  # 正号的字符串表示，默认为空
    minus_sign: str = ' -'  # 负号的字符串表示，默认为' -'
    half_bin_correction: bool = True  # 是否应用半位修正，默认为True
    decimal_point: str = ''  # 小数点的字符串表示，默认为空
    missing_str: str = ' Nan'  # 缺失值的字符串表示，默认为' Nan'

def serialize_arr(arr, settings: SerializerSettings):  # 定义函数，将数值数组序列化为字符串
    assert np.all(np.abs(arr[~np.isnan(arr)]) <= settings.max_val), f"abs(arr) must be <= max_val,\
         but abs(arr)={np.abs(arr)}, max_val={settings.max_val}"  # 确保所有数值都在可序列化的范围内
    
    if not settings.signed:  # 如果不允许负数
        assert np.all(arr[~np.isnan(arr)] >= 0), f"unsigned arr must be >= 0"  # 确保所有数值都非负
        plus_sign = minus_sign = ''  # 正负号的表示为空
    else:  # 如果允许负数
        plus_sign = settings.plus_sign  # 使用设置中的正号表示
        minus_sign = settings.minus_sign  # 使用设置中的负号表示
    
    vnum2repr = partial(vec_num2repr,base=settings.base,prec=settings.prec,max_val=settings.max_val)  # 创建一个带有部分默认参数的函数
    sign_arr, digits_arr = vnum2repr(np.where(np.isnan(arr),np.zeros_like(arr),arr))  # 将数值数组转换为符号和数字的表示
    ismissing = np.isnan(arr)  # 检查数组中的每个元素是否为缺失值
    
    def tokenize(arr):  # 定义函数，将数字数组转换为字符串
        return ''.join([settings.bit_sep+str(b) for b in arr])  # 使用分隔符连接数字，形成字符串
    
    bit_strs = []  # 创建一个空列表，用于存储每个数值的字符串表示
    for sign, digits, missing in zip(sign_arr, digits_arr, ismissing):  # 遍历每个数值的符号、数字表示和是否缺失的信息
        if not settings.fixed_length:  # 如果不固定长度
            # remove leading zeros  移除前导零
            nonzero_indices = np.where(digits != 0)[0]  # 找到非零数字的索引
            if len(nonzero_indices) == 0:  # 如果所有数字都是零
                digits = np.array([0])  # 数字表示为单个零
            else:  # 如果存在非零数字
                digits = digits[nonzero_indices[0]:]  # 保留从第一个非零数字开始的部分
            # add a decimal point  添加小数点
            prec = settings.prec  # 获取精度
            if len(settings.decimal_point):  # 如果定义了小数点的表示
                digits = np.concatenate([digits[:-prec], np.array([settings.decimal_point]), digits[-prec:]])  # 在适当位置插入小数点
        digits = tokenize(digits)  # 将数字数组转换为字符串
        sign_sep = plus_sign if sign == 1 else minus_sign  # 根据符号选择正负号的表示
        if missing:  # 如果当前值是缺失值
            bit_strs.append(settings.missing_str)  # 添加缺失值的表示
        else:  # 如果当前值不是缺失值
            bit_strs.append(sign_sep + digits)  # 添加符号和数字的字符串表示
    bit_str = settings.time_sep.join(bit_strs)  # 使用时间分隔符连接所有字符串
    bit_str += settings.time_sep # 在最后添加一个时间分隔符，以避免最后一个时间步的数字位数不明确的问题
    return bit_str  # 返回序列化后的字符串

def deserialize_str(bit_str, settings: SerializerSettings, ignore_last=False, steps=None):  # 定义函数，将字符串反序列化为数值数组
    orig_bitstring = bit_str  # 保存原始字符串，用于出错时的参考
    bit_strs = bit_str.split(settings.time_sep)  # 使用时间分隔符将字符串分割为多个部分
    # remove empty strings  移除空字符串
    bit_strs = [a for a in bit_strs if len(a) > 0]  # 保留非空的部分
    if ignore_last:  # 如果需要忽略最后一个时间步
        bit_strs = bit_strs[:-1]  # 移除最后一个部分
    if steps is not None:  # 如果指定了步数
        bit_strs = bit_strs[:steps]  # 保留指定数量的部分
    vrepr2num = partial(vec_repr2num,base=settings.base,prec=settings.prec,half_bin_correction=settings.half_bin_correction)  # 创建一个带有部分默认参数的函数
    max_bit_pos = int(np.ceil(np.log(settings.max_val)/np.log(settings.base)).item())  # 计算最大值在给定基数下的位数
    sign_arr = []  # 创建一个空列表，用于存储每个数值的符号
    digits_arr = []  # 创建一个空列表，用于存储每个数值的数字表示
    try:  # 尝试执行反序列化操作
        for i, bit_str in enumerate(bit_strs):  # 遍历每一个分割后的字符串
            if bit_str.startswith(settings.minus_sign):  # 如果字符串以负号开始
                sign = -1  # 符号为负
            elif bit_str.startswith(settings.plus_sign):  # 如果字符串以正号开始
                sign = 1  # 符号为正
            else:  # 如果字符串既不以正号也不以负号开始
                assert settings.signed == False, f"signed bit_str must start with {settings.minus_sign} or {settings.plus_sign}"  # 断言设置允许的是无符号数
            bit_str = bit_str[len(settings.plus_sign):] if sign==1 else bit_str[len(settings.minus_sign):]  # 移除符号部分
            if settings.bit_sep=='':  # 如果没有定义数字之间的分隔符
                bits = [b for b in bit_str.lstrip()]  # 将字符串分割为单个字符
            else:  # 如果定义了数字之间的分隔符
                bits = [b[:1] for b in bit_str.lstrip().split(settings.bit_sep)]  # 使用分隔符分割字符串
            if settings.fixed_length:  # 如果固定了长度
                assert len(bits) == max_bit_pos+settings.prec, f"fixed length bit_str must have {max_bit_pos+settings.prec} bits, but has {len(bits)}: '{bit_str}'"  # 断言字符串的长度符合预期
            digits = []  # 创建一个空列表，用于存储数字
            for b in bits:  # 遍历每一个字符
                if b==settings.decimal_point:  # 如果字符是小数点
                    continue  # 跳过小数点
                # check if is a digit  检查字符是否是数字
                if b.isdigit():  # 如果字符是数字
                    digits.append(int(b))  # 将字符转换为数字并添加到列表中
                else:  # 如果字符不是数字
                    break  # 结束循环
            sign_arr.append(sign)  # 添加符号
            digits_arr.append(digits)  # 添加数字表示
    except Exception as e:  # 如果在反序列化过程中遇到异常
        print(f"Error deserializing {settings.time_sep.join(bit_strs[i-2:i+5])}{settings.time_sep}\n\t{e}")  # 打印错误信息
        print(f'Got {orig_bitstring}')  # 打印原始字符串
        print(f"Bitstr {bit_str}, separator {settings.bit_sep}")  # 打印当前处理的字符串和数字之间的分隔符
        # At this point, we have already deserialized some of the bit_strs, so we return those below  此时，我们已经反序列化了一部分字符串，因此下面返回这些结果
    if digits_arr:  # 如果成功反序列化了一部分字符串
        # add leading zeros to get to equal lengths  添加前导零以使所有数字表示的长度相同
        max_len = max([len(d) for d in digits_arr])  # 计算所有数字表示中最长的长度
        for i in range(len(digits_arr)):  # 遍历每一个数字表示
            digits_arr[i] = [0]*(max_len-len(digits_arr[i])) + digits_arr[i]  # 在前面添加零以达到最长长度
        return vrepr2num(np.array(sign_arr), np.array(digits_arr))  # 返回反序列化得到的数值数组
    else:  # 如果在第一步就出错了
        return None  # 返回None以表示反序列化失败


In [None]:
import pandas as pd  # 导入pandas库，用于数据处理和分析
import numpy as np  # 导入numpy库，用于数值计算
from collections import defaultdict  # 导入defaultdict，一个提供默认值的字典
from sklearn.preprocessing import StandardScaler  # 导入StandardScaler，用于数据标准化
import datasets  # 导入datasets库，用于加载和处理数据集
from datasets import load_dataset  # 从datasets库中导入load_dataset函数，用于加载数据集
import os  # 导入os库，用于处理文件和目录
import pickle  # 导入pickle库，用于对象序列化和反序列化

# 定义一个字典，指定某些数据集的预测长度
fix_pred_len = {
    'australian_electricity_demand': 336,  # 澳大利亚电力需求数据集
    'pedestrian_counts': 24,  # 行人计数数据集
    'traffic_hourly': 168,  # 每小时交通数据集    
}

def get_benchmark_test_sets():
    test_set_dir = "datasets/monash"  # 定义存放测试集的目录
    if not os.path.exists(test_set_dir):  # 如果目录不存在
        os.makedirs(test_set_dir)  # 创建目录

    if len(os.listdir(test_set_dir)) > 0:  # 如果目录非空
        print(f'Loading test sets from {test_set_dir}')  # 打印加载测试集的消息
        test_sets = {}  # 初始化测试集字典
        for file in os.listdir(test_set_dir):  # 遍历目录中的文件
            test_sets[file.split(".")[0]] = pickle.load(open(os.path.join(test_set_dir, file), 'rb'))  # 加载并反序列化测试集
        return test_sets
    else:  # 如果目录为空
        print(f'No files found in {test_set_dir}. You are not using our preprocessed datasets!')  # 打印未找到文件的消息
    
    benchmarks = {
        "monash_tsf": datasets.get_dataset_config_names("monash_tsf"),  # 从datasets库中获取monash_tsf数据集的配置名
    }

    test_sets = defaultdict(list)  # 初始化一个默认值为列表的字典，用于存放测试集
    for path in benchmarks:  # 遍历benchmarks字典的键
        pred_lens = [24, 48, 96, 192] if path == "ett" else [None]  # 根据数据集路径设置预测长度
        for name in benchmarks[path]:  # 遍历数据集名称
            for pred_len in pred_lens:  # 遍历预测长度
                if pred_len is None:  # 如果预测长度未指定
                    ds = load_dataset(path, name)  # 直接加载数据集
                else:  # 如果预测长度指定
                    ds = load_dataset(path, name, multivariate=False, prediction_length=pred_len)  # 加载数据集并指定预测长度
                
                train_example = ds['train'][0]['target']  # 获取训练集第一个样本的目标值
                val_example = ds['validation'][0]['target']  # 获取验证集第一个样本的目标值

                if len(np.array(train_example).shape) > 1:  # 如果目标值为多变量
                    print(f"Skipping {name} because it is multivariate")  # 打印跳过多变量数据集的消息
                    continue

                pred_len = len(val_example) - len(train_example)  # 计算预测长度
                if name in fix_pred_len:  # 如果数据集名称在fix_pred_len字典中
                    print(f"Fixing pred len for {name}: {pred_len} -> {fix_pred_len[name]}")  # 打印修正预测长度的消息
                    pred_len = fix_pred_len[name]  # 修正预测长度

                tag = name  # 设置标签为数据集名称
                print("Processing", tag)  # 打印处理中的消息

                pairs = []  # 初始化用于存放历史数据和目标值的列表
                for x in ds['test']:  # 遍历测试集
                    if np.isnan(x['target']).any():  # 如果目标值包含NaN
                        print(f"Skipping {name} because it has NaNs")  # 打印跳过包含NaN的数据集的消息
                        break
                    history = np.array(x['target'][:-pred_len])  # 获取历史数据
                    target = np.array(x['target'][-pred_len:])  # 获取目标值
                    pairs.append((history, target))  # 将历史数据和目标值添加到列表中
                else:  # 如果没有跳过数据集
                    scaler = None  # 初始化缩放器为None
                    if path == "ett":  # 如果数据集路径为ett
                        trainset = np.array(ds['train'][0]['target'])  # 获取训练集目标值
                        scaler = StandardScaler().fit(trainset[:,None])  # 训练并获取标准化缩放器
                    test_sets[tag] = (pairs, scaler)  # 将数据对和缩放器添加到测试集字典中
    
    for name in test_sets:  # 遍历测试集字典的键
        try:
            with open(os.path.join(test_set_dir,f"{name}.pkl"), 'wb') as f:  # 打开文件进行写入
                pickle.dump(test_sets[name], f)  # 序列化并保存测试集
            print(f"Saved {name}")  # 打印保存成功的消息
        except:
            print(f"Failed to save {name}")  # 打印保存失败的消息

    return test_sets  # 返回测试集字典

def get_datasets():
    benchmarks = get_benchmark_test_sets()  # 获取基准测试集
    # 对基准测试集进行随机打乱
    for k, v in benchmarks.items():
        x, _scaler = v  # 解包测试集和缩放器（缩放器未使用）
        train, test = zip(*x)  # 解包训练和测试数据对
        np.random.seed(0)  # 设置随机种子
        ind = np.arange(len(train))  # 生成索引数组
        ind = np.random.permutation(ind)  # 随机打乱索引数组
        train = [train[i] for i in ind]  # 根据打乱的索引重新组织训练数据
        test = [test[i] for i in ind]  # 根据打乱的索引重新组织测试数据
        benchmarks[k] = [list(train), list(test)]  # 更新基准测试集字典

    df = pd.read_csv('data/last_val_mae.csv')  # 读取最后一次验证MAE的CSV文件
    df.sort_values(by='mae')  # 根据MAE排序

    df_paper = pd.read_csv('data/paper_mae_raw.csv')  # 读取原始论文MAE的CSV文件
    datasets = df_paper['Dataset']  # 获取数据集列
    name_map = {
        'Aus. Electricity Demand' :'australian_electricity_demand',  # 名称映射
        'Kaggle Weekly': 'kaggle_web_traffic_weekly',
        'FRED-MD': 'fred_md',
        'Saugeen River Flow': 'saugeenday',        
    }
    datasets = [name_map.get(d, d) for d in datasets]  # 应用名称映射
    # 将数据集名称转换为小写并用下划线替换空格
    datasets = [d.lower().replace(' ', '_') for d in datasets]
    df_paper['Dataset'] = datasets  # 更新数据集列
    df_paper = df_paper.reset_index(drop=True)  # 重置索引
    # 为每个数据集添加最后一次验证MAE到df_paper
    for dataset in df_paper['Dataset']:
        if dataset in df['dataset'].values:
            df_paper.loc[df_paper['Dataset'] == dataset, 'Last Value'] = df[df['dataset'] == dataset]['mae'].values[0]
    # 将'-'转换为np.nan
    df_paper = df_paper.replace('-', np.nan)
    # 将所有值转换为浮点数
    for method in df_paper.columns[1:]:
        df_paper[method] = df_paper[method].astype(float)
    df_paper.to_csv('data/paper_mae.csv', index=False)  # 将处理后的数据保存为CSV文件
    # 通过除以最后一次验证MAE来标准化每个方法
    for method in df_paper.columns[1:-1]:  # 跳过数据集和最后一个值
        df_paper[method] = df_paper[method] / df_paper['Last Value']
    # 根据方法的最小MAE排序
    df_paper['normalized_min'] = df_paper[df_paper.columns[1:-1]].min(axis=1)
    df_paper['normalized_median'] = df_paper[df_paper.columns[1:-1]].median(axis=1)
    df_paper = df_paper.sort_values(by='normalized_min')
    df_paper = df_paper.reset_index(drop=True)
    # 将标准化后的数据保存为CSV文件
    df_paper.to_csv('data/paper_mae_normalized.csv', index=False)
    return benchmarks  # 返回基准测试集字典

def main():
    get_datasets()  # 调用get_datasets函数
    
if __name__ == "__main__":
    main()  # 如果是主程序，则执行main函数


In [None]:
import numpy as np  # 导入numpy库，用于数值计算
import numbers  # 导入numbers库，用于数字类型检查
import random  # 导入random库，用于生成随机数
from collections import defaultdict  # 导入defaultdict，用于创建带有默认值的字典
from collections.abc import Iterable  # 导入Iterable，用于判断对象是否可迭代

import itertools  # 导入itertools库，用于实现各种迭代器
import operator  # 导入operator库，提供一系列对操作符的封装
import functools  # 导入functools库，用于高阶函数和可调用对象的操作

class FixedNumpySeed(object):  # 定义一个固定Numpy随机种子的类
    def __init__(self, seed):  # 类的初始化方法
        self.seed = seed  # 保存随机种子值
    def __enter__(self):  # 进入上下文管理器的方法
        self.np_rng_state = np.random.get_state()  # 保存numpy随机状态
        np.random.seed(self.seed)  # 设置numpy的随机种子
        self.rand_rng_state = random.getstate()  # 保存random模块的随机状态
        random.seed(self.seed)  # 设置random模块的随机种子
    def __exit__(self, *args):  # 退出上下文管理器的方法
        np.random.set_state(self.np_rng_state)  # 恢复numpy的随机状态
        random.setstate(self.rand_rng_state)  # 恢复random模块的随机状态

class ReadOnlyDict(dict):  # 定义一个只读字典类，继承自dict
    def __readonly__(self, *args, **kwargs):  # 定义一个只读的方法
        raise RuntimeError("Cannot modify ReadOnlyDict")  # 抛出运行时错误，不允许修改
    __setitem__ = __readonly__  # 设置字典项时调用只读方法
    __delitem__ = __readonly__  # 删除字典项时调用只读方法
    pop = __readonly__  # pop方法调用只读方法
    popitem = __readonly__  # popitem方法调用只读方法
    clear = __readonly__  # clear方法调用只读方法
    update = __readonly__  # update方法调用只读方法
    setdefault = __readonly__  # setdefault方法调用只读方法
    del __readonly__  # 删除只读方法定义，确保不会被外部调用

class NoGetItLambdaDict(dict):  # 定义一个不允许获取lambda函数和非字符串可迭代对象的字典类
    def __init__(self,d={}):  # 类的初始化方法
        super().__init__()  # 调用父类的初始化方法
        for k,v in d.items():  # 遍历传入的字典项
            if isinstance(v,dict):  # 如果值是字典类型
                self[k] = NoGetItLambdaDict(v)  # 递归创建NoGetItLambdaDict对象
            else:  # 如果值不是字典类型
                self[k] = v  # 直接设置值
    def __getitem__(self, key):  # 重写获取字典项的方法
        value = super().__getitem__(key)  # 调用父类的方法获取值
        if callable(value) and value.__name__ == "<lambda>":  # 如果值是lambda函数
            raise LookupError("You shouldn't try to retrieve lambda {} from this dict".format(value))  # 抛出查找错误，不允许获取lambda函数
        if isinstance(value,Iterable) and not isinstance(value,(str,bytes,dict,tuple)):  # 如果值是非字符串的可迭代对象
            raise LookupError("You shouldn't try to retrieve iterable {} from this dict".format(value))  # 抛出查找错误，不允许获取非字符串的可迭代对象
        return value  # 返回值

def sample_config(config_spec):  # 定义一个函数，用于从配置规范中生成配置
    cfg_all = config_spec  # 将配置规范赋值给cfg_all
    more_work=True  # 初始化一个标志，表示是否需要继续处理
    i=0  # 初始化计数器
    while more_work:  # 当需要继续处理时
        cfg_all, more_work = _sample_config(cfg_all,NoGetItLambdaDict(cfg_all))  # 调用_sample_config函数处理配置，并更新more_work标志
        i+=1  # 更新计数器
        if i>10:  # 如果尝试次数超过10次
            raise RecursionError("config dependency unresolvable with {}".format(cfg_all))  # 抛出递归错误，表示配置依赖无法解决
    out = defaultdict(dict)  # 创建一个默认值为字典的defaultdict对象
    out.update(cfg_all)  # 更新out对象，将处理后的配置赋值给它
    return out  # 返回处理后的配置

def _sample_config(config_spec,cfg_all):  # 定义一个辅助函数，用于处理配置规范和当前配置
    cfg = {}  # 初始化一个空字典，用于存储处理后的配置
    more_work = False  # 初始化一个标志，表示是否需要继续处理
    for k,v in config_spec.items():  # 遍历配置规范中的项
        if isinstance(v,dict):  # 如果值是字典类型
            new_dict,extra_work = _sample_config(v,cfg_all)  # 递归调用_sample_config函数处理字典类型的值
            cfg[k] = new_dict  # 将处理后的字典赋值给cfg
            more_work |= extra_work  # 更新more_work标志
        elif isinstance(v,Iterable) and not isinstance(v,(str,bytes,dict,tuple)):  # 如果值是非字符串的可迭代对象
            cfg[k] = random.choice(v)  # 从可迭代对象中随机选择一个值赋值给cfg
        elif callable(v) and v.__name__ == "<lambda>":  # 如果值是lambda函数
            try:cfg[k] = v(cfg_all)  # 尝试调用lambda函数，并将结果赋值给cfg
            except (KeyError, LookupError,Exception):  # 如果调用lambda函数时发生错误
                cfg[k] = v  # 将lambda函数本身赋值给cfg
                more_work = True  # 更新more_work标志，表示需要继续处理
        else: cfg[k] = v  # 如果值不是字典、非字符串可迭代对象或lambda函数，直接赋值给cfg
    return cfg, more_work  # 返回处理后的配置和more_work标志


def flatten(d, parent_key='', sep='/'):  # 定义一个函数，用于将嵌套的字典扁平化
    items = []  # 初始化一个空列表，用于存储扁平化后的项
    for k, v in d.items():  # 遍历字典中的项
        new_key = parent_key + sep + k if parent_key else k  # 生成新的键，将嵌套的键连接起来
        if isinstance(v, dict) and v: # 如果值是非空字典
            items.extend(flatten(v, new_key, sep=sep).items())  # 递归调用flatten函数处理字典类型的值，并将结果扩展到items列表中
        else:  # 如果值不是字典类型
            items.append((new_key, v))  # 将项添加到items列表中
    return dict(items)  # 将items列表转换为字典并返回
# flatten简单来说就是将字典里面的嵌套字典转化为非嵌套字典
# nested_dict = {
#     'a': 1,
#     'b': {'b1': 2, 'b2': 3},
#     'c': {'c1': {'c11': 4, 'c12': 5}, 'c2': 6}
# }
# {
#     'a': 1, 
#     'b/b1': 2, 
#     'b/b2': 3, 
#     'c/c1/c11': 4, 
#     'c/c1/c12': 5, 
#     'c/c2': 6
# }

def unflatten(d,sep='/'):  # 定义一个函数，用于将扁平化的字典还原为嵌套的字典
    out_dict={}  # 初始化一个空字典，用于存储还原后的嵌套字典
    for k,v in d.items():  # 遍历扁平化的字典中的项
        if isinstance(k,str):  # 如果键是字符串类型
            keys = k.split(sep)  # 将键分割为多个部分
            dict_to_modify = out_dict  # 初始化一个变量，用于指向需要修改的字典
            for partial_key in keys[:-1]:  # 遍历除了最后一个部分之外的所有部分
                try: dict_to_modify = dict_to_modify[partial_key]  # 尝试获取对应的子字典
                except KeyError:  # 如果子字典不存在
                    dict_to_modify[partial_key] = {}  # 创建一个新的子字典
                    dict_to_modify = dict_to_modify[partial_key]  # 更新需要修改的字典为新创建的子字典
                # Base level reached  达到基础层级
            if keys[-1] in dict_to_modify:  # 如果最后一个部分的键已经存在于字典中
                dict_to_modify[keys[-1]].update(v)  # 更新对应的值
            else:  # 如果最后一个部分的键不存在于字典中
                dict_to_modify[keys[-1]] = v  # 创建一个新的项
        else: out_dict[k]=v  # 如果键不是字符串类型，直接赋值
    return out_dict  # 返回还原后的嵌套字典

class grid_iter(object):  # 定义一个类，用于实现对配置规范中网格变量的迭代
    def __init__(self,config_spec,num_elements=-1,shuffle=True):  # 类的初始化方法
        self.cfg_flat = flatten(config_spec)  # 将配置规范扁平化 将hypers进行扁平化
        is_grid_iterable = lambda v: (isinstance(v,Iterable) and not isinstance(v,(str,bytes,dict,tuple)))  # 定义一个函数，用于判断值是否是网格可迭代对象
        iterables = sorted({k:v for k,v in self.cfg_flat.items() if is_grid_iterable(v)}.items())  # 从扁平化的配置中筛选出网格可迭代对象
        if iterables: self.iter_keys,self.iter_vals = zip(*iterables)  # 如果存在网格可迭代对象，分别获取键和值
        else: self.iter_keys,self.iter_vals = [],[[]]  # 如果不存在网格可迭代对象，初始化为空列表
        self.vals = list(itertools.product(*self.iter_vals))  # 使用itertools.product生成所有可能的组合
        if shuffle:  # 如果需要打乱顺序
            with FixedNumpySeed(0): random.shuffle(self.vals)  # 使用固定的随机种子打乱组合的顺序
        self.num_elements = num_elements if num_elements>=0 else (-1*num_elements)*len(self)  # 设置迭代的元素数量

    def __iter__(self):  # 定义迭代器的__iter__方法
        self.i=0  # 初始化计数器
        self.vals_iter = iter(self.vals)  # 创建组合的迭代器
        return self  # 返回自身作为迭代器
    def __next__(self):  # 定义迭代器的__next__方法
        self.i+=1  # 更新计数器
        if self.i > self.num_elements: raise StopIteration  # 如果达到设定的元素数量，抛出StopIteration异常
        if not self.vals: v = []  # 如果没有组合，设置v为空列表
        else:  # 如果有组合
            try: v = next(self.vals_iter)  # 尝试获取下一个组合
            except StopIteration:  # 如果组合已经遍历完毕
                self.vals_iter = iter(self.vals)  # 重新创建组合的迭代器
                v = next(self.vals_iter)  # 获取下一个组合
        chosen_iter_params = dict(zip(self.iter_keys,v))  # 将键和值组合为字典
        self.cfg_flat.update(chosen_iter_params)  # 更新扁平化的配置
        return sample_config(unflatten(self.cfg_flat))  # 返回处理后的配置
    def __len__(self):  # 定义迭代器的__len__方法
        product = functools.partial(functools.reduce, operator.mul)  # 使用functools.reduce和operator.mul计算组合的总数
        return product(len(v) for v in self.iter_vals) if self.vals else 1  # 返回组合的总数

def flatten_dict(d):  # 定义一个函数，用于将字典扁平化，忽略外部键
    out = {}  # 初始化一个空字典，用于存储扁平化后的项
    for k,v in d.items():  # 遍历字典中的项
        if isinstance(v,dict):  # 如果值是字典类型
            out.update(flatten_dict(v))  # 递归调用flatten_dict函数处理字典类型的值，并更新out字典
        elif isinstance(v,(numbers.Number,str,bytes)):  # 如果值是数字、字符串或字节类型
            out[k] = v  # 直接赋值给out字典
        else:  # 如果值是其他类型
            out[k] = str(v)  # 将值转换为字符串后赋值给out字典
    return out  # 返回扁平化后的字典


In [None]:
# 导入所需的库和函数
import numpy as np  # 用于数值计算
from tqdm.auto import tqdm  # 用于显示进度条
from concurrent.futures import ThreadPoolExecutor, as_completed  # 用于并行执行
from models.utils import grid_iter  # 用于生成超参数的迭代器
from dataclasses import is_dataclass  # 检查对象是否为数据类
from typing import Any  # 用于类型注解

# 定义一个函数，用于将训练集划分为训练和验证集
def make_validation_dataset(train, n_val, val_length):
    assert isinstance(train, list), 'Train should be a list of series'  # 确保train是列表类型

    train_minus_val_list, val_list = [], []  # 初始化存放训练集和验证集的列表
    if n_val is None:
        n_val = len(train)  # 如果未指定验证样本数量，则使用训练集的数量
    for train_series in train[:n_val]:  # 遍历训练集中的前n_val个序列
        train_len = max(len(train_series) - val_length, 1)  # 确定每个序列用于训练的长度  val_length表示验证集的长度 要么是test 要么是train训练集的一半
        train_minus_val, val = train_series[:train_len], train_series[train_len:]  # 划分训练集和验证集
        print(f'Train length: {len(train_minus_val)}, Val length: {len(val)}')  # 打印训练集和验证集的长度
        train_minus_val_list.append(train_minus_val)  # 添加到相应的列表中
        val_list.append(val)

    return train_minus_val_list, val_list, n_val  # 返回划分后的训练集、验证集和验证样本数量

# 定义一个函数，用于评估一组超参数在验证集上的性能
def evaluate_hyper(hyper, train_minus_val, val, get_predictions_fn):
    assert isinstance(train_minus_val, list) and isinstance(val, list), 'Train minus val and val should be lists of series'
    return get_predictions_fn(train_minus_val, val, **hyper, num_samples=0)['NLL/D']  # 调用预测函数并返回NLL/D值

# 定义一个函数，用于自动调优超参数，并获取最优超参数下的预测结果
def get_autotuned_predictions_data(train, test, hypers, num_samples, get_predictions_fn, verbose=False, parallel=True, n_train=None, n_val=None):
    if isinstance(hypers,dict):
        hypers = list(grid_iter(hypers))  # 如果hypers为字典，则通过grid_iter生成超参数组合的列表
    else:
        assert isinstance(hypers, list), 'hypers must be a list or dict'  # 确保hypers为列表或字典
    if not isinstance(train, list):
        train = [train]  # 确保train和test为列表类型
        test = [test]
    if n_val is None:
        n_val = len(train)  # 如果未指定验证样本数量，则使用训练集的长度
    if len(hypers) > 1:  # 如果有多组超参数需要评估
        val_length = min(len(test[0]), int(np.mean([len(series) for series in train])/2))  # 确定验证集长度 要么是训练集的一半 要么是测试集的长度
        train_minus_val, val, n_val = make_validation_dataset(train, n_val=n_val, val_length=val_length)  # 划分训练集和验证集 n_val表示验证集的数量 val_length表示验证集的长度
        # 过滤掉长度不足的验证序列
        train_minus_val, val = zip(*[(train_series, val_series) for train_series, val_series in zip(train_minus_val, val) if len(val_series) == val_length]) #有些验证集的数量过短 没有val_length那么长 就删掉
        train_minus_val = list(train_minus_val)#两个都变为list
        val = list(val)
        if len(train_minus_val) <= int(0.9*n_val):  # 如果过滤后的验证集数量过少，则抛出异常
            raise ValueError(f'Removed too many validation series. Only {len(train_minus_val)} out of {len(n_val)} series have length >= {val_length}. Try or decreasing val_length.')
        val_nlls = []  # 初始化存放每组超参数NLL值的列表
        def eval_hyper(hyper):  # 定义一个内部函数，用于评估单组超参数
            try:
                return hyper, evaluate_hyper(hyper, train_minus_val, val, get_predictions_fn)#evaluate_hyper评估超参数怎么样 get_predictions_fn是大模型预测评估函数get_llmtime_predictions_data
            except ValueError:
                return hyper, float('inf')  # 如果评估过程中出现异常，则返回无穷大
            
        best_val_nll = float('inf')  # 初始化最佳NLL值
        best_hyper = None  # 初始化最佳超参数组合
        if not parallel:  # 如果不使用并行计算
            for hyper in tqdm(hypers, desc='Hyperparameter search'):  # 遍历超参数组合
                _,val_nll = eval_hyper(hyper)  # 评估超参数
                val_nlls.append(val_nll)  # 添加NLL值到列表
                if val_nll < best_val_nll:  # 如果当前NLL值优于之前的最佳NLL值
                    best_val_nll = val_nll  # 更新最佳NLL值
                    best_hyper = hyper  # 更新最佳超参数组合
                if verbose:  # 如果开启详细输出模式
                    print(f'Hyper: {hyper} \n\t Val NLL: {val_nll:3f}')  # 打印当前超参数和对应的NLL值
        else:  # 如果使用并行计算
            with ThreadPoolExecutor() as executor:  # 创建线程池执行器
                futures = [executor.submit(eval_hyper,hyper) for hyper in hypers]  # 提交任务
                for future in tqdm(as_completed(futures), total=len(hypers), desc='Hyperparameter search'):  # 等待所有任务完成
                    hyper,val_nll = future.result()  # 获取结果
                    val_nlls.append(val_nll)  # 添加NLL值到列表
                    if val_nll < best_val_nll:  # 如果当前NLL值优于之前的最佳NLL值
                        best_val_nll = val_nll  # 更新最佳NLL值
                        best_hyper = hyper  # 更新最佳超参数组合
                    if verbose:  # 如果开启详细输出模式
                        print(f'Hyper: {hyper} \n\t Val NLL: {val_nll:3f}')  # 打印当前超参数和对应的NLL值
    else:  # 如果只有一组超参数
        best_hyper = hypers[0]  # 直接使用这组超参数
        best_val_nll = float('inf')  # 设置NLL值为无穷大（未评估）
    print(f'Sampling with best hyper... {best_hyper} \n with NLL {best_val_nll:3f}')  # 打印最佳超参数和对应的NLL值
    out = get_predictions_fn(train, test, **best_hyper, num_samples=num_samples, n_train=n_train, parallel=parallel)  # 使用最佳超参数获取预测结果
    out['best_hyper']=convert_to_dict(best_hyper)  # 将最佳超参数组合转换为字典格式并添加到输出中
    return out  # 返回包含预测结果和最佳超参数的字典

# 定义一个函数，用于将对象转换为字典格式
def convert_to_dict(obj: Any) -> Any:
    if isinstance(obj, dict):  # 如果对象是字典
        return {k: convert_to_dict(v) for k, v in obj.items()}  # 递归转换每个值
    elif isinstance(obj, list):  # 如果对象是列表
        return [convert_to_dict(elem) for elem in obj]  # 递归转换每个元素
    elif is_dataclass(obj):  # 如果对象是数据类
        return convert_to_dict(obj.__dict__)  # 转换数据类的__dict__属性
    else:  # 如果对象是其他类型
        return obj  # 直接返回对象


In [None]:
# 导入所需的库
from tqdm import tqdm  # 进度条库，用于显示循环的进度
from data.serialize import serialize_arr, deserialize_str, SerializerSettings  # 数据序列化和反序列化函数及设置
from concurrent.futures import ThreadPoolExecutor  # 线程池，用于并行处理
import numpy as np  # 数值计算库
import pandas as pd  # 数据处理库
from dataclasses import dataclass  # 数据类，用于定义类更简洁
from models.llms import completion_fns, nll_fns, tokenization_fns, context_lengths  # 导入模型相关的函数和设置

# 定义一个步骤乘数，用于调整预测的步数
STEP_MULTIPLIER = 1.2

# 定义一个数据缩放器类
@dataclass
class Scaler:
    """
    表示一个数据缩放器，具有转换和逆转换函数的属性。

    属性:
        transform (callable): 应用转换的函数。
        inv_transform (callable): 应用逆转换的函数。
    """
    transform: callable = lambda x: x  # 默认的转换函数，不做任何处理
    inv_transform: callable = lambda x: x  # 默认的逆转换函数，不做任何处理

# 定义一个函数，基于给定的历史数据生成一个Scaler对象
def get_scaler(history, alpha=0.95, beta=0.3, basic=False):
    """
    根据给定的历史数据生成一个Scaler对象。

    参数:
        history (array-like): 用于派生缩放的数据。
        alpha (float, optional): 缩放的分位数，默认为0.95。
        beta (float, optional): 偏移参数，默认为0.3。
        basic (bool, optional): 如果为True，不应用偏移，避免按小于0.01的值缩放，默认为False。

    返回:
        Scaler: 配置好的缩放器对象。
    """
    history = history[~np.isnan(history)]  # 移除NaN值
    if basic:
        # 基础缩放：仅基于分位数缩放，不应用偏移
        q = np.maximum(np.quantile(np.abs(history), alpha), .01)
        def transform(x):
            return x / q
        def inv_transform(x):
            return x * q
    else:
        # 高级缩放：基于分位数和偏移量进行缩放
        min_ = np.min(history) - beta*(np.max(history)-np.min(history))
        q = np.quantile(history-min_, alpha)
        if q == 0:
            q = 1
        def transform(x):
            return (x - min_) / q
        def inv_transform(x):
            return x * q + min_
    return Scaler(transform=transform, inv_transform=inv_transform)

# 定义一个函数，用于根据给定模型的最大上下文长度截断输入
def truncate_input(input_arr, input_str, settings, model, steps):
    """
    根据给定模型的最大上下文长度截断输入。
    
    参数:
        input_arr (array-like): 输入时间序列。
        input_str (str): 序列化的输入时间序列。
        settings (SerializerSettings): 序列化设置。
        model (str): 要使用的LLM模型名称。
        steps (int): 要预测的步数。
    返回:
        tuple: 包含:
            - input_arr (array-like): 截断的输入时间序列。
            - input_str (str): 截断的序列化输入时间序列。
    """
    # 检查模型是否在支持的模型列表中
    if model in tokenization_fns and model in context_lengths:
        tokenization_fn = tokenization_fns[model]  # 获取模型的tokenization函数
        context_length = context_lengths[model]  # 获取模型支持的最大上下文长度
        input_str_chunks = input_str.split(settings.time_sep)  # 根据时间分隔符分割输入字符串
        for i in range(len(input_str_chunks) - 1):
            truncated_input_str = settings.time_sep.join(input_str_chunks[i:])  # 从当前位置开始截断输入字符串
            if not truncated_input_str.endswith(settings.time_sep):
                truncated_input_str += settings.time_sep  # 确保截断后的字符串以时间分隔符结尾
            input_tokens = tokenization_fn(truncated_input_str)  # 将截断后的字符串转换为tokens
            num_input_tokens = len(input_tokens)  # 计算输入tokens的数量
            avg_token_length = num_input_tokens / (len(input_str_chunks) - i)  # 计算平均token长度
            num_output_tokens = avg_token_length * steps * STEP_MULTIPLIER  # 计算输出tokens的预期数量
            if num_input_tokens + num_output_tokens <= context_length:
                truncated_input_arr = input_arr[i:]  # 截断输入数组
                break
        if i > 0:
            print(f'Warning: Truncated input from {len(input_arr)} to {len(truncated_input_arr)}')
        return truncated_input_arr, truncated_input_str
    else:
        return input_arr, input_str

# 定义一个函数处理LLM预测后的输出
def handle_prediction(pred, expected_length, strict=False):
    """
    处理LLM预测后的输出，可能过长或过短，或者如果在第一步预测就失败了，则为None。

    参数:
        pred (array-like or None): 预测值，None表示反序列化失败。
        expected_length (int): 预测的期望长度。
        strict (bool, optional): 如果为True，对于无效预测返回None。默认为False。

    返回:
        array-like: 处理后的预测。
    """
    if pred is None:
        return None
    else:
        if len(pred) < expected_length:
            if strict:
                print(f'Warning: Prediction too short {len(pred)} < {expected_length}, returning None')
                return None
            else:
                print(f'Warning: Prediction too short {len(pred)} < {expected_length}, padded with last value')
                return np.concatenate([pred, np.full(expected_length - len(pred), pred[-1])])
        else:
            return pred[:expected_length]

# 定义一个函数，生成并处理来自语言模型的文本完成对输入时间序列的预测
def generate_predictions(
    completion_fn, 
    input_strs, 
    steps, 
    settings: SerializerSettings, 
    scalers: None,
    num_samples=1, 
    temp=0.7, 
    parallel=True,
    strict_handling=False,
    max_concurrent=10,
    **kwargs
):
    """
    从语言模型获取文本完成，生成输入时间序列的预测并处理这些预测。

    参数:
        completion_fn (callable): 从LLM获取文本完成的函数。
        input_strs (list of array-like): 输入时间序列列表。
        steps (int): 预测的步数。
        settings (SerializerSettings): 序列化设置。
        scalers (list of Scaler, optional): Scaler对象列表。默认为None，意味着不应用缩放。
        num_samples (int, optional): 返回的样本数量。默认为1。
        temp (float, optional): 抽样的温度。默认为0.7。
        parallel (bool, optional): 如果为True，以并行方式运行完成。默认为True。
        strict_handling (bool, optional): 如果为True，对格式不正确或预期长度不正确的预测返回None。默认为False。
        max_concurrent (int, optional): 并发完成的最大数量。默认为10。
        **kwargs: 其他关键字参数。

    返回:
        tuple: 包含：
            - preds (list of lists): 数值预测。
            - completions_list (list of lists): 原始文本完成。
            - input_strs (list of str): 序列化输入字符串。
    """
    completions_list = []
    complete = lambda x: completion_fn(input_str=x, steps=steps*STEP_MULTIPLIER, settings=settings, num_samples=num_samples,
temp=temp, **kwargs)  # 定义一个lambda函数用于获取文本完成
    if parallel and len(input_strs) > 1:  # 如果启用并行处理且输入字符串列表长度大于1
        print('Running completions in parallel for each input')
        with ThreadPoolExecutor(min(max_concurrent, len(input_strs))) as p:  # 创建一个线程池
            completions_list = list(tqdm(p.map(complete, input_strs), total=len(input_strs)))  # 并行执行complete函数并显示进度
    else:  # 如果不并行或输入字符串列表长度不大于1
        completions_list = [complete(input_str) for input_str in tqdm(input_strs)]  # 顺序执行complete函数并显示进度
    def completion_to_pred(completion, inv_transform):
        # 定义一个函数将文本完成转换为预测值
        pred = handle_prediction(deserialize_str(completion, settings, ignore_last=False, steps=steps), expected_length=steps, strict=strict_handling)
        if pred is not None:  # 如果预测值非None
            return inv_transform(pred)  # 应用逆变换函数
        else:  # 如果预测值为None
            return None
    preds = [[completion_to_pred(completion, scaler.inv_transform) for completion in completions] for completions, scaler in zip(completions_list, scalers)]
    # 将文本完成列表和缩放器列表转换为预测值列表
    return preds, completions_list, input_strs

def get_llmtime_predictions_data(train, test, model, settings, num_samples=10, temp=0.7, alpha=0.95, beta=0.3, basic=False, parallel=True, **kwargs):
    """
    基于训练序列(历史)获得LLM的预测，并在测试序列(真实未来)上评估似然。
    train和test可以是单个时间序列或时间序列的列表。

    参数:
        train (array-like or list of array-like): 训练时间序列数据(历史)。
        test (array-like or list of array-like): 测试时间序列数据(真实未来)。
        model (str): 要使用的LLM模型名称。必须在completion_fns中有对应项。
        settings (SerializerSettings or dict): 序列化设置。
        num_samples (int, optional): 返回的样本数量。默认为10。
        temp (float, optional): 抽样的温度。默认为0.7。
        alpha (float, optional): 缩放参数。默认为0.95。
        beta (float, optional): 偏移参数。默认为0.3。
        basic (bool, optional): 如果为True，使用基础版本的数据缩放。默认为False。
        parallel (bool, optional): 如果为True，以并行方式运行预测。默认为True。
        **kwargs: 其他关键字参数。

    返回:
        dict: 包含预测、样本、中位数、NLL/D平均值以及其他相关信息的字典。
    """

    assert model in completion_fns, f'Invalid model {model}, must be one of {list(completion_fns.keys())}'  # 检查模型是否有效
    completion_fn = completion_fns[model]  # 获取模型对应的完成函数
    nll_fn = nll_fns[model] if model in nll_fns else None  # 获取模型对应的NLL函数
    
    if isinstance(settings, dict):  # 如果settings是字典
        settings = SerializerSettings(**settings)  # 将settings转换为SerializerSettings对象
    if not isinstance(train, list):  # 如果train不是列表
        train = [train]  # 将train转换为列表
        test = [test]  # 将test转换为列表

    for i in range(len(train)):  # 遍历train列表
        if not isinstance(train[i], pd.Series):  # 如果train的元素不是pd.Series
            train[i] = pd.Series(train[i], index=pd.RangeIndex(len(train[i])))  # 将train的元素转换为pd.Series
            test[i] = pd.Series(test[i], index=pd.RangeIndex(len(train[i]), len(test[i])+len(train[i])))  # 将test的元素转换为pd.Series

    test_len = len(test[0])  # 获取测试序列的长度
    assert all(len(t)==test_len for t in test), f'All test series must have same length, got {[len(t) for t in test]}'

    # 为每个序列创建一个独特的缩放器
    scalers = [get_scaler(train[i].values, alpha=alpha, beta=beta, basic=basic) for i in range(len(train))]

    # 转换输入序列
    input_arrs = [train[i].values for i in range(len(train))]
    transformed_input_arrs = [scaler.transform(input_array) for input_array, scaler in zip(input_arrs, scalers)]
    # 序列化输入序列
    input_strs = [serialize_arr(scaled_input_arr, settings) for scaled_input_arr in transformed_input_arrs]
    # 根据模型的最大上下文长度截断输入序列
    input_arrs, input_strs = zip(*[truncate_input(input_array, input_str, settings, model, test_len) for input_array, input_str in zip(input_arrs, input_strs)])

    steps = test_len  # 设置预测步数
    samples = None
    medians = None
    completions_list = None
    if num_samples > 0:  # 如果需要返回样本
        preds, completions_list, input_strs = generate_predictions(completion_fn, input_strs, steps, settings, scalers,
                                                                    num_samples=num_samples, temp=temp, 
                                                                    parallel=parallel, **kwargs)
        # 将预测结果转换为pandas DataFrame
        samples = [pd.DataFrame(preds[i], columns=test[i].index) for i in range(len(preds))]
        # 计算中位数
        medians = [sample.median(axis=0) for sample in samples]
        samples = samples if len(samples) > 1 else samples[0]  # 如果样本数大于1，则返回样本列表，否则返回单个样本
        medians = medians if len(medians) > 1 else medians[0]  # 如果中位数数大于1，则返回中位数列表，否则返回单个中位数

    out_dict = {
        'samples': samples,
        'median': medians,
        'info': {
            'Method': model,
        },
        'completions_list': completions_list,
        'input_strs': input_strs,
    }
    # 计算在真实测试序列上，条件于（截断的）输入序列的NLL/D
    if nll_fn is not None:
        BPDs = [nll_fn(input_arr=input_arrs[i], target_arr=test[i].values, settings=settings, transform=scalers[i].transform, count_seps=True, temp=temp) for i in range(len(train))]
        out_dict['NLL/D'] = np.mean(BPDs)  # 计算NLL/D的平均值

    return out_dict


In [None]:
from functools import partial # 从functools模块导入partial函数，用于固定函数的一些参数值
from models.gpt import gpt_completion_fn, gpt_nll_fn # 从models.gpt模块导入gpt的文本完成和负对数似然函数
from models.gpt import tokenize_fn as gpt_tokenize_fn # 从models.gpt模块导入tokenize函数，并重命名为gpt_tokenize_fn
from models.llama import llama_completion_fn, llama_nll_fn # 从models.llama模块导入llama的文本完成和负对数似然函数
from models.llama import tokenize_fn as llama_tokenize_fn # 从models.llama模块导入tokenize函数，并重命名为llama_tokenize_fn

from models.mistral import mistral_completion_fn, mistral_nll_fn # 从models.mistral模块导入mistral的文本完成和负对数似然函数
from models.mistral import tokenize_fn as mistral_tokenize_fn # 从models.mistral模块导入tokenize函数，并重命名为mistral_tokenize_fn

from models.mistral_api import mistral_api_completion_fn, mistral_api_nll_fn # 从models.mistral_api模块导入mistral_api的文本完成和负对数似然函数
from models.mistral_api import tokenize_fn as mistral_api_tokenize_fn # 从models.mistral_api模块导入tokenize函数，并重命名为mistral_api_tokenize_fn


# Required: Text completion function for each model
# 必需：为每个模型映射一个用于采样文本完成的函数
# 每个模型映射到一个采样文本完成的函数。
# 完成函数应遵循以下签名：
# 
# 参数：
#   - input_str (str): 输入时间序列的字符串表示。
#   - steps (int): 预测的步数。
#   - settings (SerializerSettings): 序列化设置。
#   - num_samples (int): 要采样的完成数量。
#   - temp (float): 模型输出随机性的温度参数。
# 
# 返回：
#   - list: 从模型中采样的完成字符串列表。
completion_fns = {
    'text-davinci-003': partial(gpt_completion_fn, model='text-davinci-003'),
    'gpt-4': partial(gpt_completion_fn, model='gpt-4'),
    'gpt-4-1106-preview':partial(gpt_completion_fn, model='gpt-4-1106-preview'),
    'gpt-3.5-turbo-instruct': partial(gpt_completion_fn, model='gpt-3.5-turbo-instruct'),
    'mistral': partial(mistral_completion_fn, model='mistral'),
    'mistral-api-tiny': partial(mistral_api_completion_fn, model='mistral-tiny'),
    'mistral-api-small': partial(mistral_api_completion_fn, model='mistral-small'),
    'mistral-api-medium': partial(mistral_api_completion_fn, model='mistral-medium'),
    'llama-7b': partial(llama_completion_fn, model='7b'),
    'llama-13b': partial(llama_completion_fn, model='13b'),
    'llama-70b': partial(llama_completion_fn, model='70b'),
    'llama-7b-chat': partial(llama_completion_fn, model='7b-chat'),
    'llama-13b-chat': partial(llama_completion_fn, model='13b-chat'),
    'llama-70b-chat': partial(llama_completion_fn, model='70b-chat'),
}

# Optional: NLL/D functions for each model
# 可选：为每个模型映射一个计算连续负对数似然/维度（NLL/D）的函数。这用于仅计算似然，采样时不需要。
# 
# 每个模型映射到一个计算连续负对数似然/维度（NLL/D）的函数。这用于计算似然值，采样时不需要。
# 
# NLL函数应遵循以下签名：
# 
# 参数：
#   - input_arr (np.ndarray): 数据转换后的输入时间序列（历史）。
#   - target_arr (np.ndarray): 数据转换后的真实时间序列（未来）。
#   - settings (SerializerSettings): 序列化设置。
#   - transform (callable): 数据转换函数（例如，缩放）用于确定雅可比因子。
#   - count_seps (bool): 如果为True，则在NLL计算中计算时间步分隔符，如果允许数字的变化长度则需要。
#   - temp (float): 采样的温度参数。
# 
# 返回：
#   - float: 给定输入数组条件下目标数组的NLL/D计算值。
nll_fns = {
    'text-davinci-003': partial(gpt_nll_fn, model='text-davinci-003'),
    'mistral': partial(mistral_nll_fn, model='mistral'),
    'mistral-api-tiny': partial(mistral_api_nll_fn, model='mistral-tiny'),
    'mistral-api-small': partial(mistral_api_nll_fn, model='mistral-small'),
    'mistral-api-medium': partial(mistral_api_nll_fn, model='mistral-medium'),
    'llama-7b': partial(llama_nll_fn, model='7b'),
    'llama-13b': partial(llama_nll_fn, model='13b'),
    'llama-70b': partial(llama_nll_fn, model='70b'),
    'llama-7b-chat': partial(llama_nll_fn, model='7b-chat'),
    'llama-13b-chat': partial(llama_nll_fn, model='13b-chat'),
    'llama-70b-chat': partial(llama_nll_fn, model='70b-chat'),
}

# Optional: Tokenization function for each model, only needed if you want automatic input truncation.
# 可选：为每个模型映射一个tokenization函数，仅在您想要自动输入截断时需要。
# Tokenization函数应遵循以下签名：
#
# 参数：
#   - str (str): 要tokenize的字符串。
# 返回：
#   - token_ids (list): token ids列表。
tokenization_fns = {
    'text-davinci-003': partial(gpt_tokenize_fn, model='text-davinci-003'),
    'gpt-3.5-turbo-instruct': partial(gpt_tokenize_fn, model='gpt-3.5-turbo-instruct'),
    'mistral': partial(mistral_tokenize_fn, model='mistral'),
    'mistral-api-tiny': partial(mistral_api_tokenize_fn, model='mistral-tiny'),
    'mistral-api-small': partial(mistral_api_tokenize_fn, model='mistral-small'),
    'mistral-api-medium': partial(mistral_api_tokenize_fn, model='mistral-medium'),
    'llama-7b': partial(llama_tokenize_fn, model='7b'),
    'llama-13b': partial(llama_tokenize_fn, model='13b'),
    'llama-70b': partial(llama_tokenize_fn, model='70b'),
    'llama-7b-chat': partial(llama_tokenize_fn, model='7b-chat'),
    'llama-13b-chat': partial(llama_tokenize_fn, model='13b-chat'),
    'llama-70b-chat': partial(llama_tokenize_fn, model='70b-chat'),
}

# Optional: Context lengths for each model, only needed if you want automatic input truncation.
# 可选：为每个模型映射一个上下文长度，仅在您想要自动输入截断时需要。
context_lengths = {
    'text-davinci-003': 4097,
    'gpt-3.5-turbo-instruct': 4097,
    'mistral-api-tiny': 4097,
    'mistral-api-small': 4097,
    'mistral-api-medium': 4097,
    'mistral': 4096,
    'llama-7b': 4096,
    'llama-13b': 4096,
    'llama-70b': 4096,
    'llama-7b-chat': 4096,
    'llama-13b-chat': 4096,
    'llama-70b-chat': 4096,
}


In [None]:
from data.serialize import serialize_arr, SerializerSettings # 从data.serialize模块导入序列化数组函数和序列化设置类
import openai # 导入openai库用于访问OpenAI的API
import tiktoken # 导入tiktoken库用于进行文本编码和解码
import numpy as np # 导入numpy库用于进行数学计算
from jax import grad, vmap # 从jax库导入grad和vmap函数，用于计算梯度和对函数进行向量化


def tokenize_fn(str, model):
    """
    为特定GPT模型的字符串获取token ID。

    参数：
        str (str): 要被tokenize的字符串。
        model (str): LLM模型的名称。

    返回：
        list of int: 对应的token ID列表。
    """
    encoding = tiktoken.encoding_for_model(model) # 获取模型对应的编码器
    return encoding.encode(str) # 对字符串进行编码并返回token ID

def get_allowed_ids(strs, model):
    """
    为特定GPT模型的一系列字符串获取token ID。

    参数：
        strs (list of str): 要被转换的字符串列表。
        model (str): LLM模型的名称。

    返回：
        list of int: 对应的token ID列表。
    """
    encoding = tiktoken.encoding_for_model(model) # 获取模型对应的编码器
    ids = []
    for s in strs:
        id = encoding.encode(s) # 对每个字符串进行编码
        ids.extend(id) # 将编码后的ID添加到列表中
    return ids

def gpt_completion_fn(model, input_str, steps, settings, num_samples, temp):
    """
    使用OpenAI的API从GPT生成文本完成。

    参数：
        model (str): 要使用的GPT-3模型的名称。
        input_str (str): 序列化的输入时间序列数据。
        steps (int): 要预测的时间步数。
        settings (SerializerSettings): 序列化设置。
        num_samples (int): 要生成的完成数量。
        temp (float): 采样的温度。

    返回：
        list of str: 生成的样本列表。
    """
    avg_tokens_per_step = len(tokenize_fn(input_str, model)) / len(input_str.split(settings.time_sep)) # 计算每个步骤的平均token数量
    # 定义logit偏差以防止GPT-3生成不需要的token
    logit_bias = {}
    allowed_tokens = [settings.bit_sep + str(i) for i in range(settings.base)] 
    allowed_tokens += [settings.time_sep, settings.plus_sign, settings.minus_sign]
    allowed_tokens = [t for t in allowed_tokens if len(t) > 0] # 移除空token，如隐式加号
    if (model not in ['gpt-3.5-turbo','gpt-4','gpt-4-1106-preview']): # 对于非聊天模型，支持logit偏差
        logit_bias = {id: 30 for id in get_allowed_ids(allowed_tokens, model)}
    if model in ['gpt-3.5-turbo','gpt-4','gpt-4-1106-preview']: # 对于聊天模型
        chatgpt_sys_message = "You are a helpful assistant that performs time series predictions. The user will provide a sequence and you will predict the remaining sequence. The sequence is represented by decimal strings separated by commas."
        extra_input = "Please continue the following sequence without producing any additional text. Do not say anything like 'the next terms in the sequence are', just return the numbers. Sequence:\n"
        response = openai.ChatCompletion.create(
            model=model,
            messages=[
                    {"role": "system", "content": chatgpt_sys_message},
                    {"role": "user", "content": extra_input+input_str+settings.time_sep}
                ],
            max_tokens=int(avg_tokens_per_step*steps), 
            temperature=temp,
            logit_bias=logit_bias,
            n=num_samples,
        )
        return [choice.message.content for choice in response.choices]
    else: # 对于非聊天模型
        response = openai.Completion.create(
            model=model,
            prompt=input_str, 
            max_tokens=int(avg_tokens_per_step*steps), 
            temperature=temp,
            logit_bias=logit_bias
            n=num_samples
        )
        return [choice.text for choice in response.choices]
    
def gpt_nll_fn(model, input_arr, target_arr, settings:SerializerSettings, transform, count_seps=True, temp=1):
    """
    根据LLM计算目标数组的每维度负对数似然(NLL)。

    参数：
        model (str): 要使用的LLM模型名称。
        input_arr (array-like): 输入数组（历史数据）。
        target_arr (array-like): 目标数组（未来数据）。
        settings (SerializerSettings): 序列化设置。
        transform (callable): 应用于数值的转换，在序列化前。
        count_seps (bool, 可选): 是否在计算中考虑分隔符。对于生成变量位数的模型应为真。默认为True。
        temp (float, 可选): 采样的温度。默认为1。

    返回：
        float: 计算出的每维度NLL。
    """
    input_str = serialize_arr(vmap(transform)(input_arr), settings) # 序列化输入数组
    target_str = serialize_arr(vmap(transform)(target_arr), settings) # 序列化目标数组
    assert input_str.endswith(settings.time_sep), f'输入字符串必须以{settings.time_sep}结束, 得到的是{input_str}'
    full_series = input_str + target_str # 拼接输入和目标序列
    response = openai.Completion.create(model=model, prompt=full_series, logprobs=5, max_tokens=0, echo=True, temperature=temp)
    logprobs = np.array(response['choices'][0].logprobs.token_logprobs, dtype=np.float32) # 获取log概率
    tokens = np.array(response['choices'][0].logprobs.tokens) # 获取生成的tokens
    top5logprobs = response['choices'][0].logprobs.top_logprobs # 获取top 5 log概率
    seps = tokens == settings.time_sep # 找到时间分隔符的位置
    target_start = np.argmax(np.cumsum(seps) == len(input_arr)) + 1 # 定位目标序列开始的位置
    logprobs = logprobs[target_start:] # 获取目标序列的log概率
    tokens = tokens[target_start:] # 获取目标序列的tokens
    top5logprobs = top5logprobs[target_start:] # 获取目标序列的top 5 log概率
    seps = tokens == settings.time_sep # 重新找到目标序列中时间分隔符的位置
    assert len(logprobs[seps]) == len(target_arr), f'每个目标应有一个分隔符。得到了{len(logprobs[seps])}个分隔符和{len(target_arr)}个目标。'
    # 通过移除多余的并重新规范化来调整log概率（见论文附录）
    allowed_tokens = [settings.bit_sep + str(i) for i in range(settings.base)] 
    allowed_tokens += [settings.time_sep, settings.plus_sign, settings.minus_sign, settings.bit_sep+settings.decimal_point]
    allowed_tokens = {t for t in allowed_tokens if len(t) > 0}
    p_extra = np.array([sum(np.exp(ll) for k,ll in top5logprobs[i].items() if not (k in allowed_tokens)) for i in range(len(top5logprobs))])
    if settings.bit_sep == '':
        p_extra = 0
    adjusted_logprobs = logprobs - np.log(1-p_extra) # 调整后的log概率
    digits_bits = -adjusted_logprobs[~seps].sum() # 数字的bits
    seps_bits = -adjusted_logprobs[seps].sum() # 分隔符的bits
    BPD = digits_bits / len(target_arr) # 每个目标的bits
    if count_seps:
        BPD += seps_bits / len(target_arr) # 如果计算分隔符，则加上分隔符的bits
    # log p(x) = log p(token) - log bin_width = log p(token) + prec * log base
    transformed_nll = BPD - settings.prec * np.log(settings.base) # 转换后的NLL
    avg_logdet_dydx = np.log(vmap(grad(transform))(target_arr)).mean() # 计算目标数组变换的平均log雅可比行列式
    return transformed_nll - avg_logdet_dydx # 返回调整后的NLL减去平均log雅可比行列式


In [None]:
from functools import partial  # 从functools模块导入partial函数，用于创建偏函数
import numpy as np  # 导入numpy库，通常简称为np
from dataclasses import dataclass  # 从dataclasses模块导入dataclass装饰器，用于创建数据类

def vec_num2repr(val, base, prec, max_val):  # 定义一个函数，将数值转换为指定基数和精度的表示形式
    """
    将数字转换为指定基数和精度的表示形式。

    参数:
    - val (np.array): 要表示的数字。
    - base (int): 表示的基数。
    - prec (int): 基数表示中小数点后的精度。
    - max_val (float): 数字的最大绝对值。

    返回:
    - tuple: 指定基数表示形式中的符号和数字。
    
    示例:
        以基数10, 精度2为例:
            0.5   ->    50
            3.52  ->   352
            12.5  ->  1250
    """
    base = float(base)  # 将基数转换为浮点数
    bs = val.shape[0]  # 获取val数组的大小
    sign = 1 * (val >= 0) - 1 * (val < 0)  # 计算val中每个元素的符号
    val = np.abs(val)  # 取val的绝对值
    max_bit_pos = int(np.ceil(np.log(max_val) / np.log(base)).item())  # 计算最大值的位数

    before_decimals = []  # 初始化小数点前的数字列表
    for i in range(max_bit_pos):  # 对于每个位数
        digit = (val / base**(max_bit_pos - i - 1)).astype(int)  # 计算每位的数字
        before_decimals.append(digit)  # 将数字添加到列表中
        val -= digit * base**(max_bit_pos - i - 1)  # 更新剩余的值

    before_decimals = np.stack(before_decimals, axis=-1)  # 将列表转换为numpy数组

    if prec > 0:  # 如果精度大于0
        after_decimals = []  # 初始化小数点后的数字列表
        for i in range(prec):  # 对于每个小数位
            digit = (val / base**(-i - 1)).astype(int)  # 计算每位的数字
            after_decimals.append(digit)  # 将数字添加到列表中
            val -= digit * base**(-i - 1)  # 更新剩余的值

        after_decimals = np.stack(after_decimals, axis=-1)  # 将列表转换为numpy数组
        digits = np.concatenate([before_decimals, after_decimals], axis=-1)  # 将小数点前后的数字合并
    else:
        digits = before_decimals  # 如果精度为0，则只有小数点前的数字
    return sign, digits  # 返回符号和数字

def vec_repr2num(sign, digits, base, prec, half_bin_correction=True):  # 定义一个函数，将指定基数和精度的表示形式转换回数字
    """
    将指定基数的字符串表示形式转换回数字。

    参数:
    - sign (np.array): 数字的符号。
    - digits (np.array): 指定基数中的数字。
    - base (int): 基数。
    - prec (int): 小数点后的精度。
    - half_bin_correction (bool): 如果为True，在数字上加上最小bin大小的0.5。

    返回:
    - np.array: 给定基数表示的数字对应的数组。
    """
    base = float(base)  # 将基数转换为浮点数
    bs, D = digits.shape  # 获取digits数组的形状
    digits_flipped = np.flip(digits, axis=-1)  # 将digits数组在最后一个轴上翻转
    powers = -np.arange(-prec, -prec + D)  # 计算每个位的幂次
    val = np.sum(digits_flipped/base**powers, axis=-1)  # 计算转换后的数字

    if half_bin_correction:  # 如果启用半bin校正
        val += 0.5/base**prec  # 在数值上加上最小bin大小的一半

    return sign * val  # 返回最终的数值数组

@dataclass  # 使用dataclass装饰器定义一个数据类
class SerializerSettings:  # 定义序列化数字的设置类
    """
    数字序列化的设置。

    属性:
    - base (int): 数字表示的基数。
    - prec (int): 小数点后的精度。
    - signed (bool): 如果为True，允许负数。默认为False。
    - fixed_length (bool): 如果为True，确保序列化字符串的固定长度。默认为False。
    - max_val (float): 序列化的数字的最大绝对值。
    - time_sep (str): 不同时间步的分隔符。
    - bit_sep (str): 个别数字的分隔符。
    - plus_sign (str): 正号的字符串表示。
    - minus_sign (str): 负号的字符串表示。
    - half_bin_correction (bool): 如果为True，在反序列化时应用半bin校正。默认为True。
    - decimal_point (str): 小数点的字符串表示。
    - missing_str (str): 缺失值的字符串表示。
    """
    base: int = 10
    prec: int = 3
    signed: bool = True
    fixed_length: bool = False
    max_val: float = 1e7
    time_sep: str = ' ,'
    bit_sep: str = ' '
    plus_sign: str = ''
    minus_sign: str = ' -'
    half_bin_correction: bool = True
    decimal_point: str = ''
    missing_str: str = ' Nan'

def serialize_arr(arr, settings: SerializerSettings):  # 定义一个函数，根据提供的设置将数字数组序列化为字符串
    """
    根据提供的设置将数字数组（时间序列）序列化为字符串。

    参数:
    - arr (np.array): 要序列化的数字数组。
    - settings (SerializerSettings): 序列化的设置。

    返回:
    - str: 数组的字符串表示。
    """
    # max_val仅用于固定nunm2repr中位数的数量，因此可以vmapped
    assert np.all(np.abs(arr[~np.isnan(arr)]) <= settings.max_val), f"abs(arr)必须<=max_val，\
         但abs(arr)={np.abs(arr)}, max_val={settings.max_val}"
    
    if not settings.signed:  # 如果不允许负数
        assert np.all(arr[~np.isnan(arr)] >= 0), f"无符号数组必须>=0"
        plus_sign = minus_sign = ''
    else:
        plus_sign = settings.plus_sign
        minus_sign = settings.minus_sign
    
    vnum2repr = partial(vec_num2repr,base=settings.base,prec=settings.prec,max_val=settings.max_val)  # 创建偏函数，准备进行数值到表示的转换
    sign_arr, digits_arr = vnum2repr(np.where(np.isnan(arr),np.zeros_like(arr),arr))  # 对数组进行转换
    ismissing = np.isnan(arr)  # 检查数组中的缺失值
    
    def tokenize(arr):  # 定义一个函数，用于将数组转换为字符串
        return ''.join([settings.bit_sep+str(b) for b in arr])  # 将数组中的每个元素转换为字符串并连接
    
    bit_strs = []  # 初始化位字符串列表
    for sign, digits,missing in zip(sign_arr, digits_arr,ismissing):  # 遍历每个数字的符号、位和是否缺失
        if not settings.fixed_length:  # 如果不要求固定长度
            # 移除前导零
            nonzero_indices = np.where(digits != 0)[0]
            if len(nonzero_indices) == 0:  # 如果全为零
                digits = np.array([0])  # 只保留一个零
            else:
                digits = digits[nonzero_indices[0]:]  # 移除前导零
            # 添加小数点
            prec = settings.prec
            if len(settings.decimal_point):
                digits = np.concatenate([digits[:-prec], np.array([settings.decimal_point]), digits[-prec:]])
        digits = tokenize(digits)  # 将数字数组转换为字符串
        sign_sep = plus_sign if sign == 1 else minus_sign  # 根据符号选择符号分隔符
        if missing:  # 如果该数值缺失
            bit_strs.append(settings.missing_str)  # 添加缺失值字符串
        else:
            bit_strs.append(sign_sep + digits)  # 添加符号和数字字符串
    bit_str = settings.time_sep.join(bit_strs)  # 使用时间分隔符连接所有数字字符串
    bit_str += settings.time_sep  # 在末尾添加时间分隔符，避免最后一个时间步的位数不明确
    return bit_str  # 返回序列化的字符串

def deserialize_str(bit_str, settings: SerializerSettings, ignore_last=False, steps=None):  # 定义一个函数，根据提供的设置将字符串反序列化为数字数组
    """
    根据提供的设置将字符串反序列化为数字数组（时间序列）。

    参数:
    - bit_str (str): 数字数组的字符串表示。
    - settings (SerializerSettings): 反序列化的设置。
    - ignore_last (bool): 如果为True，忽略字符串中的最后一个时间步（可能因为令牌限制等原因不完整）。默认为False。
    - steps (int, optional): 要反序列化的步数或条目数。

    返回:
    - 如果反序列化第一个数字失败，则返回None；否则
    - np.array: 对应于字符串的数字数组。
    """
    # ignore_last用于忽略预测中的最后一个时间步，这个步骤经常由于令牌限制等原因而部分生成
    orig_bitstring = bit_str  # 保存原始字符串
    bit_strs = bit_str.split(settings.time_sep)  # 使用时间分隔符分割字符串
    # 移除空字符串
    bit_strs = [a for a in bit_strs if len(a) > 0]
    if ignore_last:  # 如果忽略最后一个时间步
        bit_strs = bit_strs[:-1]
    if steps is not None:  # 如果指定了步数
        bit_strs = bit_strs[:steps]
    vrepr2num = partial(vec_repr2num, base=settings.base, prec=settings.prec, half_bin_correction=settings.half_bin_correction)  # 创建偏函数，准备进行表示到数值的转换
    max_bit_pos = int(np.ceil(np.log(settings.max_val) / np.log(settings.base)).item())  # 计算最大值的位数
    sign_arr = []  # 初始化符号数组
    digits_arr = []  # 初始化数字数组
    try:
        for i, bit_str in enumerate(bit_strs):  # 遍历每个位字符串
            if bit_str.startswith(settings.minus_sign):  # 如果以负号开始
                sign = -1
            elif bit_str.startswith(settings.plus_sign):  # 如果以正号开始
                sign = 1
            else:
                assert settings.signed == False, f"有符号的位字符串必须以{settings.minus_sign}或{settings.plus_sign}开始"
            bit_str = bit_str[len(settings.plus_sign):] if sign == 1 else bit_str[len(settings.minus_sign):]  # 去除符号
            if settings.bit_sep == '':  # 如果位分隔符为空
                bits = [b for b in bit_str.lstrip()]
            else:
                bits = [b[:1] for b in bit_str.lstrip().split(settings.bit_sep)]  # 根据位分隔符分割字符串
            if settings.fixed_length:
                # 检查位字符串是否具有固定长度
                assert len(bits) == max_bit_pos + settings.prec, f"固定长度的位字符串必须有{max_bit_pos + settings.prec}位，但有{len(bits)}位: '{bit_str}'"
            digits = []
            for b in bits:
                if b == settings.decimal_point:
                    continue  # 跳过小数点
                # 检查是否为数字
                if b.isdigit():
                    digits.append(int(b))
                else:
                    break  # 如果遇到非数字字符，则停止处理当前位字符串
            sign_arr.append(sign)  # 添加当前数字的符号
            digits_arr.append(digits)  # 添加当前数字的位
    except Exception as e:
        # 如果在反序列化过程中出现异常
        print(f"反序列化时出错 {settings.time_sep.join(bit_strs[i-2:i+5])}{settings.time_sep}\n\t{e}")
        print(f'原始字符串: {orig_bitstring}')
        print(f"位字符串: {bit_str}, 分隔符: {settings.bit_sep}")
        # 此时我们已经反序列化了部分位字符串，因此下面返回这些数字
    if digits_arr:
        # 添加前导零以使所有数字位数相同
        max_len = max([len(d) for d in digits_arr])
        for i in range(len(digits_arr)):
            digits_arr[i] = [0] * (max_len - len(digits_arr[i])) + digits_arr[i]
        # 使用sign_arr和digits_arr反序列化为数字数组
        return vrepr2num(np.array(sign_arr), np.array(digits_arr))
    else:
        # 如果在第一步就出错，则返回None
        return None


In [None]:
import torch  # 导入PyTorch库，用于深度学习模型
import numpy as np  # 导入NumPy库，用于数值计算
from jax import grad, vmap  # 从JAX库导入grad和vmap，用于自动微分和向量化映射
from tqdm import tqdm  # 从tqdm库导入tqdm，用于显示进度条
import argparse  # 导入argparse库，用于解析命令行参数
from transformers import (  # 从transformers库导入LLaMA相关的类
    LlamaForCausalLM, 
    LlamaTokenizer, 
)
from data.serialize import serialize_arr, deserialize_str, SerializerSettings  # 从data.serialize导入序列化相关的函数和类

DEFAULT_EOS_TOKEN = "</s>"  # 定义默认的句尾标记
DEFAULT_BOS_TOKEN = "<s>"  # 定义默认的句首标记
DEFAULT_UNK_TOKEN = "<unk>"  # 定义默认的未知词标记

loaded = {}  # 初始化一个字典，用于缓存加载的模型

def llama2_model_string(model_size, chat):  # 定义一个函数，用于生成LLaMA模型的字符串标识
    chat = "chat-" if chat else ""  # 根据是否是聊天模式，添加相应的前缀
    return f"meta-llama/Llama-2-{model_size.lower()}-{chat}hf"  # 返回模型的完整字符串标识

def get_tokenizer(model):  # 定义一个函数，用于获取指定模型的分词器
    name_parts = model.split("-")  # 将模型名按照"-"分割
    model_size = name_parts[0]  # 获取模型大小
    chat = len(name_parts) > 1  # 判断是否是聊天模式
    assert model_size in ["7b", "13b", "70b"]  # 断言模型大小是预定义的值之一

    tokenizer = LlamaTokenizer.from_pretrained(  # 从预训练的分词器中加载
        llama2_model_string(model_size, chat),
        use_fast=False,
    )

    special_tokens_dict = dict()  # 初始化一个字典，用于添加特殊标记
    if tokenizer.eos_token is None:  # 如果没有句尾标记，添加默认的
        special_tokens_dict["eos_token"] = DEFAULT_EOS_TOKEN
    if tokenizer.bos_token is None:  # 如果没有句首标记，添加默认的
        special_tokens_dict["bos_token"] = DEFAULT_BOS_TOKEN
    if tokenizer.unk_token is None:  # 如果没有未知词标记，添加默认的
        special_tokens_dict["unk_token"] = DEFAULT_UNK_TOKEN

    tokenizer.add_special_tokens(special_tokens_dict)  # 将特殊标记添加到分词器中
    tokenizer.pad_token = tokenizer.eos_token  # 将填充标记设置为句尾标记

    return tokenizer  # 返回配置好的分词器

def get_model_and_tokenizer(model_name, cache_model=False):  # 定义一个函数，用于获取指定模型名的模型和分词器
    if model_name in loaded:  # 如果模型已经加载过，直接返回
        return loaded[model_name]
    name_parts = model_name.split("-")  # 将模型名按照"-"分割
    model_size = name_parts[0]  # 获取模型大小
    chat = len(name_parts) > 1  # 判断是否是聊天模式

    assert model_size in ["7b", "13b", "70b"]  # 断言模型大小是预定义的值之一

    tokenizer = get_tokenizer(model_name)  # 获取分词器

    model = LlamaForCausalLM.from_pretrained(  # 从预训练模型中加载LLaMA模型
        llama2_model_string(model_size, chat),
        device_map="auto",   # 自动分配设备
        torch_dtype=torch.float16,  # 使用半精度浮点数以节省内存
    )
    model.eval()  # 将模型设置为评估模式
    if cache_model:  # 如果启用了缓存模型
        loaded[model_name] = (model, tokenizer)  # 缓存模型和分词器以便再次使用
    return model, tokenizer  # 返回模型和分词器

def tokenize_fn(str, model):  # 定义一个函数，用于对输入字符串进行分词
    tokenizer = get_tokenizer(model)  # 获取对应模型的分词器
    return tokenizer(str)  # 返回分词结果

def llama_nll_fn(model, input_arr, target_arr, settings: SerializerSettings, transform, count_seps=True, temp=1, cache_model=True):
    """
    计算目标数组（连续的）根据LM条件下的NLL/维度（以自然对数为底）。应用变换的相关对数行列式，
    并通过假设在箱内均匀分布，将离散的NLL从LLM转换为连续。
    输入：
        input_arr: (n,) 上下文数组
        target_arr: (n,) 真实数组
        cache_model: 是否缓存模型和分词器以加快重复调用的速度
    返回：NLL/D
    """
    model, tokenizer = get_model_and_tokenizer(model, cache_model=cache_model)  # 获取模型和分词器

    input_str = serialize_arr(vmap(transform)(input_arr), settings)  # 将输入数组序列化为字符串
    target_str = serialize_arr(vmap(transform)(target_arr), settings)  # 将目标数组序列化为字符串
    full_series = input_str + target_str  # 将输入和目标字符串连接
    
    batch = tokenizer(  # 将字符串批量化处理
        [full_series], 
        return_tensors="pt",  # 返回PyTorch张量
        add_special_tokens=True  # 添加特殊标记
    )
    batch = {k: v.cuda() for k, v in batch.items()}  # 将数据移动到GPU

    with torch.no_grad():  # 关闭梯度计算
        out = model(**batch)  # 使用模型进行预测

    good_tokens_str = list("0123456789" + settings.time_sep)  # 定义有效的字符
    good_tokens = [tokenizer.convert_tokens_to_ids(token) for token in good_tokens_str]  # 将有效字符转换为ID
    bad_tokens = [i for i in range(len(tokenizer)) if i not in good_tokens]  # 定义无效字符的ID
    out['logits'][:, :, bad_tokens] = -100  # 将无效字符的对数概率设为一个很小的值

    input_ids = batch['input_ids'][0][1:]  # 获取输入的ID
    logprobs = torch.nn.functional.log_softmax(out['logits'], dim=-1)[0][:-1]  # 计算对数概率
    logprobs = logprobs[torch.arange(len(input_ids)), input_ids].cpu().numpy()  # 提取目标对数概率并转换为NumPy数组

    tokens = tokenizer.batch_decode(  # 解码生成的ID
        input_ids,
        skip_special_tokens=False, 
        clean_up_tokenization_spaces=False
    )
    
    input_len = len(tokenizer([input_str], return_tensors="pt",)['input_ids'][0])  # 计算输入长度
    input_len = input_len - 2 # 移除BOS标记

    logprobs = logprobs[input_len:]  # 提取目标数组的对数概率
    tokens = tokens[input_len:]  # 提取目标数组的标记
    BPD = -logprobs.sum() / len(target_arr)  # 计算每维的负对数概率

    # 计算调整后的BPD
    transformed_nll = BPD - settings.prec * np.log(settings.base)  # 将离散的NLL转换为连续
    avg_logdet_dydx = np.log(vmap(grad(transform))(target_arr)).mean()  # 计算变换的平均对数行列式
    return transformed_nll - avg_logdet_dydx  # 返回变换后的NLL减去平均对数行列式

def llama_completion_fn(
    model,
    input_str,
    steps,
    settings,
    batch_size=5,
    num_samples=20,
    temp=0.9, 
    top_p=0.9,
    cache_model=True
):
    """
    完成给定输入字符串的文本生成任务，根据设定的步数和其他参数生成文本。
    输入：
        model: 模型名
        input_str: 输入字符串
        steps: 生成步数
        settings: 序列化设置
        batch_size: 批处理大小
        num_samples: 生成样本的数量
        temp: 温度参数，控制生成多样性
        top_p: 保留概率累积为top_p的最高概率词
        cache_model: 是否缓存模型和分词器以加快重复调用的速度
    返回：
        生成的字符串列表
    """
    avg_tokens_per_step = len(tokenize_fn(input_str, model)['input_ids']) / len(input_str.split(settings.time_sep))
    max_tokens = int(avg_tokens_per_step * steps)  # 根据步数估计最大令牌数
    
    model, tokenizer = get_model_and_tokenizer(model, cache_model=cache_model)  # 获取模型和分词器

    gen_strs = []  # 初始化生成字符串列表
    for _ in tqdm(range(num_samples // batch_size)):  # 以批处理的方式生成文本
        batch = tokenizer(
            [input_str], 
            return_tensors="pt",
        )

        batch = {k: v.repeat(batch_size, 1) for k, v in batch.items()}  # 复制批量输入
        batch = {k: v.cuda() for k, v in batch.items()}  # 将数据移动到GPU
        num_input_ids = batch['input_ids'].shape[1]  # 获取输入ID的数量

        good_tokens_str = list("0123456789" + settings.time_sep)  # 定义有效的字符
        good_tokens = [tokenizer.convert_tokens_to_ids(token) for token in good_tokens_str]  # 将有效字符转换为ID
        # good_tokens += [tokenizer.eos_token_id]  # 将EOS标记添加到有效标记列表中（此行被注释）
        bad_tokens = [i for i in range(len(tokenizer)) if i not in good_tokens]  # 定义无效字符的ID

        generate_ids = model.generate(
            **batch,
            do_sample=True,
            max_new_tokens=max_tokens,
            temperature=temp, 
            top_p=top_p, 
            bad_words_ids=[[t] for t in bad_tokens],
            renormalize_logits=True,
        )  # 生成文本的ID
        gen_strs += tokenizer.batch_decode(
            generate_ids[:, num_input_ids:],
            skip_special_tokens=True, 
            clean_up_tokenization_spaces=False
        )  # 解码生成的ID，添加到字符串列表中
    return gen_strs  # 返回生成的字符串列表


In [None]:
from functools import partial
from models.gpt import gpt_completion_fn, gpt_nll_fn
from models.gpt import tokenize_fn as gpt_tokenize_fn
from models.llama import llama_completion_fn, llama_nll_fn
from models.llama import tokenize_fn as llama_tokenize_fn

from models.mistral import mistral_completion_fn, mistral_nll_fn
from models.mistral import tokenize_fn as mistral_tokenize_fn

from models.mistral_api import mistral_api_completion_fn, mistral_api_nll_fn
from models.mistral_api import tokenize_fn as mistral_api_tokenize_fn


# Required: Text completion function for each model
# -----------------------------------------------
# Each model is mapped to a function that samples text completions.
# The completion function should follow this signature:
# 
# Args:
#   - input_str (str): String representation of the input time series.
#   - steps (int): Number of steps to predict.
#   - settings (SerializerSettings): Serialization settings.
#   - num_samples (int): Number of completions to sample.
#   - temp (float): Temperature parameter for model's output randomness.
# 
# Returns:
#   - list: Sampled completion strings from the model.
completion_fns = {
    'gpt-3.5-turbo-instruct': partial(gpt_completion_fn, model='gpt-3.5-turbo-instruct'),
    'gpt-4': partial(gpt_completion_fn, model='gpt-4'),
    'gpt-4-1106-preview':partial(gpt_completion_fn, model='gpt-4-1106-preview'),
    'gpt-3.5-turbo': partial(gpt_completion_fn, model='gpt-3.5-turbo'),
    'mistral': partial(mistral_completion_fn, model='mistral'),
    'mistral-api-tiny': partial(mistral_api_completion_fn, model='mistral-tiny'),
    'mistral-api-small': partial(mistral_api_completion_fn, model='mistral-small'),
    'mistral-api-medium': partial(mistral_api_completion_fn, model='mistral-medium'),
    'llama-7b': partial(llama_completion_fn, model='7b'),
    'llama-13b': partial(llama_completion_fn, model='13b'),
    'llama-70b': partial(llama_completion_fn, model='70b'),
    'llama-7b-chat': partial(llama_completion_fn, model='7b-chat'),
    'llama-13b-chat': partial(llama_completion_fn, model='13b-chat'),
    'llama-70b-chat': partial(llama_completion_fn, model='70b-chat'),
}

# Optional: NLL/D functions for each model
# -----------------------------------------------
# Each model is mapped to a function that computes the continuous Negative Log-Likelihood 
# per Dimension (NLL/D). This is used for computing likelihoods only and not needed for sampling.
# 
# The NLL function should follow this signature:
# 
# Args:
#   - input_arr (np.ndarray): Input time series (history) after data transformation.
#   - target_arr (np.ndarray): Ground truth series (future) after data transformation.
#   - settings (SerializerSettings): Serialization settings.
#   - transform (callable): Data transformation function (e.g., scaling) for determining the Jacobian factor.
#   - count_seps (bool): If True, count time step separators in NLL computation, required if allowing variable number of digits.
#   - temp (float): Temperature parameter for sampling.
# 
# Returns:
#   - float: Computed NLL per dimension for p(target_arr | input_arr).
nll_fns = {
    'gpt-3.5-turbo-instruct': partial(gpt_nll_fn, model='gpt-3.5-turbo-instruct'),
    'mistral': partial(mistral_nll_fn, model='mistral'),
    'mistral-api-tiny': partial(mistral_api_nll_fn, model='mistral-tiny'),
    'mistral-api-small': partial(mistral_api_nll_fn, model='mistral-small'),
    'mistral-api-medium': partial(mistral_api_nll_fn, model='mistral-medium'),
    'llama-7b': partial(llama_completion_fn, model='7b'),
    'llama-7b': partial(llama_nll_fn, model='7b'),
    'llama-13b': partial(llama_nll_fn, model='13b'),
    'llama-70b': partial(llama_nll_fn, model='70b'),
    'llama-7b-chat': partial(llama_nll_fn, model='7b-chat'),
    'llama-13b-chat': partial(llama_nll_fn, model='13b-chat'),
    'llama-70b-chat': partial(llama_nll_fn, model='70b-chat'),
}

# Optional: Tokenization function for each model, only needed if you want automatic input truncation.
# The tokenization function should follow this signature:
#
# Args:
#   - str (str): A string to tokenize.
# Returns:
#   - token_ids (list): A list of token ids.
tokenization_fns = {
    'gpt-3.5-turbo-instruct': partial(gpt_tokenize_fn, model='gpt-3.5-turbo-instruct'),
    'gpt-3.5-turbo': partial(gpt_tokenize_fn, model='gpt-3.5-turbo'),
    'mistral': partial(mistral_tokenize_fn, model='mistral'),
    'mistral-api-tiny': partial(mistral_api_tokenize_fn, model='mistral-tiny'),
    'mistral-api-small': partial(mistral_api_tokenize_fn, model='mistral-small'),
    'mistral-api-medium': partial(mistral_api_tokenize_fn, model='mistral-medium'),
    'llama-7b': partial(llama_tokenize_fn, model='7b'),
    'llama-13b': partial(llama_tokenize_fn, model='13b'),
    'llama-70b': partial(llama_tokenize_fn, model='70b'),
    'llama-7b-chat': partial(llama_tokenize_fn, model='7b-chat'),
    'llama-13b-chat': partial(llama_tokenize_fn, model='13b-chat'),
    'llama-70b-chat': partial(llama_tokenize_fn, model='70b-chat'),
}

# Optional: Context lengths for each model, only needed if you want automatic input truncation.
context_lengths = {
    'gpt-3.5-turbo-instruct': 4097,
    'gpt-3.5-turbo': 4097,
    'mistral-api-tiny': 4097,
    'mistral-api-small': 4097,
    'mistral-api-medium': 4097,
    'mistral': 4096,
    'llama-7b': 4096,
    'llama-13b': 4096,
    'llama-70b': 4096,
    'llama-7b-chat': 4096,
    'llama-13b-chat': 4096,
    'llama-70b-chat': 4096,
}

In [None]:
# 导入所需的库
import os
import pickle
from data.monash import get_datasets  # 用于获取时间序列数据集的函数
from data.serialize import SerializerSettings  # 序列化设置的类
from models.validation_likelihood_tuning import get_autotuned_predictions_data  # 自动调整预测数据的函数
from models.utils import grid_iter  # 用于迭代超参数网格的函数
from models.llmtime import get_llmtime_predictions_data  # 获取LLM时间序列预测数据的函数
import numpy as np
import openai
openai.api_key = os.environ['OPENAI_API_KEY']  # 从环境变量中读取OpenAI的API密钥
openai.api_base = os.environ.get("OPENAI_API_BASE", "https://api.openai.com/v1")  # 设置OpenAI的API基础URL

# 为每个模型指定超参数网格
gpt3_hypers = dict(
    temp=0.7,
    alpha=0.9,
    beta=0,
    basic=False,
    settings=SerializerSettings(base=10, prec=3, signed=True, half_bin_correction=True),
)
llama_hypers = dict(
    temp=1.0,
    alpha=0.99,
    beta=0.3,
    basic=False,
    settings=SerializerSettings(base=10, prec=3, time_sep=',', bit_sep='', plus_sign='', minus_sign='-', signed=True), 
)
model_hypers = {
    'text-davinci-003': {'model': 'text-davinci-003', **gpt3_hypers},
    'llama-7b': {'model': 'llama-7b', **llama_hypers},
    'llama-70b': {'model': 'llama-70b', **llama_hypers},
}

# 为每个模型指定获取预测的函数
model_predict_fns = {
    'text-davinci-003': get_llmtime_predictions_data,
    'llama-7b': get_llmtime_predictions_data,
    'llama-70b': get_llmtime_predictions_data,
}

# 定义一个函数，用于检测模型是否为GPT系列
def is_gpt(model):
    return any([x in model for x in ['ada', 'babbage', 'curie', 'davinci', 'text-davinci-003', 'gpt-4']])

# 指定保存结果的输出目录
output_dir = 'outputs/monash'
os.makedirs(output_dir, exist_ok=True)  # 如果目录不存在，则创建目录

# 指定要运行的模型和数据集
models_to_run = ['text-davinci-003']
datasets_to_run =  [
    "weather", "covid_deaths", "solar_weekly", "tourism_monthly", "australian_electricity_demand", "pedestrian_counts",
    "traffic_hourly", "hospital", "fred_md", "tourism_yearly", "tourism_quarterly", "us_births",
    "nn5_weekly", "traffic_weekly", "saugeenday", "cif_2016", "bitcoin", "sunspot", "nn5_daily"
]

max_history_len = 500  # 设置最大历史长度
datasets = get_datasets()  # 获取所有数据集
for dsname in datasets_to_run:  # 遍历要运行的数据集
    print(f"Starting {dsname}")  # 打印开始信息
    data = datasets[dsname]  # 获取数据集
    train, test = data  # 分解数据集为训练和测试数据
    train = [x[-max_history_len:] for x in train]  # 截取每个训练序列的最后max_history_len个点
    if os.path.exists(f'{output_dir}/{dsname}.pkl'):  # 检查结果文件是否已存在
        with open(f'{output_dir}/{dsname}.pkl','rb') as f:  # 如果存在，则加载之前的结果
            out_dict = pickle.load(f)
    else:
        out_dict = {}  # 如果不存在，则初始化一个空字典用于存储结果

    for model in models_to_run:  # 遍历要运行的模型
        if model in out_dict:
            print(f"Skipping {dsname} {model}")  # 如果当前模型的结果已存在，则跳过
            continue
        else:
            print(f"Starting {dsname} {model}")  # 打印开始处理当前数据集和模型的信息
            hypers = list(grid_iter(model_hypers[model]))  # 获取当前模型的超参数组合
        parallel = True if is_gpt(model) else False  # 如果是GPT模型，则启用并行处理
        num_samples = 5  # 设置预测样本数
        
        try:
            # 调用get_autotuned_predictions_data函数进行预测
            preds = get_autotuned_predictions_data(train, test, hypers, num_samples, model_predict_fns[model], verbose=False, parallel=parallel)
            medians = preds['median']  # 获取预测的中位数
            targets = np.array(test)  # 将测试数据转换为numpy数组
            maes = np.mean(np.abs(medians - targets), axis=1)  # 计算每个时间序列的平均绝对误差(MAE)
            preds['maes'] = maes  # 将MAE列表存储到预测结果字典中
            preds['mae'] = np.mean(maes)  # 计算所有时间序列的MAE平均值，并存储到预测结果字典中
            out_dict[model] = preds  # 将当前模型的预测结果存储到结果字典中
        except Exception as e:
            print(f"Failed {dsname} {model}")  # 如果预测失败，则打印失败信息
            print(e)  # 打印异常信息
            continue  # 继续处理下一个模型
        
        # 保存当前数据集的预测结果到文件中
        with open(f'{output_dir}/{dsname}.pkl', 'wb') as f:
            pickle.dump(out_dict, f)
    
    print(f"Finished {dsname}")  # 打印完成当前数据集处理的信息
